<p>fixeria has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py<br><br>Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc<br>Related: OS#4006, SYS#4895<br>---<br>M src/target/trx_toolkit/burst_fwd.py<br>M src/target/trx_toolkit/data_dump.py<br>M src/target/trx_toolkit/data_if.py<br>M src/target/trx_toolkit/data_msg.py<br>M src/target/trx_toolkit/fake_trx.py<br>M src/target/trx_toolkit/test_data_dump.py<br>M src/target/trx_toolkit/test_data_msg.py<br>M src/target/trx_toolkit/transceiver.py<br>M src/target/trx_toolkit/trx_sniff.py<br>9 files changed, 283 insertions(+), 672 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/19/24019/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py</span><br><span>index 2e9e97b..03ce6e6 100644</span><br><span>--- a/src/target/trx_toolkit/burst_fwd.py</span><br><span>+++ b/src/target/trx_toolkit/burst_fwd.py</span><br><span>@@ -3,7 +3,7 @@</span><br><span> # TRX Toolkit</span><br><span> # Burst forwarding between transceivers</span><br><span> #</span><br><span style="color: hsl(0, 100%, 40%);">-# (C) 2017-2020 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2017-2021 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span> # Contributions by sysmocom - s.f.m.c. 
GmbH</span><br><span> #</span><br><span> # All Rights Reserved</span><br><span>@@ -25,6 +25,7 @@</span><br><span> import logging as log</span><br><span> </span><br><span> from trx_list import TRXList</span><br><span style="color: hsl(120, 100%, 40%);">+from data_msg import RxMsg</span><br><span> </span><br><span> class BurstForwarder(TRXList):</span><br><span> """ Performs burst forwarding between transceivers.</span><br><span>@@ -48,11 +49,11 @@</span><br><span> def forward_msg(self, src_trx, rx_msg):</span><br><span> # Originating Transceiver may use frequency hopping,</span><br><span> # so let's precalculate its Tx frequency in advance</span><br><span style="color: hsl(0, 100%, 40%);">- tx_freq = src_trx.get_tx_freq(rx_msg.fn)</span><br><span style="color: hsl(120, 100%, 40%);">+ tx_freq = src_trx.get_tx_freq(rx_msg.c['fn'])</span><br><span> </span><br><span> if src_trx.rf_muted:</span><br><span style="color: hsl(0, 100%, 40%);">- del rx_msg.burst # burst bits are omited</span><br><span style="color: hsl(0, 100%, 40%);">- rx_msg.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+ # Burst bits are omited</span><br><span style="color: hsl(120, 100%, 40%);">+ rx_msg.c['burst'].clear()</span><br><span> </span><br><span> # Iterate over all known transceivers</span><br><span> for trx in self.trx_list:</span><br><span>@@ -62,13 +63,13 @@</span><br><span> # Check transceiver state</span><br><span> if not trx.running:</span><br><span> continue</span><br><span style="color: hsl(0, 100%, 40%);">- if rx_msg.tn not in trx.ts_list:</span><br><span style="color: hsl(120, 100%, 40%);">+ if rx_msg.c['tn'] not in trx.ts_list:</span><br><span> continue</span><br><span> </span><br><span> # Match Tx/Rx frequencies of the both transceivers</span><br><span style="color: hsl(0, 100%, 40%);">- if trx.get_rx_freq(rx_msg.fn) != tx_freq:</span><br><span style="color: hsl(120, 100%, 40%);">+ if trx.get_rx_freq(rx_msg.c['fn']) != tx_freq:</span><br><span> 
continue</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Transform from TxMsg to RxMsg and forward</span><br><span style="color: hsl(0, 100%, 40%);">- tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ # Transform from L12TRX to TRX2L1 and forward</span><br><span style="color: hsl(120, 100%, 40%);">+ tx_msg = rx_msg.trans(RxMsg, trx.data_if._hdr_ver)</span><br><span> trx.handle_data_msg(src_trx, rx_msg, tx_msg)</span><br><span>diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py</span><br><span>index 8510e2d..fe8a2d8 100644</span><br><span>--- a/src/target/trx_toolkit/data_dump.py</span><br><span>+++ b/src/target/trx_toolkit/data_dump.py</span><br><span>@@ -44,7 +44,7 @@</span><br><span> raise ValueError("Unknown message type")</span><br><span> </span><br><span> # Generate a message payload</span><br><span style="color: hsl(0, 100%, 40%);">- msg_raw = msg.gen_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+ msg_raw = msg.to_bytes()</span><br><span> </span><br><span> # Calculate and pack the message length</span><br><span> msg_len = len(msg_raw)</span><br><span>@@ -118,7 +118,7 @@</span><br><span> # a parsed message in case of success,</span><br><span> # or None in case of EOF or header parsing error,</span><br><span> # or False in case of message parsing error.</span><br><span style="color: hsl(0, 100%, 40%);">- def _parse_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def _from_bytes(self):</span><br><span> # Attempt to read a message header</span><br><span> hdr_raw = self.f.read(self.HDR_LENGTH)</span><br><span> if len(hdr_raw) != self.HDR_LENGTH:</span><br><span>@@ -142,7 +142,7 @@</span><br><span> # Attempt to parse a message</span><br><span> try:</span><br><span> msg_raw = bytearray(msg_raw)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.parse_msg(msg_raw)</span><br><span style="color: hsl(120, 100%, 40%);">+ 
msg.from_bytes(msg_raw)</span><br><span> except:</span><br><span> log.error("Couldn't parse a message, skipping...")</span><br><span> return False</span><br><span>@@ -155,7 +155,7 @@</span><br><span> # a parsed message in case of success,</span><br><span> # or None in case of EOF, out of range, or header parsing error,</span><br><span> # or False in case of message parsing error.</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_msg(self, idx):</span><br><span style="color: hsl(120, 100%, 40%);">+ def from_bytes(self, idx):</span><br><span> # Move descriptor to the beginning of requested message</span><br><span> rc = self._seek2msg(idx)</span><br><span> if not rc:</span><br><span>@@ -163,7 +163,7 @@</span><br><span> return None</span><br><span> </span><br><span> # Attempt to parse a message</span><br><span style="color: hsl(0, 100%, 40%);">- return self._parse_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+ return self._from_bytes()</span><br><span> </span><br><span> # Parses all messages from a given file</span><br><span> # Return value:</span><br><span>@@ -185,7 +185,7 @@</span><br><span> # Read the capture in loop...</span><br><span> while True:</span><br><span> # Attempt to parse a message</span><br><span style="color: hsl(0, 100%, 40%);">- msg = self._parse_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._from_bytes()</span><br><span> </span><br><span> # EOF or broken header</span><br><span> if msg is None:</span><br><span>diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py</span><br><span>index 1cded9b..f754c6c 100644</span><br><span>--- a/src/target/trx_toolkit/data_if.py</span><br><span>+++ b/src/target/trx_toolkit/data_if.py</span><br><span>@@ -23,6 +23,8 @@</span><br><span> </span><br><span> import logging as log</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+from typing import Optional</span><br><span style="color: hsl(120, 100%, 
40%);">+</span><br><span> from udp_link import UDPLink</span><br><span> from data_msg import *</span><br><span> </span><br><span>@@ -50,64 +52,43 @@</span><br><span> # No suitable version found</span><br><span> return -1</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def match_hdr_ver(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.ver == self._hdr_ver:</span><br><span style="color: hsl(0, 100%, 40%);">- return True</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- log.error("(%s) Rx DATA message (%s) with unexpected header "</span><br><span style="color: hsl(0, 100%, 40%);">- "version %u (!= expected %u), ignoring..."</span><br><span style="color: hsl(0, 100%, 40%);">- % (self.desc_link(), msg.desc_hdr(),</span><br><span style="color: hsl(0, 100%, 40%);">- msg.ver, self._hdr_ver))</span><br><span style="color: hsl(0, 100%, 40%);">- return False</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def recv_raw_data(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def recv_raw_data(self) -> bytes:</span><br><span> data, _ = self.sock.recvfrom(512)</span><br><span> return data</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def recv_tx_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def recv_tx_msg(self) -> Optional[TxMsg]:</span><br><span> # Read raw data from socket</span><br><span> data = self.recv_raw_data()</span><br><span> </span><br><span> # Attempt to parse a TRXD Tx message</span><br><span> try:</span><br><span style="color: hsl(0, 100%, 40%);">- msg = TxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">- msg.parse_msg(bytearray(data))</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = TxMsg(self._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.from_bytes(data)</span><br><span> except:</span><br><span> log.error("Failed to 
parse a TRXD Tx message "</span><br><span> "from R:%s:%u" % (self.remote_addr, self.remote_port))</span><br><span> return None</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Make sure the header version matches</span><br><span style="color: hsl(0, 100%, 40%);">- # the configured one (self._hdr_ver)</span><br><span style="color: hsl(0, 100%, 40%);">- if not self.match_hdr_ver(msg):</span><br><span style="color: hsl(0, 100%, 40%);">- return None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span> return msg</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def recv_rx_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def recv_rx_msg(self) -> Optional[RxMsg]:</span><br><span> # Read raw data from socket</span><br><span> data = self.recv_raw_data()</span><br><span> </span><br><span> # Attempt to parse a TRXD Rx message</span><br><span> try:</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">- msg.parse_msg(bytearray(data))</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = RxMsg(self._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.from_bytes(data)</span><br><span> except:</span><br><span> log.error("Failed to parse a TRXD Rx message "</span><br><span> "from R:%s:%u" % (self.remote_addr, self.remote_port))</span><br><span> return None</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Make sure the header version matches</span><br><span style="color: hsl(0, 100%, 40%);">- # the configured one (self._hdr_ver)</span><br><span style="color: hsl(0, 100%, 40%);">- if not self.match_hdr_ver(msg):</span><br><span style="color: hsl(0, 100%, 40%);">- return None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span> return msg</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def send_msg(self, msg, legacy = False):</span><br><span style="color: hsl(120, 
100%, 40%);">+ def send_msg(self, msg: RxMsg) -> None:</span><br><span> try:</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate and encode a TRXD message</span><br><span style="color: hsl(0, 100%, 40%);">- payload = msg.gen_msg(legacy)</span><br><span style="color: hsl(120, 100%, 40%);">+ payload = msg.to_bytes()</span><br><span> except ValueError as e:</span><br><span> log.error("Failed to encode a TRXD message ('%s') "</span><br><span> "due to error: %s" % (msg.desc_hdr(), e))</span><br><span>diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py</span><br><span>index 7e785f9..570cfa0 100644</span><br><span>--- a/src/target/trx_toolkit/data_msg.py</span><br><span>+++ b/src/target/trx_toolkit/data_msg.py</span><br><span>@@ -3,7 +3,8 @@</span><br><span> # TRX Toolkit</span><br><span> # DATA interface message definitions and helpers</span><br><span> #</span><br><span style="color: hsl(0, 100%, 40%);">-# (C) 2018-2019 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2018-2021 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2021 by sysmocom - s.f.m.c. 
GmbH &lt;info@sysmocom.de&gt;</span><br><span> #</span><br><span> # All Rights Reserved</span><br><span> #</span><br><span>@@ -25,35 +26,39 @@</span><br><span> import struct</span><br><span> import abc</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-from typing import List</span><br><span style="color: hsl(120, 100%, 40%);">+from typing import Any, Type, List, Tuple, Dict</span><br><span> from enum import Enum</span><br><span> from gsm_shared import *</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+import trxd_proto</span><br><span style="color: hsl(120, 100%, 40%);">+import codec</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> class Modulation(Enum):</span><br><span> """ Modulation types defined in 3GPP TS 45.002 """</span><br><span style="color: hsl(0, 100%, 40%);">- ModGMSK = (0b0000, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">- Mod8PSK = (0b0100, 3 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">- ModGMSK_AB = (0b0110, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">- ModRFU = (0b0111, 0) # Reserved for Future Use</span><br><span style="color: hsl(0, 100%, 40%);">- Mod16QAM = (0b1000, 4 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">- Mod32QAM = (0b1010, 5 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">- ModAQPSK = (0b1100, 2 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+ ModGMSK = (0b0000, 0b1100, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+ Mod8PSK = (0b0100, 0b1110, 3 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+ ModGMSK_AB = (0b0110, 0b1111, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+ ModRFU = (0b0111, 0b1111, 0) # Reserved for Future Use</span><br><span style="color: hsl(120, 100%, 40%);">+ Mod16QAM = (0b1000, 0b1110, 4 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 
100%, 40%);">+ Mod32QAM = (0b1010, 0b1110, 5 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+ ModAQPSK = (0b1100, 0b1100, 2 * GMSK_BURST_LEN)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def __init__(self, coding, bl):</span><br><span style="color: hsl(120, 100%, 40%);">+ def __init__(self, val: int, mask: int, bl: int):</span><br><span> # Coding in TRXD header</span><br><span style="color: hsl(0, 100%, 40%);">- self.coding = coding</span><br><span style="color: hsl(120, 100%, 40%);">+ self.val = val</span><br><span style="color: hsl(120, 100%, 40%);">+ self.mask = mask</span><br><span> # Burst length</span><br><span> self.bl = bl</span><br><span> </span><br><span> @classmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def pick(self, coding):</span><br><span style="color: hsl(120, 100%, 40%);">+ def pick(self, val: int):</span><br><span> for mod in list(self):</span><br><span style="color: hsl(0, 100%, 40%);">- if mod.coding == coding:</span><br><span style="color: hsl(120, 100%, 40%);">+ if (val & mod.mask) == mod.val:</span><br><span> return mod</span><br><span> return None</span><br><span> </span><br><span> @classmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def pick_by_bl(self, bl):</span><br><span style="color: hsl(120, 100%, 40%);">+ def pick_by_bl(self, bl: int):</span><br><span> for mod in list(self):</span><br><span> if mod.bl == bl:</span><br><span> return mod</span><br><span>@@ -66,66 +71,87 @@</span><br><span> CHDR_VERSION_MAX = 0b1111</span><br><span> KNOWN_VERSIONS = (0, 1)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def __init__(self, fn = None, tn = None, burst = None, ver = 0):</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = burst</span><br><span style="color: hsl(0, 100%, 40%);">- self.ver = ver</span><br><span style="color: hsl(0, 100%, 40%);">- self.fn = fn</span><br><span style="color: hsl(0, 100%, 40%);">- self.tn = 
tn</span><br><span style="color: hsl(120, 100%, 40%);">+ # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+ CODECS = NotImplemented # type: Tuple[codec.Envelope, ...]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- @property</span><br><span style="color: hsl(0, 100%, 40%);">- def CHDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' The common header length. '''</span><br><span style="color: hsl(0, 100%, 40%);">- return 1 + 4 # (VER + TN) + FN</span><br><span style="color: hsl(120, 100%, 40%);">+ # Default PDU content for child types</span><br><span style="color: hsl(120, 100%, 40%);">+ DEF_CONT = NotImplemented # type: Dict[str, Any]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ def __init__(self, ver: int, cont: Dict[str, Any] = { }):</span><br><span style="color: hsl(120, 100%, 40%);">+ # TRXD PDU version</span><br><span style="color: hsl(120, 100%, 40%);">+ if not ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("Unknown TRXD PDU version %d" % ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ self._ver = ver</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ # TRXD PDU codec</span><br><span style="color: hsl(120, 100%, 40%);">+ self.codec = self.CODECS[ver]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ # Content of the message</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c = {</span><br><span style="color: hsl(120, 100%, 40%);">+ # Default TDMA frame/timeslot number</span><br><span style="color: hsl(120, 100%, 40%);">+ 'fn' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ 'tn' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ # NOPE / IDLE frame indication</span><br><span style="color: hsl(120, 100%, 40%);">+ 'nope' 
: False,</span><br><span style="color: hsl(120, 100%, 40%);">+ # Modulation type and TSC info</span><br><span style="color: hsl(120, 100%, 40%);">+ 'mod_type' : Modulation.ModGMSK,</span><br><span style="color: hsl(120, 100%, 40%);">+ 'tsc_set' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ 'tsc' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ # Burst hard-/soft-bits</span><br><span style="color: hsl(120, 100%, 40%);">+ 'burst' : [],</span><br><span style="color: hsl(120, 100%, 40%);">+ # Default fields for particular class</span><br><span style="color: hsl(120, 100%, 40%);">+ **self.DEF_CONT,</span><br><span style="color: hsl(120, 100%, 40%);">+ # Fields provided during instantiation</span><br><span style="color: hsl(120, 100%, 40%);">+ **cont</span><br><span style="color: hsl(120, 100%, 40%);">+ } # type: Dict[str, Any]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ def __getitem__(self, key: str) -> Any:</span><br><span style="color: hsl(120, 100%, 40%);">+ return self.c[key]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ def __setitem__(self, key: str, val: Any) -> None:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c[key] = val</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ def __delitem__(self, key: str) -> None:</span><br><span style="color: hsl(120, 100%, 40%);">+ del self.c[key]</span><br><span> </span><br><span> @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Generate message specific header. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Parse message specific header. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def gen_burst(self) -> None:</span><br><span> ''' Generate message specific burst. '''</span><br><span> </span><br><span> @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span> ''' Parse message specific burst. '''</span><br><span> </span><br><span> @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">- def rand_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_burst(self) -> None:</span><br><span> ''' Generate a random message specific burst. '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_fn(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ @abc.abstractmethod</span><br><span style="color: hsl(120, 100%, 40%);">+ def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+ ''' Convert between hard-bits and soft-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_fn(self) -> int:</span><br><span> ''' Generate a random frame number. 
'''</span><br><span> return random.randint(0, GSM_HYPERFRAME)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_tn(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_tn(self) -> int:</span><br><span> ''' Generate a random timeslot number. '''</span><br><span> return random.randint(0, 7)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_hdr(self) -> None:</span><br><span> ''' Randomize the message header. '''</span><br><span style="color: hsl(0, 100%, 40%);">- self.fn = self.rand_fn()</span><br><span style="color: hsl(0, 100%, 40%);">- self.tn = self.rand_tn()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['fn'] = self.rand_fn()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['tn'] = self.rand_tn()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def desc_hdr(self) -> str:</span><br><span> ''' Generate human-readable header description. 
'''</span><br><span> </span><br><span> result = ""</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver > 0:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("ver=%u " % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.fn is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("fn=%u " % self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tn is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("tn=%u " % self.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is not None and len(self.burst) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("bl=%u " % len(self.burst))</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("ver=%u " % self._ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("fn=%u tn=%u " % (self.c['fn'], self.c['tn']))</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['burst']:</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("bl=%u " % len(self.c['burst']))</span><br><span> </span><br><span> return result</span><br><span> </span><br><span>@@ -149,131 +175,76 @@</span><br><span> ''' Convert bits {1..0} to soft-bits {-127..127}. '''</span><br><span> return [-127 if b else 127 for b in bits]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def validate(self) -> None:</span><br><span> ''' Validate the message fields (throws ValueError). 
'''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if not self.ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Unknown TRXD header version %d" % self.ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['fn'] < 0 or self.c['fn'] > GSM_HYPERFRAME:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("TDMA frame-number %d is out of range" % self.c['fn'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.fn is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TDMA frame-number is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['tn'] < 0 or self.c['tn'] > 7:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("TDMA time-slot %d is out of range" % self.c['tn'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.fn < 0 or self.fn > GSM_HYPERFRAME:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TDMA frame-number %d is out of range" % self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tn is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TDMA time-slot is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tn < 0 or self.tn > 7:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TDMA time-slot %d is out of range" % self.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_msg(self, legacy = False):</span><br><span style="color: hsl(120, 100%, 40%);">+ def to_bytes(self) -> bytes:</span><br><span> ''' Generate a TRX DATA message. 
'''</span><br><span style="color: hsl(120, 100%, 40%);">+ if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.gen_burst()</span><br><span style="color: hsl(120, 100%, 40%);">+ return self.codec._to_bytes(self.c)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Validate all the fields</span><br><span style="color: hsl(0, 100%, 40%);">- self.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">- buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Put version (4 bits) and TDMA TN (3 bits)</span><br><span style="color: hsl(0, 100%, 40%);">- buf.append((self.ver << 4) | (self.tn & 0x07))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Put TDMA FN (4 octets, BE)</span><br><span style="color: hsl(0, 100%, 40%);">- buf += struct.pack(">L", self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Generate message specific header part</span><br><span style="color: hsl(0, 100%, 40%);">- hdr = self.gen_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">- buf += hdr</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Generate burst</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- buf += self.gen_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # This is a rudiment from (legacy) OpenBTS transceiver,</span><br><span style="color: hsl(0, 100%, 40%);">- # some L1 implementations still expect two dummy bytes.</span><br><span style="color: hsl(0, 100%, 40%);">- if legacy and self.ver == 
0x00:</span><br><span style="color: hsl(0, 100%, 40%);">- buf += bytearray(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_msg(self, msg):</span><br><span style="color: hsl(120, 100%, 40%);">+ def from_bytes(self, msg: bytes) -> None:</span><br><span> ''' Parse a TRX DATA message. '''</span><br><span style="color: hsl(120, 100%, 40%);">+ self.codec._from_bytes(self.c, msg)</span><br><span style="color: hsl(120, 100%, 40%);">+ if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.parse_burst()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Make sure we have at least common header</span><br><span style="color: hsl(0, 100%, 40%);">- if len(msg) < self.CHDR_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Message is to short: missing common header")</span><br><span style="color: hsl(120, 100%, 40%);">+ def trans(self, cls: Type['Msg'], ver: int) -> 'Msg':</span><br><span style="color: hsl(120, 100%, 40%);">+ ''' Transform between L12TRX and TRX2L1. 
'''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Parse the header version first</span><br><span style="color: hsl(0, 100%, 40%);">- self.ver = (msg[0] >> 4)</span><br><span style="color: hsl(0, 100%, 40%);">- if not self.ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Unknown TRXD header version %d" % self.ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ # Allocate a new message</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = cls(ver, {</span><br><span style="color: hsl(120, 100%, 40%);">+ 'fn' : self.c['fn'],</span><br><span style="color: hsl(120, 100%, 40%);">+ 'tn' : self.c['tn'],</span><br><span style="color: hsl(120, 100%, 40%);">+ })</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Parse TDMA TN and FN</span><br><span style="color: hsl(0, 100%, 40%);">- self.tn = (msg[0] & 0x07)</span><br><span style="color: hsl(0, 100%, 40%);">- self.fn = struct.unpack(">L", msg[1:5])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Make sure we have the whole header,</span><br><span style="color: hsl(0, 100%, 40%);">- # including the version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">- if len(msg) < self.HDR_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Message is to short: missing version specific header")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Specific message part</span><br><span style="color: hsl(0, 100%, 40%);">- self.parse_hdr(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Copy burst, skipping header</span><br><span style="color: hsl(0, 100%, 40%);">- msg_burst = msg[self.HDR_LEN:]</span><br><span style="color: hsl(0, 100%, 40%);">- if len(msg_burst) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">- 
self.parse_burst(msg_burst)</span><br><span style="color: hsl(120, 100%, 40%);">+ # Convert burst bits</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['burst'] and not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['burst'] = self.trans_burst()</span><br><span> else:</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['nope'] = True</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ return msg</span><br><span> </span><br><span> class TxMsg(Msg):</span><br><span> ''' Tx (L1 -> TRX) message coding API. '''</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+ CODECS = (</span><br><span style="color: hsl(120, 100%, 40%);">+ trxd_proto.PDUv0Tx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ trxd_proto.PDUv1Tx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ )</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ DEF_CONT = {</span><br><span style="color: hsl(120, 100%, 40%);">+ # Power reduction (in dB)</span><br><span style="color: hsl(120, 100%, 40%);">+ 'pwr' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> # Constants</span><br><span> PWR_MIN = 0x00</span><br><span> PWR_MAX = 0xff</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Specific message fields</span><br><span style="color: hsl(0, 100%, 40%);">- pwr = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- @property</span><br><span style="color: hsl(0, 100%, 40%);">- def HDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Calculate header length depending on its version. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Common header length</span><br><span style="color: hsl(0, 100%, 40%);">- length = self.CHDR_LEN</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Message specific header length</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver in (0x00, 0x01):</span><br><span style="color: hsl(0, 100%, 40%);">- length += 1 # PWR</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- raise IndexError("Unhandled version %u" % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return length</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def validate(self) -> None:</span><br><span> ''' Validate the message fields (throws ValueError). 
'''</span><br><span> </span><br><span> # Validate common fields</span><br><span> Msg.validate(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.pwr is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Tx Attenuation level is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Tx Attenuation %d is out of range" % self.pwr)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['pwr'] < self.PWR_MIN or self.c['pwr'] > self.PWR_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("Tx Attenuation %d is out of range" % self.c['pwr'])</span><br><span> </span><br><span> # FIXME: properly handle IDLE / NOPE indications</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Tx burst bits are not set")</span><br><span style="color: hsl(120, 100%, 40%);">+ if len(self.c['burst']) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("Tx burst has odd length %u" % len(self.c['burst']))</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # FIXME: properly handle IDLE / NOPE indications</span><br><span style="color: hsl(0, 100%, 40%);">- if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Tx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def rand_pwr(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_pwr(self, min = None, max = None) -> int:</span><br><span> ''' Generate a random power level. 
'''</span><br><span> </span><br><span> if min is None:</span><br><span>@@ -284,80 +255,55 @@</span><br><span> </span><br><span> return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_hdr(self) -> None:</span><br><span> ''' Randomize message specific header. '''</span><br><span> </span><br><span> Msg.rand_hdr(self)</span><br><span style="color: hsl(0, 100%, 40%);">- self.pwr = self.rand_pwr()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['pwr'] = self.rand_pwr()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def desc_hdr(self) -> str:</span><br><span> ''' Generate human-readable header description. '''</span><br><span> </span><br><span> # Describe the common part</span><br><span> result = Msg.desc_hdr(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.pwr is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("pwr=%u " % self.pwr)</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("pwr=%u " % self.c['pwr'])</span><br><span> </span><br><span> # Strip useless whitespace and return</span><br><span> return result.strip()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Generate message specific header part. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">- buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Put power</span><br><span style="color: hsl(0, 100%, 40%);">- buf.append(self.pwr)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Parse message specific header part. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Parse power level</span><br><span style="color: hsl(0, 100%, 40%);">- self.pwr = hdr[5]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def gen_burst(self) -> None:</span><br><span> ''' Generate message specific burst. '''</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['hard-bits'] = bytes(self.c['burst'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Copy burst 'as is'</span><br><span style="color: hsl(0, 100%, 40%);">- return bytearray(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span> ''' Parse message specific burst. 
'''</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['burst'] = list(self.c['hard-bits'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- length = len(burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Distinguish between GSM and EDGE</span><br><span style="color: hsl(0, 100%, 40%);">- if length >= EDGE_BURST_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = list(burst[:EDGE_BURST_LEN])</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = list(burst[:GMSK_BURST_LEN])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def rand_burst(self, length = GMSK_BURST_LEN):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_burst(self, length = GMSK_BURST_LEN) -> None:</span><br><span> ''' Generate a random message specific burst. '''</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = [random.randint(0, 1) for _ in range(length)]</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['burst'] = [random.randint(0, 1) for _ in range(length)]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def trans(self, ver = None):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Transform this message into RxMsg. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Allocate a new message</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(fn = self.fn, tn = self.tn,</span><br><span style="color: hsl(0, 100%, 40%);">- ver = self.ver if ver is None else ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Convert burst bits</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.burst = self.ubit2sbit(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return msg</span><br><span style="color: hsl(120, 100%, 40%);">+ def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+ ''' Transform hard-bits into soft-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+ return self.ubit2sbit(self.c['burst'])</span><br><span> </span><br><span> class RxMsg(Msg):</span><br><span> ''' Rx (TRX -> L1) message coding API. 
'''</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+ CODECS = (</span><br><span style="color: hsl(120, 100%, 40%);">+ trxd_proto.PDUv0Rx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ trxd_proto.PDUv1Rx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ )</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+ DEF_CONT = {</span><br><span style="color: hsl(120, 100%, 40%);">+ # Specific message fields</span><br><span style="color: hsl(120, 100%, 40%);">+ 'rssi' : -50,</span><br><span style="color: hsl(120, 100%, 40%);">+ 'toa256' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ 'cir' : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+ }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise</span><br><span> RSSI_MIN = -120</span><br><span> RSSI_MAX = -47</span><br><span>@@ -373,121 +319,39 @@</span><br><span> CI_MIN = -1280</span><br><span> CI_MAX = 1280</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # IDLE frame / nope detection indicator</span><br><span style="color: hsl(0, 100%, 40%);">- NOPE_IND = (1 << 7)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Specific message fields</span><br><span style="color: hsl(0, 100%, 40%);">- rssi = None</span><br><span style="color: hsl(0, 100%, 40%);">- toa256 = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Version 0x01 specific (default values)</span><br><span style="color: hsl(0, 100%, 40%);">- mod_type = Modulation.ModGMSK</span><br><span style="color: hsl(0, 100%, 40%);">- nope_ind = False</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- tsc_set = 
None</span><br><span style="color: hsl(0, 100%, 40%);">- tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">- ci = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- @property</span><br><span style="color: hsl(0, 100%, 40%);">- def HDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Calculate header length depending on its version. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Common header length</span><br><span style="color: hsl(0, 100%, 40%);">- length = self.CHDR_LEN</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Message specific header length</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver == 0x00:</span><br><span style="color: hsl(0, 100%, 40%);">- # RSSI + ToA</span><br><span style="color: hsl(0, 100%, 40%);">- length += 1 + 2</span><br><span style="color: hsl(0, 100%, 40%);">- elif self.ver == 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- # RSSI + ToA + TS + C/I</span><br><span style="color: hsl(0, 100%, 40%);">- length += 1 + 2 + 1 + 2</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- raise IndexError("Unhandled version %u" % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return length</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def _validate_burst_v0(self):</span><br><span style="color: hsl(0, 100%, 40%);">- # Burst is mandatory</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Rx burst bits are not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # ... 
and can be either of GSM (GMSK) or EDGE (8-PSK)</span><br><span style="color: hsl(0, 100%, 40%);">- if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Rx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def _validate_burst_v1(self):</span><br><span style="color: hsl(0, 100%, 40%);">- # Burst is omitted in case of an IDLE / NOPE indication</span><br><span style="color: hsl(0, 100%, 40%);">- if self.nope_ind and self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">- return</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.nope_ind and self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("NOPE.ind comes with burst?!?")</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Rx burst bits are not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Burst length depends on modulation type</span><br><span style="color: hsl(0, 100%, 40%);">- if len(self.burst) != self.mod_type.bl:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("Rx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def validate_burst(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Validate the burst (throws ValueError). 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver == 0x00:</span><br><span style="color: hsl(0, 100%, 40%);">- self._validate_burst_v0()</span><br><span style="color: hsl(0, 100%, 40%);">- elif self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- self._validate_burst_v1()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def validate(self) -> None:</span><br><span> ''' Validate the message header fields (throws ValueError). '''</span><br><span> </span><br><span> # Validate common fields</span><br><span> Msg.validate(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.rssi is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("RSSI is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['rssi'] < self.RSSI_MIN or self.c['rssi'] > self.RSSI_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("RSSI %d is out of range" % self.c['rssi'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("RSSI %d is out of range" % self.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.toa256 is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("ToA256 is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("ToA256 %d is out of range" % self.toa256)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['toa256'] < self.TOA256_MIN or 
self.c['toa256'] > self.TOA256_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("ToA256 %d is out of range" % self.c['toa256'])</span><br><span> </span><br><span> # Version specific parameters (omited for NOPE.ind)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01 and not self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">- if type(self.mod_type) is not Modulation:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver >= 1 and not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+ if type(self.c['mod_type']) is not Modulation:</span><br><span> raise ValueError("Unknown Rx modulation type")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc_set is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TSC set is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc_set not in range(0, 4):</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TSC set %d is out of range" % self.tsc_set)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['mod_type'] is Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['tsc_set'] not in range(0, 4):</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])</span><br><span> else:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc_set not in range(0, 2):</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TSC set %d is out of range" % self.tsc_set)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['tsc_set'] not in range(0, 2):</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])</span><br><span> 
</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TSC is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc not in self.TSC_RANGE:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("TSC %d is out of range" % self.tsc)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['tsc'] not in self.TSC_RANGE:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("TSC %d is out of range" % self.c['tsc'])</span><br><span> </span><br><span> # Version specific parameters (also present in NOPE.ind)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ci is None:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("C/I is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver >= 1:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['cir'] < self.CI_MIN or self.c['cir'] > self.CI_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+ raise ValueError("C/I %d is out of range" % self.c['cir'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.ci < self.CI_MIN or self.ci > self.CI_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">- raise ValueError("C/I %d is out of range" % self.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- self.validate_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def rand_rssi(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_rssi(self, min = None, max = None) -> int:</span><br><span> ''' Generate a random RSSI value. 
'''</span><br><span> </span><br><span> if min is None:</span><br><span>@@ -498,7 +362,7 @@</span><br><span> </span><br><span> return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_toa256(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_toa256(self, min = None, max = None) -> int:</span><br><span> ''' Generate a random ToA (Time of Arrival) value. '''</span><br><span> </span><br><span> if min is None:</span><br><span>@@ -509,185 +373,88 @@</span><br><span> </span><br><span> return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_hdr(self, nope: bool = False) -> None:</span><br><span> ''' Randomize message specific header. '''</span><br><span> </span><br><span> Msg.rand_hdr(self)</span><br><span style="color: hsl(0, 100%, 40%);">- self.rssi = self.rand_rssi()</span><br><span style="color: hsl(0, 100%, 40%);">- self.toa256 = self.rand_toa256()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['rssi'] = self.rand_rssi()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['toa256'] = self.rand_toa256()</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['nope'] = nope</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = random.choice(list(Modulation))</span><br><span style="color: hsl(0, 100%, 40%);">- if self.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc_set = random.randint(0, 3)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver >= 1 and not nope:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['mod_type'] = random.choice(list(Modulation))</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['mod_type'] is 
Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['tsc_set'] = random.randint(0, 3)</span><br><span> else:</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc_set = random.randint(0, 1)</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc = random.choice(self.TSC_RANGE)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['tsc_set'] = random.randint(0, 1)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['tsc'] = random.choice(self.TSC_RANGE)</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver >= 1:</span><br><span> # C/I: Carrier-to-Interference ratio</span><br><span style="color: hsl(0, 100%, 40%);">- self.ci = random.randint(self.CI_MIN, self.CI_MAX)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['cir'] = random.randint(self.CI_MIN, self.CI_MAX)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def desc_hdr(self) -> str:</span><br><span> ''' Generate human-readable header description. 
'''</span><br><span> </span><br><span> # Describe the common part</span><br><span> result = Msg.desc_hdr(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.rssi is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("rssi=%d " % self.rssi)</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("rssi=%d " % self.c['rssi'])</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("toa256=%d " % self.c['toa256'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.toa256 is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("toa256=%d " % self.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- if not self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">- if self.mod_type is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("%s " % self.mod_type)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc_set is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("set=%u " % self.tsc_set)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.tsc is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("tsc=%u " % self.tsc)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ci is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- result += ("C/I=%d cB " % self.ci)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver >= 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+ if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("%s " % self.c['mod_type'])</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("set=%u " % self.c['tsc_set'])</span><br><span style="color: hsl(120, 100%, 40%);">+ result += ("tsc=%u " % self.c['tsc'])</span><br><span 
style="color: hsl(120, 100%, 40%);">+ result += ("C/I=%d cB " % self.c['cir'])</span><br><span> else:</span><br><span> result += "(IDLE / NOPE IND) "</span><br><span> </span><br><span> # Strip useless whitespace and return</span><br><span> return result.strip()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def gen_mts(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Encode Modulation and Training Sequence info. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # IDLE / nope indication has no MTS info</span><br><span style="color: hsl(0, 100%, 40%);">- if self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">- return self.NOPE_IND</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # TSC: . . . . . X X X</span><br><span style="color: hsl(0, 100%, 40%);">- mts = self.tsc & 0b111</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # MTS: . X X X X . . .</span><br><span style="color: hsl(0, 100%, 40%);">- mts |= self.mod_type.coding << 3</span><br><span style="color: hsl(0, 100%, 40%);">- mts |= self.tsc_set << 3</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return mts</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_mts(self, mts):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Parse Modulation and Training Sequence info. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # IDLE / nope indication has no MTS info</span><br><span style="color: hsl(0, 100%, 40%);">- self.nope_ind = (mts & self.NOPE_IND) > 0</span><br><span style="color: hsl(0, 100%, 40%);">- if self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = None</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc_set = None</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">- return</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # TSC: . . . . . X X X</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc = mts & 0b111</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # MTS: . X X X X . . .</span><br><span style="color: hsl(0, 100%, 40%);">- mts = (mts >> 3) & 0b1111</span><br><span style="color: hsl(0, 100%, 40%);">- if (mts & 0b1100) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">- # Mask: . . . . M M M S</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = Modulation.pick(mts & 0b1110)</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc_set = mts & 0b1</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- # GMSK: . . . . 0 0 S S</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = Modulation.ModGMSK</span><br><span style="color: hsl(0, 100%, 40%);">- self.tsc_set = mts & 0b11</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Generate message specific header part. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">- buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Put RSSI</span><br><span style="color: hsl(0, 100%, 40%);">- buf.append(-self.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Encode ToA (Time of Arrival)</span><br><span style="color: hsl(0, 100%, 40%);">- # Big endian, 2 bytes (int32_t)</span><br><span style="color: hsl(0, 100%, 40%);">- buf += struct.pack(">h", self.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- # Modulation and Training Sequence info</span><br><span style="color: hsl(0, 100%, 40%);">- mts = self.gen_mts()</span><br><span style="color: hsl(0, 100%, 40%);">- buf.append(mts)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # C/I: Carrier-to-Interference ratio (in centiBels)</span><br><span style="color: hsl(0, 100%, 40%);">- buf += struct.pack(">h", self.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Parse message specific header part. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Parse RSSI</span><br><span style="color: hsl(0, 100%, 40%);">- self.rssi = -(hdr[5])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Parse ToA (Time of Arrival)</span><br><span style="color: hsl(0, 100%, 40%);">- self.toa256 = struct.unpack(">h", hdr[6:8])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">- # Modulation and Training Sequence info</span><br><span style="color: hsl(0, 100%, 40%);">- self.parse_mts(hdr[8])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # C/I: Carrier-to-Interference ratio (in centiBels)</span><br><span style="color: hsl(0, 100%, 40%);">- self.ci = struct.unpack(">h", hdr[9:11])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def gen_burst(self) -> None:</span><br><span> ''' Generate message specific burst. 
'''</span><br><span> </span><br><span> # Convert soft-bits to unsigned soft-bits</span><br><span style="color: hsl(0, 100%, 40%);">- burst_usbits = self.sbit2usbit(self.burst)</span><br><span style="color: hsl(120, 100%, 40%);">+ burst = self.sbit2usbit(self.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['soft-bits'] = bytes(burst)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Encode to bytes</span><br><span style="color: hsl(0, 100%, 40%);">- return bytearray(burst_usbits)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def _parse_burst_v0(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def _parse_burst_v0(self, burst: List[int]) -> List[int]:</span><br><span> ''' Parse message specific burst for header version 0. '''</span><br><span> </span><br><span> bl = len(burst)</span><br><span> </span><br><span> # We need to guess modulation by the length of burst</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(0, 100%, 40%);">- if self.mod_type is None:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['mod_type'] = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['mod_type'] is None:</span><br><span> # Some old transceivers append two dummy bytes</span><br><span style="color: hsl(0, 100%, 40%);">- self.mod_type = Modulation.pick_by_bl(bl - 2)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['mod_type'] = Modulation.pick_by_bl(bl - 2)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if self.mod_type is None:</span><br><span style="color: hsl(120, 100%, 40%);">+ if self.c['mod_type'] is None:</span><br><span> raise ValueError("Odd burst length %u" % bl)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- return burst[:self.mod_type.bl]</span><br><span style="color: 
hsl(120, 100%, 40%);">+ return burst[:self.c['mod_type'].bl]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span> ''' Parse message specific burst. '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- burst = list(burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if self.ver == 0x00:</span><br><span style="color: hsl(120, 100%, 40%);">+ burst = list(self.c['soft-bits'])</span><br><span style="color: hsl(120, 100%, 40%);">+ if self._ver == 0:</span><br><span> burst = self._parse_burst_v0(burst)</span><br><span> </span><br><span> # Convert unsigned soft-bits to soft-bits</span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = self.usbit2sbit(burst)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['burst'] = self.usbit2sbit(burst)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_burst(self, length = None):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_burst(self, length = None) -> None:</span><br><span> ''' Generate a random message specific burst. '''</span><br><span> </span><br><span> if length is None:</span><br><span style="color: hsl(0, 100%, 40%);">- length = self.mod_type.bl</span><br><span style="color: hsl(120, 100%, 40%);">+ length = self.c['mod_type'].bl</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- self.burst = [random.randint(-127, 127) for _ in range(length)]</span><br><span style="color: hsl(120, 100%, 40%);">+ self.c['burst'] = [random.randint(-127, 127) for _ in range(length)]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def trans(self, ver = None):</span><br><span style="color: hsl(0, 100%, 40%);">- ''' Transform this message to TxMsg. 
'''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Allocate a new message</span><br><span style="color: hsl(0, 100%, 40%);">- msg = TxMsg(fn = self.fn, tn = self.tn,</span><br><span style="color: hsl(0, 100%, 40%);">- ver = self.ver if ver is None else ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Convert burst bits</span><br><span style="color: hsl(0, 100%, 40%);">- if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.burst = self.sbit2ubit(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- return msg</span><br><span style="color: hsl(120, 100%, 40%);">+ def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+ ''' Transform soft-bits into hard-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+ return self.sbit2ubit(self.c['burst'])</span><br><span>diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py</span><br><span>index 573527b..3226a0e 100755</span><br><span>--- a/src/target/trx_toolkit/fake_trx.py</span><br><span>+++ b/src/target/trx_toolkit/fake_trx.py</span><br><span>@@ -188,9 +188,9 @@</span><br><span> if self.burst_drop_amount == 0:</span><br><span> return False</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- if msg.fn % self.burst_drop_period == 0:</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg.c['fn'] % self.burst_drop_period == 0:</span><br><span> log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"</span><br><span style="color: hsl(0, 100%, 40%);">- % (self, msg.fn, self.burst_drop_period))</span><br><span style="color: hsl(120, 100%, 40%);">+ % (self, msg.c['fn'], self.burst_drop_period))</span><br><span> self.burst_drop_amount -= 1</span><br><span> return True</span><br><span> </span><br><span>@@ -198,64 +198,63 
@@</span><br><span> </span><br><span> def _handle_data_msg_v1(self, src_msg, msg):</span><br><span> # C/I (Carrier-to-Interference ratio)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.ci = self.ci</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['cir'] = self.ci</span><br><span> </span><br><span> # Pick modulation type by burst length</span><br><span style="color: hsl(0, 100%, 40%);">- bl = len(src_msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.mod_type = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(120, 100%, 40%);">+ bl = len(src_msg.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['mod_type'] = Modulation.pick_by_bl(bl)</span><br><span> </span><br><span> # Pick TSC (Training Sequence Code) and TSC set</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">- ss = TrainingSeqGMSK.pick(src_msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc = ss.tsc if ss is not None else 0</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc_set = ss.tsc_set if ss is not None else 0</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg.c['mod_type'] is Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+ ss = TrainingSeqGMSK.pick(src_msg.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['tsc'] = ss.tsc if ss is not None else 0</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['tsc_set'] = ss.tsc_set if ss is not None else 0</span><br><span> else: # TODO: other modulation types (at least 8-PSK)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc_set = 0</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc = 0</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['tsc_set'] = 0</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['tsc'] = 0</span><br><span> </span><br><span> # Takes (partially initialized) 
TRXD Rx message,</span><br><span> # simulates RF path parameters (such as RSSI),</span><br><span> # and sends towards the L1</span><br><span> def handle_data_msg(self, src_trx, src_msg, msg):</span><br><span> if self.rf_muted:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">- elif not msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['nope'] = True</span><br><span style="color: hsl(120, 100%, 40%);">+ elif not msg.c['nope']:</span><br><span> # Path loss simulation</span><br><span style="color: hsl(0, 100%, 40%);">- msg.nope_ind = self.sim_burst_drop(msg)</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['nope'] = self.sim_burst_drop(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg.c['nope']:</span><br><span> # Before TRXDv1, we simply drop the message</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.ver < 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg._ver < 1:</span><br><span> del msg</span><br><span> return</span><br><span> </span><br><span> # Since TRXDv1, we should send a NOPE.ind</span><br><span style="color: hsl(0, 100%, 40%);">- del msg.burst # burst bits are omited</span><br><span style="color: hsl(0, 100%, 40%);">- msg.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['burst'].clear() # burst bits are omited</span><br><span> </span><br><span> # TODO: shoud we make these values configurable?</span><br><span style="color: hsl(0, 100%, 40%);">- msg.toa256 = self.TOA256_NOISE_DEFAULT</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rssi = self.RSSI_NOISE_DEFAULT</span><br><span style="color: hsl(0, 100%, 40%);">- msg.ci = self.CI_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['toa256'] = self.TOA256_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['rssi'] = 
self.RSSI_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['cir'] = self.CI_NOISE_DEFAULT</span><br><span> </span><br><span> self.data_if.send_msg(msg)</span><br><span> return</span><br><span> </span><br><span> # Complete message header</span><br><span style="color: hsl(0, 100%, 40%);">- msg.toa256 = self.toa256</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['toa256'] = self.toa256</span><br><span> </span><br><span> # Apply RSSI based on transmitter:</span><br><span> if not self.fake_rssi_enabled:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['rssi'] = src_trx.tx_power - src_msg.c['pwr'] - self.PATH_LOSS_DEFAULT</span><br><span> else: # Apply fake RSSI</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rssi = self.rssi</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['rssi'] = self.rssi</span><br><span> </span><br><span> # Version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.ver >= 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg._ver >= 1:</span><br><span> self._handle_data_msg_v1(src_msg, msg)</span><br><span> </span><br><span> # Apply optional Timing Advance</span><br><span> if src_trx.ta != 0:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.toa256 -= src_trx.ta * 256</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.c['toa256'] -= src_trx.ta * 256</span><br><span> </span><br><span> Transceiver.handle_data_msg(self, msg)</span><br><span> </span><br><span>diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py</span><br><span>index f7b4fde..2dad6b2 100644</span><br><span>--- a/src/target/trx_toolkit/test_data_dump.py</span><br><span>+++ b/src/target/trx_toolkit/test_data_dump.py</span><br><span>@@ -97,7 +97,7 @@</span><br><span> msg_ref = 
self._gen_rand_message(cls)</span><br><span> self._ddf.append_msg(msg_ref)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- msg = self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._ddf.from_bytes(0)</span><br><span> self._compare_msg(msg, msg_ref)</span><br><span> </span><br><span> # Store one Rx message in a file, read it back and compare</span><br><span>@@ -120,7 +120,7 @@</span><br><span> self._compare_msg(msg_list[i], msg_list_ref[i])</span><br><span> </span><br><span> # Verify random access to stored messages</span><br><span style="color: hsl(0, 100%, 40%);">- def test_parse_msg_idx(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def test_from_bytes_idx(self):</span><br><span> # Store a mixed list of random messages (19 + 19)</span><br><span> msg_list_ref = self._gen_rand_message_mix(19)</span><br><span> self._ddf.append_all(msg_list_ref)</span><br><span>@@ -128,13 +128,13 @@</span><br><span> # Random access</span><br><span> for _ in range(100):</span><br><span> idx = random.randrange(len(msg_list_ref))</span><br><span style="color: hsl(0, 100%, 40%);">- msg = self._ddf.parse_msg(idx)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._ddf.from_bytes(idx)</span><br><span> self._compare_msg(msg, msg_list_ref[idx])</span><br><span> </span><br><span> def test_parse_empty(self):</span><br><span> with self.assertLogs(level = 'ERROR'):</span><br><span> for idx in range(100):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = self._ddf.parse_msg(idx)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._ddf.from_bytes(idx)</span><br><span> self.assertEqual(msg, None)</span><br><span> </span><br><span> def test_parse_all_empty(self):</span><br><span>@@ -148,7 +148,7 @@</span><br><span> self._tf.write(b'\xff' * 90)</span><br><span> </span><br><span> with self.assertLogs(level = 'ERROR'):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = 
self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._ddf.from_bytes(0)</span><br><span> self.assertEqual(msg, None)</span><br><span> </span><br><span> def test_parse_unknown_tag(self):</span><br><span>@@ -158,7 +158,7 @@</span><br><span> self._tf.write(b'\xff' * 90)</span><br><span> </span><br><span> with self.assertLogs(level = 'ERROR'):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg = self._ddf.from_bytes(0)</span><br><span> self.assertEqual(msg, None)</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span>diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py</span><br><span>index 24fda67..b703ed4 100644</span><br><span>--- a/src/target/trx_toolkit/test_data_msg.py</span><br><span>+++ b/src/target/trx_toolkit/test_data_msg.py</span><br><span>@@ -5,6 +5,7 @@</span><br><span> # Unit test for TRXD message codec</span><br><span> #</span><br><span> # (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2021 by sysmocom - s.f.m.c. 
GmbH <info@sysmocom.de></span><br><span> #</span><br><span> # All Rights Reserved</span><br><span> #</span><br><span>@@ -23,171 +24,33 @@</span><br><span> # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.</span><br><span> </span><br><span> import unittest</span><br><span style="color: hsl(120, 100%, 40%);">+import logging</span><br><span> </span><br><span> from data_msg import Msg, TxMsg, RxMsg</span><br><span> </span><br><span> class Msg_Test(unittest.TestCase):</span><br><span> # Compare message a with message b</span><br><span style="color: hsl(0, 100%, 40%);">- def _compare_msg(self, a, b):</span><br><span style="color: hsl(120, 100%, 40%);">+ def _compare_msg(self, a: Msg, b: Msg) -> None:</span><br><span> # Make sure we're comparing messages of the same type</span><br><span> self.assertEqual(a.__class__, b.__class__)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Compare common header fields</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.ver, b.ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.fn, b.fn)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.tn, b.tn)</span><br><span style="color: hsl(120, 100%, 40%);">+ # PDU version and fields</span><br><span style="color: hsl(120, 100%, 40%);">+ self.assertEqual(a._ver, b._ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ for field in a.__class__.DEF_CONT:</span><br><span style="color: hsl(120, 100%, 40%);">+ self.assertEqual(a[field], b[field])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- # Burst bits (if present)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.burst, b.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # TxMsg specific fields</span><br><span style="color: hsl(0, 100%, 40%);">- if isinstance(a, TxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">- 
self.assertEqual(a.pwr, b.pwr)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # RxMsg specific fields</span><br><span style="color: hsl(0, 100%, 40%);">- if isinstance(a, RxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">- # Version independent fields</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.toa256, b.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.rssi, b.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">- if a.ver >= 1:</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.nope_ind, b.nope_ind)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.mod_type, b.mod_type)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.tsc_set, b.tsc_set)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.tsc, b.tsc)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(a.ci, b.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Make sure that message validation throws a ValueError</span><br><span style="color: hsl(0, 100%, 40%);">- def test_validate(self):</span><br><span style="color: hsl(0, 100%, 40%);">- # Unknown version</span><br><span style="color: hsl(0, 100%, 40%);">- with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(fn = 0, tn = 0, ver = 100)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Uninitialized field</span><br><span style="color: hsl(0, 100%, 40%);">- with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg()</span><br><span style="color: hsl(0, 
100%, 40%);">- msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">- with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(fn = None, tn = 0)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Out-of-range value(s)</span><br><span style="color: hsl(0, 100%, 40%);">- with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(fn = -1, tn = 0)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">- with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(fn = 0, tn = 10)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate header and burst randomization</span><br><span style="color: hsl(0, 100%, 40%);">- def test_rand_hdr_burst(self):</span><br><span style="color: hsl(0, 100%, 40%);">- tx_msg = TxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">- rx_msg = RxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- for i in range(100):</span><br><span style="color: hsl(0, 100%, 40%);">- tx_msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">- rx_msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">- tx_msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">- rx_msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- tx_msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">- rx_msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def _test_enc_dec(self, msg, legacy = False, nope_ind = 
False):</span><br><span style="color: hsl(0, 100%, 40%);">- # Prepare a given message (randomize)</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # NOPE.ind contains no burst</span><br><span style="color: hsl(0, 100%, 40%);">- if not nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">- msg.mod_type = None</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc_set = None</span><br><span style="color: hsl(0, 100%, 40%);">- msg.tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Encode a given message to bytes</span><br><span style="color: hsl(0, 100%, 40%);">- msg_enc = msg.gen_msg(legacy)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Decode a new message from bytes</span><br><span style="color: hsl(0, 100%, 40%);">- msg_dec = msg.__class__()</span><br><span style="color: hsl(0, 100%, 40%);">- msg_dec.parse_msg(msg_enc)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Compare decoded vs the original</span><br><span style="color: hsl(0, 100%, 40%);">- self._compare_msg(msg, msg_dec)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate encoding and decoding</span><br><span style="color: hsl(0, 100%, 40%);">- def test_enc_dec(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def test_enc_dec_default(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ ''' Encode/decode/match test for default field values. 
'''</span><br><span> for ver in Msg.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("TxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = TxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_enc_dec(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+ with self.subTest('TxMsg(ver=%u)' % ver):</span><br><span style="color: hsl(120, 100%, 40%);">+ a, b = TxMsg(ver), TxMsg(ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ b.from_bytes(a.to_bytes())</span><br><span style="color: hsl(120, 100%, 40%);">+ self._compare_msg(a, b)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("RxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_enc_dec(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if ver >= 1:</span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("RxMsg NOPE.ind", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_enc_dec(msg, nope_ind = True)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("RxMsg (legacy transceiver)"):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(ver = 0)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_enc_dec(msg, legacy = True)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate bit conversations</span><br><span style="color: hsl(0, 100%, 40%);">- def test_bit_conv(self):</span><br><span style="color: hsl(0, 100%, 40%);">- usbits_ref = list(range(0, 256))</span><br><span style="color: hsl(0, 100%, 40%);">- sbits_ref = list(range(-127, 128))</span><br><span 
style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Test both usbit2sbit() and sbit2usbit()</span><br><span style="color: hsl(0, 100%, 40%);">- sbits = Msg.usbit2sbit(usbits_ref)</span><br><span style="color: hsl(0, 100%, 40%);">- usbits = Msg.sbit2usbit(sbits)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(usbits[:255], usbits_ref[:255])</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(usbits[255], 254)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Test both sbit2ubit() and ubit2sbit()</span><br><span style="color: hsl(0, 100%, 40%);">- ubits = Msg.sbit2ubit(sbits_ref)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(ubits, ([1] * 127 + [0] * 128))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- sbits = Msg.ubit2sbit(ubits)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(sbits, ([-127] * 127 + [127] * 128))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- def _test_transform(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">- # Prepare given messages</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">- msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Perform message transformation</span><br><span style="color: hsl(0, 100%, 40%);">- if isinstance(msg, TxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">- msg_trans = msg.trans()</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- msg_trans = msg.trans()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(msg_trans.ver, msg.ver)</span><br><span 
style="color: hsl(0, 100%, 40%);">- self.assertEqual(msg_trans.fn, msg.fn)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(msg_trans.tn, msg.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- if isinstance(msg, RxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">- burst = Msg.sbit2ubit(msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(msg_trans.burst, burst)</span><br><span style="color: hsl(0, 100%, 40%);">- else:</span><br><span style="color: hsl(0, 100%, 40%);">- burst = Msg.ubit2sbit(msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">- self.assertEqual(msg_trans.burst, burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate message transformation</span><br><span style="color: hsl(0, 100%, 40%);">- def test_transform(self):</span><br><span style="color: hsl(0, 100%, 40%);">- for ver in Msg.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("TxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = TxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_transform(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- with self.subTest("RxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">- msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">- self._test_transform(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+ with self.subTest('RxMsg(ver=%u)' % ver):</span><br><span style="color: hsl(120, 100%, 40%);">+ a, b = RxMsg(ver), RxMsg(ver)</span><br><span style="color: hsl(120, 100%, 40%);">+ b.from_bytes(a.to_bytes())</span><br><span style="color: hsl(120, 100%, 40%);">+ self._compare_msg(a, b)</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span> 
unittest.main()</span><br><span>diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py</span><br><span>index d041070..54e6a6b 100644</span><br><span>--- a/src/target/trx_toolkit/transceiver.py</span><br><span>+++ b/src/target/trx_toolkit/transceiver.py</span><br><span>@@ -269,7 +269,7 @@</span><br><span> return None</span><br><span> </span><br><span> # Make sure that indicated timeslot is configured</span><br><span style="color: hsl(0, 100%, 40%);">- if msg.tn not in self.ts_list:</span><br><span style="color: hsl(120, 100%, 40%);">+ if msg.c['tn'] not in self.ts_list:</span><br><span> log.warning("(%s) RX TRXD message (%s), but timeslot is not "</span><br><span> "configured => dropping..." % (self, msg.desc_hdr()))</span><br><span> return None</span><br><span>@@ -278,4 +278,4 @@</span><br><span> </span><br><span> def handle_data_msg(self, msg):</span><br><span> # TODO: make legacy mode configurable (via argv?)</span><br><span style="color: hsl(0, 100%, 40%);">- self.data_if.send_msg(msg, legacy = True)</span><br><span style="color: hsl(120, 100%, 40%);">+ self.data_if.send_msg(msg)</span><br><span>diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py</span><br><span>index 8b6f80c..389a663 100755</span><br><span>--- a/src/target/trx_toolkit/trx_sniff.py</span><br><span>+++ b/src/target/trx_toolkit/trx_sniff.py</span><br><span>@@ -108,7 +108,7 @@</span><br><span> </span><br><span> # Attempt to parse the payload as a DATA message</span><br><span> try:</span><br><span style="color: hsl(0, 100%, 40%);">- msg.parse_msg(msg_raw)</span><br><span style="color: hsl(120, 100%, 40%);">+ msg.from_bytes(msg_raw)</span><br><span> msg.validate()</span><br><span> except ValueError as e:</span><br><span> desc = msg.desc_hdr()</span><br><span>@@ -160,7 +160,7 @@</span><br><span> # Message type specific filtering</span><br><span> if isinstance(msg, RxMsg):</span><br><span> # NOPE.ind filter</span><br><span 
style="color: hsl(0, 100%, 40%);">- if not self.argv.pf_nope_ind and msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+ if not self.argv.pf_nope_ind and msg.c['nope']:</span><br><span> return False</span><br><span> </span><br><span> # RSSI filter</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019">change 24019</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.osmocom.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019"/><meta itemprop="name" content="View Change"/></div></div>
<div style="display:none"> Gerrit-Project: osmocom-bb </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc </div>
<div style="display:none"> Gerrit-Change-Number: 24019 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: fixeria <vyanitskiy@sysmocom.de> </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>