summaryrefslogtreecommitdiffstats
path: root/src/target
diff options
context:
space:
mode:
authorHarald Welte <laforge@gnumonks.org>2018-04-07 19:34:19 +0200
committerHarald Welte <laforge@gnumonks.org>2018-04-07 19:35:24 +0200
commit05d95a46fdcdaa53afe45f0afa704ed349a3ff57 (patch)
tree4c44e40eb9d69faa8ebf6e58621cb2aef8f40a78 /src/target
parentf9ac7eb36ebc97c68a98c700f801015c0fc9c8ad (diff)
parent00bfb39d6c0f561a53aa7642e5f005c061c668ad (diff)
Merge 'fixeria/trx' into master
Diffstat (limited to 'src/target')
-rw-r--r--src/target/trx_toolkit/.gitignore4
-rw-r--r--src/target/trx_toolkit/README34
-rw-r--r--src/target/trx_toolkit/burst_fwd.py216
-rwxr-xr-xsrc/target/trx_toolkit/burst_gen.py248
-rwxr-xr-xsrc/target/trx_toolkit/burst_send.py218
-rwxr-xr-xsrc/target/trx_toolkit/clck_gen.py116
-rw-r--r--src/target/trx_toolkit/copyright.py13
-rwxr-xr-xsrc/target/trx_toolkit/ctrl_cmd.py147
-rw-r--r--src/target/trx_toolkit/ctrl_if.py79
-rw-r--r--src/target/trx_toolkit/ctrl_if_bb.py158
-rw-r--r--src/target/trx_toolkit/ctrl_if_bts.py126
-rw-r--r--src/target/trx_toolkit/data_dump.py360
-rw-r--r--src/target/trx_toolkit/data_if.py39
-rw-r--r--src/target/trx_toolkit/data_msg.py545
-rw-r--r--src/target/trx_toolkit/fake_pm.py53
-rwxr-xr-xsrc/target/trx_toolkit/fake_trx.py236
-rw-r--r--src/target/trx_toolkit/gsm_shared.py31
-rw-r--r--src/target/trx_toolkit/rand_burst_gen.py177
-rwxr-xr-xsrc/target/trx_toolkit/trx_sniff.py286
-rw-r--r--src/target/trx_toolkit/udp_link.py57
20 files changed, 3143 insertions, 0 deletions
diff --git a/src/target/trx_toolkit/.gitignore b/src/target/trx_toolkit/.gitignore
new file mode 100644
index 00000000..749ccdaf
--- /dev/null
+++ b/src/target/trx_toolkit/.gitignore
@@ -0,0 +1,4 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
diff --git a/src/target/trx_toolkit/README b/src/target/trx_toolkit/README
new file mode 100644
index 00000000..91b6099b
--- /dev/null
+++ b/src/target/trx_toolkit/README
@@ -0,0 +1,34 @@
+TRX toolkit is a set of tools intended for hacking and debugging
+a TRX interface between both transceiver and L1 software, and
+emulating a virtual Um-interface between OsmocomBB and OsmoBTS.
+
+Brief description of available applications:
+
+ - fake_trx.py - main application, that allows to connect both
+ OsmocomBB and OsmoBTS without actual RF hardware. Currently
+ only a single MS may work with a single BTS.
+
+ - clck_gen.py - a peripheral tool aimed to emulate TDMA frame
+ clock generator. Could be used for testing and clock
+ synchronization of multiple applications. It should be noted,
+ that one relays on generic system timer (via Python), so
+ a random clock jitter takes place.
+
+ - ctrl_cmd.py - another peripheral tool, which could be used
+ for sending CTRL commands directly in manual mode, and also
+ for application fuzzing.
+
+ - burst_gen.py - a tool for sending GSM bursts either to L1
+ (OsmoBTS or OsmocomBB) or to TRX (OsmoTRX and GR-GSM TRX).
+ Currently it is only possible to generate random bursts of
+ different types: NB, FB, SB, AB.
+
+ - burst_send.py - a tool for sending existing bursts from a
+ capture file either to L1 (OsmoBTS or OsmocomBB) or to
+ TRX (e.g. OsmoTRX or GR-GSM TRX).
+
+ - trx_sniff.py - Scapy-based TRX protocol sniffer. Allows one
+ to observe a single connection between TRX and L1, and vice
+ versa. Also provides some capabilities for filtering bursts
+ by direction, frame and timeslot numbers, and for recording
+ captured messages to a binary file.
diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py
new file mode 100644
index 00000000..144ae5f4
--- /dev/null
+++ b/src/target/trx_toolkit/burst_fwd.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# BTS <-> BB burst forwarding
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import random
+
+from data_msg import *
+
+class BurstForwarder:
+ # Timeslot filter (drop everything by default)
+ ts_pass = None
+
+ # Freq. filter
+ bts_freq = None
+ bb_freq = None
+
+ # Randomization of RSSI
+ randomize_dl_rssi = False
+ randomize_ul_rssi = False
+
+ # Randomization of ToA
+ randomize_dl_toa256 = False
+ randomize_ul_toa256 = False
+
+ # Timing Advance value indicated by MS (0 by default)
+ # Valid range: 0..63, where each unit means
+ # one GSM symbol advance.
+ ta = 0
+
+ # Timing of Arrival values indicated by transceiver
+ # in units of 1/256 of GSM symbol periods. A pair of
+ # base and threshold values defines a range of ToA value
+ # randomization: from (base - threshold) to (base + threshold).
+ toa256_dl_base = 0
+ toa256_ul_base = 0
+
+ toa256_dl_threshold = 128
+ toa256_ul_threshold = 128
+
+ # RSSI values indicated by transceiver in dBm.
+ # A pair of base and threshold values defines a range of RSSI
+ # randomization: from (base - threshold) to (base + threshold).
+ rssi_dl_base = -60
+ rssi_ul_base = -70
+
+ rssi_dl_threshold = 10
+ rssi_ul_threshold = 5
+
+ def __init__(self, bts_link, bb_link):
+ self.bts_link = bts_link
+ self.bb_link = bb_link
+
+ # Converts TA value from symbols to
+ # units of 1/256 of GSM symbol periods
+ def calc_ta256(self):
+ return self.ta * 256
+
+ # Calculates a random ToA value for Downlink bursts
+ def calc_dl_toa256(self):
+ # Check if randomization is required
+ if not self.randomize_dl_toa256:
+ return self.toa256_dl_base
+
+ # Calculate a range for randomization
+ toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
+ toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
+
+ # Generate a random ToA value
+ toa256 = random.randint(toa256_min, toa256_max)
+
+ return toa256
+
+ # Calculates a random ToA value for Uplink bursts
+ def calc_ul_toa256(self):
+ # Check if randomization is required
+ if not self.randomize_ul_toa256:
+ return self.toa256_ul_base
+
+ # Calculate a range for randomization
+ toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
+ toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
+
+ # Generate a random ToA value
+ toa256 = random.randint(toa256_min, toa256_max)
+
+ return toa256
+
+ # Calculates a random RSSI value for Downlink bursts
+ def calc_dl_rssi(self):
+ # Check if randomization is required
+ if not self.randomize_dl_rssi:
+ return self.rssi_dl_base
+
+ # Calculate a range for randomization
+ rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
+ rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
+
+ # Generate a random RSSI value
+ return random.randint(rssi_min, rssi_max)
+
+ # Calculates a random RSSI value for Uplink bursts
+ def calc_ul_rssi(self):
+ # Check if randomization is required
+ if not self.randomize_ul_rssi:
+ return self.rssi_ul_base
+
+ # Calculate a range for randomization
+ rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
+ rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
+
+ # Generate a random RSSI value
+ return random.randint(rssi_min, rssi_max)
+
+ # Converts a L12TRX message to TRX2L1 message
+ def transform_msg(self, msg_raw, dl = True):
+ # Attempt to parse a message
+ try:
+ msg_l12trx = DATAMSG_L12TRX()
+ msg_l12trx.parse_msg(bytearray(msg_raw))
+ except:
+ print("[!] Dropping unhandled DL message...")
+ return None
+
+ # Compose a new message for L1
+ msg_trx2l1 = msg_l12trx.gen_trx2l1()
+
+ # Randomize both RSSI and ToA values
+ if dl:
+ msg_trx2l1.toa256 = self.calc_dl_toa256()
+ msg_trx2l1.rssi = self.calc_dl_rssi()
+ else:
+ msg_trx2l1.toa256 = self.calc_ul_toa256()
+ msg_trx2l1.toa256 -= self.calc_ta256()
+ msg_trx2l1.rssi = self.calc_ul_rssi()
+
+ return msg_trx2l1
+
+ # Downlink handler: BTS -> BB
+ def bts2bb(self):
+ # Read data from socket
+ data, addr = self.bts_link.sock.recvfrom(512)
+
+ # BB is not connected / tuned
+ if self.bb_freq is None:
+ return None
+
+ # Freq. filter
+ if self.bb_freq != self.bts_freq:
+ return None
+
+ # Process a message
+ msg = self.transform_msg(data, dl = True)
+ if msg is None:
+ return None
+
+ # Timeslot filter
+ if msg.tn != self.ts_pass:
+ return None
+
+ # Validate and generate the payload
+ payload = msg.gen_msg()
+
+ # Append two unused bytes at the end
+ # in order to keep the compatibility
+ payload += bytearray(2)
+
+ # Send burst to BB
+ self.bb_link.send(payload)
+
+ # Uplink handler: BB -> BTS
+ def bb2bts(self):
+ # Read data from socket
+ data, addr = self.bb_link.sock.recvfrom(512)
+
+ # BTS is not connected / tuned
+ if self.bts_freq is None:
+ return None
+
+ # Freq. filter
+ if self.bb_freq != self.bts_freq:
+ return None
+
+ # Process a message
+ msg = self.transform_msg(data, dl = False)
+ if msg is None:
+ return None
+
+ # Validate and generate the payload
+ payload = msg.gen_msg()
+
+ # Append two unused bytes at the end
+ # in order to keep the compatibility
+ payload += bytearray(2)
+
+ # Send burst to BTS
+ self.bts_link.send(payload)
diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py
new file mode 100755
index 00000000..d83f1378
--- /dev/null
+++ b/src/target/trx_toolkit/burst_gen.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Auxiliary tool to generate and send random bursts via TRX DATA
+# interface, which may be useful for fuzzing and testing
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import getopt
+import sys
+
+from rand_burst_gen import RandBurstGen
+from data_dump import DATADumpFile
+from data_if import DATAInterface
+from gsm_shared import *
+from data_msg import *
+
+class Application:
+ # Application variables
+ remote_addr = "127.0.0.1"
+ bind_addr = "0.0.0.0"
+ base_port = 5700
+ conn_mode = "TRX"
+ output_file = None
+
+ burst_type = None
+ burst_count = 1
+
+ # Common header fields
+ fn = None
+ tn = None
+
+ # Message specific header fields
+ toa256 = None
+ rssi = None
+ pwr = None
+
+ def __init__(self):
+ print_copyright(CR_HOLDERS)
+ self.parse_argv()
+ self.check_argv()
+
+ # Set up signal handlers
+ signal.signal(signal.SIGINT, self.sig_handler)
+
+ # Open requested capture file
+ if self.output_file is not None:
+ self.ddf = DATADumpFile(self.output_file)
+
+ def run(self):
+ # Init DATA interface with TRX or L1
+ if self.conn_mode == "TRX":
+ self.data_if = DATAInterface(self.remote_addr, self.base_port + 2,
+ self.bind_addr, self.base_port + 102)
+ elif self.conn_mode == "L1":
+ self.data_if = DATAInterface(self.remote_addr, self.base_port + 102,
+ self.bind_addr, self.base_port + 2)
+
+ # Init random burst generator
+ burst_gen = RandBurstGen()
+
+ # Init an empty DATA message
+ if self.conn_mode == "TRX":
+ msg = DATAMSG_L12TRX()
+ elif self.conn_mode == "L1":
+ msg = DATAMSG_TRX2L1()
+
+ # Generate a random frame number or use provided one
+ fn_init = msg.rand_fn() if self.fn is None else self.fn
+
+ # Send as much bursts as required
+ for i in range(self.burst_count):
+ # Randomize the message header
+ msg.rand_hdr()
+
+ # Increase and set frame number
+ msg.fn = (fn_init + i) % GSM_HYPERFRAME
+
+ # Set timeslot number
+ if self.tn is not None:
+ msg.tn = self.tn
+
+ # Set transmit power level
+ if self.pwr is not None:
+ msg.pwr = self.pwr
+
+ # Set time of arrival
+ if self.toa256 is not None:
+ msg.toa256 = self.toa256
+
+ # Set RSSI
+ if self.rssi is not None:
+ msg.rssi = self.rssi
+
+ # Generate a random burst
+ if self.burst_type == "NB":
+ burst = burst_gen.gen_nb()
+ elif self.burst_type == "FB":
+ burst = burst_gen.gen_fb()
+ elif self.burst_type == "SB":
+ burst = burst_gen.gen_sb()
+ elif self.burst_type == "AB":
+ burst = burst_gen.gen_ab()
+
+ # Convert to soft-bits in case of TRX -> L1 message
+ if self.conn_mode == "L1":
+ burst = msg.ubit2sbit(burst)
+
+ # Set burst
+ msg.burst = burst
+
+ print("[i] Sending %d/%d %s burst %s to %s..."
+ % (i + 1, self.burst_count, self.burst_type,
+ msg.desc_hdr(), self.conn_mode))
+
+ # Send message
+ self.data_if.send_msg(msg)
+
+ # Append a new message to the capture
+ if self.output_file is not None:
+ self.ddf.append_msg(msg)
+
+ def print_help(self, msg = None):
+ s = " Usage: " + sys.argv[0] + " [options]\n\n" \
+ " Some help...\n" \
+ " -h --help this text\n\n"
+
+ s += " TRX interface specific\n" \
+ " -o --output-file Write bursts to a capture file\n" \
+ " -m --conn-mode Send bursts to: TRX (default) / L1\n" \
+ " -r --remote-addr Set remote address (default %s)\n" \
+ " -b --bind-addr Set local address (default %s)\n" \
+ " -p --base-port Set base port number (default %d)\n\n"
+
+ s += " Burst generation\n" \
+ " -b --burst-type Random burst type (NB, FB, SB, AB)\n" \
+ " -c --burst-count How much bursts to send (default 1)\n" \
+ " -f --frame-number Set frame number (default random)\n" \
+ " -t --timeslot Set timeslot index (default random)\n" \
+ " --pwr Set power level (default random)\n" \
+ " --rssi Set RSSI (default random)\n" \
+ " --toa Set ToA in symbols (default random)\n" \
+ " --toa256 Set ToA in 1/256 symbol periods\n"
+
+ print(s % (self.remote_addr, self.bind_addr, self.base_port))
+
+ if msg is not None:
+ print(msg)
+
+ def parse_argv(self):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "o:m:r:b:p:b:c:f:t:h",
+ [
+ "help",
+ "output-file="
+ "conn-mode=",
+ "remote-addr=",
+ "bind-addr=",
+ "base-port=",
+ "burst-type=",
+ "burst-count=",
+ "frame-number=",
+ "timeslot=",
+ "rssi=",
+ "toa=",
+ "toa256=",
+ "pwr=",
+ ])
+ except getopt.GetoptError as err:
+ self.print_help("[!] " + str(err))
+ sys.exit(2)
+
+ for o, v in opts:
+ if o in ("-h", "--help"):
+ self.print_help()
+ sys.exit(2)
+
+ elif o in ("-o", "--output-file"):
+ self.output_file = v
+ elif o in ("-m", "--conn-mode"):
+ self.conn_mode = v
+ elif o in ("-r", "--remote-addr"):
+ self.remote_addr = v
+ elif o in ("-b", "--bind-addr"):
+ self.bind_addr = v
+ elif o in ("-p", "--base-port"):
+ self.base_port = int(v)
+
+ elif o in ("-b", "--burst-type"):
+ self.burst_type = v
+ elif o in ("-c", "--burst-count"):
+ self.burst_count = int(v)
+ elif o in ("-f", "--frame-number"):
+ self.fn = int(v)
+ elif o in ("-t", "--timeslot"):
+ self.tn = int(v)
+
+ # Message specific header fields
+ elif o == "--pwr":
+ self.pwr = int(v)
+ elif o == "--rssi":
+ self.rssi = int(v)
+ elif o == "--toa256":
+ self.toa256 = int(v)
+ elif o == "--toa":
+ self.toa256 = int(float(v) * 256.0 + 0.5)
+
+ def check_argv(self):
+ # Check connection mode
+ if self.conn_mode not in ("TRX", "L1"):
+ self.print_help("[!] Unknown connection type")
+ sys.exit(2)
+
+ # Check connection mode
+ if self.burst_type not in ("NB", "FB", "SB", "AB"):
+ self.print_help("[!] Unknown burst type")
+ sys.exit(2)
+
+ def sig_handler(self, signum, frame):
+ print("Signal %d received" % signum)
+ if signum is signal.SIGINT:
+ sys.exit(0)
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py
new file mode 100755
index 00000000..f6c85ba0
--- /dev/null
+++ b/src/target/trx_toolkit/burst_send.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Auxiliary tool to send existing bursts via TRX DATA interface
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import getopt
+import sys
+
+from data_dump import DATADumpFile
+from data_if import DATAInterface
+from gsm_shared import *
+from data_msg import *
+
+class Application:
+ # Application variables
+ remote_addr = "127.0.0.1"
+ bind_addr = "0.0.0.0"
+ base_port = 5700
+ conn_mode = "TRX"
+
+ # Burst source
+ capture_file = None
+
+ # Count limitations
+ msg_skip = None
+ msg_count = None
+
+ # Pass filtering
+ pf_fn_lt = None
+ pf_fn_gt = None
+ pf_tn = None
+
+ def __init__(self):
+ print_copyright(CR_HOLDERS)
+ self.parse_argv()
+
+ # Set up signal handlers
+ signal.signal(signal.SIGINT, self.sig_handler)
+
+ # Open requested capture file
+ self.ddf = DATADumpFile(self.capture_file)
+
+ def run(self):
+ # Init DATA interface with TRX or L1
+ if self.conn_mode == "TRX":
+ self.data_if = DATAInterface(self.remote_addr, self.base_port + 2,
+ self.bind_addr, self.base_port + 102)
+ l12trx = True
+ elif self.conn_mode == "L1":
+ self.data_if = DATAInterface(self.remote_addr, self.base_port + 102,
+ self.bind_addr, self.base_port + 2)
+ l12trx = False
+ else:
+ self.print_help("[!] Unknown connection type")
+ sys.exit(2)
+
+ # Read messages from the capture
+ messages = self.ddf.parse_all(
+ skip = self.msg_skip, count = self.msg_count)
+ if messages is False:
+ pass # FIXME!!!
+
+ for msg in messages:
+ # Pass filter
+ if not self.msg_pass_filter(l12trx, msg):
+ continue
+
+ print("[i] Sending a burst %s to %s..."
+ % (msg.desc_hdr(), self.conn_mode))
+
+ # Send message
+ self.data_if.send_msg(msg)
+
+ def msg_pass_filter(self, l12trx, msg):
+ # Direction filter
+ if isinstance(msg, DATAMSG_L12TRX) and not l12trx:
+ return False
+ elif isinstance(msg, DATAMSG_TRX2L1) and l12trx:
+ return False
+
+ # Timeslot filter
+ if self.pf_tn is not None:
+ if msg.tn != self.pf_tn:
+ return False
+
+ # Frame number filter
+ if self.pf_fn_lt is not None:
+ if msg.fn > self.pf_fn_lt:
+ return False
+ if self.pf_fn_gt is not None:
+ if msg.fn < self.pf_fn_gt:
+ return False
+
+ # Burst passed ;)
+ return True
+
+ def print_help(self, msg = None):
+ s = " Usage: " + sys.argv[0] + " [options]\n\n" \
+ " Some help...\n" \
+ " -h --help this text\n\n"
+
+ s += " TRX interface specific\n" \
+ " -m --conn-mode Send bursts to: TRX (default) / L1\n" \
+ " -r --remote-addr Set remote address (default %s)\n" \
+ " -b --bind-addr Set bind address (default %s)\n" \
+ " -p --base-port Set base port number (default %d)\n\n"
+
+ s += " Burst source\n" \
+ " -i --capture-file Read bursts from capture file\n\n" \
+
+ s += " Count limitations (disabled by default)\n" \
+ " --msg-skip NUM Skip NUM messages before sending\n" \
+ " --msg-count NUM Stop after sending NUM messages\n\n" \
+
+ s += " Filtering (disabled by default)\n" \
+ " --timeslot NUM TDMA timeslot number [0..7]\n" \
+ " --frame-num-lt NUM TDMA frame number lower than NUM\n" \
+ " --frame-num-gt NUM TDMA frame number greater than NUM\n"
+
+ print(s % (self.remote_addr, self.bind_addr, self.base_port))
+
+ if msg is not None:
+ print(msg)
+
+ def parse_argv(self):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "m:r:b:p:i:h",
+ [
+ "help",
+ "conn-mode=",
+ "remote-addr=",
+ "bind-addr=",
+ "base-port=",
+ "capture-file=",
+ "msg-skip=",
+ "msg-count=",
+ "timeslot=",
+ "frame-num-lt=",
+ "frame-num-gt=",
+ ])
+ except getopt.GetoptError as err:
+ self.print_help("[!] " + str(err))
+ sys.exit(2)
+
+ for o, v in opts:
+ if o in ("-h", "--help"):
+ self.print_help()
+ sys.exit(2)
+
+ # Capture file
+ elif o in ("-i", "--capture-file"):
+ self.capture_file = v
+
+ # TRX interface specific
+ elif o in ("-m", "--conn-mode"):
+ self.conn_mode = v
+ elif o in ("-r", "--remote-addr"):
+ self.remote_addr = v
+ elif o in ("-b", "--bind-addr"):
+ self.bind_addr = v
+ elif o in ("-p", "--base-port"):
+ self.base_port = int(v)
+
+ # Count limitations
+ elif o == "--msg-skip":
+ self.msg_skip = int(v)
+ elif o == "--msg-count":
+ self.msg_count = int(v)
+
+ # Timeslot pass filter
+ elif o == "--timeslot":
+ self.pf_tn = int(v)
+ if self.pf_tn < 0 or self.pf_tn > 7:
+ self.print_help("[!] Wrong timeslot value")
+ sys.exit(2)
+
+ # Frame number pass filter
+ elif o == "--frame-num-lt":
+ self.pf_fn_lt = int(v)
+ elif o == "--frame-num-gt":
+ self.pf_fn_gt = int(v)
+
+ if self.capture_file is None:
+ self.print_help("[!] Please specify a capture file")
+ sys.exit(2)
+
+ def sig_handler(self, signum, frame):
+ print("Signal %d received" % signum)
+ if signum is signal.SIGINT:
+ sys.exit(0)
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/clck_gen.py b/src/target/trx_toolkit/clck_gen.py
new file mode 100755
index 00000000..b488770e
--- /dev/null
+++ b/src/target/trx_toolkit/clck_gen.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Simple TDMA frame clock generator
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import time
+import sys
+
+from threading import Timer
+from udp_link import UDPLink
+from gsm_shared import *
+
+class CLCKGen:
+ # GSM TDMA definitions
+ SEC_DELAY_US = 1000 * 1000
+ GSM_FRAME_US = 4615.0
+
+ # Average loop back delay
+ LO_DELAY_US = 90.0
+
+ # State variables
+ timer = None
+
+ def __init__(self, clck_links, clck_start = 0, ind_period = 102):
+ self.clck_links = clck_links
+ self.ind_period = ind_period
+ self.clck_start = clck_start
+ self.clck_src = clck_start
+
+ # Calculate counter time
+ self.ctr_interval = self.GSM_FRAME_US - self.LO_DELAY_US
+ self.ctr_interval /= self.SEC_DELAY_US
+ self.ctr_interval *= self.ind_period
+
+ def start(self):
+ # Send the first indication
+ self.send_clck_ind()
+
+ def stop(self):
+ # Stop pending timer
+ if self.timer is not None:
+ self.timer.cancel()
+ self.timer = None
+
+ # Reset the clock source
+ self.clck_src = self.clck_start
+
+ def send_clck_ind(self):
+ # Keep clock cycle
+ if self.clck_src % GSM_HYPERFRAME >= 0:
+ self.clck_src %= GSM_HYPERFRAME
+
+ # We don't need to send so often
+ if self.clck_src % self.ind_period == 0:
+ # Create UDP payload
+ payload = "IND CLOCK %u\0" % self.clck_src
+
+ # Send indication to all UDP links
+ for link in self.clck_links:
+ link.send(payload)
+
+ # Debug print
+ print("[T] %s" % payload)
+
+ # Increase frame count
+ self.clck_src += self.ind_period
+
+ # Schedule a new indication
+ self.timer = Timer(self.ctr_interval, self.send_clck_ind)
+ self.timer.start()
+
+# Just a wrapper for independent usage
+class Application:
+ def __init__(self):
+ # Print copyright
+ print_copyright(CR_HOLDERS)
+
+ # Set up signal handlers
+ signal.signal(signal.SIGINT, self.sig_handler)
+
+ def run(self):
+ self.link = UDPLink("127.0.0.1", 5800, "0.0.0.0", 5700)
+ self.clck = CLCKGen([self.link], ind_period = 51)
+ self.clck.start()
+
+ def sig_handler(self, signum, frame):
+ print("Signal %d received" % signum)
+ if signum is signal.SIGINT:
+ self.clck.stop()
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/copyright.py b/src/target/trx_toolkit/copyright.py
new file mode 100644
index 00000000..3d3597fd
--- /dev/null
+++ b/src/target/trx_toolkit/copyright.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+def print_copyright(holders = []):
+ # Print copyright holders if any
+ for date, author in holders:
+ print("Copyright (C) %s by %s" % (date, author))
+
+ # Print the license header itself
+ print("License GPLv2+: GNU GPL version 2 or later " \
+ "<http://gnu.org/licenses/gpl.html>\n" \
+ "This is free software: you are free to change and redistribute it.\n" \
+ "There is NO WARRANTY, to the extent permitted by law.\n")
diff --git a/src/target/trx_toolkit/ctrl_cmd.py b/src/target/trx_toolkit/ctrl_cmd.py
new file mode 100755
index 00000000..e56105a8
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_cmd.py
@@ -0,0 +1,147 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Auxiliary tool to send custom commands via TRX CTRL interface,
+# which may be useful for testing and fuzzing
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import getopt
+import select
+import sys
+
+from udp_link import UDPLink
+
+class Application:
+ # Application variables
+ remote_addr = "127.0.0.1"
+ bind_addr = "0.0.0.0"
+ base_port = 5700
+ bind_port = 0
+ fuzzing = False
+
+ def __init__(self):
+ print_copyright(CR_HOLDERS)
+ self.parse_argv()
+
+ # Set up signal handlers
+ signal.signal(signal.SIGINT, self.sig_handler)
+
+ # Init UDP connection
+ self.ctrl_link = UDPLink(self.remote_addr, self.base_port + 1,
+ self.bind_addr, self.bind_port)
+
+ # Debug print
+ print("[i] Init CTRL interface (%s)" \
+ % self.ctrl_link.desc_link())
+
+ def print_help(self, msg = None):
+ s = " Usage: " + sys.argv[0] + " [options]\n\n" \
+ " Some help...\n" \
+ " -h --help this text\n\n"
+
+ s += " TRX interface specific\n" \
+ " -r --remote-addr Set remote address (default %s)\n" \
+ " -p --base-port Set base port number (default %d)\n" \
+ " -P --bind-port Set local port number (default: random)\n" \
+ " -b --bind-addr Set local address (default %s)\n" \
+ " -f --fuzzing Send raw payloads (without CMD)\n" \
+
+ print(s % (self.remote_addr, self.base_port, self.bind_addr))
+
+ if msg is not None:
+ print(msg)
+
+ def parse_argv(self):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "r:p:P:b:fh",
+ [
+ "help",
+ "fuzzing",
+ "base-port=",
+ "bind-port=",
+ "bind-addr=",
+ "remote-addr=",
+ ])
+ except getopt.GetoptError as err:
+ self.print_help("[!] " + str(err))
+ sys.exit(2)
+
+ for o, v in opts:
+ if o in ("-h", "--help"):
+ self.print_help()
+ sys.exit(2)
+
+ elif o in ("-r", "--remote-addr"):
+ self.remote_addr = v
+ elif o in ("-b", "--bind-addr"):
+ self.bind_addr = v
+ elif o in ("-p", "--base-port"):
+ self.base_port = int(v)
+ elif o in ("-P", "--bind-port"):
+ self.bind_port = int(v)
+ elif o in ("-f", "--fuzzing"):
+ self.fuzzing = True
+
+ def run(self):
+ while True:
+ self.print_prompt()
+
+ # Wait until we get any data on any socket
+ socks = [sys.stdin, self.ctrl_link.sock]
+ r_event, w_event, x_event = select.select(socks, [], [])
+
+ # Check for incoming CTRL commands
+ if sys.stdin in r_event:
+ cmd = sys.stdin.readline()
+ self.handle_cmd(cmd)
+
+ if self.ctrl_link.sock in r_event:
+ data, addr = self.ctrl_link.sock.recvfrom(128)
+ sys.stdout.write("\r%s\n" % data.decode())
+ sys.stdout.flush()
+
+ def handle_cmd(self, cmd):
+ # Strip spaces, tabs, etc.
+ cmd = cmd.strip().strip("\0")
+
+ # Send a command
+ if self.fuzzing:
+ self.ctrl_link.send("%s" % cmd)
+ else:
+ self.ctrl_link.send("CMD %s\0" % cmd)
+
+ def print_prompt(self):
+ sys.stdout.write("CTRL# ")
+ sys.stdout.flush()
+
+ def sig_handler(self, signum, frame):
+ print("\n\nSignal %d received" % signum)
+ if signum is signal.SIGINT:
+ sys.exit(0)
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/ctrl_if.py b/src/target/trx_toolkit/ctrl_if.py
new file mode 100644
index 00000000..1e569a60
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from udp_link import UDPLink
+
+class CTRLInterface(UDPLink):
+ def handle_rx(self, data, remote):
+ if not self.verify_req(data):
+ print("[!] Wrong data on CTRL interface")
+ return
+
+ # Attempt to parse a command
+ request = self.prepare_req(data)
+ rc = self.parse_cmd(request)
+
+ if type(rc) is tuple:
+ self.send_response(request, remote, rc[0], rc[1])
+ else:
+ self.send_response(request, remote, rc)
+
+ def verify_req(self, data):
+ # Verify command signature
+ return data.startswith("CMD")
+
+ def prepare_req(self, data):
+ # Strip signature, paddings and \0
+ request = data[4:].strip().strip("\0")
+ # Split into a command and arguments
+ request = request.split(" ")
+ # Now we have something like ["TXTUNE", "941600"]
+ return request
+
+ def verify_cmd(self, request, cmd, argc):
+ # Check if requested command matches
+ if request[0] != cmd:
+ return False
+
+ # And has enough arguments
+ if len(request) - 1 != argc:
+ return False
+
+ return True
+
+ def send_response(self, request, remote, response_code, params = None):
+ # Include status code, for example ["TXTUNE", "0", "941600"]
+ request.insert(1, str(response_code))
+
+ # Optionally append command specific parameters
+ if params is not None:
+ request += params
+
+ # Add the response signature, and join back to string
+ response = "RSP " + " ".join(request) + "\0"
+ # Now we have something like "RSP TXTUNE 0 941600"
+ self.sendto(response, remote)
+
+ def parse_cmd(self, request):
+ raise NotImplementedError
diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py
new file mode 100644
index 00000000..3de14ef7
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if_bb.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation (OsmocomBB specific)
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceBB(CTRLInterface):
+ # Internal state variables
+ trx_started = False
+ burst_fwd = None
+ rx_freq = None
+ tx_freq = None
+ pm = None
+
+ def __init__(self, remote_addr, remote_port, bind_addr, bind_port):
+ CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port)
+ print("[i] Init CTRL interface for BB (%s)" % self.desc_link())
+
+ def parse_cmd(self, request):
+ # Power control
+ if self.verify_cmd(request, "POWERON", 0):
+ print("[i] Recv POWERON CMD")
+
+ # Ensure transceiver isn't working
+ if self.trx_started:
+ print("[!] Transceiver already started")
+ return -1
+
+ # Ensure RX / TX freq. are set
+ if (self.rx_freq is None) or (self.tx_freq is None):
+ print("[!] RX / TX freq. are not set")
+ return -1
+
+ print("[i] Starting transceiver...")
+ self.trx_started = True
+ return 0
+
+ elif self.verify_cmd(request, "POWEROFF", 0):
+ print("[i] Recv POWEROFF cmd")
+
+ print("[i] Stopping transceiver...")
+ self.trx_started = False
+ return 0
+
+ # Tuning Control
+ elif self.verify_cmd(request, "RXTUNE", 1):
+ print("[i] Recv RXTUNE cmd")
+
+ # TODO: check freq range
+ self.rx_freq = int(request[1]) * 1000
+ self.burst_fwd.bb_freq = self.rx_freq
+ return 0
+
+ elif self.verify_cmd(request, "TXTUNE", 1):
+ print("[i] Recv TXTUNE cmd")
+
+ # TODO: check freq range
+ self.tx_freq = int(request[1]) * 1000
+ return 0
+
+ # Power measurement
+ elif self.verify_cmd(request, "MEASURE", 1):
+ print("[i] Recv MEASURE cmd")
+
+ if self.pm is None:
+ return -1
+
+ # TODO: check freq range
+ meas_freq = int(request[1]) * 1000
+ meas_dbm = str(self.pm.measure(meas_freq))
+
+ return (0, [meas_dbm])
+
+ elif self.verify_cmd(request, "SETSLOT", 2):
+ print("[i] Recv SETSLOT cmd")
+
+ if self.burst_fwd is None:
+ return -1
+
+ # Obtain TS index
+ ts = int(request[1])
+ if ts not in range(0, 8):
+ print("[!] TS index should be in range: 0..7")
+ return -1
+
+ # Parse TS type
+ ts_type = int(request[2])
+
+ # TS activation / deactivation
+ # We don't care about ts_type
+ if ts_type == 0:
+ self.burst_fwd.ts_pass = None
+ else:
+ self.burst_fwd.ts_pass = ts
+
+ return 0
+
+ # Timing Advance
+ elif self.verify_cmd(request, "SETTA", 1):
+ print("[i] Recv SETTA cmd")
+
+ # Parse and check TA value
+ ta = int(request[1])
+ if ta < 0 or ta > 63:
+ print("[!] TA value should be in range: 0..63")
+ return -1
+
+ # Save to the BurstForwarder instance
+ self.burst_fwd.ta = ta
+ return 0
+
+ # Timing of Arrival simulation for Uplink
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.verify_cmd(request, "FAKE_TOA", 2):
+ print("[i] Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.burst_fwd.toa256_ul_base = int(request[1])
+ self.burst_fwd.toa256_ul_threshold = int(request[2])
+
+ return 0
+
+ # Timing of Arrival simulation for Uplink
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.verify_cmd(request, "FAKE_TOA", 1):
+ print("[i] Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.burst_fwd.toa256_ul_base += int(request[1])
+
+ return 0
+
+ # Wrong / unknown command
+ else:
+ # We don't care about other commands,
+ # so let's merely ignore them ;)
+ print("[i] Ignore CMD %s" % request[0])
+ return 0
diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py
new file mode 100644
index 00000000..14886178
--- /dev/null
+++ b/src/target/trx_toolkit/ctrl_if_bts.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# CTRL interface implementation (OsmoBTS specific)
+#
+# (C) 2016-2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from ctrl_if import CTRLInterface
+
+class CTRLInterfaceBTS(CTRLInterface):
+ # Internal state variables
+ trx_started = False
+ burst_fwd = None
+ clck_gen = None
+ rx_freq = None
+ tx_freq = None
+ pm = None
+
+ def __init__(self, remote_addr, remote_port, bind_addr, bind_port):
+ CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port)
+ print("[i] Init CTRL interface for BTS (%s)" % self.desc_link())
+
+ def parse_cmd(self, request):
+ # Power control
+ if self.verify_cmd(request, "POWERON", 0):
+ print("[i] Recv POWERON CMD")
+
+ # Ensure transceiver isn't working
+ if self.trx_started:
+ print("[!] Transceiver already started")
+ return -1
+
+ # Ensure RX / TX freq. are set
+ if (self.rx_freq is None) or (self.tx_freq is None):
+ print("[!] RX / TX freq. are not set")
+ return -1
+
+ print("[i] Starting transceiver...")
+ self.trx_started = True
+
+ # Power emulation
+ if self.pm is not None:
+ self.pm.add_bts_list([self.tx_freq])
+
+ # Start clock indications
+ if self.clck_gen is not None:
+ self.clck_gen.start()
+
+ return 0
+
+ elif self.verify_cmd(request, "POWEROFF", 0):
+ print("[i] Recv POWEROFF cmd")
+
+ print("[i] Stopping transceiver...")
+ self.trx_started = False
+
+ # Power emulation
+ if self.pm is not None:
+ self.pm.del_bts_list([self.tx_freq])
+
+ # Stop clock indications
+ if self.clck_gen is not None:
+ self.clck_gen.stop()
+
+ return 0
+
+ # Tuning Control
+ elif self.verify_cmd(request, "RXTUNE", 1):
+ print("[i] Recv RXTUNE cmd")
+
+ # TODO: check freq range
+ self.rx_freq = int(request[1]) * 1000
+ return 0
+
+ elif self.verify_cmd(request, "TXTUNE", 1):
+ print("[i] Recv TXTUNE cmd")
+
+ # TODO: check freq range
+ self.tx_freq = int(request[1]) * 1000
+ self.burst_fwd.bts_freq = self.tx_freq
+ return 0
+
+ # Timing of Arrival simulation for Downlink
+ # Absolute form: CMD FAKE_TOA <BASE> <THRESH>
+ elif self.verify_cmd(request, "FAKE_TOA", 2):
+ print("[i] Recv FAKE_TOA cmd")
+
+ # Parse and apply both base and threshold
+ self.burst_fwd.toa256_dl_base = int(request[1])
+ self.burst_fwd.toa256_dl_threshold = int(request[2])
+
+ return 0
+
+ # Timing of Arrival simulation for Downlink
+ # Relative form: CMD FAKE_TOA <+-BASE_DELTA>
+ elif self.verify_cmd(request, "FAKE_TOA", 1):
+ print("[i] Recv FAKE_TOA cmd")
+
+ # Parse and apply delta
+ self.burst_fwd.toa256_dl_base += int(request[1])
+
+ return 0
+
+ # Wrong / unknown command
+ else:
+ # We don't care about other commands,
+ # so let's merely ignore them ;)
+ print("[i] Ignore CMD %s" % request[0])
+ return 0
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py
new file mode 100644
index 00000000..1d7805e3
--- /dev/null
+++ b/src/target/trx_toolkit/data_dump.py
@@ -0,0 +1,360 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Helpers for DATA capture management
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import struct
+
+from data_msg import *
+
+class DATADump:
+ # Constants
+ TAG_L12TRX = b'\x01'
+ TAG_TRX2L1 = b'\x02'
+ HDR_LENGTH = 3
+
+ # Generates raw bytes from a DATA message
+ # Return value: raw message bytes
+ def dump_msg(self, msg):
+ # Determine a message type
+ if isinstance(msg, DATAMSG_L12TRX):
+ tag = self.TAG_L12TRX
+ elif isinstance(msg, DATAMSG_TRX2L1):
+ tag = self.TAG_TRX2L1
+ else:
+ raise ValueError("Unknown message type")
+
+ # Generate a message payload
+ msg_raw = msg.gen_msg()
+
+ # Calculate and pack the message length
+ msg_len = len(msg_raw)
+
+ # Pack to unsigned short (2 bytes, BE)
+ msg_len = struct.pack(">H", msg_len)
+
+ # Concatenate a message with header
+ return bytearray(tag + msg_len) + msg_raw
+
+ def parse_hdr(self, hdr):
+ # Extract the header info
+ msg_len = struct.unpack(">H", hdr[1:3])[0]
+ tag = hdr[:1]
+
+ # Check if tag is known
+ if tag == self.TAG_L12TRX:
+ # L1 -> TRX
+ msg = DATAMSG_L12TRX()
+ elif tag == self.TAG_TRX2L1:
+ # TRX -> L1
+ msg = DATAMSG_TRX2L1()
+ else:
+ # Unknown tag
+ return False
+
+ return (msg, msg_len)
+
+class DATADumpFile(DATADump):
+ def __init__(self, capture):
+ # Check if capture file is already opened
+ if isinstance(capture, str):
+ print("[i] Opening capture file '%s'..." % capture)
+ self.f = open(capture, "a+b")
+ else:
+ self.f = capture
+
+ def __del__(self):
+ print("[i] Closing the capture file")
+ self.f.close()
+
+ # Moves the file descriptor before a specified message
+ # Return value:
+ # True in case of success,
+ # or False in case of EOF or header parsing error.
+ def _seek2msg(self, idx):
+ # Seek to the begining of the capture
+ self.f.seek(0)
+
+ # Read the capture in loop...
+ for i in range(idx):
+ # Attempt to read a message header
+ hdr_raw = self.f.read(self.HDR_LENGTH)
+ if len(hdr_raw) != self.HDR_LENGTH:
+ return False
+
+ # Attempt to parse it
+ rc = self.parse_hdr(hdr_raw)
+ if rc is False:
+ print("[!] Couldn't parse a message header")
+ return False
+
+ # Expand the header
+ (_, msg_len) = rc
+
+ # Skip a message
+ self.f.seek(msg_len, 1)
+
+ return True
+
+ # Parses a single message at the current descriptor position
+ # Return value:
+ # a parsed message in case of success,
+ # or None in case of EOF or header parsing error,
+ # or False in case of message parsing error.
+ def _parse_msg(self):
+ # Attempt to read a message header
+ hdr_raw = self.f.read(self.HDR_LENGTH)
+ if len(hdr_raw) != self.HDR_LENGTH:
+ return None
+
+ # Attempt to parse it
+ rc = self.parse_hdr(hdr_raw)
+ if rc is False:
+ print("[!] Couldn't parse a message header")
+ return None
+
+ # Expand the header
+ (msg, msg_len) = rc
+
+ # Attempt to read a message
+ msg_raw = self.f.read(msg_len)
+ if len(msg_raw) != msg_len:
+ print("[!] Message length mismatch")
+ return None
+
+ # Attempt to parse a message
+ try:
+ msg_raw = bytearray(msg_raw)
+ msg.parse_msg(msg_raw)
+ except:
+ print("[!] Couldn't parse a message, skipping...")
+ return False
+
+ # Success
+ return msg
+
+ # Parses a particular message defined by index idx
+ # Return value:
+ # a parsed message in case of success,
+ # or None in case of EOF or header parsing error,
+ # or False in case of message parsing error or out of range.
+ def parse_msg(self, idx):
+ # Move descriptor to the begining of requested message
+ rc = self._seek2msg(idx)
+ if not rc:
+ print("[!] Couldn't find requested message")
+ return False
+
+ # Attempt to parse a message
+ return self._parse_msg()
+
+ # Parses all messages from a given file
+ # Return value:
+ # list of parsed messages,
+ # or False in case of range error.
+ def parse_all(self, skip = None, count = None):
+ result = []
+
+ # Should we skip some messages?
+ if skip is None:
+ # Seek to the begining of the capture
+ self.f.seek(0)
+ else:
+ rc = self._seek2msg(skip)
+ if not rc:
+ print("[!] Couldn't find requested message")
+ return False
+
+ # Read the capture in loop...
+ while True:
+ # Attempt to parse a message
+ msg = self._parse_msg()
+
+ # EOF or broken header
+ if msg is None:
+ break
+
+ # Skip unparsed messages
+ if msg is False:
+ continue
+
+ # Success, append a message
+ result.append(msg)
+
+ # Count limitation
+ if count is not None:
+ if len(result) == count:
+ break
+
+ return result
+
+ # Writes a new message at the end of the capture
+ def append_msg(self, msg):
+ # Generate raw bytes and write
+ msg_raw = self.dump_msg(msg)
+ self.f.write(msg_raw)
+
+ # Writes a list of messages at the end of the capture
+ def append_all(self, msgs):
+ for msg in msgs:
+ self.append_msg(msg)
+
+# Regression tests
+if __name__ == '__main__':
+ from tempfile import TemporaryFile
+ from gsm_shared import *
+ import random
+
+ # Create a temporary file
+ tf = TemporaryFile()
+
+ # Create an instance of DATA dump manager
+ ddf = DATADumpFile(tf)
+
+ # Generate two random bursts
+ burst_l12trx = []
+ burst_trx2l1 = []
+
+ for i in range(0, GSM_BURST_LEN):
+ ubit = random.randint(0, 1)
+ burst_l12trx.append(ubit)
+
+ sbit = random.randint(-127, 127)
+ burst_trx2l1.append(sbit)
+
+ # Generate a basic list of random messages
+ print("[i] Generating the reference messages")
+ messages_ref = []
+
+ for i in range(100):
+ # Create a message
+ if i % 2:
+ msg = DATAMSG_L12TRX()
+ msg.burst = burst_l12trx
+ else:
+ msg = DATAMSG_TRX2L1()
+ msg.burst = burst_trx2l1
+
+ # Randomize the header
+ msg.rand_hdr()
+
+ # Append
+ messages_ref.append(msg)
+
+ print("[i] Adding the following messages to the capture:")
+ for msg in messages_ref[:3]:
+ print(" %s: burst_len=%d"
+ % (msg.desc_hdr(), len(msg.burst)))
+
+ # Check single message appending
+ ddf.append_msg(messages_ref[0])
+ ddf.append_msg(messages_ref[1])
+ ddf.append_msg(messages_ref[2])
+
+ # Read the written messages back
+ messages_check = ddf.parse_all()
+
+ print("[i] Read the following messages back:")
+ for msg in messages_check:
+ print(" %s: burst_len=%d"
+ % (msg.desc_hdr(), len(msg.burst)))
+
+ # Expecting three messages
+ assert(len(messages_check) == 3)
+
+ # Check the messages
+ for i in range(3):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i].burst)
+ assert(messages_check[i].fn == messages_ref[i].fn)
+ assert(messages_check[i].tn == messages_ref[i].tn)
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check append_msg(): OK")
+
+
+ # Append the pending reference messages
+ ddf.append_all(messages_ref[3:])
+
+ # Read the written messages back
+ messages_check = ddf.parse_all()
+
+ # Check the final amount
+ assert(len(messages_check) == len(messages_ref))
+
+ # Check the messages
+ for i in range(len(messages_check)):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i].burst)
+ assert(messages_check[i].fn == messages_ref[i].fn)
+ assert(messages_check[i].tn == messages_ref[i].tn)
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check append_all(): OK")
+
+
+ # Check parse_msg()
+ msg0 = ddf.parse_msg(0)
+ msg10 = ddf.parse_msg(10)
+
+ # Make sure parsing was successful
+ assert(msg0 and msg10)
+
+ # Compare common header parts and bursts
+ assert(msg0.burst == messages_ref[0].burst)
+ assert(msg0.fn == messages_ref[0].fn)
+ assert(msg0.tn == messages_ref[0].tn)
+
+ assert(msg10.burst == messages_ref[10].burst)
+ assert(msg10.fn == messages_ref[10].fn)
+ assert(msg10.tn == messages_ref[10].tn)
+
+ # Validate both messages
+ assert(msg0.validate())
+ assert(msg10.validate())
+
+ print("[?] Check parse_msg(): OK")
+
+
+ # Check parse_all() with range
+ messages_check = ddf.parse_all(skip = 10, count = 20)
+
+ # Make sure parsing was successful
+ assert(messages_check)
+
+ # Check the amount
+ assert(len(messages_check) == 20)
+
+ for i in range(20):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i + 10].burst)
+ assert(messages_check[i].fn == messages_ref[i + 10].fn)
+ assert(messages_check[i].tn == messages_ref[i + 10].tn)
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check parse_all(): OK")
diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py
new file mode 100644
index 00000000..f4431a46
--- /dev/null
+++ b/src/target/trx_toolkit/data_if.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# DATA interface implementation
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from udp_link import UDPLink
+from data_msg import *
+
+class DATAInterface(UDPLink):
+
+ def send_msg(self, msg):
+ # Validate a message
+ if not msg.validate():
+ raise ValueError("Message incomplete or incorrect")
+
+ # Generate TRX message
+ payload = msg.gen_msg()
+
+ # Send message
+ self.send(payload)
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
new file mode 100644
index 00000000..ea415ab9
--- /dev/null
+++ b/src/target/trx_toolkit/data_msg.py
@@ -0,0 +1,545 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# DATA interface message definitions and helpers
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import random
+import struct
+
+from gsm_shared import *
+
+class DATAMSG:
+ # Common message fields
+ burst = None
+ fn = None
+ tn = None
+
+ # Common constructor
+ def __init__(self, fn = None, tn = None, burst = None):
+ self.burst = burst
+ self.fn = fn
+ self.tn = tn
+
+ # Generates message specific header
+ def gen_hdr(self):
+ raise NotImplementedError
+
+ # Parses message specific header
+ def parse_hdr(self, hdr):
+ raise NotImplementedError
+
+ # Generates message specific burst
+ def gen_burst(self):
+ raise NotImplementedError
+
+ # Parses message specific burst
+ def parse_burst(self, burst):
+ raise NotImplementedError
+
+ # Generates a random frame number
+ def rand_fn(self):
+ return random.randint(0, GSM_HYPERFRAME)
+
+ # Generates a random timeslot number
+ def rand_tn(self):
+ return random.randint(0, 7)
+
+ # Randomizes the message header
+ def rand_hdr(self):
+ self.fn = self.rand_fn()
+ self.tn = self.rand_tn()
+
+ # Generates human-readable header description
+ def desc_hdr(self):
+ result = ""
+
+ if self.fn is not None:
+ result += ("fn=%u " % self.fn)
+
+ if self.tn is not None:
+ result += ("tn=%u " % self.tn)
+
+ return result
+
+ # Converts unsigned soft-bits {254..0} to soft-bits {-127..127}
+ def usbit2sbit(self, bits):
+ buf = []
+
+ for bit in bits:
+ if bit == 0xff:
+ buf.append(-127)
+ else:
+ buf.append(127 - bit)
+
+ return buf
+
+ # Converts soft-bits {-127..127} to unsigned soft-bits {254..0}
+ def sbit2usbit(self, bits):
+ buf = []
+
+ for bit in bits:
+ buf.append(127 - bit)
+
+ return buf
+
+ # Converts soft-bits {-127..127} to bits {1..0}
+ def sbit2ubit(self, bits):
+ buf = []
+
+ for bit in bits:
+ buf.append(1 if bit < 0 else 0)
+
+ return buf
+
+ # Converts bits {1..0} to soft-bits {-127..127}
+ def ubit2sbit(self, bits):
+ buf = []
+
+ for bit in bits:
+ buf.append(-127 if bit else 127)
+
+ return buf
+
+ # Validates the message fields
+ def validate(self):
+ if self.burst is None:
+ return False
+
+ if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
+ return False
+
+ if self.fn is None:
+ return False
+
+ if self.fn < 0 or self.fn > GSM_HYPERFRAME:
+ return False
+
+ if self.tn is None:
+ return False
+
+ if self.tn < 0 or self.tn > 7:
+ return False
+
+ return True
+
+ # Generates frame number to bytes
+ def gen_fn(self, fn):
+ # Allocate an empty byte-array
+ buf = bytearray()
+
+ # Big endian, 4 bytes
+ buf.append((fn >> 24) & 0xff)
+ buf.append((fn >> 16) & 0xff)
+ buf.append((fn >> 8) & 0xff)
+ buf.append((fn >> 0) & 0xff)
+
+ return buf
+
+ # Parses frame number from bytes
+ def parse_fn(self, buf):
+ # Big endian, 4 bytes
+ return (buf[0] << 24) \
+ | (buf[1] << 16) \
+ | (buf[2] << 8) \
+ | (buf[3] << 0)
+
+ # Generates a TRX DATA message
+ def gen_msg(self):
+ # Validate all the fields
+ if not self.validate():
+ raise ValueError("Message incomplete or incorrect")
+
+ # Allocate an empty byte-array
+ buf = bytearray()
+
+ # Put timeslot index
+ buf.append(self.tn)
+
+ # Put frame number
+ fn = self.gen_fn(self.fn)
+ buf += fn
+
+ # Generate message specific header part
+ hdr = self.gen_hdr()
+ buf += hdr
+
+ # Generate burst
+ buf += self.gen_burst()
+
+ return buf
+
+ # Parses a TRX DATA message
+ def parse_msg(self, msg):
+ # Calculate message length
+ length = len(msg)
+
+ # Check length
+ if length < (self.HDR_LEN + GSM_BURST_LEN):
+ raise ValueError("Message is to short")
+
+ # Parse both fn and tn
+ self.fn = self.parse_fn(msg[1:])
+ self.tn = msg[0]
+
+ # Specific message part
+ self.parse_hdr(msg)
+
+ # Copy burst, skipping header
+ msg_burst = msg[self.HDR_LEN:]
+ self.parse_burst(msg_burst)
+
+class DATAMSG_L12TRX(DATAMSG):
+ # Constants
+ HDR_LEN = 6
+ PWR_MIN = 0x00
+ PWR_MAX = 0xff
+
+ # Specific message fields
+ pwr = None
+
+ # Validates the message fields
+ def validate(self):
+ # Validate common fields
+ if not DATAMSG.validate(self):
+ return False
+
+ if self.pwr is None:
+ return False
+
+ if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
+ return False
+
+ return True
+
+ # Generates a random power level
+ def rand_pwr(self, min = None, max = None):
+ if min is None:
+ min = self.PWR_MIN
+
+ if max is None:
+ max = self.PWR_MAX
+
+ return random.randint(min, max)
+
+ # Randomizes message specific header
+ def rand_hdr(self):
+ DATAMSG.rand_hdr(self)
+ self.pwr = self.rand_pwr()
+
+ # Generates human-readable header description
+ def desc_hdr(self):
+ # Describe the common part
+ result = DATAMSG.desc_hdr(self)
+
+ if self.pwr is not None:
+ result += ("pwr=%u " % self.pwr)
+
+ # Strip useless whitespace and return
+ return result.strip()
+
+ # Generates message specific header part
+ def gen_hdr(self):
+ # Allocate an empty byte-array
+ buf = bytearray()
+
+ # Put power
+ buf.append(self.pwr)
+
+ return buf
+
+ # Parses message specific header part
+ def parse_hdr(self, hdr):
+ # Parse power level
+ self.pwr = hdr[5]
+
+ # Generates message specific burst
+ def gen_burst(self):
+ # Copy burst 'as is'
+ return bytearray(self.burst)
+
+ # Parses message specific burst
+ def parse_burst(self, burst):
+ length = len(burst)
+
+ # Distinguish between GSM and EDGE
+ if length >= EDGE_BURST_LEN:
+ self.burst = list(burst[:EDGE_BURST_LEN])
+ else:
+ self.burst = list(burst[:GSM_BURST_LEN])
+
+ # Transforms this message to TRX2L1 message
+ def gen_trx2l1(self):
+ # Allocate a new message
+ msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn)
+
+ # Convert burst bits
+ if self.burst is not None:
+ msg.burst = self.ubit2sbit(self.burst)
+
+ return msg
+
+class DATAMSG_TRX2L1(DATAMSG):
+ # Constants
+ HDR_LEN = 8
+ RSSI_MIN = -120
+ RSSI_MAX = -50
+
+ # TODO: verify this range
+ TOA256_MIN = -256 * 200
+ TOA256_MAX = 256 * 200
+
+ # Specific message fields
+ rssi = None
+ toa256 = None
+
+ # Validates the message fields
+ def validate(self):
+ # Validate common fields
+ if not DATAMSG.validate(self):
+ return False
+
+ if self.rssi is None:
+ return False
+
+ if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX:
+ return False
+
+ if self.toa256 is None:
+ return False
+
+ if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
+ return False
+
+ return True
+
+ # Generates a random RSSI value
+ def rand_rssi(self, min = None, max = None):
+ if min is None:
+ min = self.RSSI_MIN
+
+ if max is None:
+ max = self.RSSI_MAX
+
+ return random.randint(min, max)
+
+ # Generates a ToA (Time of Arrival) value
+ def rand_toa256(self, min = None, max = None):
+ if min is None:
+ min = self.TOA256_MIN
+
+ if max is None:
+ max = self.TOA256_MAX
+
+ return random.randint(min, max)
+
+ # Randomizes message specific header
+ def rand_hdr(self):
+ DATAMSG.rand_hdr(self)
+ self.rssi = self.rand_rssi()
+ self.toa256 = self.rand_toa256()
+
+ # Generates human-readable header description
+ def desc_hdr(self):
+ # Describe the common part
+ result = DATAMSG.desc_hdr(self)
+
+ if self.rssi is not None:
+ result += ("rssi=%d " % self.rssi)
+
+ if self.toa256 is not None:
+ result += ("toa256=%d " % self.toa256)
+
+ # Strip useless whitespace and return
+ return result.strip()
+
+ # Generates message specific header part
+ def gen_hdr(self):
+ # Allocate an empty byte-array
+ buf = bytearray()
+
+ # Put RSSI
+ buf.append(-self.rssi)
+
+ # Encode ToA (Time of Arrival)
+ # Big endian, 2 bytes (int32_t)
+ buf.append((self.toa256 >> 8) & 0xff)
+ buf.append(self.toa256 & 0xff)
+
+ return buf
+
+ # Parses message specific header part
+ def parse_hdr(self, hdr):
+ # Parse RSSI
+ self.rssi = -(hdr[5])
+
+ # Parse ToA (Time of Arrival)
+ self.toa256 = struct.unpack(">h", hdr[6:8])[0]
+
+ # Generates message specific burst
+ def gen_burst(self):
+ # Convert soft-bits to unsigned soft-bits
+ burst_usbits = self.sbit2usbit(self.burst)
+
+ # Encode to bytes
+ return bytearray(burst_usbits)
+
+ # Parses message specific burst
+ def parse_burst(self, burst):
+ length = len(burst)
+
+ # Distinguish between GSM and EDGE
+ if length >= EDGE_BURST_LEN:
+ burst_usbits = list(burst[:EDGE_BURST_LEN])
+ else:
+ burst_usbits = list(burst[:GSM_BURST_LEN])
+
+ # Convert unsigned soft-bits to soft-bits
+ burst_sbits = self.usbit2sbit(burst_usbits)
+
+ # Save
+ self.burst = burst_sbits
+
+ # Transforms this message to L12TRX message
+ def gen_l12trx(self):
+ # Allocate a new message
+ msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn)
+
+ # Convert burst bits
+ if self.burst is not None:
+ msg.burst = self.sbit2ubit(self.burst)
+
+ return msg
+
+# Regression test
+if __name__ == '__main__':
+ # Common reference data
+ fn = 1024
+ tn = 0
+
+ # Generate two random bursts
+ burst_l12trx_ref = []
+ burst_trx2l1_ref = []
+
+ for i in range(0, GSM_BURST_LEN):
+ ubit = random.randint(0, 1)
+ burst_l12trx_ref.append(ubit)
+
+ sbit = random.randint(-127, 127)
+ burst_trx2l1_ref.append(sbit)
+
+ print("[i] Generating the reference messages")
+
+ # Create messages of both types
+ msg_l12trx_ref = DATAMSG_L12TRX(fn = fn, tn = tn)
+ msg_trx2l1_ref = DATAMSG_TRX2L1(fn = fn, tn = tn)
+
+ # Fill in message specific fields
+ msg_trx2l1_ref.rssi = -88
+ msg_l12trx_ref.pwr = 0x33
+ msg_trx2l1_ref.toa256 = -256
+
+ # Specify the reference bursts
+ msg_l12trx_ref.burst = burst_l12trx_ref
+ msg_trx2l1_ref.burst = burst_trx2l1_ref
+
+ print("[i] Encoding the reference messages")
+
+ # Encode DATA messages
+ l12trx_raw = msg_l12trx_ref.gen_msg()
+ trx2l1_raw = msg_trx2l1_ref.gen_msg()
+
+ print("[i] Parsing generated messages back")
+
+ # Parse generated DATA messages
+ msg_l12trx_dec = DATAMSG_L12TRX()
+ msg_trx2l1_dec = DATAMSG_TRX2L1()
+ msg_l12trx_dec.parse_msg(l12trx_raw)
+ msg_trx2l1_dec.parse_msg(trx2l1_raw)
+
+ print("[i] Comparing decoded messages with the reference")
+
+ # Compare bursts
+ assert(msg_l12trx_dec.burst == burst_l12trx_ref)
+ assert(msg_trx2l1_dec.burst == burst_trx2l1_ref)
+
+ print("[?] Compare bursts: OK")
+
+ # Compare both parsed messages with the reference data
+ assert(msg_l12trx_dec.fn == fn)
+ assert(msg_trx2l1_dec.fn == fn)
+ assert(msg_l12trx_dec.tn == tn)
+ assert(msg_trx2l1_dec.tn == tn)
+
+ print("[?] Compare FN / TN: OK")
+
+ # Compare message specific parts
+ assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi)
+ assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr)
+ assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256)
+
+ print("[?] Compare message specific data: OK")
+
+ # Validate header randomization
+ for i in range(0, 100):
+ msg_l12trx_ref.rand_hdr()
+ msg_trx2l1_ref.rand_hdr()
+
+ assert(msg_l12trx_ref.validate())
+ assert(msg_trx2l1_ref.validate())
+
+ print("[?] Validate header randomization: OK")
+
+ # Bit conversation test
+ usbits_ref = list(range(0, 256))
+ sbits_ref = list(range(-127, 128))
+
+ # Test both usbit2sbit() and sbit2usbit()
+ sbits = msg_trx2l1_ref.usbit2sbit(usbits_ref)
+ usbits = msg_trx2l1_ref.sbit2usbit(sbits)
+ assert(usbits[:255] == usbits_ref[:255])
+ assert(usbits[255] == 254)
+
+ print("[?] Check both usbit2sbit() and sbit2usbit(): OK")
+
+ # Test both sbit2ubit() and ubit2sbit()
+ ubits = msg_trx2l1_ref.sbit2ubit(sbits_ref)
+ assert(ubits == ([1] * 127 + [0] * 128))
+
+ sbits = msg_trx2l1_ref.ubit2sbit(ubits)
+ assert(sbits == ([-127] * 127 + [127] * 128))
+
+ print("[?] Check both sbit2ubit() and ubit2sbit(): OK")
+
+ # Test message transformation
+ msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx()
+ msg_trx2l1_dec = msg_l12trx_ref.gen_trx2l1()
+
+ assert(msg_l12trx_dec.fn == msg_trx2l1_ref.fn)
+ assert(msg_l12trx_dec.tn == msg_trx2l1_ref.tn)
+
+ assert(msg_trx2l1_dec.fn == msg_l12trx_ref.fn)
+ assert(msg_trx2l1_dec.tn == msg_l12trx_ref.tn)
+
+ assert(msg_l12trx_dec.burst == msg_l12trx_dec.sbit2ubit(burst_trx2l1_ref))
+ assert(msg_trx2l1_dec.burst == msg_trx2l1_dec.ubit2sbit(burst_l12trx_ref))
+
+ print("[?] Check L12TRX <-> TRX2L1 type transformations: OK")
diff --git a/src/target/trx_toolkit/fake_pm.py b/src/target/trx_toolkit/fake_pm.py
new file mode 100644
index 00000000..840b4e40
--- /dev/null
+++ b/src/target/trx_toolkit/fake_pm.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Power measurement emulation for BB
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from random import randint
+
+class FakePM:
+ # Freq. list for good power level
+ bts_list = []
+
+ def __init__(self, noise_min, noise_max, bts_min, bts_max):
+ # Save power level ranges
+ self.noise_min = noise_min
+ self.noise_max = noise_max
+ self.bts_min = bts_min
+ self.bts_max = bts_max
+
+ def measure(self, bts):
+ if bts in self.bts_list:
+ return randint(self.bts_min, self.bts_max)
+ else:
+ return randint(self.noise_min, self.noise_max)
+
+ def update_bts_list(self, new_list):
+ self.bts_list = new_list
+
+ def add_bts_list(self, add_list):
+ self.bts_list += add_list
+
+ def del_bts_list(self, del_list):
+ for item in del_list:
+ if item in self.bts_list:
+ self.bts_list.remove(item)
diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py
new file mode 100755
index 00000000..b818b2a9
--- /dev/null
+++ b/src/target/trx_toolkit/fake_trx.py
@@ -0,0 +1,236 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Virtual Um-interface (fake transceiver)
+#
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import getopt
+import select
+import sys
+
+from ctrl_if_bts import CTRLInterfaceBTS
+from ctrl_if_bb import CTRLInterfaceBB
+from burst_fwd import BurstForwarder
+from fake_pm import FakePM
+
+from udp_link import UDPLink
+from clck_gen import CLCKGen
+
+class Application:
+ # Application variables
+ bts_addr = "127.0.0.1"
+ bb_addr = "127.0.0.1"
+ trx_bind_addr = "0.0.0.0"
+ bts_base_port = 5700
+ bb_base_port = 6700
+
+ # BurstForwarder field randomization
+ randomize_dl_toa256 = False
+ randomize_ul_toa256 = False
+ randomize_dl_rssi = False
+ randomize_ul_rssi = False
+
+ def __init__(self):
+ print_copyright(CR_HOLDERS)
+ self.parse_argv()
+
+ # Set up signal handlers
+ signal.signal(signal.SIGINT, self.sig_handler)
+
+ def run(self):
+ # Init TRX CTRL interface for BTS
+ self.bts_ctrl = CTRLInterfaceBTS(self.bts_addr, self.bts_base_port + 101,
+ self.trx_bind_addr, self.bts_base_port + 1)
+
+ # Init TRX CTRL interface for BB
+ self.bb_ctrl = CTRLInterfaceBB(self.bb_addr, self.bb_base_port + 101,
+ self.trx_bind_addr, self.bb_base_port + 1)
+
+ # Power measurement emulation
+ # Noise: -120 .. -105
+ # BTS: -75 .. -50
+ self.pm = FakePM(-120, -105, -75, -50)
+
+ # Share a FakePM instance between both BTS and BB
+ self.bts_ctrl.pm = self.pm
+ self.bb_ctrl.pm = self.pm
+
+ # Init DATA links
+ self.bts_data = UDPLink(self.bts_addr, self.bts_base_port + 102,
+ self.trx_bind_addr, self.bts_base_port + 2)
+ self.bb_data = UDPLink(self.bb_addr, self.bb_base_port + 102,
+ self.trx_bind_addr, self.bb_base_port + 2)
+
+ # BTS <-> BB burst forwarding
+ self.burst_fwd = BurstForwarder(self.bts_data, self.bb_data)
+ self.burst_fwd.randomize_dl_toa256 = self.randomize_dl_toa256
+ self.burst_fwd.randomize_ul_toa256 = self.randomize_ul_toa256
+ self.burst_fwd.randomize_dl_rssi = self.randomize_dl_rssi
+ self.burst_fwd.randomize_ul_rssi = self.randomize_ul_rssi
+
+ # Share a BurstForwarder instance between BTS and BB
+ self.bts_ctrl.burst_fwd = self.burst_fwd
+ self.bb_ctrl.burst_fwd = self.burst_fwd
+
+ # Provide clock to BTS
+ self.bts_clck = UDPLink(self.bts_addr, self.bts_base_port + 100,
+ self.trx_bind_addr, self.bts_base_port)
+ self.clck_gen = CLCKGen([self.bts_clck])
+ self.bts_ctrl.clck_gen = self.clck_gen
+
+ print("[i] Init complete")
+
+ # Enter main loop
+ while True:
+ socks = [self.bts_ctrl.sock, self.bb_ctrl.sock,
+ self.bts_data.sock, self.bb_data.sock]
+
+ # Wait until we get any data on any socket
+ r_event, w_event, x_event = select.select(socks, [], [])
+
+ # Downlink: BTS -> BB
+ if self.bts_data.sock in r_event:
+ self.burst_fwd.bts2bb()
+
+ # Uplink: BB -> BTS
+ if self.bb_data.sock in r_event:
+ self.burst_fwd.bb2bts()
+
+ # CTRL commands from BTS
+ if self.bts_ctrl.sock in r_event:
+ data, addr = self.bts_ctrl.sock.recvfrom(128)
+ self.bts_ctrl.handle_rx(data.decode(), addr)
+
+ # CTRL commands from BB
+ if self.bb_ctrl.sock in r_event:
+ data, addr = self.bb_ctrl.sock.recvfrom(128)
+ self.bb_ctrl.handle_rx(data.decode(), addr)
+
+ def shutdown(self):
+ print("[i] Shutting down...")
+
+ # Stop clock generator
+ self.clck_gen.stop()
+
+ def print_help(self, msg = None):
+ s = " Usage: " + sys.argv[0] + " [options]\n\n" \
+ " Some help...\n" \
+ " -h --help this text\n\n"
+
+ s += " TRX interface specific\n" \
+ " -R --bts-addr Set BTS remote address (default %s)\n" \
+ " -r --bb-addr Set BB remote address (default %s)\n" \
+ " -P --bts-base-port Set BTS base port number (default %d)\n" \
+ " -p --bb-base-port Set BB base port number (default %d)\n" \
+ " -b --trx-bind-addr Set TRX bind address (default %s)\n\n"
+
+ s += " Simulation\n" \
+ " --rand-dl-rssi Enable DL RSSI randomization\n" \
+ " --rand-ul-rssi Enable UL RSSI randomization\n" \
+ " --rand-dl-toa Enable DL ToA randomization\n" \
+ " --rand-ul-toa Enable UL ToA randomization\n"
+
+ print(s % (self.bts_addr, self.bb_addr,
+ self.bts_base_port, self.bb_base_port,
+ self.trx_bind_addr))
+
+ if msg is not None:
+ print(msg)
+
+ def parse_argv(self):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "R:r:P:p:b:h",
+ [
+ "help",
+ "bts-addr=", "bb-addr=",
+ "bts-base-port=", "bb-base-port=",
+ "trx-bind-addr=",
+ "rand-dl-rssi", "rand-ul-rssi",
+ "rand-dl-toa", "rand-ul-toa",
+ ])
+ except getopt.GetoptError as err:
+ self.print_help("[!] " + str(err))
+ sys.exit(2)
+
+ for o, v in opts:
+ if o in ("-h", "--help"):
+ self.print_help()
+ sys.exit(2)
+
+ elif o in ("-R", "--bts-addr"):
+ self.bts_addr = v
+ elif o in ("-r", "--bb-addr"):
+ self.bb_addr = v
+
+ elif o in ("-P", "--bts-base-port"):
+ self.bts_base_port = int(v)
+ elif o in ("-p", "--bb-base-port"):
+ self.bb_base_port = int(v)
+
+ elif o in ("-b", "--trx-bind-addr"):
+ self.trx_bind_addr = v
+
+ # Message field randomization
+ elif o == "rand-dl-rssi":
+ self.randomize_dl_rssi = True
+ elif o == "rand-ul-rssi":
+ self.randomize_ul_rssi = True
+ elif o == "rand-dl-toa":
+ self.randomize_dl_toa256 = True
+ elif o == "rand-ul-toa":
+ self.randomize_ul_toa256 = True
+
+ # Ensure there is no overlap between ports
+ if self.bts_base_port == self.bb_base_port:
+ self.print_help("[!] BTS and BB base ports should be different")
+ sys.exit(2)
+
+ bts_ports = [
+ self.bts_base_port + 0, self.bts_base_port + 100,
+ self.bts_base_port + 1, self.bts_base_port + 101,
+ self.bts_base_port + 2, self.bts_base_port + 102,
+ ]
+
+ bb_ports = [
+ self.bb_base_port + 0, self.bb_base_port + 100,
+ self.bb_base_port + 1, self.bb_base_port + 101,
+ self.bb_base_port + 2, self.bb_base_port + 102,
+ ]
+
+ for p in bb_ports:
+ if p in bts_ports:
+ self.print_help("[!] BTS and BB ports overlap detected")
+ sys.exit(2)
+
+ def sig_handler(self, signum, frame):
+ print("Signal %d received" % signum)
+ if signum is signal.SIGINT:
+ self.shutdown()
+ sys.exit(0)
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/gsm_shared.py b/src/target/trx_toolkit/gsm_shared.py
new file mode 100644
index 00000000..d2f8278b
--- /dev/null
+++ b/src/target/trx_toolkit/gsm_shared.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Common GSM constants
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+# TDMA definitions
+GSM_SUPERFRAME = 26 * 51
+GSM_HYPERFRAME = 2048 * GSM_SUPERFRAME
+
+# Burst length
+GSM_BURST_LEN = 148
+EDGE_BURST_LEN = GSM_BURST_LEN * 3
diff --git a/src/target/trx_toolkit/rand_burst_gen.py b/src/target/trx_toolkit/rand_burst_gen.py
new file mode 100644
index 00000000..46c1e090
--- /dev/null
+++ b/src/target/trx_toolkit/rand_burst_gen.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Random burst (NB, FB, SB, AB) generator
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import random
+
+from gsm_shared import *
+
+class RandBurstGen:
+
+ # GSM 05.02 Chapter 5.2.3 Normal Burst
+ nb_tsc_list = [
+ [
+ 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0,
+ 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1,
+ ],
+ [
+ 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1,
+ 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1,
+ ],
+ [
+ 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1,
+ 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0,
+ ],
+ [
+ 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0,
+ 1, 0, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0,
+ ],
+ [
+ 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0,
+ 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1,
+ ],
+ [
+ 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0,
+ 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0,
+ ],
+ [
+ 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1,
+ 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1,
+ ],
+ [
+ 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0,
+ 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0,
+ ],
+ ]
+
+ # GSM 05.02 Chapter 5.2.5 SCH training sequence
+ sb_tsc = [
+ 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 0,
+ 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
+ 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1,
+ 0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1,
+ ]
+
+ # GSM 05.02 Chapter 5.2.6 Dummy Burst
+ db_bits = [
+ 0, 0, 0,
+ 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0,
+ 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0,
+ 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0,
+ 0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0,
+ 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0,
+ 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0,
+ 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1,
+ 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1,
+ 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0,
+ 0, 0, 0,
+ ]
+
+ # GSM 05.02 Chapter 5.2.7 Access burst
+ ab_tsc = [
+ 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1,
+ 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0,
+ 1, 0, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0,
+ ]
+
+ # Generate a normal burst
+ def gen_nb(self, seq_idx = 0):
+ buf = []
+
+ # Tailing bits
+ buf += [0] * 3
+
+ # Random data 1 / 2
+ for i in range(0, 57):
+ buf.append(random.randint(0, 1))
+
+ # Steal flag 1 / 2
+ buf.append(random.randint(0, 1))
+
+ # Training sequence
+ buf += self.nb_tsc_list[seq_idx]
+
+ # Steal flag 2 / 2
+ buf.append(random.randint(0, 1))
+
+ # Random data 2 / 2
+ for i in range(0, 57):
+ buf.append(random.randint(0, 1))
+
+ # Tailing bits
+ buf += [0] * 3
+
+ return buf
+
+ # Generate a frequency correction burst
+ def gen_fb(self):
+ return [0] * GSM_BURST_LEN
+
+ # Generate a synchronization burst
+ def gen_sb(self):
+ buf = []
+
+ # Tailing bits
+ buf += [0] * 3
+
+ # Random data 1 / 2
+ for i in range(0, 39):
+ buf.append(random.randint(0, 1))
+
+ # Training sequence
+ buf += self.sb_tsc
+
+ # Random data 2 / 2
+ for i in range(0, 39):
+ buf.append(random.randint(0, 1))
+
+ # Tailing bits
+ buf += [0] * 3
+
+ return buf
+
+ # Generate a dummy burst
+ def gen_db(self):
+ return self.db_bits
+
+ # Generate an access burst
+ def gen_ab(self):
+ buf = []
+
+ # Tailing bits
+ buf += [0] * 8
+
+ # Training sequence
+ buf += self.ab_tsc
+
+ # Random data
+ for i in range(0, 36):
+ buf.append(random.randint(0, 1))
+
+ # Tailing bits
+ buf += [0] * 3
+
+ # Guard period
+ buf += [0] * 60
+
+ return buf
diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py
new file mode 100755
index 00000000..577e6f97
--- /dev/null
+++ b/src/target/trx_toolkit/trx_sniff.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Scapy-based TRX interface sniffer
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+from copyright import print_copyright
+CR_HOLDERS = [("2018", "Vadim Yanitskiy <axilirator@gmail.com>")]
+
+import signal
+import getopt
+import sys
+
+import scapy.all
+
+from data_dump import DATADumpFile
+from data_msg import *
+
+class Application:
+ # Application variables
+ sniff_interface = "lo"
+ sniff_base_port = 5700
+ print_bursts = False
+ output_file = None
+
+ # Counters
+ cnt_burst_dropped_num = 0
+ cnt_burst_break = None
+ cnt_burst_num = 0
+
+ cnt_frame_break = None
+ cnt_frame_last = None
+ cnt_frame_num = 0
+
+ # Burst direction fliter
+ bf_dir_l12trx = None
+
+ # Timeslot number filter
+ bf_tn_val = None
+
+ # Frame number fliter
+ bf_fn_lt = None
+ bf_fn_gt = None
+
+ # Internal variables
+ lo_trigger = False
+
+ def __init__(self):
+ print_copyright(CR_HOLDERS)
+ self.parse_argv()
+
+ # Open requested capture file
+ if self.output_file is not None:
+ self.ddf = DATADumpFile(self.output_file)
+
+ def run(self):
+ # Compose a packet filter
+ pkt_filter = "udp and (port %d or port %d)" \
+ % (self.sniff_base_port + 2, self.sniff_base_port + 102)
+
+ print("[i] Listening on interface '%s'..." % self.sniff_interface)
+
+ # Start sniffing...
+ scapy.all.sniff(iface = self.sniff_interface, store = 1,
+ filter = pkt_filter, prn = self.pkt_handler)
+
+ # Scapy registers its own signal handler
+ self.shutdown()
+
+ def pkt_handler(self, ether):
+ # Prevent loopback packet duplication
+ if self.sniff_interface == "lo":
+ self.lo_trigger = not self.lo_trigger
+ if not self.lo_trigger:
+ return
+
+ # Extract a TRX payload
+ ip = ether.payload
+ udp = ip.payload
+ trx = udp.payload
+
+ # Convert to bytearray
+ msg_raw = bytearray(str(trx))
+
+ # Determine a burst direction (L1 <-> TRX)
+ l12trx = udp.sport > udp.dport
+
+ # Create an empty DATA message
+ msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
+
+ # Attempt to parse the payload as a DATA message
+ try:
+ msg.parse_msg(msg_raw)
+ except:
+ print("[!] Failed to parse message, dropping...")
+ self.cnt_burst_dropped_num += 1
+ return
+
+ # Poke burst pass filter
+ rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
+ if rc is False:
+ self.cnt_burst_dropped_num += 1
+ return
+
+ # Debug print
+ print("[i] %s burst: %s" \
+ % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
+
+ # Poke message handler
+ self.msg_handle(msg)
+
+ # Poke burst counter
+ rc = self.burst_count(msg.fn, msg.tn)
+ if rc is True:
+ self.shutdown()
+
+ def burst_pass_filter(self, l12trx, fn, tn):
+ # Direction filter
+ if self.bf_dir_l12trx is not None:
+ if l12trx != self.bf_dir_l12trx:
+ return False
+
+ # Timeslot filter
+ if self.bf_tn_val is not None:
+ if tn != self.bf_tn_val:
+ return False
+
+ # Frame number filter
+ if self.bf_fn_lt is not None:
+ if fn > self.bf_fn_lt:
+ return False
+ if self.bf_fn_gt is not None:
+ if fn < self.bf_fn_gt:
+ return False
+
+ # Burst passed ;)
+ return True
+
+ def msg_handle(self, msg):
+ if self.print_bursts:
+ print(msg.burst)
+
+ # Append a new message to the capture
+ if self.output_file is not None:
+ self.ddf.append_msg(msg)
+
+ def burst_count(self, fn, tn):
+ # Update frame counter
+ if self.cnt_frame_last is None:
+ self.cnt_frame_last = fn
+ self.cnt_frame_num += 1
+ else:
+ if fn != self.cnt_frame_last:
+ self.cnt_frame_num += 1
+
+ # Update burst counter
+ self.cnt_burst_num += 1
+
+ # Stop sniffing after N bursts
+ if self.cnt_burst_break is not None:
+ if self.cnt_burst_num == self.cnt_burst_break:
+ print("[i] Collected required amount of bursts")
+ return True
+
+ # Stop sniffing after N frames
+ if self.cnt_frame_break is not None:
+ if self.cnt_frame_num == self.cnt_frame_break:
+ print("[i] Collected required amount of frames")
+ return True
+
+ return False
+
+ def shutdown(self):
+ print("[i] Shutting down...")
+
+ # Print statistics
+ print("[i] %u bursts handled, %u dropped" \
+ % (self.cnt_burst_num, self.cnt_burst_dropped_num))
+
+ # Exit
+ sys.exit(0)
+
+ def print_help(self, msg = None):
+ s = " Usage: " + sys.argv[0] + " [options]\n\n" \
+ " Some help...\n" \
+ " -h --help this text\n\n"
+
+ s += " Sniffing options\n" \
+ " -i --sniff-interface Set network interface (default '%s')\n" \
+ " -p --sniff-base-port Set base port number (default %d)\n\n"
+
+ s += " Processing (no processing by default)\n" \
+ " -o --output-file Write bursts to file\n" \
+ " -v --print-bits Print burst bits to stdout\n\n" \
+
+ s += " Count limitations (disabled by default)\n" \
+ " --frame-count NUM Stop after sniffing NUM frames\n" \
+ " --burst-count NUM Stop after sniffing NUM bursts\n\n"
+
+ s += " Filtering (disabled by default)\n" \
+ " --direction DIR Burst direction: L12TRX or TRX2L1\n" \
+ " --timeslot NUM TDMA timeslot number [0..7]\n" \
+ " --frame-num-lt NUM TDMA frame number lower than NUM\n" \
+ " --burst-num-gt NUM TDMA frame number greater than NUM\n"
+
+ print(s % (self.sniff_interface, self.sniff_base_port))
+
+ if msg is not None:
+ print(msg)
+
+ def parse_argv(self):
+ try:
+ opts, args = getopt.getopt(sys.argv[1:],
+ "i:p:o:v:h", ["help", "sniff-interface=", "sniff-base-port=",
+ "frame-count=", "burst-count=", "direction=",
+ "timeslot=", "frame-num-lt=", "frame-num-gt=",
+ "output-file=", "print-bits"])
+ except getopt.GetoptError as err:
+ self.print_help("[!] " + str(err))
+ sys.exit(2)
+
+ for o, v in opts:
+ if o in ("-h", "--help"):
+ self.print_help()
+ sys.exit(2)
+
+ elif o in ("-i", "--sniff-interface"):
+ self.sniff_interface = v
+ elif o in ("-p", "--sniff-base-port"):
+ self.sniff_base_port = int(v)
+
+ elif o in ("-o", "--output-file"):
+ self.output_file = v
+ elif o in ("-v", "--print-bits"):
+ self.print_bursts = True
+
+ # Break counters
+ elif o == "--frame-count":
+ self.cnt_frame_break = int(v)
+ elif o == "--burst-count":
+ self.cnt_burst_break = int(v)
+
+ # Direction filter
+ elif o == "--direction":
+ if v == "L12TRX":
+ self.bf_dir_l12trx = True
+ elif v == "TRX2L1":
+ self.bf_dir_l12trx = False
+ else:
+ self.print_help("[!] Wrong direction argument")
+ sys.exit(2)
+
+ # Timeslot pass filter
+ elif o == "--timeslot":
+ self.bf_tn_val = int(v)
+ if self.bf_tn_val < 0 or self.bf_tn_val > 7:
+ self.print_help("[!] Wrong timeslot value")
+ sys.exit(2)
+
+ # Frame number pass filter
+ elif o == "--frame-num-lt":
+ self.bf_fn_lt = int(v)
+ elif o == "--frame-num-gt":
+ self.bf_fn_gt = int(v)
+
+if __name__ == '__main__':
+ app = Application()
+ app.run()
diff --git a/src/target/trx_toolkit/udp_link.py b/src/target/trx_toolkit/udp_link.py
new file mode 100644
index 00000000..b378b635
--- /dev/null
+++ b/src/target/trx_toolkit/udp_link.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# UDP link implementation
+#
+# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import socket
+
+class UDPLink:
+ def __init__(self, remote_addr, remote_port, bind_addr = '0.0.0.0', bind_port = 0):
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.sock.bind((bind_addr, bind_port))
+ self.sock.setblocking(0)
+
+ # Save remote info
+ self.remote_addr = remote_addr
+ self.remote_port = remote_port
+
+ def __del__(self):
+ self.sock.close()
+
+ def desc_link(self):
+ (bind_addr, bind_port) = self.sock.getsockname()
+
+ return "L:%s:%u <-> R:%s:%u" \
+ % (bind_addr, bind_port, self.remote_addr, self.remote_port)
+
+ def send(self, data):
+ if type(data) not in [bytearray, bytes]:
+ data = data.encode()
+
+ self.sock.sendto(data, (self.remote_addr, self.remote_port))
+
+ def sendto(self, data, remote):
+ if type(data) not in [bytearray, bytes]:
+ data = data.encode()
+
+ self.sock.sendto(data, remote)