diff options
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-x | src/target/trx_toolkit/fake_trx.py | 306 |
1 files changed, 256 insertions, 50 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 95261dfe..46b413aa 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] import logging as log import signal import argparse +import random import select import sys from app_common import ApplicationBase -from ctrl_if_bts import CTRLInterfaceBTS -from ctrl_if_bb import CTRLInterfaceBB from burst_fwd import BurstForwarder +from transceiver import Transceiver +from clck_gen import CLCKGen from fake_pm import FakePM -from udp_link import UDPLink -from clck_gen import CLCKGen +class FakeTRX(Transceiver): + """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation. + + == ToA / RSSI measurement simulation + + Since this is a virtual environment, we can simulate different + parameters of the physical RF interface: + + - ToA (Timing of Arrival) - measured difference between expected + and actual time of burst arrival in units of 1/256 of GSM symbol + periods. A pair of both base and threshold values defines a range + of ToA value randomization: + + from (toa256_base - toa256_rand_threshold) + to (toa256_base + toa256_rand_threshold). + + - RSSI (Received Signal Strength Indication) - measured "power" of + the signal (per burst) in dBm. A pair of both base and threshold + values defines a range of RSSI value randomization: + + from (rssi_base - rssi_rand_threshold) + to (rssi_base + rssi_rand_threshold). + + Please note that randomization of both RSSI and ToA is optional, + and can be enabled from the control interface. + + == Timing Advance handling + + The BTS is using ToA measurements for UL bursts in order to calculate + Timing Advance value, that is then indicated to a MS, which in its turn + shall apply this value to the transmitted signal in order to compensate + the delay. Basically, every burst is transmitted in advance defined by + the indicated Timing Advance value. The valid range is 0..63, where + each unit means one GSM symbol advance. The actual Timing Advance value + is set using SETTA control command from MS. By default, it's set to 0. + + == Path loss simulation + + === Burst dropping + + In some cases, e.g. due to a weak signal or high interference, a burst + can be lost, i.e. not detected by the receiver. This can also be + simulated using FAKE_DROP command on the control interface: + + - burst_drop_amount - the amount of DL/UL bursts + to be dropped (i.e. not forwarded towards the MS/BTS), + + - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0. + + == Configuration + + All simulation parameters mentioned above can be changed at runtime + using the commands with prefix 'FAKE_' on the control interface. + All of them are handled by our custom CTRL command handler. + + """ + + TOA256_BASE_DEFAULT = 0 + RSSI_BASE_DEFAULT = -60 + + def __init__(self, *trx_args, **trx_kwargs): + Transceiver.__init__(self, *trx_args, **trx_kwargs) + + # Actual ToA / RSSI / TA values + self.toa256_base = self.TOA256_BASE_DEFAULT + self.rssi_base = self.RSSI_BASE_DEFAULT + self.ta = 0 + + # ToA / RSSI randomization threshold + self.toa256_rand_threshold = 0 + self.rssi_rand_threshold = 0 + + # Path loss simulation (burst dropping) + self.burst_drop_amount = 0 + self.burst_drop_period = 1 + + @property + def toa256(self): + # Check if randomization is required + if self.toa256_rand_threshold is 0: + return self.toa256_base + + # Generate a random ToA value in required range + toa256_min = self.toa256_base - self.toa256_rand_threshold + toa256_max = self.toa256_base + self.toa256_rand_threshold + return random.randint(toa256_min, toa256_max) + + @property + def rssi(self): + # Check if randomization is required + if self.rssi_rand_threshold is 0: + return self.rssi_base + + # Generate a random RSSI value in required range + rssi_min = self.rssi_base - self.rssi_rand_threshold + rssi_max = self.rssi_base + self.rssi_rand_threshold + return random.randint(rssi_min, rssi_max) + + # Path loss simulation: burst dropping + # Returns: True - drop, False - keep + def sim_burst_drop(self, msg): + # Check if dropping is required + if self.burst_drop_amount is 0: + return False + + if msg.fn % self.burst_drop_period == 0: + log.info("Simulation: dropping burst (fn=%u %% %u == 0)" + % (msg.fn, self.burst_drop_period)) + self.burst_drop_amount -= 1 + return True + + return False + + # Takes (partially initialized) TRX2L1 message, + # simulates RF path parameters (such as RSSI), + # and sends towards the L1 + def send_data_msg(self, src_trx, msg): + # Complete message header + msg.toa256 = self.toa256 + msg.rssi = self.rssi + + # Apply optional Timing Advance + if src_trx.ta is not 0: + msg.toa256 -= src_trx.ta * 256 + + # Path loss simulation + if self.sim_burst_drop(msg): + return + + # TODO: make legacy mode configurable (via argv?) + self.data_if.send_msg(msg, legacy = True) + + # Simulation specific CTRL command handler + def ctrl_cmd_handler(self, request): + # Timing Advance + # Syntax: CMD SETTA <TA> + if self.ctrl_if.verify_cmd(request, "SETTA", 1): + log.debug("Recv SETTA cmd") + + # Store indicated value + self.ta = int(request[1]) + return 0 + + # Timing of Arrival simulation + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply both base and threshold + self.toa256_base = int(request[1]) + self.toa256_rand_threshold = int(request[2]) + return 0 + + # Timing of Arrival simulation + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply delta + self.toa256_base += int(request[1]) + return 0 + + # RSSI simulation + # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply both base and threshold + self.rssi_base = int(request[1]) + self.rssi_rand_threshold = int(request[2]) + return 0 + + # RSSI simulation + # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply delta + self.rssi_base += int(request[1]) + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> + # Dropping pattern: fn % 1 == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = 1 + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> + # Dropping pattern: fn % period == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + # Parse / validate period + period = int(request[2]) + if period <= 0: + log.error("FAKE_DROP period shall be greater than zero") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = period + return 0 + + # Unhandled command + return None class Application(ApplicationBase): def __init__(self): @@ -51,72 +271,58 @@ class Application(ApplicationBase): self.app_init_logging(self.argv) def run(self): - # Init TRX CTRL interface for BTS - self.bts_ctrl = CTRLInterfaceBTS( - self.argv.bts_addr, self.argv.bts_base_port + 101, - self.argv.trx_bind_addr, self.argv.bts_base_port + 1) - - # Init TRX CTRL interface for BB - self.bb_ctrl = CTRLInterfaceBB( - self.argv.bb_addr, self.argv.bb_base_port + 101, - self.argv.trx_bind_addr, self.argv.bb_base_port + 1) + # Init shared clock generator + self.clck_gen = CLCKGen([]) # Power measurement emulation # Noise: -120 .. -105 # BTS: -75 .. -50 - self.pm = FakePM(-120, -105, -75, -50) - - # Share a FakePM instance between both BTS and BB - self.bts_ctrl.pm = self.pm - self.bb_ctrl.pm = self.pm - - # Init DATA links - self.bts_data = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 102, - self.argv.trx_bind_addr, self.argv.bts_base_port + 2) - self.bb_data = UDPLink( - self.argv.bb_addr, self.argv.bb_base_port + 102, - self.argv.trx_bind_addr, self.argv.bb_base_port + 2) - - # BTS <-> BB burst forwarding - self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data) - - # Share a BurstForwarder instance between BTS and BB - self.bts_ctrl.burst_fwd = self.burst_fwd - self.bb_ctrl.burst_fwd = self.burst_fwd - - # Provide clock to BTS - self.bts_clck = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 100, - self.argv.trx_bind_addr, self.argv.bts_base_port) - self.clck_gen = CLCKGen([self.bts_clck]) - self.bts_ctrl.clck_gen = self.clck_gen + self.fake_pm = FakePM(-120, -105, -75, -50) + + # Init TRX instance for BTS + self.bts_trx = FakeTRX(self.argv.trx_bind_addr, + self.argv.bts_addr, self.argv.bts_base_port, + clck_gen = self.clck_gen) + + # Init TRX instance for BB + self.bb_trx = FakeTRX(self.argv.trx_bind_addr, + self.argv.bb_addr, self.argv.bb_base_port, + pwr_meas = self.fake_pm) + + # Burst forwarding between transceivers + self.burst_fwd = BurstForwarder() + self.burst_fwd.add_trx(self.bts_trx) + self.burst_fwd.add_trx(self.bb_trx) log.info("Init complete") # Enter main loop while True: - socks = [self.bts_ctrl.sock, self.bb_ctrl.sock, - self.bts_data.sock, self.bb_data.sock] + socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock, + self.bts_trx.data_if.sock, self.bb_trx.data_if.sock] # Wait until we get any data on any socket r_event, w_event, x_event = select.select(socks, [], []) # Downlink: BTS -> BB - if self.bts_data.sock in r_event: - self.burst_fwd.bts2bb() + if self.bts_trx.data_if.sock in r_event: + msg = self.bts_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bts_trx, msg) # Uplink: BB -> BTS - if self.bb_data.sock in r_event: - self.burst_fwd.bb2bts() + if self.bb_trx.data_if.sock in r_event: + msg = self.bb_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bb_trx, msg) # CTRL commands from BTS - if self.bts_ctrl.sock in r_event: - self.bts_ctrl.handle_rx() + if self.bts_trx.ctrl_if.sock in r_event: + self.bts_trx.ctrl_if.handle_rx() # CTRL commands from BB - if self.bb_ctrl.sock in r_event: - self.bb_ctrl.handle_rx() + if self.bb_trx.ctrl_if.sock in r_event: + self.bb_trx.ctrl_if.handle_rx() def shutdown(self): log.info("Shutting down...") |