diff options
Diffstat (limited to 'src/target/trx_toolkit/burst_fwd.py')
-rw-r--r-- | src/target/trx_toolkit/burst_fwd.py | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py new file mode 100644 index 00000000..144ae5f4 --- /dev/null +++ b/src/target/trx_toolkit/burst_fwd.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# BTS <-> BB burst forwarding +# +# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import random + +from data_msg import * + +class BurstForwarder: + # Timeslot filter (drop everything by default) + ts_pass = None + + # Freq. filter + bts_freq = None + bb_freq = None + + # Randomization of RSSI + randomize_dl_rssi = False + randomize_ul_rssi = False + + # Randomization of ToA + randomize_dl_toa256 = False + randomize_ul_toa256 = False + + # Timing Advance value indicated by MS (0 by default) + # Valid range: 0..63, where each unit means + # one GSM symbol advance. + ta = 0 + + # Timing of Arrival values indicated by transceiver + # in units of 1/256 of GSM symbol periods. A pair of + # base and threshold values defines a range of ToA value + # randomization: from (base - threshold) to (base + threshold). + toa256_dl_base = 0 + toa256_ul_base = 0 + + toa256_dl_threshold = 128 + toa256_ul_threshold = 128 + + # RSSI values indicated by transceiver in dBm. + # A pair of base and threshold values defines a range of RSSI + # randomization: from (base - threshold) to (base + threshold). + rssi_dl_base = -60 + rssi_ul_base = -70 + + rssi_dl_threshold = 10 + rssi_ul_threshold = 5 + + def __init__(self, bts_link, bb_link): + self.bts_link = bts_link + self.bb_link = bb_link + + # Converts TA value from symbols to + # units of 1/256 of GSM symbol periods + def calc_ta256(self): + return self.ta * 256 + + # Calculates a random ToA value for Downlink bursts + def calc_dl_toa256(self): + # Check if randomization is required + if not self.randomize_dl_toa256: + return self.toa256_dl_base + + # Calculate a range for randomization + toa256_min = self.toa256_dl_base - self.toa256_dl_threshold + toa256_max = self.toa256_dl_base + self.toa256_dl_threshold + + # Generate a random ToA value + toa256 = random.randint(toa256_min, toa256_max) + + return toa256 + + # Calculates a random ToA value for Uplink bursts + def calc_ul_toa256(self): + # Check if randomization is required + if not self.randomize_ul_toa256: + return self.toa256_ul_base + + # Calculate a range for randomization + toa256_min = self.toa256_ul_base - self.toa256_ul_threshold + toa256_max = self.toa256_ul_base + self.toa256_ul_threshold + + # Generate a random ToA value + toa256 = random.randint(toa256_min, toa256_max) + + return toa256 + + # Calculates a random RSSI value for Downlink bursts + def calc_dl_rssi(self): + # Check if randomization is required + if not self.randomize_dl_rssi: + return self.rssi_dl_base + + # Calculate a range for randomization + rssi_min = self.rssi_dl_base - self.rssi_dl_threshold + rssi_max = self.rssi_dl_base + self.rssi_dl_threshold + + # Generate a random RSSI value + return random.randint(rssi_min, rssi_max) + + # Calculates a random RSSI value for Uplink bursts + def calc_ul_rssi(self): + # Check if randomization is required + if not self.randomize_ul_rssi: + return self.rssi_ul_base + + # Calculate a range for randomization + rssi_min = self.rssi_ul_base - self.rssi_ul_threshold + rssi_max = self.rssi_ul_base + self.rssi_ul_threshold + + # Generate a random RSSI value + return random.randint(rssi_min, rssi_max) + + # Converts a L12TRX message to TRX2L1 message + def transform_msg(self, msg_raw, dl = True): + # Attempt to parse a message + try: + msg_l12trx = DATAMSG_L12TRX() + msg_l12trx.parse_msg(bytearray(msg_raw)) + except: + print("[!] Dropping unhandled DL message...") + return None + + # Compose a new message for L1 + msg_trx2l1 = msg_l12trx.gen_trx2l1() + + # Randomize both RSSI and ToA values + if dl: + msg_trx2l1.toa256 = self.calc_dl_toa256() + msg_trx2l1.rssi = self.calc_dl_rssi() + else: + msg_trx2l1.toa256 = self.calc_ul_toa256() + msg_trx2l1.toa256 -= self.calc_ta256() + msg_trx2l1.rssi = self.calc_ul_rssi() + + return msg_trx2l1 + + # Downlink handler: BTS -> BB + def bts2bb(self): + # Read data from socket + data, addr = self.bts_link.sock.recvfrom(512) + + # BB is not connected / tuned + if self.bb_freq is None: + return None + + # Freq. filter + if self.bb_freq != self.bts_freq: + return None + + # Process a message + msg = self.transform_msg(data, dl = True) + if msg is None: + return None + + # Timeslot filter + if msg.tn != self.ts_pass: + return None + + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + + # Send burst to BB + self.bb_link.send(payload) + + # Uplink handler: BB -> BTS + def bb2bts(self): + # Read data from socket + data, addr = self.bb_link.sock.recvfrom(512) + + # BTS is not connected / tuned + if self.bts_freq is None: + return None + + # Freq. filter + if self.bb_freq != self.bts_freq: + return None + + # Process a message + msg = self.transform_msg(data, dl = False) + if msg is None: + return None + + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + + # Send burst to BTS + self.bts_link.send(payload) |