summaryrefslogtreecommitdiffstats
path: root/src/target/fake_trx/burst_send.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/fake_trx/burst_send.py')
-rwxr-xr-xsrc/target/fake_trx/burst_send.py205
1 files changed, 100 insertions, 105 deletions
diff --git a/src/target/fake_trx/burst_send.py b/src/target/fake_trx/burst_send.py
index b51ce5d4..87f9228b 100755
--- a/src/target/fake_trx/burst_send.py
+++ b/src/target/fake_trx/burst_send.py
@@ -25,12 +25,13 @@ import signal
import getopt
import sys
+from data_dump import DATADumpFile
from data_if import DATAInterface
from gsm_shared import *
from data_msg import *
COPYRIGHT = \
- "Copyright (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
+ "Copyright (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
"License GPLv2+: GNU GPL version 2 or later " \
"<http://gnu.org/licenses/gpl.html>\n" \
"This is free software: you are free to change and redistribute it.\n" \
@@ -42,16 +43,17 @@ class Application:
base_port = 5700
conn_mode = "TRX"
- burst_src = None
-
- # Common header fields
- fn = None
- tn = None
+ # Burst source
+ capture_file = None
- # Message specific header fields
- rssi = None
- toa = None
- pwr = None
+ # Count limitations
+ msg_skip = None
+ msg_count = None
+
+ # Pass filtering
+ pf_fn_lt = None
+ pf_fn_gt = None
+ pf_tn = None
def __init__(self):
self.print_copyright()
@@ -60,81 +62,37 @@ class Application:
# Set up signal handlers
signal.signal(signal.SIGINT, self.sig_handler)
+ # Open requested capture file
+ self.ddf = DATADumpFile(self.capture_file)
+
def run(self):
# Init DATA interface with TRX or L1
if self.conn_mode == "TRX":
self.data_if = DATAInterface(self.remote_addr,
self.base_port + 2, self.base_port + 102)
+ l12trx = True
elif self.conn_mode == "L1":
self.data_if = DATAInterface(self.remote_addr,
self.base_port + 102, self.base_port + 2)
+ l12trx = False
else:
self.print_help("[!] Unknown connection type")
sys.exit(2)
- # Open the burst source (file or stdin)
- if self.burst_src is not None:
- print("[i] Reading bursts from file '%s'..." % self.burst_src)
- src = open(self.burst_src, "r")
- else:
- print("[i] Reading bursts from stdin...")
- src = sys.stdin
-
- # Init an empty DATA message
- if self.conn_mode == "TRX":
- msg = DATAMSG_L12TRX()
- elif self.conn_mode == "L1":
- msg = DATAMSG_TRX2L1()
-
- # Generate a random frame number or use provided one
- fn = msg.rand_fn() if self.fn is None else self.fn
-
- # Read the burst source line-by-line
- for line in src:
- # Strip spaces
- burst_str = line.strip()
- burst = []
+ # Read messages from the capture
+ messages = self.ddf.parse_all(
+ skip = self.msg_skip, count = self.msg_count)
+ if messages is False:
+ pass # FIXME!!!
- # Check length
- if len(burst_str) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
- print("[!] Dropping burst due to incorrect length")
+ for msg in messages:
+ # Pass filter
+ if not self.msg_pass_filter(l12trx, msg):
continue
- # Randomize the message header
- msg.rand_hdr()
-
- # Set frame number
- msg.fn = fn
-
- # Set timeslot number
- if self.tn is not None:
- msg.tn = self.tn
-
- # Set transmit power level
- if self.pwr is not None:
- msg.pwr = self.pwr
-
- # Set time of arrival
- if self.toa is not None:
- msg.toa = self.toa
-
- # Set RSSI
- if self.rssi is not None:
- msg.rssi = self.rssi
-
- # Parse a string
- for bit in burst_str:
- if bit == "1":
- burst.append(1)
- else:
- burst.append(0)
-
- # Convert to soft-bits in case of TRX -> L1 message
- if self.conn_mode == "L1":
- burst = msg.ubit2sbit(burst)
-
- # Set burst
- msg.burst = burst
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ msg.toa = 0.00
print("[i] Sending a burst %s to %s..."
% (msg.desc_hdr(), self.conn_mode))
@@ -142,32 +100,56 @@ class Application:
# Send message
self.data_if.send_msg(msg)
- # Increase frame number (for count > 1)
- fn = (fn + 1) % GSM_HYPERFRAME
-
# Finish
self.shutdown()
+ def msg_pass_filter(self, l12trx, msg):
+ # Direction filter
+ if isinstance(msg, DATAMSG_L12TRX) and not l12trx:
+ return False
+ elif isinstance(msg, DATAMSG_TRX2L1) and l12trx:
+ return False
+
+ # Timeslot filter
+ if self.pf_tn is not None:
+ if msg.tn != self.pf_tn:
+ return False
+
+ # Frame number filter
+ if self.pf_fn_lt is not None:
+ if msg.fn > self.pf_fn_lt:
+ return False
+ if self.pf_fn_gt is not None:
+ if msg.fn < self.pf_fn_gt:
+ return False
+
+ # Burst passed ;)
+ return True
+
def print_copyright(self):
print(COPYRIGHT)
def print_help(self, msg = None):
s = " Usage: " + sys.argv[0] + " [options]\n\n" \
" Some help...\n" \
- " -h --help this text\n\n"
+ " -h --help this text\n\n"
s += " TRX interface specific\n" \
- " -m --conn-mode Send bursts to: TRX (default) / L1\n" \
- " -r --remote-addr Set remote address (default %s)\n" \
- " -p --base-port Set base port number (default %d)\n\n"
-
- s += " Burst generation\n" \
- " -i --burst-file Read bursts from file (default stdin)\n" \
- " -f --frame-number Set frame number (default random)\n" \
- " -t --timeslot Set timeslot index (default random)\n" \
- " --pwr Set power level (default random)\n" \
- " --rssi Set RSSI (default random)\n" \
- " --toa Set TOA (default random)\n\n"
+ " -m --conn-mode Send bursts to: TRX (default) / L1\n" \
+ " -r --remote-addr Set remote address (default %s)\n" \
+ " -p --base-port Set base port number (default %d)\n\n"
+
+ s += " Burst source\n" \
+ " -i --capture-file Read bursts from capture file\n\n" \
+
+ s += " Count limitations (disabled by default)\n" \
+ " --msg-skip NUM Skip NUM messages before sending\n" \
+ " --msg-count NUM Stop after sending NUM messages\n\n" \
+
+ s += " Filtering (disabled by default)\n" \
+ " --timeslot NUM TDMA timeslot number [0..7]\n" \
+ " --frame-num-lt NUM TDMA frame number lower than NUM\n" \
+ " --frame-num-gt NUM TDMA frame number greater than NUM\n"
print(s % (self.remote_addr, self.base_port))
@@ -177,18 +159,18 @@ class Application:
def parse_argv(self):
try:
opts, args = getopt.getopt(sys.argv[1:],
- "m:r:p:i:f:t:h",
+ "m:r:p:i:h",
[
"help",
"conn-mode=",
"remote-addr=",
"base-port=",
- "burst-file=",
- "frame-number=",
+ "capture-file=",
+ "msg-skip=",
+ "msg-count=",
"timeslot=",
- "rssi=",
- "toa=",
- "pwr=",
+ "frame-num-lt=",
+ "frame-num-gt=",
])
except getopt.GetoptError as err:
self.print_help("[!] " + str(err))
@@ -199,6 +181,11 @@ class Application:
self.print_help()
sys.exit(2)
+ # Capture file
+ elif o in ("-i", "--capture-file"):
+ self.capture_file = v
+
+ # TRX interface specific
elif o in ("-m", "--conn-mode"):
self.conn_mode = v
elif o in ("-r", "--remote-addr"):
@@ -206,20 +193,28 @@ class Application:
elif o in ("-p", "--base-port"):
self.base_port = int(v)
- elif o in ("-i", "--burst-file"):
- self.burst_src = v
- elif o in ("-f", "--frame-number"):
- self.fn = int(v)
- elif o in ("-t", "--timeslot"):
- self.tn = int(v)
-
- # Message specific header fields
- elif o == "--pwr":
- self.pwr = int(v)
- elif o == "--rssi":
- self.rssi = int(v)
- elif o == "--toa":
- self.toa = float(v)
+ # Count limitations
+ elif o == "--msg-skip":
+ self.msg_skip = int(v)
+ elif o == "--msg-count":
+ self.msg_count = int(v)
+
+ # Timeslot pass filter
+ elif o == "--timeslot":
+ self.pf_tn = int(v)
+ if self.pf_tn < 0 or self.pf_tn > 7:
+ self.print_help("[!] Wrong timeslot value")
+ sys.exit(2)
+
+ # Frame number pass filter
+ elif o == "--frame-num-lt":
+ self.pf_fn_lt = int(v)
+ elif o == "--frame-num-gt":
+ self.pf_fn_gt = int(v)
+
+ if self.capture_file is None:
+ self.print_help("[!] Please specify a capture file")
+ sys.exit(2)
def shutdown(self):
self.data_if.shutdown()