Change in osmocom-bb[master]: trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py

fixeria gerrit-no-reply at lists.osmocom.org
Fri Apr 30 20:34:10 UTC 2021


fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/24019 )


Change subject: trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py
......................................................................

trx_toolkit/data_msg.py: migrate to codec.py and trxd_proto.py

Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc
Related: OS#4006, SYS#4895
---
M src/target/trx_toolkit/burst_fwd.py
M src/target/trx_toolkit/data_dump.py
M src/target/trx_toolkit/data_if.py
M src/target/trx_toolkit/data_msg.py
M src/target/trx_toolkit/fake_trx.py
M src/target/trx_toolkit/test_data_dump.py
M src/target/trx_toolkit/test_data_msg.py
M src/target/trx_toolkit/transceiver.py
M src/target/trx_toolkit/trx_sniff.py
9 files changed, 283 insertions(+), 672 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/19/24019/1

diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 2e9e97b..03ce6e6 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -3,7 +3,7 @@
 # TRX Toolkit
 # Burst forwarding between transceivers
 #
-# (C) 2017-2020 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2021 by Vadim Yanitskiy <axilirator at gmail.com>
 # Contributions by sysmocom - s.f.m.c. GmbH
 #
 # All Rights Reserved
@@ -25,6 +25,7 @@
 import logging as log
 
 from trx_list import TRXList
+from data_msg import RxMsg
 
 class BurstForwarder(TRXList):
 	""" Performs burst forwarding between transceivers.
@@ -48,11 +49,11 @@
 	def forward_msg(self, src_trx, rx_msg):
 		# Originating Transceiver may use frequency hopping,
 		# so let's precalculate its Tx frequency in advance
-		tx_freq = src_trx.get_tx_freq(rx_msg.fn)
+		tx_freq = src_trx.get_tx_freq(rx_msg.c['fn'])
 
 		if src_trx.rf_muted:
-			del rx_msg.burst # burst bits are omited
-			rx_msg.burst = None
+			# Burst bits are omited
+			rx_msg.c['burst'].clear()
 
 		# Iterate over all known transceivers
 		for trx in self.trx_list:
@@ -62,13 +63,13 @@
 			# Check transceiver state
 			if not trx.running:
 				continue
-			if rx_msg.tn not in trx.ts_list:
+			if rx_msg.c['tn'] not in trx.ts_list:
 				continue
 
 			# Match Tx/Rx frequencies of the both transceivers
-			if trx.get_rx_freq(rx_msg.fn) != tx_freq:
+			if trx.get_rx_freq(rx_msg.c['fn']) != tx_freq:
 				continue
 
-			# Transform from TxMsg to RxMsg and forward
-			tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)
+			# Transform from L12TRX to TRX2L1 and forward
+			tx_msg = rx_msg.trans(RxMsg, trx.data_if._hdr_ver)
 			trx.handle_data_msg(src_trx, rx_msg, tx_msg)
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py
index 8510e2d..fe8a2d8 100644
--- a/src/target/trx_toolkit/data_dump.py
+++ b/src/target/trx_toolkit/data_dump.py
@@ -44,7 +44,7 @@
 			raise ValueError("Unknown message type")
 
 		# Generate a message payload
-		msg_raw = msg.gen_msg()
+		msg_raw = msg.to_bytes()
 
 		# Calculate and pack the message length
 		msg_len = len(msg_raw)
@@ -118,7 +118,7 @@
 	#   a parsed message in case of success,
 	#   or None in case of EOF or header parsing error,
 	#   or False in case of message parsing error.
-	def _parse_msg(self):
+	def _from_bytes(self):
 		# Attempt to read a message header
 		hdr_raw = self.f.read(self.HDR_LENGTH)
 		if len(hdr_raw) != self.HDR_LENGTH:
@@ -142,7 +142,7 @@
 		# Attempt to parse a message
 		try:
 			msg_raw = bytearray(msg_raw)
-			msg.parse_msg(msg_raw)
+			msg.from_bytes(msg_raw)
 		except:
 			log.error("Couldn't parse a message, skipping...")
 			return False
@@ -155,7 +155,7 @@
 	#   a parsed message in case of success,
 	#   or None in case of EOF, out of range, or header parsing error,
 	#   or False in case of message parsing error.
-	def parse_msg(self, idx):
+	def from_bytes(self, idx):
 		# Move descriptor to the beginning of requested message
 		rc = self._seek2msg(idx)
 		if not rc:
@@ -163,7 +163,7 @@
 			return None
 
 		# Attempt to parse a message
-		return self._parse_msg()
+		return self._from_bytes()
 
 	# Parses all messages from a given file
 	# Return value:
@@ -185,7 +185,7 @@
 		# Read the capture in loop...
 		while True:
 			# Attempt to parse a message
-			msg = self._parse_msg()
+			msg = self._from_bytes()
 
 			# EOF or broken header
 			if msg is None:
diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py
index 1cded9b..f754c6c 100644
--- a/src/target/trx_toolkit/data_if.py
+++ b/src/target/trx_toolkit/data_if.py
@@ -23,6 +23,8 @@
 
 import logging as log
 
+from typing import Optional
+
 from udp_link import UDPLink
 from data_msg import *
 
@@ -50,64 +52,43 @@
 		# No suitable version found
 		return -1
 
-	def match_hdr_ver(self, msg):
-		if msg.ver == self._hdr_ver:
-			return True
-
-		log.error("(%s) Rx DATA message (%s) with unexpected header "
-			  "version %u (!= expected %u), ignoring..."
-			% (self.desc_link(), msg.desc_hdr(),
-			   msg.ver, self._hdr_ver))
-		return False
-
-	def recv_raw_data(self):
+	def recv_raw_data(self) -> bytes:
 		data, _ = self.sock.recvfrom(512)
 		return data
 
-	def recv_tx_msg(self):
+	def recv_tx_msg(self) -> Optional[TxMsg]:
 		# Read raw data from socket
 		data = self.recv_raw_data()
 
 		# Attempt to parse a TRXD Tx message
 		try:
-			msg = TxMsg()
-			msg.parse_msg(bytearray(data))
+			msg = TxMsg(self._hdr_ver)
+			msg.from_bytes(data)
 		except:
 			log.error("Failed to parse a TRXD Tx message "
 				"from R:%s:%u" % (self.remote_addr, self.remote_port))
 			return None
 
-		# Make sure the header version matches
-		# the configured one (self._hdr_ver)
-		if not self.match_hdr_ver(msg):
-			return None
-
 		return msg
 
-	def recv_rx_msg(self):
+	def recv_rx_msg(self) -> Optional[RxMsg]:
 		# Read raw data from socket
 		data = self.recv_raw_data()
 
 		# Attempt to parse a TRXD Rx message
 		try:
-			msg = RxMsg()
-			msg.parse_msg(bytearray(data))
+			msg = RxMsg(self._hdr_ver)
+			msg.from_bytes(data)
 		except:
 			log.error("Failed to parse a TRXD Rx message "
 				"from R:%s:%u" % (self.remote_addr, self.remote_port))
 			return None
 
-		# Make sure the header version matches
-		# the configured one (self._hdr_ver)
-		if not self.match_hdr_ver(msg):
-			return None
-
 		return msg
 
-	def send_msg(self, msg, legacy = False):
+	def send_msg(self, msg: RxMsg) -> None:
 		try:
-			# Validate and encode a TRXD message
-			payload = msg.gen_msg(legacy)
+			payload = msg.to_bytes()
 		except ValueError as e:
 			log.error("Failed to encode a TRXD message ('%s') "
 				"due to error: %s" % (msg.desc_hdr(), e))
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
index 7e785f9..570cfa0 100644
--- a/src/target/trx_toolkit/data_msg.py
+++ b/src/target/trx_toolkit/data_msg.py
@@ -3,7 +3,8 @@
 # TRX Toolkit
 # DATA interface message definitions and helpers
 #
-# (C) 2018-2019 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2018-2021 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info at sysmocom.de>
 #
 # All Rights Reserved
 #
@@ -25,35 +26,39 @@
 import struct
 import abc
 
-from typing import List
+from typing import Any, Type, List, Tuple, Dict
 from enum import Enum
 from gsm_shared import *
 
+import trxd_proto
+import codec
+
 class Modulation(Enum):
 	""" Modulation types defined in 3GPP TS 45.002 """
-	ModGMSK		= (0b0000, 1 * GMSK_BURST_LEN)
-	Mod8PSK		= (0b0100, 3 * GMSK_BURST_LEN)
-	ModGMSK_AB	= (0b0110, 1 * GMSK_BURST_LEN)
-	ModRFU		= (0b0111, 0) # Reserved for Future Use
-	Mod16QAM	= (0b1000, 4 * GMSK_BURST_LEN)
-	Mod32QAM	= (0b1010, 5 * GMSK_BURST_LEN)
-	ModAQPSK	= (0b1100, 2 * GMSK_BURST_LEN)
+	ModGMSK		= (0b0000, 0b1100, 1 * GMSK_BURST_LEN)
+	Mod8PSK		= (0b0100, 0b1110, 3 * GMSK_BURST_LEN)
+	ModGMSK_AB	= (0b0110, 0b1111, 1 * GMSK_BURST_LEN)
+	ModRFU		= (0b0111, 0b1111, 0) # Reserved for Future Use
+	Mod16QAM	= (0b1000, 0b1110, 4 * GMSK_BURST_LEN)
+	Mod32QAM	= (0b1010, 0b1110, 5 * GMSK_BURST_LEN)
+	ModAQPSK	= (0b1100, 0b1100, 2 * GMSK_BURST_LEN)
 
-	def __init__(self, coding, bl):
+	def __init__(self, val: int, mask: int, bl: int):
 		# Coding in TRXD header
-		self.coding = coding
+		self.val = val
+		self.mask = mask
 		# Burst length
 		self.bl = bl
 
 	@classmethod
-	def pick(self, coding):
+	def pick(self, val: int):
 		for mod in list(self):
-			if mod.coding == coding:
+			if (val & mod.mask) == mod.val:
 				return mod
 		return None
 
 	@classmethod
-	def pick_by_bl(self, bl):
+	def pick_by_bl(self, bl: int):
 		for mod in list(self):
 			if mod.bl == bl:
 				return mod
@@ -66,66 +71,87 @@
 	CHDR_VERSION_MAX = 0b1111
 	KNOWN_VERSIONS = (0, 1)
 
-	def __init__(self, fn = None, tn = None, burst = None, ver = 0):
-		self.burst = burst
-		self.ver = ver
-		self.fn = fn
-		self.tn = tn
+	# PDU codecs for all known versions
+	CODECS = NotImplemented # type: Tuple[codec.Envelope, ...]
 
-	@property
-	def CHDR_LEN(self):
-		''' The common header length. '''
-		return 1 + 4 # (VER + TN) + FN
+	# Default PDU content for child types
+	DEF_CONT = NotImplemented # type: Dict[str, Any]
+
+	def __init__(self, ver: int, cont: Dict[str, Any] = { }):
+		# TRXD PDU version
+		if not ver in self.KNOWN_VERSIONS:
+			raise ValueError("Unknown TRXD PDU version %d" % ver)
+		self._ver = ver
+
+		# TRXD PDU codec
+		self.codec = self.CODECS[ver]
+
+		# Content of the message
+		self.c = {
+			# Default TDMA frame/timeslot number
+			'fn'		: 0,
+			'tn'		: 0,
+			# NOPE / IDLE frame indication
+			'nope'		: False,
+			# Modulation type and TSC info
+			'mod_type'	: Modulation.ModGMSK,
+			'tsc_set'	: 0,
+			'tsc'		: 0,
+			# Burst hard-/soft-bits
+			'burst'		: [],
+			# Default fields for particular class
+			**self.DEF_CONT,
+			# Fields provided during instantiation
+			**cont
+		} # type: Dict[str, Any]
+
+	def __getitem__(self, key: str) -> Any:
+		return self.c[key]
+
+	def __setitem__(self, key: str, val: Any) -> None:
+		self.c[key] = val
+
+	def __delitem__(self, key: str) -> None:
+		del self.c[key]
 
 	@abc.abstractmethod
-	def gen_hdr(self):
-		''' Generate message specific header. '''
-
-	@abc.abstractmethod
-	def parse_hdr(self, hdr):
-		''' Parse message specific header. '''
-
-	@abc.abstractmethod
-	def gen_burst(self):
+	def gen_burst(self) -> None:
 		''' Generate message specific burst. '''
 
 	@abc.abstractmethod
-	def parse_burst(self, burst):
+	def parse_burst(self) -> None:
 		''' Parse message specific burst. '''
 
 	@abc.abstractmethod
-	def rand_burst(self):
+	def rand_burst(self) -> None:
 		''' Generate a random message specific burst. '''
 
-	def rand_fn(self):
+	@abc.abstractmethod
+	def trans_burst(self) -> List[int]:
+		''' Convert between hard-bits and soft-bits. '''
+
+	def rand_fn(self) -> int:
 		''' Generate a random frame number. '''
 		return random.randint(0, GSM_HYPERFRAME)
 
-	def rand_tn(self):
+	def rand_tn(self) -> int:
 		''' Generate a random timeslot number. '''
 		return random.randint(0, 7)
 
-	def rand_hdr(self):
+	def rand_hdr(self) -> None:
 		''' Randomize the message header. '''
-		self.fn = self.rand_fn()
-		self.tn = self.rand_tn()
+		self.c['fn'] = self.rand_fn()
+		self.c['tn'] = self.rand_tn()
 
-	def desc_hdr(self):
+	def desc_hdr(self) -> str:
 		''' Generate human-readable header description. '''
 
 		result = ""
 
-		if self.ver > 0:
-			result += ("ver=%u " % self.ver)
-
-		if self.fn is not None:
-			result += ("fn=%u " % self.fn)
-
-		if self.tn is not None:
-			result += ("tn=%u " % self.tn)
-
-		if self.burst is not None and len(self.burst) > 0:
-			result += ("bl=%u " % len(self.burst))
+		result += ("ver=%u " % self._ver)
+		result += ("fn=%u tn=%u " % (self.c['fn'], self.c['tn']))
+		if self.c['burst']:
+			result += ("bl=%u " % len(self.c['burst']))
 
 		return result
 
@@ -149,131 +175,76 @@
 		''' Convert bits {1..0} to soft-bits {-127..127}. '''
 		return [-127 if b else 127 for b in bits]
 
-	def validate(self):
+	def validate(self) -> None:
 		''' Validate the message fields (throws ValueError). '''
 
-		if not self.ver in self.KNOWN_VERSIONS:
-			raise ValueError("Unknown TRXD header version %d" % self.ver)
+		if self.c['fn'] < 0 or self.c['fn'] > GSM_HYPERFRAME:
+			raise ValueError("TDMA frame-number %d is out of range" % self.c['fn'])
 
-		if self.fn is None:
-			raise ValueError("TDMA frame-number is not set")
+		if self.c['tn'] < 0 or self.c['tn'] > 7:
+			raise ValueError("TDMA time-slot %d is out of range" % self.c['tn'])
 
-		if self.fn < 0 or self.fn > GSM_HYPERFRAME:
-			raise ValueError("TDMA frame-number %d is out of range" % self.fn)
-
-		if self.tn is None:
-			raise ValueError("TDMA time-slot is not set")
-
-		if self.tn < 0 or self.tn > 7:
-			raise ValueError("TDMA time-slot %d is out of range" % self.tn)
-
-	def gen_msg(self, legacy = False):
+	def to_bytes(self) -> bytes:
 		''' Generate a TRX DATA message. '''
+		if not self.c['nope']:
+			self.gen_burst()
+		return self.codec._to_bytes(self.c)
 
-		# Validate all the fields
-		self.validate()
-
-		# Allocate an empty byte-array
-		buf = bytearray()
-
-		# Put version (4 bits) and TDMA TN (3 bits)
-		buf.append((self.ver << 4) | (self.tn & 0x07))
-
-		# Put TDMA FN (4 octets, BE)
-		buf += struct.pack(">L", self.fn)
-
-		# Generate message specific header part
-		hdr = self.gen_hdr()
-		buf += hdr
-
-		# Generate burst
-		if self.burst is not None:
-			buf += self.gen_burst()
-
-		# This is a rudiment from (legacy) OpenBTS transceiver,
-		# some L1 implementations still expect two dummy bytes.
-		if legacy and self.ver == 0x00:
-			buf += bytearray(2)
-
-		return buf
-
-	def parse_msg(self, msg):
+	def from_bytes(self, msg: bytes) -> None:
 		''' Parse a TRX DATA message. '''
+		self.codec._from_bytes(self.c, msg)
+		if not self.c['nope']:
+			self.parse_burst()
 
-		# Make sure we have at least common header
-		if len(msg) < self.CHDR_LEN:
-			raise ValueError("Message is to short: missing common header")
+	def trans(self, cls: Type['Msg'], ver: int) -> 'Msg':
+		''' Transform between L12TRX and TRX2L1. '''
 
-		# Parse the header version first
-		self.ver = (msg[0] >> 4)
-		if not self.ver in self.KNOWN_VERSIONS:
-			raise ValueError("Unknown TRXD header version %d" % self.ver)
+		# Allocate a new message
+		msg = cls(ver, {
+			'fn' : self.c['fn'],
+			'tn' : self.c['tn'],
+		})
 
-		# Parse TDMA TN and FN
-		self.tn = (msg[0] & 0x07)
-		self.fn = struct.unpack(">L", msg[1:5])[0]
-
-		# Make sure we have the whole header,
-		# including the version specific fields
-		if len(msg) < self.HDR_LEN:
-			raise ValueError("Message is to short: missing version specific header")
-
-		# Specific message part
-		self.parse_hdr(msg)
-
-		# Copy burst, skipping header
-		msg_burst = msg[self.HDR_LEN:]
-		if len(msg_burst) > 0:
-			self.parse_burst(msg_burst)
+		# Convert burst bits
+		if self.c['burst'] and not self.c['nope']:
+			msg.c['burst'] = self.trans_burst()
 		else:
-			self.burst = None
+			msg.c['nope'] = True
+
+		return msg
 
 class TxMsg(Msg):
 	''' Tx (L1 -> TRX) message coding API. '''
 
+	# PDU codecs for all known versions
+	CODECS = (
+		trxd_proto.PDUv0Tx(),
+		trxd_proto.PDUv1Tx(),
+	)
+
+	DEF_CONT = {
+		# Power reduction (in dB)
+		'pwr'		: 0,
+	}
+
 	# Constants
 	PWR_MIN = 0x00
 	PWR_MAX = 0xff
 
-	# Specific message fields
-	pwr = None
-
-	@property
-	def HDR_LEN(self):
-		''' Calculate header length depending on its version. '''
-
-		# Common header length
-		length = self.CHDR_LEN
-
-		# Message specific header length
-		if self.ver in (0x00, 0x01):
-			length += 1 # PWR
-		else:
-			raise IndexError("Unhandled version %u" % self.ver)
-
-		return length
-
-	def validate(self):
+	def validate(self) -> None:
 		''' Validate the message fields (throws ValueError). '''
 
 		# Validate common fields
 		Msg.validate(self)
 
-		if self.pwr is None:
-			raise ValueError("Tx Attenuation level is not set")
-
-		if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
-			raise ValueError("Tx Attenuation %d is out of range" % self.pwr)
+		if self.c['pwr'] < self.PWR_MIN or self.c['pwr'] > self.PWR_MAX:
+			raise ValueError("Tx Attenuation %d is out of range" % self.c['pwr'])
 
 		# FIXME: properly handle IDLE / NOPE indications
-		if self.burst is None:
-			raise ValueError("Tx burst bits are not set")
+		if len(self.c['burst']) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
+			raise ValueError("Tx burst has odd length %u" % len(self.c['burst']))
 
-		# FIXME: properly handle IDLE / NOPE indications
-		if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
-			raise ValueError("Tx burst has odd length %u" % len(self.burst))
-
-	def rand_pwr(self, min = None, max = None):
+	def rand_pwr(self, min = None, max = None) -> int:
 		''' Generate a random power level. '''
 
 		if min is None:
@@ -284,80 +255,55 @@
 
 		return random.randint(min, max)
 
-	def rand_hdr(self):
+	def rand_hdr(self) -> None:
 		''' Randomize message specific header. '''
 
 		Msg.rand_hdr(self)
-		self.pwr = self.rand_pwr()
+		self.c['pwr'] = self.rand_pwr()
 
-	def desc_hdr(self):
+	def desc_hdr(self) -> str:
 		''' Generate human-readable header description. '''
 
 		# Describe the common part
 		result = Msg.desc_hdr(self)
 
-		if self.pwr is not None:
-			result += ("pwr=%u " % self.pwr)
+		result += ("pwr=%u " % self.c['pwr'])
 
 		# Strip useless whitespace and return
 		return result.strip()
 
-	def gen_hdr(self):
-		''' Generate message specific header part. '''
-
-		# Allocate an empty byte-array
-		buf = bytearray()
-
-		# Put power
-		buf.append(self.pwr)
-
-		return buf
-
-	def parse_hdr(self, hdr):
-		''' Parse message specific header part. '''
-
-		# Parse power level
-		self.pwr = hdr[5]
-
-	def gen_burst(self):
+	def gen_burst(self) -> None:
 		''' Generate message specific burst. '''
+		self.c['hard-bits'] = bytes(self.c['burst'])
 
-		# Copy burst 'as is'
-		return bytearray(self.burst)
-
-	def parse_burst(self, burst):
+	def parse_burst(self) -> None:
 		''' Parse message specific burst. '''
+		self.c['burst'] = list(self.c['hard-bits'])
 
-		length = len(burst)
-
-		# Distinguish between GSM and EDGE
-		if length >= EDGE_BURST_LEN:
-			self.burst = list(burst[:EDGE_BURST_LEN])
-		else:
-			self.burst = list(burst[:GMSK_BURST_LEN])
-
-	def rand_burst(self, length = GMSK_BURST_LEN):
+	def rand_burst(self, length = GMSK_BURST_LEN) -> None:
 		''' Generate a random message specific burst. '''
-		self.burst = [random.randint(0, 1) for _ in range(length)]
+		self.c['burst'] = [random.randint(0, 1) for _ in range(length)]
 
-	def trans(self, ver = None):
-		''' Transform this message into RxMsg. '''
-
-		# Allocate a new message
-		msg = RxMsg(fn = self.fn, tn = self.tn,
-			ver = self.ver if ver is None else ver)
-
-		# Convert burst bits
-		if self.burst is not None:
-			msg.burst = self.ubit2sbit(self.burst)
-		else:
-			msg.nope_ind = True
-
-		return msg
+	def trans_burst(self) -> List[int]:
+		''' Transform hard-bits into soft-bits. '''
+		return self.ubit2sbit(self.c['burst'])
 
 class RxMsg(Msg):
 	''' Rx (TRX -> L1) message coding API. '''
 
+	# PDU codecs for all known versions
+	CODECS = (
+		trxd_proto.PDUv0Rx(),
+		trxd_proto.PDUv1Rx(),
+	)
+
+	DEF_CONT = {
+		# Specific message fields
+		'rssi'		: -50,
+		'toa256'	: 0,
+		'cir'		: 0,
+	}
+
 	# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
 	RSSI_MIN = -120
 	RSSI_MAX = -47
@@ -373,121 +319,39 @@
 	CI_MIN = -1280
 	CI_MAX = 1280
 
-	# IDLE frame / nope detection indicator
-	NOPE_IND = (1 << 7)
-
-	# Specific message fields
-	rssi = None
-	toa256 = None
-
-	# Version 0x01 specific (default values)
-	mod_type = Modulation.ModGMSK
-	nope_ind = False
-
-	tsc_set = None
-	tsc = None
-	ci = None
-
-	@property
-	def HDR_LEN(self):
-		''' Calculate header length depending on its version. '''
-
-		# Common header length
-		length = self.CHDR_LEN
-
-		# Message specific header length
-		if self.ver == 0x00:
-			# RSSI + ToA
-			length += 1 + 2
-		elif self.ver == 0x01:
-			# RSSI + ToA + TS + C/I
-			length += 1 + 2 + 1 + 2
-		else:
-			raise IndexError("Unhandled version %u" % self.ver)
-
-		return length
-
-	def _validate_burst_v0(self):
-		# Burst is mandatory
-		if self.burst is None:
-			raise ValueError("Rx burst bits are not set")
-
-		# ... and can be either of GSM (GMSK) or EDGE (8-PSK)
-		if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
-			raise ValueError("Rx burst has odd length %u" % len(self.burst))
-
-	def _validate_burst_v1(self):
-		# Burst is omitted in case of an IDLE / NOPE indication
-		if self.nope_ind and self.burst is None:
-			return
-
-		if self.nope_ind and self.burst is not None:
-			raise ValueError("NOPE.ind comes with burst?!?")
-		if self.burst is None:
-			raise ValueError("Rx burst bits are not set")
-
-		# Burst length depends on modulation type
-		if len(self.burst) != self.mod_type.bl:
-			raise ValueError("Rx burst has odd length %u" % len(self.burst))
-
-	def validate_burst(self):
-		''' Validate the burst (throws ValueError). '''
-
-		if self.ver == 0x00:
-			self._validate_burst_v0()
-		elif self.ver >= 0x01:
-			self._validate_burst_v1()
-
-	def validate(self):
+	def validate(self) -> None:
 		''' Validate the message header fields (throws ValueError). '''
 
 		# Validate common fields
 		Msg.validate(self)
 
-		if self.rssi is None:
-			raise ValueError("RSSI is not set")
+		if self.c['rssi'] < self.RSSI_MIN or self.c['rssi'] > self.RSSI_MAX:
+			raise ValueError("RSSI %d is out of range" % self.c['rssi'])
 
-		if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:
-			raise ValueError("RSSI %d is out of range" % self.rssi)
-
-		if self.toa256 is None:
-			raise ValueError("ToA256 is not set")
-
-		if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
-			raise ValueError("ToA256 %d is out of range" % self.toa256)
+		if self.c['toa256'] < self.TOA256_MIN or self.c['toa256'] > self.TOA256_MAX:
+			raise ValueError("ToA256 %d is out of range" % self.c['toa256'])
 
 		# Version specific parameters (omited for NOPE.ind)
-		if self.ver >= 0x01 and not self.nope_ind:
-			if type(self.mod_type) is not Modulation:
+		if self._ver >= 1 and not self.c['nope']:
+			if type(self.c['mod_type']) is not Modulation:
 				raise ValueError("Unknown Rx modulation type")
 
-			if self.tsc_set is None:
-				raise ValueError("TSC set is not set")
-
-			if self.mod_type is Modulation.ModGMSK:
-				if self.tsc_set not in range(0, 4):
-					raise ValueError("TSC set %d is out of range" % self.tsc_set)
+			if self.c['mod_type'] is Modulation.ModGMSK:
+				if self.c['tsc_set'] not in range(0, 4):
+					raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])
 			else:
-				if self.tsc_set not in range(0, 2):
-					raise ValueError("TSC set %d is out of range" % self.tsc_set)
+				if self.c['tsc_set'] not in range(0, 2):
+					raise ValueError("TSC set %d is out of range" % self.c['tsc_set'])
 
-			if self.tsc is None:
-				raise ValueError("TSC is not set")
-
-			if self.tsc not in self.TSC_RANGE:
-				raise ValueError("TSC %d is out of range" % self.tsc)
+			if self.c['tsc'] not in self.TSC_RANGE:
+				raise ValueError("TSC %d is out of range" % self.c['tsc'])
 
 		# Version specific parameters (also present in NOPE.ind)
-		if self.ver >= 0x01:
-			if self.ci is None:
-				raise ValueError("C/I is not set")
+		if self._ver >= 1:
+			if self.c['cir'] < self.CI_MIN or self.c['cir'] > self.CI_MAX:
+				raise ValueError("C/I %d is out of range" % self.c['cir'])
 
-			if self.ci < self.CI_MIN or self.ci > self.CI_MAX:
-				raise ValueError("C/I %d is out of range" % self.ci)
-
-		self.validate_burst()
-
-	def rand_rssi(self, min = None, max = None):
+	def rand_rssi(self, min = None, max = None) -> int:
 		''' Generate a random RSSI value. '''
 
 		if min is None:
@@ -498,7 +362,7 @@
 
 		return random.randint(min, max)
 
-	def rand_toa256(self, min = None, max = None):
+	def rand_toa256(self, min = None, max = None) -> int:
 		''' Generate a random ToA (Time of Arrival) value. '''
 
 		if min is None:
@@ -509,185 +373,88 @@
 
 		return random.randint(min, max)
 
-	def rand_hdr(self):
+	def rand_hdr(self, nope: bool = False) -> None:
 		''' Randomize message specific header. '''
 
 		Msg.rand_hdr(self)
-		self.rssi = self.rand_rssi()
-		self.toa256 = self.rand_toa256()
+		self.c['rssi'] = self.rand_rssi()
+		self.c['toa256'] = self.rand_toa256()
+		self.c['nope'] = nope
 
-		if self.ver >= 0x01:
-			self.mod_type = random.choice(list(Modulation))
-			if self.mod_type is Modulation.ModGMSK:
-				self.tsc_set = random.randint(0, 3)
+		if self._ver >= 1 and not nope:
+			self.c['mod_type'] = random.choice(list(Modulation))
+			if self.c['mod_type'] is Modulation.ModGMSK:
+				self.c['tsc_set'] = random.randint(0, 3)
 			else:
-				self.tsc_set = random.randint(0, 1)
-			self.tsc = random.choice(self.TSC_RANGE)
+				self.c['tsc_set'] = random.randint(0, 1)
+			self.c['tsc'] = random.choice(self.TSC_RANGE)
 
+		if self._ver >= 1:
 			# C/I: Carrier-to-Interference ratio
-			self.ci = random.randint(self.CI_MIN, self.CI_MAX)
+			self.c['cir'] = random.randint(self.CI_MIN, self.CI_MAX)
 
-	def desc_hdr(self):
+	def desc_hdr(self) -> str:
 		''' Generate human-readable header description. '''
 
 		# Describe the common part
 		result = Msg.desc_hdr(self)
 
-		if self.rssi is not None:
-			result += ("rssi=%d " % self.rssi)
+		result += ("rssi=%d " % self.c['rssi'])
+		result += ("toa256=%d " % self.c['toa256'])
 
-		if self.toa256 is not None:
-			result += ("toa256=%d " % self.toa256)
-
-		if self.ver >= 0x01:
-			if not self.nope_ind:
-				if self.mod_type is not None:
-					result += ("%s " % self.mod_type)
-				if self.tsc_set is not None:
-					result += ("set=%u " % self.tsc_set)
-				if self.tsc is not None:
-					result += ("tsc=%u " % self.tsc)
-				if self.ci is not None:
-					result += ("C/I=%d cB " % self.ci)
+		if self._ver >= 0x01:
+			if not self.c['nope']:
+				result += ("%s " % self.c['mod_type'])
+				result += ("set=%u " % self.c['tsc_set'])
+				result += ("tsc=%u " % self.c['tsc'])
+				result += ("C/I=%d cB " % self.c['cir'])
 			else:
 				result += "(IDLE / NOPE IND) "
 
 		# Strip useless whitespace and return
 		return result.strip()
 
-	def gen_mts(self):
-		''' Encode Modulation and Training Sequence info. '''
-
-		# IDLE / nope indication has no MTS info
-		if self.nope_ind:
-			return self.NOPE_IND
-
-		# TSC: . . . . . X X X
-		mts = self.tsc & 0b111
-
-		# MTS: . X X X X . . .
-		mts |= self.mod_type.coding << 3
-		mts |= self.tsc_set << 3
-
-		return mts
-
-	def parse_mts(self, mts):
-		''' Parse Modulation and Training Sequence info. '''
-
-		# IDLE / nope indication has no MTS info
-		self.nope_ind = (mts & self.NOPE_IND) > 0
-		if self.nope_ind:
-			self.mod_type = None
-			self.tsc_set = None
-			self.tsc = None
-			return
-
-		# TSC: . . . . . X X X
-		self.tsc = mts & 0b111
-
-		# MTS: . X X X X . . .
-		mts = (mts >> 3) & 0b1111
-		if (mts & 0b1100) > 0:
-			# Mask: . . . . M M M S
-			self.mod_type = Modulation.pick(mts & 0b1110)
-			self.tsc_set = mts & 0b1
-		else:
-			# GMSK: . . . . 0 0 S S
-			self.mod_type = Modulation.ModGMSK
-			self.tsc_set = mts & 0b11
-
-	def gen_hdr(self):
-		''' Generate message specific header part. '''
-
-		# Allocate an empty byte-array
-		buf = bytearray()
-
-		# Put RSSI
-		buf.append(-self.rssi)
-
-		# Encode ToA (Time of Arrival)
-		# Big endian, 2 bytes (int32_t)
-		buf += struct.pack(">h", self.toa256)
-
-		if self.ver >= 0x01:
-			# Modulation and Training Sequence info
-			mts = self.gen_mts()
-			buf.append(mts)
-
-			# C/I: Carrier-to-Interference ratio (in centiBels)
-			buf += struct.pack(">h", self.ci)
-
-		return buf
-
-	def parse_hdr(self, hdr):
-		''' Parse message specific header part. '''
-
-		# Parse RSSI
-		self.rssi = -(hdr[5])
-
-		# Parse ToA (Time of Arrival)
-		self.toa256 = struct.unpack(">h", hdr[6:8])[0]
-
-		if self.ver >= 0x01:
-			# Modulation and Training Sequence info
-			self.parse_mts(hdr[8])
-
-			# C/I: Carrier-to-Interference ratio (in centiBels)
-			self.ci = struct.unpack(">h", hdr[9:11])[0]
-
-	def gen_burst(self):
+	def gen_burst(self) -> None:
 		''' Generate message specific burst. '''
 
 		# Convert soft-bits to unsigned soft-bits
-		burst_usbits = self.sbit2usbit(self.burst)
+		burst = self.sbit2usbit(self.c['burst'])
+		self.c['soft-bits'] = bytes(burst)
 
-		# Encode to bytes
-		return bytearray(burst_usbits)
-
-	def _parse_burst_v0(self, burst):
+	def _parse_burst_v0(self, burst: List[int]) -> List[int]:
 		''' Parse message specific burst for header version 0. '''
 
 		bl = len(burst)
 
 		# We need to guess modulation by the length of burst
-		self.mod_type = Modulation.pick_by_bl(bl)
-		if self.mod_type is None:
+		self.c['mod_type'] = Modulation.pick_by_bl(bl)
+		if self.c['mod_type'] is None:
 			# Some old transceivers append two dummy bytes
-			self.mod_type = Modulation.pick_by_bl(bl - 2)
+			self.c['mod_type'] = Modulation.pick_by_bl(bl - 2)
 
-		if self.mod_type is None:
+		if self.c['mod_type'] is None:
 			raise ValueError("Odd burst length %u" % bl)
 
-		return burst[:self.mod_type.bl]
+		return burst[:self.c['mod_type'].bl]
 
-	def parse_burst(self, burst):
+	def parse_burst(self) -> None:
 		''' Parse message specific burst. '''
 
-		burst = list(burst)
-
-		if self.ver == 0x00:
+		burst = list(self.c['soft-bits'])
+		if self._ver == 0:
 			burst = self._parse_burst_v0(burst)
 
 		# Convert unsigned soft-bits to soft-bits
-		self.burst = self.usbit2sbit(burst)
+		self.c['burst'] = self.usbit2sbit(burst)
 
-	def rand_burst(self, length = None):
+	def rand_burst(self, length = None) -> None:
 		''' Generate a random message specific burst. '''
 
 		if length is None:
-			length = self.mod_type.bl
+			length = self.c['mod_type'].bl
 
-		self.burst = [random.randint(-127, 127) for _ in range(length)]
+		self.c['burst'] = [random.randint(-127, 127) for _ in range(length)]
 
-	def trans(self, ver = None):
-		''' Transform this message to TxMsg. '''
-
-		# Allocate a new message
-		msg = TxMsg(fn = self.fn, tn = self.tn,
-			ver = self.ver if ver is None else ver)
-
-		# Convert burst bits
-		if self.burst is not None:
-			msg.burst = self.sbit2ubit(self.burst)
-
-		return msg
+	def trans_burst(self) -> List[int]:
+		''' Transform soft-bits into hard-bits. '''
+		return self.sbit2ubit(self.c['burst'])
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 573527b..3226a0e 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -188,9 +188,9 @@
 		if self.burst_drop_amount == 0:
 			return False
 
-		if msg.fn % self.burst_drop_period == 0:
+		if msg.c['fn'] % self.burst_drop_period == 0:
 			log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"
-				% (self, msg.fn, self.burst_drop_period))
+				% (self, msg.c['fn'], self.burst_drop_period))
 			self.burst_drop_amount -= 1
 			return True
 
@@ -198,64 +198,63 @@
 
 	def _handle_data_msg_v1(self, src_msg, msg):
 		# C/I (Carrier-to-Interference ratio)
-		msg.ci = self.ci
+		msg.c['cir'] = self.ci
 
 		# Pick modulation type by burst length
-		bl = len(src_msg.burst)
-		msg.mod_type = Modulation.pick_by_bl(bl)
+		bl = len(src_msg.c['burst'])
+		msg.c['mod_type'] = Modulation.pick_by_bl(bl)
 
 		# Pick TSC (Training Sequence Code) and TSC set
-		if msg.mod_type is Modulation.ModGMSK:
-			ss = TrainingSeqGMSK.pick(src_msg.burst)
-			msg.tsc = ss.tsc if ss is not None else 0
-			msg.tsc_set = ss.tsc_set if ss is not None else 0
+		if msg.c['mod_type'] is Modulation.ModGMSK:
+			ss = TrainingSeqGMSK.pick(src_msg.c['burst'])
+			msg.c['tsc'] = ss.tsc if ss is not None else 0
+			msg.c['tsc_set'] = ss.tsc_set if ss is not None else 0
 		else: # TODO: other modulation types (at least 8-PSK)
-			msg.tsc_set = 0
-			msg.tsc = 0
+			msg.c['tsc_set'] = 0
+			msg.c['tsc'] = 0
 
 	# Takes (partially initialized) TRXD Rx message,
 	# simulates RF path parameters (such as RSSI),
 	# and sends towards the L1
 	def handle_data_msg(self, src_trx, src_msg, msg):
 		if self.rf_muted:
-			msg.nope_ind = True
-		elif not msg.nope_ind:
+			msg.c['nope'] = True
+		elif not msg.c['nope']:
 			# Path loss simulation
-			msg.nope_ind = self.sim_burst_drop(msg)
-		if msg.nope_ind:
+			msg.c['nope'] = self.sim_burst_drop(msg)
+		if msg.c['nope']:
 			# Before TRXDv1, we simply drop the message
-			if msg.ver < 0x01:
+			if msg._ver < 1:
 				del msg
 				return
 
 			# Since TRXDv1, we should send a NOPE.ind
-			del msg.burst # burst bits are omited
-			msg.burst = None
+			msg.c['burst'].clear() # burst bits are omited
 
 			# TODO: shoud we make these values configurable?
-			msg.toa256 = self.TOA256_NOISE_DEFAULT
-			msg.rssi = self.RSSI_NOISE_DEFAULT
-			msg.ci = self.CI_NOISE_DEFAULT
+			msg.c['toa256'] = self.TOA256_NOISE_DEFAULT
+			msg.c['rssi'] = self.RSSI_NOISE_DEFAULT
+			msg.c['cir'] = self.CI_NOISE_DEFAULT
 
 			self.data_if.send_msg(msg)
 			return
 
 		# Complete message header
-		msg.toa256 = self.toa256
+		msg.c['toa256'] = self.toa256
 
 		# Apply RSSI based on transmitter:
 		if not self.fake_rssi_enabled:
-			msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT
+			msg.c['rssi'] = src_trx.tx_power - src_msg.c['pwr'] - self.PATH_LOSS_DEFAULT
 		else: # Apply fake RSSI
-			msg.rssi = self.rssi
+			msg.c['rssi'] = self.rssi
 
 		# Version specific fields
-		if msg.ver >= 0x01:
+		if msg._ver >= 1:
 			self._handle_data_msg_v1(src_msg, msg)
 
 		# Apply optional Timing Advance
 		if src_trx.ta != 0:
-			msg.toa256 -= src_trx.ta * 256
+			msg.c['toa256'] -= src_trx.ta * 256
 
 		Transceiver.handle_data_msg(self, msg)
 
diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py
index f7b4fde..2dad6b2 100644
--- a/src/target/trx_toolkit/test_data_dump.py
+++ b/src/target/trx_toolkit/test_data_dump.py
@@ -97,7 +97,7 @@
 		msg_ref = self._gen_rand_message(cls)
 		self._ddf.append_msg(msg_ref)
 
-		msg = self._ddf.parse_msg(0)
+		msg = self._ddf.from_bytes(0)
 		self._compare_msg(msg, msg_ref)
 
 	# Store one Rx message in a file, read it back and compare
@@ -120,7 +120,7 @@
 			self._compare_msg(msg_list[i], msg_list_ref[i])
 
 	# Verify random access to stored messages
-	def test_parse_msg_idx(self):
+	def test_from_bytes_idx(self):
 		# Store a mixed list of random messages (19 + 19)
 		msg_list_ref = self._gen_rand_message_mix(19)
 		self._ddf.append_all(msg_list_ref)
@@ -128,13 +128,13 @@
 		# Random access
 		for _ in range(100):
 			idx = random.randrange(len(msg_list_ref))
-			msg = self._ddf.parse_msg(idx)
+			msg = self._ddf.from_bytes(idx)
 			self._compare_msg(msg, msg_list_ref[idx])
 
 	def test_parse_empty(self):
 		with self.assertLogs(level = 'ERROR'):
 			for idx in range(100):
-				msg = self._ddf.parse_msg(idx)
+				msg = self._ddf.from_bytes(idx)
 				self.assertEqual(msg, None)
 
 	def test_parse_all_empty(self):
@@ -148,7 +148,7 @@
 		self._tf.write(b'\xff' * 90)
 
 		with self.assertLogs(level = 'ERROR'):
-			msg = self._ddf.parse_msg(0)
+			msg = self._ddf.from_bytes(0)
 			self.assertEqual(msg, None)
 
 	def test_parse_unknown_tag(self):
@@ -158,7 +158,7 @@
 		self._tf.write(b'\xff' * 90)
 
 		with self.assertLogs(level = 'ERROR'):
-			msg = self._ddf.parse_msg(0)
+			msg = self._ddf.from_bytes(0)
 			self.assertEqual(msg, None)
 
 if __name__ == '__main__':
diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py
index 24fda67..b703ed4 100644
--- a/src/target/trx_toolkit/test_data_msg.py
+++ b/src/target/trx_toolkit/test_data_msg.py
@@ -5,6 +5,7 @@
 # Unit test for TRXD message codec
 #
 # (C) 2019 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2021 by sysmocom - s.f.m.c. GmbH <info at sysmocom.de>
 #
 # All Rights Reserved
 #
@@ -23,171 +24,33 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
 import unittest
+import logging
 
 from data_msg import Msg, TxMsg, RxMsg
 
 class Msg_Test(unittest.TestCase):
 	# Compare message a with message b
-	def _compare_msg(self, a, b):
+	def _compare_msg(self, a: Msg, b: Msg) -> None:
 		# Make sure we're comparing messages of the same type
 		self.assertEqual(a.__class__, b.__class__)
 
-		# Compare common header fields
-		self.assertEqual(a.ver, b.ver)
-		self.assertEqual(a.fn, b.fn)
-		self.assertEqual(a.tn, b.tn)
+		# PDU version and fields
+		self.assertEqual(a._ver, b._ver)
+		for field in a.__class__.DEF_CONT:
+			self.assertEqual(a[field], b[field])
 
-		# Burst bits (if present)
-		self.assertEqual(a.burst, b.burst)
-
-		# TxMsg specific fields
-		if isinstance(a, TxMsg):
-			self.assertEqual(a.pwr, b.pwr)
-
-		# RxMsg specific fields
-		if isinstance(a, RxMsg):
-			# Version independent fields
-			self.assertEqual(a.toa256, b.toa256)
-			self.assertEqual(a.rssi, b.rssi)
-
-			# Version specific fields
-			if a.ver >= 1:
-				self.assertEqual(a.nope_ind, b.nope_ind)
-				self.assertEqual(a.mod_type, b.mod_type)
-				self.assertEqual(a.tsc_set, b.tsc_set)
-				self.assertEqual(a.tsc, b.tsc)
-				self.assertEqual(a.ci, b.ci)
-
-	# Make sure that message validation throws a ValueError
-	def test_validate(self):
-		# Unknown version
-		with self.assertRaises(ValueError):
-			msg = RxMsg(fn = 0, tn = 0, ver = 100)
-			msg.validate()
-
-		# Uninitialized field
-		with self.assertRaises(ValueError):
-			msg = RxMsg()
-			msg.validate()
-		with self.assertRaises(ValueError):
-			msg = RxMsg(fn = None, tn = 0)
-			msg.validate()
-
-		# Out-of-range value(s)
-		with self.assertRaises(ValueError):
-			msg = RxMsg(fn = -1, tn = 0)
-			msg.validate()
-		with self.assertRaises(ValueError):
-			msg = RxMsg(fn = 0, tn = 10)
-			msg.validate()
-
-	# Validate header and burst randomization
-	def test_rand_hdr_burst(self):
-		tx_msg = TxMsg()
-		rx_msg = RxMsg()
-
-		for i in range(100):
-			tx_msg.rand_burst()
-			rx_msg.rand_burst()
-			tx_msg.rand_hdr()
-			rx_msg.rand_hdr()
-
-			tx_msg.validate()
-			rx_msg.validate()
-
-	def _test_enc_dec(self, msg, legacy = False, nope_ind = False):
-		# Prepare a given message (randomize)
-		msg.rand_hdr()
-
-		# NOPE.ind contains no burst
-		if not nope_ind:
-			msg.rand_burst()
-		else:
-			msg.nope_ind = True
-			msg.mod_type = None
-			msg.tsc_set = None
-			msg.tsc = None
-
-		# Encode a given message to bytes
-		msg_enc = msg.gen_msg(legacy)
-
-		# Decode a new message from bytes
-		msg_dec = msg.__class__()
-		msg_dec.parse_msg(msg_enc)
-
-		# Compare decoded vs the original
-		self._compare_msg(msg, msg_dec)
-
-	# Validate encoding and decoding
-	def test_enc_dec(self):
+	def test_enc_dec_default(self):
+		''' Encode/decode/match test for default field values. '''
 		for ver in Msg.KNOWN_VERSIONS:
-			with self.subTest("TxMsg", ver = ver):
-				msg = TxMsg(ver = ver)
-				self._test_enc_dec(msg)
+			with self.subTest('TxMsg(ver=%u)' % ver):
+				a, b = TxMsg(ver), TxMsg(ver)
+				b.from_bytes(a.to_bytes())
+				self._compare_msg(a, b)
 
-			with self.subTest("RxMsg", ver = ver):
-				msg = RxMsg(ver = ver)
-				self._test_enc_dec(msg)
-
-			if ver >= 1:
-				with self.subTest("RxMsg NOPE.ind", ver = ver):
-					msg = RxMsg(ver = ver)
-					self._test_enc_dec(msg, nope_ind = True)
-
-		with self.subTest("RxMsg (legacy transceiver)"):
-			msg = RxMsg(ver = 0)
-			self._test_enc_dec(msg, legacy = True)
-
-	# Validate bit conversations
-	def test_bit_conv(self):
-		usbits_ref = list(range(0, 256))
-		sbits_ref = list(range(-127, 128))
-
-		# Test both usbit2sbit() and sbit2usbit()
-		sbits = Msg.usbit2sbit(usbits_ref)
-		usbits = Msg.sbit2usbit(sbits)
-		self.assertEqual(usbits[:255], usbits_ref[:255])
-		self.assertEqual(usbits[255], 254)
-
-		# Test both sbit2ubit() and ubit2sbit()
-		ubits = Msg.sbit2ubit(sbits_ref)
-		self.assertEqual(ubits, ([1] * 127 + [0] * 128))
-
-		sbits = Msg.ubit2sbit(ubits)
-		self.assertEqual(sbits, ([-127] * 127 + [127] * 128))
-
-	def _test_transform(self, msg):
-		# Prepare given messages
-		msg.rand_hdr()
-		msg.rand_burst()
-
-		# Perform message transformation
-		if isinstance(msg, TxMsg):
-			msg_trans = msg.trans()
-		else:
-			msg_trans = msg.trans()
-
-		self.assertEqual(msg_trans.ver, msg.ver)
-		self.assertEqual(msg_trans.fn, msg.fn)
-		self.assertEqual(msg_trans.tn, msg.tn)
-
-		if isinstance(msg, RxMsg):
-			burst = Msg.sbit2ubit(msg.burst)
-			self.assertEqual(msg_trans.burst, burst)
-		else:
-			burst = Msg.ubit2sbit(msg.burst)
-			self.assertEqual(msg_trans.burst, burst)
-
-	# Validate message transformation
-	def test_transform(self):
-		for ver in Msg.KNOWN_VERSIONS:
-			with self.subTest("TxMsg", ver = ver):
-				msg = TxMsg(ver = ver)
-				self._test_transform(msg)
-
-			with self.subTest("RxMsg", ver = ver):
-				msg = RxMsg(ver = ver)
-				self._test_transform(msg)
+			with self.subTest('RxMsg(ver=%u)' % ver):
+				a, b = RxMsg(ver), RxMsg(ver)
+				b.from_bytes(a.to_bytes())
+				self._compare_msg(a, b)
 
 if __name__ == '__main__':
 	unittest.main()
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
index d041070..54e6a6b 100644
--- a/src/target/trx_toolkit/transceiver.py
+++ b/src/target/trx_toolkit/transceiver.py
@@ -269,7 +269,7 @@
 			return None
 
 		# Make sure that indicated timeslot is configured
-		if msg.tn not in self.ts_list:
+		if msg.c['tn'] not in self.ts_list:
 			log.warning("(%s) RX TRXD message (%s), but timeslot is not "
 				"configured => dropping..." % (self, msg.desc_hdr()))
 			return None
@@ -278,4 +278,4 @@
 
 	def handle_data_msg(self, msg):
 		# TODO: make legacy mode configurable (via argv?)
-		self.data_if.send_msg(msg, legacy = True)
+		self.data_if.send_msg(msg)
diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py
index 8b6f80c..389a663 100755
--- a/src/target/trx_toolkit/trx_sniff.py
+++ b/src/target/trx_toolkit/trx_sniff.py
@@ -108,7 +108,7 @@
 
 		# Attempt to parse the payload as a DATA message
 		try:
-			msg.parse_msg(msg_raw)
+			msg.from_bytes(msg_raw)
 			msg.validate()
 		except ValueError as e:
 			desc = msg.desc_hdr()
@@ -160,7 +160,7 @@
 		# Message type specific filtering
 		if isinstance(msg, RxMsg):
 			# NOPE.ind filter
-			if not self.argv.pf_nope_ind and msg.nope_ind:
+			if not self.argv.pf_nope_ind and msg.c['nope']:
 				return False
 
 			# RSSI filter

-- 
To view, visit https://gerrit.osmocom.org/c/osmocom-bb/+/24019
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Change-Id: I21329419bff0b94a14b42b79fcdb460a662ad4bc
Gerrit-Change-Number: 24019
Gerrit-PatchSet: 1
Gerrit-Owner: fixeria <vyanitskiy at sysmocom.de>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20210430/cdeba577/attachment.htm>


More information about the gerrit-log mailing list