summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/fake_trx.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py306
1 files changed, 256 insertions, 50 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 95261dfe..46b413aa 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
import argparse
+import random
import select
import sys
from app_common import ApplicationBase
-from ctrl_if_bts import CTRLInterfaceBTS
-from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
+from transceiver import Transceiver
+from clck_gen import CLCKGen
from fake_pm import FakePM
-from udp_link import UDPLink
-from clck_gen import CLCKGen
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the physical RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ Please note that randomization of both RSSI and ToA is optional,
+ and can be enabled from the control interface.
+
+ == Timing Advance handling
+
+ The BTS is using ToA measurements for UL bursts in order to calculate
+ Timing Advance value, that is then indicated to a MS, which in its turn
+ shall apply this value to the transmitted signal in order to compensate
+ the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using SETTA control command from MS. By default, it's set to 0.
+
+ == Path loss simulation
+
+ === Burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ == Configuration
+
+ All simulation parameters mentioned above can be changed at runtime
+ using the commands with prefix 'FAKE_' on the control interface.
+ All of them are handled by our custom CTRL command handler.
+
+ """
+
+ TOA256_BASE_DEFAULT = 0
+ RSSI_BASE_DEFAULT = -60
+
+ def __init__(self, *trx_args, **trx_kwargs):
+ Transceiver.__init__(self, *trx_args, **trx_kwargs)
+
+ # Actual ToA / RSSI / TA values
+ self.toa256_base = self.TOA256_BASE_DEFAULT
+ self.rssi_base = self.RSSI_BASE_DEFAULT
+ self.ta = 0
+
+ # ToA / RSSI randomization threshold
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold is 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold is 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount is 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ # Takes (partially initialized) TRX2L1 message,
+ # simulates RF path parameters (such as RSSI),
+ # and sends towards the L1
+ def send_data_msg(self, src_trx, msg):
+ # Complete message header
+ msg.toa256 = self.toa256
+ msg.rssi = self.rssi
+
+ # Apply optional Timing Advance
+ if src_trx.ta is not 0:
+ msg.toa256 -= src_trx.ta * 256
+
+ # Path loss simulation
+ if self.sim_burst_drop(msg):
+ return
+
+ # TODO: make legacy mode configurable (via argv?)
+ self.data_if.send_msg(msg, legacy = True)
+
+ # Simulation specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing Advance
+ # Syntax: CMD SETTA <TA>
+ if self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("Recv SETTA cmd")
+
+ # Store indicated value
+ self.ta = int(request[1])
+ return 0
+
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("FAKE_DROP period shall be greater than zero")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Unhandled command
+ return None
class Application(ApplicationBase):
def __init__(self):
@@ -51,72 +271,58 @@ class Application(ApplicationBase):
self.app_init_logging(self.argv)
def run(self):
- # Init TRX CTRL interface for BTS
- self.bts_ctrl = CTRLInterfaceBTS(
- self.argv.bts_addr, self.argv.bts_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
-
- # Init TRX CTRL interface for BB
- self.bb_ctrl = CTRLInterfaceBB(
- self.argv.bb_addr, self.argv.bb_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
+ # Init shared clock generator
+ self.clck_gen = CLCKGen([])
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
- self.pm = FakePM(-120, -105, -75, -50)
-
- # Share a FakePM instance between both BTS and BB
- self.bts_ctrl.pm = self.pm
- self.bb_ctrl.pm = self.pm
-
- # Init DATA links
- self.bts_data = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
- self.bb_data = UDPLink(
- self.argv.bb_addr, self.argv.bb_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
-
- # BTS <-> BB burst forwarding
- self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
-
- # Share a BurstForwarder instance between BTS and BB
- self.bts_ctrl.burst_fwd = self.burst_fwd
- self.bb_ctrl.burst_fwd = self.burst_fwd
-
- # Provide clock to BTS
- self.bts_clck = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 100,
- self.argv.trx_bind_addr, self.argv.bts_base_port)
- self.clck_gen = CLCKGen([self.bts_clck])
- self.bts_ctrl.clck_gen = self.clck_gen
+ self.fake_pm = FakePM(-120, -105, -75, -50)
+
+ # Init TRX instance for BTS
+ self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bts_addr, self.argv.bts_base_port,
+ clck_gen = self.clck_gen)
+
+ # Init TRX instance for BB
+ self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bb_addr, self.argv.bb_base_port,
+ pwr_meas = self.fake_pm)
+
+ # Burst forwarding between transceivers
+ self.burst_fwd = BurstForwarder()
+ self.burst_fwd.add_trx(self.bts_trx)
+ self.burst_fwd.add_trx(self.bb_trx)
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
- self.bts_data.sock, self.bb_data.sock]
+ socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
+ self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
- if self.bts_data.sock in r_event:
- self.burst_fwd.bts2bb()
+ if self.bts_trx.data_if.sock in r_event:
+ msg = self.bts_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
- if self.bb_data.sock in r_event:
- self.burst_fwd.bb2bts()
+ if self.bb_trx.data_if.sock in r_event:
+ msg = self.bb_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
- if self.bts_ctrl.sock in r_event:
- self.bts_ctrl.handle_rx()
+ if self.bts_trx.ctrl_if.sock in r_event:
+ self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
- if self.bb_ctrl.sock in r_event:
- self.bb_ctrl.handle_rx()
+ if self.bb_trx.ctrl_if.sock in r_event:
+ self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")