Change in osmocom-bb[master]: trx_toolkit: use RxMsg/TxMsg instead of TRX2L1/L12TRX

fixeria gerrit-no-reply at lists.osmocom.org
Fri Apr 30 20:33:08 UTC 2021


fixeria has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/24018 )


Change subject: trx_toolkit: use RxMsg/TxMsg instead of TRX2L1/L12TRX
......................................................................

trx_toolkit: use RxMsg/TxMsg instead of TRX2L1/L12TRX

I intentionally do not use 'Downlink' and 'Uplink' terms in this project
because both MS and BTS transmit and receive in opposite directions.
A burst coming from demodulator may be a Downlink or an Uplink burst
depending on the context, so we definitely need more precise terms.

Back when I started working on the TRX toolkit, I decided to use
'TRX2L1' and 'L12TRX' for the receive and transmit directions respectively.
Now I find them hard to read, so let's replace them with 'Rx' and 'Tx'.

Change-Id: I688f24a3c09dd7e1cc00b5530ec26c8e8cfd8f7c
Related: OS#4006, SYS#4895
---
M src/target/trx_toolkit/burst_fwd.py
M src/target/trx_toolkit/burst_gen.py
M src/target/trx_toolkit/burst_send.py
M src/target/trx_toolkit/ctrl_if_trx.py
M src/target/trx_toolkit/data_dump.py
M src/target/trx_toolkit/data_if.py
M src/target/trx_toolkit/data_msg.py
M src/target/trx_toolkit/fake_trx.py
M src/target/trx_toolkit/test_data_dump.py
M src/target/trx_toolkit/test_data_msg.py
M src/target/trx_toolkit/transceiver.py
M src/target/trx_toolkit/trx_sniff.py
12 files changed, 117 insertions(+), 121 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/18/24018/1

diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 0b69fe2..2e9e97b 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -37,8 +37,8 @@
 	  - actual RX / TX frequencies,
 	  - list of active timeslots.
 
-	Each to be distributed L12TRX message is being transformed
-	into a TRX2L1 message, and then forwarded to transceivers
+	Each to be distributed 'TxMsg' message is being transformed
+	into a 'RxMsg' message, and then forwarded to transceivers
 	with partially initialized header. All uninitialized header
 	fields (such as rssi and toa256) shall be set by each
 	transceiver individually before sending towards the L1.
@@ -69,6 +69,6 @@
 			if trx.get_rx_freq(rx_msg.fn) != tx_freq:
 				continue
 
-			# Transform from L12TRX to TRX2L1 and forward
-			tx_msg = rx_msg.gen_trx2l1(ver = trx.data_if._hdr_ver)
+			# Transform from TxMsg to RxMsg and forward
+			tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver)
 			trx.handle_data_msg(src_trx, rx_msg, tx_msg)
diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py
index 7f154b3..f93d868 100755
--- a/src/target/trx_toolkit/burst_gen.py
+++ b/src/target/trx_toolkit/burst_gen.py
@@ -68,9 +68,9 @@
 
 		# Init an empty DATA message
 		if self.argv.conn_mode == "TRX":
-			msg = DATAMSG_L12TRX(ver = self.argv.hdr_ver)
+			msg = TxMsg(ver = self.argv.hdr_ver)
 		elif self.argv.conn_mode == "L1":
-			msg = DATAMSG_TRX2L1(ver = self.argv.hdr_ver)
+			msg = RxMsg(ver = self.argv.hdr_ver)
 
 		# Generate a random frame number or use provided one
 		fn_init = msg.rand_fn() if self.argv.tdma_fn is None \
@@ -176,7 +176,7 @@
 			help = "How many bursts to send (default %(default)s)")
 		bg_group.add_argument("-v", "--hdr-version", metavar = "VER",
 			dest = "hdr_ver", type = int,
-			default = 0, choices = DATAMSG.KNOWN_VERSIONS,
+			default = 0, choices = Msg.KNOWN_VERSIONS,
 			help = "TRXD header version (default %(default)s)")
 		bg_group.add_argument("-f", "--frame-number", metavar = "FN",
 			dest = "tdma_fn", type = int,
diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py
index 717414d..4e16571 100755
--- a/src/target/trx_toolkit/burst_send.py
+++ b/src/target/trx_toolkit/burst_send.py
@@ -79,11 +79,10 @@
 
 	def msg_pass_filter(self, msg):
 		# Direction filter
-		l12trx = self.argv.conn_mode == "TRX"
-		if isinstance(msg, DATAMSG_L12TRX) and not l12trx:
-			return False
-		elif isinstance(msg, DATAMSG_TRX2L1) and l12trx:
-			return False
+		if isinstance(msg, RxMsg) and self.argv.conn_mode == "TRX":
+			return False # cannot send RxMsg to TRX
+		if isinstance(msg, TxMsg) and self.argv.conn_mode == "L1":
+			return False # cannot send TxMsg to L1
 
 		# Timeslot filter
 		if self.argv.pf_tn is not None:
diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py
index ea08e7e..5034b75 100644
--- a/src/target/trx_toolkit/ctrl_if_trx.py
+++ b/src/target/trx_toolkit/ctrl_if_trx.py
@@ -25,7 +25,7 @@
 import logging as log
 
 from ctrl_if import CTRLInterface
-from data_msg import DATAMSG
+from data_msg import Msg
 
 class CTRLInterfaceTRX(CTRLInterface):
 	""" CTRL interface handler for common transceiver management commands.
@@ -222,7 +222,7 @@
 			# ... and store current for logging
 			ver_cur = self.trx.data_if._hdr_ver
 
-			if ver_req < 0 or ver_req > DATAMSG.CHDR_VERSION_MAX:
+			if ver_req < 0 or ver_req > Msg.CHDR_VERSION_MAX:
 				log.error("(%s) Incorrect TRXD header version %u"
 					% (self.trx, ver_req))
 				return -1
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py
index 9a68556..8510e2d 100644
--- a/src/target/trx_toolkit/data_dump.py
+++ b/src/target/trx_toolkit/data_dump.py
@@ -28,18 +28,18 @@
 
 class DATADump:
 	# Constants
-	TAG_L12TRX = b'\x01'
-	TAG_TRX2L1 = b'\x02'
+	TAG_TxMsg = b'\x01'
+	TAG_RxMsg = b'\x02'
 	HDR_LENGTH = 3
 
 	# Generates raw bytes from a DATA message
 	# Return value: raw message bytes
 	def dump_msg(self, msg):
 		# Determine a message type
-		if isinstance(msg, DATAMSG_L12TRX):
-			tag = self.TAG_L12TRX
-		elif isinstance(msg, DATAMSG_TRX2L1):
-			tag = self.TAG_TRX2L1
+		if isinstance(msg, TxMsg):
+			tag = self.TAG_TxMsg
+		elif isinstance(msg, RxMsg):
+			tag = self.TAG_RxMsg
 		else:
 			raise ValueError("Unknown message type")
 
@@ -61,12 +61,10 @@
 		tag = hdr[:1]
 
 		# Check if tag is known
-		if tag == self.TAG_L12TRX:
-			# L1 -> TRX
-			msg = DATAMSG_L12TRX()
-		elif tag == self.TAG_TRX2L1:
-			# TRX -> L1
-			msg = DATAMSG_TRX2L1()
+		if tag == self.TAG_TxMsg:
+			msg = TxMsg()
+		elif tag == self.TAG_RxMsg:
+			msg = RxMsg()
 		else:
 			# Unknown tag
 			return False
diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py
index 07f3d32..1cded9b 100644
--- a/src/target/trx_toolkit/data_if.py
+++ b/src/target/trx_toolkit/data_if.py
@@ -35,7 +35,7 @@
 		log.debug("Init TRXD interface (%s)" % self.desc_link())
 
 	def set_hdr_ver(self, ver):
-		if not ver in DATAMSG.KNOWN_VERSIONS:
+		if not ver in Msg.KNOWN_VERSIONS:
 			return False
 
 		self._hdr_ver = ver
@@ -43,7 +43,7 @@
 
 	def pick_hdr_ver(self, ver_req):
 		# Pick a version that is lower or equal to ver_req
-		for ver in DATAMSG.KNOWN_VERSIONS[::-1]:
+		for ver in Msg.KNOWN_VERSIONS[::-1]:
 			if ver <= ver_req:
 				return ver
 
@@ -64,16 +64,16 @@
 		data, _ = self.sock.recvfrom(512)
 		return data
 
-	def recv_l12trx_msg(self):
+	def recv_tx_msg(self):
 		# Read raw data from socket
 		data = self.recv_raw_data()
 
-		# Attempt to parse as a L12TRX message
+		# Attempt to parse a TRXD Tx message
 		try:
-			msg = DATAMSG_L12TRX()
+			msg = TxMsg()
 			msg.parse_msg(bytearray(data))
 		except:
-			log.error("Failed to parse a L12TRX message "
+			log.error("Failed to parse a TRXD Tx message "
 				"from R:%s:%u" % (self.remote_addr, self.remote_port))
 			return None
 
@@ -84,16 +84,16 @@
 
 		return msg
 
-	def recv_trx2l1_msg(self):
+	def recv_rx_msg(self):
 		# Read raw data from socket
 		data = self.recv_raw_data()
 
-		# Attempt to parse as a L12TRX message
+		# Attempt to parse a TRXD Rx message
 		try:
-			msg = DATAMSG_TRX2L1()
+			msg = RxMsg()
 			msg.parse_msg(bytearray(data))
 		except:
-			log.error("Failed to parse a TRX2L1 message "
+			log.error("Failed to parse a TRXD Rx message "
 				"from R:%s:%u" % (self.remote_addr, self.remote_port))
 			return None
 
@@ -106,10 +106,10 @@
 
 	def send_msg(self, msg, legacy = False):
 		try:
-			# Validate and encode TRXD message
+			# Validate and encode a TRXD message
 			payload = msg.gen_msg(legacy)
 		except ValueError as e:
-			log.error("Failed to encode a TRX2L1 message ('%s') "
+			log.error("Failed to encode a TRXD message ('%s') "
 				"due to error: %s" % (msg.desc_hdr(), e))
 			# TODO: we may want to send a NOPE.ind here
 			return
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
index c62db35..7e785f9 100644
--- a/src/target/trx_toolkit/data_msg.py
+++ b/src/target/trx_toolkit/data_msg.py
@@ -59,7 +59,7 @@
 				return mod
 		return None
 
-class DATAMSG(abc.ABC):
+class Msg(abc.ABC):
 	''' TRXD (DATA) message coding API (common part). '''
 
 	# NOTE: up to 16 versions can be encoded
@@ -228,8 +228,8 @@
 		else:
 			self.burst = None
 
-class DATAMSG_L12TRX(DATAMSG):
-	''' L12TRX (L1 -> TRX) message coding API. '''
+class TxMsg(Msg):
+	''' Tx (L1 -> TRX) message coding API. '''
 
 	# Constants
 	PWR_MIN = 0x00
@@ -257,7 +257,7 @@
 		''' Validate the message fields (throws ValueError). '''
 
 		# Validate common fields
-		DATAMSG.validate(self)
+		Msg.validate(self)
 
 		if self.pwr is None:
 			raise ValueError("Tx Attenuation level is not set")
@@ -287,14 +287,14 @@
 	def rand_hdr(self):
 		''' Randomize message specific header. '''
 
-		DATAMSG.rand_hdr(self)
+		Msg.rand_hdr(self)
 		self.pwr = self.rand_pwr()
 
 	def desc_hdr(self):
 		''' Generate human-readable header description. '''
 
 		# Describe the common part
-		result = DATAMSG.desc_hdr(self)
+		result = Msg.desc_hdr(self)
 
 		if self.pwr is not None:
 			result += ("pwr=%u " % self.pwr)
@@ -340,11 +340,11 @@
 		''' Generate a random message specific burst. '''
 		self.burst = [random.randint(0, 1) for _ in range(length)]
 
-	def gen_trx2l1(self, ver = None):
-		''' Transform this message to TRX2L1 message. '''
+	def trans(self, ver = None):
+		''' Transform this message into RxMsg. '''
 
 		# Allocate a new message
-		msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn,
+		msg = RxMsg(fn = self.fn, tn = self.tn,
 			ver = self.ver if ver is None else ver)
 
 		# Convert burst bits
@@ -355,8 +355,8 @@
 
 		return msg
 
-class DATAMSG_TRX2L1(DATAMSG):
-	''' TRX2L1 (TRX -> L1) message coding API. '''
+class RxMsg(Msg):
+	''' Rx (TRX -> L1) message coding API. '''
 
 	# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
 	RSSI_MIN = -120
@@ -442,7 +442,7 @@
 		''' Validate the message header fields (throws ValueError). '''
 
 		# Validate common fields
-		DATAMSG.validate(self)
+		Msg.validate(self)
 
 		if self.rssi is None:
 			raise ValueError("RSSI is not set")
@@ -512,7 +512,7 @@
 	def rand_hdr(self):
 		''' Randomize message specific header. '''
 
-		DATAMSG.rand_hdr(self)
+		Msg.rand_hdr(self)
 		self.rssi = self.rand_rssi()
 		self.toa256 = self.rand_toa256()
 
@@ -531,7 +531,7 @@
 		''' Generate human-readable header description. '''
 
 		# Describe the common part
-		result = DATAMSG.desc_hdr(self)
+		result = Msg.desc_hdr(self)
 
 		if self.rssi is not None:
 			result += ("rssi=%d " % self.rssi)
@@ -679,11 +679,11 @@
 
 		self.burst = [random.randint(-127, 127) for _ in range(length)]
 
-	def gen_l12trx(self, ver = None):
-		''' Transform this message to L12TRX message. '''
+	def trans(self, ver = None):
+		''' Transform this message to TxMsg. '''
 
 		# Allocate a new message
-		msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn,
+		msg = TxMsg(fn = self.fn, tn = self.tn,
 			ver = self.ver if ver is None else ver)
 
 		# Convert burst bits
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 6f46473..573527b 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -213,7 +213,7 @@
 			msg.tsc_set = 0
 			msg.tsc = 0
 
-	# Takes (partially initialized) TRX2L1 message,
+	# Takes (partially initialized) TRXD Rx message,
 	# simulates RF path parameters (such as RSSI),
 	# and sends towards the L1
 	def handle_data_msg(self, src_trx, src_msg, msg):
diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py
index 9cd9b79..f7b4fde 100644
--- a/src/target/trx_toolkit/test_data_dump.py
+++ b/src/target/trx_toolkit/test_data_dump.py
@@ -50,12 +50,12 @@
 		# Burst bits (if present)
 		self.assertEqual(a.burst, b.burst)
 
-		# TRX2L1 specific fields
-		if isinstance(a, DATAMSG_L12TRX):
+		# TxMsg specific fields
+		if isinstance(a, TxMsg):
 			self.assertEqual(a.pwr, b.pwr)
 
-		# L12TRX specific fields
-		if isinstance(a, DATAMSG_TRX2L1):
+		# RxMsg specific fields
+		if isinstance(a, RxMsg):
 			# Version independent fields
 			self.assertEqual(a.toa256, b.toa256)
 			self.assertEqual(a.rssi, b.rssi)
@@ -88,8 +88,8 @@
 	# Generate a mixed list of random messages
 	def _gen_rand_message_mix(self, count, ver = 1):
 		msg_list = []
-		msg_list += self._gen_rand_messages(DATAMSG_TRX2L1, count)
-		msg_list += self._gen_rand_messages(DATAMSG_L12TRX, count)
+		msg_list += self._gen_rand_messages(RxMsg, count)
+		msg_list += self._gen_rand_messages(TxMsg, count)
 		random.shuffle(msg_list)
 		return msg_list
 
@@ -100,15 +100,15 @@
 		msg = self._ddf.parse_msg(0)
 		self._compare_msg(msg, msg_ref)
 
-	# Store one TRX2L1 message in a file, read it back and compare
-	def test_store_and_parse_trx2l1(self):
-		self._test_store_and_parse(DATAMSG_TRX2L1)
+	# Store one Rx message in a file, read it back and compare
+	def test_store_and_parse_rx_msg(self):
+		self._test_store_and_parse(RxMsg)
 
-	# Store one L12TRX message in a file, read it back and compare
-	def test_store_and_parse_l12trx(self):
-		self._test_store_and_parse(DATAMSG_L12TRX)
+	# Store one Tx message in a file, read it back and compare
+	def test_store_and_parse_tx_msg(self):
+		self._test_store_and_parse(TxMsg)
 
-	# Store multiple TRX2L1/L12TRX messages in a file, read them back and compare
+	# Store multiple Rx/Tx messages in a file, read them back and compare
 	def test_store_and_parse_all(self):
 		# Store a mixed list of random messages (19 + 19)
 		msg_list_ref = self._gen_rand_message_mix(19)
@@ -143,7 +143,7 @@
 
 	def test_parse_len_overflow(self):
 		# Write a malformed message directly
-		self._tf.write(DATADump.TAG_L12TRX)
+		self._tf.write(DATADump.TAG_TxMsg)
 		self._tf.write(b'\x00\x63') # 99
 		self._tf.write(b'\xff' * 90)
 
diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py
index 991dd25..24fda67 100644
--- a/src/target/trx_toolkit/test_data_msg.py
+++ b/src/target/trx_toolkit/test_data_msg.py
@@ -24,9 +24,9 @@
 
 import unittest
 
-from data_msg import DATAMSG, DATAMSG_L12TRX, DATAMSG_TRX2L1
+from data_msg import Msg, TxMsg, RxMsg
 
-class DATAMSG_Test(unittest.TestCase):
+class Msg_Test(unittest.TestCase):
 	# Compare message a with message b
 	def _compare_msg(self, a, b):
 		# Make sure we're comparing messages of the same type
@@ -40,12 +40,12 @@
 		# Burst bits (if present)
 		self.assertEqual(a.burst, b.burst)
 
-		# TRX2L1 specific fields
-		if isinstance(a, DATAMSG_L12TRX):
+		# TxMsg specific fields
+		if isinstance(a, TxMsg):
 			self.assertEqual(a.pwr, b.pwr)
 
-		# L12TRX specific fields
-		if isinstance(a, DATAMSG_TRX2L1):
+		# RxMsg specific fields
+		if isinstance(a, RxMsg):
 			# Version independent fields
 			self.assertEqual(a.toa256, b.toa256)
 			self.assertEqual(a.rssi, b.rssi)
@@ -62,38 +62,38 @@
 	def test_validate(self):
 		# Unknown version
 		with self.assertRaises(ValueError):
-			msg = DATAMSG_TRX2L1(fn = 0, tn = 0, ver = 100)
+			msg = RxMsg(fn = 0, tn = 0, ver = 100)
 			msg.validate()
 
 		# Uninitialized field
 		with self.assertRaises(ValueError):
-			msg = DATAMSG_TRX2L1()
+			msg = RxMsg()
 			msg.validate()
 		with self.assertRaises(ValueError):
-			msg = DATAMSG_TRX2L1(fn = None, tn = 0)
+			msg = RxMsg(fn = None, tn = 0)
 			msg.validate()
 
 		# Out-of-range value(s)
 		with self.assertRaises(ValueError):
-			msg = DATAMSG_TRX2L1(fn = -1, tn = 0)
+			msg = RxMsg(fn = -1, tn = 0)
 			msg.validate()
 		with self.assertRaises(ValueError):
-			msg = DATAMSG_TRX2L1(fn = 0, tn = 10)
+			msg = RxMsg(fn = 0, tn = 10)
 			msg.validate()
 
 	# Validate header and burst randomization
 	def test_rand_hdr_burst(self):
-		msg_l12trx = DATAMSG_L12TRX()
-		msg_trx2l1 = DATAMSG_TRX2L1()
+		tx_msg = TxMsg()
+		rx_msg = RxMsg()
 
 		for i in range(100):
-			msg_l12trx.rand_burst()
-			msg_trx2l1.rand_burst()
-			msg_l12trx.rand_hdr()
-			msg_trx2l1.rand_hdr()
+			tx_msg.rand_burst()
+			rx_msg.rand_burst()
+			tx_msg.rand_hdr()
+			rx_msg.rand_hdr()
 
-			msg_l12trx.validate()
-			msg_trx2l1.validate()
+			tx_msg.validate()
+			rx_msg.validate()
 
 	def _test_enc_dec(self, msg, legacy = False, nope_ind = False):
 		# Prepare a given message (randomize)
@@ -120,22 +120,22 @@
 
 	# Validate encoding and decoding
 	def test_enc_dec(self):
-		for ver in DATAMSG.KNOWN_VERSIONS:
-			with self.subTest("L1 -> TRX message", ver = ver):
-				msg = DATAMSG_L12TRX(ver = ver)
+		for ver in Msg.KNOWN_VERSIONS:
+			with self.subTest("TxMsg", ver = ver):
+				msg = TxMsg(ver = ver)
 				self._test_enc_dec(msg)
 
-			with self.subTest("TRX -> L1 message", ver = ver):
-				msg = DATAMSG_TRX2L1(ver = ver)
+			with self.subTest("RxMsg", ver = ver):
+				msg = RxMsg(ver = ver)
 				self._test_enc_dec(msg)
 
 			if ver >= 1:
-				with self.subTest("TRX -> L1 NOPE.ind", ver = ver):
-					msg = DATAMSG_TRX2L1(ver = ver)
+				with self.subTest("RxMsg NOPE.ind", ver = ver):
+					msg = RxMsg(ver = ver)
 					self._test_enc_dec(msg, nope_ind = True)
 
-		with self.subTest("TRX -> L1 message (legacy)"):
-			msg = DATAMSG_TRX2L1(ver = 0)
+		with self.subTest("RxMsg (legacy transceiver)"):
+			msg = RxMsg(ver = 0)
 			self._test_enc_dec(msg, legacy = True)
 
 	# Validate bit conversations
@@ -144,16 +144,16 @@
 		sbits_ref = list(range(-127, 128))
 
 		# Test both usbit2sbit() and sbit2usbit()
-		sbits = DATAMSG.usbit2sbit(usbits_ref)
-		usbits = DATAMSG.sbit2usbit(sbits)
+		sbits = Msg.usbit2sbit(usbits_ref)
+		usbits = Msg.sbit2usbit(sbits)
 		self.assertEqual(usbits[:255], usbits_ref[:255])
 		self.assertEqual(usbits[255], 254)
 
 		# Test both sbit2ubit() and ubit2sbit()
-		ubits = DATAMSG.sbit2ubit(sbits_ref)
+		ubits = Msg.sbit2ubit(sbits_ref)
 		self.assertEqual(ubits, ([1] * 127 + [0] * 128))
 
-		sbits = DATAMSG.ubit2sbit(ubits)
+		sbits = Msg.ubit2sbit(ubits)
 		self.assertEqual(sbits, ([-127] * 127 + [127] * 128))
 
 	def _test_transform(self, msg):
@@ -162,31 +162,31 @@
 		msg.rand_burst()
 
 		# Perform message transformation
-		if isinstance(msg, DATAMSG_L12TRX):
-			msg_trans = msg.gen_trx2l1()
+		if isinstance(msg, TxMsg):
+			msg_trans = msg.trans()
 		else:
-			msg_trans = msg.gen_l12trx()
+			msg_trans = msg.trans()
 
 		self.assertEqual(msg_trans.ver, msg.ver)
 		self.assertEqual(msg_trans.fn, msg.fn)
 		self.assertEqual(msg_trans.tn, msg.tn)
 
-		if isinstance(msg, DATAMSG_TRX2L1):
-			burst = DATAMSG.sbit2ubit(msg.burst)
+		if isinstance(msg, RxMsg):
+			burst = Msg.sbit2ubit(msg.burst)
 			self.assertEqual(msg_trans.burst, burst)
 		else:
-			burst = DATAMSG.ubit2sbit(msg.burst)
+			burst = Msg.ubit2sbit(msg.burst)
 			self.assertEqual(msg_trans.burst, burst)
 
 	# Validate message transformation
 	def test_transform(self):
-		for ver in DATAMSG.KNOWN_VERSIONS:
-			with self.subTest("L1 -> TRX message", ver = ver):
-				msg = DATAMSG_L12TRX(ver = ver)
+		for ver in Msg.KNOWN_VERSIONS:
+			with self.subTest("TxMsg", ver = ver):
+				msg = TxMsg(ver = ver)
 				self._test_transform(msg)
 
-			with self.subTest("TRX -> L1 message", ver = ver):
-				msg = DATAMSG_TRX2L1(ver = ver)
+			with self.subTest("RxMsg", ver = ver):
+				msg = RxMsg(ver = ver)
 				self._test_transform(msg)
 
 if __name__ == '__main__':
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
index 19b998e..d041070 100644
--- a/src/target/trx_toolkit/transceiver.py
+++ b/src/target/trx_toolkit/transceiver.py
@@ -258,7 +258,7 @@
 
 	def recv_data_msg(self):
 		# Read and parse data from socket
-		msg = self.data_if.recv_l12trx_msg()
+		msg = self.data_if.recv_tx_msg()
 		if not msg:
 			return None
 
diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py
index 0cb62d3..8b6f80c 100755
--- a/src/target/trx_toolkit/trx_sniff.py
+++ b/src/target/trx_toolkit/trx_sniff.py
@@ -101,10 +101,10 @@
 		msg_raw = bytearray(trx.load)
 
 		# Determine a burst direction (L1 <-> TRX)
-		l12trx = udp.sport > udp.dport
+		tx_dir = udp.sport > udp.dport
 
 		# Create an empty DATA message
-		msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
+		msg = TxMsg() if tx_dir else RxMsg()
 
 		# Attempt to parse the payload as a DATA message
 		try:
@@ -124,8 +124,7 @@
 			return
 
 		# Debug print
-		log.debug("%s burst: %s" \
-			% ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
+		log.debug("%s burst: %s", "L1 -> TRX" if tx_dir else "TRX -> L1", msg.desc_hdr())
 
 		# Poke message handler
 		self.msg_handle(msg)
@@ -139,10 +138,10 @@
 		# Direction filter
 		if self.argv.direction is not None:
 			if self.argv.direction == "TRX": # L1 -> TRX
-				if not isinstance(msg, DATAMSG_L12TRX):
+				if not isinstance(msg, TxMsg):
 					return False
 			elif self.argv.direction == "L1": # TRX -> L1
-				if not isinstance(msg, DATAMSG_TRX2L1):
+				if not isinstance(msg, RxMsg):
 					return False
 
 		# Timeslot filter
@@ -159,7 +158,7 @@
 				return False
 
 		# Message type specific filtering
-		if isinstance(msg, DATAMSG_TRX2L1):
+		if isinstance(msg, RxMsg):
 			# NOPE.ind filter
 			if not self.argv.pf_nope_ind and msg.nope_ind:
 				return False

-- 
To view, visit https://gerrit.osmocom.org/c/osmocom-bb/+/24018
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Change-Id: I688f24a3c09dd7e1cc00b5530ec26c8e8cfd8f7c
Gerrit-Change-Number: 24018
Gerrit-PatchSet: 1
Gerrit-Owner: fixeria <vyanitskiy at sysmocom.de>
Gerrit-MessageType: newchange
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20210430/56d594f1/attachment.htm>


More information about the gerrit-log mailing list