kirr has uploaded this change for review. (
https://gerrit.osmocom.org/c/osmocom-bb/+/40076?usp=email )
Change subject: trx_toolkit/data_msg: Optimize bit conversions
......................................................................
trx_toolkit/data_msg: Optimize bit conversions
- implement conversions in C (via pyx) directly
- add in-place versions
- use those in-place versions as needed
Change-Id: Ib8237a277f97292cd99dfda44972372d370f0ada
---
M src/target/trx_toolkit/data_msg.pxd
M src/target/trx_toolkit/data_msg.pyx
M src/target/trx_toolkit/test_data_msg.py
3 files changed, 97 insertions(+), 25 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/76/40076/1
diff --git a/src/target/trx_toolkit/data_msg.pxd b/src/target/trx_toolkit/data_msg.pxd
index cbdca4b..2823b82 100644
--- a/src/target/trx_toolkit/data_msg.pxd
+++ b/src/target/trx_toolkit/data_msg.pxd
@@ -3,7 +3,7 @@
from cython cimport final
from cpython cimport array
-from libc.stdint cimport uint8_t
+from libc.stdint cimport int8_t, uint8_t
cdef class Modulation:
cdef int coding # Coding in TRXD header
@@ -36,6 +36,24 @@
cdef parse_burst(self, burst)
cdef _validate(self)
+ @staticmethod
+ cdef void __usbit2sbit(const uint8_t *usbits, Py_ssize_t l, int8_t *sbits) noexcept
+ @staticmethod
+ cdef void __sbit2usbit(const int8_t *sbits, Py_ssize_t l, uint8_t *usbits) noexcept
+ @staticmethod
+ cdef void __sbit2ubit(const int8_t *sbits, Py_ssize_t l, uint8_t *ubits) noexcept
+ @staticmethod
+ cdef void __ubit2sbit(const uint8_t *ubits, Py_ssize_t l, int8_t *sbits) noexcept
+
+ @staticmethod
+ cdef array.array[int8_t] _usbit2sbit(const uint8_t[::1] usbits)
+ @staticmethod
+ cdef array.array[uint8_t] _sbit2usbit(const int8_t[::1] sbits)
+ @staticmethod
+ cdef array.array[uint8_t] _sbit2ubit(const int8_t[::1] sbits)
+ @staticmethod
+ cdef array.array[int8_t] _ubit2sbit(const uint8_t[::1] ubits)
+
@final
cdef class TxMsg(Msg):
diff --git a/src/target/trx_toolkit/data_msg.pyx b/src/target/trx_toolkit/data_msg.pyx
index 02194ba..5929b47 100644
--- a/src/target/trx_toolkit/data_msg.pyx
+++ b/src/target/trx_toolkit/data_msg.pyx
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# cython: language_level=3
+# cython: initializedcheck=False, wraparound=False, boundscheck=False, infer_types=True
# TRX Toolkit
# DATA interface message definitions and helpers
@@ -20,22 +21,25 @@
from cython cimport final
+from cpython cimport array
+cdef array.array _Bempty = array.array('B', [])
+cdef array.array _bempty = array.array('b', [])
+
+from cpython.bytearray cimport PyByteArray_FromStringAndSize
+from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE
+from xpy cimport PyByteArray_Resize
+
import random
import struct
import abc
from typing import List
-from array import array
import gsm_shared
cdef int GMSK_BURST_LEN = gsm_shared.GMSK_BURST_LEN
cdef int EDGE_BURST_LEN = gsm_shared.EDGE_BURST_LEN
cdef int GSM_HYPERFRAME = gsm_shared.GSM_HYPERFRAME
-# _bu2s converts unsigned byte to signed.
-def _bu2s(x):
- return struct.unpack('b', struct.pack('B', x))[0]
-
cdef list[Modulation] _mod_registry = []
cdef class Modulation:
def __cinit__(self, int coding, int bl):
@@ -145,29 +149,74 @@
return result
+
@staticmethod
- def usbit2sbit(bits): # array[B] -> array[b]
+ cdef array.array[int8_t] _usbit2sbit(const uint8_t[::1] usbits): # array[B] ->
array[b]
''' Convert unsigned soft-bits {254..0} to soft-bits {-127..127}.
'''
- return array('b', bits.tobytes().translate(Msg._tab_usbit2sbit))
- _tab_usbit2sbit = array('b', [-127 if (b == 0xff) else 127 - b for b in
range(0x100)])
+ l = len(usbits)
+ cdef array.array[int8_t] sbits = array.clone(_bempty, l, zero=False)
+ Msg.__usbit2sbit(&usbits[0], l, &sbits[0])
+ return sbits
@staticmethod
- def sbit2usbit(bits): # array[b] -> array[B]
+ cdef array.array[uint8_t] _sbit2usbit(const int8_t[::1] sbits): # array[b] ->
array[B]
''' Convert soft-bits {-127..127} to unsigned soft-bits {254..0}.
'''
- return array('B', bits.tobytes().translate(Msg._tab_sbit2usbit))
- _tab_sbit2usbit = array('B', [127 - _bu2s(b) for b in range(0x100)])
+ l = len(sbits)
+ cdef array.array[uint8_t] usbits = array.clone(_Bempty, l, zero=False)
+ Msg.__sbit2usbit(&sbits[0], l, &usbits[0])
+ return usbits
@staticmethod
- def sbit2ubit(bits): # array[b] -> bytearray
+ cdef array.array[uint8_t] _sbit2ubit(const int8_t[::1] sbits): # array[b] ->
array[B]
''' Convert soft-bits {-127..127} to bits {1..0}. '''
- return bytearray(bits).translate(Msg._tab_sbit2ubit)
- _tab_sbit2ubit = array('B', [int(_bu2s(b) < 0) for b in range(0x100)])
+ l = len(sbits)
+ cdef array.array[uint8_t] ubits = array.clone(_Bempty, l, zero=False)
+ Msg.__sbit2ubit(&sbits[0], l, &ubits[0])
+ return ubits
@staticmethod
- def ubit2sbit(bits: bytearray): # -> array[b]
+ cdef array.array[int8_t] _ubit2sbit(const uint8_t[::1] ubits): # array[B] ->
array[b]
''' Convert bits {1..0} to soft-bits {-127..127}. '''
- return array('b', bits.translate(Msg._tab_ubit2sbit))
- _tab_ubit2sbit = array('b', [-127 if b else 127 for b in range(0x100)])
+ l = len(ubits)
+ cdef array.array[int8_t] sbits = array.clone(_bempty, l, zero=False)
+ Msg.__ubit2sbit(&ubits[0], l, &sbits[0])
+ return sbits
+
+ # in-place versions of Xbit2Ybit: convert l items from the input buffer into a caller-provided output buffer
+ @staticmethod
+ cdef void __usbit2sbit(const uint8_t *usbits, Py_ssize_t l, int8_t *sbits) noexcept:
+ for i in range(l):
+ b = usbits[i]
+ sbits[i] = -127 if (b == 0xff) else 127 - b
+
+ @staticmethod
+ cdef void __sbit2usbit(const int8_t *sbits, Py_ssize_t l, uint8_t *usbits) noexcept:
+ for i in range(l):
+ b = sbits[i]
+ usbits[i] = 127 - b
+
+ @staticmethod
+ cdef void __sbit2ubit(const int8_t *sbits, Py_ssize_t l, uint8_t *ubits) noexcept:
+ for i in range(l):
+ b = sbits[i]
+ ubits[i] = 1 if b < 0 else 0
+
+ @staticmethod
+ cdef void __ubit2sbit(const uint8_t *ubits, Py_ssize_t l, int8_t *sbits) noexcept:
+ for i in range(l):
+ b = ubits[i]
+ sbits[i] = -127 if b else 127
+
+
+ # for tests (`@staticmethod cpdef` is not supported: github.com/cython/cython/issues/3327)
+ @staticmethod
+ def usbit2sbit(usbits): return Msg._usbit2sbit(usbits)
+ @staticmethod
+ def sbit2usbit(sbits): return Msg._sbit2usbit(sbits)
+ @staticmethod
+ def sbit2ubit(sbits): return Msg._sbit2ubit(sbits)
+ @staticmethod
+ def ubit2sbit(ubits): return Msg._ubit2sbit(ubits)
cdef _validate(self):
@@ -370,7 +419,7 @@
# Convert burst bits
if self.burst is not None:
- msg.burst = self.ubit2sbit(self.burst)
+ msg.burst = Msg._ubit2sbit(self.burst)
else:
msg.nope_ind = True
@@ -659,7 +708,10 @@
''' Generate message specific burst appending it to buf. '''
# Convert soft-bits to unsigned soft-bits
- buf.extend( self.sbit2usbit(self.burst) ) # XXX copies; can probably remove them with
numpy
+ l = PyByteArray_GET_SIZE(buf)
+ PyByteArray_Resize(buf, l+len(self.burst))
+ Msg.__sbit2usbit(<int8_t*>self.burst.data.as_chars, len(self.burst),
+ <uint8_t*>&PyByteArray_AS_STRING(buf)[l])
cdef _parse_burst_v0(self, burst):
''' Parse message specific burst for header version 0. '''
@@ -680,13 +732,13 @@
cdef parse_burst(self, burst):
''' Parse message specific burst. '''
- burst = array('B', burst)
+ burst = array.array('B', burst)
if self.ver == 0x00:
burst = self._parse_burst_v0(burst)
# Convert unsigned soft-bits to soft-bits
- self.burst = self.usbit2sbit(burst)
+ self.burst = Msg._usbit2sbit(burst)
def rand_burst(self, length = None):
''' Generate a random message specific burst. '''
@@ -694,7 +746,7 @@
if length is None:
length = self.mod_type.bl
- self.burst = array('b', [random.randint(-127, 127) for _ in range(length)])
+ self.burst = array.array('b', [random.randint(-127, 127) for _ in
range(length)])
def trans(self, ver = None):
''' Transform this message to TxMsg. '''
@@ -705,6 +757,8 @@
# Convert burst bits
if self.burst is not None:
- msg.burst = self.sbit2ubit(self.burst)
+ msg.burst = PyByteArray_FromStringAndSize(NULL, len(self.burst))
+ Msg.__sbit2ubit(<int8_t*>&self.burst.data.as_chars[0], len(self.burst),
+ <uint8_t*>PyByteArray_AS_STRING(msg.burst))
return msg
diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py
index 13a64a6..e6cb906 100644
--- a/src/target/trx_toolkit/test_data_msg.py
+++ b/src/target/trx_toolkit/test_data_msg.py
@@ -148,7 +148,7 @@
# Test both sbit2ubit() and ubit2sbit()
ubits = Msg.sbit2ubit(sbits_ref)
- self.assertEqual(ubits, bytearray([1] * 127 + [0] * 128))
+ self.assertEqual(ubits, array('B', [1] * 127 + [0] * 128))
sbits = Msg.ubit2sbit(ubits)
self.assertEqual(sbits, array('b', [-127] * 127 + [127] * 128))
--
To view, visit
https://gerrit.osmocom.org/c/osmocom-bb/+/40076?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Change-Id: Ib8237a277f97292cd99dfda44972372d370f0ada
Gerrit-Change-Number: 40076
Gerrit-PatchSet: 1
Gerrit-Owner: kirr <kirr(a)nexedi.com>
Gerrit-CC: fixeria <vyanitskiy(a)sysmocom.de>
Gerrit-CC: osmith <osmith(a)sysmocom.de>
Gerrit-CC: pespin <pespin(a)sysmocom.de>