This is merely a historical archive of years 2008-2021, before the migration to mailman3.
A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.
Vadim Yanitskiy gerrit-no-reply at lists.osmocom.orgVadim Yanitskiy has uploaded this change for review. ( https://gerrit.osmocom.org/12264 Change subject: (WIP) trx_toolkit/fake_trx.py: refactor global class hierarchy ...................................................................... (WIP) trx_toolkit/fake_trx.py: refactor global class hierarchy Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185 --- M src/target/trx_toolkit/burst_fwd.py A src/target/trx_toolkit/clck_router_trx.py D src/target/trx_toolkit/ctrl_if_bb.py D src/target/trx_toolkit/ctrl_if_bts.py A src/target/trx_toolkit/ctrl_if_trx.py M src/target/trx_toolkit/fake_pm.py M src/target/trx_toolkit/fake_trx.py A src/target/trx_toolkit/transceiver.py 8 files changed, 614 insertions(+), 786 deletions(-) git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/64/12264/1 diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py index 3cb6acd..3a83753 100644 --- a/src/target/trx_toolkit/burst_fwd.py +++ b/src/target/trx_toolkit/burst_fwd.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # TRX Toolkit -# BTS <-> BB burst forwarding +# Burst forwarding between transceivers # # (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com> # @@ -23,321 +23,58 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import logging as log -import random from data_msg import * class BurstForwarder: - """ Performs burst forwarding and preprocessing between MS and BTS. + """ Performs burst forwarding between transceivers. - == Pass-filtering parameters + BurstForwarder dispatches bursts between the list of given + FakeTRX (Transceiver) instances depending on the following + parameters of each transceiver: - BurstForwarder may drop or pass an UL/DL burst depending - on the following parameters: - - - bts_freq / bb_freq - the current BTS / MS frequency - that was set using RXTUNE control command. By default, - both freq. values are set to None, so nothing is being - forwarded (i.e. bursts are getting dropped). - - FIXME: currently, we don't care about TXTUNE command - and transmit frequencies. It would be great to distinguish - between RX and TX frequencies for both BTS and MS. - - - ts_pass_list - the list of active (i.e. configured) - timeslot numbers for the MS. A timeslot can be activated - or deactivated using SETSLOT control command from the MS. - - FIXME: there is no such list for the BTS side. - - == Preprocessing and measurement simulation - - Since this is a virtual environment, we can simulate different - parameters of a virtual RF interface: - - - ToA (Timing of Arrival) - measured difference between expected - and actual time of burst arrival in units of 1/256 of GSM symbol - periods. A pair of both base and threshold values defines a range - of ToA value randomization: - - DL: from (toa256_dl_base - toa256_dl_threshold) - to (toa256_dl_base + toa256_dl_threshold), - UL: from (toa256_ul_base - toa256_ul_threshold) - to (toa256_ul_base + toa256_ul_threshold). - - - RSSI (Received Signal Strength Indication) - measured "power" of - the signal (per burst) in dBm. A pair of both base and threshold - values defines a range of RSSI value randomization: - - DL: from (rssi_dl_base - rssi_dl_threshold) - to (rssi_dl_base + rssi_dl_threshold), - UL: from (rssi_ul_base - rssi_ul_threshold) - to (rssi_ul_base + rssi_ul_threshold). - - Please note that the randomization of both RSSI and ToA - is optional, and can be enabled from the control interface. - - === Timing Advance handling - - The BTS is using ToA measurements for UL bursts in order to calculate - Timing Advance value, that is then indicated to a MS, which in its turn - shall apply this value to the transmitted signal in order to compensate - the delay. Basically, every burst is transmitted in advance defined by - the indicated Timing Advance value. The valid range is 0..63, where - each unit means one GSM symbol advance. The actual Timing Advance value - is set using SETTA control command from MS. By default, it's set to 0. - - === Path loss simulation - burst dropping - - In some cases, e.g. due to a weak signal or high interference, a burst - can be lost, i.e. not detected by the receiver. This can also be - simulated using FAKE_DROP command on both control interfaces: - - - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts - to be dropped (i.e. not forwarded towards the MS/BTS), - - - burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g. - 1 - drop every consequent burst, 2 - drop every second burst, etc. + - execution state (running or idle), + - actual RX / TX frequencies, + - list of active timeslots. """ - def __init__(self, bts_link, bb_link): - self.bts_link = bts_link - self.bb_link = bb_link + def __init__(self, trx_list = []): + # List of Transceiver instances + self.trx_list = trx_list - # Init default parameters - self.reset_dl() - self.reset_ul() + def add_trx(self, trx): + if trx in self.trx_list: + log.error("TRX is already in the list") + return - # Initialize (or reset to) default parameters for Downlink - def reset_dl(self): - # Unset current DL freq. - self.bts_freq = None + self.trx_list.append(trx) - # Indicated RSSI / ToA values - self.toa256_dl_base = 0 - self.rssi_dl_base = -60 + def del_trx(self, trx): + if trx not in self.trx_list: + log.error("TRX is not in the list") + return - # RSSI / ToA randomization threshold - self.toa256_dl_threshold = 0 - self.rssi_dl_threshold = 0 + self.trx_list.remove(trx) - # Path loss simulation (burst dropping) - self.burst_dl_drop_amount = 0 - self.burst_dl_drop_period = 1 + def forward_msg(self, src_trx, rx_msg): + # Transform from L12TRX to TRX2L1 + tx_msg = rx_msg.gen_trx2l1() + if tx_msg is None: + log.error("Forwarding failed, could not transform " + "message (%s) => dropping..." % rx_msg.desc_hdr()) - # Initialize (or reset to) default parameters for Uplink - def reset_ul(self): - # Unset current DL freq. - self.bb_freq = None + # Iterate over all known transceivers + for trx in self.trx_list: + if trx == src_trx: + continue - # Indicated RSSI / ToA values - self.rssi_ul_base = -70 - self.toa256_ul_base = 0 + # Check transceiver state + if not trx.running: + continue + if trx.rx_freq != src_trx.tx_freq: + continue + if tx_msg.tn not in trx.ts_list: + continue - # RSSI / ToA randomization threshold - self.toa256_ul_threshold = 0 - self.rssi_ul_threshold = 0 - - # Path loss simulation (burst dropping) - self.burst_ul_drop_amount = 0 - self.burst_ul_drop_period = 1 - - # Init timeslot filter (drop everything by default) - self.ts_pass_list = [] - - # Reset Timing Advance value - self.ta = 0 - - # Converts TA value from symbols to - # units of 1/256 of GSM symbol periods - def calc_ta256(self): - return self.ta * 256 - - # Calculates a random ToA value for Downlink bursts - def calc_dl_toa256(self): - # Check if randomization is required - if self.toa256_dl_threshold is 0: - return self.toa256_dl_base - - # Calculate a range for randomization - toa256_min = self.toa256_dl_base - self.toa256_dl_threshold - toa256_max = self.toa256_dl_base + self.toa256_dl_threshold - - # Generate a random ToA value - toa256 = random.randint(toa256_min, toa256_max) - - return toa256 - - # Calculates a random ToA value for Uplink bursts - def calc_ul_toa256(self): - # Check if randomization is required - if self.toa256_ul_threshold is 0: - return self.toa256_ul_base - - # Calculate a range for randomization - toa256_min = self.toa256_ul_base - self.toa256_ul_threshold - toa256_max = self.toa256_ul_base + self.toa256_ul_threshold - - # Generate a random ToA value - toa256 = random.randint(toa256_min, toa256_max) - - return toa256 - - # Calculates a random RSSI value for Downlink bursts - def calc_dl_rssi(self): - # Check if randomization is required - if self.rssi_dl_threshold is 0: - return self.rssi_dl_base - - # Calculate a range for randomization - rssi_min = self.rssi_dl_base - self.rssi_dl_threshold - rssi_max = self.rssi_dl_base + self.rssi_dl_threshold - - # Generate a random RSSI value - return random.randint(rssi_min, rssi_max) - - # Calculates a random RSSI value for Uplink bursts - def calc_ul_rssi(self): - # Check if randomization is required - if self.rssi_ul_threshold is 0: - return self.rssi_ul_base - - # Calculate a range for randomization - rssi_min = self.rssi_ul_base - self.rssi_ul_threshold - rssi_max = self.rssi_ul_base + self.rssi_ul_threshold - - # Generate a random RSSI value - return random.randint(rssi_min, rssi_max) - - # DL path loss simulation - def path_loss_sim_dl(self, msg): - # Burst dropping - if self.burst_dl_drop_amount > 0: - if msg.fn % self.burst_dl_drop_period == 0: - log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)" - % (msg.fn, self.burst_dl_drop_period)) - self.burst_dl_drop_amount -= 1 - return None - - return msg - - # UL path loss simulation - def path_loss_sim_ul(self, msg): - # Burst dropping - if self.burst_ul_drop_amount > 0: - if msg.fn % self.burst_ul_drop_period == 0: - log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)" - % (msg.fn, self.burst_ul_drop_period)) - self.burst_ul_drop_amount -= 1 - return None - - return msg - - # DL burst preprocessing - def preprocess_dl_burst(self, msg): - # Calculate both RSSI and ToA values - msg.toa256 = self.calc_dl_toa256() - msg.rssi = self.calc_dl_rssi() - - # UL burst preprocessing - def preprocess_ul_burst(self, msg): - # Calculate both RSSI and ToA values, - # also apply Timing Advance - msg.toa256 = self.calc_ul_toa256() - msg.toa256 -= self.calc_ta256() - msg.rssi = self.calc_ul_rssi() - - # Converts a L12TRX message to TRX2L1 message - def transform_msg(self, msg_raw): - # Attempt to parse a message - try: - msg_l12trx = DATAMSG_L12TRX() - msg_l12trx.parse_msg(bytearray(msg_raw)) - except: - log.error("Dropping unhandled DL message...") - return None - - # Compose a new message for L1 - return msg_l12trx.gen_trx2l1() - - # Downlink handler: BTS -> BB - def bts2bb(self): - # Read data from socket - data, addr = self.bts_link.sock.recvfrom(512) - - # BB is not connected / tuned - if self.bb_freq is None: - return None - - # Freq. filter - if self.bb_freq != self.bts_freq: - return None - - # Process a message - msg = self.transform_msg(data) - if msg is None: - return None - - # Timeslot filter - if msg.tn not in self.ts_pass_list: - return None - - # Path loss simulation - msg = self.path_loss_sim_dl(msg) - if msg is None: - return None - - # Burst preprocessing - self.preprocess_dl_burst(msg) - - # Validate and generate the payload - payload = msg.gen_msg() - - # Append two unused bytes at the end - # in order to keep the compatibility - payload += bytearray(2) - - # Send burst to BB - self.bb_link.send(payload) - - # Uplink handler: BB -> BTS - def bb2bts(self): - # Read data from socket - data, addr = self.bb_link.sock.recvfrom(512) - - # BTS is not connected / tuned - if self.bts_freq is None: - return None - - # Freq. filter - if self.bb_freq != self.bts_freq: - return None - - # Process a message - msg = self.transform_msg(data) - if msg is None: - return None - - # Timeslot filter - if msg.tn not in self.ts_pass_list: - log.warning("TS %u is not configured, dropping UL burst..." % msg.tn) - return None - - # Path loss simulation - msg = self.path_loss_sim_ul(msg) - if msg is None: - return None - - # Burst preprocessing - self.preprocess_ul_burst(msg) - - # Validate and generate the payload - payload = msg.gen_msg() - - # Append two unused bytes at the end - # in order to keep the compatibility - payload += bytearray(2) - - # Send burst to BTS - self.bts_link.send(payload) + trx.send_data_msg(tx_msg) diff --git a/src/target/trx_toolkit/clck_router_trx.py b/src/target/trx_toolkit/clck_router_trx.py new file mode 100755 index 0000000..c85286b --- /dev/null +++ b/src/target/trx_toolkit/clck_router_trx.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Clock distribution router for transceivers +# +# (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import logging as log + +from clck_gen import CLCKGen + +class CLCKRouterTRX(CLCKGen): + def __init__(self, trx_list, *clck_gen_argv): + CLCKGen.__init__(self, [], *clck_gen_argv) + self.trx_list = trx_list + + def poke(self, trx): + log.debug("CLCKRouterTRX poke!") + + if (trx.clck_if not in self.clck_links) and trx.running: + # Transceiver was started + log.debug("Transceiver was started") + self.clck_links.append(trx.clck_if) + elif (trx.clck_if in self.clck_links) and not trx.running: + # Transceiver was stopped + log.debug("Transceiver was stopped") + self.clck_links.remove(trx.clck_if) + + # Start / stop clock generator + if (len(self.clck_links) > 0) and self.timer is None: + log.debug("Starting clock generator") + self.start() + elif (len(self.clck_links) is 0) and self.timer is not None: + log.debug("Stopping clock generator") + self.stop() diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py deleted file mode 100644 index fe7f3e8..0000000 --- a/src/target/trx_toolkit/ctrl_if_bb.py +++ /dev/null @@ -1,227 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -# TRX Toolkit -# CTRL interface implementation (OsmocomBB specific) -# -# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com> -# -# All Rights Reserved -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import logging as log - -from ctrl_if import CTRLInterface - -class CTRLInterfaceBB(CTRLInterface): - # Internal state variables - trx_started = False - burst_fwd = None - rx_freq = None - tx_freq = None - pm = None - - def __init__(self, *udp_link_args): - CTRLInterface.__init__(self, *udp_link_args) - log.info("Init CTRL interface for BB (%s)" % self.desc_link()) - - def parse_cmd(self, request): - # Power control - if self.verify_cmd(request, "POWERON", 0): - log.debug("Recv POWERON CMD") - - # Ensure transceiver isn't working - if self.trx_started: - log.error("Transceiver already started") - return -1 - - # Ensure RX / TX freq. are set - if (self.rx_freq is None) or (self.tx_freq is None): - log.error("RX / TX freq. are not set") - return -1 - - log.info("Starting transceiver...") - self.trx_started = True - return 0 - - elif self.verify_cmd(request, "POWEROFF", 0): - log.debug("Recv POWEROFF cmd") - - log.info("Stopping transceiver...") - self.trx_started = False - return 0 - - # Tuning Control - elif self.verify_cmd(request, "RXTUNE", 1): - log.debug("Recv RXTUNE cmd") - - # TODO: check freq range - self.rx_freq = int(request[1]) * 1000 - self.burst_fwd.bb_freq = self.rx_freq - return 0 - - elif self.verify_cmd(request, "TXTUNE", 1): - log.debug("Recv TXTUNE cmd") - - # TODO: check freq range - self.tx_freq = int(request[1]) * 1000 - return 0 - - # Power measurement - elif self.verify_cmd(request, "MEASURE", 1): - log.debug("Recv MEASURE cmd") - - if self.pm is None: - return -1 - - # TODO: check freq range - meas_freq = int(request[1]) * 1000 - meas_dbm = str(self.pm.measure(meas_freq)) - - return (0, [meas_dbm]) - - elif self.verify_cmd(request, "SETSLOT", 2): - log.debug("Recv SETSLOT cmd") - - if self.burst_fwd is None: - return -1 - - # Obtain TS index - ts = int(request[1]) - if ts not in range(0, 8): - log.error("TS index should be in range: 0..7") - return -1 - - # Parse TS type - ts_type = int(request[2]) - - # TS activation / deactivation - # We don't care about ts_type - if ts_type == 0: - # Deactivate TS (remove from TS pass-filter list) - if ts in self.burst_fwd.ts_pass_list: - self.burst_fwd.ts_pass_list.remove(ts) - else: - log.warning("TS %u was not activated before" % ts) - # TODO: uncomment as soon as RESET is introduced - # return -1 - else: - # Activate TS (add to TS pass-filter list) - if ts not in self.burst_fwd.ts_pass_list: - self.burst_fwd.ts_pass_list.append(ts) - else: - log.warning("TS %u was already activated before" % ts) - # TODO: uncomment as soon as RESET is introduced - # return -1 - - return 0 - - # Timing Advance - elif self.verify_cmd(request, "SETTA", 1): - log.debug("Recv SETTA cmd") - - # Save to the BurstForwarder instance - self.burst_fwd.ta = int(request[1]) - return 0 - - # Timing of Arrival simulation for Uplink - # Absolute form: CMD FAKE_TOA <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_TOA", 2): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply both base and threshold - self.burst_fwd.toa256_ul_base = int(request[1]) - self.burst_fwd.toa256_ul_threshold = int(request[2]) - - return 0 - - # Timing of Arrival simulation for Uplink - # Relative form: CMD FAKE_TOA <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_TOA", 1): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply delta - self.burst_fwd.toa256_ul_base += int(request[1]) - - return 0 - - # RSSI simulation for Uplink - # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_RSSI", 2): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply both base and threshold - self.burst_fwd.rssi_ul_base = int(request[1]) - self.burst_fwd.rssi_ul_threshold = int(request[2]) - - return 0 - - # RSSI simulation for Uplink - # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_RSSI", 1): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply delta - self.burst_fwd.rssi_ul_base += int(request[1]) - - return 0 - - # Path loss simulation for UL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> - # Dropping pattern: fn % 1 == 0 - elif self.verify_cmd(request, "FAKE_DROP", 1): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - self.burst_fwd.burst_ul_drop_amount = num - self.burst_fwd.burst_ul_drop_period = 1 - - return 0 - - # Path loss simulation for UL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> - # Dropping pattern: fn % period == 0 - elif self.verify_cmd(request, "FAKE_DROP", 2): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - # Parse / validate period - period = int(request[2]) - if period <= 0: - log.error("FAKE_DROP period shall be greater than zero") - return -1 - - self.burst_fwd.burst_ul_drop_amount = num - self.burst_fwd.burst_ul_drop_period = period - - return 0 - - # Wrong / unknown command - else: - # We don't care about other commands, - # so let's merely ignore them ;) - log.debug("Ignore CMD %s" % request[0]) - return 0 diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py deleted file mode 100644 index cb38b67..0000000 --- a/src/target/trx_toolkit/ctrl_if_bts.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -# TRX Toolkit -# CTRL interface implementation (OsmoBTS specific) -# -# (C) 2016-2017 by Vadim Yanitskiy <axilirator at gmail.com> -# -# All Rights Reserved -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import logging as log - -from ctrl_if import CTRLInterface - -class CTRLInterfaceBTS(CTRLInterface): - # Internal state variables - trx_started = False - burst_fwd = None - clck_gen = None - rx_freq = None - tx_freq = None - pm = None - - def __init__(self, *udp_link_args): - CTRLInterface.__init__(self, *udp_link_args) - log.info("Init CTRL interface for BTS (%s)" % self.desc_link()) - - def parse_cmd(self, request): - # Power control - if self.verify_cmd(request, "POWERON", 0): - log.debug("Recv POWERON CMD") - - # Ensure transceiver isn't working - if self.trx_started: - log.error("Transceiver already started") - return -1 - - # Ensure RX / TX freq. are set - if (self.rx_freq is None) or (self.tx_freq is None): - log.error("RX / TX freq. are not set") - return -1 - - log.info("Starting transceiver...") - self.trx_started = True - - # Power emulation - if self.pm is not None: - self.pm.add_bts_list([self.tx_freq]) - - # Start clock indications - if self.clck_gen is not None: - self.clck_gen.start() - - return 0 - - elif self.verify_cmd(request, "POWEROFF", 0): - log.debug("Recv POWEROFF cmd") - - log.info("Stopping transceiver...") - self.trx_started = False - - # Power emulation - if self.pm is not None: - self.pm.del_bts_list([self.tx_freq]) - - # Stop clock indications - if self.clck_gen is not None: - self.clck_gen.stop() - - return 0 - - # Tuning Control - elif self.verify_cmd(request, "RXTUNE", 1): - log.debug("Recv RXTUNE cmd") - - # TODO: check freq range - self.rx_freq = int(request[1]) * 1000 - return 0 - - elif self.verify_cmd(request, "TXTUNE", 1): - log.debug("Recv TXTUNE cmd") - - # TODO: check freq range - self.tx_freq = int(request[1]) * 1000 - self.burst_fwd.bts_freq = self.tx_freq - return 0 - - # Timing of Arrival simulation for Downlink - # Absolute form: CMD FAKE_TOA <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_TOA", 2): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply both base and threshold - self.burst_fwd.toa256_dl_base = int(request[1]) - self.burst_fwd.toa256_dl_threshold = int(request[2]) - - return 0 - - # Timing of Arrival simulation for Downlink - # Relative form: CMD FAKE_TOA <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_TOA", 1): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply delta - self.burst_fwd.toa256_dl_base += int(request[1]) - - return 0 - - # RSSI simulation for Downlink - # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_RSSI", 2): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply both base and threshold - self.burst_fwd.rssi_dl_base = int(request[1]) - self.burst_fwd.rssi_dl_threshold = int(request[2]) - - return 0 - - # RSSI simulation for Downlink - # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_RSSI", 1): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply delta - self.burst_fwd.rssi_dl_base += int(request[1]) - - return 0 - - # Path loss simulation for DL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> - # Dropping pattern: fn % 1 == 0 - elif self.verify_cmd(request, "FAKE_DROP", 1): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - self.burst_fwd.burst_dl_drop_amount = num - self.burst_fwd.burst_dl_drop_period = 1 - - return 0 - - # Path loss simulation for DL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> - # Dropping pattern: fn % period == 0 - elif self.verify_cmd(request, "FAKE_DROP", 2): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - # Parse / validate period - period = int(request[2]) - if period <= 0: - log.error("FAKE_DROP period shall be greater than zero") - return -1 - - self.burst_fwd.burst_dl_drop_amount = num - self.burst_fwd.burst_dl_drop_period = period - - return 0 - - # Wrong / unknown command - else: - # We don't care about other commands, - # so let's merely ignore them ;) - log.debug("Ignore CMD %s" % request[0]) - return 0 diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py new file mode 100644 index 0000000..1ab1990 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_if_trx.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# CTRL interface implementation (common commands) +# +# (C) 2016-2018 by Vadim Yanitskiy <axilirator at gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import logging as log + +from ctrl_if import CTRLInterface + +class CTRLInterfaceTRX(CTRLInterface): + """ + + """ + + def __init__(self, trx, *udp_link_args): + CTRLInterface.__init__(self, *udp_link_args) + log.info("Init CTRL interface (%s)" % self.desc_link()) + + # Link with Transceiver instance we belong to + self.trx = trx + + def parse_cmd(self, request): + # Custom command handlers (prioritized) + res = self.trx.ctrl_cmd_handler(request) + if res is not None: + return res + + # Power control + if self.verify_cmd(request, "POWERON", 0): + log.debug("Recv POWERON CMD") + + # Ensure transceiver isn't working + if self.trx.running: + log.error("Transceiver already started") + return -1 + + # Ensure RX / TX freq. are set + if (self.trx.rx_freq is None) or (self.trx.tx_freq is None): + log.error("RX / TX freq. are not set") + return -1 + + log.info("Starting transceiver...") + self.trx.running = True + + # Notify transceiver about that + self.trx.poweron_event_handler() + + return 0 + + elif self.verify_cmd(request, "POWEROFF", 0): + log.debug("Recv POWEROFF cmd") + + log.info("Stopping transceiver...") + self.trx.running = False + + # Notify transceiver about that + self.trx.poweroff_event_handler() + + return 0 + + # Tuning Control + elif self.verify_cmd(request, "RXTUNE", 1): + log.debug("Recv RXTUNE cmd") + + # TODO: check freq range + self.trx.rx_freq = int(request[1]) * 1000 + return 0 + + elif self.verify_cmd(request, "TXTUNE", 1): + log.debug("Recv TXTUNE cmd") + + # TODO: check freq range + self.trx.tx_freq = int(request[1]) * 1000 + return 0 + + elif self.verify_cmd(request, "SETSLOT", 2): + log.debug("Recv SETSLOT cmd") + + # Obtain TS index + ts = int(request[1]) + if ts not in range(0, 8): + log.error("TS index should be in range: 0..7") + return -1 + + # Parse TS type + ts_type = int(request[2]) + + # TS activation / deactivation + # We don't care about ts_type + if ts_type == 0: + # Deactivate TS (remove from the list of active timeslots) + if ts in self.trx.ts_list: + self.trx.ts_list.remove(ts) + else: + log.warning("TS %u was not activated before" % ts) + # TODO: uncomment as soon as RESET is introduced + # return -1 + else: + # Activate TS (add to the list of active timeslots) + if ts not in self.trx.ts_list: + self.trx.ts_list.append(ts) + else: + log.warning("TS %u was already activated before" % ts) + # TODO: uncomment as soon as RESET is introduced + # return -1 + + return 0 + + # Wrong / unknown command + else: + # We don't care about other commands, + # so let's merely ignore them ;) + log.debug("Ignore CMD %s" % request[0]) + return 0 diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py index 840b4e4..b2b176f 100644 --- a/src/target/trx_toolkit/fake_pm.py +++ b/src/target/trx_toolkit/fake_pm.py @@ -2,9 +2,9 @@ # -*- coding: utf-8 -*- # TRX Toolkit -# Power measurement emulation for BB +# Power measurement emulation # -# (C) 2017 by Vadim Yanitskiy <axilirator at gmail.com> +# (C) 2017-2018 by Vadim Yanitskiy <axilirator at gmail.com> # # All Rights Reserved # @@ -25,29 +25,36 @@ from random import randint class FakePM: - # Freq. list for good power level - bts_list = [] + def __init__(self, trx_list, noise_min, noise_max, trx_min, trx_max): + # List of Transceiver instances + self.trx_list = trx_list - def __init__(self, noise_min, noise_max, bts_min, bts_max): # Save power level ranges self.noise_min = noise_min self.noise_max = noise_max - self.bts_min = bts_min - self.bts_max = bts_max + self.trx_min = trx_min + self.trx_max = trx_max - def measure(self, bts): - if bts in self.bts_list: - return randint(self.bts_min, self.bts_max) - else: - return randint(self.noise_min, self.noise_max) + @property + def rssi_noise(self): + return randint(self.noise_min, self.noise_max) - def update_bts_list(self, new_list): - self.bts_list = new_list + @property + def rssi_trx(self): + return randint(self.trx_min, self.trx_max) - def add_bts_list(self, add_list): - self.bts_list += add_list + def measure(self, freq): + # Iterate over all known transceivers + for trx in self.trx_list: + if not trx.running: + continue - def del_bts_list(self, del_list): - for item in del_list: - if item in self.bts_list: - self.bts_list.remove(item) + # FIXME: we should average the rate of bursts + # and indicated power / attenuation values for + # all known transceivers + + # Match given frequency + if trx.tx_freq == freq: + return self.rssi_trx + + return self.rssi_noise diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 95261df..3068f83 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -31,13 +31,250 @@ import sys from app_common import ApplicationBase -from ctrl_if_bts import CTRLInterfaceBTS -from ctrl_if_bb import CTRLInterfaceBB from burst_fwd import BurstForwarder +from transceiver import Transceiver from fake_pm import FakePM from udp_link import UDPLink -from clck_gen import CLCKGen +from clck_router_trx import CLCKRouterTRX + +class FakeTRX(Transceiver): + """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation. + + == ToA / RSSI measurement simulation + + Since this is a virtual environment, we can simulate different + parameters of the virtual RF interface: + + - ToA (Timing of Arrival) - measured difference between expected + and actual time of burst arrival in units of 1/256 of GSM symbol + periods. A pair of both base and threshold values defines a range + of ToA value randomization: + + from (toa256_base - toa256_rand_threshold) + to (toa256_base + toa256_rand_threshold). + + - RSSI (Received Signal Strength Indication) - measured "power" of + the signal (per burst) in dBm. A pair of both base and threshold + values defines a range of RSSI value randomization: + + from (rssi_base - rssi_rand_threshold) + to (rssi_base + rssi_rand_threshold). + + Please note that the randomization of both RSSI and ToA + is optional, and can be enabled from the control interface. + + === Timing Advance handling + + The BTS is using ToA measurements for UL bursts in order to calculate + Timing Advance value, that is then indicated to a MS, which in its turn + shall apply this value to the transmitted signal in order to compensate + the delay. Basically, every burst is transmitted in advance defined by + the indicated Timing Advance value. The valid range is 0..63, where + each unit means one GSM symbol advance. The actual Timing Advance value + is set using SETTA control command from MS. By default, it's set to 0. + + === Path loss simulation - burst dropping + + In some cases, e.g. due to a weak signal or high interference, a burst + can be lost, i.e. not detected by the receiver. This can also be + simulated using FAKE_DROP command on the control interface: + + - burst_drop_amount - the amount of DL/UL bursts + to be dropped (i.e. not forwarded towards the MS/BTS), + + - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0. + + TODO: add some notes about custom CTRL command handler + + """ + + def __init__(self, *transceiver_args): + Transceiver.__init__(self, *transceiver_args) + + # Actual ToA / RSSI / TA values + self.toa256_base = 0 + self.rssi_base = -60 + self.ta = 0 + + # ToA / RSSI randomization threshold + self.toa256_rand_threshold = 0 + self.rssi_rand_threshold = 0 + + # Path loss simulation (burst dropping) + self.burst_drop_amount = 0 + self.burst_drop_period = 1 + + @property + def toa256(self): + # Check if randomization is required + if self.toa256_rand_threshold is 0: + return self.toa256_base + + # Generate a random ToA value in required range + toa256_min = self.toa256_base - self.toa256_rand_threshold + toa256_max = self.toa256_base + self.toa256_rand_threshold + return random.randint(toa256_min, toa256_max) + + @property + def rssi(self): + # Check if randomization is required + if self.rssi_rand_threshold is 0: + return self.rssi_base + + # Generate a random RSSI value in required range + rssi_min = self.rssi_base - self.rssi_rand_threshold + rssi_max = self.rssi_base + self.rssi_rand_threshold + return random.randint(rssi_min, rssi_max) + + # Path loss simulation: burst dropping + # Returns: True - drop, False - keep + def sim_burst_drop(self, msg): + # Check if dropping is required + if self.burst_drop_amount is 0: + return False + + if msg.fn % self.burst_drop_period == 0: + log.info("Simulation: dropping burst (fn=%u %% %u == 0)" + % (msg.fn, self.burst_drop_period)) + self.burst_drop_amount -= 1 + return True + + return False + + def send_data_msg(self, msg): + # Complete message header + msg.toa256 = self.toa256 + msg.rssi = self.rssi + + # Apply optional Timing Advance + if self.ta is not 0: + msg.toa256 -= self.ta * 256 + + # TODO: make legacy mode configurable (via argv?) + self.data_if.send_msg(msg, legacy = True) + + # Triggered by POWERON command on CTRL + def poweron_event_handler(self): + log.debug("Recv POWERON event") + self.clck_router.poke(self) + + # Triggered by POWEROFF command on CTRL + def poweroff_event_handler(self): + log.debug("Recv POWEROFF event") + self.clck_router.poke(self) + + # FakeTRX specific CTRL command handler + def ctrl_cmd_handler(self, request): + # Timing of Arrival simulation + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + if self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply both base and threshold + self.toa256_base = int(request[1]) + self.toa256_rand_threshold = int(request[2]) + return 0 + + # Timing of Arrival simulation + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply delta + self.toa256_base += int(request[1]) + return 0 + + # RSSI simulation + # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply both base and threshold + self.rssi_base = int(request[1]) + self.rssi_rand_threshold = int(request[2]) + return 0 + + # RSSI simulation + # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply delta + self.rssi_base += int(request[1]) + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> + # Dropping pattern: fn % 1 == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = 1 + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> + # Dropping pattern: fn % period == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + # Parse / validate period + period = int(request[2]) + if period <= 0: + log.error("FAKE_DROP period shall be greater than zero") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = period + return 0 + + # Unhandled command + return None + +class FakeTRX_BTS(FakeTRX): + pass + +class FakeTRX_BB(FakeTRX): + # BB specific CTRL command handler + def ctrl_cmd_handler(self, request): + res = FakeTRX.ctrl_cmd_handler(self, request) + if res is not None: + return res + + # Power measurement + if self.ctrl_if.verify_cmd(request, "MEASURE", 1): + log.debug("Recv MEASURE cmd") + + # TODO: check freq range + meas_freq = int(request[1]) * 1000 + meas_dbm = self.pm.measure(meas_freq) + + return (0, [str(meas_dbm)]) + + # Timing Advance + elif self.ctrl_if.verify_cmd(request, "SETTA", 1): + log.debug("Recv SETTA cmd") + + # Store indicated value + self.trx.ta = int(request[1]) + return 0 + + # Unhandled command + return None class Application(ApplicationBase): def __init__(self): @@ -51,78 +288,65 @@ self.app_init_logging(self.argv) def run(self): - # Init TRX CTRL interface for BTS - self.bts_ctrl = CTRLInterfaceBTS( - self.argv.bts_addr, self.argv.bts_base_port + 101, - self.argv.trx_bind_addr, self.argv.bts_base_port + 1) + # Init TRX instance for BTS + self.bts_trx = FakeTRX_BTS(self.argv.trx_bind_addr, + self.argv.bts_addr, self.argv.bts_base_port) - # Init TRX CTRL interface for BB - self.bb_ctrl = CTRLInterfaceBB( - self.argv.bb_addr, self.argv.bb_base_port + 101, - self.argv.trx_bind_addr, self.argv.bb_base_port + 1) + # Init TRX instance for BB + self.bb_trx = FakeTRX_BB(self.argv.trx_bind_addr, + self.argv.bb_addr, self.argv.bb_base_port) + + # Burst forwarding between transceivers + self.burst_fwd = BurstForwarder() + self.burst_fwd.add_trx(self.bts_trx) + self.burst_fwd.add_trx(self.bb_trx) # Power measurement emulation # Noise: -120 .. -105 # BTS: -75 .. -50 - self.pm = FakePM(-120, -105, -75, -50) + self.pm = FakePM(self.burst_fwd.trx_list, -120, -105, -75, -50) + self.bb_trx.pm = self.pm - # Share a FakePM instance between both BTS and BB - self.bts_ctrl.pm = self.pm - self.bb_ctrl.pm = self.pm - - # Init DATA links - self.bts_data = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 102, - self.argv.trx_bind_addr, self.argv.bts_base_port + 2) - self.bb_data = UDPLink( - self.argv.bb_addr, self.argv.bb_base_port + 102, - self.argv.trx_bind_addr, self.argv.bb_base_port + 2) - - # BTS <-> BB burst forwarding - self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data) - - # Share a BurstForwarder instance between BTS and BB - self.bts_ctrl.burst_fwd = self.burst_fwd - self.bb_ctrl.burst_fwd = self.burst_fwd - - # Provide clock to BTS - self.bts_clck = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 100, - self.argv.trx_bind_addr, self.argv.bts_base_port) - self.clck_gen = CLCKGen([self.bts_clck]) - self.bts_ctrl.clck_gen = self.clck_gen + # Init clock router + self.clck_router = CLCKRouterTRX(self.burst_fwd.trx_list) + self.bts_trx.clck_router = self.clck_router + self.bb_trx.clck_router = self.clck_router log.info("Init complete") # Enter main loop while True: - socks = [self.bts_ctrl.sock, self.bb_ctrl.sock, - self.bts_data.sock, self.bb_data.sock] + socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock, + self.bts_trx.data_if.sock, self.bb_trx.data_if.sock] # Wait until we get any data on any socket r_event, w_event, x_event = select.select(socks, [], []) # Downlink: BTS -> BB - if self.bts_data.sock in r_event: - self.burst_fwd.bts2bb() + if self.bts_trx.data_if.sock in r_event: + msg = self.bts_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bts_trx, msg) # Uplink: BB -> BTS - if self.bb_data.sock in r_event: - self.burst_fwd.bb2bts() + if self.bb_trx.data_if.sock in r_event: + msg = self.bb_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bb_trx, msg) # CTRL commands from BTS - if self.bts_ctrl.sock in r_event: - self.bts_ctrl.handle_rx() + if self.bts_trx.ctrl_if.sock in r_event: + self.bts_trx.ctrl_if.handle_rx() # CTRL commands from BB - if self.bb_ctrl.sock in r_event: - self.bb_ctrl.handle_rx() + if self.bb_trx.ctrl_if.sock in r_event: + self.bb_trx.ctrl_if.handle_rx() def shutdown(self): log.info("Shutting down...") # Stop clock generator - self.clck_gen.stop() + self.clck_router.stop() def parse_argv(self): parser = argparse.ArgumentParser(prog = "fake_trx", diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py new file mode 100644 index 0000000..f83d88c --- /dev/null +++ b/src/target/trx_toolkit/transceiver.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Transceiver implementation +# +# (C) 2018 by Vadim Yanitskiy <axilirator at gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import logging as log + +from ctrl_if_trx import CTRLInterfaceTRX +from data_if import DATAInterface +from udp_link import UDPLink +from data_msg import * + +class Transceiver: + """ Base transceiver class implementation. + + Represents a single transceiver, that can be used as for the BTS side, + as for the MS side. Usually, a single transceiver has DATA, CTRL and + CLCK interfaces, but since clock interface is not used by the MS side + at the moment, it is not being initialized. + + """ + + def __init__(self, bind_addr, remote_addr, base_port): + # Init DATA interface + self.data_if = DATAInterface( + remote_addr, base_port + 102, + bind_addr, base_port + 2) + + # Init CTRL interface + self.ctrl_if = CTRLInterfaceTRX(self, + remote_addr, base_port + 101, + bind_addr, base_port + 1) + + # Init CLCK interface + self.clck_if = UDPLink( + remote_addr, base_port + 100, + bind_addr, base_port) + + # Connection info + self.remote_addr = remote_addr + self.bind_addr = bind_addr + self.base_port = base_port + + # Internal state + self.running = False + + # Actual RX / TX frequencies + self.rx_freq = None + self.tx_freq = None + + # List of active (configured) timeslots + self.ts_list = [] + + def recv_data_msg(self): + # Read and parse data from socket + msg = self.data_if.recv_l12trx_msg() + if not msg: + return None + + # Make sure that transceiver is configured and running + if not self.running: + log.warning("RX DATA message (%s), but transceiver " + "is not running => dropping..." % msg.desc_hdr()) + return None + + # Make sure that indicated timeslot is configured + if msg.tn not in self.ts_list: + log.warning("RX DATA message (%s), but timeslot " + "is not configured => dropping..." % msg.desc_hdr()) + return None + + return msg -- To view, visit https://gerrit.osmocom.org/12264 To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings Gerrit-Project: osmocom-bb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185 Gerrit-Change-Number: 12264 Gerrit-PatchSet: 1 Gerrit-Owner: Vadim Yanitskiy <axilirator at gmail.com> -------------- next part -------------- An HTML attachment was scrubbed... URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20181211/b00695ef/attachment.htm>