diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/target/trx_toolkit/burst_fwd.py | 363 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if_bb.py | 219 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if_bts.py | 189 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if_trx.py | 155 | ||||
-rw-r--r-- | src/target/trx_toolkit/fake_pm.py | 66 | ||||
-rwxr-xr-x | src/target/trx_toolkit/fake_trx.py | 306 | ||||
-rw-r--r-- | src/target/trx_toolkit/transceiver.py | 155 |
7 files changed, 663 insertions, 790 deletions
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py index 3cb6acd0..38ce18f3 100644 --- a/src/target/trx_toolkit/burst_fwd.py +++ b/src/target/trx_toolkit/burst_fwd.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # TRX Toolkit -# BTS <-> BB burst forwarding +# Burst forwarding between transceivers # # (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> # @@ -23,321 +23,62 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import logging as log -import random - -from data_msg import * class BurstForwarder: - """ Performs burst forwarding and preprocessing between MS and BTS. - - == Pass-filtering parameters - - BurstForwarder may drop or pass an UL/DL burst depending - on the following parameters: - - - bts_freq / bb_freq - the current BTS / MS frequency - that was set using RXTUNE control command. By default, - both freq. values are set to None, so nothing is being - forwarded (i.e. bursts are getting dropped). - - FIXME: currently, we don't care about TXTUNE command - and transmit frequencies. It would be great to distinguish - between RX and TX frequencies for both BTS and MS. - - - ts_pass_list - the list of active (i.e. configured) - timeslot numbers for the MS. A timeslot can be activated - or deactivated using SETSLOT control command from the MS. - - FIXME: there is no such list for the BTS side. - - == Preprocessing and measurement simulation - - Since this is a virtual environment, we can simulate different - parameters of a virtual RF interface: + """ Performs burst forwarding between transceivers. - - ToA (Timing of Arrival) - measured difference between expected - and actual time of burst arrival in units of 1/256 of GSM symbol - periods. A pair of both base and threshold values defines a range - of ToA value randomization: + BurstForwarder distributes bursts between the list of given + FakeTRX (Transceiver) instances depending on the following + parameters of each transceiver: - DL: from (toa256_dl_base - toa256_dl_threshold) - to (toa256_dl_base + toa256_dl_threshold), - UL: from (toa256_ul_base - toa256_ul_threshold) - to (toa256_ul_base + toa256_ul_threshold). + - execution state (running or idle), + - actual RX / TX frequencies, + - list of active timeslots. - - RSSI (Received Signal Strength Indication) - measured "power" of - the signal (per burst) in dBm. A pair of both base and threshold - values defines a range of RSSI value randomization: - - DL: from (rssi_dl_base - rssi_dl_threshold) - to (rssi_dl_base + rssi_dl_threshold), - UL: from (rssi_ul_base - rssi_ul_threshold) - to (rssi_ul_base + rssi_ul_threshold). - - Please note that the randomization of both RSSI and ToA - is optional, and can be enabled from the control interface. - - === Timing Advance handling - - The BTS is using ToA measurements for UL bursts in order to calculate - Timing Advance value, that is then indicated to a MS, which in its turn - shall apply this value to the transmitted signal in order to compensate - the delay. Basically, every burst is transmitted in advance defined by - the indicated Timing Advance value. The valid range is 0..63, where - each unit means one GSM symbol advance. The actual Timing Advance value - is set using SETTA control command from MS. By default, it's set to 0. - - === Path loss simulation - burst dropping - - In some cases, e.g. due to a weak signal or high interference, a burst - can be lost, i.e. not detected by the receiver. This can also be - simulated using FAKE_DROP command on both control interfaces: - - - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts - to be dropped (i.e. not forwarded towards the MS/BTS), - - - burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g. - 1 - drop every consequent burst, 2 - drop every second burst, etc. + Each to be distributed L12TRX message is being transformed + into a TRX2L1 message, and then forwarded to transceivers + with partially initialized header. All uninitialized header + fields (such as rssi and toa256) shall be set by each + transceiver individually before sending towards the L1. """ - def __init__(self, bts_link, bb_link): - self.bts_link = bts_link - self.bb_link = bb_link - - # Init default parameters - self.reset_dl() - self.reset_ul() - - # Initialize (or reset to) default parameters for Downlink - def reset_dl(self): - # Unset current DL freq. - self.bts_freq = None - - # Indicated RSSI / ToA values - self.toa256_dl_base = 0 - self.rssi_dl_base = -60 - - # RSSI / ToA randomization threshold - self.toa256_dl_threshold = 0 - self.rssi_dl_threshold = 0 - - # Path loss simulation (burst dropping) - self.burst_dl_drop_amount = 0 - self.burst_dl_drop_period = 1 - - # Initialize (or reset to) default parameters for Uplink - def reset_ul(self): - # Unset current DL freq. - self.bb_freq = None - - # Indicated RSSI / ToA values - self.rssi_ul_base = -70 - self.toa256_ul_base = 0 - - # RSSI / ToA randomization threshold - self.toa256_ul_threshold = 0 - self.rssi_ul_threshold = 0 - - # Path loss simulation (burst dropping) - self.burst_ul_drop_amount = 0 - self.burst_ul_drop_period = 1 - - # Init timeslot filter (drop everything by default) - self.ts_pass_list = [] - - # Reset Timing Advance value - self.ta = 0 - - # Converts TA value from symbols to - # units of 1/256 of GSM symbol periods - def calc_ta256(self): - return self.ta * 256 - - # Calculates a random ToA value for Downlink bursts - def calc_dl_toa256(self): - # Check if randomization is required - if self.toa256_dl_threshold is 0: - return self.toa256_dl_base - - # Calculate a range for randomization - toa256_min = self.toa256_dl_base - self.toa256_dl_threshold - toa256_max = self.toa256_dl_base + self.toa256_dl_threshold - - # Generate a random ToA value - toa256 = random.randint(toa256_min, toa256_max) - - return toa256 - - # Calculates a random ToA value for Uplink bursts - def calc_ul_toa256(self): - # Check if randomization is required - if self.toa256_ul_threshold is 0: - return self.toa256_ul_base - - # Calculate a range for randomization - toa256_min = self.toa256_ul_base - self.toa256_ul_threshold - toa256_max = self.toa256_ul_base + self.toa256_ul_threshold - - # Generate a random ToA value - toa256 = random.randint(toa256_min, toa256_max) - - return toa256 - - # Calculates a random RSSI value for Downlink bursts - def calc_dl_rssi(self): - # Check if randomization is required - if self.rssi_dl_threshold is 0: - return self.rssi_dl_base - - # Calculate a range for randomization - rssi_min = self.rssi_dl_base - self.rssi_dl_threshold - rssi_max = self.rssi_dl_base + self.rssi_dl_threshold - - # Generate a random RSSI value - return random.randint(rssi_min, rssi_max) - - # Calculates a random RSSI value for Uplink bursts - def calc_ul_rssi(self): - # Check if randomization is required - if self.rssi_ul_threshold is 0: - return self.rssi_ul_base - - # Calculate a range for randomization - rssi_min = self.rssi_ul_base - self.rssi_ul_threshold - rssi_max = self.rssi_ul_base + self.rssi_ul_threshold - - # Generate a random RSSI value - return random.randint(rssi_min, rssi_max) - - # DL path loss simulation - def path_loss_sim_dl(self, msg): - # Burst dropping - if self.burst_dl_drop_amount > 0: - if msg.fn % self.burst_dl_drop_period == 0: - log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)" - % (msg.fn, self.burst_dl_drop_period)) - self.burst_dl_drop_amount -= 1 - return None - - return msg - - # UL path loss simulation - def path_loss_sim_ul(self, msg): - # Burst dropping - if self.burst_ul_drop_amount > 0: - if msg.fn % self.burst_ul_drop_period == 0: - log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)" - % (msg.fn, self.burst_ul_drop_period)) - self.burst_ul_drop_amount -= 1 - return None - - return msg - - # DL burst preprocessing - def preprocess_dl_burst(self, msg): - # Calculate both RSSI and ToA values - msg.toa256 = self.calc_dl_toa256() - msg.rssi = self.calc_dl_rssi() - - # UL burst preprocessing - def preprocess_ul_burst(self, msg): - # Calculate both RSSI and ToA values, - # also apply Timing Advance - msg.toa256 = self.calc_ul_toa256() - msg.toa256 -= self.calc_ta256() - msg.rssi = self.calc_ul_rssi() - - # Converts a L12TRX message to TRX2L1 message - def transform_msg(self, msg_raw): - # Attempt to parse a message - try: - msg_l12trx = DATAMSG_L12TRX() - msg_l12trx.parse_msg(bytearray(msg_raw)) - except: - log.error("Dropping unhandled DL message...") - return None - - # Compose a new message for L1 - return msg_l12trx.gen_trx2l1() - - # Downlink handler: BTS -> BB - def bts2bb(self): - # Read data from socket - data, addr = self.bts_link.sock.recvfrom(512) - - # BB is not connected / tuned - if self.bb_freq is None: - return None - - # Freq. filter - if self.bb_freq != self.bts_freq: - return None - - # Process a message - msg = self.transform_msg(data) - if msg is None: - return None - - # Timeslot filter - if msg.tn not in self.ts_pass_list: - return None - - # Path loss simulation - msg = self.path_loss_sim_dl(msg) - if msg is None: - return None - - # Burst preprocessing - self.preprocess_dl_burst(msg) - - # Validate and generate the payload - payload = msg.gen_msg() - - # Append two unused bytes at the end - # in order to keep the compatibility - payload += bytearray(2) - - # Send burst to BB - self.bb_link.send(payload) - - # Uplink handler: BB -> BTS - def bb2bts(self): - # Read data from socket - data, addr = self.bb_link.sock.recvfrom(512) - - # BTS is not connected / tuned - if self.bts_freq is None: - return None - - # Freq. filter - if self.bb_freq != self.bts_freq: - return None - - # Process a message - msg = self.transform_msg(data) - if msg is None: - return None - - # Timeslot filter - if msg.tn not in self.ts_pass_list: - log.warning("TS %u is not configured, dropping UL burst..." % msg.tn) - return None - - # Path loss simulation - msg = self.path_loss_sim_ul(msg) - if msg is None: - return None - - # Burst preprocessing - self.preprocess_ul_burst(msg) - - # Validate and generate the payload - payload = msg.gen_msg() - - # Append two unused bytes at the end - # in order to keep the compatibility - payload += bytearray(2) - - # Send burst to BTS - self.bts_link.send(payload) + def __init__(self, trx_list = []): + # List of Transceiver instances + self.trx_list = trx_list + + def add_trx(self, trx): + if trx in self.trx_list: + log.error("TRX is already in the list") + return + + self.trx_list.append(trx) + + def del_trx(self, trx): + if trx not in self.trx_list: + log.error("TRX is not in the list") + return + + self.trx_list.remove(trx) + + def forward_msg(self, src_trx, rx_msg): + # Transform from L12TRX to TRX2L1 + tx_msg = rx_msg.gen_trx2l1() + if tx_msg is None: + log.error("Forwarding failed, could not transform " + "message (%s) => dropping..." % rx_msg.desc_hdr()) + + # Iterate over all known transceivers + for trx in self.trx_list: + if trx == src_trx: + continue + + # Check transceiver state + if not trx.running: + continue + if trx.rx_freq != src_trx.tx_freq: + continue + if tx_msg.tn not in trx.ts_list: + continue + + trx.send_data_msg(src_trx, tx_msg) diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py deleted file mode 100644 index d25aa306..00000000 --- a/src/target/trx_toolkit/ctrl_if_bb.py +++ /dev/null @@ -1,219 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -# TRX Toolkit -# CTRL interface implementation (OsmocomBB specific) -# -# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com> -# -# All Rights Reserved -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import logging as log - -from ctrl_if import CTRLInterface - -class CTRLInterfaceBB(CTRLInterface): - # Internal state variables - trx_started = False - burst_fwd = None - rx_freq = None - tx_freq = None - pm = None - - def __init__(self, *udp_link_args): - CTRLInterface.__init__(self, *udp_link_args) - log.info("Init CTRL interface for BB (%s)" % self.desc_link()) - - def parse_cmd(self, request): - # Power control - if self.verify_cmd(request, "POWERON", 0): - log.debug("Recv POWERON CMD") - - # Ensure transceiver isn't working - if self.trx_started: - log.error("Transceiver already started") - return -1 - - # Ensure RX / TX freq. are set - if (self.rx_freq is None) or (self.tx_freq is None): - log.error("RX / TX freq. are not set") - return -1 - - log.info("Starting transceiver...") - self.trx_started = True - return 0 - - elif self.verify_cmd(request, "POWEROFF", 0): - log.debug("Recv POWEROFF cmd") - - log.info("Stopping transceiver...") - self.trx_started = False - return 0 - - # Tuning Control - elif self.verify_cmd(request, "RXTUNE", 1): - log.debug("Recv RXTUNE cmd") - - # TODO: check freq range - self.rx_freq = int(request[1]) * 1000 - self.burst_fwd.bb_freq = self.rx_freq - return 0 - - elif self.verify_cmd(request, "TXTUNE", 1): - log.debug("Recv TXTUNE cmd") - - # TODO: check freq range - self.tx_freq = int(request[1]) * 1000 - return 0 - - # Power measurement - elif self.verify_cmd(request, "MEASURE", 1): - log.debug("Recv MEASURE cmd") - - if self.pm is None: - return -1 - - # TODO: check freq range - meas_freq = int(request[1]) * 1000 - meas_dbm = str(self.pm.measure(meas_freq)) - - return (0, [meas_dbm]) - - elif self.verify_cmd(request, "SETSLOT", 2): - log.debug("Recv SETSLOT cmd") - - if self.burst_fwd is None: - return -1 - - # Obtain TS index - ts = int(request[1]) - if ts not in range(0, 8): - log.error("TS index should be in range: 0..7") - return -1 - - # Parse TS type - ts_type = int(request[2]) - - # TS activation / deactivation - # We don't care about ts_type - if ts_type == 0: - # Deactivate TS (remove from TS pass-filter list) - if ts in self.burst_fwd.ts_pass_list: - self.burst_fwd.ts_pass_list.remove(ts) - else: - # Activate TS (add to TS pass-filter list) - if ts not in self.burst_fwd.ts_pass_list: - self.burst_fwd.ts_pass_list.append(ts) - - return 0 - - # Timing Advance - elif self.verify_cmd(request, "SETTA", 1): - log.debug("Recv SETTA cmd") - - # Save to the BurstForwarder instance - self.burst_fwd.ta = int(request[1]) - return 0 - - # Timing of Arrival simulation for Uplink - # Absolute form: CMD FAKE_TOA <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_TOA", 2): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply both base and threshold - self.burst_fwd.toa256_ul_base = int(request[1]) - self.burst_fwd.toa256_ul_threshold = int(request[2]) - - return 0 - - # Timing of Arrival simulation for Uplink - # Relative form: CMD FAKE_TOA <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_TOA", 1): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply delta - self.burst_fwd.toa256_ul_base += int(request[1]) - - return 0 - - # RSSI simulation for Uplink - # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_RSSI", 2): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply both base and threshold - self.burst_fwd.rssi_ul_base = int(request[1]) - self.burst_fwd.rssi_ul_threshold = int(request[2]) - - return 0 - - # RSSI simulation for Uplink - # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_RSSI", 1): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply delta - self.burst_fwd.rssi_ul_base += int(request[1]) - - return 0 - - # Path loss simulation for UL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> - # Dropping pattern: fn % 1 == 0 - elif self.verify_cmd(request, "FAKE_DROP", 1): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - self.burst_fwd.burst_ul_drop_amount = num - self.burst_fwd.burst_ul_drop_period = 1 - - return 0 - - # Path loss simulation for UL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> - # Dropping pattern: fn % period == 0 - elif self.verify_cmd(request, "FAKE_DROP", 2): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - # Parse / validate period - period = int(request[2]) - if period <= 0: - log.error("FAKE_DROP period shall be greater than zero") - return -1 - - self.burst_fwd.burst_ul_drop_amount = num - self.burst_fwd.burst_ul_drop_period = period - - return 0 - - # Wrong / unknown command - else: - # We don't care about other commands, - # so let's merely ignore them ;) - log.debug("Ignore CMD %s" % request[0]) - return 0 diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py deleted file mode 100644 index cb38b67c..00000000 --- a/src/target/trx_toolkit/ctrl_if_bts.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -# TRX Toolkit -# CTRL interface implementation (OsmoBTS specific) -# -# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com> -# -# All Rights Reserved -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import logging as log - -from ctrl_if import CTRLInterface - -class CTRLInterfaceBTS(CTRLInterface): - # Internal state variables - trx_started = False - burst_fwd = None - clck_gen = None - rx_freq = None - tx_freq = None - pm = None - - def __init__(self, *udp_link_args): - CTRLInterface.__init__(self, *udp_link_args) - log.info("Init CTRL interface for BTS (%s)" % self.desc_link()) - - def parse_cmd(self, request): - # Power control - if self.verify_cmd(request, "POWERON", 0): - log.debug("Recv POWERON CMD") - - # Ensure transceiver isn't working - if self.trx_started: - log.error("Transceiver already started") - return -1 - - # Ensure RX / TX freq. are set - if (self.rx_freq is None) or (self.tx_freq is None): - log.error("RX / TX freq. are not set") - return -1 - - log.info("Starting transceiver...") - self.trx_started = True - - # Power emulation - if self.pm is not None: - self.pm.add_bts_list([self.tx_freq]) - - # Start clock indications - if self.clck_gen is not None: - self.clck_gen.start() - - return 0 - - elif self.verify_cmd(request, "POWEROFF", 0): - log.debug("Recv POWEROFF cmd") - - log.info("Stopping transceiver...") - self.trx_started = False - - # Power emulation - if self.pm is not None: - self.pm.del_bts_list([self.tx_freq]) - - # Stop clock indications - if self.clck_gen is not None: - self.clck_gen.stop() - - return 0 - - # Tuning Control - elif self.verify_cmd(request, "RXTUNE", 1): - log.debug("Recv RXTUNE cmd") - - # TODO: check freq range - self.rx_freq = int(request[1]) * 1000 - return 0 - - elif self.verify_cmd(request, "TXTUNE", 1): - log.debug("Recv TXTUNE cmd") - - # TODO: check freq range - self.tx_freq = int(request[1]) * 1000 - self.burst_fwd.bts_freq = self.tx_freq - return 0 - - # Timing of Arrival simulation for Downlink - # Absolute form: CMD FAKE_TOA <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_TOA", 2): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply both base and threshold - self.burst_fwd.toa256_dl_base = int(request[1]) - self.burst_fwd.toa256_dl_threshold = int(request[2]) - - return 0 - - # Timing of Arrival simulation for Downlink - # Relative form: CMD FAKE_TOA <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_TOA", 1): - log.debug("Recv FAKE_TOA cmd") - - # Parse and apply delta - self.burst_fwd.toa256_dl_base += int(request[1]) - - return 0 - - # RSSI simulation for Downlink - # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> - elif self.verify_cmd(request, "FAKE_RSSI", 2): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply both base and threshold - self.burst_fwd.rssi_dl_base = int(request[1]) - self.burst_fwd.rssi_dl_threshold = int(request[2]) - - return 0 - - # RSSI simulation for Downlink - # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> - elif self.verify_cmd(request, "FAKE_RSSI", 1): - log.debug("Recv FAKE_RSSI cmd") - - # Parse and apply delta - self.burst_fwd.rssi_dl_base += int(request[1]) - - return 0 - - # Path loss simulation for DL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> - # Dropping pattern: fn % 1 == 0 - elif self.verify_cmd(request, "FAKE_DROP", 1): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - self.burst_fwd.burst_dl_drop_amount = num - self.burst_fwd.burst_dl_drop_period = 1 - - return 0 - - # Path loss simulation for DL: burst dropping - # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> - # Dropping pattern: fn % period == 0 - elif self.verify_cmd(request, "FAKE_DROP", 2): - log.debug("Recv FAKE_DROP cmd") - - # Parse / validate amount of bursts - num = int(request[1]) - if num < 0: - log.error("FAKE_DROP amount shall not be negative") - return -1 - - # Parse / validate period - period = int(request[2]) - if period <= 0: - log.error("FAKE_DROP period shall be greater than zero") - return -1 - - self.burst_fwd.burst_dl_drop_amount = num - self.burst_fwd.burst_dl_drop_period = period - - return 0 - - # Wrong / unknown command - else: - # We don't care about other commands, - # so let's merely ignore them ;) - log.debug("Ignore CMD %s" % request[0]) - return 0 diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py new file mode 100644 index 00000000..83d55db1 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_if_trx.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# CTRL interface implementation (common commands) +# +# (C) 2016-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import logging as log + +from ctrl_if import CTRLInterface + +class CTRLInterfaceTRX(CTRLInterface): + """ CTRL interface handler for common transceiver management commands. + + The following set of commands is mandatory for every transceiver: + + - POWERON / POWEROFF - state management (running / idle), + - RXTUNE / TXTUNE - RX / TX frequency management, + - SETSLOT - timeslot management. + + Additionally, there is an optional MEASURE command, which is used + by OsmocomBB to perform power measurement on a given frequency. + + A given transceiver may also define its own command handler, + that is prioritized, i.e. it can overwrite any commands mentioned + above. If None is returned, a command is considered as unhandled. + + """ + + def __init__(self, trx, *udp_link_args): + CTRLInterface.__init__(self, *udp_link_args) + log.info("Init CTRL interface (%s)" % self.desc_link()) + + # Link with Transceiver instance we belong to + self.trx = trx + + def parse_cmd(self, request): + # Custom command handlers (prioritized) + res = self.trx.ctrl_cmd_handler(request) + if res is not None: + return res + + # Power control + if self.verify_cmd(request, "POWERON", 0): + log.debug("Recv POWERON CMD") + + # Ensure transceiver isn't working + if self.trx.running: + log.error("Transceiver already started") + return -1 + + # Ensure RX / TX freq. are set + if (self.trx.rx_freq is None) or (self.trx.tx_freq is None): + log.error("RX / TX freq. are not set") + return -1 + + log.info("Starting transceiver...") + self.trx.running = True + + # Notify transceiver about that + self.trx.power_event_handler("POWERON") + + return 0 + + elif self.verify_cmd(request, "POWEROFF", 0): + log.debug("Recv POWEROFF cmd") + + log.info("Stopping transceiver...") + self.trx.running = False + + # Notify transceiver about that + self.trx.power_event_handler("POWEROFF") + + return 0 + + # Tuning Control + elif self.verify_cmd(request, "RXTUNE", 1): + log.debug("Recv RXTUNE cmd") + + # TODO: check freq range + self.trx.rx_freq = int(request[1]) * 1000 + return 0 + + elif self.verify_cmd(request, "TXTUNE", 1): + log.debug("Recv TXTUNE cmd") + + # TODO: check freq range + self.trx.tx_freq = int(request[1]) * 1000 + return 0 + + elif self.verify_cmd(request, "SETSLOT", 2): + log.debug("Recv SETSLOT cmd") + + # Obtain TS index + ts = int(request[1]) + if ts not in range(0, 8): + log.error("TS index should be in range: 0..7") + return -1 + + # Parse TS type + ts_type = int(request[2]) + + # TS activation / deactivation + # We don't care about ts_type + if ts_type == 0: + # Deactivate TS (remove from the list of active timeslots) + if ts in self.trx.ts_list: + self.trx.ts_list.remove(ts) + else: + # Activate TS (add to the list of active timeslots) + if ts not in self.trx.ts_list: + self.trx.ts_list.append(ts) + + return 0 + + # Power measurement + if self.verify_cmd(request, "MEASURE", 1): + log.debug("Recv MEASURE cmd") + + # Power Measurement interface is optional + # for Transceiver, thus may be uninitialized + if self.trx.pwr_meas is None: + log.error("Power Measurement interface " + "is not initialized => rejecting command") + return -1 + + # TODO: check freq range + meas_freq = int(request[1]) * 1000 + meas_dbm = self.trx.pwr_meas.measure(meas_freq) + + return (0, [str(meas_dbm)]) + + # Wrong / unknown command + else: + # We don't care about other commands, + # so let's merely ignore them ;) + log.debug("Ignore CMD %s" % request[0]) + return 0 diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py index 840b4e40..1992f8d3 100644 --- a/src/target/trx_toolkit/fake_pm.py +++ b/src/target/trx_toolkit/fake_pm.py @@ -2,9 +2,9 @@ # -*- coding: utf-8 -*- # TRX Toolkit -# Power measurement emulation for BB +# Power measurement emulation # -# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> # # All Rights Reserved # @@ -25,29 +25,53 @@ from random import randint class FakePM: - # Freq. list for good power level - bts_list = [] + """ Power measurement emulation for fake transceivers. - def __init__(self, noise_min, noise_max, bts_min, bts_max): - # Save power level ranges + There is no such thing like RF signal level in fake Um-interface, + so we need to emulate this. The main idea is to have a list of + all running and idle transceivers. As soon as a measurement + request is received, FakePM will attempt to find a running + transceiver on a given frequency. + + The result of such "measurement" is a random RSSI value + in one of the following ranges: + + - trx_min ... trx_max - if at least one TRX was found, + - noise_min ... noise_max - no TRX instances were found. + + FIXME: it would be great to average the rate of bursts + and indicated power / attenuation values for all + matching transceivers, so "pure traffic" ARFCNs + would be handled properly. + + """ + + def __init__(self, noise_min, noise_max, trx_min, trx_max): + # Init list of transceivers + self.trx_list = [] + + # RSSI randomization ranges self.noise_min = noise_min self.noise_max = noise_max - self.bts_min = bts_min - self.bts_max = bts_max + self.trx_min = trx_min + self.trx_max = trx_max + + @property + def rssi_noise(self): + return randint(self.noise_min, self.noise_max) - def measure(self, bts): - if bts in self.bts_list: - return randint(self.bts_min, self.bts_max) - else: - return randint(self.noise_min, self.noise_max) + @property + def rssi_trx(self): + return randint(self.trx_min, self.trx_max) - def update_bts_list(self, new_list): - self.bts_list = new_list + def measure(self, freq): + # Iterate over all known transceivers + for trx in self.trx_list: + if not trx.running: + continue - def add_bts_list(self, add_list): - self.bts_list += add_list + # Match by given frequency + if trx.tx_freq == freq: + return self.rssi_trx - def del_bts_list(self, del_list): - for item in del_list: - if item in self.bts_list: - self.bts_list.remove(item) + return self.rssi_noise diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 95261dfe..46b413aa 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] import logging as log import signal import argparse +import random import select import sys from app_common import ApplicationBase -from ctrl_if_bts import CTRLInterfaceBTS -from ctrl_if_bb import CTRLInterfaceBB from burst_fwd import BurstForwarder +from transceiver import Transceiver +from clck_gen import CLCKGen from fake_pm import FakePM -from udp_link import UDPLink -from clck_gen import CLCKGen +class FakeTRX(Transceiver): + """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation. + + == ToA / RSSI measurement simulation + + Since this is a virtual environment, we can simulate different + parameters of the physical RF interface: + + - ToA (Timing of Arrival) - measured difference between expected + and actual time of burst arrival in units of 1/256 of GSM symbol + periods. A pair of both base and threshold values defines a range + of ToA value randomization: + + from (toa256_base - toa256_rand_threshold) + to (toa256_base + toa256_rand_threshold). + + - RSSI (Received Signal Strength Indication) - measured "power" of + the signal (per burst) in dBm. A pair of both base and threshold + values defines a range of RSSI value randomization: + + from (rssi_base - rssi_rand_threshold) + to (rssi_base + rssi_rand_threshold). + + Please note that randomization of both RSSI and ToA is optional, + and can be enabled from the control interface. + + == Timing Advance handling + + The BTS is using ToA measurements for UL bursts in order to calculate + Timing Advance value, that is then indicated to a MS, which in its turn + shall apply this value to the transmitted signal in order to compensate + the delay. Basically, every burst is transmitted in advance defined by + the indicated Timing Advance value. The valid range is 0..63, where + each unit means one GSM symbol advance. The actual Timing Advance value + is set using SETTA control command from MS. By default, it's set to 0. + + == Path loss simulation + + === Burst dropping + + In some cases, e.g. due to a weak signal or high interference, a burst + can be lost, i.e. not detected by the receiver. This can also be + simulated using FAKE_DROP command on the control interface: + + - burst_drop_amount - the amount of DL/UL bursts + to be dropped (i.e. not forwarded towards the MS/BTS), + + - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0. + + == Configuration + + All simulation parameters mentioned above can be changed at runtime + using the commands with prefix 'FAKE_' on the control interface. + All of them are handled by our custom CTRL command handler. + + """ + + TOA256_BASE_DEFAULT = 0 + RSSI_BASE_DEFAULT = -60 + + def __init__(self, *trx_args, **trx_kwargs): + Transceiver.__init__(self, *trx_args, **trx_kwargs) + + # Actual ToA / RSSI / TA values + self.toa256_base = self.TOA256_BASE_DEFAULT + self.rssi_base = self.RSSI_BASE_DEFAULT + self.ta = 0 + + # ToA / RSSI randomization threshold + self.toa256_rand_threshold = 0 + self.rssi_rand_threshold = 0 + + # Path loss simulation (burst dropping) + self.burst_drop_amount = 0 + self.burst_drop_period = 1 + + @property + def toa256(self): + # Check if randomization is required + if self.toa256_rand_threshold is 0: + return self.toa256_base + + # Generate a random ToA value in required range + toa256_min = self.toa256_base - self.toa256_rand_threshold + toa256_max = self.toa256_base + self.toa256_rand_threshold + return random.randint(toa256_min, toa256_max) + + @property + def rssi(self): + # Check if randomization is required + if self.rssi_rand_threshold is 0: + return self.rssi_base + + # Generate a random RSSI value in required range + rssi_min = self.rssi_base - self.rssi_rand_threshold + rssi_max = self.rssi_base + self.rssi_rand_threshold + return random.randint(rssi_min, rssi_max) + + # Path loss simulation: burst dropping + # Returns: True - drop, False - keep + def sim_burst_drop(self, msg): + # Check if dropping is required + if self.burst_drop_amount is 0: + return False + + if msg.fn % self.burst_drop_period == 0: + log.info("Simulation: dropping burst (fn=%u %% %u == 0)" + % (msg.fn, self.burst_drop_period)) + self.burst_drop_amount -= 1 + return True + + return False + + # Takes (partially initialized) TRX2L1 message, + # simulates RF path parameters (such as RSSI), + # and sends towards the L1 + def send_data_msg(self, src_trx, msg): + # Complete message header + msg.toa256 = self.toa256 + msg.rssi = self.rssi + + # Apply optional Timing Advance + if src_trx.ta is not 0: + msg.toa256 -= src_trx.ta * 256 + + # Path loss simulation + if self.sim_burst_drop(msg): + return + + # TODO: make legacy mode configurable (via argv?) + self.data_if.send_msg(msg, legacy = True) + + # Simulation specific CTRL command handler + def ctrl_cmd_handler(self, request): + # Timing Advance + # Syntax: CMD SETTA <TA> + if self.ctrl_if.verify_cmd(request, "SETTA", 1): + log.debug("Recv SETTA cmd") + + # Store indicated value + self.ta = int(request[1]) + return 0 + + # Timing of Arrival simulation + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply both base and threshold + self.toa256_base = int(request[1]) + self.toa256_rand_threshold = int(request[2]) + return 0 + + # Timing of Arrival simulation + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1): + log.debug("Recv FAKE_TOA cmd") + + # Parse and apply delta + self.toa256_base += int(request[1]) + return 0 + + # RSSI simulation + # Absolute form: CMD FAKE_RSSI <BASE> <THRESH> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply both base and threshold + self.rssi_base = int(request[1]) + self.rssi_rand_threshold = int(request[2]) + return 0 + + # RSSI simulation + # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> + elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1): + log.debug("Recv FAKE_RSSI cmd") + + # Parse and apply delta + self.rssi_base += int(request[1]) + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> + # Dropping pattern: fn % 1 == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = 1 + return 0 + + # Path loss simulation: burst dropping + # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD> + # Dropping pattern: fn % period == 0 + elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2): + log.debug("Recv FAKE_DROP cmd") + + # Parse / validate amount of bursts + num = int(request[1]) + if num < 0: + log.error("FAKE_DROP amount shall not be negative") + return -1 + + # Parse / validate period + period = int(request[2]) + if period <= 0: + log.error("FAKE_DROP period shall be greater than zero") + return -1 + + self.burst_drop_amount = num + self.burst_drop_period = period + return 0 + + # Unhandled command + return None class Application(ApplicationBase): def __init__(self): @@ -51,72 +271,58 @@ class Application(ApplicationBase): self.app_init_logging(self.argv) def run(self): - # Init TRX CTRL interface for BTS - self.bts_ctrl = CTRLInterfaceBTS( - self.argv.bts_addr, self.argv.bts_base_port + 101, - self.argv.trx_bind_addr, self.argv.bts_base_port + 1) - - # Init TRX CTRL interface for BB - self.bb_ctrl = CTRLInterfaceBB( - self.argv.bb_addr, self.argv.bb_base_port + 101, - self.argv.trx_bind_addr, self.argv.bb_base_port + 1) + # Init shared clock generator + self.clck_gen = CLCKGen([]) # Power measurement emulation # Noise: -120 .. -105 # BTS: -75 .. -50 - self.pm = FakePM(-120, -105, -75, -50) - - # Share a FakePM instance between both BTS and BB - self.bts_ctrl.pm = self.pm - self.bb_ctrl.pm = self.pm - - # Init DATA links - self.bts_data = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 102, - self.argv.trx_bind_addr, self.argv.bts_base_port + 2) - self.bb_data = UDPLink( - self.argv.bb_addr, self.argv.bb_base_port + 102, - self.argv.trx_bind_addr, self.argv.bb_base_port + 2) - - # BTS <-> BB burst forwarding - self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data) - - # Share a BurstForwarder instance between BTS and BB - self.bts_ctrl.burst_fwd = self.burst_fwd - self.bb_ctrl.burst_fwd = self.burst_fwd - - # Provide clock to BTS - self.bts_clck = UDPLink( - self.argv.bts_addr, self.argv.bts_base_port + 100, - self.argv.trx_bind_addr, self.argv.bts_base_port) - self.clck_gen = CLCKGen([self.bts_clck]) - self.bts_ctrl.clck_gen = self.clck_gen + self.fake_pm = FakePM(-120, -105, -75, -50) + + # Init TRX instance for BTS + self.bts_trx = FakeTRX(self.argv.trx_bind_addr, + self.argv.bts_addr, self.argv.bts_base_port, + clck_gen = self.clck_gen) + + # Init TRX instance for BB + self.bb_trx = FakeTRX(self.argv.trx_bind_addr, + self.argv.bb_addr, self.argv.bb_base_port, + pwr_meas = self.fake_pm) + + # Burst forwarding between transceivers + self.burst_fwd = BurstForwarder() + self.burst_fwd.add_trx(self.bts_trx) + self.burst_fwd.add_trx(self.bb_trx) log.info("Init complete") # Enter main loop while True: - socks = [self.bts_ctrl.sock, self.bb_ctrl.sock, - self.bts_data.sock, self.bb_data.sock] + socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock, + self.bts_trx.data_if.sock, self.bb_trx.data_if.sock] # Wait until we get any data on any socket r_event, w_event, x_event = select.select(socks, [], []) # Downlink: BTS -> BB - if self.bts_data.sock in r_event: - self.burst_fwd.bts2bb() + if self.bts_trx.data_if.sock in r_event: + msg = self.bts_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bts_trx, msg) # Uplink: BB -> BTS - if self.bb_data.sock in r_event: - self.burst_fwd.bb2bts() + if self.bb_trx.data_if.sock in r_event: + msg = self.bb_trx.recv_data_msg() + if msg is not None: + self.burst_fwd.forward_msg(self.bb_trx, msg) # CTRL commands from BTS - if self.bts_ctrl.sock in r_event: - self.bts_ctrl.handle_rx() + if self.bts_trx.ctrl_if.sock in r_event: + self.bts_trx.ctrl_if.handle_rx() # CTRL commands from BB - if self.bb_ctrl.sock in r_event: - self.bb_ctrl.handle_rx() + if self.bb_trx.ctrl_if.sock in r_event: + self.bb_trx.ctrl_if.handle_rx() def shutdown(self): log.info("Shutting down...") diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py new file mode 100644 index 00000000..edd4d31f --- /dev/null +++ b/src/target/trx_toolkit/transceiver.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Transceiver implementation +# +# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import logging as log + +from ctrl_if_trx import CTRLInterfaceTRX +from data_if import DATAInterface +from udp_link import UDPLink + +class Transceiver: + """ Base transceiver implementation. + + Represents a single transceiver, that can be used as for the BTS side, + as for the MS side. Each individual instance of Transceiver unifies + three basic interfaces built on three independent UDP connections: + + - CLCK (base port + 100/0) - clock indications from TRX to L1, + - CTRL (base port + 101/1) - control interface for L1, + - DATA (base port + 102/2) - bidirectional data interface for bursts. + + A transceiver can be either in active (i.e. working), or in idle mode. + The active mode should ensure that both RX/TX frequencies are set. + + NOTE: CLCK is not required for some L1 implementations, so it is optional. + + == Timeslot configuration + + Transceiver has a list of active (i.e. configured) TDMA timeslots. + The L1 should configure a timeslot before sending or expecting any + data on it. This is done by SETSLOT control command, which also + indicates an associated channel combination (see GSM TS 05.02). + + NOTE: we don't store the associated channel combinations, + as they are only useful for burst detection and demodulation. + + == Clock distribution (optional) + + The clock indications are not expected by L1 when transceiver + is not running, so we monitor both POWERON / POWEROFF events + from the control interface, and keep the list of CLCK links + in a given CLCKGen instance updated. The clock generator is + started and stopped automatically. + + NOTE: a single instance of CLCKGen can be shared between multiple + transceivers, as well as multiple transceivers may use + individual CLCKGen instances. + + == Power Measurement (optional) + + Transceiver may have an optional power measurement interface, + that shall provide at least one method: measure(freq). This + is required for the MS side (i.e. OsmocomBB). + + """ + + def __init__(self, bind_addr, remote_addr, base_port, + clck_gen = None, pwr_meas = None): + # Connection info + self.remote_addr = remote_addr + self.bind_addr = bind_addr + self.base_port = base_port + + # Init DATA interface + self.data_if = DATAInterface( + remote_addr, base_port + 102, + bind_addr, base_port + 2) + + # Init CTRL interface + self.ctrl_if = CTRLInterfaceTRX(self, + remote_addr, base_port + 101, + bind_addr, base_port + 1) + + # Init optional CLCK interface + self.clck_gen = clck_gen + if clck_gen is not None: + self.clck_if = UDPLink( + remote_addr, base_port + 100, + bind_addr, base_port) + + # Optional Power Measurement interface + self.pwr_meas = pwr_meas + + # Internal state + self.running = False + + # Actual RX / TX frequencies + self.rx_freq = None + self.tx_freq = None + + # List of active (configured) timeslots + self.ts_list = [] + + # To be overwritten if required, + # no custom command handlers by default + def ctrl_cmd_handler(self, request): + return None + + def power_event_handler(self, event): + # Trigger clock generator if required + if self.clck_gen is not None: + clck_links = self.clck_gen.clck_links + if not self.running and (self.clck_if in clck_links): + # Transceiver was stopped + clck_links.remove(self.clck_if) + elif self.running and (self.clck_if not in clck_links): + # Transceiver was started + clck_links.append(self.clck_if) + + if not self.clck_gen.timer and len(clck_links) > 0: + log.info("Starting clock generator") + self.clck_gen.start() + elif self.clck_gen.timer and not clck_links: + log.info("Stopping clock generator") + self.clck_gen.stop() + + def recv_data_msg(self): + # Read and parse data from socket + msg = self.data_if.recv_l12trx_msg() + if not msg: + return None + + # Make sure that transceiver is configured and running + if not self.running: + log.warning("RX DATA message (%s), but transceiver " + "is not running => dropping..." % msg.desc_hdr()) + return None + + # Make sure that indicated timeslot is configured + if msg.tn not in self.ts_list: + log.warning("RX DATA message (%s), but timeslot " + "is not configured => dropping..." % msg.desc_hdr()) + return None + + return msg |