summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVadim Yanitskiy <axilirator@gmail.com>2018-02-20 17:36:09 +0700
committerVadim Yanitskiy <axilirator@gmail.com>2018-02-20 17:48:29 +0700
commitafd110a3b5d8a565bffb251f9e031b56605558ac (patch)
tree1dbc1b610859b905621ccf19eaccd6a3835ac8af
parent041bfc0b0313c2b89d3ff0488d3d5f54d22fa197 (diff)
fake_trx: implement classes for DATA capture menagement
This change introduces the following classes: - DATADump - basic class, which contains methods to generate and parse the a message header, and some constants. - DATADumpFile - a child class, which contains methods to write and parse DATA messages from capture files. Usage example: # Open a capture file ddf = DATADumpFile("capture.bin") # Parse the 10th message msg = ddf.parse_msg(10) msg.fn = 100 msg.tn = 0 # Append one to the end of the capture ddf.append_msg(msg) Change-Id: I1b31183bd7bcca94de089847ee0b2f4ec88a7f1d
-rw-r--r--src/target/fake_trx/data_dump.py379
1 files changed, 379 insertions, 0 deletions
diff --git a/src/target/fake_trx/data_dump.py b/src/target/fake_trx/data_dump.py
new file mode 100644
index 00000000..b9047365
--- /dev/null
+++ b/src/target/fake_trx/data_dump.py
@@ -0,0 +1,379 @@
+#!/usr/bin/env python2
+# -*- coding: utf-8 -*-
+
+# TRX Toolkit
+# Helpers for DATA capture management
+#
+# (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+import struct
+
+from data_msg import *
+
+class DATADump:
+ # Constants
+ TAG_L12TRX = '\x01'
+ TAG_TRX2L1 = '\x02'
+ HDR_LENGTH = 2
+
+ # Generates raw bytes from a DATA message
+ # Return value: raw message bytes
+ def dump_msg(self, msg):
+ # Determine a message type
+ if isinstance(msg, DATAMSG_L12TRX):
+ tag = self.TAG_L12TRX
+ elif isinstance(msg, DATAMSG_TRX2L1):
+ tag = self.TAG_TRX2L1
+ else:
+ raise ValueError("Unknown message type")
+
+ # Generate a message payload
+ msg_raw = msg.gen_msg()
+
+ # Calculate the length
+ msg_len = len(msg_raw)
+
+ # Concatenate a message with header
+ return bytearray([tag, msg_len]) + msg_raw
+
+ def parse_hdr(self, hdr):
+ # Extract the header info
+ msg_len = struct.unpack("<B", hdr[1])[0]
+ tag = hdr[0]
+
+ # Check if tag is known
+ if tag == self.TAG_L12TRX:
+ # L1 -> TRX
+ msg = DATAMSG_L12TRX()
+ elif tag == self.TAG_TRX2L1:
+ # TRX -> L1
+ msg = DATAMSG_TRX2L1()
+ else:
+ # Unknown tag
+ return False
+
+ return (msg, msg_len)
+
+class DATADumpFile(DATADump):
+ def __init__(self, capture):
+ # Check if capture file is already opened
+ if type(capture) is file:
+ self.f = capture
+ else:
+ print("[i] Opening capture file '%s'..." % capture)
+ self.f = open(capture, "a+b")
+
+ def __del__(self):
+ print("[i] Closing the capture file")
+ self.f.close()
+
+ # Moves the file descriptor before a specified message
+ # Return value:
+ # True in case of success,
+ # or False in case of EOF or header parsing error.
+ def _seek2msg(self, idx):
+ # Seek to the begining of the capture
+ self.f.seek(0)
+
+ # Read the capture in loop...
+ for i in range(idx):
+ # Attempt to read a message header
+ hdr_raw = self.f.read(self.HDR_LENGTH)
+ if len(hdr_raw) != self.HDR_LENGTH:
+ return False
+
+ # Attempt to parse it
+ rc = self.parse_hdr(hdr_raw)
+ if rc is False:
+ print("[!] Couldn't parse a message header")
+ return False
+
+ # Expand the header
+ (_, msg_len) = rc
+
+ # Skip a message
+ self.f.seek(msg_len, 1)
+
+ return True
+
+ # Parses a single message at the current descriptor position
+ # Return value:
+ # a parsed message in case of success,
+ # or None in case of EOF or header parsing error,
+ # or False in case of message parsing error.
+ def _parse_msg(self):
+ # Attempt to read a message header
+ hdr_raw = self.f.read(self.HDR_LENGTH)
+ if len(hdr_raw) != self.HDR_LENGTH:
+ return None
+
+ # Attempt to parse it
+ rc = self.parse_hdr(hdr_raw)
+ if rc is False:
+ print("[!] Couldn't parse a message header")
+ return None
+
+ # Expand the header
+ (msg, msg_len) = rc
+
+ # Attempt to read a message
+ msg_raw = self.f.read(msg_len)
+ if len(msg_raw) != msg_len:
+ print("[!] Message length mismatch")
+ return None
+
+ # Attempt to parse a message
+ try:
+ msg_raw = bytearray(msg_raw)
+ msg.parse_msg(msg_raw)
+ except:
+ print("[!] Couldn't parse a message, skipping...")
+ return False
+
+ # Success
+ return msg
+
+ # Parses a particular message defined by index idx
+ # Return value:
+ # a parsed message in case of success,
+ # or None in case of EOF or header parsing error,
+ # or False in case of message parsing error or out of range.
+ def parse_msg(self, idx):
+ # Move descriptor to the begining of requested message
+ rc = self._seek2msg(idx)
+ if not rc:
+ print("[!] Couldn't find requested message")
+ return False
+
+ # Attempt to parse a message
+ return self._parse_msg()
+
+ # Parses all messages from a given file
+ # Return value:
+ # list of parsed messages,
+ # or False in case of range error.
+ def parse_all(self, skip = None, count = None):
+ result = []
+
+ # Should we skip some messages?
+ if skip is None:
+ # Seek to the begining of the capture
+ self.f.seek(0)
+ else:
+ rc = self._seek2msg(skip)
+ if not rc:
+ print("[!] Couldn't find requested message")
+ return False
+
+ # Read the capture in loop...
+ while True:
+ # Attempt to parse a message
+ msg = self._parse_msg()
+
+ # EOF or broken header
+ if msg is None:
+ break
+
+ # Skip unparsed messages
+ if msg is False:
+ continue
+
+ # Success, append a message
+ result.append(msg)
+
+ # Count limitation
+ if count is not None:
+ if len(result) == count:
+ break
+
+ return result
+
+ # Writes a new message at the end of the capture
+ def append_msg(self, msg):
+ # Generate raw bytes and write
+ msg_raw = self.dump_msg(msg)
+ self.f.write(msg_raw)
+
+ # Writes a list of messages at the end of the capture
+ def append_all(self, msgs):
+ for msg in msgs:
+ self.append_msg(msg)
+
+# Regression tests
+if __name__ == '__main__':
+ from tempfile import TemporaryFile
+ from gsm_shared import *
+ import random
+
+ # Create a temporary file
+ tf = TemporaryFile()
+
+ # Create an instance of DATA dump manager
+ ddf = DATADumpFile(tf)
+
+ # Generate two random bursts
+ burst_l12trx = []
+ burst_trx2l1 = []
+
+ for i in range(0, GSM_BURST_LEN):
+ ubit = random.randint(0, 1)
+ burst_l12trx.append(ubit)
+
+ sbit = random.randint(-127, 127)
+ burst_trx2l1.append(sbit)
+
+ # Generate a basic list of random messages
+ print("[i] Generating the reference messages")
+ messages_ref = []
+
+ for i in range(100):
+ # Create a message
+ if i % 2:
+ msg = DATAMSG_L12TRX()
+ msg.burst = burst_l12trx
+ else:
+ msg = DATAMSG_TRX2L1()
+ msg.burst = burst_trx2l1
+
+ # Randomize the header
+ msg.rand_hdr()
+
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ if isinstance(msg, DATAMSG_TRX2L1):
+ msg.toa = 0.00
+
+ # Append
+ messages_ref.append(msg)
+
+ print("[i] Adding the following messages to the capture:")
+ for msg in messages_ref[:3]:
+ print(" %s: burst_len=%d"
+ % (msg.desc_hdr(), len(msg.burst)))
+
+ # Check single message appending
+ ddf.append_msg(messages_ref[0])
+ ddf.append_msg(messages_ref[1])
+ ddf.append_msg(messages_ref[2])
+
+ # Read the written messages back
+ messages_check = ddf.parse_all()
+
+ print("[i] Read the following messages back:")
+ for msg in messages_check:
+ print(" %s: burst_len=%d"
+ % (msg.desc_hdr(), len(msg.burst)))
+
+ # Expecting three messages
+ assert(len(messages_check) == 3)
+
+ # Check the messages
+ for i in range(3):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i].burst)
+ assert(messages_check[i].fn == messages_ref[i].fn)
+ assert(messages_check[i].tn == messages_ref[i].tn)
+
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ messages_check[i].toa = 0.00
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check append_msg(): OK")
+
+
+ # Append the pending reference messages
+ ddf.append_all(messages_ref[3:])
+
+ # Read the written messages back
+ messages_check = ddf.parse_all()
+
+ # Check the final amount
+ assert(len(messages_check) == len(messages_ref))
+
+ # Check the messages
+ for i in range(len(messages_check)):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i].burst)
+ assert(messages_check[i].fn == messages_ref[i].fn)
+ assert(messages_check[i].tn == messages_ref[i].tn)
+
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ messages_check[i].toa = 0.00
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check append_all(): OK")
+
+
+ # Check parse_msg()
+ msg0 = ddf.parse_msg(0)
+ msg10 = ddf.parse_msg(10)
+
+ # Make sure parsing was successful
+ assert(msg0 and msg10)
+
+ # Compare common header parts and bursts
+ assert(msg0.burst == messages_ref[0].burst)
+ assert(msg0.fn == messages_ref[0].fn)
+ assert(msg0.tn == messages_ref[0].tn)
+
+ assert(msg10.burst == messages_ref[10].burst)
+ assert(msg10.fn == messages_ref[10].fn)
+ assert(msg10.tn == messages_ref[10].tn)
+
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ msg0.toa = 0.00
+ msg10.toa = 0.00
+
+ # Validate both messages
+ assert(msg0.validate())
+ assert(msg10.validate())
+
+ print("[?] Check parse_msg(): OK")
+
+
+ # Check parse_all() with range
+ messages_check = ddf.parse_all(skip = 10, count = 20)
+
+ # Make sure parsing was successful
+ assert(messages_check)
+
+ # Check the amount
+ assert(len(messages_check) == 20)
+
+ for i in range(20):
+ # Compare common header parts and bursts
+ assert(messages_check[i].burst == messages_ref[i + 10].burst)
+ assert(messages_check[i].fn == messages_ref[i + 10].fn)
+ assert(messages_check[i].tn == messages_ref[i + 10].tn)
+
+ # HACK: as ToA parsing is not implemented yet,
+ # we have to use a fixed 0.00 value for now...
+ messages_check[i].toa = 0.00
+
+ # Validate a message
+ assert(messages_check[i].validate())
+
+ print("[?] Check parse_all(): OK")