kirr has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/40076?usp=email )
Change subject: trx_toolkit/data_msg: Optimize bit conversions ......................................................................
trx_toolkit/data_msg: Optimize bit conversions
- implement convertions in C (via pyx) directly - add in-place versions - use those in-place versions as needed
Change-Id: Ib8237a277f97292cd99dfda44972372d370f0ada --- M src/target/trx_toolkit/data_msg.pxd M src/target/trx_toolkit/data_msg.pyx M src/target/trx_toolkit/test_data_msg.py 3 files changed, 97 insertions(+), 25 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/76/40076/1
diff --git a/src/target/trx_toolkit/data_msg.pxd b/src/target/trx_toolkit/data_msg.pxd index cbdca4b..2823b82 100644 --- a/src/target/trx_toolkit/data_msg.pxd +++ b/src/target/trx_toolkit/data_msg.pxd @@ -3,7 +3,7 @@
from cython cimport final from cpython cimport array -from libc.stdint cimport uint8_t +from libc.stdint cimport int8_t, uint8_t
cdef class Modulation: cdef int coding # Coding in TRXD header @@ -36,6 +36,24 @@ cdef parse_burst(self, burst) cdef _validate(self)
+ @staticmethod + cdef void __usbit2sbit(const uint8_t *usbits, Py_ssize_t l, int8_t *sbits) noexcept + @staticmethod + cdef void __sbit2usbit(const int8_t *sbits, Py_ssize_t l, uint8_t *usbits) noexcept + @staticmethod + cdef void __sbit2ubit(const int8_t *sbits, Py_ssize_t l, uint8_t *ubits) noexcept + @staticmethod + cdef void __ubit2sbit(const uint8_t *ubits, Py_ssize_t l, int8_t *sbits) noexcept + + @staticmethod + cdef array.array[int8_t] _usbit2sbit(const uint8_t[::1] usbits) + @staticmethod + cdef array.array[uint8_t] _sbit2usbit(const int8_t[::1] sbits) + @staticmethod + cdef array.array[uint8_t] _sbit2ubit(const int8_t[::1] sbits) + @staticmethod + cdef array.array[int8_t] _ubit2sbit(const uint8_t[::1] ubits) +
@final cdef class TxMsg(Msg): diff --git a/src/target/trx_toolkit/data_msg.pyx b/src/target/trx_toolkit/data_msg.pyx index 02194ba..5929b47 100644 --- a/src/target/trx_toolkit/data_msg.pyx +++ b/src/target/trx_toolkit/data_msg.pyx @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # cython: language_level=3 +# cython: initializedcheck=False, wraparound=False, boundscheck=False, infer_types=True
# TRX Toolkit # DATA interface message definitions and helpers @@ -20,22 +21,25 @@
from cython cimport final
+from cpython cimport array +cdef array.array _Bempty = array.array('B', []) +cdef array.array _bempty = array.array('b', []) + +from cpython.bytearray cimport PyByteArray_FromStringAndSize +from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE +from xpy cimport PyByteArray_Resize + import random import struct import abc
from typing import List -from array import array
import gsm_shared cdef int GMSK_BURST_LEN = gsm_shared.GMSK_BURST_LEN cdef int EDGE_BURST_LEN = gsm_shared.EDGE_BURST_LEN cdef int GSM_HYPERFRAME = gsm_shared.GSM_HYPERFRAME
-# _bu2s converts unsigned byte to signed. -def _bu2s(x): - return struct.unpack('b', struct.pack('B', x))[0] - cdef list[Modulation] _mod_registry = [] cdef class Modulation: def __cinit__(self, int coding, int bl): @@ -145,29 +149,74 @@
return result
+ @staticmethod - def usbit2sbit(bits): # array[B] -> array[b] + cdef array.array[int8_t] _usbit2sbit(const uint8_t[::1] usbits): # array[B] -> array[b] ''' Convert unsigned soft-bits {254..0} to soft-bits {-127..127}. ''' - return array('b', bits.tobytes().translate(Msg._tab_usbit2sbit)) - _tab_usbit2sbit = array('b', [-127 if (b == 0xff) else 127 - b for b in range(0x100)]) + l = len(usbits) + cdef array.array[int8_t] sbits = array.clone(_bempty, l, zero=False) + Msg.__usbit2sbit(&usbits[0], l, &sbits[0]) + return sbits
@staticmethod - def sbit2usbit(bits): # array[b] -> array[B] + cdef array.array[uint8_t] _sbit2usbit(const int8_t[::1] sbits): # array[b] -> array[B] ''' Convert soft-bits {-127..127} to unsigned soft-bits {254..0}. ''' - return array('B', bits.tobytes().translate(Msg._tab_sbit2usbit)) - _tab_sbit2usbit = array('B', [127 - _bu2s(b) for b in range(0x100)]) + l = len(sbits) + cdef array.array[uint8_t] usbits = array.clone(_Bempty, l, zero=False) + Msg.__sbit2usbit(&sbits[0], l, &usbits[0]) + return usbits
@staticmethod - def sbit2ubit(bits): # array[b] -> bytearray + cdef array.array[uint8_t] _sbit2ubit(const int8_t[::1] sbits): # array[b] -> array[B] ''' Convert soft-bits {-127..127} to bits {1..0}. ''' - return bytearray(bits).translate(Msg._tab_sbit2ubit) - _tab_sbit2ubit = array('B', [int(_bu2s(b) < 0) for b in range(0x100)]) + l = len(sbits) + cdef array.array[uint8_t] ubits = array.clone(_Bempty, l, zero=False) + Msg.__sbit2ubit(&sbits[0], l, &ubits[0]) + return ubits
@staticmethod - def ubit2sbit(bits: bytearray): # -> array[b] + cdef array.array[int8_t] _ubit2sbit(const uint8_t[::1] ubits): # array[B] -> array[b] ''' Convert bits {1..0} to soft-bits {-127..127}. ''' - return array('b', bits.translate(Msg._tab_ubit2sbit)) - _tab_ubit2sbit = array('b', [-127 if b else 127 for b in range(0x100)]) + l = len(ubits) + cdef array.array[int8_t] sbits = array.clone(_bempty, l, zero=False) + Msg.__ubit2sbit(&ubits[0], l, &sbits[0]) + return sbits + + # inplace versions of Xbit2Ybit + @staticmethod + cdef void __usbit2sbit(const uint8_t *usbits, Py_ssize_t l, int8_t *sbits) noexcept: + for i in range(l): + b = usbits[i] + sbits[i] = -127 if (b == 0xff) else 127 - b + + @staticmethod + cdef void __sbit2usbit(const int8_t *sbits, Py_ssize_t l, uint8_t *usbits) noexcept: + for i in range(l): + b = sbits[i] + usbits[i] = 127 - b + + @staticmethod + cdef void __sbit2ubit(const int8_t *sbits, Py_ssize_t l, uint8_t *ubits) noexcept: + for i in range(l): + b = sbits[i] + ubits[i] = 1 if b < 0 else 0 + + @staticmethod + cdef void __ubit2sbit(const uint8_t *ubits, Py_ssize_t l, int8_t *sbits) noexcept: + for i in range(l): + b = ubits[i] + sbits[i] = -127 if b else 127 + + + # for tests (`@staticmethod cpdef` is not supported: github.com/cython/cython/issues/3327) + @staticmethod + def usbit2sbit(usbits): return Msg._usbit2sbit(usbits) + @staticmethod + def sbit2usbit(sbits): return Msg._sbit2usbit(sbits) + @staticmethod + def sbit2ubit(sbits): return Msg._sbit2ubit(sbits) + @staticmethod + def ubit2sbit(ubits): return Msg._ubit2sbit(ubits)
cdef _validate(self): @@ -370,7 +419,7 @@
# Convert burst bits if self.burst is not None: - msg.burst = self.ubit2sbit(self.burst) + msg.burst = Msg._ubit2sbit(self.burst) else: msg.nope_ind = True
@@ -659,7 +708,10 @@ ''' Generate message specific burst appending it to buf. '''
# Convert soft-bits to unsigned soft-bits - buf.extend( self.sbit2usbit(self.burst) ) # XXX copies; can probably remove them with numpy + l = PyByteArray_GET_SIZE(buf) + PyByteArray_Resize(buf, l+len(self.burst)) + Msg.__sbit2usbit(<int8_t*>self.burst.data.as_chars, len(self.burst), + <uint8_t*>&PyByteArray_AS_STRING(buf)[l])
cdef _parse_burst_v0(self, burst): ''' Parse message specific burst for header version 0. ''' @@ -680,13 +732,13 @@ cdef parse_burst(self, burst): ''' Parse message specific burst. '''
- burst = array('B', burst) + burst = array.array('B', burst)
if self.ver == 0x00: burst = self._parse_burst_v0(burst)
# Convert unsigned soft-bits to soft-bits - self.burst = self.usbit2sbit(burst) + self.burst = Msg._usbit2sbit(burst)
def rand_burst(self, length = None): ''' Generate a random message specific burst. ''' @@ -694,7 +746,7 @@ if length is None: length = self.mod_type.bl
- self.burst = array('b', [random.randint(-127, 127) for _ in range(length)]) + self.burst = array.array('b', [random.randint(-127, 127) for _ in range(length)])
def trans(self, ver = None): ''' Transform this message to TxMsg. ''' @@ -705,6 +757,8 @@
# Convert burst bits if self.burst is not None: - msg.burst = self.sbit2ubit(self.burst) + msg.burst = PyByteArray_FromStringAndSize(NULL, len(self.burst)) + Msg.__sbit2ubit(<int8_t*>&self.burst.data.as_chars[0], len(self.burst), + <uint8_t*>PyByteArray_AS_STRING(msg.burst))
return msg diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py index 13a64a6..e6cb906 100644 --- a/src/target/trx_toolkit/test_data_msg.py +++ b/src/target/trx_toolkit/test_data_msg.py @@ -148,7 +148,7 @@
# Test both sbit2ubit() and ubit2sbit() ubits = Msg.sbit2ubit(sbits_ref) - self.assertEqual(ubits, bytearray([1] * 127 + [0] * 128)) + self.assertEqual(ubits, array('B', [1] * 127 + [0] * 128))
sbits = Msg.ubit2sbit(ubits) self.assertEqual(sbits, array('b', [-127] * 127 + [127] * 128))