summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/data_dump.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/data_dump.py')
-rw-r--r--src/target/trx_toolkit/data_dump.py188
1 files changed, 17 insertions, 171 deletions
diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py
index da42023b..8475ceb2 100644
--- a/src/target/trx_toolkit/data_dump.py
+++ b/src/target/trx_toolkit/data_dump.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
# TRX Toolkit
@@ -17,10 +16,6 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import logging as log
import struct
@@ -29,18 +24,18 @@ from data_msg import *
class DATADump:
# Constants
- TAG_L12TRX = b'\x01'
- TAG_TRX2L1 = b'\x02'
+ TAG_TxMsg = b'\x01'
+ TAG_RxMsg = b'\x02'
HDR_LENGTH = 3
# Generates raw bytes from a DATA message
# Return value: raw message bytes
def dump_msg(self, msg):
# Determine a message type
- if isinstance(msg, DATAMSG_L12TRX):
- tag = self.TAG_L12TRX
- elif isinstance(msg, DATAMSG_TRX2L1):
- tag = self.TAG_TRX2L1
+ if isinstance(msg, TxMsg):
+ tag = self.TAG_TxMsg
+ elif isinstance(msg, RxMsg):
+ tag = self.TAG_RxMsg
else:
raise ValueError("Unknown message type")
@@ -62,12 +57,10 @@ class DATADump:
tag = hdr[:1]
# Check if tag is known
- if tag == self.TAG_L12TRX:
- # L1 -> TRX
- msg = DATAMSG_L12TRX()
- elif tag == self.TAG_TRX2L1:
- # TRX -> L1
- msg = DATAMSG_TRX2L1()
+ if tag == self.TAG_TxMsg:
+ msg = TxMsg()
+ elif tag == self.TAG_RxMsg:
+ msg = RxMsg()
else:
# Unknown tag
return False
@@ -84,8 +77,6 @@ class DATADumpFile(DATADump):
self.f = capture
def __del__(self):
- # FIXME: this causes an Exception in Python 2 (but not in Python 3)
- # AttributeError: 'NoneType' object has no attribute 'info'
log.info("Closing the capture file")
self.f.close()
@@ -94,11 +85,11 @@ class DATADumpFile(DATADump):
# True in case of success,
# or False in case of EOF or header parsing error.
def _seek2msg(self, idx):
- # Seek to the begining of the capture
+ # Seek to the beginning of the capture
self.f.seek(0)
# Read the capture in loop...
- for i in range(idx):
+ for _ in range(idx):
# Attempt to read a message header
hdr_raw = self.f.read(self.HDR_LENGTH)
if len(hdr_raw) != self.HDR_LENGTH:
@@ -158,14 +149,14 @@ class DATADumpFile(DATADump):
# Parses a particular message defined by index idx
# Return value:
# a parsed message in case of success,
- # or None in case of EOF or header parsing error,
- # or False in case of message parsing error or out of range.
+ # or None in case of EOF, out of range, or header parsing error,
+ # or False in case of message parsing error.
def parse_msg(self, idx):
- # Move descriptor to the begining of requested message
+ # Move descriptor to the beginning of requested message
rc = self._seek2msg(idx)
if not rc:
log.error("Couldn't find requested message")
- return False
+ return None
# Attempt to parse a message
return self._parse_msg()
@@ -179,7 +170,7 @@ class DATADumpFile(DATADump):
# Should we skip some messages?
if skip is None:
- # Seek to the begining of the capture
+ # Seek to the beginning of the capture
self.f.seek(0)
else:
rc = self._seek2msg(skip)
@@ -220,148 +211,3 @@ class DATADumpFile(DATADump):
def append_all(self, msgs):
for msg in msgs:
self.append_msg(msg)
-
-# Regression tests
-if __name__ == '__main__':
- from tempfile import TemporaryFile
- from gsm_shared import *
- import random
-
- # Configure logging
- log.basicConfig(level = log.DEBUG,
- format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
-
- # Create a temporary file
- tf = TemporaryFile()
-
- # Create an instance of DATA dump manager
- ddf = DATADumpFile(tf)
-
- # Generate two random bursts
- burst_l12trx = []
- burst_trx2l1 = []
-
- for i in range(0, GSM_BURST_LEN):
- ubit = random.randint(0, 1)
- burst_l12trx.append(ubit)
-
- sbit = random.randint(-127, 127)
- burst_trx2l1.append(sbit)
-
- # Generate a basic list of random messages
- log.info("Generating the reference messages")
- messages_ref = []
-
- for i in range(100):
- # Create a message
- if i % 2:
- msg = DATAMSG_L12TRX()
- msg.burst = burst_l12trx
- else:
- msg = DATAMSG_TRX2L1()
- msg.burst = burst_trx2l1
-
- # Randomize the header
- msg.rand_hdr()
-
- # Append
- messages_ref.append(msg)
-
- log.info("Adding the following messages to the capture:")
- for msg in messages_ref[:3]:
- log.info("%s: burst_len=%d"
- % (msg.desc_hdr(), len(msg.burst)))
-
- # Check single message appending
- ddf.append_msg(messages_ref[0])
- ddf.append_msg(messages_ref[1])
- ddf.append_msg(messages_ref[2])
-
- # Read the written messages back
- messages_check = ddf.parse_all()
-
- log.info("Read the following messages back:")
- for msg in messages_check:
- log.info("%s: burst_len=%d"
- % (msg.desc_hdr(), len(msg.burst)))
-
- # Expecting three messages
- assert(len(messages_check) == 3)
-
- # Check the messages
- for i in range(3):
- # Compare common header parts and bursts
- assert(messages_check[i].burst == messages_ref[i].burst)
- assert(messages_check[i].fn == messages_ref[i].fn)
- assert(messages_check[i].tn == messages_ref[i].tn)
-
- # Validate a message
- messages_check[i].validate()
-
- log.info("Check append_msg(): OK")
-
-
- # Append the pending reference messages
- ddf.append_all(messages_ref[3:])
-
- # Read the written messages back
- messages_check = ddf.parse_all()
-
- # Check the final amount
- assert(len(messages_check) == len(messages_ref))
-
- # Check the messages
- for i in range(len(messages_check)):
- # Compare common header parts and bursts
- assert(messages_check[i].burst == messages_ref[i].burst)
- assert(messages_check[i].fn == messages_ref[i].fn)
- assert(messages_check[i].tn == messages_ref[i].tn)
-
- # Validate a message
- messages_check[i].validate()
-
- log.info("Check append_all(): OK")
-
-
- # Check parse_msg()
- msg0 = ddf.parse_msg(0)
- msg10 = ddf.parse_msg(10)
-
- # Make sure parsing was successful
- assert(msg0 and msg10)
-
- # Compare common header parts and bursts
- assert(msg0.burst == messages_ref[0].burst)
- assert(msg0.fn == messages_ref[0].fn)
- assert(msg0.tn == messages_ref[0].tn)
-
- assert(msg10.burst == messages_ref[10].burst)
- assert(msg10.fn == messages_ref[10].fn)
- assert(msg10.tn == messages_ref[10].tn)
-
- # Validate both messages
- msg0.validate()
- msg10.validate()
-
- log.info("Check parse_msg(): OK")
-
-
- # Check parse_all() with range
- messages_check = ddf.parse_all(skip = 10, count = 20)
-
- # Make sure parsing was successful
- assert(messages_check)
-
- # Check the amount
- assert(len(messages_check) == 20)
-
- for i in range(20):
- # Compare common header parts and bursts
- assert(messages_check[i].burst == messages_ref[i + 10].burst)
- assert(messages_check[i].fn == messages_ref[i + 10].fn)
- assert(messages_check[i].tn == messages_ref[i + 10].tn)
-
- # Validate a message
- messages_check[i].validate()
-
- log.info("Check parse_all(): OK")