From 6bab6acee634fcd05a175428e1077ab00188829a Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Fri, 7 Dec 2018 05:00:26 +0700 Subject: trx_toolkit: use generic logging module instead of print() There are multiple advantages of using Python's logging module: - advanced message formatting (file name, line number, etc.), - multiple logging targets (e.g. stderr, file, socket), - logging levels (e.g. DEBUG, INFO, ERROR), - the pythonic way ;) so, let's replace multiple print() calls by logging calls, add use the following logging message format by default: [%(levelname)s] %(filename)s:%(lineno)d %(message)s Examples: [INFO] ctrl_if_bts.py:57 Starting transceiver... [DEBUG] clck_gen.py:87 IND CLOCK 26826 [DEBUG] ctrl_if_bts.py:71 Recv POWEROFF cmd [INFO] ctrl_if_bts.py:73 Stopping transceiver... [INFO] fake_trx.py:127 Shutting down... Please note that there is no way to filter messages by logging level yet. This is to be introduced soon, together with argparse. Change-Id: I7fcafabafe8323b58990997a47afdd48b6d1f357 --- src/target/trx_toolkit/burst_fwd.py | 9 +++--- src/target/trx_toolkit/burst_gen.py | 9 ++++-- src/target/trx_toolkit/burst_send.py | 9 ++++-- src/target/trx_toolkit/clck_gen.py | 9 ++++-- src/target/trx_toolkit/ctrl_cmd.py | 9 ++++-- src/target/trx_toolkit/ctrl_if.py | 4 ++- src/target/trx_toolkit/ctrl_if_bb.py | 52 ++++++++++++++++++----------------- src/target/trx_toolkit/ctrl_if_bts.py | 40 ++++++++++++++------------- src/target/trx_toolkit/data_dump.py | 41 +++++++++++++++------------ src/target/trx_toolkit/data_msg.py | 28 +++++++++++-------- src/target/trx_toolkit/fake_trx.py | 11 ++++++-- src/target/trx_toolkit/trx_sniff.py | 19 ++++++++----- 12 files changed, 145 insertions(+), 95 deletions(-) (limited to 'src/target') diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py index 31f882e9..3cb6acd0 100644 --- a/src/target/trx_toolkit/burst_fwd.py +++ b/src/target/trx_toolkit/burst_fwd.py @@ -22,6 +22,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import logging as log import random from data_msg import * @@ -214,7 +215,7 @@ class BurstForwarder: # Burst dropping if self.burst_dl_drop_amount > 0: if msg.fn % self.burst_dl_drop_period == 0: - print("[~] Simulation: dropping DL burst (fn=%u %% %u == 0)" + log.info("Simulation: dropping DL burst (fn=%u %% %u == 0)" % (msg.fn, self.burst_dl_drop_period)) self.burst_dl_drop_amount -= 1 return None @@ -226,7 +227,7 @@ class BurstForwarder: # Burst dropping if self.burst_ul_drop_amount > 0: if msg.fn % self.burst_ul_drop_period == 0: - print("[~] Simulation: dropping UL burst (fn=%u %% %u == 0)" + log.info("Simulation: dropping UL burst (fn=%u %% %u == 0)" % (msg.fn, self.burst_ul_drop_period)) self.burst_ul_drop_amount -= 1 return None @@ -254,7 +255,7 @@ class BurstForwarder: msg_l12trx = DATAMSG_L12TRX() msg_l12trx.parse_msg(bytearray(msg_raw)) except: - print("[!] Dropping unhandled DL message...") + log.error("Dropping unhandled DL message...") return None # Compose a new message for L1 @@ -320,7 +321,7 @@ class BurstForwarder: # Timeslot filter if msg.tn not in self.ts_pass_list: - print("[!] TS %u is not configured, dropping UL burst..." % msg.tn) + log.warning("TS %u is not configured, dropping UL burst..." % msg.tn) return None # Path loss simulation diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py index d83f1378..9a17ffa9 100755 --- a/src/target/trx_toolkit/burst_gen.py +++ b/src/target/trx_toolkit/burst_gen.py @@ -26,6 +26,7 @@ from copyright import print_copyright CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] +import logging as log import signal import getopt import sys @@ -64,6 +65,10 @@ class Application: # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Open requested capture file if self.output_file is not None: self.ddf = DATADumpFile(self.output_file) @@ -130,7 +135,7 @@ class Application: # Set burst msg.burst = burst - print("[i] Sending %d/%d %s burst %s to %s..." + log.info("Sending %d/%d %s burst %s to %s..." % (i + 1, self.burst_count, self.burst_type, msg.desc_hdr(), self.conn_mode)) @@ -239,7 +244,7 @@ class Application: sys.exit(2) def sig_handler(self, signum, frame): - print("Signal %d received" % signum) + log.info("Signal %d received" % signum) if signum is signal.SIGINT: sys.exit(0) diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py index f6c85ba0..787e0fc3 100755 --- a/src/target/trx_toolkit/burst_send.py +++ b/src/target/trx_toolkit/burst_send.py @@ -25,6 +25,7 @@ from copyright import print_copyright CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] +import logging as log import signal import getopt import sys @@ -60,6 +61,10 @@ class Application: # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Open requested capture file self.ddf = DATADumpFile(self.capture_file) @@ -88,7 +93,7 @@ class Application: if not self.msg_pass_filter(l12trx, msg): continue - print("[i] Sending a burst %s to %s..." + log.info("Sending a burst %s to %s..." % (msg.desc_hdr(), self.conn_mode)) # Send message @@ -209,7 +214,7 @@ class Application: sys.exit(2) def sig_handler(self, signum, frame): - print("Signal %d received" % signum) + log.info("Signal %d received" % signum) if signum is signal.SIGINT: sys.exit(0) diff --git a/src/target/trx_toolkit/clck_gen.py b/src/target/trx_toolkit/clck_gen.py index b488770e..56207f46 100755 --- a/src/target/trx_toolkit/clck_gen.py +++ b/src/target/trx_toolkit/clck_gen.py @@ -25,6 +25,7 @@ from copyright import print_copyright CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] +import logging as log import signal import time import sys @@ -83,7 +84,7 @@ class CLCKGen: link.send(payload) # Debug print - print("[T] %s" % payload) + log.debug(payload) # Increase frame count self.clck_src += self.ind_period @@ -101,13 +102,17 @@ class Application: # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + def run(self): self.link = UDPLink("127.0.0.1", 5800, "0.0.0.0", 5700) self.clck = CLCKGen([self.link], ind_period = 51) self.clck.start() def sig_handler(self, signum, frame): - print("Signal %d received" % signum) + log.info("Signal %d received" % signum) if signum is signal.SIGINT: self.clck.stop() diff --git a/src/target/trx_toolkit/ctrl_cmd.py b/src/target/trx_toolkit/ctrl_cmd.py index e56105a8..ec683be7 100755 --- a/src/target/trx_toolkit/ctrl_cmd.py +++ b/src/target/trx_toolkit/ctrl_cmd.py @@ -26,6 +26,7 @@ from copyright import print_copyright CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] +import logging as log import signal import getopt import select @@ -48,12 +49,16 @@ class Application: # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Init UDP connection self.ctrl_link = UDPLink(self.remote_addr, self.base_port + 1, self.bind_addr, self.bind_port) # Debug print - print("[i] Init CTRL interface (%s)" \ + log.info("Init CTRL interface (%s)" \ % self.ctrl_link.desc_link()) def print_help(self, msg = None): @@ -138,7 +143,7 @@ class Application: sys.stdout.flush() def sig_handler(self, signum, frame): - print("\n\nSignal %d received" % signum) + log.info("Signal %d received" % signum) if signum is signal.SIGINT: sys.exit(0) diff --git a/src/target/trx_toolkit/ctrl_if.py b/src/target/trx_toolkit/ctrl_if.py index 1e569a60..b533746b 100644 --- a/src/target/trx_toolkit/ctrl_if.py +++ b/src/target/trx_toolkit/ctrl_if.py @@ -22,12 +22,14 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import logging as log + from udp_link import UDPLink class CTRLInterface(UDPLink): def handle_rx(self, data, remote): if not self.verify_req(data): - print("[!] Wrong data on CTRL interface") + log.error("Wrong data on CTRL interface") return # Attempt to parse a command diff --git a/src/target/trx_toolkit/ctrl_if_bb.py b/src/target/trx_toolkit/ctrl_if_bb.py index 97a3d9df..aaa12f17 100644 --- a/src/target/trx_toolkit/ctrl_if_bb.py +++ b/src/target/trx_toolkit/ctrl_if_bb.py @@ -22,6 +22,8 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import logging as log + from ctrl_if import CTRLInterface class CTRLInterfaceBB(CTRLInterface): @@ -34,37 +36,37 @@ class CTRLInterfaceBB(CTRLInterface): def __init__(self, remote_addr, remote_port, bind_addr, bind_port): CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port) - print("[i] Init CTRL interface for BB (%s)" % self.desc_link()) + log.info("Init CTRL interface for BB (%s)" % self.desc_link()) def parse_cmd(self, request): # Power control if self.verify_cmd(request, "POWERON", 0): - print("[i] Recv POWERON CMD") + log.debug("Recv POWERON CMD") # Ensure transceiver isn't working if self.trx_started: - print("[!] Transceiver already started") + log.error("Transceiver already started") return -1 # Ensure RX / TX freq. are set if (self.rx_freq is None) or (self.tx_freq is None): - print("[!] RX / TX freq. are not set") + log.error("RX / TX freq. are not set") return -1 - print("[i] Starting transceiver...") + log.info("Starting transceiver...") self.trx_started = True return 0 elif self.verify_cmd(request, "POWEROFF", 0): - print("[i] Recv POWEROFF cmd") + log.debug("Recv POWEROFF cmd") - print("[i] Stopping transceiver...") + log.info("Stopping transceiver...") self.trx_started = False return 0 # Tuning Control elif self.verify_cmd(request, "RXTUNE", 1): - print("[i] Recv RXTUNE cmd") + log.debug("Recv RXTUNE cmd") # TODO: check freq range self.rx_freq = int(request[1]) * 1000 @@ -72,7 +74,7 @@ class CTRLInterfaceBB(CTRLInterface): return 0 elif self.verify_cmd(request, "TXTUNE", 1): - print("[i] Recv TXTUNE cmd") + log.debug("Recv TXTUNE cmd") # TODO: check freq range self.tx_freq = int(request[1]) * 1000 @@ -80,7 +82,7 @@ class CTRLInterfaceBB(CTRLInterface): # Power measurement elif self.verify_cmd(request, "MEASURE", 1): - print("[i] Recv MEASURE cmd") + log.debug("Recv MEASURE cmd") if self.pm is None: return -1 @@ -92,7 +94,7 @@ class CTRLInterfaceBB(CTRLInterface): return (0, [meas_dbm]) elif self.verify_cmd(request, "SETSLOT", 2): - print("[i] Recv SETSLOT cmd") + log.debug("Recv SETSLOT cmd") if self.burst_fwd is None: return -1 @@ -100,7 +102,7 @@ class CTRLInterfaceBB(CTRLInterface): # Obtain TS index ts = int(request[1]) if ts not in range(0, 8): - print("[!] TS index should be in range: 0..7") + log.error("TS index should be in range: 0..7") return -1 # Parse TS type @@ -113,7 +115,7 @@ class CTRLInterfaceBB(CTRLInterface): if ts in self.burst_fwd.ts_pass_list: self.burst_fwd.ts_pass_list.remove(ts) else: - print("[!] TS %u was not activated before" % ts) + log.warning("TS %u was not activated before" % ts) # TODO: uncomment as soon as RESET is introduced # return -1 else: @@ -121,7 +123,7 @@ class CTRLInterfaceBB(CTRLInterface): if ts not in self.burst_fwd.ts_pass_list: self.burst_fwd.ts_pass_list.append(ts) else: - print("[!] TS %u was already activated before" % ts) + log.warning("TS %u was already activated before" % ts) # TODO: uncomment as soon as RESET is introduced # return -1 @@ -129,7 +131,7 @@ class CTRLInterfaceBB(CTRLInterface): # Timing Advance elif self.verify_cmd(request, "SETTA", 1): - print("[i] Recv SETTA cmd") + log.debug("Recv SETTA cmd") # Save to the BurstForwarder instance self.burst_fwd.ta = ta @@ -138,7 +140,7 @@ class CTRLInterfaceBB(CTRLInterface): # Timing of Arrival simulation for Uplink # Absolute form: CMD FAKE_TOA elif self.verify_cmd(request, "FAKE_TOA", 2): - print("[i] Recv FAKE_TOA cmd") + log.debug("Recv FAKE_TOA cmd") # Parse and apply both base and threshold self.burst_fwd.toa256_ul_base = int(request[1]) @@ -149,7 +151,7 @@ class CTRLInterfaceBB(CTRLInterface): # Timing of Arrival simulation for Uplink # Relative form: CMD FAKE_TOA <+-BASE_DELTA> elif self.verify_cmd(request, "FAKE_TOA", 1): - print("[i] Recv FAKE_TOA cmd") + log.debug("Recv FAKE_TOA cmd") # Parse and apply delta self.burst_fwd.toa256_ul_base += int(request[1]) @@ -159,7 +161,7 @@ class CTRLInterfaceBB(CTRLInterface): # RSSI simulation for Uplink # Absolute form: CMD FAKE_RSSI elif self.verify_cmd(request, "FAKE_RSSI", 2): - print("[i] Recv FAKE_RSSI cmd") + log.debug("Recv FAKE_RSSI cmd") # Parse and apply both base and threshold self.burst_fwd.rssi_ul_base = int(request[1]) @@ -170,7 +172,7 @@ class CTRLInterfaceBB(CTRLInterface): # RSSI simulation for Uplink # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> elif self.verify_cmd(request, "FAKE_RSSI", 1): - print("[i] Recv FAKE_RSSI cmd") + log.debug("Recv FAKE_RSSI cmd") # Parse and apply delta self.burst_fwd.rssi_ul_base += int(request[1]) @@ -181,12 +183,12 @@ class CTRLInterfaceBB(CTRLInterface): # Syntax: CMD FAKE_DROP # Dropping pattern: fn % 1 == 0 elif self.verify_cmd(request, "FAKE_DROP", 1): - print("[i] Recv FAKE_DROP cmd") + log.debug("Recv FAKE_DROP cmd") # Parse / validate amount of bursts num = int(request[1]) if num < 0: - print("[!] FAKE_DROP amount shall not be negative") + log.error("FAKE_DROP amount shall not be negative") return -1 self.burst_fwd.burst_ul_drop_amount = num @@ -198,18 +200,18 @@ class CTRLInterfaceBB(CTRLInterface): # Syntax: CMD FAKE_DROP # Dropping pattern: fn % period == 0 elif self.verify_cmd(request, "FAKE_DROP", 2): - print("[i] Recv FAKE_DROP cmd") + log.debug("Recv FAKE_DROP cmd") # Parse / validate amount of bursts num = int(request[1]) if num < 0: - print("[!] FAKE_DROP amount shall not be negative") + log.error("FAKE_DROP amount shall not be negative") return -1 # Parse / validate period period = int(request[2]) if period <= 0: - print("[!] FAKE_DROP period shall be greater than zero") + log.error("FAKE_DROP period shall be greater than zero") return -1 self.burst_fwd.burst_ul_drop_amount = num @@ -221,5 +223,5 @@ class CTRLInterfaceBB(CTRLInterface): else: # We don't care about other commands, # so let's merely ignore them ;) - print("[i] Ignore CMD %s" % request[0]) + log.debug("Ignore CMD %s" % request[0]) return 0 diff --git a/src/target/trx_toolkit/ctrl_if_bts.py b/src/target/trx_toolkit/ctrl_if_bts.py index 6ac8ffb5..2dde3e3a 100644 --- a/src/target/trx_toolkit/ctrl_if_bts.py +++ b/src/target/trx_toolkit/ctrl_if_bts.py @@ -22,6 +22,8 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import logging as log + from ctrl_if import CTRLInterface class CTRLInterfaceBTS(CTRLInterface): @@ -35,24 +37,24 @@ class CTRLInterfaceBTS(CTRLInterface): def __init__(self, remote_addr, remote_port, bind_addr, bind_port): CTRLInterface.__init__(self, remote_addr, remote_port, bind_addr, bind_port) - print("[i] Init CTRL interface for BTS (%s)" % self.desc_link()) + log.info("Init CTRL interface for BTS (%s)" % self.desc_link()) def parse_cmd(self, request): # Power control if self.verify_cmd(request, "POWERON", 0): - print("[i] Recv POWERON CMD") + log.debug("Recv POWERON CMD") # Ensure transceiver isn't working if self.trx_started: - print("[!] Transceiver already started") + log.error("Transceiver already started") return -1 # Ensure RX / TX freq. are set if (self.rx_freq is None) or (self.tx_freq is None): - print("[!] RX / TX freq. are not set") + log.error("RX / TX freq. are not set") return -1 - print("[i] Starting transceiver...") + log.info("Starting transceiver...") self.trx_started = True # Power emulation @@ -66,9 +68,9 @@ class CTRLInterfaceBTS(CTRLInterface): return 0 elif self.verify_cmd(request, "POWEROFF", 0): - print("[i] Recv POWEROFF cmd") + log.debug("Recv POWEROFF cmd") - print("[i] Stopping transceiver...") + log.info("Stopping transceiver...") self.trx_started = False # Power emulation @@ -83,14 +85,14 @@ class CTRLInterfaceBTS(CTRLInterface): # Tuning Control elif self.verify_cmd(request, "RXTUNE", 1): - print("[i] Recv RXTUNE cmd") + log.debug("Recv RXTUNE cmd") # TODO: check freq range self.rx_freq = int(request[1]) * 1000 return 0 elif self.verify_cmd(request, "TXTUNE", 1): - print("[i] Recv TXTUNE cmd") + log.debug("Recv TXTUNE cmd") # TODO: check freq range self.tx_freq = int(request[1]) * 1000 @@ -100,7 +102,7 @@ class CTRLInterfaceBTS(CTRLInterface): # Timing of Arrival simulation for Downlink # Absolute form: CMD FAKE_TOA elif self.verify_cmd(request, "FAKE_TOA", 2): - print("[i] Recv FAKE_TOA cmd") + log.debug("Recv FAKE_TOA cmd") # Parse and apply both base and threshold self.burst_fwd.toa256_dl_base = int(request[1]) @@ -111,7 +113,7 @@ class CTRLInterfaceBTS(CTRLInterface): # Timing of Arrival simulation for Downlink # Relative form: CMD FAKE_TOA <+-BASE_DELTA> elif self.verify_cmd(request, "FAKE_TOA", 1): - print("[i] Recv FAKE_TOA cmd") + log.debug("Recv FAKE_TOA cmd") # Parse and apply delta self.burst_fwd.toa256_dl_base += int(request[1]) @@ -121,7 +123,7 @@ class CTRLInterfaceBTS(CTRLInterface): # RSSI simulation for Downlink # Absolute form: CMD FAKE_RSSI elif self.verify_cmd(request, "FAKE_RSSI", 2): - print("[i] Recv FAKE_RSSI cmd") + log.debug("Recv FAKE_RSSI cmd") # Parse and apply both base and threshold self.burst_fwd.rssi_dl_base = int(request[1]) @@ -132,7 +134,7 @@ class CTRLInterfaceBTS(CTRLInterface): # RSSI simulation for Downlink # Relative form: CMD FAKE_RSSI <+-BASE_DELTA> elif self.verify_cmd(request, "FAKE_RSSI", 1): - print("[i] Recv FAKE_RSSI cmd") + log.debug("Recv FAKE_RSSI cmd") # Parse and apply delta self.burst_fwd.rssi_dl_base += int(request[1]) @@ -143,12 +145,12 @@ class CTRLInterfaceBTS(CTRLInterface): # Syntax: CMD FAKE_DROP # Dropping pattern: fn % 1 == 0 elif self.verify_cmd(request, "FAKE_DROP", 1): - print("[i] Recv FAKE_DROP cmd") + log.debug("Recv FAKE_DROP cmd") # Parse / validate amount of bursts num = int(request[1]) if num < 0: - print("[!] FAKE_DROP amount shall not be negative") + log.error("FAKE_DROP amount shall not be negative") return -1 self.burst_fwd.burst_dl_drop_amount = num @@ -160,18 +162,18 @@ class CTRLInterfaceBTS(CTRLInterface): # Syntax: CMD FAKE_DROP # Dropping pattern: fn % period == 0 elif self.verify_cmd(request, "FAKE_DROP", 2): - print("[i] Recv FAKE_DROP cmd") + log.debug("Recv FAKE_DROP cmd") # Parse / validate amount of bursts num = int(request[1]) if num < 0: - print("[!] FAKE_DROP amount shall not be negative") + log.error("FAKE_DROP amount shall not be negative") return -1 # Parse / validate period period = int(request[2]) if period <= 0: - print("[!] FAKE_DROP period shall be greater than zero") + log.error("FAKE_DROP period shall be greater than zero") return -1 self.burst_fwd.burst_dl_drop_amount = num @@ -183,5 +185,5 @@ class CTRLInterfaceBTS(CTRLInterface): else: # We don't care about other commands, # so let's merely ignore them ;) - print("[i] Ignore CMD %s" % request[0]) + log.debug("Ignore CMD %s" % request[0]) return 0 diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index 1d7805e3..71e12612 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -22,6 +22,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import logging as log import struct from data_msg import * @@ -77,13 +78,15 @@ class DATADumpFile(DATADump): def __init__(self, capture): # Check if capture file is already opened if isinstance(capture, str): - print("[i] Opening capture file '%s'..." % capture) + log.info("Opening capture file '%s'..." % capture) self.f = open(capture, "a+b") else: self.f = capture def __del__(self): - print("[i] Closing the capture file") + # FIXME: this causes an Exception in Python 2 (but not in Python 3) + # AttributeError: 'NoneType' object has no attribute 'info' + log.info("Closing the capture file") self.f.close() # Moves the file descriptor before a specified message @@ -104,7 +107,7 @@ class DATADumpFile(DATADump): # Attempt to parse it rc = self.parse_hdr(hdr_raw) if rc is False: - print("[!] Couldn't parse a message header") + log.error("Couldn't parse a message header") return False # Expand the header @@ -129,7 +132,7 @@ class DATADumpFile(DATADump): # Attempt to parse it rc = self.parse_hdr(hdr_raw) if rc is False: - print("[!] Couldn't parse a message header") + log.error("Couldn't parse a message header") return None # Expand the header @@ -138,7 +141,7 @@ class DATADumpFile(DATADump): # Attempt to read a message msg_raw = self.f.read(msg_len) if len(msg_raw) != msg_len: - print("[!] Message length mismatch") + log.error("Message length mismatch") return None # Attempt to parse a message @@ -146,7 +149,7 @@ class DATADumpFile(DATADump): msg_raw = bytearray(msg_raw) msg.parse_msg(msg_raw) except: - print("[!] Couldn't parse a message, skipping...") + log.error("Couldn't parse a message, skipping...") return False # Success @@ -161,7 +164,7 @@ class DATADumpFile(DATADump): # Move descriptor to the begining of requested message rc = self._seek2msg(idx) if not rc: - print("[!] Couldn't find requested message") + log.error("Couldn't find requested message") return False # Attempt to parse a message @@ -181,7 +184,7 @@ class DATADumpFile(DATADump): else: rc = self._seek2msg(skip) if not rc: - print("[!] Couldn't find requested message") + log.error("Couldn't find requested message") return False # Read the capture in loop... @@ -224,6 +227,10 @@ if __name__ == '__main__': from gsm_shared import * import random + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Create a temporary file tf = TemporaryFile() @@ -242,7 +249,7 @@ if __name__ == '__main__': burst_trx2l1.append(sbit) # Generate a basic list of random messages - print("[i] Generating the reference messages") + log.info("Generating the reference messages") messages_ref = [] for i in range(100): @@ -260,9 +267,9 @@ if __name__ == '__main__': # Append messages_ref.append(msg) - print("[i] Adding the following messages to the capture:") + log.info("Adding the following messages to the capture:") for msg in messages_ref[:3]: - print(" %s: burst_len=%d" + log.info("%s: burst_len=%d" % (msg.desc_hdr(), len(msg.burst))) # Check single message appending @@ -273,9 +280,9 @@ if __name__ == '__main__': # Read the written messages back messages_check = ddf.parse_all() - print("[i] Read the following messages back:") + log.info("Read the following messages back:") for msg in messages_check: - print(" %s: burst_len=%d" + log.info("%s: burst_len=%d" % (msg.desc_hdr(), len(msg.burst))) # Expecting three messages @@ -291,7 +298,7 @@ if __name__ == '__main__': # Validate a message assert(messages_check[i].validate()) - print("[?] Check append_msg(): OK") + log.info("Check append_msg(): OK") # Append the pending reference messages @@ -313,7 +320,7 @@ if __name__ == '__main__': # Validate a message assert(messages_check[i].validate()) - print("[?] Check append_all(): OK") + log.info("Check append_all(): OK") # Check parse_msg() @@ -336,7 +343,7 @@ if __name__ == '__main__': assert(msg0.validate()) assert(msg10.validate()) - print("[?] Check parse_msg(): OK") + log.info("Check parse_msg(): OK") # Check parse_all() with range @@ -357,4 +364,4 @@ if __name__ == '__main__': # Validate a message assert(messages_check[i].validate()) - print("[?] Check parse_all(): OK") + log.info("Check parse_all(): OK") diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py index ea415ab9..95ec9dc1 100644 --- a/src/target/trx_toolkit/data_msg.py +++ b/src/target/trx_toolkit/data_msg.py @@ -431,6 +431,12 @@ class DATAMSG_TRX2L1(DATAMSG): # Regression test if __name__ == '__main__': + import logging as log + + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Common reference data fn = 1024 tn = 0 @@ -446,7 +452,7 @@ if __name__ == '__main__': sbit = random.randint(-127, 127) burst_trx2l1_ref.append(sbit) - print("[i] Generating the reference messages") + log.info("Generating the reference messages") # Create messages of both types msg_l12trx_ref = DATAMSG_L12TRX(fn = fn, tn = tn) @@ -461,13 +467,13 @@ if __name__ == '__main__': msg_l12trx_ref.burst = burst_l12trx_ref msg_trx2l1_ref.burst = burst_trx2l1_ref - print("[i] Encoding the reference messages") + log.info("Encoding the reference messages") # Encode DATA messages l12trx_raw = msg_l12trx_ref.gen_msg() trx2l1_raw = msg_trx2l1_ref.gen_msg() - print("[i] Parsing generated messages back") + log.info("Parsing generated messages back") # Parse generated DATA messages msg_l12trx_dec = DATAMSG_L12TRX() @@ -475,13 +481,13 @@ if __name__ == '__main__': msg_l12trx_dec.parse_msg(l12trx_raw) msg_trx2l1_dec.parse_msg(trx2l1_raw) - print("[i] Comparing decoded messages with the reference") + log.info("Comparing decoded messages with the reference") # Compare bursts assert(msg_l12trx_dec.burst == burst_l12trx_ref) assert(msg_trx2l1_dec.burst == burst_trx2l1_ref) - print("[?] Compare bursts: OK") + log.info("Compare bursts: OK") # Compare both parsed messages with the reference data assert(msg_l12trx_dec.fn == fn) @@ -489,14 +495,14 @@ if __name__ == '__main__': assert(msg_l12trx_dec.tn == tn) assert(msg_trx2l1_dec.tn == tn) - print("[?] Compare FN / TN: OK") + log.info("Compare FN / TN: OK") # Compare message specific parts assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi) assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr) assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256) - print("[?] Compare message specific data: OK") + log.info("Compare message specific data: OK") # Validate header randomization for i in range(0, 100): @@ -506,7 +512,7 @@ if __name__ == '__main__': assert(msg_l12trx_ref.validate()) assert(msg_trx2l1_ref.validate()) - print("[?] Validate header randomization: OK") + log.info("Validate header randomization: OK") # Bit conversation test usbits_ref = list(range(0, 256)) @@ -518,7 +524,7 @@ if __name__ == '__main__': assert(usbits[:255] == usbits_ref[:255]) assert(usbits[255] == 254) - print("[?] Check both usbit2sbit() and sbit2usbit(): OK") + log.info("Check both usbit2sbit() and sbit2usbit(): OK") # Test both sbit2ubit() and ubit2sbit() ubits = msg_trx2l1_ref.sbit2ubit(sbits_ref) @@ -527,7 +533,7 @@ if __name__ == '__main__': sbits = msg_trx2l1_ref.ubit2sbit(ubits) assert(sbits == ([-127] * 127 + [127] * 128)) - print("[?] Check both sbit2ubit() and ubit2sbit(): OK") + log.info("Check both sbit2ubit() and ubit2sbit(): OK") # Test message transformation msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx() @@ -542,4 +548,4 @@ if __name__ == '__main__': assert(msg_l12trx_dec.burst == msg_l12trx_dec.sbit2ubit(burst_trx2l1_ref)) assert(msg_trx2l1_dec.burst == msg_trx2l1_dec.ubit2sbit(burst_l12trx_ref)) - print("[?] Check L12TRX <-> TRX2L1 type transformations: OK") + log.info("Check L12TRX <-> TRX2L1 type transformations: OK") diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index d99186b3..ced12381 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -25,6 +25,7 @@ from copyright import print_copyright CR_HOLDERS = [("2017-2018", "Vadim Yanitskiy ")] +import logging as log import signal import getopt import select @@ -53,6 +54,10 @@ class Application: # Set up signal handlers signal.signal(signal.SIGINT, self.sig_handler) + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + def run(self): # Init TRX CTRL interface for BTS self.bts_ctrl = CTRLInterfaceBTS(self.bts_addr, self.bts_base_port + 101, @@ -90,7 +95,7 @@ class Application: self.clck_gen = CLCKGen([self.bts_clck]) self.bts_ctrl.clck_gen = self.clck_gen - print("[i] Init complete") + log.info("Init complete") # Enter main loop while True: @@ -119,7 +124,7 @@ class Application: self.bb_ctrl.handle_rx(data.decode(), addr) def shutdown(self): - print("[i] Shutting down...") + log.info("Shutting down...") # Stop clock generator self.clck_gen.stop() @@ -198,7 +203,7 @@ class Application: sys.exit(2) def sig_handler(self, signum, frame): - print("Signal %d received" % signum) + log.info("Signal %d received" % signum) if signum is signal.SIGINT: self.shutdown() sys.exit(0) diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py index 535bb3f9..98509839 100755 --- a/src/target/trx_toolkit/trx_sniff.py +++ b/src/target/trx_toolkit/trx_sniff.py @@ -25,6 +25,7 @@ from copyright import print_copyright CR_HOLDERS = [("2018", "Vadim Yanitskiy ")] +import logging as log import signal import getopt import sys @@ -67,6 +68,10 @@ class Application: print_copyright(CR_HOLDERS) self.parse_argv() + # Configure logging + log.basicConfig(level = log.DEBUG, + format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") + # Open requested capture file if self.output_file is not None: self.ddf = DATADumpFile(self.output_file) @@ -76,7 +81,7 @@ class Application: pkt_filter = "udp and (port %d or port %d)" \ % (self.sniff_base_port + 2, self.sniff_base_port + 102) - print("[i] Listening on interface '%s'..." % self.sniff_interface) + log.info("Listening on interface '%s'..." % self.sniff_interface) # Start sniffing... scapy.all.sniff(iface = self.sniff_interface, store = 0, @@ -110,7 +115,7 @@ class Application: try: msg.parse_msg(msg_raw) except: - print("[!] Failed to parse message, dropping...") + log.warning("Failed to parse message, dropping...") self.cnt_burst_dropped_num += 1 return @@ -121,7 +126,7 @@ class Application: return # Debug print - print("[i] %s burst: %s" \ + log.debug("%s burst: %s" \ % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr())) # Poke message handler @@ -177,22 +182,22 @@ class Application: # Stop sniffing after N bursts if self.cnt_burst_break is not None: if self.cnt_burst_num == self.cnt_burst_break: - print("[i] Collected required amount of bursts") + log.info("Collected required amount of bursts") return True # Stop sniffing after N frames if self.cnt_frame_break is not None: if self.cnt_frame_num == self.cnt_frame_break: - print("[i] Collected required amount of frames") + log.info("Collected required amount of frames") return True return False def shutdown(self): - print("[i] Shutting down...") + log.info("Shutting down...") # Print statistics - print("[i] %u bursts handled, %u dropped" \ + log.info("%u bursts handled, %u dropped" \ % (self.cnt_burst_num, self.cnt_burst_dropped_num)) # Exit -- cgit v1.2.3