diff options
Diffstat (limited to 'src/target')
-rw-r--r-- | src/target/trx_toolkit/.gitignore | 4 | ||||
-rw-r--r-- | src/target/trx_toolkit/README | 34 | ||||
-rw-r--r-- | src/target/trx_toolkit/burst_fwd.py | 216 | ||||
-rwxr-xr-x | src/target/trx_toolkit/burst_gen.py | 248 | ||||
-rwxr-xr-x | src/target/trx_toolkit/burst_send.py | 218 | ||||
-rwxr-xr-x | src/target/trx_toolkit/clck_gen.py | 116 | ||||
-rw-r--r-- | src/target/trx_toolkit/copyright.py | 13 | ||||
-rwxr-xr-x | src/target/trx_toolkit/ctrl_cmd.py | 147 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if.py | 79 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if_bb.py | 158 | ||||
-rw-r--r-- | src/target/trx_toolkit/ctrl_if_bts.py | 126 | ||||
-rw-r--r-- | src/target/trx_toolkit/data_dump.py | 360 | ||||
-rw-r--r-- | src/target/trx_toolkit/data_if.py | 39 | ||||
-rw-r--r-- | src/target/trx_toolkit/data_msg.py | 545 | ||||
-rw-r--r-- | src/target/trx_toolkit/fake_pm.py | 53 | ||||
-rwxr-xr-x | src/target/trx_toolkit/fake_trx.py | 236 | ||||
-rw-r--r-- | src/target/trx_toolkit/gsm_shared.py | 31 | ||||
-rw-r--r-- | src/target/trx_toolkit/rand_burst_gen.py | 177 | ||||
-rwxr-xr-x | src/target/trx_toolkit/trx_sniff.py | 286 | ||||
-rw-r--r-- | src/target/trx_toolkit/udp_link.py | 57 |
20 files changed, 3143 insertions, 0 deletions
diff --git a/src/target/trx_toolkit/.gitignore b/src/target/trx_toolkit/.gitignore new file mode 100644 index 00000000..749ccdaf --- /dev/null +++ b/src/target/trx_toolkit/.gitignore @@ -0,0 +1,4 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class diff --git a/src/target/trx_toolkit/README b/src/target/trx_toolkit/README new file mode 100644 index 00000000..91b6099b --- /dev/null +++ b/src/target/trx_toolkit/README @@ -0,0 +1,34 @@ +TRX toolkit is a set of tools intended for hacking and debugging +a TRX interface between both transceiver and L1 software, and +emulating a virtual Um-interface between OsmocomBB and OsmoBTS. + +Brief description of available applications: + + - fake_trx.py - main application, that allows to connect both + OsmocomBB and OsmoBTS without actual RF hardware. Currently + only a single MS may work with a single BTS. + + - clck_gen.py - a peripheral tool aimed to emulate TDMA frame + clock generator. Could be used for testing and clock + synchronization of multiple applications. It should be noted, + that one relays on generic system timer (via Python), so + a random clock jitter takes place. + + - ctrl_cmd.py - another peripheral tool, which could be used + for sending CTRL commands directly in manual mode, and also + for application fuzzing. + + - burst_gen.py - a tool for sending GSM bursts either to L1 + (OsmoBTS or OsmocomBB) or to TRX (OsmoTRX and GR-GSM TRX). + Currently it is only possible to generate random bursts of + different types: NB, FB, SB, AB. + + - burst_send.py - a tool for sending existing bursts from a + capture file either to L1 (OsmoBTS or OsmocomBB) or to + TRX (e.g. OsmoTRX or GR-GSM TRX). + + - trx_sniff.py - Scapy-based TRX protocol sniffer. Allows one + to observe a single connection between TRX and L1, and vice + versa. Also provides some capabilities for filtering bursts + by direction, frame and timeslot numbers, and for recording + captured messages to a binary file. diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py new file mode 100644 index 00000000..144ae5f4 --- /dev/null +++ b/src/target/trx_toolkit/burst_fwd.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# BTS <-> BB burst forwarding +# +# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import random + +from data_msg import * + +class BurstForwarder: + # Timeslot filter (drop everything by default) + ts_pass = None + + # Freq. filter + bts_freq = None + bb_freq = None + + # Randomization of RSSI + randomize_dl_rssi = False + randomize_ul_rssi = False + + # Randomization of ToA + randomize_dl_toa256 = False + randomize_ul_toa256 = False + + # Timing Advance value indicated by MS (0 by default) + # Valid range: 0..63, where each unit means + # one GSM symbol advance. + ta = 0 + + # Timing of Arrival values indicated by transceiver + # in units of 1/256 of GSM symbol periods. A pair of + # base and threshold values defines a range of ToA value + # randomization: from (base - threshold) to (base + threshold). + toa256_dl_base = 0 + toa256_ul_base = 0 + + toa256_dl_threshold = 128 + toa256_ul_threshold = 128 + + # RSSI values indicated by transceiver in dBm. + # A pair of base and threshold values defines a range of RSSI + # randomization: from (base - threshold) to (base + threshold). + rssi_dl_base = -60 + rssi_ul_base = -70 + + rssi_dl_threshold = 10 + rssi_ul_threshold = 5 + + def __init__(self, bts_link, bb_link): + self.bts_link = bts_link + self.bb_link = bb_link + + # Converts TA value from symbols to + # units of 1/256 of GSM symbol periods + def calc_ta256(self): + return self.ta * 256 + + # Calculates a random ToA value for Downlink bursts + def calc_dl_toa256(self): + # Check if randomization is required + if not self.randomize_dl_toa256: + return self.toa256_dl_base + + # Calculate a range for randomization + toa256_min = self.toa256_dl_base - self.toa256_dl_threshold + toa256_max = self.toa256_dl_base + self.toa256_dl_threshold + + # Generate a random ToA value + toa256 = random.randint(toa256_min, toa256_max) + + return toa256 + + # Calculates a random ToA value for Uplink bursts + def calc_ul_toa256(self): + # Check if randomization is required + if not self.randomize_ul_toa256: + return self.toa256_ul_base + + # Calculate a range for randomization + toa256_min = self.toa256_ul_base - self.toa256_ul_threshold + toa256_max = self.toa256_ul_base + self.toa256_ul_threshold + + # Generate a random ToA value + toa256 = random.randint(toa256_min, toa256_max) + + return toa256 + + # Calculates a random RSSI value for Downlink bursts + def calc_dl_rssi(self): + # Check if randomization is required + if not self.randomize_dl_rssi: + return self.rssi_dl_base + + # Calculate a range for randomization + rssi_min = self.rssi_dl_base - self.rssi_dl_threshold + rssi_max = self.rssi_dl_base + self.rssi_dl_threshold + + # Generate a random RSSI value + return random.randint(rssi_min, rssi_max) + + # Calculates a random RSSI value for Uplink bursts + def calc_ul_rssi(self): + # Check if randomization is required + if not self.randomize_ul_rssi: + return self.rssi_ul_base + + # Calculate a range for randomization + rssi_min = self.rssi_ul_base - self.rssi_ul_threshold + rssi_max = self.rssi_ul_base + self.rssi_ul_threshold + + # Generate a random RSSI value + return random.randint(rssi_min, rssi_max) + + # Converts a L12TRX message to TRX2L1 message + def transform_msg(self, msg_raw, dl = True): + # Attempt to parse a message + try: + msg_l12trx = DATAMSG_L12TRX() + msg_l12trx.parse_msg(bytearray(msg_raw)) + except: + print("[!] Dropping unhandled DL message...") + return None + + # Compose a new message for L1 + msg_trx2l1 = msg_l12trx.gen_trx2l1() + + # Randomize both RSSI and ToA values + if dl: + msg_trx2l1.toa256 = self.calc_dl_toa256() + msg_trx2l1.rssi = self.calc_dl_rssi() + else: + msg_trx2l1.toa256 = self.calc_ul_toa256() + msg_trx2l1.toa256 -= self.calc_ta256() + msg_trx2l1.rssi = self.calc_ul_rssi() + + return msg_trx2l1 + + # Downlink handler: BTS -> BB + def bts2bb(self): + # Read data from socket + data, addr = self.bts_link.sock.recvfrom(512) + + # BB is not connected / tuned + if self.bb_freq is None: + return None + + # Freq. filter + if self.bb_freq != self.bts_freq: + return None + + # Process a message + msg = self.transform_msg(data, dl = True) + if msg is None: + return None + + # Timeslot filter + if msg.tn != self.ts_pass: + return None + + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + + # Send burst to BB + self.bb_link.send(payload) + + # Uplink handler: BB -> BTS + def bb2bts(self): + # Read data from socket + data, addr = self.bb_link.sock.recvfrom(512) + + # BTS is not connected / tuned + if self.bts_freq is None: + return None + + # Freq. filter + if self.bb_freq != self.bts_freq: + return None + + # Process a message + msg = self.transform_msg(data, dl = False) + if msg is None: + return None + + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + + # Send burst to BTS + self.bts_link.send(payload) diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py new file mode 100755 index 00000000..d83f1378 --- /dev/null +++ b/src/target/trx_toolkit/burst_gen.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Auxiliary tool to generate and send random bursts via TRX DATA +# interface, which may be useful for fuzzing and testing +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import getopt +import sys + +from rand_burst_gen import RandBurstGen +from data_dump import DATADumpFile +from data_if import DATAInterface +from gsm_shared import * +from data_msg import * + +class Application: + # Application variables + remote_addr = "127.0.0.1" + bind_addr = "0.0.0.0" + base_port = 5700 + conn_mode = "TRX" + output_file = None + + burst_type = None + burst_count = 1 + + # Common header fields + fn = None + tn = None + + # Message specific header fields + toa256 = None + rssi = None + pwr = None + + def __init__(self): + print_copyright(CR_HOLDERS) + self.parse_argv() + self.check_argv() + + # Set up signal handlers + signal.signal(signal.SIGINT, self.sig_handler) + + # Open requested capture file + if self.output_file is not None: + self.ddf = DATADumpFile(self.output_file) + + def run(self): + # Init DATA interface with TRX or L1 + if self.conn_mode == "TRX": + self.data_if = DATAInterface(self.remote_addr, self.base_port + 2, + self.bind_addr, self.base_port + 102) + elif self.conn_mode == "L1": + self.data_if = DATAInterface(self.remote_addr, self.base_port + 102, + self.bind_addr, self.base_port + 2) + + # Init random burst generator + burst_gen = RandBurstGen() + + # Init an empty DATA message + if self.conn_mode == "TRX": + msg = DATAMSG_L12TRX() + elif self.conn_mode == "L1": + msg = DATAMSG_TRX2L1() + + # Generate a random frame number or use provided one + fn_init = msg.rand_fn() if self.fn is None else self.fn + + # Send as much bursts as required + for i in range(self.burst_count): + # Randomize the message header + msg.rand_hdr() + + # Increase and set frame number + msg.fn = (fn_init + i) % GSM_HYPERFRAME + + # Set timeslot number + if self.tn is not None: + msg.tn = self.tn + + # Set transmit power level + if self.pwr is not None: + msg.pwr = self.pwr + + # Set time of arrival + if self.toa256 is not None: + msg.toa256 = self.toa256 + + # Set RSSI + if self.rssi is not None: + msg.rssi = self.rssi + + # Generate a random burst + if self.burst_type == "NB": + burst = burst_gen.gen_nb() + elif self.burst_type == "FB": + burst = burst_gen.gen_fb() + elif self.burst_type == "SB": + burst = burst_gen.gen_sb() + elif self.burst_type == "AB": + burst = burst_gen.gen_ab() + + # Convert to soft-bits in case of TRX -> L1 message + if self.conn_mode == "L1": + burst = msg.ubit2sbit(burst) + + # Set burst + msg.burst = burst + + print("[i] Sending %d/%d %s burst %s to %s..." + % (i + 1, self.burst_count, self.burst_type, + msg.desc_hdr(), self.conn_mode)) + + # Send message + self.data_if.send_msg(msg) + + # Append a new message to the capture + if self.output_file is not None: + self.ddf.append_msg(msg) + + def print_help(self, msg = None): + s = " Usage: " + sys.argv[0] + " [options]\n\n" \ + " Some help...\n" \ + " -h --help this text\n\n" + + s += " TRX interface specific\n" \ + " -o --output-file Write bursts to a capture file\n" \ + " -m --conn-mode Send bursts to: TRX (default) / L1\n" \ + " -r --remote-addr Set remote address (default %s)\n" \ + " -b --bind-addr Set local address (default %s)\n" \ + " -p --base-port Set base port number (default %d)\n\n" + + s += " Burst generation\n" \ + " -b --burst-type Random burst type (NB, FB, SB, AB)\n" \ + " -c --burst-count How much bursts to send (default 1)\n" \ + " -f --frame-number Set frame number (default random)\n" \ + " -t --timeslot Set timeslot index (default random)\n" \ + " --pwr Set power level (default random)\n" \ + " --rssi Set RSSI (default random)\n" \ + " --toa Set ToA in symbols (default random)\n" \ + " --toa256 Set ToA in 1/256 symbol periods\n" + + print(s % (self.remote_addr, self.bind_addr, self.base_port)) + + if msg is not None: + print(msg) + + def parse_argv(self): + try: + opts, args = getopt.getopt(sys.argv[1:], + "o:m:r:b:p:b:c:f:t:h", + [ + "help", + "output-file=" + "conn-mode=", + "remote-addr=", + "bind-addr=", + "base-port=", + "burst-type=", + "burst-count=", + "frame-number=", + "timeslot=", + "rssi=", + "toa=", + "toa256=", + "pwr=", + ]) + except getopt.GetoptError as err: + self.print_help("[!] " + str(err)) + sys.exit(2) + + for o, v in opts: + if o in ("-h", "--help"): + self.print_help() + sys.exit(2) + + elif o in ("-o", "--output-file"): + self.output_file = v + elif o in ("-m", "--conn-mode"): + self.conn_mode = v + elif o in ("-r", "--remote-addr"): + self.remote_addr = v + elif o in ("-b", "--bind-addr"): + self.bind_addr = v + elif o in ("-p", "--base-port"): + self.base_port = int(v) + + elif o in ("-b", "--burst-type"): + self.burst_type = v + elif o in ("-c", "--burst-count"): + self.burst_count = int(v) + elif o in ("-f", "--frame-number"): + self.fn = int(v) + elif o in ("-t", "--timeslot"): + self.tn = int(v) + + # Message specific header fields + elif o == "--pwr": + self.pwr = int(v) + elif o == "--rssi": + self.rssi = int(v) + elif o == "--toa256": + self.toa256 = int(v) + elif o == "--toa": + self.toa256 = int(float(v) * 256.0 + 0.5) + + def check_argv(self): + # Check connection mode + if self.conn_mode not in ("TRX", "L1"): + self.print_help("[!] Unknown connection type") + sys.exit(2) + + # Check connection mode + if self.burst_type not in ("NB", "FB", "SB", "AB"): + self.print_help("[!] Unknown burst type") + sys.exit(2) + + def sig_handler(self, signum, frame): + print("Signal %d received" % signum) + if signum is signal.SIGINT: + sys.exit(0) + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py new file mode 100755 index 00000000..f6c85ba0 --- /dev/null +++ b/src/target/trx_toolkit/burst_send.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Auxiliary tool to send existing bursts via TRX DATA interface +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import getopt +import sys + +from data_dump import DATADumpFile +from data_if import DATAInterface +from gsm_shared import * +from data_msg import * + +class Application: + # Application variables + remote_addr = "127.0.0.1" + bind_addr = "0.0.0.0" + base_port = 5700 + conn_mode = "TRX" + + # Burst source + capture_file = None + + # Count limitations + msg_skip = None + msg_count = None + + # Pass filtering + pf_fn_lt = None + pf_fn_gt = None + pf_tn = None + + def __init__(self): + print_copyright(CR_HOLDERS) + self.parse_argv() + + # Set up signal handlers + signal.signal(signal.SIGINT, self.sig_handler) + + # Open requested capture file + self.ddf = DATADumpFile(self.capture_file) + + def run(self): + # Init DATA interface with TRX or L1 + if self.conn_mode == "TRX": + self.data_if = DATAInterface(self.remote_addr, self.base_port + 2, + self.bind_addr, self.base_port + 102) + l12trx = True + elif self.conn_mode == "L1": + self.data_if = DATAInterface(self.remote_addr, self.base_port + 102, + self.bind_addr, self.base_port + 2) + l12trx = False + else: + self.print_help("[!] Unknown connection type") + sys.exit(2) + + # Read messages from the capture + messages = self.ddf.parse_all( + skip = self.msg_skip, count = self.msg_count) + if messages is False: + pass # FIXME!!! + + for msg in messages: + # Pass filter + if not self.msg_pass_filter(l12trx, msg): + continue + + print("[i] Sending a burst %s to %s..." + % (msg.desc_hdr(), self.conn_mode)) + + # Send message + self.data_if.send_msg(msg) + + def msg_pass_filter(self, l12trx, msg): + # Direction filter + if isinstance(msg, DATAMSG_L12TRX) and not l12trx: + return False + elif isinstance(msg, DATAMSG_TRX2L1) and l12trx: + return False + + # Timeslot filter + if self.pf_tn is not None: + if msg.tn != self.pf_tn: + return False + + # Frame number filter + if self.pf_fn_lt is not None: + if msg.fn > self.pf_fn_lt: + return False + if self.pf_fn_gt is not None: + if msg.fn < self.pf_fn_gt: + return False + + # Burst passed ;) + return True + + def print_help(self, msg = None): + s = " Usage: " + sys.argv[0] + " [options]\n\n" \ + " Some help...\n" \ + " -h --help this text\n\n" + + s += " TRX interface specific\n" \ + " -m --conn-mode Send bursts to: TRX (default) / L1\n" \ + " -r --remote-addr Set remote address (default %s)\n" \ + " -b --bind-addr Set bind address (default %s)\n" \ + " -p --base-port Set base port number (default %d)\n\n" + + s += " Burst source\n" \ + " -i --capture-file Read bursts from capture file\n\n" \ + + s += " Count limitations (disabled by default)\n" \ + " --msg-skip NUM Skip NUM messages before sending\n" \ + " --msg-count NUM Stop after sending NUM messages\n\n" \ + + s += " Filtering (disabled by default)\n" \ + " --timeslot NUM TDMA timeslot number [0..7]\n" \ + " --frame-num-lt NUM TDMA frame number lower than NUM\n" \ + " --frame-num-gt NUM TDMA frame number greater than NUM\n" + + print(s % (self.remote_addr, self.bind_addr, self.base_port)) + + if msg is not None: + print(msg) + + def parse_argv(self): + try: + opts, args = getopt.getopt(sys.argv[1:], + "m:r:b:p:i:h", + [ + "help", + "conn-mode=", + "remote-addr=", + "bind-addr=", + "base-port=", + "capture-file=", + "msg-skip=", + "msg-count=", + "timeslot=", + "frame-num-lt=", + "frame-num-gt=", + ]) + except getopt.GetoptError as err: + self.print_help("[!] " + str(err)) + sys.exit(2) + + for o, v in opts: + if o in ("-h", "--help"): + self.print_help() + sys.exit(2) + + # Capture file + elif o in ("-i", "--capture-file"): + self.capture_file = v + + # TRX interface specific + elif o in ("-m", "--conn-mode"): + self.conn_mode = v + elif o in ("-r", "--remote-addr"): + self.remote_addr = v + elif o in ("-b", "--bind-addr"): + self.bind_addr = v + elif o in ("-p", "--base-port"): + self.base_port = int(v) + + # Count limitations + elif o == "--msg-skip": + self.msg_skip = int(v) + elif o == "--msg-count": + self.msg_count = int(v) + + # Timeslot pass filter + elif o == "--timeslot": + self.pf_tn = int(v) + if self.pf_tn < 0 or self.pf_tn > 7: + self.print_help("[!] Wrong timeslot value") + sys.exit(2) + + # Frame number pass filter + elif o == "--frame-num-lt": + self.pf_fn_lt = int(v) + elif o == "--frame-num-gt": + self.pf_fn_gt = int(v) + + if self.capture_file is None: + self.print_help("[!] Please specify a capture file") + sys.exit(2) + + def sig_handler(self, signum, frame): + print("Signal %d received" % signum) + if signum is signal.SIGINT: + sys.exit(0) + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/clck_gen.py b/src/target/trx_toolkit/clck_gen.py new file mode 100755 index 00000000..b488770e --- /dev/null +++ b/src/target/trx_toolkit/clck_gen.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Simple TDMA frame clock generator +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import time +import sys + +from threading import Timer +from udp_link import UDPLink +from gsm_shared import * + +class CLCKGen: + # GSM TDMA definitions + SEC_DELAY_US = 1000 * 1000 + GSM_FRAME_US = 4615.0 + + # Average loop back delay + LO_DELAY_US = 90.0 + + # State variables + timer = None + + def __init__(self, clck_links, clck_start = 0, ind_period = 102): + self.clck_links = clck_links + self.ind_period = ind_period + self.clck_start = clck_start + self.clck_src = clck_start + + # Calculate counter time + self.ctr_interval = self.GSM_FRAME_US - self.LO_DELAY_US + self.ctr_interval /= self.SEC_DELAY_US + self.ctr_interval *= self.ind_period + + def start(self): + # Send the first indication + self.send_clck_ind() + + def stop(self): + # Stop pending timer + if self.timer is not None: + self.timer.cancel() + self.timer = None + + # Reset the clock source + self.clck_src = self.clck_start + + def send_clck_ind(self): + # Keep clock cycle + if self.clck_src % GSM_HYPERFRAME >= 0: + self.clck_src %= GSM_HYPERFRAME + + # We don't need to send so often + if self.clck_src % self.ind_period == 0: + # Create UDP payload + payload = "IND CLOCK %u\0" % self.clck_src + + # Send indication to all UDP links + for link in self.clck_links: + link.send(payload) + + # Debug print + print("[T] %s" % payload) + + # Increase frame count + self.clck_src += self.ind_period + + # Schedule a new indication + self.timer = Timer(self.ctr_interval, self.send_clck_ind) + self.timer.start() + +# Just a wrapper for independent usage +class Application: + def __init__(self): + # Print copyright + print_copyright(CR_HOLDERS) + + # Set up signal handlers + signal.signal(signal.SIGINT, self.sig_handler) + + def run(self): + self.link = UDPLink("127.0.0.1", 5800, "0.0.0.0", 5700) + self.clck = CLCKGen([self.link], ind_period = 51) + self.clck.start() + + def sig_handler(self, signum, frame): + print("Signal %d received" % signum) + if signum is signal.SIGINT: + self.clck.stop() + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/copyright.py b/src/target/trx_toolkit/copyright.py new file mode 100644 index 00000000..3d3597fd --- /dev/null +++ b/src/target/trx_toolkit/copyright.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +def print_copyright(holders = []): + # Print copyright holders if any + for date, author in holders: + print("Copyright (C) %s by %s" % (date, author)) + + # Print the license header itself + print("License GPLv2+: GNU GPL version 2 or later " \ + "<http://gnu.org/licenses/gpl.html>\n" \ + "This is free software: you are free to change and redistribute it.\n" \ + "There is NO WARRANTY, to the extent permitted by law.\n") diff --git a/src/target/trx_toolkit/ctrl_cmd.py b/src/target/trx_toolkit/ctrl_cmd.py new file mode 100755 index 00000000..e56105a8 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_cmd.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Auxiliary tool to send custom commands via TRX CTRL interface, +# which may be useful for testing and fuzzing +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import getopt +import select +import sys + +from udp_link import UDPLink + +class Application: + # Application variables + remote_addr = "127.0.0.1" + bind_addr = "0.0.0.0" + base_port = 5700 + bind_port = 0 + fuzzing = False + + def __init__(self): + print_copyright(CR_HOLDERS) + self.parse_argv() + + # Set up signal handlers + signal.signal(signal.SIGINT, self.sig_handler) + + # Init UDP connection + self.ctrl_link = UDPLink(self.remote_addr, self.base_port + 1, + self.bind_addr, self.bind_port) + + # Debug print + print("[i] Init CTRL interface (%s)" \ + % self.ctrl_link.desc_link()) + + def print_help(self, msg = None): + s = " Usage: " + sys.argv[0] + " [options]\n\n" \ + " Some help...\n" \ + " -h --help this text\n\n" + + s += " TRX interface specific\n" \ + " -r --remote-addr Set remote address (default %s)\n" \ + " -p --base-port Set base port number (default %d)\n" \ + " -P --bind-port Set local port number (default: random)\n" \ + " -b --bind-addr Set local address (default %s)\n" \ + " -f --fuzzing Send raw payloads (without CMD)\n" \ + + print(s % (self.remote_addr, self.base_port, self.bind_addr)) + + if msg is not None: + print(msg) + + def parse_argv(self): + try: + opts, args = getopt.getopt(sys.argv[1:], + "r:p:P:b:fh", + [ + "help", + "fuzzing", + "base-port=", + "bind-port=", + "bind-addr=", + "remote-addr=", + ]) + except getopt.GetoptError as err: + self.print_help("[!] " + str(err)) + sys.exit(2) + + for o, v in opts: + if o in ("-h", "--help"): + self.print_help() + sys.exit(2) + + elif o in ("-r", "--remote-addr"): + self.remote_addr = v + elif o in ("-b", "--bind-addr"): + self.bind_addr = v + elif o in ("-p", "--base-port"): + self.base_port = int(v) + elif o in ("-P", "--bind-port"): + self.bind_port = int(v) + elif o in ("-f", "--fuzzing"): + self.fuzzing = True + + def run(self): + while True: + self.print_prompt() + + # Wait until we get any data on any socket + socks = [sys.stdin, self.ctrl_link.sock] + r_event, w_event, x_event = select.select(socks, [], []) + + # Check for incoming CTRL commands + if sys.stdin in r_event: + cmd = sys.stdin.readline() + self.handle_cmd(cmd) + + if self.ctrl_link.sock in r_event: + data, addr = self.ctrl_link.sock.recvfrom(128) + sys.stdout.write("\r%s\n" % data.decode()) + sys.stdout.flush() + + def handle_cmd(self, cmd): + # Strip spaces, tabs, etc. + cmd = cmd.strip().strip("\0") + + # Send a command + if self.fuzzing: + self.ctrl_link.send("%s" % cmd) + else: + self.ctrl_link.send("CMD %s\0" % cmd) + + def print_prompt(self): + sys.stdout.write("CTRL# ") + sys.stdout.flush() + + def sig_handler(self, signum, frame): + print("\n\nSignal %d received" % signum) + if signum is signal.SIGINT: + sys.exit(0) + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/ctrl_if.py b/src/target/trx_toolkit/ctrl_if.py new file mode 100644 index 00000000..1e569a60 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_if.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# CTRL interface implementation +# +# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from udp_link import UDPLink + +class CTRLInterface(UDPLink): + def handle_rx(self, data, remote): + if not self.verify_req(data): + print("[!] Wrong data on CTRL interface") + return + + # Attempt to parse a command + request = self.prepare_req(data) + rc = self.parse_cmd(request) + + if type(rc) is tuple: + self.send_response(request, remote, rc[0], rc[1]) + else: + self.send_response(request, remote, rc) + + def verify_req(self, data): + # Verify command signature + return data.startswith("CMD") + + def prepare_req(self, data): + # Strip signature, paddings and \0 + request = data[4:].strip().strip("\0") + # Split into a command and arguments + request = request.split(" ") + # Now we have something like ["TXTUNE", "941600"] + return request + + def verify_cmd(self, request, cmd, argc): + # Check if requested command matches + if request[0] != cmd: + return False + + # And has enough arguments + if len(request) - 1 != argc: + return False + + return True + + def send_response(self, request, remote, response_code, params = None): + # Include status code, for example ["TXTUNE", "0", "941600"] + request.insert(1, str(response_code)) + + # Optionally append command specific parameters + if params is not None: + request += params + + # Add the response signature, and join back to string + response = "RSP " + " ".join(request) + "\0" + # Now we have something like "RSP TXTUNE 0 941600" + self.sendto(response, remote) + + def parse_cmd(self, request): + raise NotImplementedError diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py new file mode 100644 index 00000000..3de14ef7 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_if_bb.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# CTRL interface implementation (OsmocomBB specific) +# +# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from ctrl_if import CTRLInterface + +class CTRLInterfaceBB(CTRLInterface): + # Internal state variables + trx_started = False + burst_fwd = None + rx_freq = None + tx_freq = None + pm = None + + def __init__(self, remote_addr, remote_port, bind_addr, bind_port): + CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port) + print("[i] Init CTRL interface for BB (%s)" % self.desc_link()) + + def parse_cmd(self, request): + # Power control + if self.verify_cmd(request, "POWERON", 0): + print("[i] Recv POWERON CMD") + + # Ensure transceiver isn't working + if self.trx_started: + print("[!] Transceiver already started") + return -1 + + # Ensure RX / TX freq. are set + if (self.rx_freq is None) or (self.tx_freq is None): + print("[!] RX / TX freq. are not set") + return -1 + + print("[i] Starting transceiver...") + self.trx_started = True + return 0 + + elif self.verify_cmd(request, "POWEROFF", 0): + print("[i] Recv POWEROFF cmd") + + print("[i] Stopping transceiver...") + self.trx_started = False + return 0 + + # Tuning Control + elif self.verify_cmd(request, "RXTUNE", 1): + print("[i] Recv RXTUNE cmd") + + # TODO: check freq range + self.rx_freq = int(request[1]) * 1000 + self.burst_fwd.bb_freq = self.rx_freq + return 0 + + elif self.verify_cmd(request, "TXTUNE", 1): + print("[i] Recv TXTUNE cmd") + + # TODO: check freq range + self.tx_freq = int(request[1]) * 1000 + return 0 + + # Power measurement + elif self.verify_cmd(request, "MEASURE", 1): + print("[i] Recv MEASURE cmd") + + if self.pm is None: + return -1 + + # TODO: check freq range + meas_freq = int(request[1]) * 1000 + meas_dbm = str(self.pm.measure(meas_freq)) + + return (0, [meas_dbm]) + + elif self.verify_cmd(request, "SETSLOT", 2): + print("[i] Recv SETSLOT cmd") + + if self.burst_fwd is None: + return -1 + + # Obtain TS index + ts = int(request[1]) + if ts not in range(0, 8): + print("[!] TS index should be in range: 0..7") + return -1 + + # Parse TS type + ts_type = int(request[2]) + + # TS activation / deactivation + # We don't care about ts_type + if ts_type == 0: + self.burst_fwd.ts_pass = None + else: + self.burst_fwd.ts_pass = ts + + return 0 + + # Timing Advance + elif self.verify_cmd(request, "SETTA", 1): + print("[i] Recv SETTA cmd") + + # Parse and check TA value + ta = int(request[1]) + if ta < 0 or ta > 63: + print("[!] TA value should be in range: 0..63") + return -1 + + # Save to the BurstForwarder instance + self.burst_fwd.ta = ta + return 0 + + # Timing of Arrival simulation for Uplink + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + elif self.verify_cmd(request, "FAKE_TOA", 2): + print("[i] Recv FAKE_TOA cmd") + + # Parse and apply both base and threshold + self.burst_fwd.toa256_ul_base = int(request[1]) + self.burst_fwd.toa256_ul_threshold = int(request[2]) + + return 0 + + # Timing of Arrival simulation for Uplink + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.verify_cmd(request, "FAKE_TOA", 1): + print("[i] Recv FAKE_TOA cmd") + + # Parse and apply delta + self.burst_fwd.toa256_ul_base += int(request[1]) + + return 0 + + # Wrong / unknown command + else: + # We don't care about other commands, + # so let's merely ignore them ;) + print("[i] Ignore CMD %s" % request[0]) + return 0 diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py new file mode 100644 index 00000000..14886178 --- /dev/null +++ b/src/target/trx_toolkit/ctrl_if_bts.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# CTRL interface implementation (OsmoBTS specific) +# +# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from ctrl_if import CTRLInterface + +class CTRLInterfaceBTS(CTRLInterface): + # Internal state variables + trx_started = False + burst_fwd = None + clck_gen = None + rx_freq = None + tx_freq = None + pm = None + + def __init__(self, remote_addr, remote_port, bind_addr, bind_port): + CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port) + print("[i] Init CTRL interface for BTS (%s)" % self.desc_link()) + + def parse_cmd(self, request): + # Power control + if self.verify_cmd(request, "POWERON", 0): + print("[i] Recv POWERON CMD") + + # Ensure transceiver isn't working + if self.trx_started: + print("[!] Transceiver already started") + return -1 + + # Ensure RX / TX freq. are set + if (self.rx_freq is None) or (self.tx_freq is None): + print("[!] RX / TX freq. are not set") + return -1 + + print("[i] Starting transceiver...") + self.trx_started = True + + # Power emulation + if self.pm is not None: + self.pm.add_bts_list([self.tx_freq]) + + # Start clock indications + if self.clck_gen is not None: + self.clck_gen.start() + + return 0 + + elif self.verify_cmd(request, "POWEROFF", 0): + print("[i] Recv POWEROFF cmd") + + print("[i] Stopping transceiver...") + self.trx_started = False + + # Power emulation + if self.pm is not None: + self.pm.del_bts_list([self.tx_freq]) + + # Stop clock indications + if self.clck_gen is not None: + self.clck_gen.stop() + + return 0 + + # Tuning Control + elif self.verify_cmd(request, "RXTUNE", 1): + print("[i] Recv RXTUNE cmd") + + # TODO: check freq range + self.rx_freq = int(request[1]) * 1000 + return 0 + + elif self.verify_cmd(request, "TXTUNE", 1): + print("[i] Recv TXTUNE cmd") + + # TODO: check freq range + self.tx_freq = int(request[1]) * 1000 + self.burst_fwd.bts_freq = self.tx_freq + return 0 + + # Timing of Arrival simulation for Downlink + # Absolute form: CMD FAKE_TOA <BASE> <THRESH> + elif self.verify_cmd(request, "FAKE_TOA", 2): + print("[i] Recv FAKE_TOA cmd") + + # Parse and apply both base and threshold + self.burst_fwd.toa256_dl_base = int(request[1]) + self.burst_fwd.toa256_dl_threshold = int(request[2]) + + return 0 + + # Timing of Arrival simulation for Downlink + # Relative form: CMD FAKE_TOA <+-BASE_DELTA> + elif self.verify_cmd(request, "FAKE_TOA", 1): + print("[i] Recv FAKE_TOA cmd") + + # Parse and apply delta + self.burst_fwd.toa256_dl_base += int(request[1]) + + return 0 + + # Wrong / unknown command + else: + # We don't care about other commands, + # so let's merely ignore them ;) + print("[i] Ignore CMD %s" % request[0]) + return 0 diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py new file mode 100644 index 00000000..1d7805e3 --- /dev/null +++ b/src/target/trx_toolkit/data_dump.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Helpers for DATA capture management +# +# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import struct + +from data_msg import * + +class DATADump: + # Constants + TAG_L12TRX = b'\x01' + TAG_TRX2L1 = b'\x02' + HDR_LENGTH = 3 + + # Generates raw bytes from a DATA message + # Return value: raw message bytes + def dump_msg(self, msg): + # Determine a message type + if isinstance(msg, DATAMSG_L12TRX): + tag = self.TAG_L12TRX + elif isinstance(msg, DATAMSG_TRX2L1): + tag = self.TAG_TRX2L1 + else: + raise ValueError("Unknown message type") + + # Generate a message payload + msg_raw = msg.gen_msg() + + # Calculate and pack the message length + msg_len = len(msg_raw) + + # Pack to unsigned short (2 bytes, BE) + msg_len = struct.pack(">H", msg_len) + + # Concatenate a message with header + return bytearray(tag + msg_len) + msg_raw + + def parse_hdr(self, hdr): + # Extract the header info + msg_len = struct.unpack(">H", hdr[1:3])[0] + tag = hdr[:1] + + # Check if tag is known + if tag == self.TAG_L12TRX: + # L1 -> TRX + msg = DATAMSG_L12TRX() + elif tag == self.TAG_TRX2L1: + # TRX -> L1 + msg = DATAMSG_TRX2L1() + else: + # Unknown tag + return False + + return (msg, msg_len) + +class DATADumpFile(DATADump): + def __init__(self, capture): + # Check if capture file is already opened + if isinstance(capture, str): + print("[i] Opening capture file '%s'..." % capture) + self.f = open(capture, "a+b") + else: + self.f = capture + + def __del__(self): + print("[i] Closing the capture file") + self.f.close() + + # Moves the file descriptor before a specified message + # Return value: + # True in case of success, + # or False in case of EOF or header parsing error. + def _seek2msg(self, idx): + # Seek to the begining of the capture + self.f.seek(0) + + # Read the capture in loop... + for i in range(idx): + # Attempt to read a message header + hdr_raw = self.f.read(self.HDR_LENGTH) + if len(hdr_raw) != self.HDR_LENGTH: + return False + + # Attempt to parse it + rc = self.parse_hdr(hdr_raw) + if rc is False: + print("[!] Couldn't parse a message header") + return False + + # Expand the header + (_, msg_len) = rc + + # Skip a message + self.f.seek(msg_len, 1) + + return True + + # Parses a single message at the current descriptor position + # Return value: + # a parsed message in case of success, + # or None in case of EOF or header parsing error, + # or False in case of message parsing error. + def _parse_msg(self): + # Attempt to read a message header + hdr_raw = self.f.read(self.HDR_LENGTH) + if len(hdr_raw) != self.HDR_LENGTH: + return None + + # Attempt to parse it + rc = self.parse_hdr(hdr_raw) + if rc is False: + print("[!] Couldn't parse a message header") + return None + + # Expand the header + (msg, msg_len) = rc + + # Attempt to read a message + msg_raw = self.f.read(msg_len) + if len(msg_raw) != msg_len: + print("[!] Message length mismatch") + return None + + # Attempt to parse a message + try: + msg_raw = bytearray(msg_raw) + msg.parse_msg(msg_raw) + except: + print("[!] Couldn't parse a message, skipping...") + return False + + # Success + return msg + + # Parses a particular message defined by index idx + # Return value: + # a parsed message in case of success, + # or None in case of EOF or header parsing error, + # or False in case of message parsing error or out of range. + def parse_msg(self, idx): + # Move descriptor to the begining of requested message + rc = self._seek2msg(idx) + if not rc: + print("[!] Couldn't find requested message") + return False + + # Attempt to parse a message + return self._parse_msg() + + # Parses all messages from a given file + # Return value: + # list of parsed messages, + # or False in case of range error. + def parse_all(self, skip = None, count = None): + result = [] + + # Should we skip some messages? + if skip is None: + # Seek to the begining of the capture + self.f.seek(0) + else: + rc = self._seek2msg(skip) + if not rc: + print("[!] Couldn't find requested message") + return False + + # Read the capture in loop... + while True: + # Attempt to parse a message + msg = self._parse_msg() + + # EOF or broken header + if msg is None: + break + + # Skip unparsed messages + if msg is False: + continue + + # Success, append a message + result.append(msg) + + # Count limitation + if count is not None: + if len(result) == count: + break + + return result + + # Writes a new message at the end of the capture + def append_msg(self, msg): + # Generate raw bytes and write + msg_raw = self.dump_msg(msg) + self.f.write(msg_raw) + + # Writes a list of messages at the end of the capture + def append_all(self, msgs): + for msg in msgs: + self.append_msg(msg) + +# Regression tests +if __name__ == '__main__': + from tempfile import TemporaryFile + from gsm_shared import * + import random + + # Create a temporary file + tf = TemporaryFile() + + # Create an instance of DATA dump manager + ddf = DATADumpFile(tf) + + # Generate two random bursts + burst_l12trx = [] + burst_trx2l1 = [] + + for i in range(0, GSM_BURST_LEN): + ubit = random.randint(0, 1) + burst_l12trx.append(ubit) + + sbit = random.randint(-127, 127) + burst_trx2l1.append(sbit) + + # Generate a basic list of random messages + print("[i] Generating the reference messages") + messages_ref = [] + + for i in range(100): + # Create a message + if i % 2: + msg = DATAMSG_L12TRX() + msg.burst = burst_l12trx + else: + msg = DATAMSG_TRX2L1() + msg.burst = burst_trx2l1 + + # Randomize the header + msg.rand_hdr() + + # Append + messages_ref.append(msg) + + print("[i] Adding the following messages to the capture:") + for msg in messages_ref[:3]: + print(" %s: burst_len=%d" + % (msg.desc_hdr(), len(msg.burst))) + + # Check single message appending + ddf.append_msg(messages_ref[0]) + ddf.append_msg(messages_ref[1]) + ddf.append_msg(messages_ref[2]) + + # Read the written messages back + messages_check = ddf.parse_all() + + print("[i] Read the following messages back:") + for msg in messages_check: + print(" %s: burst_len=%d" + % (msg.desc_hdr(), len(msg.burst))) + + # Expecting three messages + assert(len(messages_check) == 3) + + # Check the messages + for i in range(3): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i].burst) + assert(messages_check[i].fn == messages_ref[i].fn) + assert(messages_check[i].tn == messages_ref[i].tn) + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check append_msg(): OK") + + + # Append the pending reference messages + ddf.append_all(messages_ref[3:]) + + # Read the written messages back + messages_check = ddf.parse_all() + + # Check the final amount + assert(len(messages_check) == len(messages_ref)) + + # Check the messages + for i in range(len(messages_check)): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i].burst) + assert(messages_check[i].fn == messages_ref[i].fn) + assert(messages_check[i].tn == messages_ref[i].tn) + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check append_all(): OK") + + + # Check parse_msg() + msg0 = ddf.parse_msg(0) + msg10 = ddf.parse_msg(10) + + # Make sure parsing was successful + assert(msg0 and msg10) + + # Compare common header parts and bursts + assert(msg0.burst == messages_ref[0].burst) + assert(msg0.fn == messages_ref[0].fn) + assert(msg0.tn == messages_ref[0].tn) + + assert(msg10.burst == messages_ref[10].burst) + assert(msg10.fn == messages_ref[10].fn) + assert(msg10.tn == messages_ref[10].tn) + + # Validate both messages + assert(msg0.validate()) + assert(msg10.validate()) + + print("[?] Check parse_msg(): OK") + + + # Check parse_all() with range + messages_check = ddf.parse_all(skip = 10, count = 20) + + # Make sure parsing was successful + assert(messages_check) + + # Check the amount + assert(len(messages_check) == 20) + + for i in range(20): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i + 10].burst) + assert(messages_check[i].fn == messages_ref[i + 10].fn) + assert(messages_check[i].tn == messages_ref[i + 10].tn) + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check parse_all(): OK") diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py new file mode 100644 index 00000000..f4431a46 --- /dev/null +++ b/src/target/trx_toolkit/data_if.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# DATA interface implementation +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from udp_link import UDPLink +from data_msg import * + +class DATAInterface(UDPLink): + + def send_msg(self, msg): + # Validate a message + if not msg.validate(): + raise ValueError("Message incomplete or incorrect") + + # Generate TRX message + payload = msg.gen_msg() + + # Send message + self.send(payload) diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py new file mode 100644 index 00000000..ea415ab9 --- /dev/null +++ b/src/target/trx_toolkit/data_msg.py @@ -0,0 +1,545 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# DATA interface message definitions and helpers +# +# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import random +import struct + +from gsm_shared import * + +class DATAMSG: + # Common message fields + burst = None + fn = None + tn = None + + # Common constructor + def __init__(self, fn = None, tn = None, burst = None): + self.burst = burst + self.fn = fn + self.tn = tn + + # Generates message specific header + def gen_hdr(self): + raise NotImplementedError + + # Parses message specific header + def parse_hdr(self, hdr): + raise NotImplementedError + + # Generates message specific burst + def gen_burst(self): + raise NotImplementedError + + # Parses message specific burst + def parse_burst(self, burst): + raise NotImplementedError + + # Generates a random frame number + def rand_fn(self): + return random.randint(0, GSM_HYPERFRAME) + + # Generates a random timeslot number + def rand_tn(self): + return random.randint(0, 7) + + # Randomizes the message header + def rand_hdr(self): + self.fn = self.rand_fn() + self.tn = self.rand_tn() + + # Generates human-readable header description + def desc_hdr(self): + result = "" + + if self.fn is not None: + result += ("fn=%u " % self.fn) + + if self.tn is not None: + result += ("tn=%u " % self.tn) + + return result + + # Converts unsigned soft-bits {254..0} to soft-bits {-127..127} + def usbit2sbit(self, bits): + buf = [] + + for bit in bits: + if bit == 0xff: + buf.append(-127) + else: + buf.append(127 - bit) + + return buf + + # Converts soft-bits {-127..127} to unsigned soft-bits {254..0} + def sbit2usbit(self, bits): + buf = [] + + for bit in bits: + buf.append(127 - bit) + + return buf + + # Converts soft-bits {-127..127} to bits {1..0} + def sbit2ubit(self, bits): + buf = [] + + for bit in bits: + buf.append(1 if bit < 0 else 0) + + return buf + + # Converts bits {1..0} to soft-bits {-127..127} + def ubit2sbit(self, bits): + buf = [] + + for bit in bits: + buf.append(-127 if bit else 127) + + return buf + + # Validates the message fields + def validate(self): + if self.burst is None: + return False + + if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): + return False + + if self.fn is None: + return False + + if self.fn < 0 or self.fn > GSM_HYPERFRAME: + return False + + if self.tn is None: + return False + + if self.tn < 0 or self.tn > 7: + return False + + return True + + # Generates frame number to bytes + def gen_fn(self, fn): + # Allocate an empty byte-array + buf = bytearray() + + # Big endian, 4 bytes + buf.append((fn >> 24) & 0xff) + buf.append((fn >> 16) & 0xff) + buf.append((fn >> 8) & 0xff) + buf.append((fn >> 0) & 0xff) + + return buf + + # Parses frame number from bytes + def parse_fn(self, buf): + # Big endian, 4 bytes + return (buf[0] << 24) \ + | (buf[1] << 16) \ + | (buf[2] << 8) \ + | (buf[3] << 0) + + # Generates a TRX DATA message + def gen_msg(self): + # Validate all the fields + if not self.validate(): + raise ValueError("Message incomplete or incorrect") + + # Allocate an empty byte-array + buf = bytearray() + + # Put timeslot index + buf.append(self.tn) + + # Put frame number + fn = self.gen_fn(self.fn) + buf += fn + + # Generate message specific header part + hdr = self.gen_hdr() + buf += hdr + + # Generate burst + buf += self.gen_burst() + + return buf + + # Parses a TRX DATA message + def parse_msg(self, msg): + # Calculate message length + length = len(msg) + + # Check length + if length < (self.HDR_LEN + GSM_BURST_LEN): + raise ValueError("Message is to short") + + # Parse both fn and tn + self.fn = self.parse_fn(msg[1:]) + self.tn = msg[0] + + # Specific message part + self.parse_hdr(msg) + + # Copy burst, skipping header + msg_burst = msg[self.HDR_LEN:] + self.parse_burst(msg_burst) + +class DATAMSG_L12TRX(DATAMSG): + # Constants + HDR_LEN = 6 + PWR_MIN = 0x00 + PWR_MAX = 0xff + + # Specific message fields + pwr = None + + # Validates the message fields + def validate(self): + # Validate common fields + if not DATAMSG.validate(self): + return False + + if self.pwr is None: + return False + + if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: + return False + + return True + + # Generates a random power level + def rand_pwr(self, min = None, max = None): + if min is None: + min = self.PWR_MIN + + if max is None: + max = self.PWR_MAX + + return random.randint(min, max) + + # Randomizes message specific header + def rand_hdr(self): + DATAMSG.rand_hdr(self) + self.pwr = self.rand_pwr() + + # Generates human-readable header description + def desc_hdr(self): + # Describe the common part + result = DATAMSG.desc_hdr(self) + + if self.pwr is not None: + result += ("pwr=%u " % self.pwr) + + # Strip useless whitespace and return + return result.strip() + + # Generates message specific header part + def gen_hdr(self): + # Allocate an empty byte-array + buf = bytearray() + + # Put power + buf.append(self.pwr) + + return buf + + # Parses message specific header part + def parse_hdr(self, hdr): + # Parse power level + self.pwr = hdr[5] + + # Generates message specific burst + def gen_burst(self): + # Copy burst 'as is' + return bytearray(self.burst) + + # Parses message specific burst + def parse_burst(self, burst): + length = len(burst) + + # Distinguish between GSM and EDGE + if length >= EDGE_BURST_LEN: + self.burst = list(burst[:EDGE_BURST_LEN]) + else: + self.burst = list(burst[:GSM_BURST_LEN]) + + # Transforms this message to TRX2L1 message + def gen_trx2l1(self): + # Allocate a new message + msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn) + + # Convert burst bits + if self.burst is not None: + msg.burst = self.ubit2sbit(self.burst) + + return msg + +class DATAMSG_TRX2L1(DATAMSG): + # Constants + HDR_LEN = 8 + RSSI_MIN = -120 + RSSI_MAX = -50 + + # TODO: verify this range + TOA256_MIN = -256 * 200 + TOA256_MAX = 256 * 200 + + # Specific message fields + rssi = None + toa256 = None + + # Validates the message fields + def validate(self): + # Validate common fields + if not DATAMSG.validate(self): + return False + + if self.rssi is None: + return False + + if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX: + return False + + if self.toa256 is None: + return False + + if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: + return False + + return True + + # Generates a random RSSI value + def rand_rssi(self, min = None, max = None): + if min is None: + min = self.RSSI_MIN + + if max is None: + max = self.RSSI_MAX + + return random.randint(min, max) + + # Generates a ToA (Time of Arrival) value + def rand_toa256(self, min = None, max = None): + if min is None: + min = self.TOA256_MIN + + if max is None: + max = self.TOA256_MAX + + return random.randint(min, max) + + # Randomizes message specific header + def rand_hdr(self): + DATAMSG.rand_hdr(self) + self.rssi = self.rand_rssi() + self.toa256 = self.rand_toa256() + + # Generates human-readable header description + def desc_hdr(self): + # Describe the common part + result = DATAMSG.desc_hdr(self) + + if self.rssi is not None: + result += ("rssi=%d " % self.rssi) + + if self.toa256 is not None: + result += ("toa256=%d " % self.toa256) + + # Strip useless whitespace and return + return result.strip() + + # Generates message specific header part + def gen_hdr(self): + # Allocate an empty byte-array + buf = bytearray() + + # Put RSSI + buf.append(-self.rssi) + + # Encode ToA (Time of Arrival) + # Big endian, 2 bytes (int32_t) + buf.append((self.toa256 >> 8) & 0xff) + buf.append(self.toa256 & 0xff) + + return buf + + # Parses message specific header part + def parse_hdr(self, hdr): + # Parse RSSI + self.rssi = -(hdr[5]) + + # Parse ToA (Time of Arrival) + self.toa256 = struct.unpack(">h", hdr[6:8])[0] + + # Generates message specific burst + def gen_burst(self): + # Convert soft-bits to unsigned soft-bits + burst_usbits = self.sbit2usbit(self.burst) + + # Encode to bytes + return bytearray(burst_usbits) + + # Parses message specific burst + def parse_burst(self, burst): + length = len(burst) + + # Distinguish between GSM and EDGE + if length >= EDGE_BURST_LEN: + burst_usbits = list(burst[:EDGE_BURST_LEN]) + else: + burst_usbits = list(burst[:GSM_BURST_LEN]) + + # Convert unsigned soft-bits to soft-bits + burst_sbits = self.usbit2sbit(burst_usbits) + + # Save + self.burst = burst_sbits + + # Transforms this message to L12TRX message + def gen_l12trx(self): + # Allocate a new message + msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn) + + # Convert burst bits + if self.burst is not None: + msg.burst = self.sbit2ubit(self.burst) + + return msg + +# Regression test +if __name__ == '__main__': + # Common reference data + fn = 1024 + tn = 0 + + # Generate two random bursts + burst_l12trx_ref = [] + burst_trx2l1_ref = [] + + for i in range(0, GSM_BURST_LEN): + ubit = random.randint(0, 1) + burst_l12trx_ref.append(ubit) + + sbit = random.randint(-127, 127) + burst_trx2l1_ref.append(sbit) + + print("[i] Generating the reference messages") + + # Create messages of both types + msg_l12trx_ref = DATAMSG_L12TRX(fn = fn, tn = tn) + msg_trx2l1_ref = DATAMSG_TRX2L1(fn = fn, tn = tn) + + # Fill in message specific fields + msg_trx2l1_ref.rssi = -88 + msg_l12trx_ref.pwr = 0x33 + msg_trx2l1_ref.toa256 = -256 + + # Specify the reference bursts + msg_l12trx_ref.burst = burst_l12trx_ref + msg_trx2l1_ref.burst = burst_trx2l1_ref + + print("[i] Encoding the reference messages") + + # Encode DATA messages + l12trx_raw = msg_l12trx_ref.gen_msg() + trx2l1_raw = msg_trx2l1_ref.gen_msg() + + print("[i] Parsing generated messages back") + + # Parse generated DATA messages + msg_l12trx_dec = DATAMSG_L12TRX() + msg_trx2l1_dec = DATAMSG_TRX2L1() + msg_l12trx_dec.parse_msg(l12trx_raw) + msg_trx2l1_dec.parse_msg(trx2l1_raw) + + print("[i] Comparing decoded messages with the reference") + + # Compare bursts + assert(msg_l12trx_dec.burst == burst_l12trx_ref) + assert(msg_trx2l1_dec.burst == burst_trx2l1_ref) + + print("[?] Compare bursts: OK") + + # Compare both parsed messages with the reference data + assert(msg_l12trx_dec.fn == fn) + assert(msg_trx2l1_dec.fn == fn) + assert(msg_l12trx_dec.tn == tn) + assert(msg_trx2l1_dec.tn == tn) + + print("[?] Compare FN / TN: OK") + + # Compare message specific parts + assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi) + assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr) + assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256) + + print("[?] Compare message specific data: OK") + + # Validate header randomization + for i in range(0, 100): + msg_l12trx_ref.rand_hdr() + msg_trx2l1_ref.rand_hdr() + + assert(msg_l12trx_ref.validate()) + assert(msg_trx2l1_ref.validate()) + + print("[?] Validate header randomization: OK") + + # Bit conversation test + usbits_ref = list(range(0, 256)) + sbits_ref = list(range(-127, 128)) + + # Test both usbit2sbit() and sbit2usbit() + sbits = msg_trx2l1_ref.usbit2sbit(usbits_ref) + usbits = msg_trx2l1_ref.sbit2usbit(sbits) + assert(usbits[:255] == usbits_ref[:255]) + assert(usbits[255] == 254) + + print("[?] Check both usbit2sbit() and sbit2usbit(): OK") + + # Test both sbit2ubit() and ubit2sbit() + ubits = msg_trx2l1_ref.sbit2ubit(sbits_ref) + assert(ubits == ([1] * 127 + [0] * 128)) + + sbits = msg_trx2l1_ref.ubit2sbit(ubits) + assert(sbits == ([-127] * 127 + [127] * 128)) + + print("[?] Check both sbit2ubit() and ubit2sbit(): OK") + + # Test message transformation + msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx() + msg_trx2l1_dec = msg_l12trx_ref.gen_trx2l1() + + assert(msg_l12trx_dec.fn == msg_trx2l1_ref.fn) + assert(msg_l12trx_dec.tn == msg_trx2l1_ref.tn) + + assert(msg_trx2l1_dec.fn == msg_l12trx_ref.fn) + assert(msg_trx2l1_dec.tn == msg_l12trx_ref.tn) + + assert(msg_l12trx_dec.burst == msg_l12trx_dec.sbit2ubit(burst_trx2l1_ref)) + assert(msg_trx2l1_dec.burst == msg_trx2l1_dec.ubit2sbit(burst_l12trx_ref)) + + print("[?] Check L12TRX <-> TRX2L1 type transformations: OK") diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py new file mode 100644 index 00000000..840b4e40 --- /dev/null +++ b/src/target/trx_toolkit/fake_pm.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Power measurement emulation for BB +# +# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from random import randint + +class FakePM: + # Freq. list for good power level + bts_list = [] + + def __init__(self, noise_min, noise_max, bts_min, bts_max): + # Save power level ranges + self.noise_min = noise_min + self.noise_max = noise_max + self.bts_min = bts_min + self.bts_max = bts_max + + def measure(self, bts): + if bts in self.bts_list: + return randint(self.bts_min, self.bts_max) + else: + return randint(self.noise_min, self.noise_max) + + def update_bts_list(self, new_list): + self.bts_list = new_list + + def add_bts_list(self, add_list): + self.bts_list += add_list + + def del_bts_list(self, del_list): + for item in del_list: + if item in self.bts_list: + self.bts_list.remove(item) diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py new file mode 100755 index 00000000..b818b2a9 --- /dev/null +++ b/src/target/trx_toolkit/fake_trx.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Virtual Um-interface (fake transceiver) +# +# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import getopt +import select +import sys + +from ctrl_if_bts import CTRLInterfaceBTS +from ctrl_if_bb import CTRLInterfaceBB +from burst_fwd import BurstForwarder +from fake_pm import FakePM + +from udp_link import UDPLink +from clck_gen import CLCKGen + +class Application: + # Application variables + bts_addr = "127.0.0.1" + bb_addr = "127.0.0.1" + trx_bind_addr = "0.0.0.0" + bts_base_port = 5700 + bb_base_port = 6700 + + # BurstForwarder field randomization + randomize_dl_toa256 = False + randomize_ul_toa256 = False + randomize_dl_rssi = False + randomize_ul_rssi = False + + def __init__(self): + print_copyright(CR_HOLDERS) + self.parse_argv() + + # Set up signal handlers + signal.signal(signal.SIGINT, self.sig_handler) + + def run(self): + # Init TRX CTRL interface for BTS + self.bts_ctrl = CTRLInterfaceBTS(self.bts_addr, self.bts_base_port + 101, + self.trx_bind_addr, self.bts_base_port + 1) + + # Init TRX CTRL interface for BB + self.bb_ctrl = CTRLInterfaceBB(self.bb_addr, self.bb_base_port + 101, + self.trx_bind_addr, self.bb_base_port + 1) + + # Power measurement emulation + # Noise: -120 .. -105 + # BTS: -75 .. -50 + self.pm = FakePM(-120, -105, -75, -50) + + # Share a FakePM instance between both BTS and BB + self.bts_ctrl.pm = self.pm + self.bb_ctrl.pm = self.pm + + # Init DATA links + self.bts_data = UDPLink(self.bts_addr, self.bts_base_port + 102, + self.trx_bind_addr, self.bts_base_port + 2) + self.bb_data = UDPLink(self.bb_addr, self.bb_base_port + 102, + self.trx_bind_addr, self.bb_base_port + 2) + + # BTS <-> BB burst forwarding + self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data) + self.burst_fwd.randomize_dl_toa256 = self.randomize_dl_toa256 + self.burst_fwd.randomize_ul_toa256 = self.randomize_ul_toa256 + self.burst_fwd.randomize_dl_rssi = self.randomize_dl_rssi + self.burst_fwd.randomize_ul_rssi = self.randomize_ul_rssi + + # Share a BurstForwarder instance between BTS and BB + self.bts_ctrl.burst_fwd = self.burst_fwd + self.bb_ctrl.burst_fwd = self.burst_fwd + + # Provide clock to BTS + self.bts_clck = UDPLink(self.bts_addr, self.bts_base_port + 100, + self.trx_bind_addr, self.bts_base_port) + self.clck_gen = CLCKGen([self.bts_clck]) + self.bts_ctrl.clck_gen = self.clck_gen + + print("[i] Init complete") + + # Enter main loop + while True: + socks = [self.bts_ctrl.sock, self.bb_ctrl.sock, + self.bts_data.sock, self.bb_data.sock] + + # Wait until we get any data on any socket + r_event, w_event, x_event = select.select(socks, [], []) + + # Downlink: BTS -> BB + if self.bts_data.sock in r_event: + self.burst_fwd.bts2bb() + + # Uplink: BB -> BTS + if self.bb_data.sock in r_event: + self.burst_fwd.bb2bts() + + # CTRL commands from BTS + if self.bts_ctrl.sock in r_event: + data, addr = self.bts_ctrl.sock.recvfrom(128) + self.bts_ctrl.handle_rx(data.decode(), addr) + + # CTRL commands from BB + if self.bb_ctrl.sock in r_event: + data, addr = self.bb_ctrl.sock.recvfrom(128) + self.bb_ctrl.handle_rx(data.decode(), addr) + + def shutdown(self): + print("[i] Shutting down...") + + # Stop clock generator + self.clck_gen.stop() + + def print_help(self, msg = None): + s = " Usage: " + sys.argv[0] + " [options]\n\n" \ + " Some help...\n" \ + " -h --help this text\n\n" + + s += " TRX interface specific\n" \ + " -R --bts-addr Set BTS remote address (default %s)\n" \ + " -r --bb-addr Set BB remote address (default %s)\n" \ + " -P --bts-base-port Set BTS base port number (default %d)\n" \ + " -p --bb-base-port Set BB base port number (default %d)\n" \ + " -b --trx-bind-addr Set TRX bind address (default %s)\n\n" + + s += " Simulation\n" \ + " --rand-dl-rssi Enable DL RSSI randomization\n" \ + " --rand-ul-rssi Enable UL RSSI randomization\n" \ + " --rand-dl-toa Enable DL ToA randomization\n" \ + " --rand-ul-toa Enable UL ToA randomization\n" + + print(s % (self.bts_addr, self.bb_addr, + self.bts_base_port, self.bb_base_port, + self.trx_bind_addr)) + + if msg is not None: + print(msg) + + def parse_argv(self): + try: + opts, args = getopt.getopt(sys.argv[1:], + "R:r:P:p:b:h", + [ + "help", + "bts-addr=", "bb-addr=", + "bts-base-port=", "bb-base-port=", + "trx-bind-addr=", + "rand-dl-rssi", "rand-ul-rssi", + "rand-dl-toa", "rand-ul-toa", + ]) + except getopt.GetoptError as err: + self.print_help("[!] " + str(err)) + sys.exit(2) + + for o, v in opts: + if o in ("-h", "--help"): + self.print_help() + sys.exit(2) + + elif o in ("-R", "--bts-addr"): + self.bts_addr = v + elif o in ("-r", "--bb-addr"): + self.bb_addr = v + + elif o in ("-P", "--bts-base-port"): + self.bts_base_port = int(v) + elif o in ("-p", "--bb-base-port"): + self.bb_base_port = int(v) + + elif o in ("-b", "--trx-bind-addr"): + self.trx_bind_addr = v + + # Message field randomization + elif o == "rand-dl-rssi": + self.randomize_dl_rssi = True + elif o == "rand-ul-rssi": + self.randomize_ul_rssi = True + elif o == "rand-dl-toa": + self.randomize_dl_toa256 = True + elif o == "rand-ul-toa": + self.randomize_ul_toa256 = True + + # Ensure there is no overlap between ports + if self.bts_base_port == self.bb_base_port: + self.print_help("[!] BTS and BB base ports should be different") + sys.exit(2) + + bts_ports = [ + self.bts_base_port + 0, self.bts_base_port + 100, + self.bts_base_port + 1, self.bts_base_port + 101, + self.bts_base_port + 2, self.bts_base_port + 102, + ] + + bb_ports = [ + self.bb_base_port + 0, self.bb_base_port + 100, + self.bb_base_port + 1, self.bb_base_port + 101, + self.bb_base_port + 2, self.bb_base_port + 102, + ] + + for p in bb_ports: + if p in bts_ports: + self.print_help("[!] BTS and BB ports overlap detected") + sys.exit(2) + + def sig_handler(self, signum, frame): + print("Signal %d received" % signum) + if signum is signal.SIGINT: + self.shutdown() + sys.exit(0) + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/gsm_shared.py b/src/target/trx_toolkit/gsm_shared.py new file mode 100644 index 00000000..d2f8278b --- /dev/null +++ b/src/target/trx_toolkit/gsm_shared.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Common GSM constants +# +# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +# TDMA definitions +GSM_SUPERFRAME = 26 * 51 +GSM_HYPERFRAME = 2048 * GSM_SUPERFRAME + +# Burst length +GSM_BURST_LEN = 148 +EDGE_BURST_LEN = GSM_BURST_LEN * 3 diff --git a/src/target/trx_toolkit/rand_burst_gen.py b/src/target/trx_toolkit/rand_burst_gen.py new file mode 100644 index 00000000..46c1e090 --- /dev/null +++ b/src/target/trx_toolkit/rand_burst_gen.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Random burst (NB, FB, SB, AB) generator +# +# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import random + +from gsm_shared import * + +class RandBurstGen: + + # GSM 05.02 Chapter 5.2.3 Normal Burst + nb_tsc_list = [ + [ + 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, + 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, + ], + [ + 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, + 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, + ], + [ + 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, + 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, + ], + [ + 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, + 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, + ], + [ + 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, + 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, + ], + [ + 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0, + 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, + ], + [ + 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, + 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, + ], + [ + 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, + 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, + ], + ] + + # GSM 05.02 Chapter 5.2.5 SCH training sequence + sb_tsc = [ + 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 0, + 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, + 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1, + 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, + ] + + # GSM 05.02 Chapter 5.2.6 Dummy Burst + db_bits = [ + 0, 0, 0, + 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, + 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, + 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, + 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, + 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, + 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, + 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, + 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, + 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, + 0, 0, 0, + ] + + # GSM 05.02 Chapter 5.2.7 Access burst + ab_tsc = [ + 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, + 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, + 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, + ] + + # Generate a normal burst + def gen_nb(self, seq_idx = 0): + buf = [] + + # Tailing bits + buf += [0] * 3 + + # Random data 1 / 2 + for i in range(0, 57): + buf.append(random.randint(0, 1)) + + # Steal flag 1 / 2 + buf.append(random.randint(0, 1)) + + # Training sequence + buf += self.nb_tsc_list[seq_idx] + + # Steal flag 2 / 2 + buf.append(random.randint(0, 1)) + + # Random data 2 / 2 + for i in range(0, 57): + buf.append(random.randint(0, 1)) + + # Tailing bits + buf += [0] * 3 + + return buf + + # Generate a frequency correction burst + def gen_fb(self): + return [0] * GSM_BURST_LEN + + # Generate a synchronization burst + def gen_sb(self): + buf = [] + + # Tailing bits + buf += [0] * 3 + + # Random data 1 / 2 + for i in range(0, 39): + buf.append(random.randint(0, 1)) + + # Training sequence + buf += self.sb_tsc + + # Random data 2 / 2 + for i in range(0, 39): + buf.append(random.randint(0, 1)) + + # Tailing bits + buf += [0] * 3 + + return buf + + # Generate a dummy burst + def gen_db(self): + return self.db_bits + + # Generate an access burst + def gen_ab(self): + buf = [] + + # Tailing bits + buf += [0] * 8 + + # Training sequence + buf += self.ab_tsc + + # Random data + for i in range(0, 36): + buf.append(random.randint(0, 1)) + + # Tailing bits + buf += [0] * 3 + + # Guard period + buf += [0] * 60 + + return buf diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py new file mode 100755 index 00000000..577e6f97 --- /dev/null +++ b/src/target/trx_toolkit/trx_sniff.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Scapy-based TRX interface sniffer +# +# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +from copyright import print_copyright +CR_HOLDERS = [("2018", "Vadim Yanitskiy <axilirator@gmail.com>")] + +import signal +import getopt +import sys + +import scapy.all + +from data_dump import DATADumpFile +from data_msg import * + +class Application: + # Application variables + sniff_interface = "lo" + sniff_base_port = 5700 + print_bursts = False + output_file = None + + # Counters + cnt_burst_dropped_num = 0 + cnt_burst_break = None + cnt_burst_num = 0 + + cnt_frame_break = None + cnt_frame_last = None + cnt_frame_num = 0 + + # Burst direction fliter + bf_dir_l12trx = None + + # Timeslot number filter + bf_tn_val = None + + # Frame number fliter + bf_fn_lt = None + bf_fn_gt = None + + # Internal variables + lo_trigger = False + + def __init__(self): + print_copyright(CR_HOLDERS) + self.parse_argv() + + # Open requested capture file + if self.output_file is not None: + self.ddf = DATADumpFile(self.output_file) + + def run(self): + # Compose a packet filter + pkt_filter = "udp and (port %d or port %d)" \ + % (self.sniff_base_port + 2, self.sniff_base_port + 102) + + print("[i] Listening on interface '%s'..." % self.sniff_interface) + + # Start sniffing... + scapy.all.sniff(iface = self.sniff_interface, store = 1, + filter = pkt_filter, prn = self.pkt_handler) + + # Scapy registers its own signal handler + self.shutdown() + + def pkt_handler(self, ether): + # Prevent loopback packet duplication + if self.sniff_interface == "lo": + self.lo_trigger = not self.lo_trigger + if not self.lo_trigger: + return + + # Extract a TRX payload + ip = ether.payload + udp = ip.payload + trx = udp.payload + + # Convert to bytearray + msg_raw = bytearray(str(trx)) + + # Determine a burst direction (L1 <-> TRX) + l12trx = udp.sport > udp.dport + + # Create an empty DATA message + msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1() + + # Attempt to parse the payload as a DATA message + try: + msg.parse_msg(msg_raw) + except: + print("[!] Failed to parse message, dropping...") + self.cnt_burst_dropped_num += 1 + return + + # Poke burst pass filter + rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn) + if rc is False: + self.cnt_burst_dropped_num += 1 + return + + # Debug print + print("[i] %s burst: %s" \ + % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr())) + + # Poke message handler + self.msg_handle(msg) + + # Poke burst counter + rc = self.burst_count(msg.fn, msg.tn) + if rc is True: + self.shutdown() + + def burst_pass_filter(self, l12trx, fn, tn): + # Direction filter + if self.bf_dir_l12trx is not None: + if l12trx != self.bf_dir_l12trx: + return False + + # Timeslot filter + if self.bf_tn_val is not None: + if tn != self.bf_tn_val: + return False + + # Frame number filter + if self.bf_fn_lt is not None: + if fn > self.bf_fn_lt: + return False + if self.bf_fn_gt is not None: + if fn < self.bf_fn_gt: + return False + + # Burst passed ;) + return True + + def msg_handle(self, msg): + if self.print_bursts: + print(msg.burst) + + # Append a new message to the capture + if self.output_file is not None: + self.ddf.append_msg(msg) + + def burst_count(self, fn, tn): + # Update frame counter + if self.cnt_frame_last is None: + self.cnt_frame_last = fn + self.cnt_frame_num += 1 + else: + if fn != self.cnt_frame_last: + self.cnt_frame_num += 1 + + # Update burst counter + self.cnt_burst_num += 1 + + # Stop sniffing after N bursts + if self.cnt_burst_break is not None: + if self.cnt_burst_num == self.cnt_burst_break: + print("[i] Collected required amount of bursts") + return True + + # Stop sniffing after N frames + if self.cnt_frame_break is not None: + if self.cnt_frame_num == self.cnt_frame_break: + print("[i] Collected required amount of frames") + return True + + return False + + def shutdown(self): + print("[i] Shutting down...") + + # Print statistics + print("[i] %u bursts handled, %u dropped" \ + % (self.cnt_burst_num, self.cnt_burst_dropped_num)) + + # Exit + sys.exit(0) + + def print_help(self, msg = None): + s = " Usage: " + sys.argv[0] + " [options]\n\n" \ + " Some help...\n" \ + " -h --help this text\n\n" + + s += " Sniffing options\n" \ + " -i --sniff-interface Set network interface (default '%s')\n" \ + " -p --sniff-base-port Set base port number (default %d)\n\n" + + s += " Processing (no processing by default)\n" \ + " -o --output-file Write bursts to file\n" \ + " -v --print-bits Print burst bits to stdout\n\n" \ + + s += " Count limitations (disabled by default)\n" \ + " --frame-count NUM Stop after sniffing NUM frames\n" \ + " --burst-count NUM Stop after sniffing NUM bursts\n\n" + + s += " Filtering (disabled by default)\n" \ + " --direction DIR Burst direction: L12TRX or TRX2L1\n" \ + " --timeslot NUM TDMA timeslot number [0..7]\n" \ + " --frame-num-lt NUM TDMA frame number lower than NUM\n" \ + " --burst-num-gt NUM TDMA frame number greater than NUM\n" + + print(s % (self.sniff_interface, self.sniff_base_port)) + + if msg is not None: + print(msg) + + def parse_argv(self): + try: + opts, args = getopt.getopt(sys.argv[1:], + "i:p:o:v:h", ["help", "sniff-interface=", "sniff-base-port=", + "frame-count=", "burst-count=", "direction=", + "timeslot=", "frame-num-lt=", "frame-num-gt=", + "output-file=", "print-bits"]) + except getopt.GetoptError as err: + self.print_help("[!] " + str(err)) + sys.exit(2) + + for o, v in opts: + if o in ("-h", "--help"): + self.print_help() + sys.exit(2) + + elif o in ("-i", "--sniff-interface"): + self.sniff_interface = v + elif o in ("-p", "--sniff-base-port"): + self.sniff_base_port = int(v) + + elif o in ("-o", "--output-file"): + self.output_file = v + elif o in ("-v", "--print-bits"): + self.print_bursts = True + + # Break counters + elif o == "--frame-count": + self.cnt_frame_break = int(v) + elif o == "--burst-count": + self.cnt_burst_break = int(v) + + # Direction filter + elif o == "--direction": + if v == "L12TRX": + self.bf_dir_l12trx = True + elif v == "TRX2L1": + self.bf_dir_l12trx = False + else: + self.print_help("[!] Wrong direction argument") + sys.exit(2) + + # Timeslot pass filter + elif o == "--timeslot": + self.bf_tn_val = int(v) + if self.bf_tn_val < 0 or self.bf_tn_val > 7: + self.print_help("[!] Wrong timeslot value") + sys.exit(2) + + # Frame number pass filter + elif o == "--frame-num-lt": + self.bf_fn_lt = int(v) + elif o == "--frame-num-gt": + self.bf_fn_gt = int(v) + +if __name__ == '__main__': + app = Application() + app.run() diff --git a/src/target/trx_toolkit/udp_link.py b/src/target/trx_toolkit/udp_link.py new file mode 100644 index 00000000..b378b635 --- /dev/null +++ b/src/target/trx_toolkit/udp_link.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# UDP link implementation +# +# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com> +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import socket + +class UDPLink: + def __init__(self, remote_addr, remote_port, bind_addr = '0.0.0.0', bind_port = 0): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((bind_addr, bind_port)) + self.sock.setblocking(0) + + # Save remote info + self.remote_addr = remote_addr + self.remote_port = remote_port + + def __del__(self): + self.sock.close() + + def desc_link(self): + (bind_addr, bind_port) = self.sock.getsockname() + + return "L:%s:%u <-> R:%s:%u" \ + % (bind_addr, bind_port, self.remote_addr, self.remote_port) + + def send(self, data): + if type(data) not in [bytearray, bytes]: + data = data.encode() + + self.sock.sendto(data, (self.remote_addr, self.remote_port)) + + def sendto(self, data, remote): + if type(data) not in [bytearray, bytes]: + data = data.encode() + + self.sock.sendto(data, remote) |