This is merely a historical archive of years 2008-2021, before the migration to mailman3.
A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.
Vadim Yanitskiy gerrit-no-reply at lists.osmocom.org
Vadim Yanitskiy has uploaded this change for review. ( https://gerrit.osmocom.org/12264 )
Change subject: (WIP) trx_toolkit/fake_trx.py: refactor global class hierarchy
......................................................................
(WIP) trx_toolkit/fake_trx.py: refactor global class hierarchy
Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185
---
M src/target/trx_toolkit/burst_fwd.py
A src/target/trx_toolkit/clck_router_trx.py
D src/target/trx_toolkit/ctrl_if_bb.py
D src/target/trx_toolkit/ctrl_if_bts.py
A src/target/trx_toolkit/ctrl_if_trx.py
M src/target/trx_toolkit/fake_pm.py
M src/target/trx_toolkit/fake_trx.py
A src/target/trx_toolkit/transceiver.py
8 files changed, 614 insertions(+), 786 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/64/12264/1
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 3cb6acd..3a83753 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# BTS <-> BB burst forwarding
+# Burst forwarding between transceivers
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com>
#
@@ -23,321 +23,58 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
-import random
from data_msg import *
class BurstForwarder:
- """ Performs burst forwarding and preprocessing between MS and BTS.
+ """ Performs burst forwarding between transceivers.
- == Pass-filtering parameters
+ BurstForwarder dispatches bursts between the list of given
+ FakeTRX (Transceiver) instances depending on the following
+ parameters of each transceiver:
- BurstForwarder may drop or pass an UL/DL burst depending
- on the following parameters:
-
- - bts_freq / bb_freq - the current BTS / MS frequency
- that was set using RXTUNE control command. By default,
- both freq. values are set to None, so nothing is being
- forwarded (i.e. bursts are getting dropped).
-
- FIXME: currently, we don't care about TXTUNE command
- and transmit frequencies. It would be great to distinguish
- between RX and TX frequencies for both BTS and MS.
-
- - ts_pass_list - the list of active (i.e. configured)
- timeslot numbers for the MS. A timeslot can be activated
- or deactivated using SETSLOT control command from the MS.
-
- FIXME: there is no such list for the BTS side.
-
- == Preprocessing and measurement simulation
-
- Since this is a virtual environment, we can simulate different
- parameters of a virtual RF interface:
-
- - ToA (Timing of Arrival) - measured difference between expected
- and actual time of burst arrival in units of 1/256 of GSM symbol
- periods. A pair of both base and threshold values defines a range
- of ToA value randomization:
-
- DL: from (toa256_dl_base - toa256_dl_threshold)
- to (toa256_dl_base + toa256_dl_threshold),
- UL: from (toa256_ul_base - toa256_ul_threshold)
- to (toa256_ul_base + toa256_ul_threshold).
-
- - RSSI (Received Signal Strength Indication) - measured "power" of
- the signal (per burst) in dBm. A pair of both base and threshold
- values defines a range of RSSI value randomization:
-
- DL: from (rssi_dl_base - rssi_dl_threshold)
- to (rssi_dl_base + rssi_dl_threshold),
- UL: from (rssi_ul_base - rssi_ul_threshold)
- to (rssi_ul_base + rssi_ul_threshold).
-
- Please note that the randomization of both RSSI and ToA
- is optional, and can be enabled from the control interface.
-
- === Timing Advance handling
-
- The BTS is using ToA measurements for UL bursts in order to calculate
- Timing Advance value, that is then indicated to a MS, which in its turn
- shall apply this value to the transmitted signal in order to compensate
- the delay. Basically, every burst is transmitted in advance defined by
- the indicated Timing Advance value. The valid range is 0..63, where
- each unit means one GSM symbol advance. The actual Timing Advance value
- is set using SETTA control command from MS. By default, it's set to 0.
-
- === Path loss simulation - burst dropping
-
- In some cases, e.g. due to a weak signal or high interference, a burst
- can be lost, i.e. not detected by the receiver. This can also be
- simulated using FAKE_DROP command on both control interfaces:
-
- - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
- to be dropped (i.e. not forwarded towards the MS/BTS),
-
- - burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g.
- 1 - drop every consequent burst, 2 - drop every second burst, etc.
+ - execution state (running or idle),
+ - actual RX / TX frequencies,
+ - list of active timeslots.
"""
- def __init__(self, bts_link, bb_link):
- self.bts_link = bts_link
- self.bb_link = bb_link
+ def __init__(self, trx_list = []):
+ # List of Transceiver instances
+ self.trx_list = trx_list
- # Init default parameters
- self.reset_dl()
- self.reset_ul()
+ def add_trx(self, trx):
+ if trx in self.trx_list:
+ log.error("TRX is already in the list")
+ return
- # Initialize (or reset to) default parameters for Downlink
- def reset_dl(self):
- # Unset current DL freq.
- self.bts_freq = None
+ self.trx_list.append(trx)
- # Indicated RSSI / ToA values
- self.toa256_dl_base = 0
- self.rssi_dl_base = -60
+ def del_trx(self, trx):
+ if trx not in self.trx_list:
+ log.error("TRX is not in the list")
+ return
- # RSSI / ToA randomization threshold
- self.toa256_dl_threshold = 0
- self.rssi_dl_threshold = 0
+ self.trx_list.remove(trx)
- # Path loss simulation (burst dropping)
- self.burst_dl_drop_amount = 0
- self.burst_dl_drop_period = 1
+ def forward_msg(self, src_trx, rx_msg):
+ # Transform from L12TRX to TRX2L1
+ tx_msg = rx_msg.gen_trx2l1()
+ if tx_msg is None:
+ log.error("Forwarding failed, could not transform "
+ "message (%s) => dropping..." % rx_msg.desc_hdr())
- # Initialize (or reset to) default parameters for Uplink
- def reset_ul(self):
- # Unset current DL freq.
- self.bb_freq = None
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if trx == src_trx:
+ continue
- # Indicated RSSI / ToA values
- self.rssi_ul_base = -70
- self.toa256_ul_base = 0
+ # Check transceiver state
+ if not trx.running:
+ continue
+ if trx.rx_freq != src_trx.tx_freq:
+ continue
+ if tx_msg.tn not in trx.ts_list:
+ continue
- # RSSI / ToA randomization threshold
- self.toa256_ul_threshold = 0
- self.rssi_ul_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_ul_drop_amount = 0
- self.burst_ul_drop_period = 1
-
- # Init timeslot filter (drop everything by default)
- self.ts_pass_list = []
-
- # Reset Timing Advance value
- self.ta = 0
-
- # Converts TA value from symbols to
- # units of 1/256 of GSM symbol periods
- def calc_ta256(self):
- return self.ta * 256
-
- # Calculates a random ToA value for Downlink bursts
- def calc_dl_toa256(self):
- # Check if randomization is required
- if self.toa256_dl_threshold is 0:
- return self.toa256_dl_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
- toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random ToA value for Uplink bursts
- def calc_ul_toa256(self):
- # Check if randomization is required
- if self.toa256_ul_threshold is 0:
- return self.toa256_ul_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
- toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random RSSI value for Downlink bursts
- def calc_dl_rssi(self):
- # Check if randomization is required
- if self.rssi_dl_threshold is 0:
- return self.rssi_dl_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
- rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # Calculates a random RSSI value for Uplink bursts
- def calc_ul_rssi(self):
- # Check if randomization is required
- if self.rssi_ul_threshold is 0:
- return self.rssi_ul_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
- rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # DL path loss simulation
- def path_loss_sim_dl(self, msg):
- # Burst dropping
- if self.burst_dl_drop_amount > 0:
- if msg.fn % self.burst_dl_drop_period == 0:
- log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_dl_drop_period))
- self.burst_dl_drop_amount -= 1
- return None
-
- return msg
-
- # UL path loss simulation
- def path_loss_sim_ul(self, msg):
- # Burst dropping
- if self.burst_ul_drop_amount > 0:
- if msg.fn % self.burst_ul_drop_period == 0:
- log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_ul_drop_period))
- self.burst_ul_drop_amount -= 1
- return None
-
- return msg
-
- # DL burst preprocessing
- def preprocess_dl_burst(self, msg):
- # Calculate both RSSI and ToA values
- msg.toa256 = self.calc_dl_toa256()
- msg.rssi = self.calc_dl_rssi()
-
- # UL burst preprocessing
- def preprocess_ul_burst(self, msg):
- # Calculate both RSSI and ToA values,
- # also apply Timing Advance
- msg.toa256 = self.calc_ul_toa256()
- msg.toa256 -= self.calc_ta256()
- msg.rssi = self.calc_ul_rssi()
-
- # Converts a L12TRX message to TRX2L1 message
- def transform_msg(self, msg_raw):
- # Attempt to parse a message
- try:
- msg_l12trx = DATAMSG_L12TRX()
- msg_l12trx.parse_msg(bytearray(msg_raw))
- except:
- log.error("Dropping unhandled DL message...")
- return None
-
- # Compose a new message for L1
- return msg_l12trx.gen_trx2l1()
-
- # Downlink handler: BTS -> BB
- def bts2bb(self):
- # Read data from socket
- data, addr = self.bts_link.sock.recvfrom(512)
-
- # BB is not connected / tuned
- if self.bb_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_dl(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_dl_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BB
- self.bb_link.send(payload)
-
- # Uplink handler: BB -> BTS
- def bb2bts(self):
- # Read data from socket
- data, addr = self.bb_link.sock.recvfrom(512)
-
- # BTS is not connected / tuned
- if self.bts_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- log.warning("TS %u is not configured, dropping UL burst..." % msg.tn)
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_ul(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_ul_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BTS
- self.bts_link.send(payload)
+ trx.send_data_msg(tx_msg)
diff --git a/src/target/trx_toolkit/clck_router_trx.py b/src/target/trx_toolkit/clck_router_trx.py
new file mode 100755
index 0000000..c85286b
--- /dev/null
+++ b/src/target/trx_toolkit/clck_router_trx.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Clock distribution router for transceivers
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from clck_gen import CLCKGen
+
+class CLCKRouterTRX(CLCKGen):
+ def __init__(self, trx_list, *clck_gen_argv):
+ CLCKGen.__init__(self, [], *clck_gen_argv)
+ self.trx_list = trx_list
+
+ def poke(self, trx):
+ log.debug("CLCKRouterTRX poke!")
+
+ if (trx.clck_if not in self.clck_links) and trx.running:
+ # Transceiver was started
+ log.debug("Transceiver was started")
+ self.clck_links.append(trx.clck_if)
+ elif (trx.clck_if in self.clck_links) and not trx.running:
+ # Transceiver was stopped
+ log.debug("Transceiver was stopped")
+ self.clck_links.remove(trx.clck_if)
+
+ # Start / stop clock generator
+ if (len(self.clck_links) > 0) and self.timer is None:
+ log.debug("Starting clock generator")
+ self.start()
+ elif (len(self.clck_links) is 0) and self.timer is not None:
+ log.debug("Stopping clock generator")
+ self.stop()
diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py
deleted file mode 100644
index fe7f3e8..0000000
--- a/src/target/trx_toolkit/ctrl_if_bb.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmocomBB specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBB(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BB (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- self.burst_fwd.bb_freq = self.rx_freq
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- return 0
-
- # Power measurement
- elif self.verify_cmd(request, "MEASURE", 1):
- log.debug("Recv MEASURE cmd")
-
- if self.pm is None:
- return -1
-
- # TODO: check freq range
- meas_freq = int(request[1]) * 1000
- meas_dbm = str(self.pm.measure(meas_freq))
-
- return (0, [meas_dbm])
-
- elif self.verify_cmd(request, "SETSLOT", 2):
- log.debug("Recv SETSLOT cmd")
-
- if self.burst_fwd is None:
- return -1
-
- # Obtain TS index
- ts = int(request[1])
- if ts not in range(0, 8):
- log.error("TS index should be in range: 0..7")
- return -1
-
- # Parse TS type
- ts_type = int(request[2])
-
- # TS activation / deactivation
- # We don't care about ts_type
- if ts_type == 0:
- # Deactivate TS (remove from TS pass-filter list)
- if ts in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.remove(ts)
- else:
- log.warning("TS %u was not activated before" % ts)
- # TODO: uncomment as soon as RESET is introduced
- # return -1
- else:
- # Activate TS (add to TS pass-filter list)
- if ts not in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.append(ts)
- else:
- log.warning("TS %u was already activated before" % ts)
- # TODO: uncomment as soon as RESET is introduced
- # return -1
-
- return 0
-
- # Timing Advance
- elif self.verify_cmd(request, "SETTA", 1):
- log.debug("Recv SETTA cmd")
-
- # Save to the BurstForwarder instance
- self.burst_fwd.ta = int(request[1])
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_ul_base = int(request[1])
- self.burst_fwd.toa256_ul_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_ul_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Uplink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_ul_base = int(request[1])
- self.burst_fwd.rssi_ul_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Uplink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_ul_base += int(request[1])
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = 1
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py
deleted file mode 100644
index cb38b67..0000000
--- a/src/target/trx_toolkit/ctrl_if_bts.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmoBTS specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBTS(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- clck_gen = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BTS (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
-
- # Power emulation
- if self.pm is not None:
- self.pm.add_bts_list([self.tx_freq])
-
- # Start clock indications
- if self.clck_gen is not None:
- self.clck_gen.start()
-
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
-
- # Power emulation
- if self.pm is not None:
- self.pm.del_bts_list([self.tx_freq])
-
- # Stop clock indications
- if self.clck_gen is not None:
- self.clck_gen.stop()
-
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- self.burst_fwd.bts_freq = self.tx_freq
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_dl_base = int(request[1])
- self.burst_fwd.toa256_dl_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_dl_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Downlink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_dl_base = int(request[1])
- self.burst_fwd.rssi_dl_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Downlink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_dl_base += int(request[1])
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = 1
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py
new file mode 100644
index 0000000..1ab1990
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if_trx.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation (common commands)
+#
+# (C) 2016-2018 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceTRX(CTRLInterface):
+ """
+
+ """
+
+ def __init__(self, trx, *udp_link_args):
+ CTRLInterface.__init__(self, *udp_link_args)
+ log.info("Init CTRL interface (%s)" % self.desc_link())
+
+ # Link with Transceiver instance we belong to
+ self.trx = trx
+
+ def parse_cmd(self, request):
+ # Custom command handlers (prioritized)
+ res = self.trx.ctrl_cmd_handler(request)
+ if res is not None:
+ return res
+
+ # Power control
+ if self.verify_cmd(request, "POWERON", 0):
+ log.debug("Recv POWERON CMD")
+
+ # Ensure transceiver isn't working
+ if self.trx.running:
+ log.error("Transceiver already started")
+ return -1
+
+ # Ensure RX / TX freq. are set
+ if (self.trx.rx_freq is None) or (self.trx.tx_freq is None):
+ log.error("RX / TX freq. are not set")
+ return -1
+
+ log.info("Starting transceiver...")
+ self.trx.running = True
+
+ # Notify transceiver about that
+ self.trx.poweron_event_handler()
+
+ return 0
+
+ elif self.verify_cmd(request, "POWEROFF", 0):
+ log.debug("Recv POWEROFF cmd")
+
+ log.info("Stopping transceiver...")
+ self.trx.running = False
+
+ # Notify transceiver about that
+ self.trx.poweroff_event_handler()
+
+ return 0
+
+ # Tuning Control
+ elif self.verify_cmd(request, "RXTUNE", 1):
+ log.debug("Recv RXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.rx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "TXTUNE", 1):
+ log.debug("Recv TXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.tx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "SETSLOT", 2):
+ log.debug("Recv SETSLOT cmd")
+
+ # Obtain TS index
+ ts = int(request[1])
+ if ts not in range(0, 8):
+ log.error("TS index should be in range: 0..7")
+ return -1
+
+ # Parse TS type
+ ts_type = int(request[2])
+
+ # TS activation / deactivation
+ # We don't care about ts_type
+ if ts_type == 0:
+ # Deactivate TS (remove from the list of active timeslots)
+ if ts in self.trx.ts_list:
+ self.trx.ts_list.remove(ts)
+ else:
+ log.warning("TS %u was not activated before" % ts)
+ # TODO: uncomment as soon as RESET is introduced
+ # return -1
+ else:
+ # Activate TS (add to the list of active timeslots)
+ if ts not in self.trx.ts_list:
+ self.trx.ts_list.append(ts)
+ else:
+ log.warning("TS %u was already activated before" % ts)
+ # TODO: uncomment as soon as RESET is introduced
+ # return -1
+
+ return 0
+
+ # Wrong / unknown command
+ else:
+ # We don't care about other commands,
+ # so let's merely ignore them ;)
+ log.debug("Ignore CMD %s" % request[0])
+ return 0
diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py
index 840b4e4..b2b176f 100644
--- a/src/target/trx_toolkit/fake_pm.py
+++ b/src/target/trx_toolkit/fake_pm.py
@@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# Power measurement emulation for BB
+# Power measurement emulation
#
-# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com>
#
# All Rights Reserved
#
@@ -25,29 +25,36 @@
from random import randint
class FakePM:
- # Freq. list for good power level
- bts_list = []
+ def __init__(self, trx_list, noise_min, noise_max, trx_min, trx_max):
+ # List of Transceiver instances
+ self.trx_list = trx_list
- def __init__(self, noise_min, noise_max, bts_min, bts_max):
# Save power level ranges
self.noise_min = noise_min
self.noise_max = noise_max
- self.bts_min = bts_min
- self.bts_max = bts_max
+ self.trx_min = trx_min
+ self.trx_max = trx_max
- def measure(self, bts):
- if bts in self.bts_list:
- return randint(self.bts_min, self.bts_max)
- else:
- return randint(self.noise_min, self.noise_max)
+ @property
+ def rssi_noise(self):
+ return randint(self.noise_min, self.noise_max)
- def update_bts_list(self, new_list):
- self.bts_list = new_list
+ @property
+ def rssi_trx(self):
+ return randint(self.trx_min, self.trx_max)
- def add_bts_list(self, add_list):
- self.bts_list += add_list
+ def measure(self, freq):
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if not trx.running:
+ continue
- def del_bts_list(self, del_list):
- for item in del_list:
- if item in self.bts_list:
- self.bts_list.remove(item)
+ # FIXME: we should average the rate of bursts
+ # and indicated power / attenuation values for
+ # all known transceivers
+
+ # Match given frequency
+ if trx.tx_freq == freq:
+ return self.rssi_trx
+
+ return self.rssi_noise
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 95261df..3068f83 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -31,13 +31,250 @@
import sys
from app_common import ApplicationBase
-from ctrl_if_bts import CTRLInterfaceBTS
-from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
+from transceiver import Transceiver
from fake_pm import FakePM
from udp_link import UDPLink
-from clck_gen import CLCKGen
+from clck_router_trx import CLCKRouterTRX
+
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the virtual RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ Please note that the randomization of both RSSI and ToA
+ is optional, and can be enabled from the control interface.
+
+ === Timing Advance handling
+
+ The BTS uses ToA measurements of UL bursts in order to calculate the
+ Timing Advance value, which is then indicated to the MS, which in turn
+ shall apply this value to the transmitted signal in order to compensate
+ for the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using the SETTA control command from the MS. By default, it's set to 0.
+
+ === Path loss simulation - burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ TODO: add some notes about custom CTRL command handler
+
+ """
+
+ def __init__(self, *transceiver_args):
+ Transceiver.__init__(self, *transceiver_args)
+
+ # Actual ToA / RSSI / TA values
+ self.toa256_base = 0
+ self.rssi_base = -60
+ self.ta = 0
+
+ # ToA / RSSI randomization threshold
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold is 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold is 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount is 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ def send_data_msg(self, msg):
+ # Complete message header
+ msg.toa256 = self.toa256
+ msg.rssi = self.rssi
+
+ # Apply optional Timing Advance
+ if self.ta is not 0:
+ msg.toa256 -= self.ta * 256
+
+ # TODO: make legacy mode configurable (via argv?)
+ self.data_if.send_msg(msg, legacy = True)
+
+ # Triggered by POWERON command on CTRL
+ def poweron_event_handler(self):
+ log.debug("Recv POWERON event")
+ self.clck_router.poke(self)
+
+ # Triggered by POWEROFF command on CTRL
+ def poweroff_event_handler(self):
+ log.debug("Recv POWEROFF event")
+ self.clck_router.poke(self)
+
+ # FakeTRX specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ if self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("FAKE_DROP period shall be greater than zero")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Unhandled command
+ return None
+
+class FakeTRX_BTS(FakeTRX):
+ pass
+
+class FakeTRX_BB(FakeTRX):
+ # BB specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ res = FakeTRX.ctrl_cmd_handler(self, request)
+ if res is not None:
+ return res
+
+ # Power measurement
+ if self.ctrl_if.verify_cmd(request, "MEASURE", 1):
+ log.debug("Recv MEASURE cmd")
+
+ # TODO: check freq range
+ meas_freq = int(request[1]) * 1000
+ meas_dbm = self.pm.measure(meas_freq)
+
+ return (0, [str(meas_dbm)])
+
+ # Timing Advance
+ elif self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("Recv SETTA cmd")
+
+ # Store indicated value
+ self.trx.ta = int(request[1])
+ return 0
+
+ # Unhandled command
+ return None
class Application(ApplicationBase):
def __init__(self):
@@ -51,78 +288,65 @@
self.app_init_logging(self.argv)
def run(self):
- # Init TRX CTRL interface for BTS
- self.bts_ctrl = CTRLInterfaceBTS(
- self.argv.bts_addr, self.argv.bts_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
+ # Init TRX instance for BTS
+ self.bts_trx = FakeTRX_BTS(self.argv.trx_bind_addr,
+ self.argv.bts_addr, self.argv.bts_base_port)
- # Init TRX CTRL interface for BB
- self.bb_ctrl = CTRLInterfaceBB(
- self.argv.bb_addr, self.argv.bb_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
+ # Init TRX instance for BB
+ self.bb_trx = FakeTRX_BB(self.argv.trx_bind_addr,
+ self.argv.bb_addr, self.argv.bb_base_port)
+
+ # Burst forwarding between transceivers
+ self.burst_fwd = BurstForwarder()
+ self.burst_fwd.add_trx(self.bts_trx)
+ self.burst_fwd.add_trx(self.bb_trx)
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
- self.pm = FakePM(-120, -105, -75, -50)
+ self.pm = FakePM(self.burst_fwd.trx_list, -120, -105, -75, -50)
+ self.bb_trx.pm = self.pm
- # Share a FakePM instance between both BTS and BB
- self.bts_ctrl.pm = self.pm
- self.bb_ctrl.pm = self.pm
-
- # Init DATA links
- self.bts_data = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
- self.bb_data = UDPLink(
- self.argv.bb_addr, self.argv.bb_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
-
- # BTS <-> BB burst forwarding
- self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
-
- # Share a BurstForwarder instance between BTS and BB
- self.bts_ctrl.burst_fwd = self.burst_fwd
- self.bb_ctrl.burst_fwd = self.burst_fwd
-
- # Provide clock to BTS
- self.bts_clck = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 100,
- self.argv.trx_bind_addr, self.argv.bts_base_port)
- self.clck_gen = CLCKGen([self.bts_clck])
- self.bts_ctrl.clck_gen = self.clck_gen
+ # Init clock router
+ self.clck_router = CLCKRouterTRX(self.burst_fwd.trx_list)
+ self.bts_trx.clck_router = self.clck_router
+ self.bb_trx.clck_router = self.clck_router
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
- self.bts_data.sock, self.bb_data.sock]
+ socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
+ self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
- if self.bts_data.sock in r_event:
- self.burst_fwd.bts2bb()
+ if self.bts_trx.data_if.sock in r_event:
+ msg = self.bts_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
- if self.bb_data.sock in r_event:
- self.burst_fwd.bb2bts()
+ if self.bb_trx.data_if.sock in r_event:
+ msg = self.bb_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
- if self.bts_ctrl.sock in r_event:
- self.bts_ctrl.handle_rx()
+ if self.bts_trx.ctrl_if.sock in r_event:
+ self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
- if self.bb_ctrl.sock in r_event:
- self.bb_ctrl.handle_rx()
+ if self.bb_trx.ctrl_if.sock in r_event:
+ self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")
# Stop clock generator
- self.clck_gen.stop()
+ self.clck_router.stop()
def parse_argv(self):
parser = argparse.ArgumentParser(prog = "fake_trx",
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
new file mode 100644
index 0000000..f83d88c
--- /dev/null
+++ b/src/target/trx_toolkit/transceiver.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Transceiver implementation
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if_trx import CTRLInterfaceTRX
+from data_if import DATAInterface
+from udp_link import UDPLink
+from data_msg import *
+
+class Transceiver:
+ """ Base transceiver class implementation.
+
+ Represents a single transceiver that can be used either for the BTS side
+ or for the MS side. Usually, a single transceiver has DATA, CTRL and
+ CLCK interfaces, but since the clock interface is not used by the MS side
+ at the moment, it is not being initialized.
+
+ """
+
+ def __init__(self, bind_addr, remote_addr, base_port):
+ # Init DATA interface
+ self.data_if = DATAInterface(
+ remote_addr, base_port + 102,
+ bind_addr, base_port + 2)
+
+ # Init CTRL interface
+ self.ctrl_if = CTRLInterfaceTRX(self,
+ remote_addr, base_port + 101,
+ bind_addr, base_port + 1)
+
+ # Init CLCK interface
+ self.clck_if = UDPLink(
+ remote_addr, base_port + 100,
+ bind_addr, base_port)
+
+ # Connection info
+ self.remote_addr = remote_addr
+ self.bind_addr = bind_addr
+ self.base_port = base_port
+
+ # Internal state
+ self.running = False
+
+ # Actual RX / TX frequencies
+ self.rx_freq = None
+ self.tx_freq = None
+
+ # List of active (configured) timeslots
+ self.ts_list = []
+
+ def recv_data_msg(self):
+ # Read and parse data from socket
+ msg = self.data_if.recv_l12trx_msg()
+ if not msg:
+ return None
+
+ # Make sure that transceiver is configured and running
+ if not self.running:
+ log.warning("RX DATA message (%s), but transceiver "
+ "is not running => dropping..." % msg.desc_hdr())
+ return None
+
+ # Make sure that indicated timeslot is configured
+ if msg.tn not in self.ts_list:
+ log.warning("RX DATA message (%s), but timeslot "
+ "is not configured => dropping..." % msg.desc_hdr())
+ return None
+
+ return msg
--
To view, visit https://gerrit.osmocom.org/12264
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185
Gerrit-Change-Number: 12264
Gerrit-PatchSet: 1
Gerrit-Owner: Vadim Yanitskiy <axilirator at gmail.com>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20181211/b00695ef/attachment.htm>