From 3fee14b5a30cb9c5d290c54fd58a6f4fb0012d63 Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 7 Dec 2018 07:15:45 +0700 Subject: trx_toolkit/burst_send.py: migrate from getopt to argparse Change-Id: I1be66aa022a79aa1683f0e6cfebaed568b1736b1 --- src/target/trx_toolkit/burst_send.py | 194 ++++++++++++----------------------- 1 file changed, 63 insertions(+), 131 deletions(-) (limited to 'src/target') diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py index 787e0fc3..499e929d 100755 --- a/src/target/trx_toolkit/burst_send.py +++ b/src/target/trx_toolkit/burst_send.py @@ -27,7 +27,7 @@ CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] import logging as log import signal -import getopt +import argparse import sys from data_dump import DATADumpFile @@ -36,27 +36,9 @@ from gsm_shared import * from data_msg import * class Application: - # Application variables - remote_addr = "127.0.0.1" - bind_addr = "0.0.0.0" - base_port = 5700 - conn_mode = "TRX" - - # Burst source - capture_file = None - - # Count limitations - msg_skip = None - msg_count = None - - # Pass filtering - pf_fn_lt = None - pf_fn_gt = None - pf_tn = None - def __init__(self): print_copyright(CR_HOLDERS) - self.parse_argv() + self.argv = self.parse_argv() # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) @@ -66,152 +48,102 @@ class Application: format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") # Open requested capture file - self.ddf = DATADumpFile(self.capture_file) + self.ddf = DATADumpFile(self.argv.capture_file) def run(self): # Init DATA interface with TRX or L1 - if self.conn_mode == "TRX": - self.data_if = DATAInterface(self.remote_addr, self.base_port + 2, - self.bind_addr, self.base_port + 102) - l12trx = True - elif self.conn_mode == "L1": - self.data_if = DATAInterface(self.remote_addr, self.base_port + 102, - self.bind_addr, self.base_port + 2) - l12trx = False - else: - self.print_help("[!] Unknown connection type") - sys.exit(2) + if self.argv.conn_mode == "TRX": + self.data_if = DATAInterface( + self.argv.remote_addr, self.argv.base_port + 2, + self.argv.bind_addr, self.argv.base_port + 102) + elif self.argv.conn_mode == "L1": + self.data_if = DATAInterface( + self.argv.remote_addr, self.argv.base_port + 102, + self.argv.bind_addr, self.argv.base_port + 2) # Read messages from the capture messages = self.ddf.parse_all( - skip = self.msg_skip, count = self.msg_count) + skip = self.argv.cnt_skip, count = self.argv.cnt_count) if messages is False: pass # FIXME!!! for msg in messages: # Pass filter - if not self.msg_pass_filter(l12trx, msg): + if not self.msg_pass_filter(msg): continue log.info("Sending a burst %s to %s..." - % (msg.desc_hdr(), self.conn_mode)) + % (msg.desc_hdr(), self.argv.conn_mode)) # Send message self.data_if.send_msg(msg) - def msg_pass_filter(self, l12trx, msg): + def msg_pass_filter(self, msg): # Direction filter + l12trx = self.argv.conn_mode == "TRX" if isinstance(msg, DATAMSG_L12TRX) and not l12trx: return False elif isinstance(msg, DATAMSG_TRX2L1) and l12trx: return False # Timeslot filter - if self.pf_tn is not None: - if msg.tn != self.pf_tn: + if self.argv.pf_tn is not None: + if msg.tn != self.argv.pf_tn: return False # Frame number filter - if self.pf_fn_lt is not None: - if msg.fn > self.pf_fn_lt: + if self.argv.pf_fn_lt is not None: + if msg.fn > self.argv.pf_fn_lt: return False - if self.pf_fn_gt is not None: - if msg.fn < self.pf_fn_gt: + if self.argv.pf_fn_gt is not None: + if msg.fn < self.argv.pf_fn_gt: return False # Burst passed ;) return True - def print_help(self, msg = None): - s = " Usage: " + sys.argv[0] + " [options]\n\n" \ - " Some help...\n" \ - " -h --help this text\n\n" - - s += " TRX interface specific\n" \ - " -m --conn-mode Send bursts to: TRX (default) / L1\n" \ - " -r --remote-addr Set remote address (default %s)\n" \ - " -b --bind-addr Set bind address (default %s)\n" \ - " -p --base-port Set base port number (default %d)\n\n" - - s += " Burst source\n" \ - " -i --capture-file Read bursts from capture file\n\n" \ - - s += " Count limitations (disabled by default)\n" \ - " --msg-skip NUM Skip NUM messages before sending\n" \ - " --msg-count NUM Stop after sending NUM messages\n\n" \ - - s += " Filtering (disabled by default)\n" \ - " --timeslot NUM TDMA timeslot number [0..7]\n" \ - " --frame-num-lt NUM TDMA frame number lower than NUM\n" \ - " --frame-num-gt NUM TDMA frame number greater than NUM\n" - - print(s % (self.remote_addr, self.bind_addr, self.base_port)) - - if msg is not None: - print(msg) - def parse_argv(self): - try: - opts, args = getopt.getopt(sys.argv[1:], - "m:r:b:p:i:h", - [ - "help", - "conn-mode=", - "remote-addr=", - "bind-addr=", - "base-port=", - "capture-file=", - "msg-skip=", - "msg-count=", - "timeslot=", - "frame-num-lt=", - "frame-num-gt=", - ]) - except getopt.GetoptError as err: - self.print_help("[!] " + str(err)) - sys.exit(2) - - for o, v in opts: - if o in ("-h", "--help"): - self.print_help() - sys.exit(2) - - # Capture file - elif o in ("-i", "--capture-file"): - self.capture_file = v - - # TRX interface specific - elif o in ("-m", "--conn-mode"): - self.conn_mode = v - elif o in ("-r", "--remote-addr"): - self.remote_addr = v - elif o in ("-b", "--bind-addr"): - self.bind_addr = v - elif o in ("-p", "--base-port"): - self.base_port = int(v) - - # Count limitations - elif o == "--msg-skip": - self.msg_skip = int(v) - elif o == "--msg-count": - self.msg_count = int(v) - - # Timeslot pass filter - elif o == "--timeslot": - self.pf_tn = int(v) - if self.pf_tn < 0 or self.pf_tn > 7: - self.print_help("[!] Wrong timeslot value") - sys.exit(2) - - # Frame number pass filter - elif o == "--frame-num-lt": - self.pf_fn_lt = int(v) - elif o == "--frame-num-gt": - self.pf_fn_gt = int(v) - - if self.capture_file is None: - self.print_help("[!] Please specify a capture file") - sys.exit(2) + parser = argparse.ArgumentParser(prog = "burst_send", + description = "Auxiliary tool to send (reply) captured bursts") + + trx_group = parser.add_argument_group("TRX interface") + trx_group.add_argument("-r", "--remote-addr", + dest = "remote_addr", type = str, default = "127.0.0.1", + help = "Set remote address (default %(default)s)") + trx_group.add_argument("-b", "--bind-addr", + dest = "bind_addr", type = str, default = "0.0.0.0", + help = "Set bind address (default %(default)s)") + trx_group.add_argument("-p", "--base-port", + dest = "base_port", type = int, default = 6700, + help = "Set base port number (default %(default)s)") + trx_group.add_argument("-m", "--conn-mode", + dest = "conn_mode", type = str, + choices = ["TRX", "L1"], default = "TRX", + help = "Where to send bursts (default %(default)s)") + trx_group.add_argument("-i", "--capture-file", metavar = "FILE", + dest = "capture_file", type = str, required = True, + help = "Capture file to read bursts from") + + cnt_group = parser.add_argument_group("Count limitations (optional)") + cnt_group.add_argument("--skip", metavar = "N", + dest = "cnt_skip", type = int, + help = "Skip N messages before sending") + cnt_group.add_argument("--count", metavar = "N", + dest = "cnt_count", type = int, + help = "Stop after sending N messages") + + pf_group = parser.add_argument_group("Filtering (optional)") + cnt_group.add_argument("--timeslot", metavar = "TN", + dest = "pf_tn", type = int, choices = range(0, 8), + help = "TDMA timeslot number (equal TN)") + cnt_group.add_argument("--frame-num-lt", metavar = "FN", + dest = "pf_fn_lt", type = int, + help = "TDMA frame number (lower than FN)") + cnt_group.add_argument("--frame-num-gt", metavar = "FN", + dest = "pf_fn_gt", type = int, + help = "TDMA frame number (greater than FN)") + + return parser.parse_args() def sig_handler(self, signum, frame): log.info("Signal %d received" % signum) -- cgit v1.2.3