[PATCH] osmocom-bb[master]: fake_trx: implement a new tool for TRX protocol sniffing

This is merely a historical archive of years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Harald Welte gerrit-no-reply at lists.osmocom.org
Thu Feb 22 15:33:38 UTC 2018


Review at  https://gerrit.osmocom.org/6821

fake_trx: implement a new tool for TRX protocol sniffing

This change introduces a new tool, which could be used to sniff a
single connection between L1 and TRX in both directions, filter
captured bursts by direction, timeslot and/or frame number, and
finally write them to a binary file for further analysis.

Sniffing capability is based on the Scapy framework, so it must
be installed in order to run this tool. Please also note that
sniffing requires root access. For details, see:

https://github.com/secdev/scapy
https://scapy.readthedocs.io/en/latest/

Usage example:

  sudo ./trx_sniff --frame-count 30 --timeslot 2 -o /tmp/bursts

This command will capture 30 frames on timeslot number 2, and
write them to a binary file. The format of this file is based
on TLV (Tag Length Value), that wraps each burst:

  ... |-TAG (byte)-|-LEN (byte)-|-BURST (LEN bytes)-| ...

  TAG 0x01 - a message coming from L1 to TRX
  TAG 0x02 - a message coming from TRX to L1

Change-Id: I6e65e1d657574cc3e67bc7cdb1c01ef6bf08ecde
---
M src/target/fake_trx/README
A src/target/fake_trx/trx_sniff.py
2 files changed, 324 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/21/6821/1

diff --git a/src/target/fake_trx/README b/src/target/fake_trx/README
index 6e3ce03..5d4960f 100644
--- a/src/target/fake_trx/README
+++ b/src/target/fake_trx/README
@@ -31,3 +31,9 @@
   - burst_send.py - a tool for sending existing bursts from file
     or standard input either to L1 (OsmoBTS or OsmocomBB) or to
     TRX (OsmoTRX and GR-GSM TRX).
+
+  - trx_sniff.py - Scapy-based TRX protocol sniffer. Allows one
+    to observe a single connection between TRX and L1 in both
+    directions. Also provides some capabilities for filtering bursts
+    by direction, frame and timeslot numbers, and for recording
+    captured messages to a binary file.
diff --git a/src/target/fake_trx/trx_sniff.py b/src/target/fake_trx/trx_sniff.py
new file mode 100755
index 0000000..cf3ab9c
--- /dev/null
+++ b/src/target/fake_trx/trx_sniff.py
@@ -0,0 +1,318 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Scapy-based TRX interface sniffer
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import signal
+import getopt
+import sys
+
+import scapy.all
+
+# Banner text, printed by Application.__init__()
+COPYRIGHT = ("Copyright (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com>\n"
+	"License GPLv2+: GNU GPL version 2 or later "
+	"<http://gnu.org/licenses/gpl.html>\n"
+	"This is free software: you are free to change and redistribute it.\n"
+	"There is NO WARRANTY, to the extent permitted by law.\n")
+
+class Application:
+	# Application variables
+	sniff_interface = "lo"
+	sniff_base_port = 5700
+	print_bursts = False
+	output_file = None
+
+	# Counters
+	cnt_burst_dropped_num = 0
+	cnt_burst_break = None
+	cnt_burst_num = 0
+
+	cnt_frame_break = None
+	cnt_frame_last = None
+	cnt_frame_num = 0
+
+	# Burst direction filter
+	bf_dir_l12trx = None
+
+	# Timeslot number filter
+	bf_tn_val = None
+
+	# Frame number filter
+	bf_fn_lt = None
+	bf_fn_gt = None
+
+	# Internal variables
+	lo_trigger = False
+
+	def __init__(self):
+		"""Print the banner, parse argv and open the output file, if any."""
+		print(COPYRIGHT)
+		self.parse_argv()
+		# After parsing, output_file holds a file name (or None): swap it for a handle
+		path = self.output_file
+		self.output_file = open(path, "ab") if path is not None else None
+
+	def run(self):
+		"""Sniff TRX data packets and feed each one to pkt_handler().
+
+		Captures UDP on ports base + 2 and base + 102 of the chosen interface.
+		"""
+		pkt_filter = "udp and (port %d or port %d)" \
+			% (self.sniff_base_port + 2, self.sniff_base_port + 102)
+		print("[i] Listening on interface '%s'..." % self.sniff_interface)
+		# store = 0: packets are handled by the callback, so do not accumulate them
+		scapy.all.sniff(iface = self.sniff_interface, store = 0,
+			filter = pkt_filter, prn = self.pkt_handler)
+		# Scapy registers its own signal handler, so we get here on interrupt
+		self.shutdown()
+
+	def pkt_handler(self, ether):
+		"""Handle one sniffed packet: parse, filter, dump and count it.
+
+		ether is the captured frame, its payload chain is expected to be
+		IP / UDP / TRX data message. Ends in shutdown() (which exits) as
+		soon as a break counter is hit or the burst handler fails.
+		"""
+		# Prevent loopback packet duplication: skip every second packet
+		if self.sniff_interface == "lo":
+			self.lo_trigger = not self.lo_trigger
+			if not self.lo_trigger:
+				return
+
+		# Extract a TRX payload (ether -> IP -> UDP -> TRX)
+		udp = ether.payload.payload
+		# bytes() equals str() on Python 2 and also works on Python 3
+		trx = bytearray(bytes(udp.payload))
+
+		# Header is TN (1 byte) + FN (4 bytes, BE): drop truncated messages
+		if len(trx) < 5:
+			self.cnt_burst_dropped_num += 1
+			return
+
+		# Parse GSM TDMA specific data
+		fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4]
+		tn = trx[0]
+		# Determine a burst direction (L1 <-> TRX)
+		l12trx = udp.sport > udp.dport
+
+		# Poke burst pass filter
+		if self.burst_pass_filter(l12trx, fn, tn) is False:
+			self.cnt_burst_dropped_num += 1
+			return
+
+		print("[i] %s burst: fn=%u, tn=%d" \
+			% ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn))
+		# Poke burst handler, then burst counter: either may end the session
+		if self.burst_handle(trx, l12trx, fn, tn) is False:
+			self.shutdown()
+		if self.burst_count(fn, tn) is True:
+			self.shutdown()
+
+	def burst_pass_filter(self, l12trx, fn, tn):
+		"""Tell whether a burst passes all configured filters.
+
+		A filter set to None is disabled. Frame number bounds are
+		inclusive: fn equal to bf_fn_lt or bf_fn_gt still passes.
+		Returns True if the burst passes, False otherwise.
+		"""
+		want_dir = self.bf_dir_l12trx
+		want_tn = self.bf_tn_val
+		fn_max = self.bf_fn_lt
+		fn_min = self.bf_fn_gt
+
+		# Each entry is True when the corresponding filter rejects the burst
+		rejected = (
+			want_dir is not None and l12trx != want_dir,
+			want_tn is not None and tn != want_tn,
+			fn_max is not None and fn > fn_max,
+			fn_min is not None and fn < fn_min,
+		)
+
+		return not any(rejected)
+
+	def burst_handle(self, trx_burst, l12trx, fn, tn):
+		"""Print and / or record a burst (bytearray); always returns True."""
+		if self.print_bursts:
+			self.burst_dump_bits(sys.stdout, trx_burst, l12trx)
+			sys.stdout.flush()
+
+		if self.output_file is not None:
+			# LEN is a single byte, so a longer message cannot be encoded:
+			# report it and leave it out of the file instead of crashing
+			if len(trx_burst) > 0xff:
+				print("[!] Burst is too long for one-byte LEN, not recorded")
+				return True
+
+			# TLV: tag (0x01 L1 -> TRX, 0x02 TRX -> L1), length, raw value
+			tlv = bytearray([0x01 if l12trx else 0x02, len(trx_burst)])
+			self.output_file.write(tlv + trx_burst)
+		return True
+
+	def burst_dump_bits(self, dst, trx_burst, l12trx):
+		"""Write burst bits to dst as one line of '0' and '1' characters.
+
+		trx_burst is a whole TRX message (bytearray): its header, 6 bytes
+		for L1 -> TRX and 8 bytes for TRX -> L1, is skipped, and at most
+		148 bits are printed (fewer if the message is truncated).
+		"""
+		# Split out burst header; slicing never raises on a short message
+		hdr_len = 6 if l12trx else 8
+		burst = trx_burst[hdr_len:hdr_len + 148]
+
+		if l12trx:
+			# Convert bits as is: any non-zero value is '1'
+			bits = ['1' if b else '0' for b in burst]
+		else:
+			# Convert trx bits {254..0} to sbits {-127..127} (255 is
+			# mapped to -127), then sbits to ubits: negative means '1'
+			sbits = [-127 if b == 255 else 127 - b for b in burst]
+			bits = ['1' if s < 0 else '0' for s in sbits]
+
+		dst.write("".join(bits))
+		dst.write("\n")
+
+	def burst_count(self, fn, tn):
+		"""Update counters; return True once a requested limit is reached."""
+		# Count a frame each time the frame number changes. The last seen
+		# number must be remembered, otherwise every burst following the
+		# first change would be counted as a new frame.
+		if self.cnt_frame_last is None or fn != self.cnt_frame_last:
+			self.cnt_frame_last = fn
+			self.cnt_frame_num += 1
+
+		# Update burst counter
+		self.cnt_burst_num += 1
+
+		# Stop sniffing after N bursts
+		if self.cnt_burst_break is not None:
+			if self.cnt_burst_num == self.cnt_burst_break:
+				print("[i] Collected required amount of bursts")
+				return True
+
+		# Stop sniffing after N frames
+		if self.cnt_frame_break is not None:
+			if self.cnt_frame_num == self.cnt_frame_break:
+				print("[i] Collected required amount of frames")
+				return True
+
+		return False
+
+	def shutdown(self):
+		"""Report statistics, release the output file and exit.
+
+		Never returns: terminates the process with exit status 0.
+		"""
+		print("[i] Shutting down...")
+		stats = (self.cnt_burst_num, self.cnt_burst_dropped_num)
+		print("[i] %u bursts handled, %u dropped" % stats)
+
+		# The output file only exists when recording was requested
+		if self.output_file is not None:
+			self.output_file.close()
+		sys.exit(0)
+
+	def print_help(self, msg = None):
+		"""Print usage information, followed by msg if it is given."""
+		s  = " Usage: " + sys.argv[0] + " [options]\n\n" \
+			 " Some help...\n" \
+			 "  -h --help              this text\n\n"
+		# Only this section takes the defaults, so a '%' elsewhere stays harmless
+		s += (" Sniffing options\n"
+			 "  -i --sniff-interface   Set network interface (default '%s')\n"
+			 "  -p --sniff-base-port   Set base port number (default %d)\n\n"
+			 ) % (self.sniff_interface, self.sniff_base_port)
+
+		s += " Processing (no processing by default)\n" \
+			 "  -o --output-file       Write bursts to file\n"          \
+			 "  -v --print-bits        Print burst bits to stdout\n\n"
+
+		s += " Count limitations (disabled by default)\n" \
+			 "  --frame-count   NUM    Stop after sniffing NUM frames\n"  \
+			 "  --burst-count   NUM    Stop after sniffing NUM bursts\n\n"
+
+		s += " Filtering (disabled by default)\n" \
+			 "  --direction     DIR    Burst direction: L12TRX or TRX2L1\n"  \
+			 "  --timeslot      NUM    TDMA timeslot number [0..7]\n"        \
+			 "  --frame-num-lt  NUM    TDMA frame number lower than NUM\n"   \
+			 "  --frame-num-gt  NUM    TDMA frame number greater than NUM\n"
+		print(s)
+		if msg is not None:
+			print(msg)
+
+	def parse_argv(self):
+		"""Apply command line options; --help and bad usage print help, exit 2."""
+		try:
+			# NOTE: -v is a plain flag, hence no ':' after it in the short list
+			opts, args = getopt.getopt(sys.argv[1:],
+				"i:p:o:vh", ["help", "sniff-interface=", "sniff-base-port=",
+					"frame-count=", "burst-count=", "direction=",
+					"timeslot=", "frame-num-lt=", "frame-num-gt=",
+					"output-file=", "print-bits"])
+		except getopt.GetoptError as err:
+			self.print_help("[!] " + str(err))
+			sys.exit(2)
+
+		for o, v in opts:
+			if o in ("-h", "--help"):
+				self.print_help()
+				sys.exit(2)
+			elif o in ("-i", "--sniff-interface"):
+				self.sniff_interface = v
+			elif o in ("-p", "--sniff-base-port"):
+				self.sniff_base_port = int(v)
+			elif o in ("-o", "--output-file"):
+				self.output_file = v
+			elif o in ("-v", "--print-bits"):
+				self.print_bursts = True
+
+			# Break counters
+			elif o == "--frame-count":
+				self.cnt_frame_break = int(v)
+			elif o == "--burst-count":
+				self.cnt_burst_break = int(v)
+
+			# Direction filter
+			elif o == "--direction":
+				if v == "L12TRX":
+					self.bf_dir_l12trx = True
+				elif v == "TRX2L1":
+					self.bf_dir_l12trx = False
+				else:
+					self.print_help("[!] Wrong direction argument")
+					sys.exit(2)
+
+			# Timeslot pass filter
+			elif o == "--timeslot":
+				self.bf_tn_val = int(v)
+				if self.bf_tn_val < 0 or self.bf_tn_val > 7:
+					self.print_help("[!] Wrong timeslot value")
+					sys.exit(2)
+
+			# Frame number pass filter
+			elif o == "--frame-num-lt":
+				self.bf_fn_lt = int(v)
+			elif o == "--frame-num-gt":
+				self.bf_fn_gt = int(v)
+
+if __name__ == '__main__':
+	# Constructing the application parses argv; run() then starts sniffing
+	Application().run()

-- 
To view, visit https://gerrit.osmocom.org/6821
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6e65e1d657574cc3e67bc7cdb1c01ef6bf08ecde
Gerrit-PatchSet: 1
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Owner: Harald Welte <laforge at gnumonks.org>



More information about the gerrit-log mailing list