<p>fixeria has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py<br><br>Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc<br>Related: OS#4006, SYS#4895<br>---<br>M src/target/trx_toolkit/burst_fwd.py<br>M src/target/trx_toolkit/data_dump.py<br>M src/target/trx_toolkit/data_if.py<br>M src/target/trx_toolkit/data_msg.py<br>M src/target/trx_toolkit/fake_trx.py<br>M src/target/trx_toolkit/test_data_dump.py<br>M src/target/trx_toolkit/test_data_msg.py<br>M src/target/trx_toolkit/transceiver.py<br>M src/target/trx_toolkit/trx_sniff.py<br>9 files changed, 283 insertions(+), 672 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/19/24019/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py</span><br><span>index 2e9e97b..03ce6e6 100644</span><br><span>--- a/src/target/trx_toolkit/burst_fwd.py</span><br><span>+++ b/src/target/trx_toolkit/burst_fwd.py</span><br><span>@@ -3,7 +3,7 @@</span><br><span> # TRX Toolkit</span><br><span> # Burst forwarding between transceivers</span><br><span> #</span><br><span style="color: hsl(0, 100%, 40%);">-# (C) 2017-2020 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2017-2021 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span> # Contributions by sysmocom - s.f.m.c. GmbH</span><br><span> #</span><br><span> # All Rights Reserved</span><br><span>@@ -25,6 +25,7 @@</span><br><span> import logging as log</span><br><span> </span><br><span> from trx_list import TRXList</span><br><span style="color: hsl(120, 100%, 40%);">+from data_msg import RxMsg</span><br><span> </span><br><span> class BurstForwarder(TRXList):</span><br><span>       """ Performs burst forwarding between transceivers.</span><br><span>@@ -48,11 +49,11 @@</span><br><span>     def forward_msg(self, src_trx, rx_msg):</span><br><span>              # Originating Transceiver may use frequency hopping,</span><br><span>                 # so let's precalculate its Tx frequency in advance</span><br><span style="color: hsl(0, 100%, 40%);">-         tx_freq = src_trx.get_tx_freq(rx_msg.fn)</span><br><span style="color: hsl(120, 100%, 40%);">+              tx_freq = src_trx.get_tx_freq(rx_msg.c['fn'])</span><br><span> </span><br><span>            if src_trx.rf_muted:</span><br><span style="color: hsl(0, 100%, 40%);">-                    del rx_msg.burst # burst bits are omited</span><br><span style="color: hsl(0, 100%, 40%);">-                        rx_msg.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+                   # Burst bits are omited</span><br><span style="color: hsl(120, 100%, 40%);">+                       rx_msg.c['burst'].clear()</span><br><span> </span><br><span>                # Iterate over all known transceivers</span><br><span>                for trx in self.trx_list:</span><br><span>@@ -62,13 +63,13 @@</span><br><span>                      # Check transceiver state</span><br><span>                    if not trx.running:</span><br><span>                          continue</span><br><span style="color: hsl(0, 100%, 40%);">-                        if rx_msg.tn not in trx.ts_list:</span><br><span style="color: hsl(120, 100%, 40%);">+                      if rx_msg.c['tn'] not in trx.ts_list:</span><br><span>                                continue</span><br><span> </span><br><span>                         # Match Tx/Rx frequencies of the both transceivers</span><br><span style="color: hsl(0, 100%, 40%);">-                      if trx.get_rx_freq(rx_msg.fn) != tx_freq:</span><br><span style="color: hsl(120, 100%, 40%);">+                     if trx.get_rx_freq(rx_msg.c['fn']) != tx_freq:</span><br><span>                               continue</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                    # Transform from TxMsg to RxMsg and forward</span><br><span style="color: hsl(0, 100%, 40%);">-                     tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+                     # Transform from L12TRX to TRX2L1 and forward</span><br><span style="color: hsl(120, 100%, 40%);">+                 tx_msg = rx_msg.trans(RxMsg, trx.data_if._hdr_ver)</span><br><span>                   trx.handle_data_msg(src_trx, rx_msg, tx_msg)</span><br><span>diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py</span><br><span>index 8510e2d..fe8a2d8 100644</span><br><span>--- a/src/target/trx_toolkit/data_dump.py</span><br><span>+++ b/src/target/trx_toolkit/data_dump.py</span><br><span>@@ -44,7 +44,7 @@</span><br><span>                    raise ValueError("Unknown message type")</span><br><span> </span><br><span>               # Generate a message payload</span><br><span style="color: hsl(0, 100%, 40%);">-            msg_raw = msg.gen_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+               msg_raw = msg.to_bytes()</span><br><span> </span><br><span>                 # Calculate and pack the message length</span><br><span>              msg_len = len(msg_raw)</span><br><span>@@ -118,7 +118,7 @@</span><br><span>         #   a parsed message in case of success,</span><br><span>     #   or None in case of EOF or header parsing error,</span><br><span>  #   or False in case of message parsing error.</span><br><span style="color: hsl(0, 100%, 40%);">-  def _parse_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def _from_bytes(self):</span><br><span>               # Attempt to read a message header</span><br><span>           hdr_raw = self.f.read(self.HDR_LENGTH)</span><br><span>               if len(hdr_raw) != self.HDR_LENGTH:</span><br><span>@@ -142,7 +142,7 @@</span><br><span>            # Attempt to parse a message</span><br><span>                 try:</span><br><span>                         msg_raw = bytearray(msg_raw)</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg.parse_msg(msg_raw)</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.from_bytes(msg_raw)</span><br><span>              except:</span><br><span>                      log.error("Couldn't parse a message, skipping...")</span><br><span>                     return False</span><br><span>@@ -155,7 +155,7 @@</span><br><span>   #   a parsed message in case of success,</span><br><span>     #   or None in case of EOF, out of range, or header parsing error,</span><br><span>   #   or False in case of message parsing error.</span><br><span style="color: hsl(0, 100%, 40%);">-  def parse_msg(self, idx):</span><br><span style="color: hsl(120, 100%, 40%);">+     def from_bytes(self, idx):</span><br><span>           # Move descriptor to the beginning of requested message</span><br><span>              rc = self._seek2msg(idx)</span><br><span>             if not rc:</span><br><span>@@ -163,7 +163,7 @@</span><br><span>                     return None</span><br><span> </span><br><span>              # Attempt to parse a message</span><br><span style="color: hsl(0, 100%, 40%);">-            return self._parse_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+              return self._from_bytes()</span><br><span> </span><br><span>        # Parses all messages from a given file</span><br><span>      # Return value:</span><br><span>@@ -185,7 +185,7 @@</span><br><span>                # Read the capture in loop...</span><br><span>                while True:</span><br><span>                  # Attempt to parse a message</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg = self._parse_msg()</span><br><span style="color: hsl(120, 100%, 40%);">+                       msg = self._from_bytes()</span><br><span> </span><br><span>                         # EOF or broken header</span><br><span>                       if msg is None:</span><br><span>diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py</span><br><span>index 1cded9b..f754c6c 100644</span><br><span>--- a/src/target/trx_toolkit/data_if.py</span><br><span>+++ b/src/target/trx_toolkit/data_if.py</span><br><span>@@ -23,6 +23,8 @@</span><br><span> </span><br><span> import logging as log</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+from typing import Optional</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> from udp_link import UDPLink</span><br><span> from data_msg import *</span><br><span> </span><br><span>@@ -50,64 +52,43 @@</span><br><span>             # No suitable version found</span><br><span>          return -1</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   def match_hdr_ver(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">-           if msg.ver == self._hdr_ver:</span><br><span style="color: hsl(0, 100%, 40%);">-                    return True</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             log.error("(%s) Rx DATA message (%s) with unexpected header "</span><br><span style="color: hsl(0, 100%, 40%);">-                   "version %u (!= expected %u), ignoring..."</span><br><span style="color: hsl(0, 100%, 40%);">-                  % (self.desc_link(), msg.desc_hdr(),</span><br><span style="color: hsl(0, 100%, 40%);">-                       msg.ver, self._hdr_ver))</span><br><span style="color: hsl(0, 100%, 40%);">-             return False</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-    def recv_raw_data(self):</span><br><span style="color: hsl(120, 100%, 40%);">+      def recv_raw_data(self) -> bytes:</span><br><span>                 data, _ = self.sock.recvfrom(512)</span><br><span>            return data</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def recv_tx_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+        def recv_tx_msg(self) -> Optional[TxMsg]:</span><br><span>                 # Read raw data from socket</span><br><span>          data = self.recv_raw_data()</span><br><span> </span><br><span>              # Attempt to parse a TRXD Tx message</span><br><span>                 try:</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg = TxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.parse_msg(bytearray(data))</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg = TxMsg(self._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+                    msg.from_bytes(data)</span><br><span>                 except:</span><br><span>                      log.error("Failed to parse a TRXD Tx message "</span><br><span>                             "from R:%s:%u" % (self.remote_addr, self.remote_port))</span><br><span>                     return None</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         # Make sure the header version matches</span><br><span style="color: hsl(0, 100%, 40%);">-          # the configured one (self._hdr_ver)</span><br><span style="color: hsl(0, 100%, 40%);">-            if not self.match_hdr_ver(msg):</span><br><span style="color: hsl(0, 100%, 40%);">-                 return None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span>          return msg</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-  def recv_rx_msg(self):</span><br><span style="color: hsl(120, 100%, 40%);">+        def recv_rx_msg(self) -> Optional[RxMsg]:</span><br><span>                 # Read raw data from socket</span><br><span>          data = self.recv_raw_data()</span><br><span> </span><br><span>              # Attempt to parse a TRXD Rx message</span><br><span>                 try:</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg = RxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.parse_msg(bytearray(data))</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg = RxMsg(self._hdr_ver)</span><br><span style="color: hsl(120, 100%, 40%);">+                    msg.from_bytes(data)</span><br><span>                 except:</span><br><span>                      log.error("Failed to parse a TRXD Rx message "</span><br><span>                             "from R:%s:%u" % (self.remote_addr, self.remote_port))</span><br><span>                     return None</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         # Make sure the header version matches</span><br><span style="color: hsl(0, 100%, 40%);">-          # the configured one (self._hdr_ver)</span><br><span style="color: hsl(0, 100%, 40%);">-            if not self.match_hdr_ver(msg):</span><br><span style="color: hsl(0, 100%, 40%);">-                 return None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span>          return msg</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-  def send_msg(self, msg, legacy = False):</span><br><span style="color: hsl(120, 100%, 40%);">+      def send_msg(self, msg: RxMsg) -> None:</span><br><span>           try:</span><br><span style="color: hsl(0, 100%, 40%);">-                    # Validate and encode a TRXD message</span><br><span style="color: hsl(0, 100%, 40%);">-                    payload = msg.gen_msg(legacy)</span><br><span style="color: hsl(120, 100%, 40%);">+                 payload = msg.to_bytes()</span><br><span>             except ValueError as e:</span><br><span>                      log.error("Failed to encode a TRXD message ('%s') "</span><br><span>                                "due to error: %s" % (msg.desc_hdr(), e))</span><br><span>diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py</span><br><span>index 7e785f9..570cfa0 100644</span><br><span>--- a/src/target/trx_toolkit/data_msg.py</span><br><span>+++ b/src/target/trx_toolkit/data_msg.py</span><br><span>@@ -3,7 +3,8 @@</span><br><span> # TRX Toolkit</span><br><span> # DATA interface message definitions and helpers</span><br><span> #</span><br><span style="color: hsl(0, 100%, 40%);">-# (C) 2018-2019 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2018-2021 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de></span><br><span> #</span><br><span> # All Rights Reserved</span><br><span> #</span><br><span>@@ -25,35 +26,39 @@</span><br><span> import struct</span><br><span> import abc</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-from typing import List</span><br><span style="color: hsl(120, 100%, 40%);">+from typing import Any, Type, List, Tuple, Dict</span><br><span> from enum import Enum</span><br><span> from gsm_shared import *</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+import trxd_proto</span><br><span style="color: hsl(120, 100%, 40%);">+import codec</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span> class Modulation(Enum):</span><br><span>     """ Modulation types defined in 3GPP TS 45.002 """</span><br><span style="color: hsl(0, 100%, 40%);">-        ModGMSK         = (0b0000, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">-  Mod8PSK         = (0b0100, 3 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">-  ModGMSK_AB      = (0b0110, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">-  ModRFU          = (0b0111, 0) # Reserved for Future Use</span><br><span style="color: hsl(0, 100%, 40%);">- Mod16QAM        = (0b1000, 4 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">-  Mod32QAM        = (0b1010, 5 * GMSK_BURST_LEN)</span><br><span style="color: hsl(0, 100%, 40%);">-  ModAQPSK        = (0b1100, 2 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        ModGMSK         = (0b0000, 0b1100, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        Mod8PSK         = (0b0100, 0b1110, 3 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        ModGMSK_AB      = (0b0110, 0b1111, 1 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        ModRFU          = (0b0111, 0b1111, 0) # Reserved for Future Use</span><br><span style="color: hsl(120, 100%, 40%);">+       Mod16QAM        = (0b1000, 0b1110, 4 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        Mod32QAM        = (0b1010, 0b1110, 5 * GMSK_BURST_LEN)</span><br><span style="color: hsl(120, 100%, 40%);">+        ModAQPSK        = (0b1100, 0b1100, 2 * GMSK_BURST_LEN)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-      def __init__(self, coding, bl):</span><br><span style="color: hsl(120, 100%, 40%);">+       def __init__(self, val: int, mask: int, bl: int):</span><br><span>            # Coding in TRXD header</span><br><span style="color: hsl(0, 100%, 40%);">-         self.coding = coding</span><br><span style="color: hsl(120, 100%, 40%);">+          self.val = val</span><br><span style="color: hsl(120, 100%, 40%);">+                self.mask = mask</span><br><span>             # Burst length</span><br><span>               self.bl = bl</span><br><span> </span><br><span>     @classmethod</span><br><span style="color: hsl(0, 100%, 40%);">-    def pick(self, coding):</span><br><span style="color: hsl(120, 100%, 40%);">+       def pick(self, val: int):</span><br><span>            for mod in list(self):</span><br><span style="color: hsl(0, 100%, 40%);">-                  if mod.coding == coding:</span><br><span style="color: hsl(120, 100%, 40%);">+                      if (val & mod.mask) == mod.val:</span><br><span>                          return mod</span><br><span>           return None</span><br><span> </span><br><span>      @classmethod</span><br><span style="color: hsl(0, 100%, 40%);">-    def pick_by_bl(self, bl):</span><br><span style="color: hsl(120, 100%, 40%);">+     def pick_by_bl(self, bl: int):</span><br><span>               for mod in list(self):</span><br><span>                       if mod.bl == bl:</span><br><span>                             return mod</span><br><span>@@ -66,66 +71,87 @@</span><br><span>     CHDR_VERSION_MAX = 0b1111</span><br><span>    KNOWN_VERSIONS = (0, 1)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def __init__(self, fn = None, tn = None, burst = None, ver = 0):</span><br><span style="color: hsl(0, 100%, 40%);">-                self.burst = burst</span><br><span style="color: hsl(0, 100%, 40%);">-              self.ver = ver</span><br><span style="color: hsl(0, 100%, 40%);">-          self.fn = fn</span><br><span style="color: hsl(0, 100%, 40%);">-            self.tn = tn</span><br><span style="color: hsl(120, 100%, 40%);">+  # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+   CODECS = NotImplemented # type: Tuple[codec.Envelope, ...]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-  @property</span><br><span style="color: hsl(0, 100%, 40%);">-       def CHDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">-             ''' The common header length. '''</span><br><span style="color: hsl(0, 100%, 40%);">-               return 1 + 4 # (VER + TN) + FN</span><br><span style="color: hsl(120, 100%, 40%);">+        # Default PDU content for child types</span><br><span style="color: hsl(120, 100%, 40%);">+ DEF_CONT = NotImplemented # type: Dict[str, Any]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def __init__(self, ver: int, cont: Dict[str, Any] = { }):</span><br><span style="color: hsl(120, 100%, 40%);">+             # TRXD PDU version</span><br><span style="color: hsl(120, 100%, 40%);">+            if not ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(120, 100%, 40%);">+                    raise ValueError("Unknown TRXD PDU version %d" % ver)</span><br><span style="color: hsl(120, 100%, 40%);">+               self._ver = ver</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+             # TRXD PDU codec</span><br><span style="color: hsl(120, 100%, 40%);">+              self.codec = self.CODECS[ver]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+               # Content of the message</span><br><span style="color: hsl(120, 100%, 40%);">+              self.c = {</span><br><span style="color: hsl(120, 100%, 40%);">+                    # Default TDMA frame/timeslot number</span><br><span style="color: hsl(120, 100%, 40%);">+                  'fn'            : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+                  'tn'            : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+                  # NOPE / IDLE frame indication</span><br><span style="color: hsl(120, 100%, 40%);">+                        'nope'          : False,</span><br><span style="color: hsl(120, 100%, 40%);">+                      # Modulation type and TSC info</span><br><span style="color: hsl(120, 100%, 40%);">+                        'mod_type'      : Modulation.ModGMSK,</span><br><span style="color: hsl(120, 100%, 40%);">+                 'tsc_set'       : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+                  'tsc'           : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+                  # Burst hard-/soft-bits</span><br><span style="color: hsl(120, 100%, 40%);">+                       'burst'         : [],</span><br><span style="color: hsl(120, 100%, 40%);">+                 # Default fields for particular class</span><br><span style="color: hsl(120, 100%, 40%);">+                 **self.DEF_CONT,</span><br><span style="color: hsl(120, 100%, 40%);">+                      # Fields provided during instantiation</span><br><span style="color: hsl(120, 100%, 40%);">+                        **cont</span><br><span style="color: hsl(120, 100%, 40%);">+                } # type: Dict[str, Any]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def __getitem__(self, key: str) -> Any:</span><br><span style="color: hsl(120, 100%, 40%);">+            return self.c[key]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+  def __setitem__(self, key: str, val: Any) -> None:</span><br><span style="color: hsl(120, 100%, 40%);">+         self.c[key] = val</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   def __delitem__(self, key: str) -> None:</span><br><span style="color: hsl(120, 100%, 40%);">+           del self.c[key]</span><br><span> </span><br><span>  @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">-     def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Generate message specific header. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">-     def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">-               ''' Parse message specific header. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-  @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">-     def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+  def gen_burst(self) -> None:</span><br><span>              ''' Generate message specific burst. '''</span><br><span> </span><br><span>         @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">-     def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span>            ''' Parse message specific burst. '''</span><br><span> </span><br><span>    @abc.abstractmethod</span><br><span style="color: hsl(0, 100%, 40%);">-     def rand_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def rand_burst(self) -> None:</span><br><span>             ''' Generate a random message specific burst. '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   def rand_fn(self):</span><br><span style="color: hsl(120, 100%, 40%);">+    @abc.abstractmethod</span><br><span style="color: hsl(120, 100%, 40%);">+   def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+                ''' Convert between hard-bits and soft-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def rand_fn(self) -> int:</span><br><span>                 ''' Generate a random frame number. '''</span><br><span>              return random.randint(0, GSM_HYPERFRAME)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-    def rand_tn(self):</span><br><span style="color: hsl(120, 100%, 40%);">+    def rand_tn(self) -> int:</span><br><span>                 ''' Generate a random timeslot number. '''</span><br><span>           return random.randint(0, 7)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">- def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def rand_hdr(self) -> None:</span><br><span>               ''' Randomize the message header. '''</span><br><span style="color: hsl(0, 100%, 40%);">-           self.fn = self.rand_fn()</span><br><span style="color: hsl(0, 100%, 40%);">-                self.tn = self.rand_tn()</span><br><span style="color: hsl(120, 100%, 40%);">+              self.c['fn'] = self.rand_fn()</span><br><span style="color: hsl(120, 100%, 40%);">+         self.c['tn'] = self.rand_tn()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-       def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def desc_hdr(self) -> str:</span><br><span>                ''' Generate human-readable header description. '''</span><br><span> </span><br><span>              result = ""</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-               if self.ver > 0:</span><br><span style="color: hsl(0, 100%, 40%);">-                     result += ("ver=%u " % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              if self.fn is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                 result += ("fn=%u " % self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.tn is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                 result += ("tn=%u " % self.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.burst is not None and len(self.burst) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">-                   result += ("bl=%u " % len(self.burst))</span><br><span style="color: hsl(120, 100%, 40%);">+              result += ("ver=%u " % self._ver)</span><br><span style="color: hsl(120, 100%, 40%);">+           result += ("fn=%u tn=%u " % (self.c['fn'], self.c['tn']))</span><br><span style="color: hsl(120, 100%, 40%);">+           if self.c['burst']:</span><br><span style="color: hsl(120, 100%, 40%);">+                   result += ("bl=%u " % len(self.c['burst']))</span><br><span> </span><br><span>            return result</span><br><span> </span><br><span>@@ -149,131 +175,76 @@</span><br><span>           ''' Convert bits {1..0} to soft-bits {-127..127}. '''</span><br><span>                return [-127 if b else 127 for b in bits]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-   def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def validate(self) -> None:</span><br><span>               ''' Validate the message fields (throws ValueError). '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-            if not self.ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">-                 raise ValueError("Unknown TRXD header version %d" % self.ver)</span><br><span style="color: hsl(120, 100%, 40%);">+               if self.c['fn'] < 0 or self.c['fn'] > GSM_HYPERFRAME:</span><br><span style="color: hsl(120, 100%, 40%);">+                   raise ValueError("TDMA frame-number %d is out of range" % self.c['fn'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-           if self.fn is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                     raise ValueError("TDMA frame-number is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+            if self.c['tn'] < 0 or self.c['tn'] > 7:</span><br><span style="color: hsl(120, 100%, 40%);">+                        raise ValueError("TDMA time-slot %d is out of range" % self.c['tn'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-              if self.fn < 0 or self.fn > GSM_HYPERFRAME:</span><br><span style="color: hsl(0, 100%, 40%);">-                       raise ValueError("TDMA frame-number %d is out of range" % self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-            if self.tn is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                     raise ValueError("TDMA time-slot is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         if self.tn < 0 or self.tn > 7:</span><br><span style="color: hsl(0, 100%, 40%);">-                    raise ValueError("TDMA time-slot %d is out of range" % self.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       def gen_msg(self, legacy = False):</span><br><span style="color: hsl(120, 100%, 40%);">+    def to_bytes(self) -> bytes:</span><br><span>              ''' Generate a TRX DATA message. '''</span><br><span style="color: hsl(120, 100%, 40%);">+          if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+                        self.gen_burst()</span><br><span style="color: hsl(120, 100%, 40%);">+              return self.codec._to_bytes(self.c)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         # Validate all the fields</span><br><span style="color: hsl(0, 100%, 40%);">-               self.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">-          buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               # Put version (4 bits) and TDMA TN (3 bits)</span><br><span style="color: hsl(0, 100%, 40%);">-             buf.append((self.ver << 4) | (self.tn & 0x07))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                # Put TDMA FN (4 octets, BE)</span><br><span style="color: hsl(0, 100%, 40%);">-            buf += struct.pack(">L", self.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Generate message specific header part</span><br><span style="color: hsl(0, 100%, 40%);">-         hdr = self.gen_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-            buf += hdr</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Generate burst</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                      buf += self.gen_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # This is a rudiment from (legacy) OpenBTS transceiver,</span><br><span style="color: hsl(0, 100%, 40%);">-         # some L1 implementations still expect two dummy bytes.</span><br><span style="color: hsl(0, 100%, 40%);">-         if legacy and self.ver == 0x00:</span><br><span style="color: hsl(0, 100%, 40%);">-                 buf += bytearray(2)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def parse_msg(self, msg):</span><br><span style="color: hsl(120, 100%, 40%);">+     def from_bytes(self, msg: bytes) -> None:</span><br><span>                 ''' Parse a TRX DATA message. '''</span><br><span style="color: hsl(120, 100%, 40%);">+             self.codec._from_bytes(self.c, msg)</span><br><span style="color: hsl(120, 100%, 40%);">+           if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+                        self.parse_burst()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          # Make sure we have at least common header</span><br><span style="color: hsl(0, 100%, 40%);">-              if len(msg) < self.CHDR_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">-                 raise ValueError("Message is to short: missing common header")</span><br><span style="color: hsl(120, 100%, 40%);">+      def trans(self, cls: Type['Msg'], ver: int) -> 'Msg':</span><br><span style="color: hsl(120, 100%, 40%);">+              ''' Transform between L12TRX and TRX2L1. '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                # Parse the header version first</span><br><span style="color: hsl(0, 100%, 40%);">-                self.ver = (msg[0] >> 4)</span><br><span style="color: hsl(0, 100%, 40%);">-          if not self.ver in self.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">-                 raise ValueError("Unknown TRXD header version %d" % self.ver)</span><br><span style="color: hsl(120, 100%, 40%);">+               # Allocate a new message</span><br><span style="color: hsl(120, 100%, 40%);">+              msg = cls(ver, {</span><br><span style="color: hsl(120, 100%, 40%);">+                      'fn' : self.c['fn'],</span><br><span style="color: hsl(120, 100%, 40%);">+                  'tn' : self.c['tn'],</span><br><span style="color: hsl(120, 100%, 40%);">+          })</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          # Parse TDMA TN and FN</span><br><span style="color: hsl(0, 100%, 40%);">-          self.tn = (msg[0] & 0x07)</span><br><span style="color: hsl(0, 100%, 40%);">-           self.fn = struct.unpack(">L", msg[1:5])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # Make sure we have the whole header,</span><br><span style="color: hsl(0, 100%, 40%);">-           # including the version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">-         if len(msg) < self.HDR_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">-                  raise ValueError("Message is to short: missing version specific header")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Specific message part</span><br><span style="color: hsl(0, 100%, 40%);">-         self.parse_hdr(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             # Copy burst, skipping header</span><br><span style="color: hsl(0, 100%, 40%);">-           msg_burst = msg[self.HDR_LEN:]</span><br><span style="color: hsl(0, 100%, 40%);">-          if len(msg_burst) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">-                       self.parse_burst(msg_burst)</span><br><span style="color: hsl(120, 100%, 40%);">+           # Convert burst bits</span><br><span style="color: hsl(120, 100%, 40%);">+          if self.c['burst'] and not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+                    msg.c['burst'] = self.trans_burst()</span><br><span>          else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   self.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+                     msg.c['nope'] = True</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+                return msg</span><br><span> </span><br><span> class TxMsg(Msg):</span><br><span>  ''' Tx (L1 -> TRX) message coding API. '''</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+     # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+   CODECS = (</span><br><span style="color: hsl(120, 100%, 40%);">+            trxd_proto.PDUv0Tx(),</span><br><span style="color: hsl(120, 100%, 40%);">+         trxd_proto.PDUv1Tx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ )</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   DEF_CONT = {</span><br><span style="color: hsl(120, 100%, 40%);">+          # Power reduction (in dB)</span><br><span style="color: hsl(120, 100%, 40%);">+             'pwr'           : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  # Constants</span><br><span>  PWR_MIN = 0x00</span><br><span>       PWR_MAX = 0xff</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-      # Specific message fields</span><br><span style="color: hsl(0, 100%, 40%);">-       pwr = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      @property</span><br><span style="color: hsl(0, 100%, 40%);">-       def HDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Calculate header length depending on its version. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               # Common header length</span><br><span style="color: hsl(0, 100%, 40%);">-          length = self.CHDR_LEN</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Message specific header length</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.ver in (0x00, 0x01):</span><br><span style="color: hsl(0, 100%, 40%);">-                    length += 1 # PWR</span><br><span style="color: hsl(0, 100%, 40%);">-               else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   raise IndexError("Unhandled version %u" % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           return length</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def validate(self) -> None:</span><br><span>               ''' Validate the message fields (throws ValueError). '''</span><br><span> </span><br><span>                 # Validate common fields</span><br><span>             Msg.validate(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          if self.pwr is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                    raise ValueError("Tx Attenuation level is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">-                    raise ValueError("Tx Attenuation %d is out of range" % self.pwr)</span><br><span style="color: hsl(120, 100%, 40%);">+            if self.c['pwr'] < self.PWR_MIN or self.c['pwr'] > self.PWR_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+                        raise ValueError("Tx Attenuation %d is out of range" % self.c['pwr'])</span><br><span> </span><br><span>          # FIXME: properly handle IDLE / NOPE indications</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                  raise ValueError("Tx burst bits are not set")</span><br><span style="color: hsl(120, 100%, 40%);">+               if len(self.c['burst']) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(120, 100%, 40%);">+                      raise ValueError("Tx burst has odd length %u" % len(self.c['burst']))</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-             # FIXME: properly handle IDLE / NOPE indications</span><br><span style="color: hsl(0, 100%, 40%);">-                if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(0, 100%, 40%);">-                     raise ValueError("Tx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def rand_pwr(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+   def rand_pwr(self, min = None, max = None) -> int:</span><br><span>                ''' Generate a random power level. '''</span><br><span> </span><br><span>           if min is None:</span><br><span>@@ -284,80 +255,55 @@</span><br><span> </span><br><span>          return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def rand_hdr(self) -> None:</span><br><span>               ''' Randomize message specific header. '''</span><br><span> </span><br><span>               Msg.rand_hdr(self)</span><br><span style="color: hsl(0, 100%, 40%);">-              self.pwr = self.rand_pwr()</span><br><span style="color: hsl(120, 100%, 40%);">+            self.c['pwr'] = self.rand_pwr()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def desc_hdr(self) -> str:</span><br><span>                ''' Generate human-readable header description. '''</span><br><span> </span><br><span>              # Describe the common part</span><br><span>           result = Msg.desc_hdr(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         if self.pwr is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                        result += ("pwr=%u " % self.pwr)</span><br><span style="color: hsl(120, 100%, 40%);">+            result += ("pwr=%u " % self.c['pwr'])</span><br><span> </span><br><span>          # Strip useless whitespace and return</span><br><span>                return result.strip()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-       def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Generate message specific header part. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">-          buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               # Put power</span><br><span style="color: hsl(0, 100%, 40%);">-             buf.append(self.pwr)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-            return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">-               ''' Parse message specific header part. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             # Parse power level</span><br><span style="color: hsl(0, 100%, 40%);">-             self.pwr = hdr[5]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+  def gen_burst(self) -> None:</span><br><span>              ''' Generate message specific burst. '''</span><br><span style="color: hsl(120, 100%, 40%);">+              self.c['hard-bits'] = bytes(self.c['burst'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                # Copy burst 'as is'</span><br><span style="color: hsl(0, 100%, 40%);">-            return bytearray(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-    def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span>            ''' Parse message specific burst. '''</span><br><span style="color: hsl(120, 100%, 40%);">+         self.c['burst'] = list(self.c['hard-bits'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         length = len(burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             # Distinguish between GSM and EDGE</span><br><span style="color: hsl(0, 100%, 40%);">-              if length >= EDGE_BURST_LEN:</span><br><span style="color: hsl(0, 100%, 40%);">-                 self.burst = list(burst[:EDGE_BURST_LEN])</span><br><span style="color: hsl(0, 100%, 40%);">-               else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   self.burst = list(burst[:GMSK_BURST_LEN])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       def rand_burst(self, length = GMSK_BURST_LEN):</span><br><span style="color: hsl(120, 100%, 40%);">+        def rand_burst(self, length = GMSK_BURST_LEN) -> None:</span><br><span>            ''' Generate a random message specific burst. '''</span><br><span style="color: hsl(0, 100%, 40%);">-               self.burst = [random.randint(0, 1) for _ in range(length)]</span><br><span style="color: hsl(120, 100%, 40%);">+            self.c['burst'] = [random.randint(0, 1) for _ in range(length)]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def trans(self, ver = None):</span><br><span style="color: hsl(0, 100%, 40%);">-            ''' Transform this message into RxMsg. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Allocate a new message</span><br><span style="color: hsl(0, 100%, 40%);">-                msg = RxMsg(fn = self.fn, tn = self.tn,</span><br><span style="color: hsl(0, 100%, 40%);">-                 ver = self.ver if ver is None else ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # Convert burst bits</span><br><span style="color: hsl(0, 100%, 40%);">-            if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg.burst = self.ubit2sbit(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-          else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             return msg</span><br><span style="color: hsl(120, 100%, 40%);">+    def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+                ''' Transform hard-bits into soft-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+           return self.ubit2sbit(self.c['burst'])</span><br><span> </span><br><span> class RxMsg(Msg):</span><br><span>      ''' Rx (TRX -> L1) message coding API. '''</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+     # PDU codecs for all known versions</span><br><span style="color: hsl(120, 100%, 40%);">+   CODECS = (</span><br><span style="color: hsl(120, 100%, 40%);">+            trxd_proto.PDUv0Rx(),</span><br><span style="color: hsl(120, 100%, 40%);">+         trxd_proto.PDUv1Rx(),</span><br><span style="color: hsl(120, 100%, 40%);">+ )</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+   DEF_CONT = {</span><br><span style="color: hsl(120, 100%, 40%);">+          # Specific message fields</span><br><span style="color: hsl(120, 100%, 40%);">+             'rssi'          : -50,</span><br><span style="color: hsl(120, 100%, 40%);">+                'toa256'        : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+          'cir'           : 0,</span><br><span style="color: hsl(120, 100%, 40%);">+  }</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span>  # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise</span><br><span>      RSSI_MIN = -120</span><br><span>      RSSI_MAX = -47</span><br><span>@@ -373,121 +319,39 @@</span><br><span>      CI_MIN = -1280</span><br><span>       CI_MAX = 1280</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-       # IDLE frame / nope detection indicator</span><br><span style="color: hsl(0, 100%, 40%);">- NOPE_IND = (1 << 7)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       # Specific message fields</span><br><span style="color: hsl(0, 100%, 40%);">-       rssi = None</span><br><span style="color: hsl(0, 100%, 40%);">-     toa256 = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   # Version 0x01 specific (default values)</span><br><span style="color: hsl(0, 100%, 40%);">-        mod_type = Modulation.ModGMSK</span><br><span style="color: hsl(0, 100%, 40%);">-   nope_ind = False</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-        tsc_set = None</span><br><span style="color: hsl(0, 100%, 40%);">-  tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">-      ci = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       @property</span><br><span style="color: hsl(0, 100%, 40%);">-       def HDR_LEN(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Calculate header length depending on its version. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               # Common header length</span><br><span style="color: hsl(0, 100%, 40%);">-          length = self.CHDR_LEN</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Message specific header length</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.ver == 0x00:</span><br><span style="color: hsl(0, 100%, 40%);">-                    # RSSI + ToA</span><br><span style="color: hsl(0, 100%, 40%);">-                    length += 1 + 2</span><br><span style="color: hsl(0, 100%, 40%);">-         elif self.ver == 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                  # RSSI + ToA + TS + C/I</span><br><span style="color: hsl(0, 100%, 40%);">-                 length += 1 + 2 + 1 + 2</span><br><span style="color: hsl(0, 100%, 40%);">-         else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   raise IndexError("Unhandled version %u" % self.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           return length</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   def _validate_burst_v0(self):</span><br><span style="color: hsl(0, 100%, 40%);">-           # Burst is mandatory</span><br><span style="color: hsl(0, 100%, 40%);">-            if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                  raise ValueError("Rx burst bits are not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # ... and can be either of GSM (GMSK) or EDGE (8-PSK)</span><br><span style="color: hsl(0, 100%, 40%);">-           if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):</span><br><span style="color: hsl(0, 100%, 40%);">-                     raise ValueError("Rx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def _validate_burst_v1(self):</span><br><span style="color: hsl(0, 100%, 40%);">-           # Burst is omitted in case of an IDLE / NOPE indication</span><br><span style="color: hsl(0, 100%, 40%);">-         if self.nope_ind and self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                        return</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          if self.nope_ind and self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                    raise ValueError("NOPE.ind comes with burst?!?")</span><br><span style="color: hsl(0, 100%, 40%);">-              if self.burst is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                  raise ValueError("Rx burst bits are not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # Burst length depends on modulation type</span><br><span style="color: hsl(0, 100%, 40%);">-               if len(self.burst) != self.mod_type.bl:</span><br><span style="color: hsl(0, 100%, 40%);">-                 raise ValueError("Rx burst has odd length %u" % len(self.burst))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def validate_burst(self):</span><br><span style="color: hsl(0, 100%, 40%);">-               ''' Validate the burst (throws ValueError). '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         if self.ver == 0x00:</span><br><span style="color: hsl(0, 100%, 40%);">-                    self._validate_burst_v0()</span><br><span style="color: hsl(0, 100%, 40%);">-               elif self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                       self._validate_burst_v1()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       def validate(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def validate(self) -> None:</span><br><span>               ''' Validate the message header fields (throws ValueError). '''</span><br><span> </span><br><span>          # Validate common fields</span><br><span>             Msg.validate(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          if self.rssi is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                   raise ValueError("RSSI is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+         if self.c['rssi'] < self.RSSI_MIN or self.c['rssi'] > self.RSSI_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+                    raise ValueError("RSSI %d is out of range" % self.c['rssi'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-              if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">-                        raise ValueError("RSSI %d is out of range" % self.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               if self.toa256 is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                 raise ValueError("ToA256 is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">-                        raise ValueError("ToA256 %d is out of range" % self.toa256)</span><br><span style="color: hsl(120, 100%, 40%);">+         if self.c['toa256'] < self.TOA256_MIN or self.c['toa256'] > self.TOA256_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+                    raise ValueError("ToA256 %d is out of range" % self.c['toa256'])</span><br><span> </span><br><span>               # Version specific parameters (omited for NOPE.ind)</span><br><span style="color: hsl(0, 100%, 40%);">-             if self.ver >= 0x01 and not self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">-                   if type(self.mod_type) is not Modulation:</span><br><span style="color: hsl(120, 100%, 40%);">+             if self._ver >= 1 and not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+                  if type(self.c['mod_type']) is not Modulation:</span><br><span>                               raise ValueError("Unknown Rx modulation type")</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                    if self.tsc_set is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                                raise ValueError("TSC set is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                        if self.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">-                         if self.tsc_set not in range(0, 4):</span><br><span style="color: hsl(0, 100%, 40%);">-                                     raise ValueError("TSC set %d is out of range" % self.tsc_set)</span><br><span style="color: hsl(120, 100%, 40%);">+                       if self.c['mod_type'] is Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+                          if self.c['tsc_set'] not in range(0, 4):</span><br><span style="color: hsl(120, 100%, 40%);">+                                      raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])</span><br><span>                         else:</span><br><span style="color: hsl(0, 100%, 40%);">-                           if self.tsc_set not in range(0, 2):</span><br><span style="color: hsl(0, 100%, 40%);">-                                     raise ValueError("TSC set %d is out of range" % self.tsc_set)</span><br><span style="color: hsl(120, 100%, 40%);">+                               if self.c['tsc_set'] not in range(0, 2):</span><br><span style="color: hsl(120, 100%, 40%);">+                                      raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                        if self.tsc is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                            raise ValueError("TSC is not set")</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                    if self.tsc not in self.TSC_RANGE:</span><br><span style="color: hsl(0, 100%, 40%);">-                              raise ValueError("TSC %d is out of range" % self.tsc)</span><br><span style="color: hsl(120, 100%, 40%);">+                       if self.c['tsc'] not in self.TSC_RANGE:</span><br><span style="color: hsl(120, 100%, 40%);">+                               raise ValueError("TSC %d is out of range" % self.c['tsc'])</span><br><span> </span><br><span>             # Version specific parameters (also present in NOPE.ind)</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                 if self.ci is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                             raise ValueError("C/I is not set")</span><br><span style="color: hsl(120, 100%, 40%);">+          if self._ver >= 1:</span><br><span style="color: hsl(120, 100%, 40%);">+                 if self.c['cir'] < self.CI_MIN or self.c['cir'] > self.CI_MAX:</span><br><span style="color: hsl(120, 100%, 40%);">+                          raise ValueError("C/I %d is out of range" % self.c['cir'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                        if self.ci < self.CI_MIN or self.ci > self.CI_MAX:</span><br><span style="color: hsl(0, 100%, 40%);">-                                raise ValueError("C/I %d is out of range" % self.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          self.validate_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   def rand_rssi(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+  def rand_rssi(self, min = None, max = None) -> int:</span><br><span>               ''' Generate a random RSSI value. '''</span><br><span> </span><br><span>            if min is None:</span><br><span>@@ -498,7 +362,7 @@</span><br><span> </span><br><span>            return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def rand_toa256(self, min = None, max = None):</span><br><span style="color: hsl(120, 100%, 40%);">+        def rand_toa256(self, min = None, max = None) -> int:</span><br><span>             ''' Generate a random ToA (Time of Arrival) value. '''</span><br><span> </span><br><span>           if min is None:</span><br><span>@@ -509,185 +373,88 @@</span><br><span> </span><br><span>                 return random.randint(min, max)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-     def rand_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def rand_hdr(self, nope: bool = False) -> None:</span><br><span>           ''' Randomize message specific header. '''</span><br><span> </span><br><span>               Msg.rand_hdr(self)</span><br><span style="color: hsl(0, 100%, 40%);">-              self.rssi = self.rand_rssi()</span><br><span style="color: hsl(0, 100%, 40%);">-            self.toa256 = self.rand_toa256()</span><br><span style="color: hsl(120, 100%, 40%);">+              self.c['rssi'] = self.rand_rssi()</span><br><span style="color: hsl(120, 100%, 40%);">+             self.c['toa256'] = self.rand_toa256()</span><br><span style="color: hsl(120, 100%, 40%);">+         self.c['nope'] = nope</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-               if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                 self.mod_type = random.choice(list(Modulation))</span><br><span style="color: hsl(0, 100%, 40%);">-                 if self.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">-                         self.tsc_set = random.randint(0, 3)</span><br><span style="color: hsl(120, 100%, 40%);">+           if self._ver >= 1 and not nope:</span><br><span style="color: hsl(120, 100%, 40%);">+                    self.c['mod_type'] = random.choice(list(Modulation))</span><br><span style="color: hsl(120, 100%, 40%);">+                  if self.c['mod_type'] is Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+                          self.c['tsc_set'] = random.randint(0, 3)</span><br><span>                     else:</span><br><span style="color: hsl(0, 100%, 40%);">-                           self.tsc_set = random.randint(0, 1)</span><br><span style="color: hsl(0, 100%, 40%);">-                     self.tsc = random.choice(self.TSC_RANGE)</span><br><span style="color: hsl(120, 100%, 40%);">+                              self.c['tsc_set'] = random.randint(0, 1)</span><br><span style="color: hsl(120, 100%, 40%);">+                      self.c['tsc'] = random.choice(self.TSC_RANGE)</span><br><span> </span><br><span style="color: hsl(120, 100%, 40%);">+             if self._ver >= 1:</span><br><span>                        # C/I: Carrier-to-Interference ratio</span><br><span style="color: hsl(0, 100%, 40%);">-                    self.ci = random.randint(self.CI_MIN, self.CI_MAX)</span><br><span style="color: hsl(120, 100%, 40%);">+                    self.c['cir'] = random.randint(self.CI_MIN, self.CI_MAX)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-    def desc_hdr(self):</span><br><span style="color: hsl(120, 100%, 40%);">+   def desc_hdr(self) -> str:</span><br><span>                ''' Generate human-readable header description. '''</span><br><span> </span><br><span>              # Describe the common part</span><br><span>           result = Msg.desc_hdr(self)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-         if self.rssi is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                       result += ("rssi=%d " % self.rssi)</span><br><span style="color: hsl(120, 100%, 40%);">+          result += ("rssi=%d " % self.c['rssi'])</span><br><span style="color: hsl(120, 100%, 40%);">+             result += ("toa256=%d " % self.c['toa256'])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-               if self.toa256 is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                     result += ("toa256=%d " % self.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                 if not self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">-                           if self.mod_type is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                                   result += ("%s " % self.mod_type)</span><br><span style="color: hsl(0, 100%, 40%);">-                             if self.tsc_set is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                                    result += ("set=%u " % self.tsc_set)</span><br><span style="color: hsl(0, 100%, 40%);">-                          if self.tsc is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                                        result += ("tsc=%u " % self.tsc)</span><br><span style="color: hsl(0, 100%, 40%);">-                              if self.ci is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                                 result += ("C/I=%d cB " % self.ci)</span><br><span style="color: hsl(120, 100%, 40%);">+          if self._ver >= 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+                      if not self.c['nope']:</span><br><span style="color: hsl(120, 100%, 40%);">+                                result += ("%s " % self.c['mod_type'])</span><br><span style="color: hsl(120, 100%, 40%);">+                              result += ("set=%u " % self.c['tsc_set'])</span><br><span style="color: hsl(120, 100%, 40%);">+                           result += ("tsc=%u " % self.c['tsc'])</span><br><span style="color: hsl(120, 100%, 40%);">+                               result += ("C/I=%d cB " % self.c['cir'])</span><br><span>                   else:</span><br><span>                                result += "(IDLE / NOPE IND) "</span><br><span> </span><br><span>                 # Strip useless whitespace and return</span><br><span>                return result.strip()</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-       def gen_mts(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Encode Modulation and Training Sequence info. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           # IDLE / nope indication has no MTS info</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">-                       return self.NOPE_IND</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-            # TSC: . . . . . X X X</span><br><span style="color: hsl(0, 100%, 40%);">-          mts = self.tsc & 0b111</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # MTS: . X X X X . . .</span><br><span style="color: hsl(0, 100%, 40%);">-          mts |= self.mod_type.coding << 3</span><br><span style="color: hsl(0, 100%, 40%);">-          mts |= self.tsc_set << 3</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          return mts</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def parse_mts(self, mts):</span><br><span style="color: hsl(0, 100%, 40%);">-               ''' Parse Modulation and Training Sequence info. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-            # IDLE / nope indication has no MTS info</span><br><span style="color: hsl(0, 100%, 40%);">-                self.nope_ind = (mts & self.NOPE_IND) > 0</span><br><span style="color: hsl(0, 100%, 40%);">-                if self.nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">-                       self.mod_type = None</span><br><span style="color: hsl(0, 100%, 40%);">-                    self.tsc_set = None</span><br><span style="color: hsl(0, 100%, 40%);">-                     self.tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">-                 return</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # TSC: . . . . . X X X</span><br><span style="color: hsl(0, 100%, 40%);">-          self.tsc = mts & 0b111</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # MTS: . X X X X . . .</span><br><span style="color: hsl(0, 100%, 40%);">-          mts = (mts >> 3) & 0b1111</span><br><span style="color: hsl(0, 100%, 40%);">-             if (mts & 0b1100) > 0:</span><br><span style="color: hsl(0, 100%, 40%);">-                   # Mask: . . . . M M M S</span><br><span style="color: hsl(0, 100%, 40%);">-                 self.mod_type = Modulation.pick(mts & 0b1110)</span><br><span style="color: hsl(0, 100%, 40%);">-                       self.tsc_set = mts & 0b1</span><br><span style="color: hsl(0, 100%, 40%);">-            else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   # GMSK: . . . . 0 0 S S</span><br><span style="color: hsl(0, 100%, 40%);">-                 self.mod_type = Modulation.ModGMSK</span><br><span style="color: hsl(0, 100%, 40%);">-                      self.tsc_set = mts & 0b11</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   def gen_hdr(self):</span><br><span style="color: hsl(0, 100%, 40%);">-              ''' Generate message specific header part. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Allocate an empty byte-array</span><br><span style="color: hsl(0, 100%, 40%);">-          buf = bytearray()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-               # Put RSSI</span><br><span style="color: hsl(0, 100%, 40%);">-              buf.append(-self.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Encode ToA (Time of Arrival)</span><br><span style="color: hsl(0, 100%, 40%);">-          # Big endian, 2 bytes (int32_t)</span><br><span style="color: hsl(0, 100%, 40%);">-         buf += struct.pack(">h", self.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                 # Modulation and Training Sequence info</span><br><span style="color: hsl(0, 100%, 40%);">-                 mts = self.gen_mts()</span><br><span style="color: hsl(0, 100%, 40%);">-                    buf.append(mts)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                 # C/I: Carrier-to-Interference ratio (in centiBels)</span><br><span style="color: hsl(0, 100%, 40%);">-                     buf += struct.pack(">h", self.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          return buf</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-      def parse_hdr(self, hdr):</span><br><span style="color: hsl(0, 100%, 40%);">-               ''' Parse message specific header part. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             # Parse RSSI</span><br><span style="color: hsl(0, 100%, 40%);">-            self.rssi = -(hdr[5])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           # Parse ToA (Time of Arrival)</span><br><span style="color: hsl(0, 100%, 40%);">-           self.toa256 = struct.unpack(">h", hdr[6:8])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             if self.ver >= 0x01:</span><br><span style="color: hsl(0, 100%, 40%);">-                 # Modulation and Training Sequence info</span><br><span style="color: hsl(0, 100%, 40%);">-                 self.parse_mts(hdr[8])</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                  # C/I: Carrier-to-Interference ratio (in centiBels)</span><br><span style="color: hsl(0, 100%, 40%);">-                     self.ci = struct.unpack(">h", hdr[9:11])[0]</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-        def gen_burst(self):</span><br><span style="color: hsl(120, 100%, 40%);">+  def gen_burst(self) -> None:</span><br><span>              ''' Generate message specific burst. '''</span><br><span> </span><br><span>                 # Convert soft-bits to unsigned soft-bits</span><br><span style="color: hsl(0, 100%, 40%);">-               burst_usbits = self.sbit2usbit(self.burst)</span><br><span style="color: hsl(120, 100%, 40%);">+            burst = self.sbit2usbit(self.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+              self.c['soft-bits'] = bytes(burst)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          # Encode to bytes</span><br><span style="color: hsl(0, 100%, 40%);">-               return bytearray(burst_usbits)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-  def _parse_burst_v0(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+     def _parse_burst_v0(self, burst: List[int]) -> List[int]:</span><br><span>                 ''' Parse message specific burst for header version 0. '''</span><br><span> </span><br><span>               bl = len(burst)</span><br><span> </span><br><span>          # We need to guess modulation by the length of burst</span><br><span style="color: hsl(0, 100%, 40%);">-            self.mod_type = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(0, 100%, 40%);">-               if self.mod_type is None:</span><br><span style="color: hsl(120, 100%, 40%);">+             self.c['mod_type'] = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(120, 100%, 40%);">+                if self.c['mod_type'] is None:</span><br><span>                       # Some old transceivers append two dummy bytes</span><br><span style="color: hsl(0, 100%, 40%);">-                  self.mod_type = Modulation.pick_by_bl(bl - 2)</span><br><span style="color: hsl(120, 100%, 40%);">+                 self.c['mod_type'] = Modulation.pick_by_bl(bl - 2)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          if self.mod_type is None:</span><br><span style="color: hsl(120, 100%, 40%);">+             if self.c['mod_type'] is None:</span><br><span>                       raise ValueError("Odd burst length %u" % bl)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-              return burst[:self.mod_type.bl]</span><br><span style="color: hsl(120, 100%, 40%);">+               return burst[:self.c['mod_type'].bl]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-        def parse_burst(self, burst):</span><br><span style="color: hsl(120, 100%, 40%);">+ def parse_burst(self) -> None:</span><br><span>            ''' Parse message specific burst. '''</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-               burst = list(burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-             if self.ver == 0x00:</span><br><span style="color: hsl(120, 100%, 40%);">+          burst = list(self.c['soft-bits'])</span><br><span style="color: hsl(120, 100%, 40%);">+             if self._ver == 0:</span><br><span>                   burst = self._parse_burst_v0(burst)</span><br><span> </span><br><span>              # Convert unsigned soft-bits to soft-bits</span><br><span style="color: hsl(0, 100%, 40%);">-               self.burst = self.usbit2sbit(burst)</span><br><span style="color: hsl(120, 100%, 40%);">+           self.c['burst'] = self.usbit2sbit(burst)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-    def rand_burst(self, length = None):</span><br><span style="color: hsl(120, 100%, 40%);">+  def rand_burst(self, length = None) -> None:</span><br><span>              ''' Generate a random message specific burst. '''</span><br><span> </span><br><span>                if length is None:</span><br><span style="color: hsl(0, 100%, 40%);">-                      length = self.mod_type.bl</span><br><span style="color: hsl(120, 100%, 40%);">+                     length = self.c['mod_type'].bl</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-              self.burst = [random.randint(-127, 127) for _ in range(length)]</span><br><span style="color: hsl(120, 100%, 40%);">+               self.c['burst'] = [random.randint(-127, 127) for _ in range(length)]</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-        def trans(self, ver = None):</span><br><span style="color: hsl(0, 100%, 40%);">-            ''' Transform this message to TxMsg. '''</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                # Allocate a new message</span><br><span style="color: hsl(0, 100%, 40%);">-                msg = TxMsg(fn = self.fn, tn = self.tn,</span><br><span style="color: hsl(0, 100%, 40%);">-                 ver = self.ver if ver is None else ver)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         # Convert burst bits</span><br><span style="color: hsl(0, 100%, 40%);">-            if self.burst is not None:</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg.burst = self.sbit2ubit(self.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          return msg</span><br><span style="color: hsl(120, 100%, 40%);">+    def trans_burst(self) -> List[int]:</span><br><span style="color: hsl(120, 100%, 40%);">+                ''' Transform soft-bits into hard-bits. '''</span><br><span style="color: hsl(120, 100%, 40%);">+           return self.sbit2ubit(self.c['burst'])</span><br><span>diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py</span><br><span>index 573527b..3226a0e 100755</span><br><span>--- a/src/target/trx_toolkit/fake_trx.py</span><br><span>+++ b/src/target/trx_toolkit/fake_trx.py</span><br><span>@@ -188,9 +188,9 @@</span><br><span>            if self.burst_drop_amount == 0:</span><br><span>                      return False</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                if msg.fn % self.burst_drop_period == 0:</span><br><span style="color: hsl(120, 100%, 40%);">+              if msg.c['fn'] % self.burst_drop_period == 0:</span><br><span>                        log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"</span><br><span style="color: hsl(0, 100%, 40%);">-                         % (self, msg.fn, self.burst_drop_period))</span><br><span style="color: hsl(120, 100%, 40%);">+                             % (self, msg.c['fn'], self.burst_drop_period))</span><br><span>                       self.burst_drop_amount -= 1</span><br><span>                  return True</span><br><span> </span><br><span>@@ -198,64 +198,63 @@</span><br><span> </span><br><span>  def _handle_data_msg_v1(self, src_msg, msg):</span><br><span>                 # C/I (Carrier-to-Interference ratio)</span><br><span style="color: hsl(0, 100%, 40%);">-           msg.ci = self.ci</span><br><span style="color: hsl(120, 100%, 40%);">+              msg.c['cir'] = self.ci</span><br><span> </span><br><span>           # Pick modulation type by burst length</span><br><span style="color: hsl(0, 100%, 40%);">-          bl = len(src_msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-         msg.mod_type = Modulation.pick_by_bl(bl)</span><br><span style="color: hsl(120, 100%, 40%);">+              bl = len(src_msg.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+          msg.c['mod_type'] = Modulation.pick_by_bl(bl)</span><br><span> </span><br><span>            # Pick TSC (Training Sequence Code) and TSC set</span><br><span style="color: hsl(0, 100%, 40%);">-         if msg.mod_type is Modulation.ModGMSK:</span><br><span style="color: hsl(0, 100%, 40%);">-                  ss = TrainingSeqGMSK.pick(src_msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-                        msg.tsc = ss.tsc if ss is not None else 0</span><br><span style="color: hsl(0, 100%, 40%);">-                       msg.tsc_set = ss.tsc_set if ss is not None else 0</span><br><span style="color: hsl(120, 100%, 40%);">+             if msg.c['mod_type'] is Modulation.ModGMSK:</span><br><span style="color: hsl(120, 100%, 40%);">+                   ss = TrainingSeqGMSK.pick(src_msg.c['burst'])</span><br><span style="color: hsl(120, 100%, 40%);">+                 msg.c['tsc'] = ss.tsc if ss is not None else 0</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.c['tsc_set'] = ss.tsc_set if ss is not None else 0</span><br><span>               else: # TODO: other modulation types (at least 8-PSK)</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.tsc_set = 0</span><br><span style="color: hsl(0, 100%, 40%);">-                 msg.tsc = 0</span><br><span style="color: hsl(120, 100%, 40%);">+                   msg.c['tsc_set'] = 0</span><br><span style="color: hsl(120, 100%, 40%);">+                  msg.c['tsc'] = 0</span><br><span> </span><br><span>         # Takes (partially initialized) TRXD Rx message,</span><br><span>     # simulates RF path parameters (such as RSSI),</span><br><span>       # and sends towards the L1</span><br><span>   def handle_data_msg(self, src_trx, src_msg, msg):</span><br><span>            if self.rf_muted:</span><br><span style="color: hsl(0, 100%, 40%);">-                       msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">-             elif not msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.c['nope'] = True</span><br><span style="color: hsl(120, 100%, 40%);">+          elif not msg.c['nope']:</span><br><span>                      # Path loss simulation</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg.nope_ind = self.sim_burst_drop(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-         if msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+                      msg.c['nope'] = self.sim_burst_drop(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+              if msg.c['nope']:</span><br><span>                    # Before TRXDv1, we simply drop the message</span><br><span style="color: hsl(0, 100%, 40%);">-                     if msg.ver < 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+                 if msg._ver < 1:</span><br><span>                          del msg</span><br><span>                              return</span><br><span> </span><br><span>                   # Since TRXDv1, we should send a NOPE.ind</span><br><span style="color: hsl(0, 100%, 40%);">-                       del msg.burst # burst bits are omited</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.burst = None</span><br><span style="color: hsl(120, 100%, 40%);">+                      msg.c['burst'].clear() # burst bits are omited</span><br><span> </span><br><span>                   # TODO: shoud we make these values configurable?</span><br><span style="color: hsl(0, 100%, 40%);">-                        msg.toa256 = self.TOA256_NOISE_DEFAULT</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg.rssi = self.RSSI_NOISE_DEFAULT</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg.ci = self.CI_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.c['toa256'] = self.TOA256_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+                   msg.c['rssi'] = self.RSSI_NOISE_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+                       msg.c['cir'] = self.CI_NOISE_DEFAULT</span><br><span> </span><br><span>                     self.data_if.send_msg(msg)</span><br><span>                   return</span><br><span> </span><br><span>           # Complete message header</span><br><span style="color: hsl(0, 100%, 40%);">-               msg.toa256 = self.toa256</span><br><span style="color: hsl(120, 100%, 40%);">+              msg.c['toa256'] = self.toa256</span><br><span> </span><br><span>            # Apply RSSI based on transmitter:</span><br><span>           if not self.fake_rssi_enabled:</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT</span><br><span style="color: hsl(120, 100%, 40%);">+                    msg.c['rssi'] = src_trx.tx_power - src_msg.c['pwr'] - self.PATH_LOSS_DEFAULT</span><br><span>                 else: # Apply fake RSSI</span><br><span style="color: hsl(0, 100%, 40%);">-                 msg.rssi = self.rssi</span><br><span style="color: hsl(120, 100%, 40%);">+                  msg.c['rssi'] = self.rssi</span><br><span> </span><br><span>                # Version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">-               if msg.ver >= 0x01:</span><br><span style="color: hsl(120, 100%, 40%);">+                if msg._ver >= 1:</span><br><span>                         self._handle_data_msg_v1(src_msg, msg)</span><br><span> </span><br><span>           # Apply optional Timing Advance</span><br><span>              if src_trx.ta != 0:</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg.toa256 -= src_trx.ta * 256</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.c['toa256'] -= src_trx.ta * 256</span><br><span> </span><br><span>              Transceiver.handle_data_msg(self, msg)</span><br><span> </span><br><span>diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py</span><br><span>index f7b4fde..2dad6b2 100644</span><br><span>--- a/src/target/trx_toolkit/test_data_dump.py</span><br><span>+++ b/src/target/trx_toolkit/test_data_dump.py</span><br><span>@@ -97,7 +97,7 @@</span><br><span>          msg_ref = self._gen_rand_message(cls)</span><br><span>                self._ddf.append_msg(msg_ref)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-               msg = self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+          msg = self._ddf.from_bytes(0)</span><br><span>                self._compare_msg(msg, msg_ref)</span><br><span> </span><br><span>  # Store one Rx message in a file, read it back and compare</span><br><span>@@ -120,7 +120,7 @@</span><br><span>                     self._compare_msg(msg_list[i], msg_list_ref[i])</span><br><span> </span><br><span>  # Verify random access to stored messages</span><br><span style="color: hsl(0, 100%, 40%);">-       def test_parse_msg_idx(self):</span><br><span style="color: hsl(120, 100%, 40%);">+ def test_from_bytes_idx(self):</span><br><span>               # Store a mixed list of random messages (19 + 19)</span><br><span>            msg_list_ref = self._gen_rand_message_mix(19)</span><br><span>                self._ddf.append_all(msg_list_ref)</span><br><span>@@ -128,13 +128,13 @@</span><br><span>           # Random access</span><br><span>              for _ in range(100):</span><br><span>                         idx = random.randrange(len(msg_list_ref))</span><br><span style="color: hsl(0, 100%, 40%);">-                       msg = self._ddf.parse_msg(idx)</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg = self._ddf.from_bytes(idx)</span><br><span>                      self._compare_msg(msg, msg_list_ref[idx])</span><br><span> </span><br><span>        def test_parse_empty(self):</span><br><span>          with self.assertLogs(level = 'ERROR'):</span><br><span>                       for idx in range(100):</span><br><span style="color: hsl(0, 100%, 40%);">-                          msg = self._ddf.parse_msg(idx)</span><br><span style="color: hsl(120, 100%, 40%);">+                                msg = self._ddf.from_bytes(idx)</span><br><span>                              self.assertEqual(msg, None)</span><br><span> </span><br><span>      def test_parse_all_empty(self):</span><br><span>@@ -148,7 +148,7 @@</span><br><span>                self._tf.write(b'\xff' * 90)</span><br><span> </span><br><span>             with self.assertLogs(level = 'ERROR'):</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg = self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+                  msg = self._ddf.from_bytes(0)</span><br><span>                        self.assertEqual(msg, None)</span><br><span> </span><br><span>      def test_parse_unknown_tag(self):</span><br><span>@@ -158,7 +158,7 @@</span><br><span>              self._tf.write(b'\xff' * 90)</span><br><span> </span><br><span>             with self.assertLogs(level = 'ERROR'):</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg = self._ddf.parse_msg(0)</span><br><span style="color: hsl(120, 100%, 40%);">+                  msg = self._ddf.from_bytes(0)</span><br><span>                        self.assertEqual(msg, None)</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span>diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py</span><br><span>index 24fda67..b703ed4 100644</span><br><span>--- a/src/target/trx_toolkit/test_data_msg.py</span><br><span>+++ b/src/target/trx_toolkit/test_data_msg.py</span><br><span>@@ -5,6 +5,7 @@</span><br><span> # Unit test for TRXD message codec</span><br><span> #</span><br><span> # (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com></span><br><span style="color: hsl(120, 100%, 40%);">+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de></span><br><span> #</span><br><span> # All Rights Reserved</span><br><span> #</span><br><span>@@ -23,171 +24,33 @@</span><br><span> # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.</span><br><span> </span><br><span> import unittest</span><br><span style="color: hsl(120, 100%, 40%);">+import logging</span><br><span> </span><br><span> from data_msg import Msg, TxMsg, RxMsg</span><br><span> </span><br><span> class Msg_Test(unittest.TestCase):</span><br><span>         # Compare message a with message b</span><br><span style="color: hsl(0, 100%, 40%);">-      def _compare_msg(self, a, b):</span><br><span style="color: hsl(120, 100%, 40%);">+ def _compare_msg(self, a: Msg, b: Msg) -> None:</span><br><span>           # Make sure we're comparing messages of the same type</span><br><span>            self.assertEqual(a.__class__, b.__class__)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-          # Compare common header fields</span><br><span style="color: hsl(0, 100%, 40%);">-          self.assertEqual(a.ver, b.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-          self.assertEqual(a.fn, b.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-            self.assertEqual(a.tn, b.tn)</span><br><span style="color: hsl(120, 100%, 40%);">+          # PDU version and fields</span><br><span style="color: hsl(120, 100%, 40%);">+              self.assertEqual(a._ver, b._ver)</span><br><span style="color: hsl(120, 100%, 40%);">+              for field in a.__class__.DEF_CONT:</span><br><span style="color: hsl(120, 100%, 40%);">+                    self.assertEqual(a[field], b[field])</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                # Burst bits (if present)</span><br><span style="color: hsl(0, 100%, 40%);">-               self.assertEqual(a.burst, b.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # TxMsg specific fields</span><br><span style="color: hsl(0, 100%, 40%);">-         if isinstance(a, TxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">-                        self.assertEqual(a.pwr, b.pwr)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # RxMsg specific fields</span><br><span style="color: hsl(0, 100%, 40%);">-         if isinstance(a, RxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">-                        # Version independent fields</span><br><span style="color: hsl(0, 100%, 40%);">-                    self.assertEqual(a.toa256, b.toa256)</span><br><span style="color: hsl(0, 100%, 40%);">-                    self.assertEqual(a.rssi, b.rssi)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                        # Version specific fields</span><br><span style="color: hsl(0, 100%, 40%);">-                       if a.ver >= 1:</span><br><span style="color: hsl(0, 100%, 40%);">-                               self.assertEqual(a.nope_ind, b.nope_ind)</span><br><span style="color: hsl(0, 100%, 40%);">-                                self.assertEqual(a.mod_type, b.mod_type)</span><br><span style="color: hsl(0, 100%, 40%);">-                                self.assertEqual(a.tsc_set, b.tsc_set)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self.assertEqual(a.tsc, b.tsc)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self.assertEqual(a.ci, b.ci)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-    # Make sure that message validation throws a ValueError</span><br><span style="color: hsl(0, 100%, 40%);">- def test_validate(self):</span><br><span style="color: hsl(0, 100%, 40%);">-                # Unknown version</span><br><span style="color: hsl(0, 100%, 40%);">-               with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg = RxMsg(fn = 0, tn = 0, ver = 100)</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Uninitialized field</span><br><span style="color: hsl(0, 100%, 40%);">-           with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg = RxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-          with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg = RxMsg(fn = None, tn = 0)</span><br><span style="color: hsl(0, 100%, 40%);">-                  msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Out-of-range value(s)</span><br><span style="color: hsl(0, 100%, 40%);">-         with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg = RxMsg(fn = -1, tn = 0)</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-          with self.assertRaises(ValueError):</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg = RxMsg(fn = 0, tn = 10)</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-  # Validate header and burst randomization</span><br><span style="color: hsl(0, 100%, 40%);">-       def test_rand_hdr_burst(self):</span><br><span style="color: hsl(0, 100%, 40%);">-          tx_msg = TxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-                rx_msg = RxMsg()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                for i in range(100):</span><br><span style="color: hsl(0, 100%, 40%);">-                    tx_msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-                     rx_msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-                     tx_msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-                       rx_msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                       tx_msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-                       rx_msg.validate()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-       def _test_enc_dec(self, msg, legacy = False, nope_ind = False):</span><br><span style="color: hsl(0, 100%, 40%);">-         # Prepare a given message (randomize)</span><br><span style="color: hsl(0, 100%, 40%);">-           msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # NOPE.ind contains no burst</span><br><span style="color: hsl(0, 100%, 40%);">-            if not nope_ind:</span><br><span style="color: hsl(0, 100%, 40%);">-                        msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-                else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg.nope_ind = True</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg.mod_type = None</span><br><span style="color: hsl(0, 100%, 40%);">-                     msg.tsc_set = None</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg.tsc = None</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          # Encode a given message to bytes</span><br><span style="color: hsl(0, 100%, 40%);">-               msg_enc = msg.gen_msg(legacy)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-           # Decode a new message from bytes</span><br><span style="color: hsl(0, 100%, 40%);">-               msg_dec = msg.__class__()</span><br><span style="color: hsl(0, 100%, 40%);">-               msg_dec.parse_msg(msg_enc)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Compare decoded vs the original</span><br><span style="color: hsl(0, 100%, 40%);">-               self._compare_msg(msg, msg_dec)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">- # Validate encoding and decoding</span><br><span style="color: hsl(0, 100%, 40%);">-        def test_enc_dec(self):</span><br><span style="color: hsl(120, 100%, 40%);">+       def test_enc_dec_default(self):</span><br><span style="color: hsl(120, 100%, 40%);">+               ''' Encode/decode/match test for default field values. '''</span><br><span>           for ver in Msg.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">-                  with self.subTest("TxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">-                                msg = TxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self._test_enc_dec(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+                       with self.subTest('TxMsg(ver=%u)' % ver):</span><br><span style="color: hsl(120, 100%, 40%);">+                             a, b = TxMsg(ver), TxMsg(ver)</span><br><span style="color: hsl(120, 100%, 40%);">+                         b.from_bytes(a.to_bytes())</span><br><span style="color: hsl(120, 100%, 40%);">+                            self._compare_msg(a, b)</span><br><span> </span><br><span style="color: hsl(0, 100%, 40%);">-                     with self.subTest("RxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">-                                msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self._test_enc_dec(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                 if ver >= 1:</span><br><span style="color: hsl(0, 100%, 40%);">-                         with self.subTest("RxMsg NOPE.ind", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">-                                       msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                                  self._test_enc_dec(msg, nope_ind = True)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                with self.subTest("RxMsg (legacy transceiver)"):</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg = RxMsg(ver = 0)</span><br><span style="color: hsl(0, 100%, 40%);">-                    self._test_enc_dec(msg, legacy = True)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-  # Validate bit conversations</span><br><span style="color: hsl(0, 100%, 40%);">-    def test_bit_conv(self):</span><br><span style="color: hsl(0, 100%, 40%);">-                usbits_ref = list(range(0, 256))</span><br><span style="color: hsl(0, 100%, 40%);">-                sbits_ref = list(range(-127, 128))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Test both usbit2sbit() and sbit2usbit()</span><br><span style="color: hsl(0, 100%, 40%);">-               sbits = Msg.usbit2sbit(usbits_ref)</span><br><span style="color: hsl(0, 100%, 40%);">-              usbits = Msg.sbit2usbit(sbits)</span><br><span style="color: hsl(0, 100%, 40%);">-          self.assertEqual(usbits[:255], usbits_ref[:255])</span><br><span style="color: hsl(0, 100%, 40%);">-                self.assertEqual(usbits[255], 254)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-              # Test both sbit2ubit() and ubit2sbit()</span><br><span style="color: hsl(0, 100%, 40%);">-         ubits = Msg.sbit2ubit(sbits_ref)</span><br><span style="color: hsl(0, 100%, 40%);">-                self.assertEqual(ubits, ([1] * 127 + [0] * 128))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                sbits = Msg.ubit2sbit(ubits)</span><br><span style="color: hsl(0, 100%, 40%);">-            self.assertEqual(sbits, ([-127] * 127 + [127] * 128))</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-   def _test_transform(self, msg):</span><br><span style="color: hsl(0, 100%, 40%);">-         # Prepare given messages</span><br><span style="color: hsl(0, 100%, 40%);">-                msg.rand_hdr()</span><br><span style="color: hsl(0, 100%, 40%);">-          msg.rand_burst()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                # Perform message transformation</span><br><span style="color: hsl(0, 100%, 40%);">-                if isinstance(msg, TxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">-                      msg_trans = msg.trans()</span><br><span style="color: hsl(0, 100%, 40%);">-         else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   msg_trans = msg.trans()</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-         self.assertEqual(msg_trans.ver, msg.ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                self.assertEqual(msg_trans.fn, msg.fn)</span><br><span style="color: hsl(0, 100%, 40%);">-          self.assertEqual(msg_trans.tn, msg.tn)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-          if isinstance(msg, RxMsg):</span><br><span style="color: hsl(0, 100%, 40%);">-                      burst = Msg.sbit2ubit(msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-                        self.assertEqual(msg_trans.burst, burst)</span><br><span style="color: hsl(0, 100%, 40%);">-                else:</span><br><span style="color: hsl(0, 100%, 40%);">-                   burst = Msg.ubit2sbit(msg.burst)</span><br><span style="color: hsl(0, 100%, 40%);">-                        self.assertEqual(msg_trans.burst, burst)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-        # Validate message transformation</span><br><span style="color: hsl(0, 100%, 40%);">-       def test_transform(self):</span><br><span style="color: hsl(0, 100%, 40%);">-               for ver in Msg.KNOWN_VERSIONS:</span><br><span style="color: hsl(0, 100%, 40%);">-                  with self.subTest("TxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">-                                msg = TxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self._test_transform(msg)</span><br><span style="color: hsl(0, 100%, 40%);">-</span><br><span style="color: hsl(0, 100%, 40%);">-                       with self.subTest("RxMsg", ver = ver):</span><br><span style="color: hsl(0, 100%, 40%);">-                                msg = RxMsg(ver = ver)</span><br><span style="color: hsl(0, 100%, 40%);">-                          self._test_transform(msg)</span><br><span style="color: hsl(120, 100%, 40%);">+                     with self.subTest('RxMsg(ver=%u)' % ver):</span><br><span style="color: hsl(120, 100%, 40%);">+                             a, b = RxMsg(ver), RxMsg(ver)</span><br><span style="color: hsl(120, 100%, 40%);">+                         b.from_bytes(a.to_bytes())</span><br><span style="color: hsl(120, 100%, 40%);">+                            self._compare_msg(a, b)</span><br><span> </span><br><span> if __name__ == '__main__':</span><br><span>    unittest.main()</span><br><span>diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py</span><br><span>index d041070..54e6a6b 100644</span><br><span>--- a/src/target/trx_toolkit/transceiver.py</span><br><span>+++ b/src/target/trx_toolkit/transceiver.py</span><br><span>@@ -269,7 +269,7 @@</span><br><span>                       return None</span><br><span> </span><br><span>              # Make sure that indicated timeslot is configured</span><br><span style="color: hsl(0, 100%, 40%);">-               if msg.tn not in self.ts_list:</span><br><span style="color: hsl(120, 100%, 40%);">+                if msg.c['tn'] not in self.ts_list:</span><br><span>                  log.warning("(%s) RX TRXD message (%s), but timeslot is not "</span><br><span>                              "configured => dropping..." % (self, msg.desc_hdr()))</span><br><span>                   return None</span><br><span>@@ -278,4 +278,4 @@</span><br><span> </span><br><span>        def handle_data_msg(self, msg):</span><br><span>              # TODO: make legacy mode configurable (via argv?)</span><br><span style="color: hsl(0, 100%, 40%);">-               self.data_if.send_msg(msg, legacy = True)</span><br><span style="color: hsl(120, 100%, 40%);">+             self.data_if.send_msg(msg)</span><br><span>diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py</span><br><span>index 8b6f80c..389a663 100755</span><br><span>--- a/src/target/trx_toolkit/trx_sniff.py</span><br><span>+++ b/src/target/trx_toolkit/trx_sniff.py</span><br><span>@@ -108,7 +108,7 @@</span><br><span> </span><br><span>                # Attempt to parse the payload as a DATA message</span><br><span>             try:</span><br><span style="color: hsl(0, 100%, 40%);">-                    msg.parse_msg(msg_raw)</span><br><span style="color: hsl(120, 100%, 40%);">+                        msg.from_bytes(msg_raw)</span><br><span>                      msg.validate()</span><br><span>               except ValueError as e:</span><br><span>                      desc = msg.desc_hdr()</span><br><span>@@ -160,7 +160,7 @@</span><br><span>          # Message type specific filtering</span><br><span>            if isinstance(msg, RxMsg):</span><br><span>                   # NOPE.ind filter</span><br><span style="color: hsl(0, 100%, 40%);">-                       if not self.argv.pf_nope_ind and msg.nope_ind:</span><br><span style="color: hsl(120, 100%, 40%);">+                        if not self.argv.pf_nope_ind and msg.c['nope']:</span><br><span>                              return False</span><br><span> </span><br><span>                     # RSSI filter</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019">change 24019</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.osmocom.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.osmocom.org/c/osmocom-bb/+/24019"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: osmocom-bb </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc </div>
<div style="display:none"> Gerrit-Change-Number: 24019 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: fixeria <vyanitskiy@sysmocom.de> </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>