kirr has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmocom-bb/+/40051?usp=email )
Change subject: trx_toolkit/fake_trx: Split it into fake_trx and _fake_trx modules
......................................................................
trx_toolkit/fake_trx: Split it into fake_trx and _fake_trx modules
fake_trx will remain in Python, while _fake_trx will later be converted
to Cython for speed, to avoid Python-related overhead. This patch is
pure code movement, done as a preparatory step for that.
Change-Id: Iadffd49de8197564e57bfd9cb660b1d11136ffd4
---
A src/target/trx_toolkit/_fake_trx.py
M src/target/trx_toolkit/fake_trx.py
2 files changed, 415 insertions(+), 395 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmocom-bb refs/changes/51/40051/1
diff --git a/src/target/trx_toolkit/_fake_trx.py b/src/target/trx_toolkit/_fake_trx.py
new file mode 100644
index 0000000..87a352b
--- /dev/null
+++ b/src/target/trx_toolkit/_fake_trx.py
@@ -0,0 +1,414 @@
+# TRX Toolkit
+# Virtual Um-interface (fake transceiver)
+#
+# (C) 2017-2019 by Vadim Yanitskiy <axilirator(a)gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+import logging as log
+import random
+import select
+
+from burst_fwd import BurstForwarder
+from transceiver import Transceiver
+from data_msg import Modulation
+from gsm_shared import *
+
+
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the physical RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ - C/I (Carrier-to-Interference ratio) - value in cB (centiBels),
+ computed from the training sequence of each received burst, by
+ comparing the "ideal" training sequence with the actual one.
+ A pair of both base and threshold values defines a range of
+ C/I randomization:
+
+ from (ci_base - ci_rand_threshold)
+ to (ci_base + ci_rand_threshold).
+
+ Please note that the randomization is optional and disabled by default.
+
+ == Timing Advance handling
+
+ The BTS is using ToA measurements for UL bursts in order to calculate
+ Timing Advance value, that is then indicated to a MS, which in its turn
+ shall apply this value to the transmitted signal in order to compensate
+ the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using SETTA control command from MS. By default, it's set to 0.
+
+ == Path loss simulation
+
+ === Burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ == Configuration
+
+ All simulation parameters mentioned above can be changed at runtime
+ using the commands with prefix 'FAKE_' on the control interface.
+ All of them are handled by our custom CTRL command handler.
+
+ """
+
+ NOMINAL_TX_POWER_DEFAULT = 50 # dBm
+ TX_ATT_DEFAULT = 0 # dB
+ PATH_LOSS_DEFAULT = 110 # dB
+
+ TOA256_BASE_DEFAULT = 0
+ CI_BASE_DEFAULT = 90
+
+ # Default values for NOPE / IDLE indications
+ TOA256_NOISE_DEFAULT = 0
+ RSSI_NOISE_DEFAULT = -110
+ CI_NOISE_DEFAULT = -30
+
+ def __init__(self, *trx_args, **trx_kwargs):
+ Transceiver.__init__(self, *trx_args, **trx_kwargs)
+
+ # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI.
+ # When disabled, RSSI is calculated based on Tx power and Rx path loss
+ self.fake_rssi_enabled = False
+
+ self.rf_muted = False
+
+ # Actual ToA, RSSI, C/I, TA values
+ self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT
+ self.tx_att_base = self.TX_ATT_DEFAULT
+ self.toa256_base = self.TOA256_BASE_DEFAULT
+ self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT
+ self.ci_base = self.CI_BASE_DEFAULT
+ self.ta = 0
+
+ # ToA, RSSI, C/I randomization thresholds
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+ self.ci_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold == 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold == 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ @property
+ def tx_power(self):
+ return self.tx_power_base - self.tx_att_base
+
+ @property
+ def ci(self):
+ # Check if randomization is required
+ if self.ci_rand_threshold == 0:
+ return self.ci_base
+
+ # Generate a random C/I value in required range
+ ci_min = self.ci_base - self.ci_rand_threshold
+ ci_max = self.ci_base + self.ci_rand_threshold
+ return random.randint(ci_min, ci_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount == 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (self, msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ def _handle_data_msg_v1(self, src_msg, msg):
+ # C/I (Carrier-to-Interference ratio)
+ msg.ci = self.ci
+
+ # Pick modulation type by burst length
+ bl = len(src_msg.burst)
+ msg.mod_type = Modulation.pick_by_bl(bl)
+
+ # Pick TSC (Training Sequence Code) and TSC set
+ if msg.mod_type is Modulation.ModGMSK:
+ ss = TrainingSeqGMSK.pick(src_msg.burst)
+ msg.tsc = ss.tsc if ss is not None else 0
+ msg.tsc_set = ss.tsc_set if ss is not None else 0
+ else: # TODO: other modulation types (at least 8-PSK)
+ msg.tsc_set = 0
+ msg.tsc = 0
+
+ # Takes (partially initialized) TRXD Rx message,
+ # simulates RF path parameters (such as RSSI),
+ # and sends towards the L1
+ def handle_data_msg(self, src_trx, src_msg, msg):
+ if self.rf_muted:
+ msg.nope_ind = True
+ elif not msg.nope_ind:
+ # Path loss simulation
+ msg.nope_ind = self.sim_burst_drop(msg)
+ if msg.nope_ind:
+ # Before TRXDv1, we simply drop the message
+ if msg.ver < 0x01:
+ del msg
+ return
+
+ # Since TRXDv1, we should send a NOPE.ind
+ msg.burst = None # burst bits are omited
+
+ # TODO: shoud we make these values configurable?
+ msg.toa256 = self.TOA256_NOISE_DEFAULT
+ msg.rssi = self.RSSI_NOISE_DEFAULT
+ msg.ci = self.CI_NOISE_DEFAULT
+
+ self.data_if.send_msg(msg)
+ return
+
+ # Complete message header
+ msg.toa256 = self.toa256
+
+ # Apply RSSI based on transmitter:
+ if not self.fake_rssi_enabled:
+ msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT
+ else: # Apply fake RSSI
+ msg.rssi = self.rssi
+
+ # Version specific fields
+ if msg.ver >= 0x01:
+ self._handle_data_msg_v1(src_msg, msg)
+
+ # Apply optional Timing Advance
+ if src_trx.ta != 0:
+ msg.toa256 -= src_trx.ta * 256
+
+ Transceiver.handle_data_msg(self, msg)
+
+ # Simulation specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing Advance
+ # Syntax: CMD SETTA <TA>
+ if self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("(%s) Recv SETTA cmd" % self)
+
+ # Store indicated value
+ self.ta = int(request[1])
+ return 0
+
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("(%s) Recv FAKE_TOA cmd" % self)
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("(%s) Recv FAKE_TOA cmd" % self)
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("(%s) Recv FAKE_RSSI cmd" % self)
+
+ # Use negative threshold to disable fake_rssi if previously enabled:
+ if int(request[2]) < 0:
+ self.fake_rssi_enabled = False
+ return 0
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ self.fake_rssi_enabled = True
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("(%s) Recv FAKE_RSSI cmd" % self)
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # C/I simulation
+ # Absolute form: CMD FAKE_CI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2):
+ log.debug("(%s) Recv FAKE_CI cmd" % self)
+
+ # Parse and apply both base and threshold
+ self.ci_base = int(request[1])
+ self.ci_rand_threshold = int(request[2])
+ return 0
+
+ # C/I simulation
+ # Relative form: CMD FAKE_CI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1):
+ log.debug("(%s) Recv FAKE_CI cmd" % self)
+
+ # Parse and apply delta
+ self.ci_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("(%s) Recv FAKE_DROP cmd" % self)
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("(%s) FAKE_DROP amount shall not "
+ "be negative" % self)
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("(%s) Recv FAKE_DROP cmd" % self)
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("(%s) FAKE_DROP amount shall not "
+ "be negative" % self)
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("(%s) FAKE_DROP period shall "
+ "be greater than zero" % self)
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Artificial delay for the TRXC interface
+ # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TRXC_DELAY", 1):
+ log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self)
+
+ self.ctrl_if.rsp_delay_ms = int(request[1])
+ log.info("(%s) Artificial TRXC delay set to %d",
+ self, self.ctrl_if.rsp_delay_ms)
+
+ # Unhandled command
+ return None
+
+
+# Runner organizes execution of several FakeTRX instances with common clock.
+class Runner:
+ def __init__(self, clck_gen, trx_list):
+ self.clck_gen = clck_gen
+ self.trx_list = trx_list.trx_list
+
+ # This method will be called on each TDMA frame
+ self.clck_gen.clck_handler = self.clck_handler
+
+ self.burst_fwd = BurstForwarder(trx_list.trx_list)
+
+ # This method will be called by the clock generator
+ def clck_handler(self, fn):
+ # We assume that this list is immutable at run-time
+ for trx in self.trx_list:
+ trx.clck_tick(self.burst_fwd, fn)
+
+ # loops runs IO loop on specified CLCKGen and TRXes forever.
+ def loop(self):
+ sock_list = [self.clck_gen._timerfd]
+ for trx in self.trx_list:
+ sock_list.append(trx.ctrl_if.sock)
+ sock_list.append(trx.data_if.sock)
+
+ # Enter main loop
+ while True:
+ # Wait until we get any data on any socket
+ r_event, _, _ = select.select(sock_list, [], [])
+
+ # clock is priority
+ if self.clck_gen._timerfd in r_event:
+ self.clck_gen.tick()
+
+ # Iterate over all transceivers
+ for trx in self.trx_list:
+ # DATA interface
+ if trx.data_if.sock in r_event:
+ trx.recv_data_msg()
+
+ # CTRL interface
+ if trx.ctrl_if.sock in r_event:
+ trx.ctrl_if.handle_rx()
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 9bf9efd..e72a52b 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -23,409 +23,15 @@
import logging as log
import signal
import argparse
-import random
-import select
import sys
import re
import os
from app_common import ApplicationBase
-from burst_fwd import BurstForwarder
-from transceiver import Transceiver
-from data_msg import Modulation
from clck_gen import CLCKGen
from trx_list import TRXList
from fake_pm import FakePM
-from gsm_shared import *
-
-class FakeTRX(Transceiver):
- """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
-
- == ToA / RSSI measurement simulation
-
- Since this is a virtual environment, we can simulate different
- parameters of the physical RF interface:
-
- - ToA (Timing of Arrival) - measured difference between expected
- and actual time of burst arrival in units of 1/256 of GSM symbol
- periods. A pair of both base and threshold values defines a range
- of ToA value randomization:
-
- from (toa256_base - toa256_rand_threshold)
- to (toa256_base + toa256_rand_threshold).
-
- - RSSI (Received Signal Strength Indication) - measured "power" of
- the signal (per burst) in dBm. A pair of both base and threshold
- values defines a range of RSSI value randomization:
-
- from (rssi_base - rssi_rand_threshold)
- to (rssi_base + rssi_rand_threshold).
-
- - C/I (Carrier-to-Interference ratio) - value in cB (centiBels),
- computed from the training sequence of each received burst, by
- comparing the "ideal" training sequence with the actual one.
- A pair of both base and threshold values defines a range of
- C/I randomization:
-
- from (ci_base - ci_rand_threshold)
- to (ci_base + ci_rand_threshold).
-
- Please note that the randomization is optional and disabled by default.
-
- == Timing Advance handling
-
- The BTS is using ToA measurements for UL bursts in order to calculate
- Timing Advance value, that is then indicated to a MS, which in its turn
- shall apply this value to the transmitted signal in order to compensate
- the delay. Basically, every burst is transmitted in advance defined by
- the indicated Timing Advance value. The valid range is 0..63, where
- each unit means one GSM symbol advance. The actual Timing Advance value
- is set using SETTA control command from MS. By default, it's set to 0.
-
- == Path loss simulation
-
- === Burst dropping
-
- In some cases, e.g. due to a weak signal or high interference, a burst
- can be lost, i.e. not detected by the receiver. This can also be
- simulated using FAKE_DROP command on the control interface:
-
- - burst_drop_amount - the amount of DL/UL bursts
- to be dropped (i.e. not forwarded towards the MS/BTS),
-
- - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
-
- == Configuration
-
- All simulation parameters mentioned above can be changed at runtime
- using the commands with prefix 'FAKE_' on the control interface.
- All of them are handled by our custom CTRL command handler.
-
- """
-
- NOMINAL_TX_POWER_DEFAULT = 50 # dBm
- TX_ATT_DEFAULT = 0 # dB
- PATH_LOSS_DEFAULT = 110 # dB
-
- TOA256_BASE_DEFAULT = 0
- CI_BASE_DEFAULT = 90
-
- # Default values for NOPE / IDLE indications
- TOA256_NOISE_DEFAULT = 0
- RSSI_NOISE_DEFAULT = -110
- CI_NOISE_DEFAULT = -30
-
- def __init__(self, *trx_args, **trx_kwargs):
- Transceiver.__init__(self, *trx_args, **trx_kwargs)
-
- # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI.
- # When disabled, RSSI is calculated based on Tx power and Rx path loss
- self.fake_rssi_enabled = False
-
- self.rf_muted = False
-
- # Actual ToA, RSSI, C/I, TA values
- self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT
- self.tx_att_base = self.TX_ATT_DEFAULT
- self.toa256_base = self.TOA256_BASE_DEFAULT
- self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT
- self.ci_base = self.CI_BASE_DEFAULT
- self.ta = 0
-
- # ToA, RSSI, C/I randomization thresholds
- self.toa256_rand_threshold = 0
- self.rssi_rand_threshold = 0
- self.ci_rand_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_drop_amount = 0
- self.burst_drop_period = 1
-
- @property
- def toa256(self):
- # Check if randomization is required
- if self.toa256_rand_threshold == 0:
- return self.toa256_base
-
- # Generate a random ToA value in required range
- toa256_min = self.toa256_base - self.toa256_rand_threshold
- toa256_max = self.toa256_base + self.toa256_rand_threshold
- return random.randint(toa256_min, toa256_max)
-
- @property
- def rssi(self):
- # Check if randomization is required
- if self.rssi_rand_threshold == 0:
- return self.rssi_base
-
- # Generate a random RSSI value in required range
- rssi_min = self.rssi_base - self.rssi_rand_threshold
- rssi_max = self.rssi_base + self.rssi_rand_threshold
- return random.randint(rssi_min, rssi_max)
-
- @property
- def tx_power(self):
- return self.tx_power_base - self.tx_att_base
-
- @property
- def ci(self):
- # Check if randomization is required
- if self.ci_rand_threshold == 0:
- return self.ci_base
-
- # Generate a random C/I value in required range
- ci_min = self.ci_base - self.ci_rand_threshold
- ci_max = self.ci_base + self.ci_rand_threshold
- return random.randint(ci_min, ci_max)
-
- # Path loss simulation: burst dropping
- # Returns: True - drop, False - keep
- def sim_burst_drop(self, msg):
- # Check if dropping is required
- if self.burst_drop_amount == 0:
- return False
-
- if msg.fn % self.burst_drop_period == 0:
- log.info("(%s) Simulation: dropping burst (fn=%u %% %u == 0)"
- % (self, msg.fn, self.burst_drop_period))
- self.burst_drop_amount -= 1
- return True
-
- return False
-
- def _handle_data_msg_v1(self, src_msg, msg):
- # C/I (Carrier-to-Interference ratio)
- msg.ci = self.ci
-
- # Pick modulation type by burst length
- bl = len(src_msg.burst)
- msg.mod_type = Modulation.pick_by_bl(bl)
-
- # Pick TSC (Training Sequence Code) and TSC set
- if msg.mod_type is Modulation.ModGMSK:
- ss = TrainingSeqGMSK.pick(src_msg.burst)
- msg.tsc = ss.tsc if ss is not None else 0
- msg.tsc_set = ss.tsc_set if ss is not None else 0
- else: # TODO: other modulation types (at least 8-PSK)
- msg.tsc_set = 0
- msg.tsc = 0
-
- # Takes (partially initialized) TRXD Rx message,
- # simulates RF path parameters (such as RSSI),
- # and sends towards the L1
- def handle_data_msg(self, src_trx, src_msg, msg):
- if self.rf_muted:
- msg.nope_ind = True
- elif not msg.nope_ind:
- # Path loss simulation
- msg.nope_ind = self.sim_burst_drop(msg)
- if msg.nope_ind:
- # Before TRXDv1, we simply drop the message
- if msg.ver < 0x01:
- del msg
- return
-
- # Since TRXDv1, we should send a NOPE.ind
- msg.burst = None # burst bits are omited
-
- # TODO: shoud we make these values configurable?
- msg.toa256 = self.TOA256_NOISE_DEFAULT
- msg.rssi = self.RSSI_NOISE_DEFAULT
- msg.ci = self.CI_NOISE_DEFAULT
-
- self.data_if.send_msg(msg)
- return
-
- # Complete message header
- msg.toa256 = self.toa256
-
- # Apply RSSI based on transmitter:
- if not self.fake_rssi_enabled:
- msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT
- else: # Apply fake RSSI
- msg.rssi = self.rssi
-
- # Version specific fields
- if msg.ver >= 0x01:
- self._handle_data_msg_v1(src_msg, msg)
-
- # Apply optional Timing Advance
- if src_trx.ta != 0:
- msg.toa256 -= src_trx.ta * 256
-
- Transceiver.handle_data_msg(self, msg)
-
- # Simulation specific CTRL command handler
- def ctrl_cmd_handler(self, request):
- # Timing Advance
- # Syntax: CMD SETTA <TA>
- if self.ctrl_if.verify_cmd(request, "SETTA", 1):
- log.debug("(%s) Recv SETTA cmd" % self)
-
- # Store indicated value
- self.ta = int(request[1])
- return 0
-
- # Timing of Arrival simulation
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("(%s) Recv FAKE_TOA cmd" % self)
-
- # Parse and apply both base and threshold
- self.toa256_base = int(request[1])
- self.toa256_rand_threshold = int(request[2])
- return 0
-
- # Timing of Arrival simulation
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("(%s) Recv FAKE_TOA cmd" % self)
-
- # Parse and apply delta
- self.toa256_base += int(request[1])
- return 0
-
- # RSSI simulation
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("(%s) Recv FAKE_RSSI cmd" % self)
-
- # Use negative threshold to disable fake_rssi if previously enabled:
- if int(request[2]) < 0:
- self.fake_rssi_enabled = False
- return 0
-
- # Parse and apply both base and threshold
- self.rssi_base = int(request[1])
- self.rssi_rand_threshold = int(request[2])
- self.fake_rssi_enabled = True
- return 0
-
- # RSSI simulation
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("(%s) Recv FAKE_RSSI cmd" % self)
-
- # Parse and apply delta
- self.rssi_base += int(request[1])
- return 0
-
- # C/I simulation
- # Absolute form: CMD FAKE_CI <BASE> <THRESH>
- elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 2):
- log.debug("(%s) Recv FAKE_CI cmd" % self)
-
- # Parse and apply both base and threshold
- self.ci_base = int(request[1])
- self.ci_rand_threshold = int(request[2])
- return 0
-
- # C/I simulation
- # Relative form: CMD FAKE_CI <+-BASE_DELTA>
- elif self.ctrl_if.verify_cmd(request, "FAKE_CI", 1):
- log.debug("(%s) Recv FAKE_CI cmd" % self)
-
- # Parse and apply delta
- self.ci_base += int(request[1])
- return 0
-
- # Path loss simulation: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("(%s) Recv FAKE_DROP cmd" % self)
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("(%s) FAKE_DROP amount shall not "
- "be negative" % self)
- return -1
-
- self.burst_drop_amount = num
- self.burst_drop_period = 1
- return 0
-
- # Path loss simulation: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("(%s) Recv FAKE_DROP cmd" % self)
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("(%s) FAKE_DROP amount shall not "
- "be negative" % self)
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("(%s) FAKE_DROP period shall "
- "be greater than zero" % self)
- return -1
-
- self.burst_drop_amount = num
- self.burst_drop_period = period
- return 0
-
- # Artificial delay for the TRXC interface
- # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS>
- elif self.ctrl_if.verify_cmd(request, "FAKE_TRXC_DELAY", 1):
- log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self)
-
- self.ctrl_if.rsp_delay_ms = int(request[1])
- log.info("(%s) Artificial TRXC delay set to %d",
- self, self.ctrl_if.rsp_delay_ms)
-
- # Unhandled command
- return None
-
-
-# Runner organizes execution of several FakeTRX instances with common clock.
-class Runner:
- def __init__(self, clck_gen, trx_list):
- self.clck_gen = clck_gen
- self.trx_list = trx_list.trx_list
-
- # This method will be called on each TDMA frame
- self.clck_gen.clck_handler = self.clck_handler
-
- self.burst_fwd = BurstForwarder(trx_list.trx_list)
-
- # This method will be called by the clock generator
- def clck_handler(self, fn):
- # We assume that this list is immutable at run-time
- for trx in self.trx_list:
- trx.clck_tick(self.burst_fwd, fn)
-
- # loops runs IO loop on specified CLCKGen and TRXes forever.
- def loop(self):
- sock_list = [self.clck_gen._timerfd]
- for trx in self.trx_list:
- sock_list.append(trx.ctrl_if.sock)
- sock_list.append(trx.data_if.sock)
-
- # Enter main loop
- while True:
- # Wait until we get any data on any socket
- r_event, _, _ = select.select(sock_list, [], [])
-
- # clock is priority
- if self.clck_gen._timerfd in r_event:
- self.clck_gen.tick()
-
- # Iterate over all transceivers
- for trx in self.trx_list:
- # DATA interface
- if trx.data_if.sock in r_event:
- trx.recv_data_msg()
-
- # CTRL interface
- if trx.ctrl_if.sock in r_event:
- trx.ctrl_if.handle_rx()
-
+from _fake_trx import FakeTRX, Runner
class Application(ApplicationBase):
def __init__(self):
--
To view, visit
https://gerrit.osmocom.org/c/osmocom-bb/+/40051?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: osmocom-bb
Gerrit-Branch: master
Gerrit-Change-Id: Iadffd49de8197564e57bfd9cb660b1d11136ffd4
Gerrit-Change-Number: 40051
Gerrit-PatchSet: 1
Gerrit-Owner: kirr <kirr(a)nexedi.com>
Gerrit-CC: fixeria <vyanitskiy(a)sysmocom.de>
Gerrit-CC: osmith <osmith(a)sysmocom.de>
Gerrit-CC: pespin <pespin(a)sysmocom.de>