kirr has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/40073?usp=email )
Change subject: trx_toolkit/data_msg: Factor constants out of *Msg classes ......................................................................
trx_toolkit/data_msg: Factor constants out of *Msg classes
Accessing those constants via either self.CONSTANT, or e.g. RxMsg.CONSTANT always do dict lookup on class .__dict__ and there is no way to change that even with switching to cdef class. Dict lookups are slow.
-> Fix that with factoring constants to be module-level cdef globals which are now accessed statically and quickly.
Change-Id: Ia117e2aa04bf7dce20bef61af56d66a07fe24778 --- M src/target/trx_toolkit/data_msg.pyx 1 file changed, 44 insertions(+), 40 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/73/40073/1
diff --git a/src/target/trx_toolkit/data_msg.pyx b/src/target/trx_toolkit/data_msg.pyx index 81f93ca..ccbdd71 100644 --- a/src/target/trx_toolkit/data_msg.pyx +++ b/src/target/trx_toolkit/data_msg.pyx @@ -68,12 +68,14 @@ ModAQPSK = Modulation(0b1100, 2 * GMSK_BURST_LEN)
+cdef int KNOWN_VERSION_MAX = 1 # 0, 1 + class Msg(abc.ABC): ''' TRXD (DATA) message coding API (common part). '''
# NOTE: up to 16 versions can be encoded CHDR_VERSION_MAX = 0b1111 - KNOWN_VERSIONS = (0, 1) + KNOWN_VERSIONS = tuple(range(KNOWN_VERSION_MAX+1))
def __init__(self, fn = None, tn = None, burst = None, ver = 0): self.burst = burst # bytes|bytearray for ubit, array[b] for sbit, array[B] for usbit @@ -166,7 +168,7 @@ def validate(self): ''' Validate the message fields (throws ValueError). '''
- if not self.ver in self.KNOWN_VERSIONS: + if not (0 <= self.ver <= KNOWN_VERSION_MAX): raise ValueError("Unknown TRXD header version %d" % self.ver)
if self.fn is None: @@ -219,7 +221,7 @@
# Parse the header version first self.ver = (msg[0] >> 4) - if not self.ver in self.KNOWN_VERSIONS: + if not (0 <= self.ver <= KNOWN_VERSION_MAX): raise ValueError("Unknown TRXD header version %d" % self.ver)
# Parse TDMA TN and FN @@ -241,13 +243,14 @@ msg_burst = memoryview(msg)[self.HDR_LEN:] self.parse_burst(msg_burst)
+ +# Constants +cdef int PWR_MIN = 0x00 +cdef int PWR_MAX = 0xff + class TxMsg(Msg): ''' Tx (L1 -> TRX) message coding API. '''
- # Constants - PWR_MIN = 0x00 - PWR_MAX = 0xff - # Specific message fields pwr = None
@@ -275,7 +278,7 @@ if self.pwr is None: raise ValueError("Tx Attenuation level is not set")
- if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: + if self.pwr < PWR_MIN or self.pwr > PWR_MAX: raise ValueError("Tx Attenuation %d is out of range" % self.pwr)
# FIXME: properly handle IDLE / NOPE indications @@ -290,10 +293,10 @@ ''' Generate a random power level. '''
if min is None: - min = self.PWR_MIN + min = PWR_MIN
if max is None: - max = self.PWR_MAX + max = PWR_MAX
return random.randint(min, max)
@@ -367,27 +370,28 @@
return msg
+ +# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise +cdef int RSSI_MIN = -120 +cdef int RSSI_MAX = -47 + +# Min and max values of int16_t +cdef int TOA256_MIN = -32768 +cdef int TOA256_MAX = 32767 + +# TSC (Training Sequence Code) range is [0, 7] +cdef int TSC_MAX = 7 + +# C/I range (in centiBels) +cdef int CI_MIN = -1280 +cdef int CI_MAX = 1280 + +# IDLE frame / nope detection indicator +cdef int NOPE_IND = (1 << 7) + class RxMsg(Msg): ''' Rx (TRX -> L1) message coding API. '''
- # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise - RSSI_MIN = -120 - RSSI_MAX = -47 - - # Min and max values of int16_t - TOA256_MIN = -32768 - TOA256_MAX = 32767 - - # TSC (Training Sequence Code) range - TSC_RANGE = range(0, 8) - - # C/I range (in centiBels) - CI_MIN = -1280 - CI_MAX = 1280 - - # IDLE frame / nope detection indicator - NOPE_IND = (1 << 7) - # Specific message fields rssi = None toa256 = None @@ -459,13 +463,13 @@ if self.rssi is None: raise ValueError("RSSI is not set")
- if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX: + if self.rssi < RSSI_MIN or self.rssi > RSSI_MAX: raise ValueError("RSSI %d is out of range" % self.rssi)
if self.toa256 is None: raise ValueError("ToA256 is not set")
- if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: + if self.toa256 < TOA256_MIN or self.toa256 > TOA256_MAX: raise ValueError("ToA256 %d is out of range" % self.toa256)
# Version specific parameters (omited for NOPE.ind) @@ -486,7 +490,7 @@ if self.tsc is None: raise ValueError("TSC is not set")
- if self.tsc not in self.TSC_RANGE: + if not (0 <= self.tsc <= TSC_MAX): raise ValueError("TSC %d is out of range" % self.tsc)
# Version specific parameters (also present in NOPE.ind) @@ -494,7 +498,7 @@ if self.ci is None: raise ValueError("C/I is not set")
- if self.ci < self.CI_MIN or self.ci > self.CI_MAX: + if self.ci < CI_MIN or self.ci > CI_MAX: raise ValueError("C/I %d is out of range" % self.ci)
self.validate_burst() @@ -503,10 +507,10 @@ ''' Generate a random RSSI value. '''
if min is None: - min = self.RSSI_MIN + min = RSSI_MIN
if max is None: - max = self.RSSI_MAX + max = RSSI_MAX
return random.randint(min, max)
@@ -514,10 +518,10 @@ ''' Generate a random ToA (Time of Arrival) value. '''
if min is None: - min = self.TOA256_MIN + min = TOA256_MIN
if max is None: - max = self.TOA256_MAX + max = TOA256_MAX
return random.randint(min, max)
@@ -534,10 +538,10 @@ self.tsc_set = random.randint(0, 3) else: self.tsc_set = random.randint(0, 1) - self.tsc = random.choice(self.TSC_RANGE) + self.tsc = random.randint(0, TSC_MAX)
# C/I: Carrier-to-Interference ratio - self.ci = random.randint(self.CI_MIN, self.CI_MAX) + self.ci = random.randint(CI_MIN, CI_MAX)
def desc_hdr(self): ''' Generate human-readable header description. ''' @@ -572,7 +576,7 @@
# IDLE / nope indication has no MTS info if self.nope_ind: - return self.NOPE_IND + return NOPE_IND
# TSC: . . . . . X X X mts = self.tsc & 0b111 @@ -587,7 +591,7 @@ ''' Parse Modulation and Training Sequence info. '''
# IDLE / nope indication has no MTS info - self.nope_ind = (mts & self.NOPE_IND) > 0 + self.nope_ind = (mts & NOPE_IND) > 0 if self.nope_ind: self.mod_type = None self.tsc_set = None