summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/target/trx_toolkit/burst_fwd.py363
-rw-r--r--src/target/trx_toolkit/ctrl_if_bb.py219
-rw-r--r--src/target/trx_toolkit/ctrl_if_bts.py189
-rw-r--r--src/target/trx_toolkit/ctrl_if_trx.py155
-rw-r--r--src/target/trx_toolkit/fake_pm.py66
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py306
-rw-r--r--src/target/trx_toolkit/transceiver.py155
7 files changed, 663 insertions, 790 deletions
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
index 3cb6acd0..38ce18f3 100644
--- a/src/target/trx_toolkit/burst_fwd.py
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# BTS <-> BB burst forwarding
+# Burst forwarding between transceivers
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
@@ -23,321 +23,62 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
-import random
-
-from data_msg import *
class BurstForwarder:
- """ Performs burst forwarding and preprocessing between MS and BTS.
-
- == Pass-filtering parameters
-
- BurstForwarder may drop or pass an UL/DL burst depending
- on the following parameters:
-
- - bts_freq / bb_freq - the current BTS / MS frequency
- that was set using RXTUNE control command. By default,
- both freq. values are set to None, so nothing is being
- forwarded (i.e. bursts are getting dropped).
-
- FIXME: currently, we don't care about TXTUNE command
- and transmit frequencies. It would be great to distinguish
- between RX and TX frequencies for both BTS and MS.
-
- - ts_pass_list - the list of active (i.e. configured)
- timeslot numbers for the MS. A timeslot can be activated
- or deactivated using SETSLOT control command from the MS.
-
- FIXME: there is no such list for the BTS side.
-
- == Preprocessing and measurement simulation
-
- Since this is a virtual environment, we can simulate different
- parameters of a virtual RF interface:
+ """ Performs burst forwarding between transceivers.
- - ToA (Timing of Arrival) - measured difference between expected
- and actual time of burst arrival in units of 1/256 of GSM symbol
- periods. A pair of both base and threshold values defines a range
- of ToA value randomization:
+ BurstForwarder distributes bursts between the list of given
+ FakeTRX (Transceiver) instances depending on the following
+ parameters of each transceiver:
- DL: from (toa256_dl_base - toa256_dl_threshold)
- to (toa256_dl_base + toa256_dl_threshold),
- UL: from (toa256_ul_base - toa256_ul_threshold)
- to (toa256_ul_base + toa256_ul_threshold).
+ - execution state (running or idle),
+ - actual RX / TX frequencies,
+ - list of active timeslots.
- - RSSI (Received Signal Strength Indication) - measured "power" of
- the signal (per burst) in dBm. A pair of both base and threshold
- values defines a range of RSSI value randomization:
-
- DL: from (rssi_dl_base - rssi_dl_threshold)
- to (rssi_dl_base + rssi_dl_threshold),
- UL: from (rssi_ul_base - rssi_ul_threshold)
- to (rssi_ul_base + rssi_ul_threshold).
-
- Please note that the randomization of both RSSI and ToA
- is optional, and can be enabled from the control interface.
-
- === Timing Advance handling
-
- The BTS is using ToA measurements for UL bursts in order to calculate
- Timing Advance value, that is then indicated to a MS, which in its turn
- shall apply this value to the transmitted signal in order to compensate
- the delay. Basically, every burst is transmitted in advance defined by
- the indicated Timing Advance value. The valid range is 0..63, where
- each unit means one GSM symbol advance. The actual Timing Advance value
- is set using SETTA control command from MS. By default, it's set to 0.
-
- === Path loss simulation - burst dropping
-
- In some cases, e.g. due to a weak signal or high interference, a burst
- can be lost, i.e. not detected by the receiver. This can also be
- simulated using FAKE_DROP command on both control interfaces:
-
- - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
- to be dropped (i.e. not forwarded towards the MS/BTS),
-
- - burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g.
- 1 - drop every consequent burst, 2 - drop every second burst, etc.
+ Each to be distributed L12TRX message is being transformed
+ into a TRX2L1 message, and then forwarded to transceivers
+ with partially initialized header. All uninitialized header
+ fields (such as rssi and toa256) shall be set by each
+ transceiver individually before sending towards the L1.
"""
- def __init__(self, bts_link, bb_link):
- self.bts_link = bts_link
- self.bb_link = bb_link
-
- # Init default parameters
- self.reset_dl()
- self.reset_ul()
-
- # Initialize (or reset to) default parameters for Downlink
- def reset_dl(self):
- # Unset current DL freq.
- self.bts_freq = None
-
- # Indicated RSSI / ToA values
- self.toa256_dl_base = 0
- self.rssi_dl_base = -60
-
- # RSSI / ToA randomization threshold
- self.toa256_dl_threshold = 0
- self.rssi_dl_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_dl_drop_amount = 0
- self.burst_dl_drop_period = 1
-
- # Initialize (or reset to) default parameters for Uplink
- def reset_ul(self):
- # Unset current DL freq.
- self.bb_freq = None
-
- # Indicated RSSI / ToA values
- self.rssi_ul_base = -70
- self.toa256_ul_base = 0
-
- # RSSI / ToA randomization threshold
- self.toa256_ul_threshold = 0
- self.rssi_ul_threshold = 0
-
- # Path loss simulation (burst dropping)
- self.burst_ul_drop_amount = 0
- self.burst_ul_drop_period = 1
-
- # Init timeslot filter (drop everything by default)
- self.ts_pass_list = []
-
- # Reset Timing Advance value
- self.ta = 0
-
- # Converts TA value from symbols to
- # units of 1/256 of GSM symbol periods
- def calc_ta256(self):
- return self.ta * 256
-
- # Calculates a random ToA value for Downlink bursts
- def calc_dl_toa256(self):
- # Check if randomization is required
- if self.toa256_dl_threshold is 0:
- return self.toa256_dl_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
- toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random ToA value for Uplink bursts
- def calc_ul_toa256(self):
- # Check if randomization is required
- if self.toa256_ul_threshold is 0:
- return self.toa256_ul_base
-
- # Calculate a range for randomization
- toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
- toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
-
- # Generate a random ToA value
- toa256 = random.randint(toa256_min, toa256_max)
-
- return toa256
-
- # Calculates a random RSSI value for Downlink bursts
- def calc_dl_rssi(self):
- # Check if randomization is required
- if self.rssi_dl_threshold is 0:
- return self.rssi_dl_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
- rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # Calculates a random RSSI value for Uplink bursts
- def calc_ul_rssi(self):
- # Check if randomization is required
- if self.rssi_ul_threshold is 0:
- return self.rssi_ul_base
-
- # Calculate a range for randomization
- rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
- rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
-
- # Generate a random RSSI value
- return random.randint(rssi_min, rssi_max)
-
- # DL path loss simulation
- def path_loss_sim_dl(self, msg):
- # Burst dropping
- if self.burst_dl_drop_amount > 0:
- if msg.fn % self.burst_dl_drop_period == 0:
- log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_dl_drop_period))
- self.burst_dl_drop_amount -= 1
- return None
-
- return msg
-
- # UL path loss simulation
- def path_loss_sim_ul(self, msg):
- # Burst dropping
- if self.burst_ul_drop_amount > 0:
- if msg.fn % self.burst_ul_drop_period == 0:
- log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)"
- % (msg.fn, self.burst_ul_drop_period))
- self.burst_ul_drop_amount -= 1
- return None
-
- return msg
-
- # DL burst preprocessing
- def preprocess_dl_burst(self, msg):
- # Calculate both RSSI and ToA values
- msg.toa256 = self.calc_dl_toa256()
- msg.rssi = self.calc_dl_rssi()
-
- # UL burst preprocessing
- def preprocess_ul_burst(self, msg):
- # Calculate both RSSI and ToA values,
- # also apply Timing Advance
- msg.toa256 = self.calc_ul_toa256()
- msg.toa256 -= self.calc_ta256()
- msg.rssi = self.calc_ul_rssi()
-
- # Converts a L12TRX message to TRX2L1 message
- def transform_msg(self, msg_raw):
- # Attempt to parse a message
- try:
- msg_l12trx = DATAMSG_L12TRX()
- msg_l12trx.parse_msg(bytearray(msg_raw))
- except:
- log.error("Dropping unhandled DL message...")
- return None
-
- # Compose a new message for L1
- return msg_l12trx.gen_trx2l1()
-
- # Downlink handler: BTS -> BB
- def bts2bb(self):
- # Read data from socket
- data, addr = self.bts_link.sock.recvfrom(512)
-
- # BB is not connected / tuned
- if self.bb_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_dl(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_dl_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BB
- self.bb_link.send(payload)
-
- # Uplink handler: BB -> BTS
- def bb2bts(self):
- # Read data from socket
- data, addr = self.bb_link.sock.recvfrom(512)
-
- # BTS is not connected / tuned
- if self.bts_freq is None:
- return None
-
- # Freq. filter
- if self.bb_freq != self.bts_freq:
- return None
-
- # Process a message
- msg = self.transform_msg(data)
- if msg is None:
- return None
-
- # Timeslot filter
- if msg.tn not in self.ts_pass_list:
- log.warning("TS %u is not configured, dropping UL burst..." % msg.tn)
- return None
-
- # Path loss simulation
- msg = self.path_loss_sim_ul(msg)
- if msg is None:
- return None
-
- # Burst preprocessing
- self.preprocess_ul_burst(msg)
-
- # Validate and generate the payload
- payload = msg.gen_msg()
-
- # Append two unused bytes at the end
- # in order to keep the compatibility
- payload += bytearray(2)
-
- # Send burst to BTS
- self.bts_link.send(payload)
+ def __init__(self, trx_list = []):
+ # List of Transceiver instances
+ self.trx_list = trx_list
+
+ def add_trx(self, trx):
+ if trx in self.trx_list:
+ log.error("TRX is already in the list")
+ return
+
+ self.trx_list.append(trx)
+
+ def del_trx(self, trx):
+ if trx not in self.trx_list:
+ log.error("TRX is not in the list")
+ return
+
+ self.trx_list.remove(trx)
+
+ def forward_msg(self, src_trx, rx_msg):
+ # Transform from L12TRX to TRX2L1
+ tx_msg = rx_msg.gen_trx2l1()
+ if tx_msg is None:
+ log.error("Forwarding failed, could not transform "
+ "message (%s) => dropping..." % rx_msg.desc_hdr())
+
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if trx == src_trx:
+ continue
+
+ # Check transceiver state
+ if not trx.running:
+ continue
+ if trx.rx_freq != src_trx.tx_freq:
+ continue
+ if tx_msg.tn not in trx.ts_list:
+ continue
+
+ trx.send_data_msg(src_trx, tx_msg)
diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py
deleted file mode 100644
index d25aa306..00000000
--- a/src/target/trx_toolkit/ctrl_if_bb.py
+++ /dev/null
@@ -1,219 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmocomBB specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBB(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BB (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- self.burst_fwd.bb_freq = self.rx_freq
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- return 0
-
- # Power measurement
- elif self.verify_cmd(request, "MEASURE", 1):
- log.debug("Recv MEASURE cmd")
-
- if self.pm is None:
- return -1
-
- # TODO: check freq range
- meas_freq = int(request[1]) * 1000
- meas_dbm = str(self.pm.measure(meas_freq))
-
- return (0, [meas_dbm])
-
- elif self.verify_cmd(request, "SETSLOT", 2):
- log.debug("Recv SETSLOT cmd")
-
- if self.burst_fwd is None:
- return -1
-
- # Obtain TS index
- ts = int(request[1])
- if ts not in range(0, 8):
- log.error("TS index should be in range: 0..7")
- return -1
-
- # Parse TS type
- ts_type = int(request[2])
-
- # TS activation / deactivation
- # We don't care about ts_type
- if ts_type == 0:
- # Deactivate TS (remove from TS pass-filter list)
- if ts in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.remove(ts)
- else:
- # Activate TS (add to TS pass-filter list)
- if ts not in self.burst_fwd.ts_pass_list:
- self.burst_fwd.ts_pass_list.append(ts)
-
- return 0
-
- # Timing Advance
- elif self.verify_cmd(request, "SETTA", 1):
- log.debug("Recv SETTA cmd")
-
- # Save to the BurstForwarder instance
- self.burst_fwd.ta = int(request[1])
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_ul_base = int(request[1])
- self.burst_fwd.toa256_ul_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Uplink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_ul_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Uplink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_ul_base = int(request[1])
- self.burst_fwd.rssi_ul_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Uplink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_ul_base += int(request[1])
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = 1
-
- return 0
-
- # Path loss simulation for UL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_ul_drop_amount = num
- self.burst_fwd.burst_ul_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py
deleted file mode 100644
index cb38b67c..00000000
--- a/src/target/trx_toolkit/ctrl_if_bts.py
+++ /dev/null
@@ -1,189 +0,0 @@
-#!/usr/bin/env python2
-# -*- coding: utf-8 -*-
-
-# TRX Toolkit
-# CTRL interface implementation (OsmoBTS specific)
-#
-# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
-#
-# All Rights Reserved
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-import logging as log
-
-from ctrl_if import CTRLInterface
-
-class CTRLInterfaceBTS(CTRLInterface):
- # Internal state variables
- trx_started = False
- burst_fwd = None
- clck_gen = None
- rx_freq = None
- tx_freq = None
- pm = None
-
- def __init__(self, *udp_link_args):
- CTRLInterface.__init__(self, *udp_link_args)
- log.info("Init CTRL interface for BTS (%s)" % self.desc_link())
-
- def parse_cmd(self, request):
- # Power control
- if self.verify_cmd(request, "POWERON", 0):
- log.debug("Recv POWERON CMD")
-
- # Ensure transceiver isn't working
- if self.trx_started:
- log.error("Transceiver already started")
- return -1
-
- # Ensure RX / TX freq. are set
- if (self.rx_freq is None) or (self.tx_freq is None):
- log.error("RX / TX freq. are not set")
- return -1
-
- log.info("Starting transceiver...")
- self.trx_started = True
-
- # Power emulation
- if self.pm is not None:
- self.pm.add_bts_list([self.tx_freq])
-
- # Start clock indications
- if self.clck_gen is not None:
- self.clck_gen.start()
-
- return 0
-
- elif self.verify_cmd(request, "POWEROFF", 0):
- log.debug("Recv POWEROFF cmd")
-
- log.info("Stopping transceiver...")
- self.trx_started = False
-
- # Power emulation
- if self.pm is not None:
- self.pm.del_bts_list([self.tx_freq])
-
- # Stop clock indications
- if self.clck_gen is not None:
- self.clck_gen.stop()
-
- return 0
-
- # Tuning Control
- elif self.verify_cmd(request, "RXTUNE", 1):
- log.debug("Recv RXTUNE cmd")
-
- # TODO: check freq range
- self.rx_freq = int(request[1]) * 1000
- return 0
-
- elif self.verify_cmd(request, "TXTUNE", 1):
- log.debug("Recv TXTUNE cmd")
-
- # TODO: check freq range
- self.tx_freq = int(request[1]) * 1000
- self.burst_fwd.bts_freq = self.tx_freq
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_TOA", 2):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.toa256_dl_base = int(request[1])
- self.burst_fwd.toa256_dl_threshold = int(request[2])
-
- return 0
-
- # Timing of Arrival simulation for Downlink
- # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_TOA", 1):
- log.debug("Recv FAKE_TOA cmd")
-
- # Parse and apply delta
- self.burst_fwd.toa256_dl_base += int(request[1])
-
- return 0
-
- # RSSI simulation for Downlink
- # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
- elif self.verify_cmd(request, "FAKE_RSSI", 2):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply both base and threshold
- self.burst_fwd.rssi_dl_base = int(request[1])
- self.burst_fwd.rssi_dl_threshold = int(request[2])
-
- return 0
-
- # RSSI simulation for Downlink
- # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
- elif self.verify_cmd(request, "FAKE_RSSI", 1):
- log.debug("Recv FAKE_RSSI cmd")
-
- # Parse and apply delta
- self.burst_fwd.rssi_dl_base += int(request[1])
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT>
- # Dropping pattern: fn % 1 == 0
- elif self.verify_cmd(request, "FAKE_DROP", 1):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = 1
-
- return 0
-
- # Path loss simulation for DL: burst dropping
- # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
- # Dropping pattern: fn % period == 0
- elif self.verify_cmd(request, "FAKE_DROP", 2):
- log.debug("Recv FAKE_DROP cmd")
-
- # Parse / validate amount of bursts
- num = int(request[1])
- if num < 0:
- log.error("FAKE_DROP amount shall not be negative")
- return -1
-
- # Parse / validate period
- period = int(request[2])
- if period <= 0:
- log.error("FAKE_DROP period shall be greater than zero")
- return -1
-
- self.burst_fwd.burst_dl_drop_amount = num
- self.burst_fwd.burst_dl_drop_period = period
-
- return 0
-
- # Wrong / unknown command
- else:
- # We don't care about other commands,
- # so let's merely ignore them ;)
- log.debug("Ignore CMD %s" % request[0])
- return 0
diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py
new file mode 100644
index 00000000..83d55db1
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if_trx.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation (common commands)
+#
+# (C) 2016-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceTRX(CTRLInterface):
+ """ CTRL interface handler for common transceiver management commands.
+
+ The following set of commands is mandatory for every transceiver:
+
+ - POWERON / POWEROFF - state management (running / idle),
+ - RXTUNE / TXTUNE - RX / TX frequency management,
+ - SETSLOT - timeslot management.
+
+ Additionally, there is an optional MEASURE command, which is used
+ by OsmocomBB to perform power measurement on a given frequency.
+
+ A given transceiver may also define its own command handler,
+ that is prioritized, i.e. it can overwrite any commands mentioned
+ above. If None is returned, a command is considered as unhandled.
+
+ """
+
+ def __init__(self, trx, *udp_link_args):
+ CTRLInterface.__init__(self, *udp_link_args)
+ log.info("Init CTRL interface (%s)" % self.desc_link())
+
+ # Link with Transceiver instance we belong to
+ self.trx = trx
+
+ def parse_cmd(self, request):
+ # Custom command handlers (prioritized)
+ res = self.trx.ctrl_cmd_handler(request)
+ if res is not None:
+ return res
+
+ # Power control
+ if self.verify_cmd(request, "POWERON", 0):
+ log.debug("Recv POWERON CMD")
+
+ # Ensure transceiver isn't working
+ if self.trx.running:
+ log.error("Transceiver already started")
+ return -1
+
+ # Ensure RX / TX freq. are set
+ if (self.trx.rx_freq is None) or (self.trx.tx_freq is None):
+ log.error("RX / TX freq. are not set")
+ return -1
+
+ log.info("Starting transceiver...")
+ self.trx.running = True
+
+ # Notify transceiver about that
+ self.trx.power_event_handler("POWERON")
+
+ return 0
+
+ elif self.verify_cmd(request, "POWEROFF", 0):
+ log.debug("Recv POWEROFF cmd")
+
+ log.info("Stopping transceiver...")
+ self.trx.running = False
+
+ # Notify transceiver about that
+ self.trx.power_event_handler("POWEROFF")
+
+ return 0
+
+ # Tuning Control
+ elif self.verify_cmd(request, "RXTUNE", 1):
+ log.debug("Recv RXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.rx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "TXTUNE", 1):
+ log.debug("Recv TXTUNE cmd")
+
+ # TODO: check freq range
+ self.trx.tx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "SETSLOT", 2):
+ log.debug("Recv SETSLOT cmd")
+
+ # Obtain TS index
+ ts = int(request[1])
+ if ts not in range(0, 8):
+ log.error("TS index should be in range: 0..7")
+ return -1
+
+ # Parse TS type
+ ts_type = int(request[2])
+
+ # TS activation / deactivation
+ # We don't care about ts_type
+ if ts_type == 0:
+ # Deactivate TS (remove from the list of active timeslots)
+ if ts in self.trx.ts_list:
+ self.trx.ts_list.remove(ts)
+ else:
+ # Activate TS (add to the list of active timeslots)
+ if ts not in self.trx.ts_list:
+ self.trx.ts_list.append(ts)
+
+ return 0
+
+ # Power measurement
+ if self.verify_cmd(request, "MEASURE", 1):
+ log.debug("Recv MEASURE cmd")
+
+ # Power Measurement interface is optional
+ # for Transceiver, thus may be uninitialized
+ if self.trx.pwr_meas is None:
+ log.error("Power Measurement interface "
+ "is not initialized => rejecting command")
+ return -1
+
+ # TODO: check freq range
+ meas_freq = int(request[1]) * 1000
+ meas_dbm = self.trx.pwr_meas.measure(meas_freq)
+
+ return (0, [str(meas_dbm)])
+
+ # Wrong / unknown command
+ else:
+ # We don't care about other commands,
+ # so let's merely ignore them ;)
+ log.debug("Ignore CMD %s" % request[0])
+ return 0
diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py
index 840b4e40..1992f8d3 100644
--- a/src/target/trx_toolkit/fake_pm.py
+++ b/src/target/trx_toolkit/fake_pm.py
@@ -2,9 +2,9 @@
# -*- coding: utf-8 -*-
# TRX Toolkit
-# Power measurement emulation for BB
+# Power measurement emulation
#
-# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -25,29 +25,53 @@
from random import randint
class FakePM:
- # Freq. list for good power level
- bts_list = []
+ """ Power measurement emulation for fake transceivers.
- def __init__(self, noise_min, noise_max, bts_min, bts_max):
- # Save power level ranges
+ There is no such thing like RF signal level in fake Um-interface,
+ so we need to emulate this. The main idea is to have a list of
+ all running and idle transceivers. As soon as a measurement
+ request is received, FakePM will attempt to find a running
+ transceiver on a given frequency.
+
+ The result of such "measurement" is a random RSSI value
+ in one of the following ranges:
+
+ - trx_min ... trx_max - if at least one TRX was found,
+ - noise_min ... noise_max - no TRX instances were found.
+
+ FIXME: it would be great to average the rate of bursts
+ and indicated power / attenuation values for all
+ matching transceivers, so "pure traffic" ARFCNs
+ would be handled properly.
+
+ """
+
+ def __init__(self, noise_min, noise_max, trx_min, trx_max):
+ # Init list of transceivers
+ self.trx_list = []
+
+ # RSSI randomization ranges
self.noise_min = noise_min
self.noise_max = noise_max
- self.bts_min = bts_min
- self.bts_max = bts_max
+ self.trx_min = trx_min
+ self.trx_max = trx_max
+
+ @property
+ def rssi_noise(self):
+ return randint(self.noise_min, self.noise_max)
- def measure(self, bts):
- if bts in self.bts_list:
- return randint(self.bts_min, self.bts_max)
- else:
- return randint(self.noise_min, self.noise_max)
+ @property
+ def rssi_trx(self):
+ return randint(self.trx_min, self.trx_max)
- def update_bts_list(self, new_list):
- self.bts_list = new_list
+ def measure(self, freq):
+ # Iterate over all known transceivers
+ for trx in self.trx_list:
+ if not trx.running:
+ continue
- def add_bts_list(self, add_list):
- self.bts_list += add_list
+ # Match by given frequency
+ if trx.tx_freq == freq:
+ return self.rssi_trx
- def del_bts_list(self, del_list):
- for item in del_list:
- if item in self.bts_list:
- self.bts_list.remove(item)
+ return self.rssi_noise
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
index 95261dfe..46b413aa 100755
--- a/src/target/trx_toolkit/fake_trx.py
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -27,17 +27,237 @@ APP_CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
import logging as log
import signal
import argparse
+import random
import select
import sys
from app_common import ApplicationBase
-from ctrl_if_bts import CTRLInterfaceBTS
-from ctrl_if_bb import CTRLInterfaceBB
from burst_fwd import BurstForwarder
+from transceiver import Transceiver
+from clck_gen import CLCKGen
from fake_pm import FakePM
-from udp_link import UDPLink
-from clck_gen import CLCKGen
+class FakeTRX(Transceiver):
+ """ Fake transceiver with RF path (burst loss, RSSI, TA, ToA) simulation.
+
+ == ToA / RSSI measurement simulation
+
+ Since this is a virtual environment, we can simulate different
+ parameters of the physical RF interface:
+
+ - ToA (Timing of Arrival) - measured difference between expected
+ and actual time of burst arrival in units of 1/256 of GSM symbol
+ periods. A pair of both base and threshold values defines a range
+ of ToA value randomization:
+
+ from (toa256_base - toa256_rand_threshold)
+ to (toa256_base + toa256_rand_threshold).
+
+ - RSSI (Received Signal Strength Indication) - measured "power" of
+ the signal (per burst) in dBm. A pair of both base and threshold
+ values defines a range of RSSI value randomization:
+
+ from (rssi_base - rssi_rand_threshold)
+ to (rssi_base + rssi_rand_threshold).
+
+ Please note that randomization of both RSSI and ToA is optional,
+ and can be enabled from the control interface.
+
+ == Timing Advance handling
+
+ The BTS is using ToA measurements for UL bursts in order to calculate
+ Timing Advance value, that is then indicated to a MS, which in its turn
+ shall apply this value to the transmitted signal in order to compensate
+ the delay. Basically, every burst is transmitted in advance defined by
+ the indicated Timing Advance value. The valid range is 0..63, where
+ each unit means one GSM symbol advance. The actual Timing Advance value
+ is set using SETTA control command from MS. By default, it's set to 0.
+
+ == Path loss simulation
+
+ === Burst dropping
+
+ In some cases, e.g. due to a weak signal or high interference, a burst
+ can be lost, i.e. not detected by the receiver. This can also be
+ simulated using FAKE_DROP command on the control interface:
+
+ - burst_drop_amount - the amount of DL/UL bursts
+ to be dropped (i.e. not forwarded towards the MS/BTS),
+
+ - burst_drop_period - drop a DL/UL burst if its (fn % period) == 0.
+
+ == Configuration
+
+ All simulation parameters mentioned above can be changed at runtime
+ using the commands with prefix 'FAKE_' on the control interface.
+ All of them are handled by our custom CTRL command handler.
+
+ """
+
+ TOA256_BASE_DEFAULT = 0
+ RSSI_BASE_DEFAULT = -60
+
+ def __init__(self, *trx_args, **trx_kwargs):
+ Transceiver.__init__(self, *trx_args, **trx_kwargs)
+
+ # Actual ToA / RSSI / TA values
+ self.toa256_base = self.TOA256_BASE_DEFAULT
+ self.rssi_base = self.RSSI_BASE_DEFAULT
+ self.ta = 0
+
+ # ToA / RSSI randomization threshold
+ self.toa256_rand_threshold = 0
+ self.rssi_rand_threshold = 0
+
+ # Path loss simulation (burst dropping)
+ self.burst_drop_amount = 0
+ self.burst_drop_period = 1
+
+ @property
+ def toa256(self):
+ # Check if randomization is required
+ if self.toa256_rand_threshold is 0:
+ return self.toa256_base
+
+ # Generate a random ToA value in required range
+ toa256_min = self.toa256_base - self.toa256_rand_threshold
+ toa256_max = self.toa256_base + self.toa256_rand_threshold
+ return random.randint(toa256_min, toa256_max)
+
+ @property
+ def rssi(self):
+ # Check if randomization is required
+ if self.rssi_rand_threshold is 0:
+ return self.rssi_base
+
+ # Generate a random RSSI value in required range
+ rssi_min = self.rssi_base - self.rssi_rand_threshold
+ rssi_max = self.rssi_base + self.rssi_rand_threshold
+ return random.randint(rssi_min, rssi_max)
+
+ # Path loss simulation: burst dropping
+ # Returns: True - drop, False - keep
+ def sim_burst_drop(self, msg):
+ # Check if dropping is required
+ if self.burst_drop_amount is 0:
+ return False
+
+ if msg.fn % self.burst_drop_period == 0:
+ log.info("Simulation: dropping burst (fn=%u %% %u == 0)"
+ % (msg.fn, self.burst_drop_period))
+ self.burst_drop_amount -= 1
+ return True
+
+ return False
+
+ # Takes (partially initialized) TRX2L1 message,
+ # simulates RF path parameters (such as RSSI),
+ # and sends towards the L1
+ def send_data_msg(self, src_trx, msg):
+ # Complete message header
+ msg.toa256 = self.toa256
+ msg.rssi = self.rssi
+
+ # Apply optional Timing Advance
+ if src_trx.ta is not 0:
+ msg.toa256 -= src_trx.ta * 256
+
+ # Path loss simulation
+ if self.sim_burst_drop(msg):
+ return
+
+ # TODO: make legacy mode configurable (via argv?)
+ self.data_if.send_msg(msg, legacy = True)
+
+ # Simulation specific CTRL command handler
+ def ctrl_cmd_handler(self, request):
+ # Timing Advance
+ # Syntax: CMD SETTA <TA>
+ if self.ctrl_if.verify_cmd(request, "SETTA", 1):
+ log.debug("Recv SETTA cmd")
+
+ # Store indicated value
+ self.ta = int(request[1])
+ return 0
+
+ # Timing of Arrival simulation
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 2):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.toa256_base = int(request[1])
+ self.toa256_rand_threshold = int(request[2])
+ return 0
+
+ # Timing of Arrival simulation
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_TOA", 1):
+ log.debug("Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.toa256_base += int(request[1])
+ return 0
+
+ # RSSI simulation
+ # Absolute form: CMD FAKE_RSSI <BASE> <THRESH>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 2):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply both base and threshold
+ self.rssi_base = int(request[1])
+ self.rssi_rand_threshold = int(request[2])
+ return 0
+
+ # RSSI simulation
+ # Relative form: CMD FAKE_RSSI <+-BASE_DELTA>
+ elif self.ctrl_if.verify_cmd(request, "FAKE_RSSI", 1):
+ log.debug("Recv FAKE_RSSI cmd")
+
+ # Parse and apply delta
+ self.rssi_base += int(request[1])
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT>
+ # Dropping pattern: fn % 1 == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 1):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = 1
+ return 0
+
+ # Path loss simulation: burst dropping
+ # Syntax: CMD FAKE_DROP <AMOUNT> <FN_PERIOD>
+ # Dropping pattern: fn % period == 0
+ elif self.ctrl_if.verify_cmd(request, "FAKE_DROP", 2):
+ log.debug("Recv FAKE_DROP cmd")
+
+ # Parse / validate amount of bursts
+ num = int(request[1])
+ if num < 0:
+ log.error("FAKE_DROP amount shall not be negative")
+ return -1
+
+ # Parse / validate period
+ period = int(request[2])
+ if period <= 0:
+ log.error("FAKE_DROP period shall be greater than zero")
+ return -1
+
+ self.burst_drop_amount = num
+ self.burst_drop_period = period
+ return 0
+
+ # Unhandled command
+ return None
class Application(ApplicationBase):
def __init__(self):
@@ -51,72 +271,58 @@ class Application(ApplicationBase):
self.app_init_logging(self.argv)
def run(self):
- # Init TRX CTRL interface for BTS
- self.bts_ctrl = CTRLInterfaceBTS(
- self.argv.bts_addr, self.argv.bts_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 1)
-
- # Init TRX CTRL interface for BB
- self.bb_ctrl = CTRLInterfaceBB(
- self.argv.bb_addr, self.argv.bb_base_port + 101,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 1)
+ # Init shared clock generator
+ self.clck_gen = CLCKGen([])
# Power measurement emulation
# Noise: -120 .. -105
# BTS: -75 .. -50
- self.pm = FakePM(-120, -105, -75, -50)
-
- # Share a FakePM instance between both BTS and BB
- self.bts_ctrl.pm = self.pm
- self.bb_ctrl.pm = self.pm
-
- # Init DATA links
- self.bts_data = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bts_base_port + 2)
- self.bb_data = UDPLink(
- self.argv.bb_addr, self.argv.bb_base_port + 102,
- self.argv.trx_bind_addr, self.argv.bb_base_port + 2)
-
- # BTS <-> BB burst forwarding
- self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
-
- # Share a BurstForwarder instance between BTS and BB
- self.bts_ctrl.burst_fwd = self.burst_fwd
- self.bb_ctrl.burst_fwd = self.burst_fwd
-
- # Provide clock to BTS
- self.bts_clck = UDPLink(
- self.argv.bts_addr, self.argv.bts_base_port + 100,
- self.argv.trx_bind_addr, self.argv.bts_base_port)
- self.clck_gen = CLCKGen([self.bts_clck])
- self.bts_ctrl.clck_gen = self.clck_gen
+ self.fake_pm = FakePM(-120, -105, -75, -50)
+
+ # Init TRX instance for BTS
+ self.bts_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bts_addr, self.argv.bts_base_port,
+ clck_gen = self.clck_gen)
+
+ # Init TRX instance for BB
+ self.bb_trx = FakeTRX(self.argv.trx_bind_addr,
+ self.argv.bb_addr, self.argv.bb_base_port,
+ pwr_meas = self.fake_pm)
+
+ # Burst forwarding between transceivers
+ self.burst_fwd = BurstForwarder()
+ self.burst_fwd.add_trx(self.bts_trx)
+ self.burst_fwd.add_trx(self.bb_trx)
log.info("Init complete")
# Enter main loop
while True:
- socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
- self.bts_data.sock, self.bb_data.sock]
+ socks = [self.bts_trx.ctrl_if.sock, self.bb_trx.ctrl_if.sock,
+ self.bts_trx.data_if.sock, self.bb_trx.data_if.sock]
# Wait until we get any data on any socket
r_event, w_event, x_event = select.select(socks, [], [])
# Downlink: BTS -> BB
- if self.bts_data.sock in r_event:
- self.burst_fwd.bts2bb()
+ if self.bts_trx.data_if.sock in r_event:
+ msg = self.bts_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bts_trx, msg)
# Uplink: BB -> BTS
- if self.bb_data.sock in r_event:
- self.burst_fwd.bb2bts()
+ if self.bb_trx.data_if.sock in r_event:
+ msg = self.bb_trx.recv_data_msg()
+ if msg is not None:
+ self.burst_fwd.forward_msg(self.bb_trx, msg)
# CTRL commands from BTS
- if self.bts_ctrl.sock in r_event:
- self.bts_ctrl.handle_rx()
+ if self.bts_trx.ctrl_if.sock in r_event:
+ self.bts_trx.ctrl_if.handle_rx()
# CTRL commands from BB
- if self.bb_ctrl.sock in r_event:
- self.bb_ctrl.handle_rx()
+ if self.bb_trx.ctrl_if.sock in r_event:
+ self.bb_trx.ctrl_if.handle_rx()
def shutdown(self):
log.info("Shutting down...")
diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py
new file mode 100644
index 00000000..edd4d31f
--- /dev/null
+++ b/src/target/trx_toolkit/transceiver.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Transceiver implementation
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import logging as log
+
+from ctrl_if_trx import CTRLInterfaceTRX
+from data_if import DATAInterface
+from udp_link import UDPLink
+
+class Transceiver:
+ """ Base transceiver implementation.
+
+ Represents a single transceiver, that can be used as for the BTS side,
+ as for the MS side. Each individual instance of Transceiver unifies
+ three basic interfaces built on three independent UDP connections:
+
+ - CLCK (base port + 100/0) - clock indications from TRX to L1,
+ - CTRL (base port + 101/1) - control interface for L1,
+ - DATA (base port + 102/2) - bidirectional data interface for bursts.
+
+ A transceiver can be either in active (i.e. working), or in idle mode.
+ The active mode should ensure that both RX/TX frequencies are set.
+
+ NOTE: CLCK is not required for some L1 implementations, so it is optional.
+
+ == Timeslot configuration
+
+ Transceiver has a list of active (i.e. configured) TDMA timeslots.
+ The L1 should configure a timeslot before sending or expecting any
+ data on it. This is done by SETSLOT control command, which also
+ indicates an associated channel combination (see GSM TS 05.02).
+
+ NOTE: we don't store the associated channel combinations,
+ as they are only useful for burst detection and demodulation.
+
+ == Clock distribution (optional)
+
+ The clock indications are not expected by L1 when transceiver
+ is not running, so we monitor both POWERON / POWEROFF events
+ from the control interface, and keep the list of CLCK links
+ in a given CLCKGen instance updated. The clock generator is
+ started and stopped automatically.
+
+ NOTE: a single instance of CLCKGen can be shared between multiple
+ transceivers, as well as multiple transceivers may use
+ individual CLCKGen instances.
+
+ == Power Measurement (optional)
+
+ Transceiver may have an optional power measurement interface,
+ that shall provide at least one method: measure(freq). This
+ is required for the MS side (i.e. OsmocomBB).
+
+ """
+
+ def __init__(self, bind_addr, remote_addr, base_port,
+ clck_gen = None, pwr_meas = None):
+ # Connection info
+ self.remote_addr = remote_addr
+ self.bind_addr = bind_addr
+ self.base_port = base_port
+
+ # Init DATA interface
+ self.data_if = DATAInterface(
+ remote_addr, base_port + 102,
+ bind_addr, base_port + 2)
+
+ # Init CTRL interface
+ self.ctrl_if = CTRLInterfaceTRX(self,
+ remote_addr, base_port + 101,
+ bind_addr, base_port + 1)
+
+ # Init optional CLCK interface
+ self.clck_gen = clck_gen
+ if clck_gen is not None:
+ self.clck_if = UDPLink(
+ remote_addr, base_port + 100,
+ bind_addr, base_port)
+
+ # Optional Power Measurement interface
+ self.pwr_meas = pwr_meas
+
+ # Internal state
+ self.running = False
+
+ # Actual RX / TX frequencies
+ self.rx_freq = None
+ self.tx_freq = None
+
+ # List of active (configured) timeslots
+ self.ts_list = []
+
+ # To be overwritten if required,
+ # no custom command handlers by default
+ def ctrl_cmd_handler(self, request):
+ return None
+
+ def power_event_handler(self, event):
+ # Trigger clock generator if required
+ if self.clck_gen is not None:
+ clck_links = self.clck_gen.clck_links
+ if not self.running and (self.clck_if in clck_links):
+ # Transceiver was stopped
+ clck_links.remove(self.clck_if)
+ elif self.running and (self.clck_if not in clck_links):
+ # Transceiver was started
+ clck_links.append(self.clck_if)
+
+ if not self.clck_gen.timer and len(clck_links) > 0:
+ log.info("Starting clock generator")
+ self.clck_gen.start()
+ elif self.clck_gen.timer and not clck_links:
+ log.info("Stopping clock generator")
+ self.clck_gen.stop()
+
+ def recv_data_msg(self):
+ # Read and parse data from socket
+ msg = self.data_if.recv_l12trx_msg()
+ if not msg:
+ return None
+
+ # Make sure that transceiver is configured and running
+ if not self.running:
+ log.warning("RX DATA message (%s), but transceiver "
+ "is not running => dropping..." % msg.desc_hdr())
+ return None
+
+ # Make sure that indicated timeslot is configured
+ if msg.tn not in self.ts_list:
+ log.warning("RX DATA message (%s), but timeslot "
+ "is not configured => dropping..." % msg.desc_hdr())
+ return None
+
+ return msg