This is merely a historical archive covering the years 2008-2021, from before the migration to mailman3.
A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.
fixeria gerrit-no-reply at lists.osmocom.orgfixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/24019 ) Change subject: trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py ...................................................................... trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc Related: OS#4006, SYS#4895 --- M src/target/trx_toolkit/burst_fwd.py M src/target/trx_toolkit/data_dump.py M src/target/trx_toolkit/data_if.py M src/target/trx_toolkit/data_msg.py M src/target/trx_toolkit/fake_trx.py M src/target/trx_toolkit/test_data_dump.py M src/target/trx_toolkit/test_data_msg.py M src/target/trx_toolkit/transceiver.py M src/target/trx_toolkit/trx_sniff.py 9 files changed, 283 insertions(+), 672 deletions(-) git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/19/24019/1 diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py index 2e9e97b..03ce6e6 100644 --- a/src/target/trx_toolkit/burst_fwd.py +++ b/src/target/trx_toolkit/burst_fwd.py @@ -3,7 +3,7 @@ # TRX Toolkit # Burst forwarding between transceivers # -# (C) 2017-2020 by Vadim Yanitskiy <axilirator at gmail.com> +# (C) 2017-2021 by Vadim Yanitskiy <axilirator at gmail.com> # Contributions by sysmocom - s.f.m.c. GmbH # # All Rights Reserved @@ -25,6 +25,7 @@ import logging as log from trx_list import TRXList +from data_msg import RxMsg class BurstForwarder(TRXList): """ Performs burst forwarding between transceivers. 
@@ -48,11 +49,11 @@ def forward_msg(self, src_trx, rx_msg): # Originating Transceiver may use frequency hopping, # so let's precalculate its Tx frequency in advance - tx_freq = src_trx.get_tx_freq(rx_msg.fn) + tx_freq = src_trx.get_tx_freq(rx_msg.c['fn']) if src_trx.rf_muted: - del rx_msg.burst # burst bits are omited - rx_msg.burst = None + # Burst bits are omited + rx_msg.c['burst'].clear() # Iterate over all known transceivers for trx in self.trx_list: @@ -62,13 +63,13 @@ # Check transceiver state if not trx.running: continue - if rx_msg.tn not in trx.ts_list: + if rx_msg.c['tn'] not in trx.ts_list: continue # Match Tx/Rx frequencies of the both transceivers - if trx.get_rx_freq(rx_msg.fn) != tx_freq: + if trx.get_rx_freq(rx_msg.c['fn']) != tx_freq: continue - # Transform from TxMsg to RxMsg and forward - tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver) + # Transform from L12TRX to TRX2L1 and forward + tx_msg = rx_msg.trans(RxMsg, trx.data_if._hdr_ver) trx.handle_data_msg(src_trx, rx_msg, tx_msg) diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index 8510e2d..fe8a2d8 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -44,7 +44,7 @@ raise ValueError("Unknown message type") # Generate a message payload - msg_raw = msg.gen_msg() + msg_raw = msg.to_bytes() # Calculate and pack the message length msg_len = len(msg_raw) @@ -118,7 +118,7 @@ # a parsed message in case of success, # or None in case of EOF or header parsing error, # or False in case of message parsing error. 
- def _parse_msg(self): + def _from_bytes(self): # Attempt to read a message header hdr_raw = self.f.read(self.HDR_LENGTH) if len(hdr_raw) != self.HDR_LENGTH: @@ -142,7 +142,7 @@ # Attempt to parse a message try: msg_raw = bytearray(msg_raw) - msg.parse_msg(msg_raw) + msg.from_bytes(msg_raw) except: log.error("Couldn't parse a message, skipping...") return False @@ -155,7 +155,7 @@ # a parsed message in case of success, # or None in case of EOF, out of range, or header parsing error, # or False in case of message parsing error. - def parse_msg(self, idx): + def from_bytes(self, idx): # Move descriptor to the beginning of requested message rc = self._seek2msg(idx) if not rc: @@ -163,7 +163,7 @@ return None # Attempt to parse a message - return self._parse_msg() + return self._from_bytes() # Parses all messages from a given file # Return value: @@ -185,7 +185,7 @@ # Read the capture in loop... while True: # Attempt to parse a message - msg = self._parse_msg() + msg = self._from_bytes() # EOF or broken header if msg is None: diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py index 1cded9b..f754c6c 100644 --- a/src/target/trx_toolkit/data_if.py +++ b/src/target/trx_toolkit/data_if.py @@ -23,6 +23,8 @@ import logging as log +from typing import Optional + from udp_link import UDPLink from data_msg import * @@ -50,64 +52,43 @@ # No suitable version found return -1 - def match_hdr_ver(self, msg): - if msg.ver == self._hdr_ver: - return True - - log.error("(%s) Rx DATA message (%s) with unexpected header " - "version %u (!= expected %u), ignoring..." 
- % (self.desc_link(), msg.desc_hdr(), - msg.ver, self._hdr_ver)) - return False - - def recv_raw_data(self): + def recv_raw_data(self) -> bytes: data, _ = self.sock.recvfrom(512) return data - def recv_tx_msg(self): + def recv_tx_msg(self) -> Optional[TxMsg]: # Read raw data from socket data = self.recv_raw_data() # Attempt to parse a TRXD Tx message try: - msg = TxMsg() - msg.parse_msg(bytearray(data)) + msg = TxMsg(self._hdr_ver) + msg.from_bytes(data) except: log.error("Failed to parse a TRXD Tx message " "from R:%s:%u" % (self.remote_addr, self.remote_port)) return None - # Make sure the header version matches - # the configured one (self._hdr_ver) - if not self.match_hdr_ver(msg): - return None - return msg - def recv_rx_msg(self): + def recv_rx_msg(self) -> Optional[RxMsg]: # Read raw data from socket data = self.recv_raw_data() # Attempt to parse a TRXD Rx message try: - msg = RxMsg() - msg.parse_msg(bytearray(data)) + msg = RxMsg(self._hdr_ver) + msg.from_bytes(data) except: log.error("Failed to parse a TRXD Rx message " "from R:%s:%u" % (self.remote_addr, self.remote_port)) return None - # Make sure the header version matches - # the configured one (self._hdr_ver) - if not self.match_hdr_ver(msg): - return None - return msg - def send_msg(self, msg, legacy = False): + def send_msg(self, msg: RxMsg) -> None: try: - # Validate and encode a TRXD message - payload = msg.gen_msg(legacy) + payload = msg.to_bytes() except ValueError as e: log.error("Failed to encode a TRXD message ('%s') " "due to error: %s" % (msg.desc_hdr(), e)) diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py index 7e785f9..570cfa0 100644 --- a/src/target/trx_toolkit/data_msg.py +++ b/src/target/trx_toolkit/data_msg.py @@ -3,7 +3,8 @@ # TRX Toolkit # DATA interface message definitions and helpers # -# (C) 2018-2019 by Vadim Yanitskiy <axilirator at gmail.com> +# (C) 2018-2021 by Vadim Yanitskiy <axilirator at gmail.com> +# (C) 2021 by sysmocom - s.f.m.c. 
GmbH <info at sysmocom.de> # # All Rights Reserved # @@ -25,35 +26,39 @@ import struct import abc -from typing import List +from typing import Any, Type, List, Tuple, Dict from enum import Enum from gsm_shared import * +import trxd_proto +import codec + class Modulation(Enum): """ Modulation types defined in 3GPP TS 45.002 """ - ModGMSK = (0b0000, 1 * GMSK_BURST_LEN) - Mod8PSK = (0b0100, 3 * GMSK_BURST_LEN) - ModGMSK_AB = (0b0110, 1 * GMSK_BURST_LEN) - ModRFU = (0b0111, 0) # Reserved for Future Use - Mod16QAM = (0b1000, 4 * GMSK_BURST_LEN) - Mod32QAM = (0b1010, 5 * GMSK_BURST_LEN) - ModAQPSK = (0b1100, 2 * GMSK_BURST_LEN) + ModGMSK = (0b0000, 0b1100, 1 * GMSK_BURST_LEN) + Mod8PSK = (0b0100, 0b1110, 3 * GMSK_BURST_LEN) + ModGMSK_AB = (0b0110, 0b1111, 1 * GMSK_BURST_LEN) + ModRFU = (0b0111, 0b1111, 0) # Reserved for Future Use + Mod16QAM = (0b1000, 0b1110, 4 * GMSK_BURST_LEN) + Mod32QAM = (0b1010, 0b1110, 5 * GMSK_BURST_LEN) + ModAQPSK = (0b1100, 0b1100, 2 * GMSK_BURST_LEN) - def __init__(self, coding, bl): + def __init__(self, val: int, mask: int, bl: int): # Coding in TRXD header - self.coding = coding + self.val = val + self.mask = mask # Burst length self.bl = bl @classmethod - def pick(self, coding): + def pick(self, val: int): for mod in list(self): - if mod.coding == coding: + if (val & mod.mask) == mod.val: return mod return None @classmethod - def pick_by_bl(self, bl): + def pick_by_bl(self, bl: int): for mod in list(self): if mod.bl == bl: return mod @@ -66,66 +71,87 @@ CHDR_VERSION_MAX = 0b1111 KNOWN_VERSIONS = (0, 1) - def __init__(self, fn = None, tn = None, burst = None, ver = 0): - self.burst = burst - self.ver = ver - self.fn = fn - self.tn = tn + # PDU codecs for all known versions + CODECS = NotImplemented # type: Tuple[codec.Envelope, ...] - @property - def CHDR_LEN(self): - ''' The common header length. 
''' - return 1 + 4 # (VER + TN) + FN + # Default PDU content for child types + DEF_CONT = NotImplemented # type: Dict[str, Any] + + def __init__(self, ver: int, cont: Dict[str, Any] = { }): + # TRXD PDU version + if not ver in self.KNOWN_VERSIONS: + raise ValueError("Unknown TRXD PDU version %d" % ver) + self._ver = ver + + # TRXD PDU codec + self.codec = self.CODECS[ver] + + # Content of the message + self.c = { + # Default TDMA frame/timeslot number + 'fn' : 0, + 'tn' : 0, + # NOPE / IDLE frame indication + 'nope' : False, + # Modulation type and TSC info + 'mod_type' : Modulation.ModGMSK, + 'tsc_set' : 0, + 'tsc' : 0, + # Burst hard-/soft-bits + 'burst' : [], + # Default fields for particular class + **self.DEF_CONT, + # Fields provided during instantiation + **cont + } # type: Dict[str, Any] + + def __getitem__(self, key: str) -> Any: + return self.c[key] + + def __setitem__(self, key: str, val: Any) -> None: + self.c[key] = val + + def __delitem__(self, key: str) -> None: + del self.c[key] @abc.abstractmethod - def gen_hdr(self): - ''' Generate message specific header. ''' - - @abc.abstractmethod - def parse_hdr(self, hdr): - ''' Parse message specific header. ''' - - @abc.abstractmethod - def gen_burst(self): + def gen_burst(self) -> None: ''' Generate message specific burst. ''' @abc.abstractmethod - def parse_burst(self, burst): + def parse_burst(self) -> None: ''' Parse message specific burst. ''' @abc.abstractmethod - def rand_burst(self): + def rand_burst(self) -> None: ''' Generate a random message specific burst. ''' - def rand_fn(self): + @abc.abstractmethod + def trans_burst(self) -> List[int]: + ''' Convert between hard-bits and soft-bits. ''' + + def rand_fn(self) -> int: ''' Generate a random frame number. ''' return random.randint(0, GSM_HYPERFRAME) - def rand_tn(self): + def rand_tn(self) -> int: ''' Generate a random timeslot number. 
''' return random.randint(0, 7) - def rand_hdr(self): + def rand_hdr(self) -> None: ''' Randomize the message header. ''' - self.fn = self.rand_fn() - self.tn = self.rand_tn() + self.c['fn'] = self.rand_fn() + self.c['tn'] = self.rand_tn() - def desc_hdr(self): + def desc_hdr(self) -> str: ''' Generate human-readable header description. ''' result = "" - if self.ver > 0: - result += ("ver=%u " % self.ver) - - if self.fn is not None: - result += ("fn=%u " % self.fn) - - if self.tn is not None: - result += ("tn=%u " % self.tn) - - if self.burst is not None and len(self.burst) > 0: - result += ("bl=%u " % len(self.burst)) + result += ("ver=%u " % self._ver) + result += ("fn=%u tn=%u " % (self.c['fn'], self.c['tn'])) + if self.c['burst']: + result += ("bl=%u " % len(self.c['burst'])) return result @@ -149,131 +175,76 @@ ''' Convert bits {1..0} to soft-bits {-127..127}. ''' return [-127 if b else 127 for b in bits] - def validate(self): + def validate(self) -> None: ''' Validate the message fields (throws ValueError). ''' - if not self.ver in self.KNOWN_VERSIONS: - raise ValueError("Unknown TRXD header version %d" % self.ver) + if self.c['fn'] < 0 or self.c['fn'] > GSM_HYPERFRAME: + raise ValueError("TDMA frame-number %d is out of range" % self.c['fn']) - if self.fn is None: - raise ValueError("TDMA frame-number is not set") + if self.c['tn'] < 0 or self.c['tn'] > 7: + raise ValueError("TDMA time-slot %d is out of range" % self.c['tn']) - if self.fn < 0 or self.fn > GSM_HYPERFRAME: - raise ValueError("TDMA frame-number %d is out of range" % self.fn) - - if self.tn is None: - raise ValueError("TDMA time-slot is not set") - - if self.tn < 0 or self.tn > 7: - raise ValueError("TDMA time-slot %d is out of range" % self.tn) - - def gen_msg(self, legacy = False): + def to_bytes(self) -> bytes: ''' Generate a TRX DATA message. 
''' + if not self.c['nope']: + self.gen_burst() + return self.codec._to_bytes(self.c) - # Validate all the fields - self.validate() - - # Allocate an empty byte-array - buf = bytearray() - - # Put version (4 bits) and TDMA TN (3 bits) - buf.append((self.ver << 4) | (self.tn & 0x07)) - - # Put TDMA FN (4 octets, BE) - buf += struct.pack(">L", self.fn) - - # Generate message specific header part - hdr = self.gen_hdr() - buf += hdr - - # Generate burst - if self.burst is not None: - buf += self.gen_burst() - - # This is a rudiment from (legacy) OpenBTS transceiver, - # some L1 implementations still expect two dummy bytes. - if legacy and self.ver == 0x00: - buf += bytearray(2) - - return buf - - def parse_msg(self, msg): + def from_bytes(self, msg: bytes) -> None: ''' Parse a TRX DATA message. ''' + self.codec._from_bytes(self.c, msg) + if not self.c['nope']: + self.parse_burst() - # Make sure we have at least common header - if len(msg) < self.CHDR_LEN: - raise ValueError("Message is to short: missing common header") + def trans(self, cls: Type['Msg'], ver: int) -> 'Msg': + ''' Transform between L12TRX and TRX2L1. 
''' - # Parse the header version first - self.ver = (msg[0] >> 4) - if not self.ver in self.KNOWN_VERSIONS: - raise ValueError("Unknown TRXD header version %d" % self.ver) + # Allocate a new message + msg = cls(ver, { + 'fn' : self.c['fn'], + 'tn' : self.c['tn'], + }) - # Parse TDMA TN and FN - self.tn = (msg[0] & 0x07) - self.fn = struct.unpack(">L", msg[1:5])[0] - - # Make sure we have the whole header, - # including the version specific fields - if len(msg) < self.HDR_LEN: - raise ValueError("Message is to short: missing version specific header") - - # Specific message part - self.parse_hdr(msg) - - # Copy burst, skipping header - msg_burst = msg[self.HDR_LEN:] - if len(msg_burst) > 0: - self.parse_burst(msg_burst) + # Convert burst bits + if self.c['burst'] and not self.c['nope']: + msg.c['burst'] = self.trans_burst() else: - self.burst = None + msg.c['nope'] = True + + return msg class TxMsg(Msg): ''' Tx (L1 -> TRX) message coding API. ''' + # PDU codecs for all known versions + CODECS = ( + trxd_proto.PDUv0Tx(), + trxd_proto.PDUv1Tx(), + ) + + DEF_CONT = { + # Power reduction (in dB) + 'pwr' : 0, + } + # Constants PWR_MIN = 0x00 PWR_MAX = 0xff - # Specific message fields - pwr = None - - @property - def HDR_LEN(self): - ''' Calculate header length depending on its version. ''' - - # Common header length - length = self.CHDR_LEN - - # Message specific header length - if self.ver in (0x00, 0x01): - length += 1 # PWR - else: - raise IndexError("Unhandled version %u" % self.ver) - - return length - - def validate(self): + def validate(self) -> None: ''' Validate the message fields (throws ValueError). 
''' # Validate common fields Msg.validate(self) - if self.pwr is None: - raise ValueError("Tx Attenuation level is not set") - - if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: - raise ValueError("Tx Attenuation %d is out of range" % self.pwr) + if self.c['pwr'] < self.PWR_MIN or self.c['pwr'] > self.PWR_MAX: + raise ValueError("Tx Attenuation %d is out of range" % self.c['pwr']) # FIXME: properly handle IDLE / NOPE indications - if self.burst is None: - raise ValueError("Tx burst bits are not set") + if len(self.c['burst']) not in (GMSK_BURST_LEN, EDGE_BURST_LEN): + raise ValueError("Tx burst has odd length %u" % len(self.c['burst'])) - # FIXME: properly handle IDLE / NOPE indications - if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN): - raise ValueError("Tx burst has odd length %u" % len(self.burst)) - - def rand_pwr(self, min = None, max = None): + def rand_pwr(self, min = None, max = None) -> int: ''' Generate a random power level. ''' if min is None: @@ -284,80 +255,55 @@ return random.randint(min, max) - def rand_hdr(self): + def rand_hdr(self) -> None: ''' Randomize message specific header. ''' Msg.rand_hdr(self) - self.pwr = self.rand_pwr() + self.c['pwr'] = self.rand_pwr() - def desc_hdr(self): + def desc_hdr(self) -> str: ''' Generate human-readable header description. ''' # Describe the common part result = Msg.desc_hdr(self) - if self.pwr is not None: - result += ("pwr=%u " % self.pwr) + result += ("pwr=%u " % self.c['pwr']) # Strip useless whitespace and return return result.strip() - def gen_hdr(self): - ''' Generate message specific header part. ''' - - # Allocate an empty byte-array - buf = bytearray() - - # Put power - buf.append(self.pwr) - - return buf - - def parse_hdr(self, hdr): - ''' Parse message specific header part. ''' - - # Parse power level - self.pwr = hdr[5] - - def gen_burst(self): + def gen_burst(self) -> None: ''' Generate message specific burst. 
''' + self.c['hard-bits'] = bytes(self.c['burst']) - # Copy burst 'as is' - return bytearray(self.burst) - - def parse_burst(self, burst): + def parse_burst(self) -> None: ''' Parse message specific burst. ''' + self.c['burst'] = list(self.c['hard-bits']) - length = len(burst) - - # Distinguish between GSM and EDGE - if length >= EDGE_BURST_LEN: - self.burst = list(burst[:EDGE_BURST_LEN]) - else: - self.burst = list(burst[:GMSK_BURST_LEN]) - - def rand_burst(self, length = GMSK_BURST_LEN): + def rand_burst(self, length = GMSK_BURST_LEN) -> None: ''' Generate a random message specific burst. ''' - self.burst = [random.randint(0, 1) for _ in range(length)] + self.c['burst'] = [random.randint(0, 1) for _ in range(length)] - def trans(self, ver = None): - ''' Transform this message into RxMsg. ''' - - # Allocate a new message - msg = RxMsg(fn = self.fn, tn = self.tn, - ver = self.ver if ver is None else ver) - - # Convert burst bits - if self.burst is not None: - msg.burst = self.ubit2sbit(self.burst) - else: - msg.nope_ind = True - - return msg + def trans_burst(self) -> List[int]: + ''' Transform hard-bits into soft-bits. ''' + return self.ubit2sbit(self.c['burst']) class RxMsg(Msg): ''' Rx (TRX -> L1) message coding API. ''' + # PDU codecs for all known versions + CODECS = ( + trxd_proto.PDUv0Rx(), + trxd_proto.PDUv1Rx(), + ) + + DEF_CONT = { + # Specific message fields + 'rssi' : -50, + 'toa256' : 0, + 'cir' : 0, + } + # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise RSSI_MIN = -120 RSSI_MAX = -47 @@ -373,121 +319,39 @@ CI_MIN = -1280 CI_MAX = 1280 - # IDLE frame / nope detection indicator - NOPE_IND = (1 << 7) - - # Specific message fields - rssi = None - toa256 = None - - # Version 0x01 specific (default values) - mod_type = Modulation.ModGMSK - nope_ind = False - - tsc_set = None - tsc = None - ci = None - - @property - def HDR_LEN(self): - ''' Calculate header length depending on its version. 
''' - - # Common header length - length = self.CHDR_LEN - - # Message specific header length - if self.ver == 0x00: - # RSSI + ToA - length += 1 + 2 - elif self.ver == 0x01: - # RSSI + ToA + TS + C/I - length += 1 + 2 + 1 + 2 - else: - raise IndexError("Unhandled version %u" % self.ver) - - return length - - def _validate_burst_v0(self): - # Burst is mandatory - if self.burst is None: - raise ValueError("Rx burst bits are not set") - - # ... and can be either of GSM (GMSK) or EDGE (8-PSK) - if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN): - raise ValueError("Rx burst has odd length %u" % len(self.burst)) - - def _validate_burst_v1(self): - # Burst is omitted in case of an IDLE / NOPE indication - if self.nope_ind and self.burst is None: - return - - if self.nope_ind and self.burst is not None: - raise ValueError("NOPE.ind comes with burst?!?") - if self.burst is None: - raise ValueError("Rx burst bits are not set") - - # Burst length depends on modulation type - if len(self.burst) != self.mod_type.bl: - raise ValueError("Rx burst has odd length %u" % len(self.burst)) - - def validate_burst(self): - ''' Validate the burst (throws ValueError). ''' - - if self.ver == 0x00: - self._validate_burst_v0() - elif self.ver >= 0x01: - self._validate_burst_v1() - - def validate(self): + def validate(self) -> None: ''' Validate the message header fields (throws ValueError). 
''' # Validate common fields Msg.validate(self) - if self.rssi is None: - raise ValueError("RSSI is not set") + if self.c['rssi'] < self.RSSI_MIN or self.c['rssi'] > self.RSSI_MAX: + raise ValueError("RSSI %d is out of range" % self.c['rssi']) - if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX: - raise ValueError("RSSI %d is out of range" % self.rssi) - - if self.toa256 is None: - raise ValueError("ToA256 is not set") - - if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: - raise ValueError("ToA256 %d is out of range" % self.toa256) + if self.c['toa256'] < self.TOA256_MIN or self.c['toa256'] > self.TOA256_MAX: + raise ValueError("ToA256 %d is out of range" % self.c['toa256']) # Version specific parameters (omited for NOPE.ind) - if self.ver >= 0x01 and not self.nope_ind: - if type(self.mod_type) is not Modulation: + if self._ver >= 1 and not self.c['nope']: + if type(self.c['mod_type']) is not Modulation: raise ValueError("Unknown Rx modulation type") - if self.tsc_set is None: - raise ValueError("TSC set is not set") - - if self.mod_type is Modulation.ModGMSK: - if self.tsc_set not in range(0, 4): - raise ValueError("TSC set %d is out of range" % self.tsc_set) + if self.c['mod_type'] is Modulation.ModGMSK: + if self.c['tsc_set'] not in range(0, 4): + raise ValueError("TSC set %d is out of range" % self.c['tsc_set']) else: - if self.tsc_set not in range(0, 2): - raise ValueError("TSC set %d is out of range" % self.tsc_set) + if self.c['tsc_set'] not in range(0, 2): + raise ValueError("TSC set %d is out of range" % self.c['tsc_set']) - if self.tsc is None: - raise ValueError("TSC is not set") - - if self.tsc not in self.TSC_RANGE: - raise ValueError("TSC %d is out of range" % self.tsc) + if self.c['tsc'] not in self.TSC_RANGE: + raise ValueError("TSC %d is out of range" % self.c['tsc']) # Version specific parameters (also present in NOPE.ind) - if self.ver >= 0x01: - if self.ci is None: - raise ValueError("C/I is not set") + if self._ver >= 
1: + if self.c['cir'] < self.CI_MIN or self.c['cir'] > self.CI_MAX: + raise ValueError("C/I %d is out of range" % self.c['cir']) - if self.ci < self.CI_MIN or self.ci > self.CI_MAX: - raise ValueError("C/I %d is out of range" % self.ci) - - self.validate_burst() - - def rand_rssi(self, min = None, max = None): + def rand_rssi(self, min = None, max = None) -> int: ''' Generate a random RSSI value. ''' if min is None: @@ -498,7 +362,7 @@ return random.randint(min, max) - def rand_toa256(self, min = None, max = None): + def rand_toa256(self, min = None, max = None) -> int: ''' Generate a random ToA (Time of Arrival) value. ''' if min is None: @@ -509,185 +373,88 @@ return random.randint(min, max) - def rand_hdr(self): + def rand_hdr(self, nope: bool = False) -> None: ''' Randomize message specific header. ''' Msg.rand_hdr(self) - self.rssi = self.rand_rssi() - self.toa256 = self.rand_toa256() + self.c['rssi'] = self.rand_rssi() + self.c['toa256'] = self.rand_toa256() + self.c['nope'] = nope - if self.ver >= 0x01: - self.mod_type = random.choice(list(Modulation)) - if self.mod_type is Modulation.ModGMSK: - self.tsc_set = random.randint(0, 3) + if self._ver >= 1 and not nope: + self.c['mod_type'] = random.choice(list(Modulation)) + if self.c['mod_type'] is Modulation.ModGMSK: + self.c['tsc_set'] = random.randint(0, 3) else: - self.tsc_set = random.randint(0, 1) - self.tsc = random.choice(self.TSC_RANGE) + self.c['tsc_set'] = random.randint(0, 1) + self.c['tsc'] = random.choice(self.TSC_RANGE) + if self._ver >= 1: # C/I: Carrier-to-Interference ratio - self.ci = random.randint(self.CI_MIN, self.CI_MAX) + self.c['cir'] = random.randint(self.CI_MIN, self.CI_MAX) - def desc_hdr(self): + def desc_hdr(self) -> str: ''' Generate human-readable header description. 
''' # Describe the common part result = Msg.desc_hdr(self) - if self.rssi is not None: - result += ("rssi=%d " % self.rssi) + result += ("rssi=%d " % self.c['rssi']) + result += ("toa256=%d " % self.c['toa256']) - if self.toa256 is not None: - result += ("toa256=%d " % self.toa256) - - if self.ver >= 0x01: - if not self.nope_ind: - if self.mod_type is not None: - result += ("%s " % self.mod_type) - if self.tsc_set is not None: - result += ("set=%u " % self.tsc_set) - if self.tsc is not None: - result += ("tsc=%u " % self.tsc) - if self.ci is not None: - result += ("C/I=%d cB " % self.ci) + if self._ver >= 0x01: + if not self.c['nope']: + result += ("%s " % self.c['mod_type']) + result += ("set=%u " % self.c['tsc_set']) + result += ("tsc=%u " % self.c['tsc']) + result += ("C/I=%d cB " % self.c['cir']) else: result += "(IDLE / NOPE IND) " # Strip useless whitespace and return return result.strip() - def gen_mts(self): - ''' Encode Modulation and Training Sequence info. ''' - - # IDLE / nope indication has no MTS info - if self.nope_ind: - return self.NOPE_IND - - # TSC: . . . . . X X X - mts = self.tsc & 0b111 - - # MTS: . X X X X . . . - mts |= self.mod_type.coding << 3 - mts |= self.tsc_set << 3 - - return mts - - def parse_mts(self, mts): - ''' Parse Modulation and Training Sequence info. ''' - - # IDLE / nope indication has no MTS info - self.nope_ind = (mts & self.NOPE_IND) > 0 - if self.nope_ind: - self.mod_type = None - self.tsc_set = None - self.tsc = None - return - - # TSC: . . . . . X X X - self.tsc = mts & 0b111 - - # MTS: . X X X X . . . - mts = (mts >> 3) & 0b1111 - if (mts & 0b1100) > 0: - # Mask: . . . . M M M S - self.mod_type = Modulation.pick(mts & 0b1110) - self.tsc_set = mts & 0b1 - else: - # GMSK: . . . . 0 0 S S - self.mod_type = Modulation.ModGMSK - self.tsc_set = mts & 0b11 - - def gen_hdr(self): - ''' Generate message specific header part. 
''' - - # Allocate an empty byte-array - buf = bytearray() - - # Put RSSI - buf.append(-self.rssi) - - # Encode ToA (Time of Arrival) - # Big endian, 2 bytes (int32_t) - buf += struct.pack(">h", self.toa256) - - if self.ver >= 0x01: - # Modulation and Training Sequence info - mts = self.gen_mts() - buf.append(mts) - - # C/I: Carrier-to-Interference ratio (in centiBels) - buf += struct.pack(">h", self.ci) - - return buf - - def parse_hdr(self, hdr): - ''' Parse message specific header part. ''' - - # Parse RSSI - self.rssi = -(hdr[5]) - - # Parse ToA (Time of Arrival) - self.toa256 = struct.unpack(">h", hdr[6:8])[0] - - if self.ver >= 0x01: - # Modulation and Training Sequence info - self.parse_mts(hdr[8]) - - # C/I: Carrier-to-Interference ratio (in centiBels) - self.ci = struct.unpack(">h", hdr[9:11])[0] - - def gen_burst(self): + def gen_burst(self) -> None: ''' Generate message specific burst. ''' # Convert soft-bits to unsigned soft-bits - burst_usbits = self.sbit2usbit(self.burst) + burst = self.sbit2usbit(self.c['burst']) + self.c['soft-bits'] = bytes(burst) - # Encode to bytes - return bytearray(burst_usbits) - - def _parse_burst_v0(self, burst): + def _parse_burst_v0(self, burst: List[int]) -> List[int]: ''' Parse message specific burst for header version 0. ''' bl = len(burst) # We need to guess modulation by the length of burst - self.mod_type = Modulation.pick_by_bl(bl) - if self.mod_type is None: + self.c['mod_type'] = Modulation.pick_by_bl(bl) + if self.c['mod_type'] is None: # Some old transceivers append two dummy bytes - self.mod_type = Modulation.pick_by_bl(bl - 2) + self.c['mod_type'] = Modulation.pick_by_bl(bl - 2) - if self.mod_type is None: + if self.c['mod_type'] is None: raise ValueError("Odd burst length %u" % bl) - return burst[:self.mod_type.bl] + return burst[:self.c['mod_type'].bl] - def parse_burst(self, burst): + def parse_burst(self) -> None: ''' Parse message specific burst. 
''' - burst = list(burst) - - if self.ver == 0x00: + burst = list(self.c['soft-bits']) + if self._ver == 0: burst = self._parse_burst_v0(burst) # Convert unsigned soft-bits to soft-bits - self.burst = self.usbit2sbit(burst) + self.c['burst'] = self.usbit2sbit(burst) - def rand_burst(self, length = None): + def rand_burst(self, length = None) -> None: ''' Generate a random message specific burst. ''' if length is None: - length = self.mod_type.bl + length = self.c['mod_type'].bl - self.burst = [random.randint(-127, 127) for _ in range(length)] + self.c['burst'] = [random.randint(-127, 127) for _ in range(length)] - def trans(self, ver = None): - ''' Transform this message to TxMsg. ''' - - # Allocate a new message - msg = TxMsg(fn = self.fn, tn = self.tn, - ver = self.ver if ver is None else ver) - - # Convert burst bits - if self.burst is not None: - msg.burst = self.sbit2ubit(self.burst) - - return msg + def trans_burst(self) -> List[int]: + ''' Transform soft-bits into hard-bits. ''' + return self.sbit2ubit(self.c['burst']) diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 573527b..3226a0e 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -188,9 +188,9 @@ if self.burst_drop_amount == 0: return False - if msg.fn % self.burst_drop_period == 0: + if msg.c['fn'] % self.burst_drop_period == 0: log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)" - % (self, msg.fn, self.burst_drop_period)) + % (self, msg.c['fn'], self.burst_drop_period)) self.burst_drop_amount -= 1 return True @@ -198,64 +198,63 @@ def _handle_data_msg_v1(self, src_msg, msg): # C/I (Carrier-to-Interference ratio) - msg.ci = self.ci + msg.c['cir'] = self.ci # Pick modulation type by burst length - bl = len(src_msg.burst) - msg.mod_type = Modulation.pick_by_bl(bl) + bl = len(src_msg.c['burst']) + msg.c['mod_type'] = Modulation.pick_by_bl(bl) # Pick TSC (Training Sequence Code) and TSC set - if msg.mod_type is 
Modulation.ModGMSK: - ss = TrainingSeqGMSK.pick(src_msg.burst) - msg.tsc = ss.tsc if ss is not None else 0 - msg.tsc_set = ss.tsc_set if ss is not None else 0 + if msg.c['mod_type'] is Modulation.ModGMSK: + ss = TrainingSeqGMSK.pick(src_msg.c['burst']) + msg.c['tsc'] = ss.tsc if ss is not None else 0 + msg.c['tsc_set'] = ss.tsc_set if ss is not None else 0 else: # TODO: other modulation types (at least 8-PSK) - msg.tsc_set = 0 - msg.tsc = 0 + msg.c['tsc_set'] = 0 + msg.c['tsc'] = 0 # Takes (partially initialized) TRXD Rx message, # simulates RF path parameters (such as RSSI), # and sends towards the L1 def handle_data_msg(self, src_trx, src_msg, msg): if self.rf_muted: - msg.nope_ind = True - elif not msg.nope_ind: + msg.c['nope'] = True + elif not msg.c['nope']: # Path loss simulation - msg.nope_ind = self.sim_burst_drop(msg) - if msg.nope_ind: + msg.c['nope'] = self.sim_burst_drop(msg) + if msg.c['nope']: # Before TRXDv1, we simply drop the message - if msg.ver < 0x01: + if msg._ver < 1: del msg return # Since TRXDv1, we should send a NOPE.ind - del msg.burst # burst bits are omited - msg.burst = None + msg.c['burst'].clear() # burst bits are omited # TODO: shoud we make these values configurable? 
- msg.toa256 = self.TOA256_NOISE_DEFAULT - msg.rssi = self.RSSI_NOISE_DEFAULT - msg.ci = self.CI_NOISE_DEFAULT + msg.c['toa256'] = self.TOA256_NOISE_DEFAULT + msg.c['rssi'] = self.RSSI_NOISE_DEFAULT + msg.c['cir'] = self.CI_NOISE_DEFAULT self.data_if.send_msg(msg) return # Complete message header - msg.toa256 = self.toa256 + msg.c['toa256'] = self.toa256 # Apply RSSI based on transmitter: if not self.fake_rssi_enabled: - msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT + msg.c['rssi'] = src_trx.tx_power - src_msg.c['pwr'] - self.PATH_LOSS_DEFAULT else: # Apply fake RSSI - msg.rssi = self.rssi + msg.c['rssi'] = self.rssi # Version specific fields - if msg.ver >= 0x01: + if msg._ver >= 1: self._handle_data_msg_v1(src_msg, msg) # Apply optional Timing Advance if src_trx.ta != 0: - msg.toa256 -= src_trx.ta * 256 + msg.c['toa256'] -= src_trx.ta * 256 Transceiver.handle_data_msg(self, msg) diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py index f7b4fde..2dad6b2 100644 --- a/src/target/trx_toolkit/test_data_dump.py +++ b/src/target/trx_toolkit/test_data_dump.py @@ -97,7 +97,7 @@ msg_ref = self._gen_rand_message(cls) self._ddf.append_msg(msg_ref) - msg = self._ddf.parse_msg(0) + msg = self._ddf.from_bytes(0) self._compare_msg(msg, msg_ref) # Store one Rx message in a file, read it back and compare @@ -120,7 +120,7 @@ self._compare_msg(msg_list[i], msg_list_ref[i]) # Verify random access to stored messages - def test_parse_msg_idx(self): + def test_from_bytes_idx(self): # Store a mixed list of random messages (19 + 19) msg_list_ref = self._gen_rand_message_mix(19) self._ddf.append_all(msg_list_ref) @@ -128,13 +128,13 @@ # Random access for _ in range(100): idx = random.randrange(len(msg_list_ref)) - msg = self._ddf.parse_msg(idx) + msg = self._ddf.from_bytes(idx) self._compare_msg(msg, msg_list_ref[idx]) def test_parse_empty(self): with self.assertLogs(level = 'ERROR'): for idx in range(100): - msg = 
self._ddf.parse_msg(idx) + msg = self._ddf.from_bytes(idx) self.assertEqual(msg, None) def test_parse_all_empty(self): @@ -148,7 +148,7 @@ self._tf.write(b'\xff' * 90) with self.assertLogs(level = 'ERROR'): - msg = self._ddf.parse_msg(0) + msg = self._ddf.from_bytes(0) self.assertEqual(msg, None) def test_parse_unknown_tag(self): @@ -158,7 +158,7 @@ self._tf.write(b'\xff' * 90) with self.assertLogs(level = 'ERROR'): - msg = self._ddf.parse_msg(0) + msg = self._ddf.from_bytes(0) self.assertEqual(msg, None) if __name__ == '__main__': diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py index 24fda67..b703ed4 100644 --- a/src/target/trx_toolkit/test_data_msg.py +++ b/src/target/trx_toolkit/test_data_msg.py @@ -5,6 +5,7 @@ # Unit test for TRXD message codec # # (C) 2019 by Vadim Yanitskiy <axilirator at gmail.com> +# (C) 2021 by sysmocom - s.f.m.c. GmbH <info at sysmocom.de> # # All Rights Reserved # @@ -23,171 +24,33 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
import unittest +import logging from data_msg import Msg, TxMsg, RxMsg class Msg_Test(unittest.TestCase): # Compare message a with message b - def _compare_msg(self, a, b): + def _compare_msg(self, a: Msg, b: Msg) -> None: # Make sure we're comparing messages of the same type self.assertEqual(a.__class__, b.__class__) - # Compare common header fields - self.assertEqual(a.ver, b.ver) - self.assertEqual(a.fn, b.fn) - self.assertEqual(a.tn, b.tn) + # PDU version and fields + self.assertEqual(a._ver, b._ver) + for field in a.__class__.DEF_CONT: + self.assertEqual(a[field], b[field]) - # Burst bits (if present) - self.assertEqual(a.burst, b.burst) - - # TxMsg specific fields - if isinstance(a, TxMsg): - self.assertEqual(a.pwr, b.pwr) - - # RxMsg specific fields - if isinstance(a, RxMsg): - # Version independent fields - self.assertEqual(a.toa256, b.toa256) - self.assertEqual(a.rssi, b.rssi) - - # Version specific fields - if a.ver >= 1: - self.assertEqual(a.nope_ind, b.nope_ind) - self.assertEqual(a.mod_type, b.mod_type) - self.assertEqual(a.tsc_set, b.tsc_set) - self.assertEqual(a.tsc, b.tsc) - self.assertEqual(a.ci, b.ci) - - # Make sure that message validation throws a ValueError - def test_validate(self): - # Unknown version - with self.assertRaises(ValueError): - msg = RxMsg(fn = 0, tn = 0, ver = 100) - msg.validate() - - # Uninitialized field - with self.assertRaises(ValueError): - msg = RxMsg() - msg.validate() - with self.assertRaises(ValueError): - msg = RxMsg(fn = None, tn = 0) - msg.validate() - - # Out-of-range value(s) - with self.assertRaises(ValueError): - msg = RxMsg(fn = -1, tn = 0) - msg.validate() - with self.assertRaises(ValueError): - msg = RxMsg(fn = 0, tn = 10) - msg.validate() - - # Validate header and burst randomization - def test_rand_hdr_burst(self): - tx_msg = TxMsg() - rx_msg = RxMsg() - - for i in range(100): - tx_msg.rand_burst() - rx_msg.rand_burst() - tx_msg.rand_hdr() - rx_msg.rand_hdr() - - tx_msg.validate() - rx_msg.validate() - - 
def _test_enc_dec(self, msg, legacy = False, nope_ind = False): - # Prepare a given message (randomize) - msg.rand_hdr() - - # NOPE.ind contains no burst - if not nope_ind: - msg.rand_burst() - else: - msg.nope_ind = True - msg.mod_type = None - msg.tsc_set = None - msg.tsc = None - - # Encode a given message to bytes - msg_enc = msg.gen_msg(legacy) - - # Decode a new message from bytes - msg_dec = msg.__class__() - msg_dec.parse_msg(msg_enc) - - # Compare decoded vs the original - self._compare_msg(msg, msg_dec) - - # Validate encoding and decoding - def test_enc_dec(self): + def test_enc_dec_default(self): + ''' Encode/decode/match test for default field values. ''' for ver in Msg.KNOWN_VERSIONS: - with self.subTest("TxMsg", ver = ver): - msg = TxMsg(ver = ver) - self._test_enc_dec(msg) + with self.subTest('TxMsg(ver=%u)' % ver): + a, b = TxMsg(ver), TxMsg(ver) + b.from_bytes(a.to_bytes()) + self._compare_msg(a, b) - with self.subTest("RxMsg", ver = ver): - msg = RxMsg(ver = ver) - self._test_enc_dec(msg) - - if ver >= 1: - with self.subTest("RxMsg NOPE.ind", ver = ver): - msg = RxMsg(ver = ver) - self._test_enc_dec(msg, nope_ind = True) - - with self.subTest("RxMsg (legacy transceiver)"): - msg = RxMsg(ver = 0) - self._test_enc_dec(msg, legacy = True) - - # Validate bit conversations - def test_bit_conv(self): - usbits_ref = list(range(0, 256)) - sbits_ref = list(range(-127, 128)) - - # Test both usbit2sbit() and sbit2usbit() - sbits = Msg.usbit2sbit(usbits_ref) - usbits = Msg.sbit2usbit(sbits) - self.assertEqual(usbits[:255], usbits_ref[:255]) - self.assertEqual(usbits[255], 254) - - # Test both sbit2ubit() and ubit2sbit() - ubits = Msg.sbit2ubit(sbits_ref) - self.assertEqual(ubits, ([1] * 127 + [0] * 128)) - - sbits = Msg.ubit2sbit(ubits) - self.assertEqual(sbits, ([-127] * 127 + [127] * 128)) - - def _test_transform(self, msg): - # Prepare given messages - msg.rand_hdr() - msg.rand_burst() - - # Perform message transformation - if isinstance(msg, TxMsg): - 
msg_trans = msg.trans() - else: - msg_trans = msg.trans() - - self.assertEqual(msg_trans.ver, msg.ver) - self.assertEqual(msg_trans.fn, msg.fn) - self.assertEqual(msg_trans.tn, msg.tn) - - if isinstance(msg, RxMsg): - burst = Msg.sbit2ubit(msg.burst) - self.assertEqual(msg_trans.burst, burst) - else: - burst = Msg.ubit2sbit(msg.burst) - self.assertEqual(msg_trans.burst, burst) - - # Validate message transformation - def test_transform(self): - for ver in Msg.KNOWN_VERSIONS: - with self.subTest("TxMsg", ver = ver): - msg = TxMsg(ver = ver) - self._test_transform(msg) - - with self.subTest("RxMsg", ver = ver): - msg = RxMsg(ver = ver) - self._test_transform(msg) + with self.subTest('RxMsg(ver=%u)' % ver): + a, b = RxMsg(ver), RxMsg(ver) + b.from_bytes(a.to_bytes()) + self._compare_msg(a, b) if __name__ == '__main__': unittest.main() diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py index d041070..54e6a6b 100644 --- a/src/target/trx_toolkit/transceiver.py +++ b/src/target/trx_toolkit/transceiver.py @@ -269,7 +269,7 @@ return None # Make sure that indicated timeslot is configured - if msg.tn not in self.ts_list: + if msg.c['tn'] not in self.ts_list: log.warning("(%s) RX TRXD message (%s), but timeslot is not " "configured => dropping..." % (self, msg.desc_hdr())) return None @@ -278,4 +278,4 @@ def handle_data_msg(self, msg): # TODO: make legacy mode configurable (via argv?) 
- self.data_if.send_msg(msg, legacy = True) + self.data_if.send_msg(msg) diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py index 8b6f80c..389a663 100755 --- a/src/target/trx_toolkit/trx_sniff.py +++ b/src/target/trx_toolkit/trx_sniff.py @@ -108,7 +108,7 @@ # Attempt to parse the payload as a DATA message try: - msg.parse_msg(msg_raw) + msg.from_bytes(msg_raw) msg.validate() except ValueError as e: desc = msg.desc_hdr() @@ -160,7 +160,7 @@ # Message type specific filtering if isinstance(msg, RxMsg): # NOPE.ind filter - if not self.argv.pf_nope_ind and msg.nope_ind: + if not self.argv.pf_nope_ind and msg.c['nope']: return False # RSSI filter -- To view, visit https://gerrit.osmocom.org/c/osmocom-bb/+/24019 To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings Gerrit-Project: osmocom-bb Gerrit-Branch: master Gerrit-Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc Gerrit-Change-Number: 24019 Gerrit-PatchSet: 1 Gerrit-Owner: fixeria <vyanitskiy at sysmocom.de> Gerrit-MessageType: newchange -------------- next part -------------- An HTML attachment was scrubbed... URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20210430/cdeba577/attachment.htm>