[PATCH] osmocom-bb[master]: fake_trx: use DATAMSG classes for DATA messages

Harald Welte gerrit-no-reply at lists.osmocom.org
Thu Feb 22 15:33:42 UTC 2018


Review at  https://gerrit.osmocom.org/6829

fake_trx: use DATAMSG classes for DATA messages

The DATAMSG API, that was introduced and extended a few commits
before, provides all required methods to create, validate,
generate and parse DATA messages. Let's use it now.

Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
---
M src/target/fake_trx/burst_gen.py
M src/target/fake_trx/burst_send.py
M src/target/fake_trx/data_if.py
M src/target/fake_trx/trx_sniff.py
4 files changed, 114 insertions(+), 157 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/29/6829/1

diff --git a/src/target/fake_trx/burst_gen.py b/src/target/fake_trx/burst_gen.py
index 04b7f47..ced2de3 100755
--- a/src/target/fake_trx/burst_gen.py
+++ b/src/target/fake_trx/burst_gen.py
@@ -4,7 +4,7 @@
 # Auxiliary tool to generate and send random bursts via TRX DATA
 # interface, which may be useful for fuzzing and testing
 #
-# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com>
 #
 # All Rights Reserved
 #
@@ -22,7 +22,6 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
 import signal
 import getopt
 import sys
@@ -30,6 +29,7 @@
 from rand_burst_gen import RandBurstGen
 from data_if import DATAInterface
 from gsm_shared import *
+from data_msg import *
 
 COPYRIGHT = \
 	"Copyright (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>\n" \
@@ -70,38 +70,56 @@
 		# Init random burst generator
 		burst_gen = RandBurstGen()
 
+		# Init an empty DATA message
+		if self.conn_mode == "TRX":
+			msg = DATAMSG_L12TRX()
+		elif self.conn_mode == "L1":
+			msg = DATAMSG_TRX2L1()
+
 		# Generate a random frame number or use provided one
-		if self.fn is None:
-			fn = random.randint(0, GSM_HYPERFRAME)
-		else:
-			fn = self.fn
+		fn_init = msg.rand_fn() if self.fn is None else self.fn
 
 		# Send as much bursts as required
 		for i in range(self.burst_count):
+			# Randomize the message header
+			msg.rand_hdr()
+
+			# Increase and set frame number
+			msg.fn = (fn_init + i) % GSM_HYPERFRAME
+
+			# Set timeslot number
+			if self.tn is not None:
+				msg.tn = self.tn
+
+			# Set transmit power level
+			if self.pwr is not None:
+				msg.pwr = self.pwr
+
+			# TODO: also set TRX2L1 specific fields
+
 			# Generate a random burst
 			if self.burst_type == "NB":
-				buf = burst_gen.gen_nb()
+				burst = burst_gen.gen_nb()
 			elif self.burst_type == "FB":
-				buf = burst_gen.gen_fb()
+				burst = burst_gen.gen_fb()
 			elif self.burst_type == "SB":
-				buf = burst_gen.gen_sb()
+				burst = burst_gen.gen_sb()
 			elif self.burst_type == "AB":
-				buf = burst_gen.gen_ab()
+				burst = burst_gen.gen_ab()
 
-			print("[i] Sending %d/%d %s burst (fn=%u) to %s..."
+			# Convert to soft-bits in case of TRX -> L1 message
+			if self.conn_mode == "L1":
+				burst = msg.ubit2sbit(burst)
+
+			# Set burst
+			msg.burst = burst
+
+			print("[i] Sending %d/%d %s burst %s to %s..."
 				% (i + 1, self.burst_count, self.burst_type,
-					fn, self.conn_mode))
+					msg.desc_hdr(), self.conn_mode))
 
-			# Send to TRX or L1
-			if self.conn_mode == "TRX":
-				self.data_if.send_trx_msg(buf,
-					self.tn, fn, self.pwr)
-			elif self.conn_mode == "L1":
-				self.data_if.send_l1_msg(buf,
-					self.tn, fn, self.pwr)
-
-			# Increase frame number (for count > 1)
-			fn = (fn + 1) % GSM_HYPERFRAME
+			# Send message
+			self.data_if.send_msg(msg)
 
 		self.shutdown()
 
diff --git a/src/target/fake_trx/burst_send.py b/src/target/fake_trx/burst_send.py
index ee8e51a..4dbe984 100755
--- a/src/target/fake_trx/burst_send.py
+++ b/src/target/fake_trx/burst_send.py
@@ -3,7 +3,7 @@
 
 # Auxiliary tool to send existing bursts via TRX DATA interface
 #
-# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com>
 #
 # All Rights Reserved
 #
@@ -21,13 +21,13 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
 import signal
 import getopt
 import sys
 
 from data_if import DATAInterface
 from gsm_shared import *
+from data_msg import *
 
 COPYRIGHT = \
 	"Copyright (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>\n" \
@@ -74,40 +74,61 @@
 			print("[i] Reading bursts from stdin...")
 			src = sys.stdin
 
+		# Init an empty DATA message
+		if self.conn_mode == "TRX":
+			msg = DATAMSG_L12TRX()
+		elif self.conn_mode == "L1":
+			msg = DATAMSG_TRX2L1()
+
 		# Generate a random frame number or use provided one
-		if self.fn is None:
-			fn = random.randint(0, GSM_HYPERFRAME)
-		else:
-			fn = self.fn
+		fn = msg.rand_fn() if self.fn is None else self.fn
 
 		# Read the burst source line-by-line
 		for line in src:
 			# Strip spaces
-			burst = line.strip()
-			buf = []
+			burst_str = line.strip()
+			burst = []
 
 			# Check length
-			if len(burst) != 148:
+			if len(burst_str) != 148:
 				print("[!] Dropping burst due to length != 148")
 				continue
 
-			print("[i] Sending a burst (fn=%u) to %s..."
-				% (fn, self.conn_mode))
+			# Randomize the message header
+			msg.rand_hdr()
+
+			# Set frame number
+			msg.fn = fn
+
+			# Set timeslot number
+			if self.tn is not None:
+				msg.tn = self.tn
+
+			# Set transmit power level
+			if self.pwr is not None:
+				msg.pwr = self.pwr
+
+			# TODO: also set TRX2L1 specific fields
 
 			# Parse a string
-			for bit in burst:
+			for bit in burst_str:
 				if bit == "1":
-					buf.append(1)
+					burst.append(1)
 				else:
-					buf.append(0)
+					burst.append(0)
 
-			# Send to TRX or L1
-			if self.conn_mode == "TRX":
-				self.data_if.send_trx_msg(buf,
-					self.tn, fn, self.pwr)
-			elif self.conn_mode == "L1":
-				self.data_if.send_l1_msg(buf,
-					self.tn, fn, self.pwr)
+			# Convert to soft-bits in case of TRX -> L1 message
+			if self.conn_mode == "L1":
+				burst = msg.ubit2sbit(burst)
+
+			# Set burst
+			msg.burst = burst
+
+			print("[i] Sending a burst %s to %s..."
+				% (msg.desc_hdr(), self.conn_mode))
+
+			# Send message
+			self.data_if.send_msg(msg)
 
 			# Increase frame number (for count > 1)
 			fn = (fn + 1) % GSM_HYPERFRAME
diff --git a/src/target/fake_trx/data_if.py b/src/target/fake_trx/data_if.py
index 0f373ab..8b0cd8e 100644
--- a/src/target/fake_trx/data_if.py
+++ b/src/target/fake_trx/data_if.py
@@ -4,7 +4,7 @@
 # Virtual Um-interface (fake transceiver)
 # DATA interface implementation
 #
-# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com>
 #
 # All Rights Reserved
 #
@@ -22,85 +22,18 @@
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
-import random
-
 from udp_link import UDPLink
-from gsm_shared import *
+from data_msg import *
 
 class DATAInterface(UDPLink):
 
-	def send_l1_msg(self, burst,
-		tn = None, fn = None, rssi = None):
-		# Generate random timeslot index if not preset
-		if tn is None:
-			tn = random.randint(0, 7)
+	def send_msg(self, msg):
+		# Validate a message
+		if not msg.validate():
+			raise ValueError("Message incomplete or incorrect")
 
-		# Generate random frame number if not preset
-		if fn is None:
-			fn = random.randint(0, GSM_HYPERFRAME)
-
-		# Generate random RSSI if not preset
-		if rssi is None:
-			rssi = -random.randint(-75, -50)
-
-		# Prepare a buffer for header and burst
-		buf = []
-
-		# Put timeslot index
-		buf.append(tn)
-
-		# Put frame number
-		buf.append((fn >> 24) & 0xff)
-		buf.append((fn >> 16) & 0xff)
-		buf.append((fn >>  8) & 0xff)
-		buf.append((fn >>  0) & 0xff)
-
-		# Put RSSI
-		buf.append(rssi)
-
-		# HACK: put fake TOA value
-		buf += [0x00] * 2
-
-		# Put burst
-		buf += burst
-
-		# Put two unused bytes
-		buf += [0x00] * 2
+		# Generate TRX message
+		payload = msg.gen_msg()
 
 		# Send message
-		self.send(bytearray(buf))
-
-	def send_trx_msg(self, burst,
-		tn = None, fn = None, pwr = None):
-		# Generate random timeslot index if not preset
-		if tn is None:
-			tn = random.randint(0, 7)
-
-		# Generate random frame number if not preset
-		if fn is None:
-			fn = random.randint(0, GSM_HYPERFRAME)
-
-		# Generate random power level if not preset
-		if pwr is None:
-			pwr = random.randint(0, 34)
-
-		# Prepare a buffer for header and burst
-		buf = []
-
-		# Put timeslot index
-		buf.append(tn)
-
-		# Put frame number
-		buf.append((fn >> 24) & 0xff)
-		buf.append((fn >> 16) & 0xff)
-		buf.append((fn >>  8) & 0xff)
-		buf.append((fn >>  0) & 0xff)
-
-		# Put transmit power level
-		buf.append(pwr)
-
-		# Put burst
-		buf += burst
-
-		# Send message
-		self.send(bytearray(buf))
+		self.send(payload)
diff --git a/src/target/fake_trx/trx_sniff.py b/src/target/fake_trx/trx_sniff.py
index cf3ab9c..91c2c4a 100755
--- a/src/target/fake_trx/trx_sniff.py
+++ b/src/target/fake_trx/trx_sniff.py
@@ -27,6 +27,8 @@
 
 import scapy.all
 
+from data_msg import *
+
 COPYRIGHT = \
 	"Copyright (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com>\n" \
 	"License GPLv2+: GNU GPL version 2 or later " \
@@ -98,32 +100,39 @@
 		trx = udp.payload
 
 		# Convert to bytearray
-		trx = bytearray(str(trx))
-
-		# Parse GSM TDMA specific data
-		fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4]
-		tn = trx[0]
+		msg_raw = bytearray(str(trx))
 
 		# Determine a burst direction (L1 <-> TRX)
 		l12trx = udp.sport > udp.dport
 
+		# Create an empty DATA message
+		msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
+
+		# Attempt to parse the payload as a DATA message
+		try:
+			msg.parse_msg(msg_raw)
+		except:
+			print("[!] Failed to parse message, dropping...")
+			self.cnt_burst_dropped_num += 1
+			return
+
 		# Poke burst pass filter
-		rc = self.burst_pass_filter(l12trx, fn, tn)
+		rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
 		if rc is False:
 			self.cnt_burst_dropped_num += 1
 			return
 
 		# Debug print
-		print("[i] %s burst: fn=%u, tn=%d" \
-			% ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn))
+		print("[i] %s burst: %s" \
+			% ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
 
 		# Poke burst handler
-		rc = self.burst_handle(trx, l12trx, fn, tn)
+		rc = self.burst_handle(l12trx, msg_raw, msg)
 		if rc is False:
 			self.shutdown()
 
 		# Poke burst counter
-		rc = self.burst_count(fn, tn)
+		rc = self.burst_count(msg.fn, msg.tn)
 		if rc is True:
 			self.shutdown()
 
@@ -149,46 +158,22 @@
 		# Burst passed ;)
 		return True
 
-	def burst_handle(self, trx_burst, l12trx, fn, tn):
+	def burst_handle(self, l12trx, msg_raw, msg):
 		if self.print_bursts:
-			self.burst_dump_bits(sys.stdout, trx_burst, l12trx)
-			sys.stdout.flush()
+			print(msg.burst)
 
 		if self.output_file is not None:
 			# TLV: tag defines burst direction (one byte, BE)
 			self.output_file.write('\x01' if l12trx else '\x02')
 
 			# TLV: length of value (one byte, BE)
-			length = len(trx_burst)
+			length = len(msg_raw)
 			self.output_file.write(chr(length))
 
 			# TLV: raw value
-			self.output_file.write(trx_burst)
+			self.output_file.write(msg_raw)
 
 		return True
-
-	def burst_dump_bits(self, dst, trx_burst, l12trx):
-		# Split out burst header
-		if l12trx:
-			burst = trx_burst[6:]
-		else:
-			burst = trx_burst[8:]
-
-		# Print normal bits: 0 or 1
-		for i in range(0, 148):
-			# Convert bits to chars
-			if l12trx:
-				# Convert bits as is
-				bit = '1' if burst[i] else '0'
-			else:
-				# Convert trx bits {254..0} to sbits {-127..127}
-				bit = -127 if burst[i] == 255 else 127 - burst[i]
-				# Convert sbits {-127..127} to ubits {0..1}
-				bit = '1' if bit < 0 else '0'
-
-			# Write a normal bit
-			dst.write(bit)
-		dst.write("\n")
 
 	def burst_count(self, fn, tn):
 		# Update frame counter

-- 
To view, visit https://gerrit.osmocom.org/6829
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
Gerrit-PatchSet: 1
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Owner: Harald Welte <laforge at gnumonks.org>


More information about the gerrit-log mailing list