summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/fake_trx.py
diff options
context:
space:
mode:
authorVadim Yanitskiy <axilirator@gmail.com>2018-12-10 17:39:51 +0700
committerVadim Yanitskiy <axilirator@gmail.com>2018-12-18 05:47:11 +0700
commit152a2da8d21d7a10d217e3adb51f57ca4e848d82 (patch)
tree89c24746ccaaf1e82f80d59f1aeccea006e028ff /src/target/trx_toolkit/fake_trx.py
parent1197382e8db5b52fa001dc16ec76389ccf1b6f53 (diff)
trx_toolkit/fake_trx.py: refactor global class hierarchy
This change is a big step towards handling of multiple transceivers in a single process, i.e. multiple MS and multiple BTS connections. The old class hierarchy wasn't flexible enough, because initially fake_trx was designed as a bridge between OsmocomBB and OsmoBTS, but not as the burst router. There were two separate, but 90% similar implementations of the CTRL interface, two variations of each simulation parameter - one for UL, another for DL. The following new classes are introduced: - Transceiver - represents a single transceiver, that can be used as for the BTS side, as for the MS side. Each instance has its own CTRL, DATA, and (optionally) CLCK interfaces, among with basic state variables, such as both RX / TX freq., power state (running or idle) and list of active timeslots. - CTRLInterfaceTRX - unified control interface handler for common transceiver management commands, such as POWERON, RXTUNE, and SETSLOT. Deprecates both CTRLInterface{BB|BTS}. - FakeTRX - basically, a child of Transceiver, extended with RF path (burst loss, RSSI, TA, ToA) simulation. Implements a custom CTRL command handler for CTRLInterfaceTRX. The following classes were refactored: - BurstForwarder - still performs burst forwarding, but now it doesn't store any simulation parameters, and doesn't know who is BTS, and who is MS. Actually, BurstForwarder transforms each L12TRX message into a TRX2L1 message, and dispatches it between running transceivers with matching RX frequency and matching timeslot. - FakePM - still generates random RSSI values, but doesn't distinguish between MS and BTS anymore. As soon as a measurement request is received, it attempts to find at least one running TRX on a given frequency. Please note that fake_trx.py still does handle only a single pair of MS and BTS. No regressions have been observed. Both new and refactored classes were documented. Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185 Related: OS#3667
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py306
1 files changed, 256 insertions, 50 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 95261dfe..46b413aa 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
import argparse
+import random
import select
import sys
from app_common import ApplicationBase
-from ctrl_if_bts import CTRLInterfaceBTS
-from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
+from transceiver import Transceiver
+from clck_gen import CLCKGen
from fake_pm import FakePM
-from udp_link import UDPLink
-from clck_gen import CLCKGen
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the physical RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ Please note that randomization of both RSSI and ToA is optional,
+ and can be enabled from the control interface.
+
+ == Timing Advance handling
+
+ The BTS is using ToA measurements for UL bursts in order to calculate
+ Timing Advance value, that is then indicated to a MS, which in its turn
+ shall apply this value to the transmitted signal in order to compensate
+ the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using SETTA control command from MS. By default, it's set to 0.
+
+ == Path loss simulation
+
+ === Burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ == Configuration
+
+ All simulation parameters mentioned above can be changed at runtime
+ using the commands with prefix 'FAKE_' on the control interface.
+ All of them are handled by our custom CTRL command handler.
+
+ """
+
+ TOA256_BASE_DEFAULT = 0
+ RSSI_BASE_DEFAULT = -60
+
+ def __init__(self, *trx_args, **trx_kwargs):
+ Transceiver.__init__(self, *trx_args, **trx_kwargs)
+
+ # Actual ToA / RSSI / TA values
+ self.toa256_base = self.TOA256_BASE_DEFAULT
+ self.rssi_base = self.RSSI_BASE_DEFAULT
+ self.ta = 0
+
+ # ToA / RSSI randomization threshold
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold is 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold is 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount is 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ # Takes (partially initialized) TRX2L1 message,
+ # simulates RF path parameters (such as RSSI),
+ # and sends towards the L1
+ def send_data_msg(self, src_trx, msg):
+ # Complete message header
+ msg.toa256 = self.toa256
+ msg.rssi = self.rssi
+
+ # Apply optional Timing Advance
+ if src_trx.ta is not 0:
+ msg.toa256 -= src_trx.ta * 256
+
+ # Path loss simulation
+ if self.sim_burst_drop(msg):
+ return
+
+ # TODO: make legacy mode configurable (via argv?)
+ self.data_if.send_msg(msg, legacy = True)
+
+ # Simulation specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing Advance
+ # Syntax: CMD SETTA <TA>
+ if self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("Recv SETTA cmd")
+
+ # Store indicated value
+ self.ta = int(request[1])
+ return 0
+
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("FAKE_DROP period shall be greater than zero")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Unhandled command
+ return None
class Application(ApplicationBase):
def __init__(self):
@@ -51,72 +271,58 @@ class Application(ApplicationBase):
self.app_init_logging(self.argv)
def run(self):
- # Init TRX CTRL interface for BTS
- self.bts_ctrl = CTRLInterfaceBTS(
- self.argv.bts_addr, self.argv.bts_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
-
- # Init TRX CTRL interface for BB
- self.bb_ctrl = CTRLInterfaceBB(
- self.argv.bb_addr, self.argv.bb_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
+ # Init shared clock generator
+ self.clck_gen = CLCKGen([])
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
- self.pm = FakePM(-120, -105, -75, -50)
-
- # Share a FakePM instance between both BTS and BB
- self.bts_ctrl.pm = self.pm
- self.bb_ctrl.pm = self.pm
-
- # Init DATA links
- self.bts_data = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
- self.bb_data = UDPLink(
- self.argv.bb_addr, self.argv.bb_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
-
- # BTS <-> BB burst forwarding
- self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
-
- # Share a BurstForwarder instance between BTS and BB
- self.bts_ctrl.burst_fwd = self.burst_fwd
- self.bb_ctrl.burst_fwd = self.burst_fwd
-
- # Provide clock to BTS
- self.bts_clck = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 100,
- self.argv.trx_bind_addr, self.argv.bts_base_port)
- self.clck_gen = CLCKGen([self.bts_clck])
- self.bts_ctrl.clck_gen = self.clck_gen
+ self.fake_pm = FakePM(-120, -105, -75, -50)
+
+ # Init TRX instance for BTS
+ self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bts_addr, self.argv.bts_base_port,
+ clck_gen = self.clck_gen)
+
+ # Init TRX instance for BB
+ self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bb_addr, self.argv.bb_base_port,
+ pwr_meas = self.fake_pm)
+
+ # Burst forwarding between transceivers
+ self.burst_fwd = BurstForwarder()
+ self.burst_fwd.add_trx(self.bts_trx)
+ self.burst_fwd.add_trx(self.bb_trx)
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
- self.bts_data.sock, self.bb_data.sock]
+ socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
+ self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
- if self.bts_data.sock in r_event:
- self.burst_fwd.bts2bb()
+ if self.bts_trx.data_if.sock in r_event:
+ msg = self.bts_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
- if self.bb_data.sock in r_event:
- self.burst_fwd.bb2bts()
+ if self.bb_trx.data_if.sock in r_event:
+ msg = self.bb_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
- if self.bts_ctrl.sock in r_event:
- self.bts_ctrl.handle_rx()
+ if self.bts_trx.ctrl_if.sock in r_event:
+ self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
- if self.bb_ctrl.sock in r_event:
- self.bb_ctrl.handle_rx()
+ if self.bb_trx.ctrl_if.sock in r_event:
+ self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")