From 6af5d8da2c3df768b9a00235d70f433be08e525b Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Tue, 27 Aug 2019 20:24:19 +0200 Subject: trx_toolkit/data_msg.py: raise exceptions from validate() methods Raising exceptions is a Pythonic way to handle errors, which in this particular case will help us to know *why* exactly a given message is incorrect or incomplete. Change-Id: Ia961f83c717066af61699c80536468392b8ce064 --- src/target/trx_toolkit/data_dump.py | 10 +-- src/target/trx_toolkit/data_if.py | 3 +- src/target/trx_toolkit/data_msg.py | 122 ++++++++++++++++++++---------------- 3 files changed, 74 insertions(+), 61 deletions(-) diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index 97518bdf..da42023b 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -296,7 +296,7 @@ if __name__ == '__main__': assert(messages_check[i].tn == messages_ref[i].tn) # Validate a message - assert(messages_check[i].validate()) + messages_check[i].validate() log.info("Check append_msg(): OK") @@ -318,7 +318,7 @@ if __name__ == '__main__': assert(messages_check[i].tn == messages_ref[i].tn) # Validate a message - assert(messages_check[i].validate()) + messages_check[i].validate() log.info("Check append_all(): OK") @@ -340,8 +340,8 @@ if __name__ == '__main__': assert(msg10.tn == messages_ref[10].tn) # Validate both messages - assert(msg0.validate()) - assert(msg10.validate()) + msg0.validate() + msg10.validate() log.info("Check parse_msg(): OK") @@ -362,6 +362,6 @@ if __name__ == '__main__': assert(messages_check[i].tn == messages_ref[i + 10].tn) # Validate a message - assert(messages_check[i].validate()) + messages_check[i].validate() log.info("Check parse_all(): OK") diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py index e43b3884..418c3f8a 100644 --- a/src/target/trx_toolkit/data_if.py +++ b/src/target/trx_toolkit/data_if.py @@ -107,8 +107,7 @@ class DATAInterface(UDPLink): def send_msg(self, msg, legacy = False): # Validate a message - if not msg.validate(): - raise ValueError("Message incomplete or incorrect") + msg.validate() # Generate TRX message payload = msg.gen_msg(legacy) diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py index de5f9089..a1046ec9 100644 --- a/src/target/trx_toolkit/data_msg.py +++ b/src/target/trx_toolkit/data_msg.py @@ -227,30 +227,27 @@ class DATAMSG: return buf - # Validates the message fields + # Validates the message fields (throws ValueError) def validate(self): if not self.ver in self.known_versions: - return False + raise ValueError("Unknown TRXD header version") if self.fn is None: - return False + raise ValueError("TDMA frame-number is not set") if self.fn < 0 or self.fn > GSM_HYPERFRAME: - return False + raise ValueError("TDMA frame-number %d is out of range" % self.fn) if self.tn is None: - return False + raise ValueError("TDMA time-slot is not set") if self.tn < 0 or self.tn > 7: - return False - - return True + raise ValueError("TDMA time-slot %d is out of range" % self.tn) # Generates a TRX DATA message def gen_msg(self, legacy = False): # Validate all the fields - if not self.validate(): - raise ValueError("Message incomplete or incorrect") + self.validate() # Allocate an empty byte-array buf = bytearray() @@ -342,27 +339,24 @@ class DATAMSG_L12TRX(DATAMSG): return length - # Validates the message fields + # Validates the message fields (throws ValueError) def validate(self): # Validate common fields - if not DATAMSG.validate(self): - return False + DATAMSG.validate(self) if self.pwr is None: - return False + raise ValueError("Tx Attenuation level is not set") if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: - return False + raise ValueError("Tx Attenuation %d is out of range" % self.pwr) # FIXME: properly handle IDLE / NOPE indications if self.burst is None: - return False + raise ValueError("Tx burst bits are not set") # FIXME: properly handle IDLE / NOPE indications if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): - return False - - return True + raise ValueError("Tx burst has odd length %u" % len(self.burst)) # Generates a random power level def rand_pwr(self, min = None, max = None): @@ -582,85 +576,77 @@ class DATAMSG_TRX2L1(DATAMSG): def _validate_burst_v0(self): # Burst is mandatory if self.burst is None: - return False + raise ValueError("Rx burst bits are not set") # ... and can be either of GSM (GMSK) or EDGE (8-PSK) if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): - return False - - return True + raise ValueError("Rx burst has odd length %u" % len(self.burst)) def _validate_burst_v1(self): # Burst is omitted in case of an IDLE / NOPE indication if self.nope_ind and self.burst is None: - return True - if self.nope_ind and self.burst is not None: - return False + return + if self.nope_ind and self.burst is not None: + raise ValueError("NOPE.ind comes with burst?!?") if self.burst is None: - return False + raise ValueError("Rx burst bits are not set") # Burst length depends on modulation type if len(self.burst) != self.mod_type.bl: - return False + raise ValueError("Rx burst has odd length %u" % len(self.burst)) - return True - - # Validates the burst + # Validates the burst (throws ValueError) def validate_burst(self): if self.ver == 0x00: - return self._validate_burst_v0() + self._validate_burst_v0() elif self.ver >= 0x01: - return self._validate_burst_v1() + self._validate_burst_v1() - # Validates the message header fields + # Validates the message header fields (throws ValueError) def validate(self): # Validate common fields - if not DATAMSG.validate(self): - return False + DATAMSG.validate(self) if self.rssi is None: - return False + raise ValueError("RSSI is not set") if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX: - return False + raise ValueError("RSSI %d is out of range" % self.rssi) if self.toa256 is None: - return False + raise ValueError("ToA256 is not set") if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: - return False + raise ValueError("ToA256 %d is out of range" % self.toa256) if self.ver >= 0x01: if type(self.mod_type) is not Modulation: - return False + raise ValueError("Unknown Rx modulation type") if self.tsc_set is None: - return False + raise ValueError("TSC set is not set") if self.mod_type is Modulation.ModGMSK: if self.tsc_set not in range(0, 4): - return False + raise ValueError("TSC set %d is out of range" % self.tsc_set) else: if self.tsc_set not in range(0, 2): - return False + raise ValueError("TSC set %d is out of range" % self.tsc_set) if self.tsc is None: - return False + raise ValueError("TSC is not set") if self.tsc not in self.TSC_RANGE: - return False + raise ValueError("TSC %d is out of range" % self.tsc) if self.ci is None: - return False + raise ValueError("C/I is not set") if self.ci < self.CI_MIN or self.ci > self.CI_MAX: - return False + raise ValueError("C/I %d is out of range" % self.ci) - if not self.validate_burst(): - return False - - return True + self.validate_burst() # Generates a random RSSI value def rand_rssi(self, min = None, max = None): @@ -886,11 +872,39 @@ if __name__ == '__main__': msg_l12trx_ref.rand_burst() msg_trx2l1_ref.rand_burst() - assert(msg_l12trx_ref.validate()) - assert(msg_trx2l1_ref.validate()) + msg_l12trx_ref.validate() + msg_trx2l1_ref.validate() log.info("Validate header randomization: OK") + # Test error handling for common fields + msg = DATAMSG() + + # Make sure that message validation throws a ValueError + def validate_throw(msg): + try: + msg.validate() + return False + except ValueError: + return True + + # Unknown version + msg.rand_hdr() + msg.ver = 100 + assert(validate_throw(msg)) + + # Uninitialized field + msg.rand_hdr() + msg.fn = None + assert(validate_throw(msg)) + + # Out-of-range value + msg.rand_hdr() + msg.tn = 10 + assert(validate_throw(msg)) + + log.info("Check incorrect message validation: OK") + log.info("Encoding the reference messages") # Encode DATA messages -- cgit v1.2.3