From 51dd1dc7036f5a6752fa52dc1a51da7e1ddb25e9 Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 7 Dec 2018 06:52:59 +0700 Subject: trx_toolkit/burst_gen.py: migrate from getopt to argparse Change-Id: I7eb3f2e2713f1f97293bd47a2eae3b140f63fb59 --- src/target/trx_toolkit/burst_gen.py | 233 ++++++++++++++---------------------- 1 file changed, 88 insertions(+), 145 deletions(-) (limited to 'src/target/trx_toolkit') diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py index 0268da9a..7625a273 100755 --- a/src/target/trx_toolkit/burst_gen.py +++ b/src/target/trx_toolkit/burst_gen.py @@ -28,7 +28,7 @@ CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] import logging as log import signal -import getopt +import argparse import sys from rand_burst_gen import RandBurstGen @@ -38,29 +38,9 @@ from gsm_shared import * from data_msg import * class Application: - # Application variables - remote_addr = "127.0.0.1" - bind_addr = "0.0.0.0" - base_port = 5700 - conn_mode = "TRX" - output_file = None - - burst_type = None - burst_count = 1 - - # Common header fields - fn = None - tn = None - - # Message specific header fields - toa256 = None - rssi = None - pwr = None - def __init__(self): print_copyright(CR_HOLDERS) - self.parse_argv() - self.check_argv() + self.argv = self.parse_argv() # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) @@ -70,32 +50,35 @@ class Application: format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") # Open requested capture file - if self.output_file is not None: - self.ddf = DATADumpFile(self.output_file) + if self.argv.output_file is not None: + self.ddf = DATADumpFile(self.argv.output_file) def run(self): # Init DATA interface with TRX or L1 - if self.conn_mode == "TRX": - self.data_if = DATAInterface(self.remote_addr, self.base_port + 2, - self.bind_addr, self.base_port + 102) - elif self.conn_mode == "L1": - self.data_if = DATAInterface(self.remote_addr, self.base_port + 102, - self.bind_addr, self.base_port + 2) + if self.argv.conn_mode == "TRX": + self.data_if = DATAInterface( + self.argv.remote_addr, self.argv.base_port + 2, + self.argv.bind_addr, self.argv.base_port + 102) + elif self.argv.conn_mode == "L1": + self.data_if = DATAInterface( + self.argv.remote_addr, self.argv.base_port + 102, + self.argv.bind_addr, self.argv.base_port + 2) # Init random burst generator burst_gen = RandBurstGen() # Init an empty DATA message - if self.conn_mode == "TRX": + if self.argv.conn_mode == "TRX": msg = DATAMSG_L12TRX() - elif self.conn_mode == "L1": + elif self.argv.conn_mode == "L1": msg = DATAMSG_TRX2L1() # Generate a random frame number or use provided one - fn_init = msg.rand_fn() if self.fn is None else self.fn + fn_init = msg.rand_fn() if self.argv.tdma_fn is None \ + else self.argv.tdma_fn # Send as much bursts as required - for i in range(self.burst_count): + for i in range(self.argv.burst_count): # Randomize the message header msg.rand_hdr() @@ -103,145 +86,105 @@ class Application: msg.fn = (fn_init + i) % GSM_HYPERFRAME # Set timeslot number - if self.tn is not None: - msg.tn = self.tn + if self.argv.tdma_tn is not None: + msg.tn = self.argv.tdma_tn # Set transmit power level - if self.pwr is not None: - msg.pwr = self.pwr + if self.argv.pwr is not None: + msg.pwr = self.argv.pwr # Set time of arrival - if self.toa256 is not None: - msg.toa256 = self.toa256 + if self.argv.toa is not None: + msg.toa256 = int(float(self.argv.toa) * 256.0 + 0.5) + elif self.argv.toa256 is not None: + msg.toa256 = self.argv.toa256 # Set RSSI - if self.rssi is not None: - msg.rssi = self.rssi + if self.argv.rssi is not None: + msg.rssi = self.argv.rssi # Generate a random burst - if self.burst_type == "NB": + if self.argv.burst_type == "NB": burst = burst_gen.gen_nb() - elif self.burst_type == "FB": + elif self.argv.burst_type == "FB": burst = burst_gen.gen_fb() - elif self.burst_type == "SB": + elif self.argv.burst_type == "SB": burst = burst_gen.gen_sb() - elif self.burst_type == "AB": + elif self.argv.burst_type == "AB": burst = burst_gen.gen_ab() # Convert to soft-bits in case of TRX -> L1 message - if self.conn_mode == "L1": + if self.argv.conn_mode == "L1": burst = msg.ubit2sbit(burst) # Set burst msg.burst = burst log.info("Sending %d/%d %s burst %s to %s..." - % (i + 1, self.burst_count, self.burst_type, - msg.desc_hdr(), self.conn_mode)) + % (i + 1, self.argv.burst_count, self.argv.burst_type, + msg.desc_hdr(), self.argv.conn_mode)) # Send message self.data_if.send_msg(msg) # Append a new message to the capture - if self.output_file is not None: + if self.argv.output_file is not None: self.ddf.append_msg(msg) - def print_help(self, msg = None): - s = " Usage: " + sys.argv[0] + " [options]\n\n" \ - " Some help...\n" \ - " -h --help this text\n\n" - - s += " TRX interface specific\n" \ - " -o --output-file Write bursts to a capture file\n" \ - " -m --conn-mode Send bursts to: TRX (default) / L1\n" \ - " -r --remote-addr Set remote address (default %s)\n" \ - " -b --bind-addr Set local address (default %s)\n" \ - " -p --base-port Set base port number (default %d)\n\n" - - s += " Burst generation\n" \ - " -B --burst-type Random burst type (NB, FB, SB, AB)\n" \ - " -c --burst-count How much bursts to send (default 1)\n" \ - " -f --frame-number Set frame number (default random)\n" \ - " -t --timeslot Set timeslot index (default random)\n" \ - " --pwr Set power level (default random)\n" \ - " --rssi Set RSSI (default random)\n" \ - " --toa Set ToA in symbols (default random)\n" \ - " --toa256 Set ToA in 1/256 symbol periods\n" - - print(s % (self.remote_addr, self.bind_addr, self.base_port)) - - if msg is not None: - print(msg) - def parse_argv(self): - try: - opts, args = getopt.getopt(sys.argv[1:], - "o:m:r:b:p:B:c:f:t:h", - [ - "help", - "output-file=" - "conn-mode=", - "remote-addr=", - "bind-addr=", - "base-port=", - "burst-type=", - "burst-count=", - "frame-number=", - "timeslot=", - "rssi=", - "toa=", - "toa256=", - "pwr=", - ]) - except getopt.GetoptError as err: - self.print_help("[!] " + str(err)) - sys.exit(2) - - for o, v in opts: - if o in ("-h", "--help"): - self.print_help() - sys.exit(2) - - elif o in ("-o", "--output-file"): - self.output_file = v - elif o in ("-m", "--conn-mode"): - self.conn_mode = v - elif o in ("-r", "--remote-addr"): - self.remote_addr = v - elif o in ("-b", "--bind-addr"): - self.bind_addr = v - elif o in ("-p", "--base-port"): - self.base_port = int(v) - - elif o in ("-B", "--burst-type"): - self.burst_type = v - elif o in ("-c", "--burst-count"): - self.burst_count = int(v) - elif o in ("-f", "--frame-number"): - self.fn = int(v) - elif o in ("-t", "--timeslot"): - self.tn = int(v) - - # Message specific header fields - elif o == "--pwr": - self.pwr = int(v) - elif o == "--rssi": - self.rssi = int(v) - elif o == "--toa256": - self.toa256 = int(v) - elif o == "--toa": - self.toa256 = int(float(v) * 256.0 + 0.5) - - def check_argv(self): - # Check connection mode - if self.conn_mode not in ("TRX", "L1"): - self.print_help("[!] Unknown connection type") - sys.exit(2) - - # Check connection mode - if self.burst_type not in ("NB", "FB", "SB", "AB"): - self.print_help("[!] Unknown burst type") - sys.exit(2) + parser = argparse.ArgumentParser(prog = "burst_gen", + description = "Auxiliary tool to generate and send random bursts") + + trx_group = parser.add_argument_group("TRX interface") + trx_group.add_argument("-r", "--remote-addr", + dest = "remote_addr", type = str, default = "127.0.0.1", + help = "Set remote address (default %(default)s)") + trx_group.add_argument("-b", "--bind-addr", + dest = "bind_addr", type = str, default = "0.0.0.0", + help = "Set bind address (default %(default)s)") + trx_group.add_argument("-p", "--base-port", + dest = "base_port", type = int, default = 6700, + help = "Set base port number (default %(default)s)") + trx_group.add_argument("-m", "--conn-mode", + dest = "conn_mode", type = str, + choices = ["TRX", "L1"], default = "TRX", + help = "Where to send bursts (default %(default)s)") + trx_group.add_argument("-o", "--output-file", + dest = "output_file", type = str, + help = "Write bursts to a capture file") + + bg_group = parser.add_argument_group("Burst generation") + bg_group.add_argument("-B", "--burst-type", + dest = "burst_type", type = str, + choices = ["NB", "FB", "SB", "AB"], default = "NB", + help = "Random burst type (default %(default)s)") + bg_group.add_argument("-c", "--burst-count", metavar = "N", + dest = "burst_count", type = int, default = 1, + help = "How many bursts to send (default %(default)s)") + bg_group.add_argument("-f", "--frame-number", metavar = "FN", + dest = "tdma_fn", type = int, + help = "Set TDMA frame number (default random)") + bg_group.add_argument("-t", "--timeslot", metavar = "TN", + dest = "tdma_tn", type = int, choices = range(0, 8), + help = "Set TDMA timeslot (default random)") + + bg_pwr_group = bg_group.add_mutually_exclusive_group() + bg_pwr_group.add_argument("--pwr", metavar = "dBm", + dest = "pwr", type = int, + help = "Set power level (default random)") + bg_pwr_group.add_argument("--rssi", metavar = "dBm", + dest = "rssi", type = int, + help = "Set RSSI (default random)") + + bg_toa_group = bg_group.add_mutually_exclusive_group() + bg_toa_group.add_argument("--toa", + dest = "toa", type = int, + help = "Set Timing of Arrival in symbols (default random)") + bg_toa_group.add_argument("--toa256", + dest = "toa256", type = int, + help = "Set Timing of Arrival in 1/256 symbol periods") + + return parser.parse_args() def sig_handler(self, signum, frame): log.info("Signal %d received" % signum) -- cgit v1.2.3