summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/fake_trx.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/fake_trx.py')
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py135
1 files changed, 90 insertions, 45 deletions
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 8beee6e9..0daecb40 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -1,10 +1,10 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TRX Toolkit
# Virtual Um-interface (fake transceiver)
#
-# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -17,12 +17,8 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+APP_CR_HOLDERS = [("2017-2019", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
@@ -106,16 +102,32 @@ class FakeTRX(Transceiver):
"""
+ NOMINAL_TX_POWER_DEFAULT = 50 # dBm
+ TX_ATT_DEFAULT = 0 # dB
+ PATH_LOSS_DEFAULT = 110 # dB
+
TOA256_BASE_DEFAULT = 0
- RSSI_BASE_DEFAULT = -60
CI_BASE_DEFAULT = 90
+ # Default values for NOPE / IDLE indications
+ TOA256_NOISE_DEFAULT = 0
+ RSSI_NOISE_DEFAULT = -110
+ CI_NOISE_DEFAULT = -30
+
def __init__(self, *trx_args, **trx_kwargs):
Transceiver.__init__(self, *trx_args, **trx_kwargs)
+ # fake RSSI is disabled by default, only enabled through TRXC FAKE_RSSI.
+ # When disabled, RSSI is calculated based on Tx power and Rx path loss
+ self.fake_rssi_enabled = False
+
+ self.rf_muted = False
+
# Actual ToA, RSSI, C/I, TA values
+ self.tx_power_base = self.NOMINAL_TX_POWER_DEFAULT
+ self.tx_att_base = self.TX_ATT_DEFAULT
self.toa256_base = self.TOA256_BASE_DEFAULT
- self.rssi_base = self.RSSI_BASE_DEFAULT
+ self.rssi_base = self.NOMINAL_TX_POWER_DEFAULT - self.TX_ATT_DEFAULT - self.PATH_LOSS_DEFAULT
self.ci_base = self.CI_BASE_DEFAULT
self.ta = 0
@@ -131,7 +143,7 @@ class FakeTRX(Transceiver):
@property
def toa256(self):
# Check if randomization is required
- if self.toa256_rand_threshold is 0:
+ if self.toa256_rand_threshold == 0:
return self.toa256_base
# Generate a random ToA value in required range
@@ -142,7 +154,7 @@ class FakeTRX(Transceiver):
@property
def rssi(self):
# Check if randomization is required
- if self.rssi_rand_threshold is 0:
+ if self.rssi_rand_threshold == 0:
return self.rssi_base
# Generate a random RSSI value in required range
@@ -151,9 +163,13 @@ class FakeTRX(Transceiver):
return random.randint(rssi_min, rssi_max)
@property
+ def tx_power(self):
+ return self.tx_power_base - self.tx_att_base
+
+ @property
def ci(self):
# Check if randomization is required
- if self.ci_rand_threshold is 0:
+ if self.ci_rand_threshold == 0:
return self.ci_base
# Generate a random C/I value in required range
@@ -165,7 +181,7 @@ class FakeTRX(Transceiver):
# Returns: True - drop, False - keep
def sim_burst_drop(self, msg):
# Check if dropping is required
- if self.burst_drop_amount is 0:
+ if self.burst_drop_amount == 0:
return False
if msg.fn % self.burst_drop_period == 0:
@@ -177,9 +193,6 @@ class FakeTRX(Transceiver):
return False
def _handle_data_msg_v1(self, src_msg, msg):
- # TODO: NOPE indications are not (yet) supported
- msg.nope_ind = False
-
# C/I (Carrier-to-Interference ratio)
msg.ci = self.ci
@@ -196,31 +209,51 @@ class FakeTRX(Transceiver):
msg.tsc_set = 0
msg.tsc = 0
- # Takes (partially initialized) TRX2L1 message,
+ # Takes (partially initialized) TRXD Rx message,
# simulates RF path parameters (such as RSSI),
# and sends towards the L1
- def send_data_msg(self, src_trx, src_msg, msg):
- # Override header version
- msg.ver = self.data_if._hdr_ver
+ def handle_data_msg(self, src_trx, src_msg, msg):
+ if self.rf_muted:
+ msg.nope_ind = True
+ elif not msg.nope_ind:
+ # Path loss simulation
+ msg.nope_ind = self.sim_burst_drop(msg)
+ if msg.nope_ind:
+ # Before TRXDv1, we simply drop the message
+ if msg.ver < 0x01:
+ del msg
+ return
+
+ # Since TRXDv1, we should send a NOPE.ind
+ del msg.burst # burst bits are omited
+ msg.burst = None
+
+ # TODO: shoud we make these values configurable?
+ msg.toa256 = self.TOA256_NOISE_DEFAULT
+ msg.rssi = self.RSSI_NOISE_DEFAULT
+ msg.ci = self.CI_NOISE_DEFAULT
+
+ self.data_if.send_msg(msg)
+ return
# Complete message header
msg.toa256 = self.toa256
- msg.rssi = self.rssi
+
+ # Apply RSSI based on transmitter:
+ if not self.fake_rssi_enabled:
+ msg.rssi = src_trx.tx_power - src_msg.pwr - self.PATH_LOSS_DEFAULT
+ else: # Apply fake RSSI
+ msg.rssi = self.rssi
# Version specific fields
if msg.ver >= 0x01:
self._handle_data_msg_v1(src_msg, msg)
# Apply optional Timing Advance
- if src_trx.ta is not 0:
+ if src_trx.ta != 0:
msg.toa256 -= src_trx.ta * 256
- # Path loss simulation
- if self.sim_burst_drop(msg):
- return
-
- # TODO: make legacy mode configurable (via argv?)
- self.data_if.send_msg(msg, legacy = True)
+ Transceiver.handle_data_msg(self, msg)
# Simulation specific CTRL command handler
def ctrl_cmd_handler(self, request):
@@ -257,9 +290,15 @@ class FakeTRX(Transceiver):
elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
log.debug("(%s) Recv FAKE_RSSI cmd" % self)
+ # Use negative threshold to disable fake_rssi if previously enabled:
+ if int(request[2]) < 0:
+ self.fake_rssi_enabled = False
+ return 0
+
# Parse and apply both base and threshold
self.rssi_base = int(request[1])
self.rssi_rand_threshold = int(request[2])
+ self.fake_rssi_enabled = True
return 0
# RSSI simulation
@@ -331,6 +370,15 @@ class FakeTRX(Transceiver):
self.burst_drop_period = period
return 0
+ # Artificial delay for the TRXC interface
+ # Syntax: CMD FAKE_TRXC_DELAY <DELAY_MS>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TRXC_DELAY", 1):
+ log.debug("(%s) Recv FAKE_TRXC_DELAY cmd", self)
+
+ self.ctrl_if.rsp_delay_ms = int(request[1])
+ log.info("(%s) Artificial TRXC delay set to %d",
+ self, self.ctrl_if.rsp_delay_ms)
+
# Unhandled command
return None
@@ -358,34 +406,31 @@ class Application(ApplicationBase):
self.fake_pm.trx_list = self.trx_list
# Init TRX instance for BTS
- self.append_trx(self.argv.bts_addr,
- self.argv.bts_base_port, name = "BTS")
+ self.append_trx(self.argv.bts_addr, self.argv.bts_base_port, name = "BTS")
# Init TRX instance for BB
- self.append_trx(self.argv.bb_addr,
- self.argv.bb_base_port, name = "MS")
+ self.append_trx(self.argv.bb_addr, self.argv.bb_base_port, name = "MS", child_mgt = False)
# Additional transceivers (optional)
if self.argv.trx_list is not None:
for trx_def in self.argv.trx_list:
(name, addr, port, idx) = trx_def
- self.append_child_trx(addr, port, idx, name)
+ self.append_child_trx(addr, port, name = name, child_idx = idx)
# Burst forwarding between transceivers
- self.burst_fwd = BurstForwarder(self.trx_list)
+ self.burst_fwd = BurstForwarder(self.trx_list.trx_list)
log.info("Init complete")
- def append_trx(self, remote_addr, base_port, name = None):
+ def append_trx(self, remote_addr, base_port, **kwargs):
trx = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
- clck_gen = self.clck_gen, pwr_meas = self.fake_pm,
- name = name)
+ clck_gen = self.clck_gen, pwr_meas = self.fake_pm, **kwargs)
self.trx_list.add_trx(trx)
- def append_child_trx(self, remote_addr, base_port, child_idx, name = None):
- # Index 0 corresponds to the first transceiver
- if child_idx is 0:
- self.append_trx(remote_addr, base_port, name)
+ def append_child_trx(self, remote_addr, base_port, **kwargs):
+ child_idx = kwargs.get("child_idx", 0)
+ if child_idx == 0: # Index 0 indicates parent transceiver
+ self.append_trx(remote_addr, base_port, **kwargs)
return
# Find 'parent' transceiver for a new child
@@ -396,7 +441,7 @@ class Application(ApplicationBase):
# Allocate a new child
trx_child = FakeTRX(self.argv.trx_bind_addr, remote_addr, base_port,
- child_idx = child_idx, pwr_meas = self.fake_pm, name = name)
+ pwr_meas = self.fake_pm, **kwargs)
self.trx_list.add_trx(trx_child)
# Link a new 'child' with its 'parent'
@@ -405,7 +450,7 @@ class Application(ApplicationBase):
def run(self):
# Compose list of to be monitored sockets
sock_list = []
- for trx in self.trx_list:
+ for trx in self.trx_list.trx_list:
sock_list.append(trx.ctrl_if.sock)
sock_list.append(trx.data_if.sock)
@@ -415,7 +460,7 @@ class Application(ApplicationBase):
r_event, _, _ = select.select(sock_list, [], [])
# Iterate over all transceivers
- for trx in self.trx_list:
+ for trx in self.trx_list.trx_list:
# DATA interface
if trx.data_if.sock in r_event:
msg = trx.recv_data_msg()
@@ -440,7 +485,7 @@ class Application(ApplicationBase):
@staticmethod
def trx_def(val):
try:
- result = re.match("(.+@)?(.+):([0-9]+)(\/[0-9]+)?", val)
+ result = re.match(r"(.+@)?(.+):([0-9]+)(/[0-9]+)?", val)
(name, addr, port, idx) = result.groups()
except:
raise argparse.ArgumentTypeError("Invalid TRX definition: %s" % val)