diff options
Diffstat (limited to 'src/target/trx_toolkit/data_dump.py')
-rw-r--r-- | src/target/trx_toolkit/data_dump.py | 188 |
1 files changed, 17 insertions, 171 deletions
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index da42023b..8475ceb2 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- # TRX Toolkit @@ -17,10 +16,6 @@ # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import logging as log import struct @@ -29,18 +24,18 @@ from data_msg import * class DATADump: # Constants - TAG_L12TRX = b'\x01' - TAG_TRX2L1 = b'\x02' + TAG_TxMsg = b'\x01' + TAG_RxMsg = b'\x02' HDR_LENGTH = 3 # Generates raw bytes from a DATA message # Return value: raw message bytes def dump_msg(self, msg): # Determine a message type - if isinstance(msg, DATAMSG_L12TRX): - tag = self.TAG_L12TRX - elif isinstance(msg, DATAMSG_TRX2L1): - tag = self.TAG_TRX2L1 + if isinstance(msg, TxMsg): + tag = self.TAG_TxMsg + elif isinstance(msg, RxMsg): + tag = self.TAG_RxMsg else: raise ValueError("Unknown message type") @@ -62,12 +57,10 @@ class DATADump: tag = hdr[:1] # Check if tag is known - if tag == self.TAG_L12TRX: - # L1 -> TRX - msg = DATAMSG_L12TRX() - elif tag == self.TAG_TRX2L1: - # TRX -> L1 - msg = DATAMSG_TRX2L1() + if tag == self.TAG_TxMsg: + msg = TxMsg() + elif tag == self.TAG_RxMsg: + msg = RxMsg() else: # Unknown tag return False @@ -84,8 +77,6 @@ class DATADumpFile(DATADump): self.f = capture def __del__(self): - # FIXME: this causes an Exception in Python 2 (but not in Python 3) - # AttributeError: 'NoneType' object has no attribute 'info' log.info("Closing the capture file") self.f.close() @@ -94,11 +85,11 @@ class DATADumpFile(DATADump): # True in case of success, # or False in case of EOF or header parsing error. def _seek2msg(self, idx): - # Seek to the begining of the capture + # Seek to the beginning of the capture self.f.seek(0) # Read the capture in loop... - for i in range(idx): + for _ in range(idx): # Attempt to read a message header hdr_raw = self.f.read(self.HDR_LENGTH) if len(hdr_raw) != self.HDR_LENGTH: @@ -158,14 +149,14 @@ class DATADumpFile(DATADump): # Parses a particular message defined by index idx # Return value: # a parsed message in case of success, - # or None in case of EOF or header parsing error, - # or False in case of message parsing error or out of range. + # or None in case of EOF, out of range, or header parsing error, + # or False in case of message parsing error. def parse_msg(self, idx): - # Move descriptor to the begining of requested message + # Move descriptor to the beginning of requested message rc = self._seek2msg(idx) if not rc: log.error("Couldn't find requested message") - return False + return None # Attempt to parse a message return self._parse_msg() @@ -179,7 +170,7 @@ class DATADumpFile(DATADump): # Should we skip some messages? if skip is None: - # Seek to the begining of the capture + # Seek to the beginning of the capture self.f.seek(0) else: rc = self._seek2msg(skip) @@ -220,148 +211,3 @@ class DATADumpFile(DATADump): def append_all(self, msgs): for msg in msgs: self.append_msg(msg) - -# Regression tests -if __name__ == '__main__': - from tempfile import TemporaryFile - from gsm_shared import * - import random - - # Configure logging - log.basicConfig(level = log.DEBUG, - format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") - - # Create a temporary file - tf = TemporaryFile() - - # Create an instance of DATA dump manager - ddf = DATADumpFile(tf) - - # Generate two random bursts - burst_l12trx = [] - burst_trx2l1 = [] - - for i in range(0, GSM_BURST_LEN): - ubit = random.randint(0, 1) - burst_l12trx.append(ubit) - - sbit = random.randint(-127, 127) - burst_trx2l1.append(sbit) - - # Generate a basic list of random messages - log.info("Generating the reference messages") - messages_ref = [] - - for i in range(100): - # Create a message - if i % 2: - msg = DATAMSG_L12TRX() - msg.burst = burst_l12trx - else: - msg = DATAMSG_TRX2L1() - msg.burst = burst_trx2l1 - - # Randomize the header - msg.rand_hdr() - - # Append - messages_ref.append(msg) - - log.info("Adding the following messages to the capture:") - for msg in messages_ref[:3]: - log.info("%s: burst_len=%d" - % (msg.desc_hdr(), len(msg.burst))) - - # Check single message appending - ddf.append_msg(messages_ref[0]) - ddf.append_msg(messages_ref[1]) - ddf.append_msg(messages_ref[2]) - - # Read the written messages back - messages_check = ddf.parse_all() - - log.info("Read the following messages back:") - for msg in messages_check: - log.info("%s: burst_len=%d" - % (msg.desc_hdr(), len(msg.burst))) - - # Expecting three messages - assert(len(messages_check) == 3) - - # Check the messages - for i in range(3): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i].burst) - assert(messages_check[i].fn == messages_ref[i].fn) - assert(messages_check[i].tn == messages_ref[i].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check append_msg(): OK") - - - # Append the pending reference messages - ddf.append_all(messages_ref[3:]) - - # Read the written messages back - messages_check = ddf.parse_all() - - # Check the final amount - assert(len(messages_check) == len(messages_ref)) - - # Check the messages - for i in range(len(messages_check)): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i].burst) - assert(messages_check[i].fn == messages_ref[i].fn) - assert(messages_check[i].tn == messages_ref[i].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check append_all(): OK") - - - # Check parse_msg() - msg0 = ddf.parse_msg(0) - msg10 = ddf.parse_msg(10) - - # Make sure parsing was successful - assert(msg0 and msg10) - - # Compare common header parts and bursts - assert(msg0.burst == messages_ref[0].burst) - assert(msg0.fn == messages_ref[0].fn) - assert(msg0.tn == messages_ref[0].tn) - - assert(msg10.burst == messages_ref[10].burst) - assert(msg10.fn == messages_ref[10].fn) - assert(msg10.tn == messages_ref[10].tn) - - # Validate both messages - msg0.validate() - msg10.validate() - - log.info("Check parse_msg(): OK") - - - # Check parse_all() with range - messages_check = ddf.parse_all(skip = 10, count = 20) - - # Make sure parsing was successful - assert(messages_check) - - # Check the amount - assert(len(messages_check) == 20) - - for i in range(20): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i + 10].burst) - assert(messages_check[i].fn == messages_ref[i + 10].fn) - assert(messages_check[i].tn == messages_ref[i + 10].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check parse_all(): OK") |