diff options
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-x | src/target/trx_toolkit/fake_trx.py | 135 |
1 files changed, 90 insertions, 45 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 8beee6e9..0daecb40 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -1,10 +1,10 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # TRX Toolkit # Virtual Um-interface (fake transceiver) # -# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com> # # All Rights Reserved # @@ -17,12 +17,8 @@ # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] +APP_CR_HOLDERS = [("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>")] import logging as log import signal @@ -106,16 +102,32 @@ class FakeTRX(Transceiver): """ + NOMINAL_TX_POWER_DEFAULT = 50 # dBm + TX_ATT_DEFAULT = 0 # dB + PATH_LOSS_DEFAULT = 110 # dB + TOA256_BASE_DEFAULT = 0 - RSSI_BASE_DEFAULT = -60 CI_BASE_DEFAULT = 90 + # Default values for NOPE / IDLE indications + TOA256_NOISE_DEFAULT = 0 + RSSI_NOISE_DEFAULT = -110 + CI_NOISE_DEFAULT = -30 + def __init__(self, *trx_args, **trx_kwargs): Transceiver.__init__(self, *trx_args, **trx_kwargs) + # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI. + # When disabled, RSSI is calculated based on Tx power and Rx path loss + self.fake_rssi_enabled = False + + self.rf_muted = False + # Actual ToA, RSSI, C/I, TA values + self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT + self.tx_att_base = self.TX_ATT_DEFAULT self.toa256_base = self.TOA256_BASE_DEFAULT - self.rssi_base = self.RSSI_BASE_DEFAULT + self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT self.ci_base = self.CI_BASE_DEFAULT self.ta = 0 @@ -131,7 +143,7 @@ class FakeTRX(Transceiver): @property def toa256(self): # Check if randomization is required - if self.toa256_rand_threshold is 0: + if self.toa256_rand_threshold == 0: return self.toa256_base # Generate a random ToA value in required range @@ -142,7 +154,7 @@ class FakeTRX(Transceiver): @property def rssi(self): # Check if randomization is required - if self.rssi_rand_threshold is 0: + if self.rssi_rand_threshold == 0: return self.rssi_base # Generate a random RSSI value in required range @@ -151,9 +163,13 @@ class FakeTRX(Transceiver): return random.randint(rssi_min, rssi_max) @property + def tx_power(self): + return self.tx_power_base - self.tx_att_base + + @property def ci(self): # Check if randomization is required - if self.ci_rand_threshold is 0: + if self.ci_rand_threshold == 0: return self.ci_base # Generate a random C/I value in required range @@ -165,7 +181,7 @@ class FakeTRX(Transceiver): # Returns: True - drop, False - keep def sim_burst_drop(self, msg): # Check if dropping is required - if self.burst_drop_amount is 0: + if self.burst_drop_amount == 0: return False if msg.fn % self.burst_drop_period == 0: @@ -177,9 +193,6 @@ class FakeTRX(Transceiver): return False def _handle_data_msg_v1(self, src_msg, msg): - # TODO: NOPE indications are not (yet) supported - msg.nope_ind = False - # C/I (Carrier-to-Interference ratio) msg.ci = self.ci @@ -196,31 +209,51 @@ class FakeTRX(Transceiver): msg.tsc_set = 0 msg.tsc = 0 - # Takes (partially initialized) TRX2L1 message, + # Takes (partially initialized) TRXD Rx message, # simulates RF path parameters (such as RSSI), # and sends towards the L1 - def send_data_msg(self, src_trx, src_msg, msg): - # Override header version - msg.ver = self.data_if._hdr_ver + def handle_data_msg(self, src_trx, src_msg, msg): + if self.rf_muted: + msg.nope_ind = True + elif not msg.nope_ind: + # Path loss simulation + msg.nope_ind = self.sim_burst_drop(msg) + if msg.nope_ind: + # Before TRXDv1, we simply drop the message + if msg.ver < 0x01: + del msg + return + + # Since TRXDv1, we should send a NOPE.ind + del msg.burst # burst bits are omited + msg.burst = None + + # TODO: shoud we make these values configurable? + msg.toa256 = self.TOA256_NOISE_DEFAULT + msg.rssi = self.RSSI_NOISE_DEFAULT + msg.ci = self.CI_NOISE_DEFAULT + + self.data_if.send_msg(msg) + return # Complete message header msg.toa256 = self.toa256 - msg.rssi = self.rssi + + # Apply RSSI based on transmitter: + if not self.fake_rssi_enabled: + msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT + else: # Apply fake RSSI + msg.rssi = self.rssi # Version specific fields if msg.ver >= 0x01: self._handle_data_msg_v1(src_msg, msg) # Apply optional Timing Advance - if src_trx.ta is not 0: + if src_trx.ta != 0: msg.toa256 -= src_trx.ta * 256 - # Path loss simulation - if self.sim_burst_drop(msg): - return - - # TODO: make legacy mode configurable (via argv?) - self.data_if.send_msg(msg, legacy = True) + Transceiver.handle_data_msg(self, msg) # Simulation specific CTRL command handler def ctrl_cmd_handler(self, request): @@ -257,9 +290,15 @@ class FakeTRX(Transceiver): elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2): log.debug("(%s) Recv FAKE_RSSI cmd" % self) + # Use negative threshold to disable fake_rssi if previously enabled: + if int(request[2]) < 0: + self.fake_rssi_enabled = False + return 0 + # Parse and apply both base and threshold self.rssi_base = int(request[1]) self.rssi_rand_threshold = int(request[2]) + self.fake_rssi_enabled = True return 0 # RSSI simulation @@ -331,6 +370,15 @@ class FakeTRX(Transceiver): self.burst_drop_period = period return 0 + # Artificial delay for the TRXC interface + # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS> + elif self.ctrl_if.verify_cmd(request, "FAKE_TRXC_DELAY", 1): + log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self) + + self.ctrl_if.rsp_delay_ms = int(request[1]) + log.info("(%s) Artificial TRXC delay set to %d", + self, self.ctrl_if.rsp_delay_ms) + # Unhandled command return None @@ -358,34 +406,31 @@ class Application(ApplicationBase): self.fake_pm.trx_list = self.trx_list # Init TRX instance for BTS - self.append_trx(self.argv.bts_addr, - self.argv.bts_base_port, name = "BTS") + self.append_trx(self.argv.bts_addr, self.argv.bts_base_port, name = "BTS") # Init TRX instance for BB - self.append_trx(self.argv.bb_addr, - self.argv.bb_base_port, name = "MS") + self.append_trx(self.argv.bb_addr, self.argv.bb_base_port, name = "MS", child_mgt = False) # Additional transceivers (optional) if self.argv.trx_list is not None: for trx_def in self.argv.trx_list: (name, addr, port, idx) = trx_def - self.append_child_trx(addr, port, idx, name) + self.append_child_trx(addr, port, name = name, child_idx = idx) # Burst forwarding between transceivers - self.burst_fwd = BurstForwarder(self.trx_list) + self.burst_fwd = BurstForwarder(self.trx_list.trx_list) log.info("Init complete") - def append_trx(self, remote_addr, base_port, name = None): + def append_trx(self, remote_addr, base_port, **kwargs): trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port, - clck_gen = self.clck_gen, pwr_meas = self.fake_pm, - name = name) + clck_gen = self.clck_gen, pwr_meas = self.fake_pm, **kwargs) self.trx_list.add_trx(trx) - def append_child_trx(self, remote_addr, base_port, child_idx, name = None): - # Index 0 corresponds to the first transceiver - if child_idx is 0: - self.append_trx(remote_addr, base_port, name) + def append_child_trx(self, remote_addr, base_port, **kwargs): + child_idx = kwargs.get("child_idx", 0) + if child_idx == 0: # Index 0 indicates parent transceiver + self.append_trx(remote_addr, base_port, **kwargs) return # Find 'parent' transceiver for a new child @@ -396,7 +441,7 @@ class Application(ApplicationBase): # Allocate a new child trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port, - child_idx = child_idx, pwr_meas = self.fake_pm, name = name) + pwr_meas = self.fake_pm, **kwargs) self.trx_list.add_trx(trx_child) # Link a new 'child' with its 'parent' @@ -405,7 +450,7 @@ class Application(ApplicationBase): def run(self): # Compose list of to be monitored sockets sock_list = [] - for trx in self.trx_list: + for trx in self.trx_list.trx_list: sock_list.append(trx.ctrl_if.sock) sock_list.append(trx.data_if.sock) @@ -415,7 +460,7 @@ class Application(ApplicationBase): r_event, _, _ = select.select(sock_list, [], []) # Iterate over all transceivers - for trx in self.trx_list: + for trx in self.trx_list.trx_list: # DATA interface if trx.data_if.sock in r_event: msg = trx.recv_data_msg() @@ -440,7 +485,7 @@ class Application(ApplicationBase): @staticmethod def trx_def(val): try: - result = re.match("(.+@)?(.+):([0-9]+)(\/[0-9]+)?", val) + result = re.match(r"(.+@)?(.+):([0-9]+)(/[0-9]+)?", val) (name, addr, port, idx) = result.groups() except: raise argparse.ArgumentTypeError("Invalid TRX definition: %s" % val) |