summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit
diff options
context:
space:
mode:
authorVadim Yanitskiy <axilirator@gmail.com>2018-12-10 17:39:51 +0700
committerVadim Yanitskiy <axilirator@gmail.com>2018-12-18 05:47:11 +0700
commit152a2da8d21d7a10d217e3adb51f57ca4e848d82 (patch)
tree89c24746ccaaf1e82f80d59f1aeccea006e028ff /src/target/trx_toolkit
parent1197382e8db5b52fa001dc16ec76389ccf1b6f53 (diff)
trx_toolkit/fake_trx.py: refactor global class hierarchy
This change is a big step towards handling of multiple transceivers in a single process, i.e. multiple MS and multiple BTS connections. The old class hierarchy wasn't flexible enough, because initially fake_trx was designed as a bridge between OsmocomBB and OsmoBTS, but not as the burst router. There were two separate, but 90% similar implementations of the CTRL interface, two variations of each simulation parameter - one for UL, another for DL. The following new classes are introduced: - Transceiver - represents a single transceiver, that can be used as for the BTS side, as for the MS side. Each instance has its own CTRL, DATA, and (optionally) CLCK interfaces, among with basic state variables, such as both RX / TX freq., power state (running or idle) and list of active timeslots. - CTRLInterfaceTRX - unified control interface handler for common transceiver management commands, such as POWERON, RXTUNE, and SETSLOT. Deprecates both CTRLInterface{BB|BTS}. - FakeTRX - basically, a child of Transceiver, extended with RF path (burst loss, RSSI, TA, ToA) simulation. Implements a custom CTRL command handler for CTRLInterfaceTRX. The following classes were refactored: - BurstForwarder - still performs burst forwarding, but now it doesn't store any simulation parameters, and doesn't know who is BTS, and who is MS. Actually, BurstForwarder transforms each L12TRX message into a TRX2L1 message, and dispatches it between running transceivers with matching RX frequency and matching timeslot. - FakePM - still generates random RSSI values, but doesn't distinguish between MS and BTS anymore. As soon as a measurement request is received, it attempts to find at least one running TRX on a given frequency. Please note that fake_trx.py still does handle only a single pair of MS and BTS. No regressions have been observed. Both new and refactored classes were documented. Change-Id: Ice44e2b22566b3652ef6d43896055963b13ab185 Related: OS#3667
Diffstat (limited to 'src/target/trx_toolkit')
-rw-r--r--src/target/trx_toolkit/burst_fwd.py363
-rw-r--r--src/target/trx_toolkit/ctrl_if_bb.py219
-rw-r--r--src/target/trx_toolkit/ctrl_if_bts.py189
-rw-r--r--src/target/trx_toolkit/ctrl_if_trx.py155
-rw-r--r--src/target/trx_toolkit/fake_pm.py66
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py306
-rw-r--r--src/target/trx_toolkit/transceiver.py155
7 files changed, 663 insertions, 790 deletions
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 3cb6acd0..38ce18f3 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# BTS <-> BB burst forwarding
+# Burst forwarding between transceivers
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
@@ -23,321 +23,62 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
-import random
-
-from data_msg import *
class BurstForwarder:
- """ Performs burst forwarding and preprocessing between MS and BTS.
-
- == Pass-filtering parameters
-
- BurstForwarder may drop or pass an UL/DL burst depending
- on the following parameters:
-
- - bts_freq / bb_freq - the current BTS / MS frequency
- that was set using RXTUNE control command. By default,
- both freq. values are set to None, so nothing is being
- forwarded (i.e. bursts are getting dropped).
-
- FIXME: currently, we don't care about TXTUNE command
- and transmit frequencies. It would be great to distinguish
- between RX and TX frequencies for both BTS and MS.
-
- - ts_pass_list - the list of active (i.e. configured)
- timeslot numbers for the MS. A timeslot can be activated
- or deactivated using SETSLOT control command from the MS.
-
- FIXME: there is no such list for the BTS side.
-
- == Preprocessing and measurement simulation
-
- Since this is a virtual environment, we can simulate different
- parameters of a virtual RF interface:
+ """ Performs burst forwarding between transceivers.
- - ToA (Timing of Arrival) - measured difference between expected
- and actual time of burst arrival in units of 1/256 of GSM symbol
- periods. A pair of both base and threshold values defines a range
- of ToA value randomization:
+ BurstForwarder distributes bursts between the list of given
+ FakeTRX (Transceiver) instances depending on the following
+ parameters of each transceiver:
- DL: from (toa256_dl_base - toa256_dl_threshold)
- to (toa256_dl_base + toa256_dl_threshold),
- UL: from (toa256_ul_base - toa256_ul_threshold)
- to (toa256_ul_base + toa256_ul_threshold).
+ - execution state (running or idle),
+ - actual RX / TX frequencies,
+ - list of active timeslots.
- - RSSI (Received Signal Strength Indication) - measured "power" of
- the signal (per burst) in dBm. A pair of both base and threshold
- values defines a range of RSSI value randomization:
-
- DL: from (rssi_dl_base - rssi_dl_threshold)
- to (rssi_dl_base + rssi_dl_threshold),
- UL: from (rssi_ul_base - rssi_ul_threshold)
- to (rssi_ul_base + rssi_ul_threshold).
-
- Please note that the randomization of both RSSI and ToA
- is optional, and can be enabled from the control interface.
-
- === Timing Advance handling
-
- The BTS is using ToA measurements for UL bursts in order to calculate
- Timing Advance value, that is then indicated to a MS, which in its turn
- shall apply this value to the transmitted signal in order to compensate
- the delay. Basically, every burst is transmitted in advance defined by
- the indicated Timing Advance value. The valid range is 0..63, where
- each unit means one GSM symbol advance. The actual Timing Advance value
- is set using SETTA control command from MS. By default, it's set to 0.
-
- === Path loss simulation - burst dropping
-
- In some cases, e.g. due to a weak signal or high interference, a burst
- can be lost, i.e. not detected by the receiver. This can also be
- simulated using FAKE_DROP command on both control interfaces:
-
- - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
- to be dropped (i.e. not forwarded towards the MS/BTS),
-
- - burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g.
- 1 - drop every consequent burst, 2 - drop every second burst, etc.
+ Each to be distributed L12TRX message is being transformed
+ into a TRX2L1 message, and then forwarded to transceivers
+ with partially initialized header. All uninitialized header
+ fields (such as rssi and toa256) shall be set by each
+ transceiver individually before sending towards the L1.
"""
- def __init__(self, bts_link, bb_link):
- self.bts_link = bts_link
- self.bb_link = bb_link
-
- # Init default parameters
- self.reset_dl()
- self.reset_ul()
-
- # Initialize (or reset to) default parameters for Downlink
- def reset_dl(self):
- # Unset current DL freq.
- self.bts_freq = None
-
- # Indicated RSSI / ToA values
- self.toa256_dl_base = 0
- self.rssi_dl_base = -60
-
- # RSSI / ToA randomization threshold
- self.toa256_dl_threshold = 0
- self.rssi_dl_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_dl_drop_amount = 0
- self.burst_dl_drop_period = 1
-
- # Initialize (or reset to) default parameters for Uplink
- def reset_ul(self):
- # Unset current DL freq.
- self.bb_freq = None
-
- # Indicated RSSI / ToA values
- self.rssi_ul_base = -70
- self.toa256_ul_base = 0
-
- # RSSI / ToA randomization threshold
- self.toa256_ul_threshold = 0
- self.rssi_ul_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_ul_drop_amount = 0
- self.burst_ul_drop_period = 1
-
- # Init timeslot filter (drop everything by default)
- self.ts_pass_list = []
-
- # Reset Timing Advance value
- self.ta = 0
-
- # Converts TA value from symbols to
- # units of 1/256 of GSM symbol periods
- def calc_ta256(self):
- return self.ta * 256
-
- # Calculates a random ToA value for Downlink bursts
- def calc_dl_toa256(self):
- # Check if randomization is required
- if self.toa256_dl_threshold is 0:
- return self.toa256_dl_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
- toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random ToA value for Uplink bursts
- def calc_ul_toa256(self):
- # Check if randomization is required
- if self.toa256_ul_threshold is 0:
- return self.toa256_ul_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
- toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random RSSI value for Downlink bursts
- def calc_dl_rssi(self):
- # Check if randomization is required
- if self.rssi_dl_threshold is 0:
- return self.rssi_dl_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
- rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # Calculates a random RSSI value for Uplink bursts
- def calc_ul_rssi(self):
- # Check if randomization is required
- if self.rssi_ul_threshold is 0:
- return self.rssi_ul_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
- rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # DL path loss simulation
- def path_loss_sim_dl(self, msg):
- # Burst dropping
- if self.burst_dl_drop_amount > 0:
- if msg.fn % self.burst_dl_drop_period == 0:
- log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_dl_drop_period))
- self.burst_dl_drop_amount -= 1
- return None
-
- return msg
-
- # UL path loss simulation
- def path_loss_sim_ul(self, msg):
- # Burst dropping
- if self.burst_ul_drop_amount > 0:
- if msg.fn % self.burst_ul_drop_period == 0:
- log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_ul_drop_period))
- self.burst_ul_drop_amount -= 1
- return None
-
- return msg
-
- # DL burst preprocessing
- def preprocess_dl_burst(self, msg):
- # Calculate both RSSI and ToA values
- msg.toa256 = self.calc_dl_toa256()
- msg.rssi = self.calc_dl_rssi()
-
- # UL burst preprocessing
- def preprocess_ul_burst(self, msg):
- # Calculate both RSSI and ToA values,
- # also apply Timing Advance
- msg.toa256 = self.calc_ul_toa256()
- msg.toa256 -= self.calc_ta256()
- msg.rssi = self.calc_ul_rssi()
-
- # Converts a L12TRX message to TRX2L1 message
- def transform_msg(self, msg_raw):
- # Attempt to parse a message
- try:
- msg_l12trx = DATAMSG_L12TRX()
- msg_l12trx.parse_msg(bytearray(msg_raw))
- except:
- log.error("Dropping unhandled DL message...")
- return None
-
- # Compose a new message for L1
- return msg_l12trx.gen_trx2l1()
-
- # Downlink handler: BTS -> BB
- def bts2bb(self):
- # Read data from socket
- data, addr = self.bts_link.sock.recvfrom(512)
-
- # BB is not connected / tuned
- if self.bb_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_dl(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_dl_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BB
- self.bb_link.send(payload)
-
- # Uplink handler: BB -> BTS
- def bb2bts(self):
- # Read data from socket
- data, addr = self.bb_link.sock.recvfrom(512)
-
- # BTS is not connected / tuned
- if self.bts_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- log.warning("TS %u is not configured, dropping UL burst..." % msg.tn)
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_ul(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_ul_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BTS
- self.bts_link.send(payload)
+ def __init__(self, trx_list = []):
+ # List of Transceiver instances
+ self.trx_list = trx_list
+
+ def add_trx(self, trx):
+ if trx in self.trx_list:
+ log.error("TRX is already in the list")
+ return
+
+ self.trx_list.append(trx)
+
+ def del_trx(self, trx):
+ if trx not in self.trx_list:
+ log.error("TRX is not in the list")
+ return
+
+ self.trx_list.remove(trx)
+
+ def forward_msg(self, src_trx, rx_msg):
+ # Transform from L12TRX to TRX2L1
+ tx_msg = rx_msg.gen_trx2l1()
+ if tx_msg is None:
+ log.error("Forwarding failed, could not transform "
+ "message (%s) => dropping..." % rx_msg.desc_hdr())
+
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if trx == src_trx:
+ continue
+
+ # Check transceiver state
+ if not trx.running:
+ continue
+ if trx.rx_freq != src_trx.tx_freq:
+ continue
+ if tx_msg.tn not in trx.ts_list:
+ continue
+
+ trx.send_data_msg(src_trx, tx_msg)
diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py
deleted file mode 100644
index d25aa306..00000000
--- a/src/target/trx_toolkit/ctrl_if_bb.py
+++ /dev/null
@@ -1,219 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmocomBB specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBB(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BB (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- self.burst_fwd.bb_freq = self.rx_freq
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- return 0
-
- # Power measurement
- elif self.verify_cmd(request, "MEASURE", 1):
- log.debug("Recv MEASURE cmd")
-
- if self.pm is None:
- return -1
-
- # TODO: check freq range
- meas_freq = int(request[1]) * 1000
- meas_dbm = str(self.pm.measure(meas_freq))
-
- return (0, [meas_dbm])
-
- elif self.verify_cmd(request, "SETSLOT", 2):
- log.debug("Recv SETSLOT cmd")
-
- if self.burst_fwd is None:
- return -1
-
- # Obtain TS index
- ts = int(request[1])
- if ts not in range(0, 8):
- log.error("TS index should be in range: 0..7")
- return -1
-
- # Parse TS type
- ts_type = int(request[2])
-
- # TS activation / deactivation
- # We don't care about ts_type
- if ts_type == 0:
- # Deactivate TS (remove from TS pass-filter list)
- if ts in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.remove(ts)
- else:
- # Activate TS (add to TS pass-filter list)
- if ts not in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.append(ts)
-
- return 0
-
- # Timing Advance
- elif self.verify_cmd(request, "SETTA", 1):
- log.debug("Recv SETTA cmd")
-
- # Save to the BurstForwarder instance
- self.burst_fwd.ta = int(request[1])
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_ul_base = int(request[1])
- self.burst_fwd.toa256_ul_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_ul_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Uplink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_ul_base = int(request[1])
- self.burst_fwd.rssi_ul_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Uplink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_ul_base += int(request[1])
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = 1
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py
deleted file mode 100644
index cb38b67c..00000000
--- a/src/target/trx_toolkit/ctrl_if_bts.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmoBTS specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBTS(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- clck_gen = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BTS (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
-
- # Power emulation
- if self.pm is not None:
- self.pm.add_bts_list([self.tx_freq])
-
- # Start clock indications
- if self.clck_gen is not None:
- self.clck_gen.start()
-
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
-
- # Power emulation
- if self.pm is not None:
- self.pm.del_bts_list([self.tx_freq])
-
- # Stop clock indications
- if self.clck_gen is not None:
- self.clck_gen.stop()
-
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- self.burst_fwd.bts_freq = self.tx_freq
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_dl_base = int(request[1])
- self.burst_fwd.toa256_dl_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_dl_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Downlink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_dl_base = int(request[1])
- self.burst_fwd.rssi_dl_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Downlink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_dl_base += int(request[1])
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = 1
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py
new file mode 100644
index 00000000..83d55db1
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if_trx.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation (common commands)
+#
+# (C) 2016-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceTRX(CTRLInterface):
+ """ CTRL interface handler for common transceiver management commands.
+
+ The following set of commands is mandatory for every transceiver:
+
+ - POWERON / POWEROFF - state management (running / idle),
+ - RXTUNE / TXTUNE - RX / TX frequency management,
+ - SETSLOT - timeslot management.
+
+ Additionally, there is an optional MEASURE command, which is used
+ by OsmocomBB to perform power measurement on a given frequency.
+
+ A given transceiver may also define its own command handler,
+ that is prioritized, i.e. it can overwrite any commands mentioned
+ above. If None is returned, a command is considered as unhandled.
+
+ """
+
+ def __init__(self, trx, *udp_link_args):
+ CTRLInterface.__init__(self, *udp_link_args)
+ log.info("Init CTRL interface (%s)" % self.desc_link())
+
+ # Link with Transceiver instance we belong to
+ self.trx = trx
+
+ def parse_cmd(self, request):
+ # Custom command handlers (prioritized)
+ res = self.trx.ctrl_cmd_handler(request)
+ if res is not None:
+ return res
+
+ # Power control
+ if self.verify_cmd(request, "POWERON", 0):
+ log.debug("Recv POWERON CMD")
+
+ # Ensure transceiver isn't working
+ if self.trx.running:
+ log.error("Transceiver already started")
+ return -1
+
+ # Ensure RX / TX freq. are set
+ if (self.trx.rx_freq is None) or (self.trx.tx_freq is None):
+ log.error("RX / TX freq. are not set")
+ return -1
+
+ log.info("Starting transceiver...")
+ self.trx.running = True
+
+ # Notify transceiver about that
+ self.trx.power_event_handler("POWERON")
+
+ return 0
+
+ elif self.verify_cmd(request, "POWEROFF", 0):
+ log.debug("Recv POWEROFF cmd")
+
+ log.info("Stopping transceiver...")
+ self.trx.running = False
+
+ # Notify transceiver about that
+ self.trx.power_event_handler("POWEROFF")
+
+ return 0
+
+ # Tuning Control
+ elif self.verify_cmd(request, "RXTUNE", 1):
+ log.debug("Recv RXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.rx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "TXTUNE", 1):
+ log.debug("Recv TXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.tx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "SETSLOT", 2):
+ log.debug("Recv SETSLOT cmd")
+
+ # Obtain TS index
+ ts = int(request[1])
+ if ts not in range(0, 8):
+ log.error("TS index should be in range: 0..7")
+ return -1
+
+ # Parse TS type
+ ts_type = int(request[2])
+
+ # TS activation / deactivation
+ # We don't care about ts_type
+ if ts_type == 0:
+ # Deactivate TS (remove from the list of active timeslots)
+ if ts in self.trx.ts_list:
+ self.trx.ts_list.remove(ts)
+ else:
+ # Activate TS (add to the list of active timeslots)
+ if ts not in self.trx.ts_list:
+ self.trx.ts_list.append(ts)
+
+ return 0
+
+ # Power measurement
+ if self.verify_cmd(request, "MEASURE", 1):
+ log.debug("Recv MEASURE cmd")
+
+ # Power Measurement interface is optional
+ # for Transceiver, thus may be uninitialized
+ if self.trx.pwr_meas is None:
+ log.error("Power Measurement interface "
+ "is not initialized => rejecting command")
+ return -1
+
+ # TODO: check freq range
+ meas_freq = int(request[1]) * 1000
+ meas_dbm = self.trx.pwr_meas.measure(meas_freq)
+
+ return (0, [str(meas_dbm)])
+
+ # Wrong / unknown command
+ else:
+ # We don't care about other commands,
+ # so let's merely ignore them ;)
+ log.debug("Ignore CMD %s" % request[0])
+ return 0
diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py
index 840b4e40..1992f8d3 100644
--- a/src/target/trx_toolkit/fake_pm.py
+++ b/src/target/trx_toolkit/fake_pm.py
@@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# Power measurement emulation for BB
+# Power measurement emulation
#
-# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -25,29 +25,53 @@
from random import randint
class FakePM:
- # Freq. list for good power level
- bts_list = []
+ """ Power measurement emulation for fake transceivers.
- def __init__(self, noise_min, noise_max, bts_min, bts_max):
- # Save power level ranges
+ There is no such thing like RF signal level in fake Um-interface,
+ so we need to emulate this. The main idea is to have a list of
+ all running and idle transceivers. As soon as a measurement
+ request is received, FakePM will attempt to find a running
+ transceiver on a given frequency.
+
+ The result of such "measurement" is a random RSSI value
+ in one of the following ranges:
+
+ - trx_min ... trx_max - if at least one TRX was found,
+ - noise_min ... noise_max - no TRX instances were found.
+
+ FIXME: it would be great to average the rate of bursts
+ and indicated power / attenuation values for all
+ matching transceivers, so "pure traffic" ARFCNs
+ would be handled properly.
+
+ """
+
+ def __init__(self, noise_min, noise_max, trx_min, trx_max):
+ # Init list of transceivers
+ self.trx_list = []
+
+ # RSSI randomization ranges
self.noise_min = noise_min
self.noise_max = noise_max
- self.bts_min = bts_min
- self.bts_max = bts_max
+ self.trx_min = trx_min
+ self.trx_max = trx_max
+
+ @property
+ def rssi_noise(self):
+ return randint(self.noise_min, self.noise_max)
- def measure(self, bts):
- if bts in self.bts_list:
- return randint(self.bts_min, self.bts_max)
- else:
- return randint(self.noise_min, self.noise_max)
+ @property
+ def rssi_trx(self):
+ return randint(self.trx_min, self.trx_max)
- def update_bts_list(self, new_list):
- self.bts_list = new_list
+ def measure(self, freq):
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if not trx.running:
+ continue
- def add_bts_list(self, add_list):
- self.bts_list += add_list
+ # Match by given frequency
+ if trx.tx_freq == freq:
+ return self.rssi_trx
- def del_bts_list(self, del_list):
- for item in del_list:
- if item in self.bts_list:
- self.bts_list.remove(item)
+ return self.rssi_noise
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 95261dfe..46b413aa 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
import argparse
+import random
import select
import sys
from app_common import ApplicationBase
-from ctrl_if_bts import CTRLInterfaceBTS
-from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
+from transceiver import Transceiver
+from clck_gen import CLCKGen
from fake_pm import FakePM
-from udp_link import UDPLink
-from clck_gen import CLCKGen
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the physical RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ Please note that randomization of both RSSI and ToA is optional,
+ and can be enabled from the control interface.
+
+ == Timing Advance handling
+
+ The BTS is using ToA measurements for UL bursts in order to calculate
+ Timing Advance value, that is then indicated to a MS, which in its turn
+ shall apply this value to the transmitted signal in order to compensate
+ the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using SETTA control command from MS. By default, it's set to 0.
+
+ == Path loss simulation
+
+ === Burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ == Configuration
+
+ All simulation parameters mentioned above can be changed at runtime
+ using the commands with prefix 'FAKE_' on the control interface.
+ All of them are handled by our custom CTRL command handler.
+
+ """
+
+ TOA256_BASE_DEFAULT = 0
+ RSSI_BASE_DEFAULT = -60
+
+ def __init__(self, *trx_args, **trx_kwargs):
+ Transceiver.__init__(self, *trx_args, **trx_kwargs)
+
+ # Actual ToA / RSSI / TA values
+ self.toa256_base = self.TOA256_BASE_DEFAULT
+ self.rssi_base = self.RSSI_BASE_DEFAULT
+ self.ta = 0
+
+ # ToA / RSSI randomization threshold
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold is 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold is 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount is 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ # Takes (partially initialized) TRX2L1 message,
+ # simulates RF path parameters (such as RSSI),
+ # and sends towards the L1
+ def send_data_msg(self, src_trx, msg):
+ # Complete message header
+ msg.toa256 = self.toa256
+ msg.rssi = self.rssi
+
+ # Apply optional Timing Advance
+ if src_trx.ta is not 0:
+ msg.toa256 -= src_trx.ta * 256
+
+ # Path loss simulation
+ if self.sim_burst_drop(msg):
+ return
+
+ # TODO: make legacy mode configurable (via argv?)
+ self.data_if.send_msg(msg, legacy = True)
+
+ # Simulation specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing Advance
+ # Syntax: CMD SETTA <TA>
+ if self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("Recv SETTA cmd")
+
+ # Store indicated value
+ self.ta = int(request[1])
+ return 0
+
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("FAKE_DROP period shall be greater than zero")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Unhandled command
+ return None
class Application(ApplicationBase):
def __init__(self):
@@ -51,72 +271,58 @@ class Application(ApplicationBase):
self.app_init_logging(self.argv)
def run(self):
- # Init TRX CTRL interface for BTS
- self.bts_ctrl = CTRLInterfaceBTS(
- self.argv.bts_addr, self.argv.bts_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
-
- # Init TRX CTRL interface for BB
- self.bb_ctrl = CTRLInterfaceBB(
- self.argv.bb_addr, self.argv.bb_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
+ # Init shared clock generator
+ self.clck_gen = CLCKGen([])
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
- self.pm = FakePM(-120, -105, -75, -50)
-
- # Share a FakePM instance between both BTS and BB
- self.bts_ctrl.pm = self.pm
- self.bb_ctrl.pm = self.pm
-
- # Init DATA links
- self.bts_data = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
- self.bb_data = UDPLink(
- self.argv.bb_addr, self.argv.bb_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
-
- # BTS <-> BB burst forwarding
- self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
-
- # Share a BurstForwarder instance between BTS and BB
- self.bts_ctrl.burst_fwd = self.burst_fwd
- self.bb_ctrl.burst_fwd = self.burst_fwd
-
- # Provide clock to BTS
- self.bts_clck = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 100,
- self.argv.trx_bind_addr, self.argv.bts_base_port)
- self.clck_gen = CLCKGen([self.bts_clck])
- self.bts_ctrl.clck_gen = self.clck_gen
+ self.fake_pm = FakePM(-120, -105, -75, -50)
+
+ # Init TRX instance for BTS
+ self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bts_addr, self.argv.bts_base_port,
+ clck_gen = self.clck_gen)
+
+ # Init TRX instance for BB
+ self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bb_addr, self.argv.bb_base_port,
+ pwr_meas = self.fake_pm)
+
+ # Burst forwarding between transceivers
+ self.burst_fwd = BurstForwarder()
+ self.burst_fwd.add_trx(self.bts_trx)
+ self.burst_fwd.add_trx(self.bb_trx)
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
- self.bts_data.sock, self.bb_data.sock]
+ socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
+ self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
- if self.bts_data.sock in r_event:
- self.burst_fwd.bts2bb()
+ if self.bts_trx.data_if.sock in r_event:
+ msg = self.bts_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
- if self.bb_data.sock in r_event:
- self.burst_fwd.bb2bts()
+ if self.bb_trx.data_if.sock in r_event:
+ msg = self.bb_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
- if self.bts_ctrl.sock in r_event:
- self.bts_ctrl.handle_rx()
+ if self.bts_trx.ctrl_if.sock in r_event:
+ self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
- if self.bb_ctrl.sock in r_event:
- self.bb_ctrl.handle_rx()
+ if self.bb_trx.ctrl_if.sock in r_event:
+ self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
new file mode 100644
index 00000000..edd4d31f
--- /dev/null
+++ b/src/target/trx_toolkit/transceiver.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Transceiver implementation
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if_trx import CTRLInterfaceTRX
+from data_if import DATAInterface
+from udp_link import UDPLink
+
+class Transceiver:
+ """ Base transceiver implementation.
+
+ Represents a single transceiver, that can be used as for the BTS side,
+ as for the MS side. Each individual instance of Transceiver unifies
+ three basic interfaces built on three independent UDP connections:
+
+ - CLCK (base port + 100/0) - clock indications from TRX to L1,
+ - CTRL (base port + 101/1) - control interface for L1,
+ - DATA (base port + 102/2) - bidirectional data interface for bursts.
+
+ A transceiver can be either in active (i.e. working), or in idle mode.
+ The active mode should ensure that both RX/TX frequencies are set.
+
+ NOTE: CLCK is not required for some L1 implementations, so it is optional.
+
+ == Timeslot configuration
+
+ Transceiver has a list of active (i.e. configured) TDMA timeslots.
+ The L1 should configure a timeslot before sending or expecting any
+ data on it. This is done by SETSLOT control command, which also
+ indicates an associated channel combination (see GSM TS 05.02).
+
+ NOTE: we don't store the associated channel combinations,
+ as they are only useful for burst detection and demodulation.
+
+ == Clock distribution (optional)
+
+ The clock indications are not expected by L1 when transceiver
+ is not running, so we monitor both POWERON / POWEROFF events
+ from the control interface, and keep the list of CLCK links
+ in a given CLCKGen instance updated. The clock generator is
+ started and stopped automatically.
+
+ NOTE: a single instance of CLCKGen can be shared between multiple
+ transceivers, as well as multiple transceivers may use
+ individual CLCKGen instances.
+
+ == Power Measurement (optional)
+
+ Transceiver may have an optional power measurement interface,
+ that shall provide at least one method: measure(freq). This
+ is required for the MS side (i.e. OsmocomBB).
+
+ """
+
+ def __init__(self, bind_addr, remote_addr, base_port,
+ clck_gen = None, pwr_meas = None):
+ # Connection info
+ self.remote_addr = remote_addr
+ self.bind_addr = bind_addr
+ self.base_port = base_port
+
+ # Init DATA interface
+ self.data_if = DATAInterface(
+ remote_addr, base_port + 102,
+ bind_addr, base_port + 2)
+
+ # Init CTRL interface
+ self.ctrl_if = CTRLInterfaceTRX(self,
+ remote_addr, base_port + 101,
+ bind_addr, base_port + 1)
+
+ # Init optional CLCK interface
+ self.clck_gen = clck_gen
+ if clck_gen is not None:
+ self.clck_if = UDPLink(
+ remote_addr, base_port + 100,
+ bind_addr, base_port)
+
+ # Optional Power Measurement interface
+ self.pwr_meas = pwr_meas
+
+ # Internal state
+ self.running = False
+
+ # Actual RX / TX frequencies
+ self.rx_freq = None
+ self.tx_freq = None
+
+ # List of active (configured) timeslots
+ self.ts_list = []
+
+ # To be overwritten if required,
+ # no custom command handlers by default
+ def ctrl_cmd_handler(self, request):
+ return None
+
+ def power_event_handler(self, event):
+ # Trigger clock generator if required
+ if self.clck_gen is not None:
+ clck_links = self.clck_gen.clck_links
+ if not self.running and (self.clck_if in clck_links):
+ # Transceiver was stopped
+ clck_links.remove(self.clck_if)
+ elif self.running and (self.clck_if not in clck_links):
+ # Transceiver was started
+ clck_links.append(self.clck_if)
+
+ if not self.clck_gen.timer and len(clck_links) > 0:
+ log.info("Starting clock generator")
+ self.clck_gen.start()
+ elif self.clck_gen.timer and not clck_links:
+ log.info("Stopping clock generator")
+ self.clck_gen.stop()
+
+ def recv_data_msg(self):
+ # Read and parse data from socket
+ msg = self.data_if.recv_l12trx_msg()
+ if not msg:
+ return None
+
+ # Make sure that transceiver is configured and running
+ if not self.running:
+ log.warning("RX DATA message (%s), but transceiver "
+ "is not running => dropping..." % msg.desc_hdr())
+ return None
+
+ # Make sure that indicated timeslot is configured
+ if msg.tn not in self.ts_list:
+ log.warning("RX DATA message (%s), but timeslot "
+ "is not configured => dropping..." % msg.desc_hdr())
+ return None
+
+ return msg