kirr has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/40051?usp=email )
Change subject: trx_toolkit/fake_trx: Split it into fake_trx and _fake_trx modules ......................................................................
trx_toolkit/fake_trx: Split it into fake_trx and _fake_trx modules
fake_trx will remain in Python, while _fake_trx will later be converted to Cython for speed, to avoid Python-related overhead. This patch is plain code movement only, as a preparatory step for that conversion.
Change-Id: Iadffd49de8197564e57bfd9cb660b1d11136ffd4 --- A src/target/trx_toolkit/_fake_trx.py M src/target/trx_toolkit/fake_trx.py 2 files changed, 415 insertions(+), 395 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/51/40051/1
diff --git a/src/target/trx_toolkit/_fake_trx.py b/src/target/trx_toolkit/_fake_trx.py new file mode 100644 index 0000000..87a352b --- /dev/null +++ b/src/target/trx_toolkit/_fake_trx.py @@ -0,0 +1,414 @@ +# TRX Toolkit +# Virtual Um-interface (fake transceiver) +# +# (C) 2017-2019 by Vadim Yanitskiy axilirator@gmail.com +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import logging as log +import random +import select + +from burst_fwd import BurstForwarder +from transceiver import Transceiver +from data_msg import Modulation +from gsm_shared import * + + +class FakeTRX(Transceiver): + """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation. + + == ToA / RSSI measurement simulation + + Since this is a virtual environment, we can simulate different + parameters of the physical RF interface: + + - ToA (Timing of Arrival) - measured difference between expected + and actual time of burst arrival in units of 1/256 of GSM symbol + periods. A pair of both base and threshold values defines a range + of ToA value randomization: + + from (toa256_base - toa256_rand_threshold) + to (toa256_base + toa256_rand_threshold). + + - RSSI (Received Signal Strength Indication) - measured "power" of + the signal (per burst) in dBm. A pair of both base and threshold + values defines a range of RSSI value randomization: + + from (rssi_base - rssi_rand_threshold) + to (rssi_base + rssi_rand_threshold). 
+ + - C/I (Carrier-to-Interference ratio) - value in cB (centiBels), + computed from the training sequence of each received burst, by + comparing the "ideal" training sequence with the actual one. + A pair of both base and threshold values defines a range of + C/I randomization: + + from (ci_base - ci_rand_threshold) + to (ci_base + ci_rand_threshold). + + Please note that the randomization is optional and disabled by default. + + == Timing Advance handling + + The BTS is using ToA measurements for UL bursts in order to calculate + Timing Advance value, that is then indicated to a MS, which in its turn + shall apply this value to the transmitted signal in order to compensate + the delay. Basically, every burst is transmitted in advance defined by + the indicated Timing Advance value. The valid range is 0..63, where + each unit means one GSM symbol advance. The actual Timing Advance value + is set using SETTA control command from MS. By default, it's set to 0. + + == Path loss simulation + + === Burst dropping + + In some cases, e.g. due to a weak signal or high interference, a burst + can be lost, i.e. not detected by the receiver. This can also be + simulated using FAKE_DROP command on the control interface: + + - burst_drop_amount - the amount of DL/UL bursts + to be dropped (i.e. not forwarded towards the MS/BTS), + + - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0. + + == Configuration + + All simulation parameters mentioned above can be changed at runtime + using the commands with prefix 'FAKE_' on the control interface. + All of them are handled by our custom CTRL command handler. 
+ + """ + + NOMINAL_TX_POWER_DEFAULT = 50 # dBm + TX_ATT_DEFAULT = 0 # dB + PATH_LOSS_DEFAULT = 110 # dB + + TOA256_BASE_DEFAULT = 0 + CI_BASE_DEFAULT = 90 + + # Default values for NOPE / IDLE indications + TOA256_NOISE_DEFAULT = 0 + RSSI_NOISE_DEFAULT = -110 + CI_NOISE_DEFAULT = -30 + + def __init__(self, *trx_args, **trx_kwargs): + Transceiver.__init__(self, *trx_args, **trx_kwargs) + + # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI. + # When disabled, RSSI is calculated based on Tx power and Rx path loss + self.fake_rssi_enabled = False + + self.rf_muted = False + + # Actual ToA, RSSI, C/I, TA values + self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT + self.tx_att_base = self.TX_ATT_DEFAULT + self.toa256_base = self.TOA256_BASE_DEFAULT + self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT + self.ci_base = self.CI_BASE_DEFAULT + self.ta = 0 + + # ToA, RSSI, C/I randomization thresholds + self.toa256_rand_threshold = 0 + self.rssi_rand_threshold = 0 + self.ci_rand_threshold = 0 + + # Path loss simulation (burst dropping) + self.burst_drop_amount = 0 + self.burst_drop_period = 1 + + @property + def toa256(self): + # Check if randomization is required + if self.toa256_rand_threshold == 0: + return self.toa256_base + + # Generate a random ToA value in required range + toa256_min = self.toa256_base - self.toa256_rand_threshold + toa256_max = self.toa256_base + self.toa256_rand_threshold + return random.randint(toa256_min, toa256_max) + + @property + def rssi(self): + # Check if randomization is required + if self.rssi_rand_threshold == 0: + return self.rssi_base + + # Generate a random RSSI value in required range + rssi_min = self.rssi_base - self.rssi_rand_threshold + rssi_max = self.rssi_base + self.rssi_rand_threshold + return random.randint(rssi_min, rssi_max) + + @property + def tx_power(self): + return self.tx_power_base - self.tx_att_base + + @property + def ci(self): + # Check if 
randomization is required + if self.ci_rand_threshold == 0: + return self.ci_base + + # Generate a random C/I value in required range + ci_min = self.ci_base - self.ci_rand_threshold + ci_max = self.ci_base + self.ci_rand_threshold + return random.randint(ci_min, ci_max) + + # Path loss simulation: burst dropping + # Returns: True - drop, False - keep + def sim_burst_drop(self, msg): + # Check if dropping is required + if self.burst_drop_amount == 0: + return False + + if msg.fn % self.burst_drop_period == 0: + log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)" + % (self, msg.fn, self.burst_drop_period)) + self.burst_drop_amount -= 1 + return True + + return False + + def _handle_data_msg_v1(self, src_msg, msg): + # C/I (Carrier-to-Interference ratio) + msg.ci = self.ci + + # Pick modulation type by burst length + bl = len(src_msg.burst) + msg.mod_type = Modulation.pick_by_bl(bl) + + # Pick TSC (Training Sequence Code) and TSC set + if msg.mod_type is Modulation.ModGMSK: + ss = TrainingSeqGMSK.pick(src_msg.burst) + msg.tsc = ss.tsc if ss is not None else 0 + msg.tsc_set = ss.tsc_set if ss is not None else 0 + else: # TODO: other modulation types (at least 8-PSK) + msg.tsc_set = 0 + msg.tsc = 0 + + # Takes (partially initialized) TRXD Rx message, + # simulates RF path parameters (such as RSSI), + # and sends towards the L1 + def handle_data_msg(self, src_trx, src_msg, msg): + if self.rf_muted: + msg.nope_ind = True + elif not msg.nope_ind: + # Path loss simulation + msg.nope_ind = self.sim_burst_drop(msg) + if msg.nope_ind: + # Before TRXDv1, we simply drop the message + if msg.ver < 0x01: + del msg + return + + # Since TRXDv1, we should send a NOPE.ind + msg.burst = None # burst bits are omited + + # TODO: shoud we make these values configurable? 
+ msg.toa256 = self.TOA256_NOISE_DEFAULT + msg.rssi = self.RSSI_NOISE_DEFAULT + msg.ci = self.CI_NOISE_DEFAULT + + self.data_if.send_msg(msg) + return + + # Complete message header + msg.toa256 = self.toa256 + + # Apply RSSI based on transmitter: + if not self.fake_rssi_enabled: + msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT + else: # Apply fake RSSI + msg.rssi = self.rssi + + # Version specific fields + if msg.ver >= 0x01: + self._handle_data_msg_v1(src_msg, msg) + + # Apply optional Timing Advance + if src_trx.ta != 0: + msg.toa256 -= src_trx.ta * 256 + + Transceiver.handle_data_msg(self, msg) + + # Simulation specific CTRL command handler + def ctrl_cmd_handler(self, request): + # Timing Advance + # Syntax: CMD SETTA <TA> + if self.ctrl_if.verify_cmd(request, "SETTA", 1): + log.debug("(%s) Recv SETTA cmd" % self) + + # Store indicated value + self.ta = int(request[1]) + return 0 + + # Timing of Arrival simulation + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2): + log.debug("(%s) Recv FAKE_TOA cmd" % self) + + # Parse and apply both base and threshold + self.toa256_base = int(request[1]) + self.toa256_rand_threshold = int(request[2]) + return 0 + + # Timing of Arrival simulation + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1): + log.debug("(%s) Recv FAKE_TOA cmd" % self) + + # Parse and apply delta + self.toa256_base += int(request[1]) + return 0 + + # RSSI simulation + # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): + log.debug("(%s) Recv FAKE_RSSI cmd" % self) + + # Use negative threshold to disable fake_rssi if previously enabled: + if int(request[2]) < 0: + self.fake_rssi_enabled = False + return 0 + + # Parse and apply both base and threshold + self.rssi_base = int(request[1]) + self.rssi_rand_threshold = int(request[2]) + self.fake_rssi_enabled = True + return 0 + + # RSSI 
simulation + # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1): + log.debug("(%s) Recv FAKE_RSSI cmd" % self) + + # Parse and apply delta + self.rssi_base += int(request[1]) + return 0 + + # C/I simulation + # Absolute form: CMD FAKE_CI <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2): + log.debug("(%s) Recv FAKE_CI cmd" % self) + + # Parse and apply both base and threshold + self.ci_base = int(request[1]) + self.ci_rand_threshold = int(request[2]) + return 0 + + # C/I simulation + # Relative form: CMD FAKE_CI <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1): + log.debug("(%s) Recv FAKE_CI cmd" % self) + + # Parse and apply delta + self.ci_base += int(request[1]) + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> + # Dropping pattern: fn % 1 == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1): + log.debug("(%s) Recv FAKE_DROP cmd" % self) + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("(%s) FAKE_DROP amount shall not " + "be negative" % self) + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = 1 + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> + # Dropping pattern: fn % period == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2): + log.debug("(%s) Recv FAKE_DROP cmd" % self) + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("(%s) FAKE_DROP amount shall not " + "be negative" % self) + return -1 + + # Parse / validate period + period = int(request[2]) + if period <= 0: + log.error("(%s) FAKE_DROP period shall " + "be greater than zero" % self) + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = period + return 0 + + # Artificial delay for the TRXC interface + # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS> + elif self.ctrl_if.verify_cmd(request, 
"FAKE_TRXC_DELAY", 1): + log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self) + + self.ctrl_if.rsp_delay_ms = int(request[1]) + log.info("(%s) Artificial TRXC delay set to %d", + self, self.ctrl_if.rsp_delay_ms) + + # Unhandled command + return None + + +# Runner organizes execution of several FakeTRX instances with common clock. +class Runner: + def __init__(self, clck_gen, trx_list): + self.clck_gen = clck_gen + self.trx_list = trx_list.trx_list + + # This method will be called on each TDMA frame + self.clck_gen.clck_handler = self.clck_handler + + self.burst_fwd = BurstForwarder(trx_list.trx_list) + + # This method will be called by the clock generator + def clck_handler(self, fn): + # We assume that this list is immutable at run-time + for trx in self.trx_list: + trx.clck_tick(self.burst_fwd, fn) + + # loops runs IO loop on specified CLCKGen and TRXes forever. + def loop(self): + sock_list = [self.clck_gen._timerfd] + for trx in self.trx_list: + sock_list.append(trx.ctrl_if.sock) + sock_list.append(trx.data_if.sock) + + # Enter main loop + while True: + # Wait until we get any data on any socket + r_event, _, _ = select.select(sock_list, [], []) + + # clock is priority + if self.clck_gen._timerfd in r_event: + self.clck_gen.tick() + + # Iterate over all transceivers + for trx in self.trx_list: + # DATA interface + if trx.data_if.sock in r_event: + trx.recv_data_msg() + + # CTRL interface + if trx.ctrl_if.sock in r_event: + trx.ctrl_if.handle_rx() diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 9bf9efd..e72a52b 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -23,409 +23,15 @@ import logging as log import signal import argparse -import random -import select import sys import re import os
from app_common import ApplicationBase -from burst_fwd import BurstForwarder -from transceiver import Transceiver -from data_msg import Modulation from clck_gen import CLCKGen from trx_list import TRXList from fake_pm import FakePM -from gsm_shared import * - -class FakeTRX(Transceiver): - """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation. - - == ToA / RSSI measurement simulation - - Since this is a virtual environment, we can simulate different - parameters of the physical RF interface: - - - ToA (Timing of Arrival) - measured difference between expected - and actual time of burst arrival in units of 1/256 of GSM symbol - periods. A pair of both base and threshold values defines a range - of ToA value randomization: - - from (toa256_base - toa256_rand_threshold) - to (toa256_base + toa256_rand_threshold). - - - RSSI (Received Signal Strength Indication) - measured "power" of - the signal (per burst) in dBm. A pair of both base and threshold - values defines a range of RSSI value randomization: - - from (rssi_base - rssi_rand_threshold) - to (rssi_base + rssi_rand_threshold). - - - C/I (Carrier-to-Interference ratio) - value in cB (centiBels), - computed from the training sequence of each received burst, by - comparing the "ideal" training sequence with the actual one. - A pair of both base and threshold values defines a range of - C/I randomization: - - from (ci_base - ci_rand_threshold) - to (ci_base + ci_rand_threshold). - - Please note that the randomization is optional and disabled by default. - - == Timing Advance handling - - The BTS is using ToA measurements for UL bursts in order to calculate - Timing Advance value, that is then indicated to a MS, which in its turn - shall apply this value to the transmitted signal in order to compensate - the delay. Basically, every burst is transmitted in advance defined by - the indicated Timing Advance value. The valid range is 0..63, where - each unit means one GSM symbol advance. 
The actual Timing Advance value - is set using SETTA control command from MS. By default, it's set to 0. - - == Path loss simulation - - === Burst dropping - - In some cases, e.g. due to a weak signal or high interference, a burst - can be lost, i.e. not detected by the receiver. This can also be - simulated using FAKE_DROP command on the control interface: - - - burst_drop_amount - the amount of DL/UL bursts - to be dropped (i.e. not forwarded towards the MS/BTS), - - - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0. - - == Configuration - - All simulation parameters mentioned above can be changed at runtime - using the commands with prefix 'FAKE_' on the control interface. - All of them are handled by our custom CTRL command handler. - - """ - - NOMINAL_TX_POWER_DEFAULT = 50 # dBm - TX_ATT_DEFAULT = 0 # dB - PATH_LOSS_DEFAULT = 110 # dB - - TOA256_BASE_DEFAULT = 0 - CI_BASE_DEFAULT = 90 - - # Default values for NOPE / IDLE indications - TOA256_NOISE_DEFAULT = 0 - RSSI_NOISE_DEFAULT = -110 - CI_NOISE_DEFAULT = -30 - - def __init__(self, *trx_args, **trx_kwargs): - Transceiver.__init__(self, *trx_args, **trx_kwargs) - - # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI. 
- # When disabled, RSSI is calculated based on Tx power and Rx path loss - self.fake_rssi_enabled = False - - self.rf_muted = False - - # Actual ToA, RSSI, C/I, TA values - self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT - self.tx_att_base = self.TX_ATT_DEFAULT - self.toa256_base = self.TOA256_BASE_DEFAULT - self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT - self.ci_base = self.CI_BASE_DEFAULT - self.ta = 0 - - # ToA, RSSI, C/I randomization thresholds - self.toa256_rand_threshold = 0 - self.rssi_rand_threshold = 0 - self.ci_rand_threshold = 0 - - # Path loss simulation (burst dropping) - self.burst_drop_amount = 0 - self.burst_drop_period = 1 - - @property - def toa256(self): - # Check if randomization is required - if self.toa256_rand_threshold == 0: - return self.toa256_base - - # Generate a random ToA value in required range - toa256_min = self.toa256_base - self.toa256_rand_threshold - toa256_max = self.toa256_base + self.toa256_rand_threshold - return random.randint(toa256_min, toa256_max) - - @property - def rssi(self): - # Check if randomization is required - if self.rssi_rand_threshold == 0: - return self.rssi_base - - # Generate a random RSSI value in required range - rssi_min = self.rssi_base - self.rssi_rand_threshold - rssi_max = self.rssi_base + self.rssi_rand_threshold - return random.randint(rssi_min, rssi_max) - - @property - def tx_power(self): - return self.tx_power_base - self.tx_att_base - - @property - def ci(self): - # Check if randomization is required - if self.ci_rand_threshold == 0: - return self.ci_base - - # Generate a random C/I value in required range - ci_min = self.ci_base - self.ci_rand_threshold - ci_max = self.ci_base + self.ci_rand_threshold - return random.randint(ci_min, ci_max) - - # Path loss simulation: burst dropping - # Returns: True - drop, False - keep - def sim_burst_drop(self, msg): - # Check if dropping is required - if self.burst_drop_amount == 0: - return False - - if 
msg.fn % self.burst_drop_period == 0: - log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)" - % (self, msg.fn, self.burst_drop_period)) - self.burst_drop_amount -= 1 - return True - - return False - - def _handle_data_msg_v1(self, src_msg, msg): - # C/I (Carrier-to-Interference ratio) - msg.ci = self.ci - - # Pick modulation type by burst length - bl = len(src_msg.burst) - msg.mod_type = Modulation.pick_by_bl(bl) - - # Pick TSC (Training Sequence Code) and TSC set - if msg.mod_type is Modulation.ModGMSK: - ss = TrainingSeqGMSK.pick(src_msg.burst) - msg.tsc = ss.tsc if ss is not None else 0 - msg.tsc_set = ss.tsc_set if ss is not None else 0 - else: # TODO: other modulation types (at least 8-PSK) - msg.tsc_set = 0 - msg.tsc = 0 - - # Takes (partially initialized) TRXD Rx message, - # simulates RF path parameters (such as RSSI), - # and sends towards the L1 - def handle_data_msg(self, src_trx, src_msg, msg): - if self.rf_muted: - msg.nope_ind = True - elif not msg.nope_ind: - # Path loss simulation - msg.nope_ind = self.sim_burst_drop(msg) - if msg.nope_ind: - # Before TRXDv1, we simply drop the message - if msg.ver < 0x01: - del msg - return - - # Since TRXDv1, we should send a NOPE.ind - msg.burst = None # burst bits are omited - - # TODO: shoud we make these values configurable? 
- msg.toa256 = self.TOA256_NOISE_DEFAULT - msg.rssi = self.RSSI_NOISE_DEFAULT - msg.ci = self.CI_NOISE_DEFAULT - - self.data_if.send_msg(msg) - return - - # Complete message header - msg.toa256 = self.toa256 - - # Apply RSSI based on transmitter: - if not self.fake_rssi_enabled: - msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT - else: # Apply fake RSSI - msg.rssi = self.rssi - - # Version specific fields - if msg.ver >= 0x01: - self._handle_data_msg_v1(src_msg, msg) - - # Apply optional Timing Advance - if src_trx.ta != 0: - msg.toa256 -= src_trx.ta * 256 - - Transceiver.handle_data_msg(self, msg) - - # Simulation specific CTRL command handler - def ctrl_cmd_handler(self, request): - # Timing Advance - # Syntax: CMD SETTA <TA> - if self.ctrl_if.verify_cmd(request, "SETTA", 1): - log.debug("(%s) Recv SETTA cmd" % self) - - # Store indicated value - self.ta = int(request[1]) - return 0 - - # Timing of Arrival simulation - # Absolute form: CMD FAKE_TOA <BASE> <THRESH> - elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2): - log.debug("(%s) Recv FAKE_TOA cmd" % self) - - # Parse and apply both base and threshold - self.toa256_base = int(request[1]) - self.toa256_rand_threshold = int(request[2]) - return 0 - - # Timing of Arrival simulation - # Relative form: CMD FAKE_TOA <+-BASE_DELTA> - elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1): - log.debug("(%s) Recv FAKE_TOA cmd" % self) - - # Parse and apply delta - self.toa256_base += int(request[1]) - return 0 - - # RSSI simulation - # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> - elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): - log.debug("(%s) Recv FAKE_RSSI cmd" % self) - - # Use negative threshold to disable fake_rssi if previously enabled: - if int(request[2]) < 0: - self.fake_rssi_enabled = False - return 0 - - # Parse and apply both base and threshold - self.rssi_base = int(request[1]) - self.rssi_rand_threshold = int(request[2]) - self.fake_rssi_enabled = True - return 0 - - # RSSI 
simulation - # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> - elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1): - log.debug("(%s) Recv FAKE_RSSI cmd" % self) - - # Parse and apply delta - self.rssi_base += int(request[1]) - return 0 - - # C/I simulation - # Absolute form: CMD FAKE_CI <BASE> <THRESH> - elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2): - log.debug("(%s) Recv FAKE_CI cmd" % self) - - # Parse and apply both base and threshold - self.ci_base = int(request[1]) - self.ci_rand_threshold = int(request[2]) - return 0 - - # C/I simulation - # Relative form: CMD FAKE_CI <+-BASE_DELTA> - elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1): - log.debug("(%s) Recv FAKE_CI cmd" % self) - - # Parse and apply delta - self.ci_base += int(request[1]) - return 0 - - # Path loss simulation: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> - # Dropping pattern: fn % 1 == 0 - elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1): - log.debug("(%s) Recv FAKE_DROP cmd" % self) - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("(%s) FAKE_DROP amount shall not " - "be negative" % self) - return -1 - - self.burst_drop_amount = num - self.burst_drop_period = 1 - return 0 - - # Path loss simulation: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> - # Dropping pattern: fn % period == 0 - elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2): - log.debug("(%s) Recv FAKE_DROP cmd" % self) - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("(%s) FAKE_DROP amount shall not " - "be negative" % self) - return -1 - - # Parse / validate period - period = int(request[2]) - if period <= 0: - log.error("(%s) FAKE_DROP period shall " - "be greater than zero" % self) - return -1 - - self.burst_drop_amount = num - self.burst_drop_period = period - return 0 - - # Artificial delay for the TRXC interface - # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS> - elif self.ctrl_if.verify_cmd(request, 
"FAKE_TRXC_DELAY", 1): - log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self) - - self.ctrl_if.rsp_delay_ms = int(request[1]) - log.info("(%s) Artificial TRXC delay set to %d", - self, self.ctrl_if.rsp_delay_ms) - - # Unhandled command - return None - - -# Runner organizes execution of several FakeTRX instances with common clock. -class Runner: - def __init__(self, clck_gen, trx_list): - self.clck_gen = clck_gen - self.trx_list = trx_list.trx_list - - # This method will be called on each TDMA frame - self.clck_gen.clck_handler = self.clck_handler - - self.burst_fwd = BurstForwarder(trx_list.trx_list) - - # This method will be called by the clock generator - def clck_handler(self, fn): - # We assume that this list is immutable at run-time - for trx in self.trx_list: - trx.clck_tick(self.burst_fwd, fn) - - # loops runs IO loop on specified CLCKGen and TRXes forever. - def loop(self): - sock_list = [self.clck_gen._timerfd] - for trx in self.trx_list: - sock_list.append(trx.ctrl_if.sock) - sock_list.append(trx.data_if.sock) - - # Enter main loop - while True: - # Wait until we get any data on any socket - r_event, _, _ = select.select(sock_list, [], []) - - # clock is priority - if self.clck_gen._timerfd in r_event: - self.clck_gen.tick() - - # Iterate over all transceivers - for trx in self.trx_list: - # DATA interface - if trx.data_if.sock in r_event: - trx.recv_data_msg() - - # CTRL interface - if trx.ctrl_if.sock in r_event: - trx.ctrl_if.handle_rx() - +from _fake_trx import FakeTRX, Runner
class Application(ApplicationBase): def __init__(self):