[PATCH] osmocom-bb[master]: target/fake_trx: initial release of virtual transceiver

Harald Welte gerrit-no-reply at lists.osmocom.org
Thu Feb 22 15:32:43 UTC 2018


Review at  https://gerrit.osmocom.org/6707

target/fake_trx: initial release of virtual transceiver

This is a set of tools for creating a virtual Um-interface between
OsmocomBB and OsmoBTS. It may be useful for testing and development
of the GSM stack on both sides (MS and BTS). This software implements
the OsmoTRX (Osmocom's fork of the OpenBTS transceiver) style clock
(CLCK), control (CTRL) and data interfaces. As a result, the OsmoBTS
source code requires no modifications, while OsmocomBB needs a new
application, trxcon, which can be found in the 'fixeria/sdr_phy'
branch until it is merged to master.
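
The CTRL exchange handled by ctrl_if.py in this change can be sketched
roughly as follows. This is only an illustration of the "CMD ..." /
"RSP ..." framing visible in the patch; the function name
build_response is hypothetical and not part of the change.

```python
def build_response(request, status=0):
    """Turn a raw 'CMD ...' datagram into the matching 'RSP ...' string."""
    if not request.startswith("CMD "):
        raise ValueError("not a CTRL command")

    # Drop the signature, surrounding whitespace and the trailing NUL
    fields = request[4:].strip().strip("\0").split(" ")

    # The status code goes right after the command name
    fields.insert(1, str(status))

    # Re-add the response signature and the NUL terminator
    return "RSP " + " ".join(fields) + "\0"


if __name__ == "__main__":
    print(repr(build_response("CMD TXTUNE 941600\0")))
```

A non-zero status (the patch uses -1) signals that the command was
rejected, for example a POWERON before the frequencies are tuned.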

Brief description of available applications:

  - fake_trx.py - main application, which connects OsmocomBB and
    OsmoBTS without actual RF hardware. Currently only a single
    MS may work with a single BTS.

  - clck_gen.py - a peripheral tool that emulates a TDMA frame
    clock generator. It can be used for testing and for clock
    synchronization of multiple applications. Note that it
    relies on a generic system timer (via Python), so some
    random clock jitter is to be expected.

  - ctrl_cmd.py - another peripheral tool, which can be used
    for sending CTRL commands directly in manual mode, and also
    for application fuzzing.
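
The timing arithmetic behind the clock generator described above can
be sketched as below. The constants are taken from clck_gen.py in this
change; the helper names and the GSM_HYPERFRAME spelling are
assumptions made for illustration only.

```python
GSM_FRAME_US = 4615.0             # TDMA frame duration, microseconds
LO_DELAY_US = 90.0                # loopback delay compensation from clck_gen.py
GSM_HYPERFRAME = 2048 * 26 * 51   # 2715648 frames, after which FN wraps


def indication_interval(ind_period=102):
    """Seconds between two 'IND CLOCK' messages sent every ind_period frames."""
    return (GSM_FRAME_US - LO_DELAY_US) / 1e6 * ind_period


def next_frame_number(fn, ind_period=102):
    """Advance the frame counter and wrap it at the hyperframe boundary."""
    return (fn + ind_period) % GSM_HYPERFRAME


if __name__ == "__main__":
    print("interval: %.5f s" % indication_interval())
    print("next FN:  %d" % next_frame_number(2715600))
```

With the default period of 102 frames the timer fires a little less
often than twice per second, and the jitter mentioned above comes on
top of that interval.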

Change-Id: Ib1fb80682002ac85a72fa6abef459a4c44f4ab97
---
A src/target/fake_trx/.gitignore
A src/target/fake_trx/README
A src/target/fake_trx/burst_fwd.py
A src/target/fake_trx/clck_gen.py
A src/target/fake_trx/ctrl_cmd.py
A src/target/fake_trx/ctrl_if.py
A src/target/fake_trx/ctrl_if_bb.py
A src/target/fake_trx/ctrl_if_bts.py
A src/target/fake_trx/fake_trx.py
A src/target/fake_trx/udp_link.py
10 files changed, 789 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/07/6707/1

diff --git a/src/target/fake_trx/.gitignore b/src/target/fake_trx/.gitignore
new file mode 100644
index 0000000..749ccda
--- /dev/null
+++ b/src/target/fake_trx/.gitignore
@@ -0,0 +1,4 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
diff --git a/src/target/fake_trx/README b/src/target/fake_trx/README
new file mode 100644
index 0000000..ac138e0
--- /dev/null
+++ b/src/target/fake_trx/README
@@ -0,0 +1,24 @@
+This is a set of tools for creating a virtual Um-interface between
+OsmocomBB and OsmoBTS. It may be useful for testing and development
+of the GSM stack on both sides (MS and BTS). This software implements
+the OsmoTRX (Osmocom's fork of the OpenBTS transceiver) style clock
+(CLCK), control (CTRL) and data interfaces. As a result, the OsmoBTS
+source code requires no modifications, while OsmocomBB needs a new
+application, trxcon, which can be found in the 'fixeria/sdr_phy'
+branch until it is merged to master.
+
+Brief description of available applications:
+
+  - fake_trx.py - main application, which connects OsmocomBB and
+    OsmoBTS without actual RF hardware. Currently only a single
+    MS may work with a single BTS.
+
+  - clck_gen.py - a peripheral tool that emulates a TDMA frame
+    clock generator. It can be used for testing and for clock
+    synchronization of multiple applications. Note that it
+    relies on a generic system timer (via Python), so some
+    random clock jitter is to be expected.
+
+  - ctrl_cmd.py - another peripheral tool, which can be used
+    for sending CTRL commands directly in manual mode, and also
+    for application fuzzing.
diff --git a/src/target/fake_trx/burst_fwd.py b/src/target/fake_trx/burst_fwd.py
new file mode 100644
index 0000000..d822747
--- /dev/null
+++ b/src/target/fake_trx/burst_fwd.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# BTS <-> BB burst forwarding
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+class BurstForwarder:
+	# Timeslot filter
+	ts_pass = 0
+
+	def __init__(self, bts_link, bb_link):
+		self.bts_link = bts_link
+		self.bb_link = bb_link
+
+	def set_slot(self, ts):
+		if ts >= 0 and ts < 8:
+			self.ts_pass = ts
+		else:
+			raise ValueError("Incorrect index for timeslot filter")
+
+	def process_payload(self, data):
+		payload = bytearray(data)
+		length = len(payload)
+
+		# HACK: set fake RSSI value (-53)
+		payload[5] = 0x35
+
+		# HACK: insert a fake TOA value (two bytes at offset 6)
+		payload[6:6] = [0x00, 0x00]
+
+		# Convert ubits to {255..0}; the payload is now two bytes longer
+		for i in range(8, len(payload)):
+			payload[i] = 255 if payload[i] else 0
+
+		return payload
+
+	# Downlink handler: BTS -> BB
+	def bts2bb(self):
+		# Read data from socket
+		data, addr = self.bts_link.sock.recvfrom(512)
+		payload = self.process_payload(data)
+
+		# Timeslot filter
+		if payload[0] != self.ts_pass:
+			return None
+
+		# Send burst to BB
+		self.bb_link.send(payload)
+
+	# Uplink handler: BB -> BTS
+	def bb2bts(self):
+		# Read data from socket
+		data, addr = self.bb_link.sock.recvfrom(512)
+		payload = self.process_payload(data)
+
+		# Send burst to BTS
+		self.bts_link.send(payload)
diff --git a/src/target/fake_trx/clck_gen.py b/src/target/fake_trx/clck_gen.py
new file mode 100755
index 0000000..4ef597f
--- /dev/null
+++ b/src/target/fake_trx/clck_gen.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# Simple TDMA frame clock generator
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import signal
+import time
+import sys
+
+from threading import Timer
+from udp_link import UDPLink
+
+class CLCKGen:
+	# GSM TDMA definitions
+	SEC_DELAY_US = 1000 * 1000
+	GSM_SUPERFRAME = 2715648
+	GSM_FRAME_US = 4615.0
+
+	# Average loop back delay
+	LO_DELAY_US = 90.0
+
+	def __init__(self, clck_links, clck_start = 0, ind_period = 102):
+		self.clck_links = clck_links
+		self.ind_period = ind_period
+		self.clck_src = clck_start
+
+		# Calculate counter time
+		self.ctr_interval  = self.GSM_FRAME_US - self.LO_DELAY_US
+		self.ctr_interval /= self.SEC_DELAY_US
+		self.ctr_interval *= self.ind_period
+
+		# Create a timer manager
+		self.timer = Timer(self.ctr_interval, self.send_clck_ind)
+
+	def start(self):
+		# Schedule the first indication
+		self.timer.start()
+
+	def stop(self):
+		self.timer.cancel()
+
+	def send_clck_ind(self):
+		# Keep clock cycle
+		if self.clck_src >= self.GSM_SUPERFRAME:
+			self.clck_src %= self.GSM_SUPERFRAME
+
+		# We don't need to send so often
+		if self.clck_src % self.ind_period == 0:
+			# Create UDP payload
+			payload = "IND CLOCK %u\0" % self.clck_src
+
+			# Send indication to all UDP links
+			for link in self.clck_links:
+				link.send(payload)
+
+			# Debug print
+			print("[T] %s" % payload)
+
+		# Increase frame count
+		self.clck_src += self.ind_period
+
+		# Schedule a new indication
+		self.timer = Timer(self.ctr_interval, self.send_clck_ind)
+		self.timer.start()
+
+# Just a wrapper for independent usage
+class Application:
+	def __init__(self):
+		# Set up signal handlers
+		signal.signal(signal.SIGINT, self.sig_handler)
+
+	def run(self):
+		self.link = UDPLink("127.0.0.1", 5800, 5700)
+		self.clck = CLCKGen([self.link], ind_period = 51)
+		self.clck.start()
+
+	def sig_handler(self, signum, frame):
+		print("Signal %d received" % signum)
+		if signum == signal.SIGINT:
+			self.clck.stop()
+			self.link.shutdown()
+
+if __name__ == '__main__':
+	app = Application()
+	app.run()
diff --git a/src/target/fake_trx/ctrl_cmd.py b/src/target/fake_trx/ctrl_cmd.py
new file mode 100755
index 0000000..91d2a65
--- /dev/null
+++ b/src/target/fake_trx/ctrl_cmd.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Simple tool to send custom commands via TRX CTRL interface,
+# which may be also useful for fuzzing
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import signal
+import select
+import sys
+
+from udp_link import UDPLink
+
+class Application:
+	def __init__(self, remote_addr, remote_port, bind_port, fuz = False):
+		# Init UDP connection
+		self.ctrl_link = UDPLink(remote_addr, remote_port, bind_port)
+
+		# Determine working mode
+		self.fuzzing = fuz
+
+		# Set up signal handlers
+		signal.signal(signal.SIGINT, self.sig_handler)
+
+	def run(self):
+		while True:
+			self.print_prompt()
+
+			# Wait until we get any data on any socket
+			socks = [sys.stdin]
+			r_event, w_event, x_event = select.select(socks, [], [])
+
+			# Check for incoming CTRL commands
+			if sys.stdin in r_event:
+				cmd = sys.stdin.readline()
+				self.handle_cmd(cmd)
+
+	def handle_cmd(self, cmd):
+		# Strip spaces, tabs, etc.
+		cmd = cmd.strip().strip("\0")
+
+		# Send a command
+		if self.fuzzing:
+			self.ctrl_link.send("%s" % cmd)
+		else:
+			self.ctrl_link.send("CMD %s\0" % cmd)
+
+	def print_prompt(self):
+		sys.stdout.write("CTRL# ")
+		sys.stdout.flush()
+
+	def sig_handler(self, signum, frame):
+		print("\n\nSignal %d received" % signum)
+		if signum == signal.SIGINT:
+			self.ctrl_link.shutdown()
+			sys.exit(0)
+
+if __name__ == '__main__':
+	app = Application("127.0.0.1", 5701, 5801)
+	app.run()
diff --git a/src/target/fake_trx/ctrl_if.py b/src/target/fake_trx/ctrl_if.py
new file mode 100644
index 0000000..a87c4c4
--- /dev/null
+++ b/src/target/fake_trx/ctrl_if.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# CTRL interface implementation
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from udp_link import UDPLink
+
+class CTRLInterface(UDPLink):
+	def handle_rx(self, data):
+		# print(data)
+		if self.verify_req(data):
+			request = self.prepare_req(data)
+			rc = self.parse_cmd(request)
+			self.send_response(request, rc)
+		else:
+			print("[!] Wrong data on CTRL interface")
+
+	def verify_req(self, data):
+		# Verify command signature
+		return data.startswith("CMD")
+
+	def prepare_req(self, data):
+		# Strip signature, paddings and \0
+		request = data[4:].strip().strip("\0")
+		# Split into a command and arguments
+		request = request.split(" ")
+		# Now we have something like ["TXTUNE", "941600"]
+		return request
+
+	def verify_cmd(self, request, cmd, argc):
+		# Check if requested command matches
+		if request[0] != cmd:
+			return False
+
+		# And has enough arguments
+		if len(request) - 1 != argc:
+			return False
+
+		# Check if all arguments are numeric
+		for v in request[1:]:
+			if not v.isdigit():
+				return False
+
+		return True
+
+	def send_response(self, request, response_code):
+		# Include status code, for example ["TXTUNE", "0", "941600"]
+		request.insert(1, str(response_code))
+		# Add the response signature, and join back to string
+		response = "RSP " + " ".join(request) + "\0"
+		# Now we have something like "RSP TXTUNE 0 941600"
+		self.send(response)
+
+	def parse_cmd(self, request):
+		raise NotImplementedError
diff --git a/src/target/fake_trx/ctrl_if_bb.py b/src/target/fake_trx/ctrl_if_bb.py
new file mode 100644
index 0000000..6880000
--- /dev/null
+++ b/src/target/fake_trx/ctrl_if_bb.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# CTRL interface implementation (OsmocomBB specific)
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceBB(CTRLInterface):
+	# Internal state variables
+	trx_started = False
+	rx_freq = None
+	tx_freq = None
+
+	def __init__(self, remote_addr, remote_port, bind_port):
+		print("[i] Init CTRL interface for BB")
+		CTRLInterface.__init__(self, remote_addr, remote_port, bind_port)
+
+	def shutdown(self):
+		print("[i] Shutdown CTRL interface for BB")
+		CTRLInterface.shutdown(self)
+
+	def parse_cmd(self, request):
+		# Power control
+		if self.verify_cmd(request, "POWERON", 0):
+			print("[i] Recv POWERON CMD")
+
+			# Ensure transceiver isn't working
+			if self.trx_started:
+				print("[!] Transceiver already started")
+				return -1
+
+			# Ensure RX / TX freq. are set
+			if (self.rx_freq is None) or (self.tx_freq is None):
+				print("[!] RX / TX freq. are not set")
+				return -1
+
+			print("[i] Starting transceiver...")
+			self.trx_started = True
+			return 0
+
+		elif self.verify_cmd(request, "POWEROFF", 0):
+			print("[i] Recv POWEROFF cmd")
+
+			print("[i] Stopping transceiver...")
+			self.trx_started = False
+			return 0
+
+		# Tuning Control
+		elif self.verify_cmd(request, "RXTUNE", 1):
+			print("[i] Recv RXTUNE cmd")
+
+			# TODO: check freq range
+			self.rx_freq = int(request[1]) * 1000
+			return 0
+
+		elif self.verify_cmd(request, "TXTUNE", 1):
+			print("[i] Recv TXTUNE cmd")
+
+			# TODO: check freq range
+			self.tx_freq = int(request[1]) * 1000
+			return 0
+
+		# Wrong / unknown command
+		else:
+			# We don't care about other commands,
+			# so let's merely ignore them ;)
+			print("[i] Ignore CMD %s" % request[0])
+			return 0
diff --git a/src/target/fake_trx/ctrl_if_bts.py b/src/target/fake_trx/ctrl_if_bts.py
new file mode 100644
index 0000000..cdaaf81
--- /dev/null
+++ b/src/target/fake_trx/ctrl_if_bts.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# CTRL interface implementation (OsmoBTS specific)
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceBTS(CTRLInterface):
+	# Internal state variables
+	trx_started = False
+	rx_freq = None
+	tx_freq = None
+
+	def __init__(self, remote_addr, remote_port, bind_port):
+		print("[i] Init CTRL interface for BTS")
+		CTRLInterface.__init__(self, remote_addr, remote_port, bind_port)
+
+	def shutdown(self):
+		print("[i] Shutdown CTRL interface for BTS")
+		CTRLInterface.shutdown(self)
+
+	def parse_cmd(self, request):
+		# Power control
+		if self.verify_cmd(request, "POWERON", 0):
+			print("[i] Recv POWERON CMD")
+
+			# Ensure transceiver isn't working
+			if self.trx_started:
+				print("[!] Transceiver already started")
+				return -1
+
+			# Ensure RX / TX freq. are set
+			if (self.rx_freq is None) or (self.tx_freq is None):
+				print("[!] RX / TX freq. are not set")
+				return -1
+
+			print("[i] Starting transceiver...")
+			self.trx_started = True
+			return 0
+
+		elif self.verify_cmd(request, "POWEROFF", 0):
+			print("[i] Recv POWEROFF cmd")
+
+			print("[i] Stopping transceiver...")
+			self.trx_started = False
+			return 0
+
+		# Tuning Control
+		elif self.verify_cmd(request, "RXTUNE", 1):
+			print("[i] Recv RXTUNE cmd")
+
+			# TODO: check freq range
+			self.rx_freq = int(request[1]) * 1000
+			return 0
+
+		elif self.verify_cmd(request, "TXTUNE", 1):
+			print("[i] Recv TXTUNE cmd")
+
+			# TODO: check freq range
+			self.tx_freq = int(request[1]) * 1000
+			return 0
+
+		# Wrong / unknown command
+		else:
+			# We don't care about other commands,
+			# so let's merely ignore them ;)
+			print("[i] Ignore CMD %s" % request[0])
+			return 0
diff --git a/src/target/fake_trx/fake_trx.py b/src/target/fake_trx/fake_trx.py
new file mode 100755
index 0000000..c30382b
--- /dev/null
+++ b/src/target/fake_trx/fake_trx.py
@@ -0,0 +1,204 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# OsmocomBB <-> OsmoBTS bridge
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import signal
+import getopt
+import select
+import sys
+
+from ctrl_if_bts import CTRLInterfaceBTS
+from ctrl_if_bb import CTRLInterfaceBB
+from burst_fwd import BurstForwarder
+
+from udp_link import UDPLink
+from clck_gen import CLCKGen
+
+COPYRIGHT = \
+	"Copyright (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>\n" \
+	"License GPLv2+: GNU GPL version 2 or later " \
+	"<http://gnu.org/licenses/gpl.html>\n" \
+	"This is free software: you are free to change and redistribute it.\n" \
+	"There is NO WARRANTY, to the extent permitted by law.\n"
+
+class Application:
+	# Application variables
+	bts_addr = "127.0.0.1"
+	bb_addr = "127.0.0.1"
+	bts_base_port = 5700
+	bb_base_port = 5703
+
+	def __init__(self):
+		self.print_copyright()
+		self.parse_argv()
+
+		# Set up signal handlers
+		signal.signal(signal.SIGINT, self.sig_handler)
+
+	def run(self):
+		# Init TRX CTRL interface for BTS
+		self.bts_ctrl = CTRLInterfaceBTS(self.bts_addr,
+			self.bts_base_port + 101, self.bts_base_port + 1)
+
+		# Init TRX CTRL interface for BB
+		self.bb_ctrl = CTRLInterfaceBB(self.bb_addr,
+			self.bb_base_port + 101, self.bb_base_port + 1)
+
+		# BTS <-> BB burst forwarding
+		self.bts_data = UDPLink(self.bts_addr,
+			self.bts_base_port + 102, self.bts_base_port + 2)
+		self.bb_data = UDPLink(self.bb_addr,
+			self.bb_base_port + 102, self.bb_base_port + 2)
+		self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
+
+		# Share clock between BTS and BB
+		self.bts_clck = UDPLink(self.bts_addr,
+			self.bts_base_port + 100, self.bts_base_port)
+		self.bb_clck = UDPLink(self.bb_addr,
+			self.bb_base_port + 100, self.bb_base_port)
+		self.clck_gen = CLCKGen([self.bts_clck, self.bb_clck])
+		self.clck_gen.start()
+
+		print("[i] Init complete")
+
+		# Enter main loop
+		while True:
+			socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
+				self.bts_data.sock, self.bb_data.sock]
+
+			# Wait until we get any data on any socket
+			r_event, w_event, x_event = select.select(socks, [], [])
+
+			# Downlink: BTS -> BB
+			if self.bts_data.sock in r_event:
+				self.burst_fwd.bts2bb()
+
+			# Uplink: BB -> BTS
+			if self.bb_data.sock in r_event:
+				self.burst_fwd.bb2bts()
+
+			# CTRL commands from BTS
+			if self.bts_ctrl.sock in r_event:
+				data, addr = self.bts_ctrl.sock.recvfrom(128)
+				self.bts_ctrl.handle_rx(data)
+
+			# CTRL commands from BB
+			if self.bb_ctrl.sock in r_event:
+				data, addr = self.bb_ctrl.sock.recvfrom(128)
+				self.bb_ctrl.handle_rx(data)
+
+	def shutdown(self):
+		print("[i] Shutting down...")
+
+		# Stop clock generator
+		self.clck_gen.stop()
+
+		# Close CLCK interfaces
+		self.bts_clck.shutdown()
+		self.bb_clck.shutdown()
+
+		# Close CTRL interfaces
+		self.bts_ctrl.shutdown()
+		self.bb_ctrl.shutdown()
+
+		# Close DATA interfaces
+		self.bts_data.shutdown()
+		self.bb_data.shutdown()
+
+
+	def print_copyright(self):
+		print(COPYRIGHT)
+
+	def print_help(self, msg = None):
+		s  = " Usage: " + sys.argv[0] + " [options]\n\n" \
+			 " Some help...\n" \
+			 "  -h --help           this text\n\n"
+
+		s += " TRX interface specific\n" \
+			 "  -R --bts-addr       Set BTS remote address (default %s)\n"   \
+			 "  -r --bb-addr        Set BB remote address (default %s)\n"    \
+			 "  -P --bts-base-port  Set BTS base port number (default %d)\n" \
+			 "  -p --bb-base-port   Set BB base port number (default %d)\n"
+
+		print(s % (self.bts_addr, self.bb_addr,
+			self.bts_base_port, self.bb_base_port))
+
+		if msg is not None:
+			print(msg)
+
+	def parse_argv(self):
+		try:
+			opts, args = getopt.getopt(sys.argv[1:],
+				"R:r:P:p:h",
+				["help", "bts-addr=", "bb-addr=",
+					"bts-base-port=", "bb-base-port="])
+		except getopt.GetoptError as err:
+			self.print_help("[!] " + str(err))
+			sys.exit(2)
+
+		for o, v in opts:
+			if o in ("-h", "--help"):
+				self.print_help()
+				sys.exit(0)
+
+			elif o in ("-R", "--bts-addr"):
+				self.bts_addr = v
+			elif o in ("-r", "--bb-addr"):
+				self.bb_addr = v
+
+			elif o in ("-P", "--bts-base-port"):
+				self.bts_base_port = int(v)
+			elif o in ("-p", "--bb-base-port"):
+				self.bb_base_port = int(v)
+
+		# Ensure there is no overlap between ports
+		if self.bts_base_port == self.bb_base_port:
+			self.print_help("[!] BTS and BB base ports should be different")
+			sys.exit(2)
+
+		bts_ports = [
+			self.bts_base_port + 0, self.bts_base_port + 100,
+			self.bts_base_port + 1, self.bts_base_port + 101,
+			self.bts_base_port + 2, self.bts_base_port + 102,
+		]
+
+		bb_ports = [
+			self.bb_base_port + 0, self.bb_base_port + 100,
+			self.bb_base_port + 1, self.bb_base_port + 101,
+			self.bb_base_port + 2, self.bb_base_port + 102,
+		]
+
+		for p in bb_ports:
+			if p in bts_ports:
+				self.print_help("[!] BTS and BB ports overlap detected")
+				sys.exit(2)
+
+	def sig_handler(self, signum, frame):
+		print("Signal %d received" % signum)
+		if signum == signal.SIGINT:
+			self.shutdown()
+			sys.exit(0)
+
+if __name__ == '__main__':
+	app = Application()
+	app.run()
diff --git a/src/target/fake_trx/udp_link.py b/src/target/fake_trx/udp_link.py
new file mode 100644
index 0000000..3fa5050
--- /dev/null
+++ b/src/target/fake_trx/udp_link.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# Virtual Um-interface (fake transceiver)
+# UDP link implementation
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import socket
+import select
+
+class UDPLink:
+	def __init__(self, remote_addr, remote_port, bind_port):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+		self.sock.bind(('0.0.0.0', bind_port))
+		self.sock.setblocking(0)
+
+		# Save remote info
+		self.remote_addr = remote_addr
+		self.remote_port = remote_port
+
+	def loop(self):
+		r_event, w_event, x_event = select.select([self.sock], [], [])
+
+		# Check for incoming data
+		if self.sock in r_event:
+			data, addr = self.sock.recvfrom(128)
+			self.handle_rx(data)
+
+	def shutdown(self):
+		self.sock.close()
+
+	def send(self, data):
+		self.sock.sendto(data, (self.remote_addr, self.remote_port))
+
+	def handle_rx(self, data):
+		raise NotImplementedError
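
As a standalone illustration of what process_payload() in
burst_fwd.py does to each datagram, the header rewrite can be sketched
like this. The byte layout (timeslot, four frame-number bytes, one
power byte, then the bits) is inferred from the patch and should be
read as an assumption, as should the function name to_rx_burst.

```python
def to_rx_burst(data, rssi=0x35, toa=(0x00, 0x00)):
    """Rewrite a forwarded burst so the receiving side sees RSSI and TOA."""
    payload = bytearray(data)

    # Byte 5 carried the TX power; reuse it for a fake RSSI value
    payload[5] = rssi

    # Insert two TOA bytes at offset 6, shifting the burst bits to offset 8
    payload[6:6] = bytearray(toa)

    # Map hard bits (0 / non-zero) onto the {0, 255} range,
    # using the length measured after the insertion
    for i in range(8, len(payload)):
        payload[i] = 255 if payload[i] else 0

    return payload


if __name__ == "__main__":
    # Timeslot 3, frame number 42, power 10, then three burst bits
    burst = bytearray([3, 0, 0, 0, 42, 10, 1, 0, 1])
    print(list(to_rx_burst(burst)))
```

Measuring the length after the insertion matters: taking it before
would leave the last two bits of every burst unconverted.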

-- 
To view, visit https://gerrit.osmocom.org/6707
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib1fb80682002ac85a72fa6abef459a4c44f4ab97
Gerrit-PatchSet: 1
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Owner: Harald Welte <laforge at gnumonks.org>