This is merely a historical archive of years 2008-2021, before the migration to mailman3.
A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.
fixeria gerrit-no-reply at lists.osmocom.org
fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/24019 )
Change subject: trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py
......................................................................
trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py
Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc
Related: OS#4006, SYS#4895
---
M src/target/trx_toolkit/burst_fwd.py
M src/target/trx_toolkit/data_dump.py
M src/target/trx_toolkit/data_if.py
M src/target/trx_toolkit/data_msg.py
M src/target/trx_toolkit/fake_trx.py
M src/target/trx_toolkit/test_data_dump.py
M src/target/trx_toolkit/test_data_msg.py
M src/target/trx_toolkit/transceiver.py
M src/target/trx_toolkit/trx_sniff.py
9 files changed, 283 insertions(+), 672 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/19/24019/1
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 2e9e97b..03ce6e6 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -3,7 +3,7 @@
# TRX Toolkit
# Burst forwarding between transceivers
#
-# (C) 2017-2020 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2021 by Vadim Yanitskiy <axilirator at gmail.com>
# Contributions by sysmocom - s.f.m.c. GmbH
#
# All Rights Reserved
@@ -25,6 +25,7 @@
import logging as log
from trx_list import TRXList
+from data_msg import RxMsg
class BurstForwarder(TRXList):
""" Performs burst forwarding between transceivers.
@@ -48,11 +49,11 @@
def forward_msg(self, src_trx, rx_msg):
# Originating Transceiver may use frequency hopping,
# so let's precalculate its Tx frequency in advance
- tx_freq = src_trx.get_tx_freq(rx_msg.fn)
+ tx_freq = src_trx.get_tx_freq(rx_msg.c['fn'])
if src_trx.rf_muted:
- del rx_msg.burst # burst bits are omited
- rx_msg.burst = None
+ # Burst bits are omited
+ rx_msg.c['burst'].clear()
# Iterate over all known transceivers
for trx in self.trx_list:
@@ -62,13 +63,13 @@
# Check transceiver state
if not trx.running:
continue
- if rx_msg.tn not in trx.ts_list:
+ if rx_msg.c['tn'] not in trx.ts_list:
continue
# Match Tx/Rx frequencies of the both transceivers
- if trx.get_rx_freq(rx_msg.fn) != tx_freq:
+ if trx.get_rx_freq(rx_msg.c['fn']) != tx_freq:
continue
- # Transform from TxMsg to RxMsg and forward
- tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)
+ # Transform from L12TRX to TRX2L1 and forward
+ tx_msg = rx_msg.trans(RxMsg, trx.data_if._hdr_ver)
trx.handle_data_msg(src_trx, rx_msg, tx_msg)
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py
index 8510e2d..fe8a2d8 100644
--- a/src/target/trx_toolkit/data_dump.py
+++ b/src/target/trx_toolkit/data_dump.py
@@ -44,7 +44,7 @@
raise ValueError("Unknown message type")
# Generate a message payload
- msg_raw = msg.gen_msg()
+ msg_raw = msg.to_bytes()
# Calculate and pack the message length
msg_len = len(msg_raw)
@@ -118,7 +118,7 @@
# a parsed message in case of success,
# or None in case of EOF or header parsing error,
# or False in case of message parsing error.
- def _parse_msg(self):
+ def _from_bytes(self):
# Attempt to read a message header
hdr_raw = self.f.read(self.HDR_LENGTH)
if len(hdr_raw) != self.HDR_LENGTH:
@@ -142,7 +142,7 @@
# Attempt to parse a message
try:
msg_raw = bytearray(msg_raw)
- msg.parse_msg(msg_raw)
+ msg.from_bytes(msg_raw)
except:
log.error("Couldn't parse a message, skipping...")
return False
@@ -155,7 +155,7 @@
# a parsed message in case of success,
# or None in case of EOF, out of range, or header parsing error,
# or False in case of message parsing error.
- def parse_msg(self, idx):
+ def from_bytes(self, idx):
# Move descriptor to the beginning of requested message
rc = self._seek2msg(idx)
if not rc:
@@ -163,7 +163,7 @@
return None
# Attempt to parse a message
- return self._parse_msg()
+ return self._from_bytes()
# Parses all messages from a given file
# Return value:
@@ -185,7 +185,7 @@
# Read the capture in loop...
while True:
# Attempt to parse a message
- msg = self._parse_msg()
+ msg = self._from_bytes()
# EOF or broken header
if msg is None:
diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py
index 1cded9b..f754c6c 100644
--- a/src/target/trx_toolkit/data_if.py
+++ b/src/target/trx_toolkit/data_if.py
@@ -23,6 +23,8 @@
import logging as log
+from typing import Optional
+
from udp_link import UDPLink
from data_msg import *
@@ -50,64 +52,43 @@
# No suitable version found
return -1
- def match_hdr_ver(self, msg):
- if msg.ver == self._hdr_ver:
- return True
-
- log.error("(%s) Rx DATA message (%s) with unexpected header "
- "version %u (!= expected %u), ignoring..."
- % (self.desc_link(), msg.desc_hdr(),
- msg.ver, self._hdr_ver))
- return False
-
- def recv_raw_data(self):
+ def recv_raw_data(self) -> bytes:
data, _ = self.sock.recvfrom(512)
return data
- def recv_tx_msg(self):
+ def recv_tx_msg(self) -> Optional[TxMsg]:
# Read raw data from socket
data = self.recv_raw_data()
# Attempt to parse a TRXD Tx message
try:
- msg = TxMsg()
- msg.parse_msg(bytearray(data))
+ msg = TxMsg(self._hdr_ver)
+ msg.from_bytes(data)
except:
log.error("Failed to parse a TRXD Tx message "
"from R:%s:%u" % (self.remote_addr, self.remote_port))
return None
- # Make sure the header version matches
- # the configured one (self._hdr_ver)
- if not self.match_hdr_ver(msg):
- return None
-
return msg
- def recv_rx_msg(self):
+ def recv_rx_msg(self) -> Optional[RxMsg]:
# Read raw data from socket
data = self.recv_raw_data()
# Attempt to parse a TRXD Rx message
try:
- msg = RxMsg()
- msg.parse_msg(bytearray(data))
+ msg = RxMsg(self._hdr_ver)
+ msg.from_bytes(data)
except:
log.error("Failed to parse a TRXD Rx message "
"from R:%s:%u" % (self.remote_addr, self.remote_port))
return None
- # Make sure the header version matches
- # the configured one (self._hdr_ver)
- if not self.match_hdr_ver(msg):
- return None
-
return msg
- def send_msg(self, msg, legacy = False):
+ def send_msg(self, msg: RxMsg) -> None:
try:
- # Validate and encode a TRXD message
- payload = msg.gen_msg(legacy)
+ payload = msg.to_bytes()
except ValueError as e:
log.error("Failed to encode a TRXD message ('%s') "
"due to error: %s" % (msg.desc_hdr(), e))
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
index 7e785f9..570cfa0 100644
--- a/src/target/trx_toolkit/data_msg.py
+++ b/src/target/trx_toolkit/data_msg.py
@@ -3,7 +3,8 @@
# TRX Toolkit
# DATA interface message definitions and helpers
#
-# (C) 2018-2019 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2018-2021 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info at sysmocom.de>
#
# All Rights Reserved
#
@@ -25,35 +26,39 @@
import struct
import abc
-from typing import List
+from typing import Any, Type, List, Tuple, Dict
from enum import Enum
from gsm_shared import *
+import trxd_proto
+import codec
+
class Modulation(Enum):
""" Modulation types defined in 3GPP TS 45.002 """
- ModGMSK = (0b0000, 1 * GMSK_BURST_LEN)
- Mod8PSK = (0b0100, 3 * GMSK_BURST_LEN)
- ModGMSK_AB = (0b0110, 1 * GMSK_BURST_LEN)
- ModRFU = (0b0111, 0) # Reserved for Future Use
- Mod16QAM = (0b1000, 4 * GMSK_BURST_LEN)
- Mod32QAM = (0b1010, 5 * GMSK_BURST_LEN)
- ModAQPSK = (0b1100, 2 * GMSK_BURST_LEN)
+ ModGMSK = (0b0000, 0b1100, 1 * GMSK_BURST_LEN)
+ Mod8PSK = (0b0100, 0b1110, 3 * GMSK_BURST_LEN)
+ ModGMSK_AB = (0b0110, 0b1111, 1 * GMSK_BURST_LEN)
+ ModRFU = (0b0111, 0b1111, 0) # Reserved for Future Use
+ Mod16QAM = (0b1000, 0b1110, 4 * GMSK_BURST_LEN)
+ Mod32QAM = (0b1010, 0b1110, 5 * GMSK_BURST_LEN)
+ ModAQPSK = (0b1100, 0b1100, 2 * GMSK_BURST_LEN)
- def __init__(self, coding, bl):
+ def __init__(self, val: int, mask: int, bl: int):
# Coding in TRXD header
- self.coding = coding
+ self.val = val
+ self.mask = mask
# Burst length
self.bl = bl
@classmethod
- def pick(self, coding):
+ def pick(self, val: int):
for mod in list(self):
- if mod.coding == coding:
+ if (val & mod.mask) == mod.val:
return mod
return None
@classmethod
- def pick_by_bl(self, bl):
+ def pick_by_bl(self, bl: int):
for mod in list(self):
if mod.bl == bl:
return mod
@@ -66,66 +71,87 @@
CHDR_VERSION_MAX = 0b1111
KNOWN_VERSIONS = (0, 1)
- def __init__(self, fn = None, tn = None, burst = None, ver = 0):
- self.burst = burst
- self.ver = ver
- self.fn = fn
- self.tn = tn
+ # PDU codecs for all known versions
+ CODECS = NotImplemented # type: Tuple[codec.Envelope, ...]
- @property
- def CHDR_LEN(self):
- ''' The common header length. '''
- return 1 + 4 # (VER + TN) + FN
+ # Default PDU content for child types
+ DEF_CONT = NotImplemented # type: Dict[str, Any]
+
+ def __init__(self, ver: int, cont: Dict[str, Any] = { }):
+ # TRXD PDU version
+ if not ver in self.KNOWN_VERSIONS:
+ raise ValueError("Unknown TRXD PDU version %d" % ver)
+ self._ver = ver
+
+ # TRXD PDU codec
+ self.codec = self.CODECS[ver]
+
+ # Content of the message
+ self.c = {
+ # Default TDMA frame/timeslot number
+ 'fn' : 0,
+ 'tn' : 0,
+ # NOPE / IDLE frame indication
+ 'nope' : False,
+ # Modulation type and TSC info
+ 'mod_type' : Modulation.ModGMSK,
+ 'tsc_set' : 0,
+ 'tsc' : 0,
+ # Burst hard-/soft-bits
+ 'burst' : [],
+ # Default fields for particular class
+ **self.DEF_CONT,
+ # Fields provided during instantiation
+ **cont
+ } # type: Dict[str, Any]
+
+ def __getitem__(self, key: str) -> Any:
+ return self.c[key]
+
+ def __setitem__(self, key: str, val: Any) -> None:
+ self.c[key] = val
+
+ def __delitem__(self, key: str) -> None:
+ del self.c[key]
@abc.abstractmethod
- def gen_hdr(self):
- ''' Generate message specific header. '''
-
- @abc.abstractmethod
- def parse_hdr(self, hdr):
- ''' Parse message specific header. '''
-
- @abc.abstractmethod
- def gen_burst(self):
+ def gen_burst(self) -> None:
''' Generate message specific burst. '''
@abc.abstractmethod
- def parse_burst(self, burst):
+ def parse_burst(self) -> None:
''' Parse message specific burst. '''
@abc.abstractmethod
- def rand_burst(self):
+ def rand_burst(self) -> None:
''' Generate a random message specific burst. '''
- def rand_fn(self):
+ @abc.abstractmethod
+ def trans_burst(self) -> List[int]:
+ ''' Convert between hard-bits and soft-bits. '''
+
+ def rand_fn(self) -> int:
''' Generate a random frame number. '''
return random.randint(0, GSM_HYPERFRAME)
- def rand_tn(self):
+ def rand_tn(self) -> int:
''' Generate a random timeslot number. '''
return random.randint(0, 7)
- def rand_hdr(self):
+ def rand_hdr(self) -> None:
''' Randomize the message header. '''
- self.fn = self.rand_fn()
- self.tn = self.rand_tn()
+ self.c['fn'] = self.rand_fn()
+ self.c['tn'] = self.rand_tn()
- def desc_hdr(self):
+ def desc_hdr(self) -> str:
''' Generate human-readable header description. '''
result = ""
- if self.ver > 0:
- result += ("ver=%u " % self.ver)
-
- if self.fn is not None:
- result += ("fn=%u " % self.fn)
-
- if self.tn is not None:
- result += ("tn=%u " % self.tn)
-
- if self.burst is not None and len(self.burst) > 0:
- result += ("bl=%u " % len(self.burst))
+ result += ("ver=%u " % self._ver)
+ result += ("fn=%u tn=%u " % (self.c['fn'], self.c['tn']))
+ if self.c['burst']:
+ result += ("bl=%u " % len(self.c['burst']))
return result
@@ -149,131 +175,76 @@
''' Convert bits {1..0} to soft-bits {-127..127}. '''
return [-127 if b else 127 for b in bits]
- def validate(self):
+ def validate(self) -> None:
''' Validate the message fields (throws ValueError). '''
- if not self.ver in self.KNOWN_VERSIONS:
- raise ValueError("Unknown TRXD header version %d" % self.ver)
+ if self.c['fn'] < 0 or self.c['fn'] > GSM_HYPERFRAME:
+ raise ValueError("TDMA frame-number %d is out of range" % self.c['fn'])
- if self.fn is None:
- raise ValueError("TDMA frame-number is not set")
+ if self.c['tn'] < 0 or self.c['tn'] > 7:
+ raise ValueError("TDMA time-slot %d is out of range" % self.c['tn'])
- if self.fn < 0 or self.fn > GSM_HYPERFRAME:
- raise ValueError("TDMA frame-number %d is out of range" % self.fn)
-
- if self.tn is None:
- raise ValueError("TDMA time-slot is not set")
-
- if self.tn < 0 or self.tn > 7:
- raise ValueError("TDMA time-slot %d is out of range" % self.tn)
-
- def gen_msg(self, legacy = False):
+ def to_bytes(self) -> bytes:
''' Generate a TRX DATA message. '''
+ if not self.c['nope']:
+ self.gen_burst()
+ return self.codec._to_bytes(self.c)
- # Validate all the fields
- self.validate()
-
- # Allocate an empty byte-array
- buf = bytearray()
-
- # Put version (4 bits) and TDMA TN (3 bits)
- buf.append((self.ver << 4) | (self.tn & 0x07))
-
- # Put TDMA FN (4 octets, BE)
- buf += struct.pack(">L", self.fn)
-
- # Generate message specific header part
- hdr = self.gen_hdr()
- buf += hdr
-
- # Generate burst
- if self.burst is not None:
- buf += self.gen_burst()
-
- # This is a rudiment from (legacy) OpenBTS transceiver,
- # some L1 implementations still expect two dummy bytes.
- if legacy and self.ver == 0x00:
- buf += bytearray(2)
-
- return buf
-
- def parse_msg(self, msg):
+ def from_bytes(self, msg: bytes) -> None:
''' Parse a TRX DATA message. '''
+ self.codec._from_bytes(self.c, msg)
+ if not self.c['nope']:
+ self.parse_burst()
- # Make sure we have at least common header
- if len(msg) < self.CHDR_LEN:
- raise ValueError("Message is to short: missing common header")
+ def trans(self, cls: Type['Msg'], ver: int) -> 'Msg':
+ ''' Transform between L12TRX and TRX2L1. '''
- # Parse the header version first
- self.ver = (msg[0] >> 4)
- if not self.ver in self.KNOWN_VERSIONS:
- raise ValueError("Unknown TRXD header version %d" % self.ver)
+ # Allocate a new message
+ msg = cls(ver, {
+ 'fn' : self.c['fn'],
+ 'tn' : self.c['tn'],
+ })
- # Parse TDMA TN and FN
- self.tn = (msg[0] & 0x07)
- self.fn = struct.unpack(">L", msg[1:5])[0]
-
- # Make sure we have the whole header,
- # including the version specific fields
- if len(msg) < self.HDR_LEN:
- raise ValueError("Message is to short: missing version specific header")
-
- # Specific message part
- self.parse_hdr(msg)
-
- # Copy burst, skipping header
- msg_burst = msg[self.HDR_LEN:]
- if len(msg_burst) > 0:
- self.parse_burst(msg_burst)
+ # Convert burst bits
+ if self.c['burst'] and not self.c['nope']:
+ msg.c['burst'] = self.trans_burst()
else:
- self.burst = None
+ msg.c['nope'] = True
+
+ return msg
class TxMsg(Msg):
''' Tx (L1 -> TRX) message coding API. '''
+ # PDU codecs for all known versions
+ CODECS = (
+ trxd_proto.PDUv0Tx(),
+ trxd_proto.PDUv1Tx(),
+ )
+
+ DEF_CONT = {
+ # Power reduction (in dB)
+ 'pwr' : 0,
+ }
+
# Constants
PWR_MIN = 0x00
PWR_MAX = 0xff
- # Specific message fields
- pwr = None
-
- @property
- def HDR_LEN(self):
- ''' Calculate header length depending on its version. '''
-
- # Common header length
- length = self.CHDR_LEN
-
- # Message specific header length
- if self.ver in (0x00, 0x01):
- length += 1 # PWR
- else:
- raise IndexError("Unhandled version %u" % self.ver)
-
- return length
-
- def validate(self):
+ def validate(self) -> None:
''' Validate the message fields (throws ValueError). '''
# Validate common fields
Msg.validate(self)
- if self.pwr is None:
- raise ValueError("Tx Attenuation level is not set")
-
- if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
- raise ValueError("Tx Attenuation %d is out of range" % self.pwr)
+ if self.c['pwr'] < self.PWR_MIN or self.c['pwr'] > self.PWR_MAX:
+ raise ValueError("Tx Attenuation %d is out of range" % self.c['pwr'])
# FIXME: properly handle IDLE / NOPE indications
- if self.burst is None:
- raise ValueError("Tx burst bits are not set")
+ if len(self.c['burst']) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
+ raise ValueError("Tx burst has odd length %u" % len(self.c['burst']))
- # FIXME: properly handle IDLE / NOPE indications
- if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
- raise ValueError("Tx burst has odd length %u" % len(self.burst))
-
- def rand_pwr(self, min = None, max = None):
+ def rand_pwr(self, min = None, max = None) -> int:
''' Generate a random power level. '''
if min is None:
@@ -284,80 +255,55 @@
return random.randint(min, max)
- def rand_hdr(self):
+ def rand_hdr(self) -> None:
''' Randomize message specific header. '''
Msg.rand_hdr(self)
- self.pwr = self.rand_pwr()
+ self.c['pwr'] = self.rand_pwr()
- def desc_hdr(self):
+ def desc_hdr(self) -> str:
''' Generate human-readable header description. '''
# Describe the common part
result = Msg.desc_hdr(self)
- if self.pwr is not None:
- result += ("pwr=%u " % self.pwr)
+ result += ("pwr=%u " % self.c['pwr'])
# Strip useless whitespace and return
return result.strip()
- def gen_hdr(self):
- ''' Generate message specific header part. '''
-
- # Allocate an empty byte-array
- buf = bytearray()
-
- # Put power
- buf.append(self.pwr)
-
- return buf
-
- def parse_hdr(self, hdr):
- ''' Parse message specific header part. '''
-
- # Parse power level
- self.pwr = hdr[5]
-
- def gen_burst(self):
+ def gen_burst(self) -> None:
''' Generate message specific burst. '''
+ self.c['hard-bits'] = bytes(self.c['burst'])
- # Copy burst 'as is'
- return bytearray(self.burst)
-
- def parse_burst(self, burst):
+ def parse_burst(self) -> None:
''' Parse message specific burst. '''
+ self.c['burst'] = list(self.c['hard-bits'])
- length = len(burst)
-
- # Distinguish between GSM and EDGE
- if length >= EDGE_BURST_LEN:
- self.burst = list(burst[:EDGE_BURST_LEN])
- else:
- self.burst = list(burst[:GMSK_BURST_LEN])
-
- def rand_burst(self, length = GMSK_BURST_LEN):
+ def rand_burst(self, length = GMSK_BURST_LEN) -> None:
''' Generate a random message specific burst. '''
- self.burst = [random.randint(0, 1) for _ in range(length)]
+ self.c['burst'] = [random.randint(0, 1) for _ in range(length)]
- def trans(self, ver = None):
- ''' Transform this message into RxMsg. '''
-
- # Allocate a new message
- msg = RxMsg(fn = self.fn, tn = self.tn,
- ver = self.ver if ver is None else ver)
-
- # Convert burst bits
- if self.burst is not None:
- msg.burst = self.ubit2sbit(self.burst)
- else:
- msg.nope_ind = True
-
- return msg
+ def trans_burst(self) -> List[int]:
+ ''' Transform hard-bits into soft-bits. '''
+ return self.ubit2sbit(self.c['burst'])
class RxMsg(Msg):
''' Rx (TRX -> L1) message coding API. '''
+ # PDU codecs for all known versions
+ CODECS = (
+ trxd_proto.PDUv0Rx(),
+ trxd_proto.PDUv1Rx(),
+ )
+
+ DEF_CONT = {
+ # Specific message fields
+ 'rssi' : -50,
+ 'toa256' : 0,
+ 'cir' : 0,
+ }
+
# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
RSSI_MIN = -120
RSSI_MAX = -47
@@ -373,121 +319,39 @@
CI_MIN = -1280
CI_MAX = 1280
- # IDLE frame / nope detection indicator
- NOPE_IND = (1 << 7)
-
- # Specific message fields
- rssi = None
- toa256 = None
-
- # Version 0x01 specific (default values)
- mod_type = Modulation.ModGMSK
- nope_ind = False
-
- tsc_set = None
- tsc = None
- ci = None
-
- @property
- def HDR_LEN(self):
- ''' Calculate header length depending on its version. '''
-
- # Common header length
- length = self.CHDR_LEN
-
- # Message specific header length
- if self.ver == 0x00:
- # RSSI + ToA
- length += 1 + 2
- elif self.ver == 0x01:
- # RSSI + ToA + TS + C/I
- length += 1 + 2 + 1 + 2
- else:
- raise IndexError("Unhandled version %u" % self.ver)
-
- return length
-
- def _validate_burst_v0(self):
- # Burst is mandatory
- if self.burst is None:
- raise ValueError("Rx burst bits are not set")
-
- # ... and can be either of GSM (GMSK) or EDGE (8-PSK)
- if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
- raise ValueError("Rx burst has odd length %u" % len(self.burst))
-
- def _validate_burst_v1(self):
- # Burst is omitted in case of an IDLE / NOPE indication
- if self.nope_ind and self.burst is None:
- return
-
- if self.nope_ind and self.burst is not None:
- raise ValueError("NOPE.ind comes with burst?!?")
- if self.burst is None:
- raise ValueError("Rx burst bits are not set")
-
- # Burst length depends on modulation type
- if len(self.burst) != self.mod_type.bl:
- raise ValueError("Rx burst has odd length %u" % len(self.burst))
-
- def validate_burst(self):
- ''' Validate the burst (throws ValueError). '''
-
- if self.ver == 0x00:
- self._validate_burst_v0()
- elif self.ver >= 0x01:
- self._validate_burst_v1()
-
- def validate(self):
+ def validate(self) -> None:
''' Validate the message header fields (throws ValueError). '''
# Validate common fields
Msg.validate(self)
- if self.rssi is None:
- raise ValueError("RSSI is not set")
+ if self.c['rssi'] < self.RSSI_MIN or self.c['rssi'] > self.RSSI_MAX:
+ raise ValueError("RSSI %d is out of range" % self.c['rssi'])
- if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:
- raise ValueError("RSSI %d is out of range" % self.rssi)
-
- if self.toa256 is None:
- raise ValueError("ToA256 is not set")
-
- if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
- raise ValueError("ToA256 %d is out of range" % self.toa256)
+ if self.c['toa256'] < self.TOA256_MIN or self.c['toa256'] > self.TOA256_MAX:
+ raise ValueError("ToA256 %d is out of range" % self.c['toa256'])
# Version specific parameters (omited for NOPE.ind)
- if self.ver >= 0x01 and not self.nope_ind:
- if type(self.mod_type) is not Modulation:
+ if self._ver >= 1 and not self.c['nope']:
+ if type(self.c['mod_type']) is not Modulation:
raise ValueError("Unknown Rx modulation type")
- if self.tsc_set is None:
- raise ValueError("TSC set is not set")
-
- if self.mod_type is Modulation.ModGMSK:
- if self.tsc_set not in range(0, 4):
- raise ValueError("TSC set %d is out of range" % self.tsc_set)
+ if self.c['mod_type'] is Modulation.ModGMSK:
+ if self.c['tsc_set'] not in range(0, 4):
+ raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])
else:
- if self.tsc_set not in range(0, 2):
- raise ValueError("TSC set %d is out of range" % self.tsc_set)
+ if self.c['tsc_set'] not in range(0, 2):
+ raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])
- if self.tsc is None:
- raise ValueError("TSC is not set")
-
- if self.tsc not in self.TSC_RANGE:
- raise ValueError("TSC %d is out of range" % self.tsc)
+ if self.c['tsc'] not in self.TSC_RANGE:
+ raise ValueError("TSC %d is out of range" % self.c['tsc'])
# Version specific parameters (also present in NOPE.ind)
- if self.ver >= 0x01:
- if self.ci is None:
- raise ValueError("C/I is not set")
+ if self._ver >= 1:
+ if self.c['cir'] < self.CI_MIN or self.c['cir'] > self.CI_MAX:
+ raise ValueError("C/I %d is out of range" % self.c['cir'])
- if self.ci < self.CI_MIN or self.ci > self.CI_MAX:
- raise ValueError("C/I %d is out of range" % self.ci)
-
- self.validate_burst()
-
- def rand_rssi(self, min = None, max = None):
+ def rand_rssi(self, min = None, max = None) -> int:
''' Generate a random RSSI value. '''
if min is None:
@@ -498,7 +362,7 @@
return random.randint(min, max)
- def rand_toa256(self, min = None, max = None):
+ def rand_toa256(self, min = None, max = None) -> int:
''' Generate a random ToA (Time of Arrival) value. '''
if min is None:
@@ -509,185 +373,88 @@
return random.randint(min, max)
- def rand_hdr(self):
+ def rand_hdr(self, nope: bool = False) -> None:
''' Randomize message specific header. '''
Msg.rand_hdr(self)
- self.rssi = self.rand_rssi()
- self.toa256 = self.rand_toa256()
+ self.c['rssi'] = self.rand_rssi()
+ self.c['toa256'] = self.rand_toa256()
+ self.c['nope'] = nope
- if self.ver >= 0x01:
- self.mod_type = random.choice(list(Modulation))
- if self.mod_type is Modulation.ModGMSK:
- self.tsc_set = random.randint(0, 3)
+ if self._ver >= 1 and not nope:
+ self.c['mod_type'] = random.choice(list(Modulation))
+ if self.c['mod_type'] is Modulation.ModGMSK:
+ self.c['tsc_set'] = random.randint(0, 3)
else:
- self.tsc_set = random.randint(0, 1)
- self.tsc = random.choice(self.TSC_RANGE)
+ self.c['tsc_set'] = random.randint(0, 1)
+ self.c['tsc'] = random.choice(self.TSC_RANGE)
+ if self._ver >= 1:
# C/I: Carrier-to-Interference ratio
- self.ci = random.randint(self.CI_MIN, self.CI_MAX)
+ self.c['cir'] = random.randint(self.CI_MIN, self.CI_MAX)
- def desc_hdr(self):
+ def desc_hdr(self) -> str:
''' Generate human-readable header description. '''
# Describe the common part
result = Msg.desc_hdr(self)
- if self.rssi is not None:
- result += ("rssi=%d " % self.rssi)
+ result += ("rssi=%d " % self.c['rssi'])
+ result += ("toa256=%d " % self.c['toa256'])
- if self.toa256 is not None:
- result += ("toa256=%d " % self.toa256)
-
- if self.ver >= 0x01:
- if not self.nope_ind:
- if self.mod_type is not None:
- result += ("%s " % self.mod_type)
- if self.tsc_set is not None:
- result += ("set=%u " % self.tsc_set)
- if self.tsc is not None:
- result += ("tsc=%u " % self.tsc)
- if self.ci is not None:
- result += ("C/I=%d cB " % self.ci)
+ if self._ver >= 0x01:
+ if not self.c['nope']:
+ result += ("%s " % self.c['mod_type'])
+ result += ("set=%u " % self.c['tsc_set'])
+ result += ("tsc=%u " % self.c['tsc'])
+ result += ("C/I=%d cB " % self.c['cir'])
else:
result += "(IDLE / NOPE IND) "
# Strip useless whitespace and return
return result.strip()
- def gen_mts(self):
- ''' Encode Modulation and Training Sequence info. '''
-
- # IDLE / nope indication has no MTS info
- if self.nope_ind:
- return self.NOPE_IND
-
- # TSC: . . . . . X X X
- mts = self.tsc & 0b111
-
- # MTS: . X X X X . . .
- mts |= self.mod_type.coding << 3
- mts |= self.tsc_set << 3
-
- return mts
-
- def parse_mts(self, mts):
- ''' Parse Modulation and Training Sequence info. '''
-
- # IDLE / nope indication has no MTS info
- self.nope_ind = (mts & self.NOPE_IND) > 0
- if self.nope_ind:
- self.mod_type = None
- self.tsc_set = None
- self.tsc = None
- return
-
- # TSC: . . . . . X X X
- self.tsc = mts & 0b111
-
- # MTS: . X X X X . . .
- mts = (mts >> 3) & 0b1111
- if (mts & 0b1100) > 0:
- # Mask: . . . . M M M S
- self.mod_type = Modulation.pick(mts & 0b1110)
- self.tsc_set = mts & 0b1
- else:
- # GMSK: . . . . 0 0 S S
- self.mod_type = Modulation.ModGMSK
- self.tsc_set = mts & 0b11
-
- def gen_hdr(self):
- ''' Generate message specific header part. '''
-
- # Allocate an empty byte-array
- buf = bytearray()
-
- # Put RSSI
- buf.append(-self.rssi)
-
- # Encode ToA (Time of Arrival)
- # Big endian, 2 bytes (int32_t)
- buf += struct.pack(">h", self.toa256)
-
- if self.ver >= 0x01:
- # Modulation and Training Sequence info
- mts = self.gen_mts()
- buf.append(mts)
-
- # C/I: Carrier-to-Interference ratio (in centiBels)
- buf += struct.pack(">h", self.ci)
-
- return buf
-
- def parse_hdr(self, hdr):
- ''' Parse message specific header part. '''
-
- # Parse RSSI
- self.rssi = -(hdr[5])
-
- # Parse ToA (Time of Arrival)
- self.toa256 = struct.unpack(">h", hdr[6:8])[0]
-
- if self.ver >= 0x01:
- # Modulation and Training Sequence info
- self.parse_mts(hdr[8])
-
- # C/I: Carrier-to-Interference ratio (in centiBels)
- self.ci = struct.unpack(">h", hdr[9:11])[0]
-
- def gen_burst(self):
+ def gen_burst(self) -> None:
''' Generate message specific burst. '''
# Convert soft-bits to unsigned soft-bits
- burst_usbits = self.sbit2usbit(self.burst)
+ burst = self.sbit2usbit(self.c['burst'])
+ self.c['soft-bits'] = bytes(burst)
- # Encode to bytes
- return bytearray(burst_usbits)
-
- def _parse_burst_v0(self, burst):
+ def _parse_burst_v0(self, burst: List[int]) -> List[int]:
''' Parse message specific burst for header version 0. '''
bl = len(burst)
# We need to guess modulation by the length of burst
- self.mod_type = Modulation.pick_by_bl(bl)
- if self.mod_type is None:
+ self.c['mod_type'] = Modulation.pick_by_bl(bl)
+ if self.c['mod_type'] is None:
# Some old transceivers append two dummy bytes
- self.mod_type = Modulation.pick_by_bl(bl - 2)
+ self.c['mod_type'] = Modulation.pick_by_bl(bl - 2)
- if self.mod_type is None:
+ if self.c['mod_type'] is None:
raise ValueError("Odd burst length %u" % bl)
- return burst[:self.mod_type.bl]
+ return burst[:self.c['mod_type'].bl]
- def parse_burst(self, burst):
+ def parse_burst(self) -> None:
''' Parse message specific burst. '''
- burst = list(burst)
-
- if self.ver == 0x00:
+ burst = list(self.c['soft-bits'])
+ if self._ver == 0:
burst = self._parse_burst_v0(burst)
# Convert unsigned soft-bits to soft-bits
- self.burst = self.usbit2sbit(burst)
+ self.c['burst'] = self.usbit2sbit(burst)
- def rand_burst(self, length = None):
+ def rand_burst(self, length = None) -> None:
''' Generate a random message specific burst. '''
if length is None:
- length = self.mod_type.bl
+ length = self.c['mod_type'].bl
- self.burst = [random.randint(-127, 127) for _ in range(length)]
+ self.c['burst'] = [random.randint(-127, 127) for _ in range(length)]
- def trans(self, ver = None):
- ''' Transform this message to TxMsg. '''
-
- # Allocate a new message
- msg = TxMsg(fn = self.fn, tn = self.tn,
- ver = self.ver if ver is None else ver)
-
- # Convert burst bits
- if self.burst is not None:
- msg.burst = self.sbit2ubit(self.burst)
-
- return msg
+ def trans_burst(self) -> List[int]:
+ ''' Transform soft-bits into hard-bits. '''
+ return self.sbit2ubit(self.c['burst'])
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 573527b..3226a0e 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -188,9 +188,9 @@
if self.burst_drop_amount == 0:
return False
- if msg.fn % self.burst_drop_period == 0:
+ if msg.c['fn'] % self.burst_drop_period == 0:
log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"
- % (self, msg.fn, self.burst_drop_period))
+ % (self, msg.c['fn'], self.burst_drop_period))
self.burst_drop_amount -= 1
return True
@@ -198,64 +198,63 @@
def _handle_data_msg_v1(self, src_msg, msg):
# C/I (Carrier-to-Interference ratio)
- msg.ci = self.ci
+ msg.c['cir'] = self.ci
# Pick modulation type by burst length
- bl = len(src_msg.burst)
- msg.mod_type = Modulation.pick_by_bl(bl)
+ bl = len(src_msg.c['burst'])
+ msg.c['mod_type'] = Modulation.pick_by_bl(bl)
# Pick TSC (Training Sequence Code) and TSC set
- if msg.mod_type is Modulation.ModGMSK:
- ss = TrainingSeqGMSK.pick(src_msg.burst)
- msg.tsc = ss.tsc if ss is not None else 0
- msg.tsc_set = ss.tsc_set if ss is not None else 0
+ if msg.c['mod_type'] is Modulation.ModGMSK:
+ ss = TrainingSeqGMSK.pick(src_msg.c['burst'])
+ msg.c['tsc'] = ss.tsc if ss is not None else 0
+ msg.c['tsc_set'] = ss.tsc_set if ss is not None else 0
else: # TODO: other modulation types (at least 8-PSK)
- msg.tsc_set = 0
- msg.tsc = 0
+ msg.c['tsc_set'] = 0
+ msg.c['tsc'] = 0
# Takes (partially initialized) TRXD Rx message,
# simulates RF path parameters (such as RSSI),
# and sends towards the L1
def handle_data_msg(self, src_trx, src_msg, msg):
if self.rf_muted:
- msg.nope_ind = True
- elif not msg.nope_ind:
+ msg.c['nope'] = True
+ elif not msg.c['nope']:
# Path loss simulation
- msg.nope_ind = self.sim_burst_drop(msg)
- if msg.nope_ind:
+ msg.c['nope'] = self.sim_burst_drop(msg)
+ if msg.c['nope']:
# Before TRXDv1, we simply drop the message
- if msg.ver < 0x01:
+ if msg._ver < 1:
del msg
return
# Since TRXDv1, we should send a NOPE.ind
- del msg.burst # burst bits are omited
- msg.burst = None
+ msg.c['burst'].clear() # burst bits are omited
# TODO: shoud we make these values configurable?
- msg.toa256 = self.TOA256_NOISE_DEFAULT
- msg.rssi = self.RSSI_NOISE_DEFAULT
- msg.ci = self.CI_NOISE_DEFAULT
+ msg.c['toa256'] = self.TOA256_NOISE_DEFAULT
+ msg.c['rssi'] = self.RSSI_NOISE_DEFAULT
+ msg.c['cir'] = self.CI_NOISE_DEFAULT
self.data_if.send_msg(msg)
return
# Complete message header
- msg.toa256 = self.toa256
+ msg.c['toa256'] = self.toa256
# Apply RSSI based on transmitter:
if not self.fake_rssi_enabled:
- msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT
+ msg.c['rssi'] = src_trx.tx_power - src_msg.c['pwr'] - self.PATH_LOSS_DEFAULT
else: # Apply fake RSSI
- msg.rssi = self.rssi
+ msg.c['rssi'] = self.rssi
# Version specific fields
- if msg.ver >= 0x01:
+ if msg._ver >= 1:
self._handle_data_msg_v1(src_msg, msg)
# Apply optional Timing Advance
if src_trx.ta != 0:
- msg.toa256 -= src_trx.ta * 256
+ msg.c['toa256'] -= src_trx.ta * 256
Transceiver.handle_data_msg(self, msg)
diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py
index f7b4fde..2dad6b2 100644
--- a/src/target/trx_toolkit/test_data_dump.py
+++ b/src/target/trx_toolkit/test_data_dump.py
@@ -97,7 +97,7 @@
msg_ref = self._gen_rand_message(cls)
self._ddf.append_msg(msg_ref)
- msg = self._ddf.parse_msg(0)
+ msg = self._ddf.from_bytes(0)
self._compare_msg(msg, msg_ref)
# Store one Rx message in a file, read it back and compare
@@ -120,7 +120,7 @@
self._compare_msg(msg_list[i], msg_list_ref[i])
# Verify random access to stored messages
- def test_parse_msg_idx(self):
+ def test_from_bytes_idx(self):
# Store a mixed list of random messages (19 + 19)
msg_list_ref = self._gen_rand_message_mix(19)
self._ddf.append_all(msg_list_ref)
@@ -128,13 +128,13 @@
# Random access
for _ in range(100):
idx = random.randrange(len(msg_list_ref))
- msg = self._ddf.parse_msg(idx)
+ msg = self._ddf.from_bytes(idx)
self._compare_msg(msg, msg_list_ref[idx])
def test_parse_empty(self):
with self.assertLogs(level = 'ERROR'):
for idx in range(100):
- msg = self._ddf.parse_msg(idx)
+ msg = self._ddf.from_bytes(idx)
self.assertEqual(msg, None)
def test_parse_all_empty(self):
@@ -148,7 +148,7 @@
self._tf.write(b'\xff' * 90)
with self.assertLogs(level = 'ERROR'):
- msg = self._ddf.parse_msg(0)
+ msg = self._ddf.from_bytes(0)
self.assertEqual(msg, None)
def test_parse_unknown_tag(self):
@@ -158,7 +158,7 @@
self._tf.write(b'\xff' * 90)
with self.assertLogs(level = 'ERROR'):
- msg = self._ddf.parse_msg(0)
+ msg = self._ddf.from_bytes(0)
self.assertEqual(msg, None)
if __name__ == '__main__':
diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py
index 24fda67..b703ed4 100644
--- a/src/target/trx_toolkit/test_data_msg.py
+++ b/src/target/trx_toolkit/test_data_msg.py
@@ -5,6 +5,7 @@
# Unit test for TRXD message codec
#
# (C) 2019 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info at sysmocom.de>
#
# All Rights Reserved
#
@@ -23,171 +24,33 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import unittest
+import logging
from data_msg import Msg, TxMsg, RxMsg
class Msg_Test(unittest.TestCase):
# Compare message a with message b
- def _compare_msg(self, a, b):
+ def _compare_msg(self, a: Msg, b: Msg) -> None:
# Make sure we're comparing messages of the same type
self.assertEqual(a.__class__, b.__class__)
- # Compare common header fields
- self.assertEqual(a.ver, b.ver)
- self.assertEqual(a.fn, b.fn)
- self.assertEqual(a.tn, b.tn)
+ # PDU version and fields
+ self.assertEqual(a._ver, b._ver)
+ for field in a.__class__.DEF_CONT:
+ self.assertEqual(a[field], b[field])
- # Burst bits (if present)
- self.assertEqual(a.burst, b.burst)
-
- # TxMsg specific fields
- if isinstance(a, TxMsg):
- self.assertEqual(a.pwr, b.pwr)
-
- # RxMsg specific fields
- if isinstance(a, RxMsg):
- # Version independent fields
- self.assertEqual(a.toa256, b.toa256)
- self.assertEqual(a.rssi, b.rssi)
-
- # Version specific fields
- if a.ver >= 1:
- self.assertEqual(a.nope_ind, b.nope_ind)
- self.assertEqual(a.mod_type, b.mod_type)
- self.assertEqual(a.tsc_set, b.tsc_set)
- self.assertEqual(a.tsc, b.tsc)
- self.assertEqual(a.ci, b.ci)
-
- # Make sure that message validation throws a ValueError
- def test_validate(self):
- # Unknown version
- with self.assertRaises(ValueError):
- msg = RxMsg(fn = 0, tn = 0, ver = 100)
- msg.validate()
-
- # Uninitialized field
- with self.assertRaises(ValueError):
- msg = RxMsg()
- msg.validate()
- with self.assertRaises(ValueError):
- msg = RxMsg(fn = None, tn = 0)
- msg.validate()
-
- # Out-of-range value(s)
- with self.assertRaises(ValueError):
- msg = RxMsg(fn = -1, tn = 0)
- msg.validate()
- with self.assertRaises(ValueError):
- msg = RxMsg(fn = 0, tn = 10)
- msg.validate()
-
- # Validate header and burst randomization
- def test_rand_hdr_burst(self):
- tx_msg = TxMsg()
- rx_msg = RxMsg()
-
- for i in range(100):
- tx_msg.rand_burst()
- rx_msg.rand_burst()
- tx_msg.rand_hdr()
- rx_msg.rand_hdr()
-
- tx_msg.validate()
- rx_msg.validate()
-
- def _test_enc_dec(self, msg, legacy = False, nope_ind = False):
- # Prepare a given message (randomize)
- msg.rand_hdr()
-
- # NOPE.ind contains no burst
- if not nope_ind:
- msg.rand_burst()
- else:
- msg.nope_ind = True
- msg.mod_type = None
- msg.tsc_set = None
- msg.tsc = None
-
- # Encode a given message to bytes
- msg_enc = msg.gen_msg(legacy)
-
- # Decode a new message from bytes
- msg_dec = msg.__class__()
- msg_dec.parse_msg(msg_enc)
-
- # Compare decoded vs the original
- self._compare_msg(msg, msg_dec)
-
- # Validate encoding and decoding
- def test_enc_dec(self):
+ def test_enc_dec_default(self):
+ ''' Encode/decode/match test for default field values. '''
for ver in Msg.KNOWN_VERSIONS:
- with self.subTest("TxMsg", ver = ver):
- msg = TxMsg(ver = ver)
- self._test_enc_dec(msg)
+ with self.subTest('TxMsg(ver=%u)' % ver):
+ a, b = TxMsg(ver), TxMsg(ver)
+ b.from_bytes(a.to_bytes())
+ self._compare_msg(a, b)
- with self.subTest("RxMsg", ver = ver):
- msg = RxMsg(ver = ver)
- self._test_enc_dec(msg)
-
- if ver >= 1:
- with self.subTest("RxMsg NOPE.ind", ver = ver):
- msg = RxMsg(ver = ver)
- self._test_enc_dec(msg, nope_ind = True)
-
- with self.subTest("RxMsg (legacy transceiver)"):
- msg = RxMsg(ver = 0)
- self._test_enc_dec(msg, legacy = True)
-
- # Validate bit conversations
- def test_bit_conv(self):
- usbits_ref = list(range(0, 256))
- sbits_ref = list(range(-127, 128))
-
- # Test both usbit2sbit() and sbit2usbit()
- sbits = Msg.usbit2sbit(usbits_ref)
- usbits = Msg.sbit2usbit(sbits)
- self.assertEqual(usbits[:255], usbits_ref[:255])
- self.assertEqual(usbits[255], 254)
-
- # Test both sbit2ubit() and ubit2sbit()
- ubits = Msg.sbit2ubit(sbits_ref)
- self.assertEqual(ubits, ([1] * 127 + [0] * 128))
-
- sbits = Msg.ubit2sbit(ubits)
- self.assertEqual(sbits, ([-127] * 127 + [127] * 128))
-
- def _test_transform(self, msg):
- # Prepare given messages
- msg.rand_hdr()
- msg.rand_burst()
-
- # Perform message transformation
- if isinstance(msg, TxMsg):
- msg_trans = msg.trans()
- else:
- msg_trans = msg.trans()
-
- self.assertEqual(msg_trans.ver, msg.ver)
- self.assertEqual(msg_trans.fn, msg.fn)
- self.assertEqual(msg_trans.tn, msg.tn)
-
- if isinstance(msg, RxMsg):
- burst = Msg.sbit2ubit(msg.burst)
- self.assertEqual(msg_trans.burst, burst)
- else:
- burst = Msg.ubit2sbit(msg.burst)
- self.assertEqual(msg_trans.burst, burst)
-
- # Validate message transformation
- def test_transform(self):
- for ver in Msg.KNOWN_VERSIONS:
- with self.subTest("TxMsg", ver = ver):
- msg = TxMsg(ver = ver)
- self._test_transform(msg)
-
- with self.subTest("RxMsg", ver = ver):
- msg = RxMsg(ver = ver)
- self._test_transform(msg)
+ with self.subTest('RxMsg(ver=%u)' % ver):
+ a, b = RxMsg(ver), RxMsg(ver)
+ b.from_bytes(a.to_bytes())
+ self._compare_msg(a, b)
if __name__ == '__main__':
unittest.main()
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
index d041070..54e6a6b 100644
--- a/src/target/trx_toolkit/transceiver.py
+++ b/src/target/trx_toolkit/transceiver.py
@@ -269,7 +269,7 @@
return None
# Make sure that indicated timeslot is configured
- if msg.tn not in self.ts_list:
+ if msg.c['tn'] not in self.ts_list:
log.warning("(%s) RX TRXD message (%s), but timeslot is not "
"configured => dropping..." % (self, msg.desc_hdr()))
return None
@@ -278,4 +278,4 @@
def handle_data_msg(self, msg):
# TODO: make legacy mode configurable (via argv?)
- self.data_if.send_msg(msg, legacy = True)
+ self.data_if.send_msg(msg)
diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py
index 8b6f80c..389a663 100755
--- a/src/target/trx_toolkit/trx_sniff.py
+++ b/src/target/trx_toolkit/trx_sniff.py
@@ -108,7 +108,7 @@
# Attempt to parse the payload as a DATA message
try:
- msg.parse_msg(msg_raw)
+ msg.from_bytes(msg_raw)
msg.validate()
except ValueError as e:
desc = msg.desc_hdr()
@@ -160,7 +160,7 @@
# Message type specific filtering
if isinstance(msg, RxMsg):
# NOPE.ind filter
- if not self.argv.pf_nope_ind and msg.nope_ind:
+ if not self.argv.pf_nope_ind and msg.c['nope']:
return False
# RSSI filter
--
To view, visit https://gerrit.osmocom.org/c/osmocom-bb/+/24019
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc
Gerrit-Change-Number: 24019
Gerrit-PatchSet: 1
Gerrit-Owner: fixeria <vyanitskiy at sysmocom.de>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20210430/cdeba577/attachment.htm>