summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/data_msg.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/data_msg.py')
-rw-r--r--src/target/trx_toolkit/data_msg.py640
1 files changed, 128 insertions, 512 deletions
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
index ec59b856..898a4ae6 100644
--- a/src/target/trx_toolkit/data_msg.py
+++ b/src/target/trx_toolkit/data_msg.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
# TRX Toolkit
@@ -17,24 +16,24 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
import struct
+import abc
+from typing import List
from enum import Enum
from gsm_shared import *
class Modulation(Enum):
""" Modulation types defined in 3GPP TS 45.002 """
- ModGMSK = (0b0000, 148)
- Mod8PSK = (0b0100, 444)
- ModAQPSK = (0b0110, 296)
- Mod16QAM = (0b1000, 592)
- Mod32QAM = (0b1010, 740)
+ ModGMSK = (0b0000, 1 * GMSK_BURST_LEN)
+ Mod8PSK = (0b0100, 3 * GMSK_BURST_LEN)
+ ModGMSK_AB = (0b0110, 1 * GMSK_BURST_LEN)
+ # ModRFU = (0b0111, 0) # Reserved for Future Use
+ Mod16QAM = (0b1000, 4 * GMSK_BURST_LEN)
+ Mod32QAM = (0b1010, 5 * GMSK_BURST_LEN)
+ ModAQPSK = (0b1100, 2 * GMSK_BURST_LEN)
def __init__(self, coding, bl):
# Coding in TRXD header
@@ -56,121 +55,60 @@ class Modulation(Enum):
return mod
return None
-class DATAMSG:
- """ TRXD (DATA) message codec (common part).
-
- The DATA messages are used to carry bursts in both directions
- between L1 and TRX. There exist two kinds of them:
-
- - L12TRX (L1 -> TRX) - to be transmitted bursts,
- - TRX2L1 (TRX -> L1) - received bursts.
-
- Both of them have quite similar structure, and start with
- the common fixed-size message header (no TLVs):
-
- +---------------+-----------------+------------+
- | common header | specific header | burst bits |
- +---------------+-----------------+------------+
-
- while the message specific headers and bit types are different.
-
- The common header is represented by this class, which is the
- parent of both DATAMSG_L12TRX and DATAMSG_TRX2L2 (see below),
- and has the following fields:
-
- +-----------------+----------------+-------------------+
- | VER (1/2 octet) | TN (1/2 octet) | FN (4 octets, BE) |
- +-----------------+----------------+-------------------+
-
- where:
-
- - VER is the header version indicator (1/2 octet MSB),
- - TN is TDMA time-slot number (1/2 octet LSB), and
- - FN is TDMA frame number (4 octets, big endian).
-
- == Header version indication
-
- It may be necessary to extend the message specific header
- with more information. Since this is not a TLV-based
- protocol, we need to include the header format version.
-
- +-----------------+------------------------+
- | 7 6 5 4 3 2 1 0 | bit numbers |
- +-----------------+------------------------+
- | X X X X . . . . | header version (0..15) |
- +-----------------+------------------------+
- | . . . . . X X X | TDMA TN (0..7) |
- +-----------------+------------------------+
- | . . . . X . . . | RESERVED (0) |
- +-----------------+------------------------+
-
- Instead of prepending an additional byte, it was decided to use
- 4 MSB bits of the first octet, which used to be zero-initialized
- due to the value range of TDMA TN. Therefore, the legacy header
- format has implicit version 0x00.
-
- Otherwise Wireshark (or trx_sniff.py) would need to guess the
- header version, or alternatively follow the control channel
- looking for the version setting command.
-
- The reserved bit number 3 can be used in the future to extend
- the TDMA TN range to (0..15), in case anybody would need
- to transfer UMTS bursts.
-
- """
+class Msg(abc.ABC):
+ ''' TRXD (DATA) message coding API (common part). '''
# NOTE: up to 16 versions can be encoded
CHDR_VERSION_MAX = 0b1111
- known_versions = [0x00, 0x01]
+ KNOWN_VERSIONS = (0, 1)
- # Common constructor
def __init__(self, fn = None, tn = None, burst = None, ver = 0):
self.burst = burst
self.ver = ver
self.fn = fn
self.tn = tn
- # The common header length
@property
def CHDR_LEN(self):
- # (VER + TN) + FN
- return 1 + 4
+ ''' The common header length. '''
+ return 1 + 4 # (VER + TN) + FN
- # Generates message specific header
+ @abc.abstractmethod
def gen_hdr(self):
- raise NotImplementedError
+ ''' Generate message specific header. '''
- # Parses message specific header
+ @abc.abstractmethod
def parse_hdr(self, hdr):
- raise NotImplementedError
+ ''' Parse message specific header. '''
- # Generates message specific burst
+ @abc.abstractmethod
def gen_burst(self):
- raise NotImplementedError
+ ''' Generate message specific burst. '''
- # Parses message specific burst
+ @abc.abstractmethod
def parse_burst(self, burst):
- raise NotImplementedError
+ ''' Parse message specific burst. '''
- # Generate a random message specific burst
+ @abc.abstractmethod
def rand_burst(self):
- raise NotImplementedError
+ ''' Generate a random message specific burst. '''
- # Generates a random frame number
def rand_fn(self):
+ ''' Generate a random frame number. '''
return random.randint(0, GSM_HYPERFRAME)
- # Generates a random timeslot number
def rand_tn(self):
+ ''' Generate a random timeslot number. '''
return random.randint(0, 7)
- # Randomizes the message header
def rand_hdr(self):
+ ''' Randomize the message header. '''
self.fn = self.rand_fn()
self.tn = self.rand_tn()
- # Generates human-readable header description
def desc_hdr(self):
+ ''' Generate human-readable header description. '''
+
result = ""
if self.ver > 0:
@@ -187,52 +125,30 @@ class DATAMSG:
return result
- # Converts unsigned soft-bits {254..0} to soft-bits {-127..127}
@staticmethod
- def usbit2sbit(bits):
- buf = []
-
- for bit in bits:
- if bit == 0xff:
- buf.append(-127)
- else:
- buf.append(127 - bit)
+ def usbit2sbit(bits: List[int]) -> List[int]:
+ ''' Convert unsigned soft-bits {254..0} to soft-bits {-127..127}. '''
+ return [-127 if (b == 0xff) else 127 - b for b in bits]
- return buf
-
- # Converts soft-bits {-127..127} to unsigned soft-bits {254..0}
@staticmethod
- def sbit2usbit(bits):
- buf = []
-
- for bit in bits:
- buf.append(127 - bit)
-
- return buf
+ def sbit2usbit(bits: List[int]) -> List[int]:
+ ''' Convert soft-bits {-127..127} to unsigned soft-bits {254..0}. '''
+ return [127 - b for b in bits]
- # Converts soft-bits {-127..127} to bits {1..0}
@staticmethod
- def sbit2ubit(bits):
- buf = []
-
- for bit in bits:
- buf.append(1 if bit < 0 else 0)
-
- return buf
+ def sbit2ubit(bits: List[int]) -> List[int]:
+ ''' Convert soft-bits {-127..127} to bits {1..0}. '''
+ return [int(b < 0) for b in bits]
- # Converts bits {1..0} to soft-bits {-127..127}
@staticmethod
- def ubit2sbit(bits):
- buf = []
+ def ubit2sbit(bits: List[int]) -> List[int]:
+ ''' Convert bits {1..0} to soft-bits {-127..127}. '''
+ return [-127 if b else 127 for b in bits]
- for bit in bits:
- buf.append(-127 if bit else 127)
-
- return buf
-
- # Validates the message fields (throws ValueError)
def validate(self):
- if not self.ver in self.known_versions:
+ ''' Validate the message fields (throws ValueError). '''
+
+ if not self.ver in self.KNOWN_VERSIONS:
raise ValueError("Unknown TRXD header version %d" % self.ver)
if self.fn is None:
@@ -247,8 +163,9 @@ class DATAMSG:
if self.tn < 0 or self.tn > 7:
raise ValueError("TDMA time-slot %d is out of range" % self.tn)
- # Generates a TRX DATA message
def gen_msg(self, legacy = False):
+ ''' Generate a TRX DATA message. '''
+
# Validate all the fields
self.validate()
@@ -276,15 +193,16 @@ class DATAMSG:
return buf
- # Parses a TRX DATA message
def parse_msg(self, msg):
+ ''' Parse a TRX DATA message. '''
+
# Make sure we have at least common header
if len(msg) < self.CHDR_LEN:
raise ValueError("Message is to short: missing common header")
# Parse the header version first
self.ver = (msg[0] >> 4)
- if not self.ver in self.known_versions:
+ if not self.ver in self.KNOWN_VERSIONS:
raise ValueError("Unknown TRXD header version %d" % self.ver)
# Parse TDMA TN and FN
@@ -306,27 +224,8 @@ class DATAMSG:
else:
self.burst = None
-class DATAMSG_L12TRX(DATAMSG):
- """ L12TRX (L1 -> TRX) message codec.
-
- This message represents a Downlink burst on the BTS side,
- or an Uplink burst on the MS side, and has the following
- message specific fixed-size header preceding the burst bits:
-
- == Versions 0x00, 0x01
-
- +-----+--------------------+
- | PWR | hard-bits (1 or 0) |
- +-----+--------------------+
-
- where PWR (1 octet) is relative (to the full-scale amplitude)
- transmit power level in dB. The absolute value is set on
- the control interface.
-
- Each hard-bit (1 or 0) of the burst is represented using one
- byte (0x01 or 0x00 respectively).
-
- """
+class TxMsg(Msg):
+ ''' Tx (L1 -> TRX) message coding API. '''
# Constants
PWR_MIN = 0x00
@@ -335,9 +234,10 @@ class DATAMSG_L12TRX(DATAMSG):
# Specific message fields
pwr = None
- # Calculates header length depending on its version
@property
def HDR_LEN(self):
+ ''' Calculate header length depending on its version. '''
+
# Common header length
length = self.CHDR_LEN
@@ -349,10 +249,11 @@ class DATAMSG_L12TRX(DATAMSG):
return length
- # Validates the message fields (throws ValueError)
def validate(self):
+ ''' Validate the message fields (throws ValueError). '''
+
# Validate common fields
- DATAMSG.validate(self)
+ Msg.validate(self)
if self.pwr is None:
raise ValueError("Tx Attenuation level is not set")
@@ -365,11 +266,12 @@ class DATAMSG_L12TRX(DATAMSG):
raise ValueError("Tx burst bits are not set")
# FIXME: properly handle IDLE / NOPE indications
- if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
+ if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
raise ValueError("Tx burst has odd length %u" % len(self.burst))
- # Generates a random power level
def rand_pwr(self, min = None, max = None):
+ ''' Generate a random power level. '''
+
if min is None:
min = self.PWR_MIN
@@ -378,15 +280,17 @@ class DATAMSG_L12TRX(DATAMSG):
return random.randint(min, max)
- # Randomizes message specific header
def rand_hdr(self):
- DATAMSG.rand_hdr(self)
+ ''' Randomize message specific header. '''
+
+ Msg.rand_hdr(self)
self.pwr = self.rand_pwr()
- # Generates human-readable header description
def desc_hdr(self):
+ ''' Generate human-readable header description. '''
+
# Describe the common part
- result = DATAMSG.desc_hdr(self)
+ result = Msg.desc_hdr(self)
if self.pwr is not None:
result += ("pwr=%u " % self.pwr)
@@ -394,8 +298,9 @@ class DATAMSG_L12TRX(DATAMSG):
# Strip useless whitespace and return
return result.strip()
- # Generates message specific header part
def gen_hdr(self):
+ ''' Generate message specific header part. '''
+
# Allocate an empty byte-array
buf = bytearray()
@@ -404,136 +309,50 @@ class DATAMSG_L12TRX(DATAMSG):
return buf
- # Parses message specific header part
def parse_hdr(self, hdr):
+ ''' Parse message specific header part. '''
+
# Parse power level
self.pwr = hdr[5]
- # Generates message specific burst
def gen_burst(self):
+ ''' Generate message specific burst. '''
+
# Copy burst 'as is'
return bytearray(self.burst)
- # Parses message specific burst
def parse_burst(self, burst):
+ ''' Parse message specific burst. '''
+
length = len(burst)
# Distinguish between GSM and EDGE
if length >= EDGE_BURST_LEN:
self.burst = list(burst[:EDGE_BURST_LEN])
else:
- self.burst = list(burst[:GSM_BURST_LEN])
+ self.burst = list(burst[:GMSK_BURST_LEN])
- # Generate a random message specific burst
- def rand_burst(self, length = GSM_BURST_LEN):
- self.burst = []
+ def rand_burst(self, length = GMSK_BURST_LEN):
+ ''' Generate a random message specific burst. '''
+ self.burst = [random.randint(0, 1) for _ in range(length)]
- for i in range(length):
- ubit = random.randint(0, 1)
- self.burst.append(ubit)
+ def trans(self, ver = None):
+ ''' Transform this message into RxMsg. '''
- # Transforms this message to TRX2L1 message
- def gen_trx2l1(self, ver = None):
# Allocate a new message
- msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn,
+ msg = RxMsg(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver)
# Convert burst bits
if self.burst is not None:
msg.burst = self.ubit2sbit(self.burst)
+ else:
+ msg.nope_ind = True
return msg
-class DATAMSG_TRX2L1(DATAMSG):
- """ TRX2L1 (TRX -> L1) message codec.
-
- This message represents an Uplink burst on the BTS side,
- or a Downlink burst on the MS side, and has the following
- message specific fixed-size header preceding the burst bits:
-
- == Version 0x00
-
- +------+-----+--------------------+
- | RSSI | ToA | soft-bits (254..0) |
- +------+-----+--------------------+
-
- == Version 0x01
-
- +------+-----+-----+-----+--------------------+
- | RSSI | ToA | MTS | C/I | soft-bits (254..0) |
- +------+-----+-----+-----+--------------------+
-
- where:
-
- - RSSI (1 octet) - Received Signal Strength Indication
- encoded without the negative sign.
- - ToA (2 octets) - Timing of Arrival in units of 1/256
- of symbol (big endian).
- - MTS (1 octet) - Modulation and Training Sequence info.
- - C/I (2 octets) - Carrier-to-Interference ratio (big endian).
-
- == Coding of MTS: Modulation and Training Sequence info
-
- 3GPP TS 45.002 version 15.1.0 defines several modulation types,
- and a few sets of training sequences for each type. The most
- common are GMSK and 8-PSK (which is used in EDGE).
-
- +-----------------+---------------------------------------+
- | 7 6 5 4 3 2 1 0 | bit numbers (value range) |
- +-----------------+---------------------------------------+
- | X . . . . . . . | IDLE / nope frame indication (0 or 1) |
- +-----------------+---------------------------------------+
- | . X X X X . . . | Modulation, TS set number (see below) |
- +-----------------+---------------------------------------+
- | . . . . . X X X | Training Sequence Code (0..7) |
- +-----------------+---------------------------------------+
-
- The bit number 7 (MSB) is set to high when either nothing has been
- detected, or during IDLE frames, so we can deliver noise levels,
- and avoid clock gaps on the L1 side. Other bits are ignored,
- and should be set to low (0) in this case.
-
- == Coding of modulation and TS set number
-
- GMSK has 4 sets of training sequences (see tables 5.2.3a-d),
- while 8-PSK (see tables 5.2.3f-g) and the others have 2 sets.
- Access and Synchronization bursts also have several synch.
- sequences.
-
- +-----------------+---------------------------------------+
- | 7 6 5 4 3 2 1 0 | bit numbers (value range) |
- +-----------------+---------------------------------------+
- | . 0 0 X X . . . | GMSK, 4 TS sets (0..3) |
- +-----------------+---------------------------------------+
- | . 0 1 0 X . . . | 8-PSK, 2 TS sets (0..1) |
- +-----------------+---------------------------------------+
- | . 0 1 1 X . . . | AQPSK, 2 TS sets (0..1) |
- +-----------------+---------------------------------------+
- | . 1 0 0 X . . . | 16QAM, 2 TS sets (0..1) |
- +-----------------+---------------------------------------+
- | . 1 0 1 X . . . | 32QAM, 2 TS sets (0..1) |
- +-----------------+---------------------------------------+
- | . 1 1 1 X . . . | RESERVED (0) |
- +-----------------+---------------------------------------+
-
- == C/I: Carrier-to-Interference ratio
-
- The C/I value can be computed from the training sequence of each
- burst, where we can compare the "ideal" training sequence with
- the actual training sequence and then express that in centiBels.
-
- == Coding of the burst bits
-
- Unlike the transmitted bursts, the received bursts are designated
- using the soft-bits notation, so the receiver can indicate its
- assurance from 0 to -127 that a given bit is 1, and from 0 to +127
- that a given bit is 0. The Viterbi algorithm allows to approximate
- the original sequence of hard-bits (1 or 0) using these values.
-
- Each soft-bit (-127..127) of the burst is encoded as an unsigned
- value in range (0..255) respectively using the constant shift.
-
- """
+class RxMsg(Msg):
+ ''' Rx (TRX -> L1) message coding API. '''
# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
RSSI_MIN = -120
@@ -565,9 +384,10 @@ class DATAMSG_TRX2L1(DATAMSG):
tsc = None
ci = None
- # Calculates header length depending on its version
@property
def HDR_LEN(self):
+ ''' Calculate header length depending on its version. '''
+
# Common header length
length = self.CHDR_LEN
@@ -589,7 +409,7 @@ class DATAMSG_TRX2L1(DATAMSG):
raise ValueError("Rx burst bits are not set")
# ... and can be either of GSM (GMSK) or EDGE (8-PSK)
- if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
+ if len(self.burst) not in (GMSK_BURST_LEN, EDGE_BURST_LEN):
raise ValueError("Rx burst has odd length %u" % len(self.burst))
def _validate_burst_v1(self):
@@ -606,17 +426,19 @@ class DATAMSG_TRX2L1(DATAMSG):
if len(self.burst) != self.mod_type.bl:
raise ValueError("Rx burst has odd length %u" % len(self.burst))
- # Validates the burst (throws ValueError)
def validate_burst(self):
+ ''' Validate the burst (throws ValueError). '''
+
if self.ver == 0x00:
self._validate_burst_v0()
elif self.ver >= 0x01:
self._validate_burst_v1()
- # Validates the message header fields (throws ValueError)
def validate(self):
+ ''' Validate the message header fields (throws ValueError). '''
+
# Validate common fields
- DATAMSG.validate(self)
+ Msg.validate(self)
if self.rssi is None:
raise ValueError("RSSI is not set")
@@ -630,7 +452,8 @@ class DATAMSG_TRX2L1(DATAMSG):
if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
raise ValueError("ToA256 %d is out of range" % self.toa256)
- if self.ver >= 0x01:
+ # Version specific parameters (omited for NOPE.ind)
+ if self.ver >= 0x01 and not self.nope_ind:
if type(self.mod_type) is not Modulation:
raise ValueError("Unknown Rx modulation type")
@@ -650,6 +473,8 @@ class DATAMSG_TRX2L1(DATAMSG):
if self.tsc not in self.TSC_RANGE:
raise ValueError("TSC %d is out of range" % self.tsc)
+ # Version specific parameters (also present in NOPE.ind)
+ if self.ver >= 0x01:
if self.ci is None:
raise ValueError("C/I is not set")
@@ -658,8 +483,9 @@ class DATAMSG_TRX2L1(DATAMSG):
self.validate_burst()
- # Generates a random RSSI value
def rand_rssi(self, min = None, max = None):
+ ''' Generate a random RSSI value. '''
+
if min is None:
min = self.RSSI_MIN
@@ -668,8 +494,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return random.randint(min, max)
- # Generates a ToA (Time of Arrival) value
def rand_toa256(self, min = None, max = None):
+ ''' Generate a random ToA (Time of Arrival) value. '''
+
if min is None:
min = self.TOA256_MIN
@@ -678,9 +505,10 @@ class DATAMSG_TRX2L1(DATAMSG):
return random.randint(min, max)
- # Randomizes message specific header
def rand_hdr(self):
- DATAMSG.rand_hdr(self)
+ ''' Randomize message specific header. '''
+
+ Msg.rand_hdr(self)
self.rssi = self.rand_rssi()
self.toa256 = self.rand_toa256()
@@ -695,10 +523,11 @@ class DATAMSG_TRX2L1(DATAMSG):
# C/I: Carrier-to-Interference ratio
self.ci = random.randint(self.CI_MIN, self.CI_MAX)
- # Generates human-readable header description
def desc_hdr(self):
+ ''' Generate human-readable header description. '''
+
# Describe the common part
- result = DATAMSG.desc_hdr(self)
+ result = Msg.desc_hdr(self)
if self.rssi is not None:
result += ("rssi=%d " % self.rssi)
@@ -722,8 +551,9 @@ class DATAMSG_TRX2L1(DATAMSG):
# Strip useless whitespace and return
return result.strip()
- # Encodes Modulation and Training Sequence info
def gen_mts(self):
+ ''' Encode Modulation and Training Sequence info. '''
+
# IDLE / nope indication has no MTS info
if self.nope_ind:
return self.NOPE_IND
@@ -737,8 +567,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return mts
- # Parses Modulation and Training Sequence info
def parse_mts(self, mts):
+ ''' Parse Modulation and Training Sequence info. '''
+
# IDLE / nope indication has no MTS info
self.nope_ind = (mts & self.NOPE_IND) > 0
if self.nope_ind:
@@ -761,8 +592,9 @@ class DATAMSG_TRX2L1(DATAMSG):
self.mod_type = Modulation.ModGMSK
self.tsc_set = mts & 0b11
- # Generates message specific header part
def gen_hdr(self):
+ ''' Generate message specific header part. '''
+
# Allocate an empty byte-array
buf = bytearray()
@@ -779,15 +611,13 @@ class DATAMSG_TRX2L1(DATAMSG):
buf.append(mts)
# C/I: Carrier-to-Interference ratio (in centiBels)
- if not self.nope_ind:
- buf += struct.pack(">h", self.ci)
- else:
- buf += bytearray(2)
+ buf += struct.pack(">h", self.ci)
return buf
- # Parses message specific header part
def parse_hdr(self, hdr):
+ ''' Parse message specific header part. '''
+
# Parse RSSI
self.rssi = -(hdr[5])
@@ -799,21 +629,20 @@ class DATAMSG_TRX2L1(DATAMSG):
self.parse_mts(hdr[8])
# C/I: Carrier-to-Interference ratio (in centiBels)
- if not self.nope_ind:
- self.ci = struct.unpack(">h", hdr[9:11])[0]
- else:
- self.ci = None
+ self.ci = struct.unpack(">h", hdr[9:11])[0]
- # Generates message specific burst
def gen_burst(self):
+ ''' Generate message specific burst. '''
+
# Convert soft-bits to unsigned soft-bits
burst_usbits = self.sbit2usbit(self.burst)
# Encode to bytes
return bytearray(burst_usbits)
- # Parses message specific burst for header version 0
def _parse_burst_v0(self, burst):
+ ''' Parse message specific burst for header version 0. '''
+
bl = len(burst)
# We need to guess modulation by the length of burst
@@ -827,8 +656,9 @@ class DATAMSG_TRX2L1(DATAMSG):
return burst[:self.mod_type.bl]
- # Parses message specific burst
def parse_burst(self, burst):
+ ''' Parse message specific burst. '''
+
burst = list(burst)
if self.ver == 0x00:
@@ -837,21 +667,19 @@ class DATAMSG_TRX2L1(DATAMSG):
# Convert unsigned soft-bits to soft-bits
self.burst = self.usbit2sbit(burst)
- # Generate a random message specific burst
def rand_burst(self, length = None):
- self.burst = []
+ ''' Generate a random message specific burst. '''
if length is None:
length = self.mod_type.bl
- for i in range(length):
- sbit = random.randint(-127, 127)
- self.burst.append(sbit)
+ self.burst = [random.randint(-127, 127) for _ in range(length)]
+
+ def trans(self, ver = None):
+ ''' Transform this message to TxMsg. '''
- # Transforms this message to L12TRX message
- def gen_l12trx(self, ver = None):
# Allocate a new message
- msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn,
+ msg = TxMsg(fn = self.fn, tn = self.tn,
ver = self.ver if ver is None else ver)
# Convert burst bits
@@ -859,215 +687,3 @@ class DATAMSG_TRX2L1(DATAMSG):
msg.burst = self.sbit2ubit(self.burst)
return msg
-
-# Regression test
-if __name__ == '__main__':
- import logging as log
-
- # Configure logging
- log.basicConfig(level = log.DEBUG,
- format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s")
-
- log.info("Generating the reference messages")
-
- # Create messages of both types
- msg_l12trx_ref = DATAMSG_L12TRX()
- msg_trx2l1_ref = DATAMSG_TRX2L1()
-
- # Validate header randomization
- for i in range(0, 100):
- msg_l12trx_ref.rand_hdr()
- msg_trx2l1_ref.rand_hdr()
-
- msg_l12trx_ref.rand_burst()
- msg_trx2l1_ref.rand_burst()
-
- msg_l12trx_ref.validate()
- msg_trx2l1_ref.validate()
-
- log.info("Validate header randomization: OK")
-
- # Test error handling for common fields
- msg = DATAMSG()
-
- # Make sure that message validation throws a ValueError
- def validate_throw(msg):
- try:
- msg.validate()
- return False
- except ValueError:
- return True
-
- # Unknown version
- msg.rand_hdr()
- msg.ver = 100
- assert(validate_throw(msg))
-
- # Uninitialized field
- msg.rand_hdr()
- msg.fn = None
- assert(validate_throw(msg))
-
- # Out-of-range value
- msg.rand_hdr()
- msg.tn = 10
- assert(validate_throw(msg))
-
- log.info("Check incorrect message validation: OK")
-
- log.info("Encoding the reference messages")
-
- # Encode DATA messages
- l12trx_raw = msg_l12trx_ref.gen_msg()
- trx2l1_raw = msg_trx2l1_ref.gen_msg()
-
- # Encode a TRX2L1 message in legacy mode
- trx2l1_raw_legacy = msg_trx2l1_ref.gen_msg(legacy = True)
-
- log.info("Parsing generated messages back")
-
- # Parse generated DATA messages
- msg_l12trx_dec = DATAMSG_L12TRX()
- msg_trx2l1_dec = DATAMSG_TRX2L1()
- msg_l12trx_dec.parse_msg(l12trx_raw)
- msg_trx2l1_dec.parse_msg(trx2l1_raw)
-
- # Parse generated TRX2L1 message in legacy mode
- msg_trx2l1_legacy_dec = DATAMSG_TRX2L1()
- msg_trx2l1_legacy_dec.parse_msg(trx2l1_raw_legacy)
-
- log.info("Comparing decoded messages with the reference")
-
- # Compare bursts
- assert(msg_l12trx_dec.burst == msg_l12trx_ref.burst)
- assert(msg_trx2l1_dec.burst == msg_trx2l1_ref.burst)
- assert(msg_trx2l1_legacy_dec.burst == msg_trx2l1_ref.burst)
-
- log.info("Compare bursts: OK")
-
- # Compare both parsed messages with the reference data
- assert(msg_l12trx_dec.fn == msg_l12trx_ref.fn)
- assert(msg_trx2l1_dec.fn == msg_trx2l1_ref.fn)
- assert(msg_l12trx_dec.tn == msg_l12trx_ref.tn)
- assert(msg_trx2l1_dec.tn == msg_trx2l1_ref.tn)
-
- log.info("Compare FN / TN: OK")
-
- # Compare message specific parts
- assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi)
- assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr)
- assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256)
-
- log.info("Compare message specific data: OK")
-
- # Bit conversation test
- usbits_ref = list(range(0, 256))
- sbits_ref = list(range(-127, 128))
-
- # Test both usbit2sbit() and sbit2usbit()
- sbits = DATAMSG.usbit2sbit(usbits_ref)
- usbits = DATAMSG.sbit2usbit(sbits)
- assert(usbits[:255] == usbits_ref[:255])
- assert(usbits[255] == 254)
-
- log.info("Check both usbit2sbit() and sbit2usbit(): OK")
-
- # Test both sbit2ubit() and ubit2sbit()
- ubits = DATAMSG.sbit2ubit(sbits_ref)
- assert(ubits == ([1] * 127 + [0] * 128))
-
- sbits = DATAMSG.ubit2sbit(ubits)
- assert(sbits == ([-127] * 127 + [127] * 128))
-
- log.info("Check both sbit2ubit() and ubit2sbit(): OK")
-
- # Test message transformation
- msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx()
- msg_trx2l1_dec = msg_l12trx_ref.gen_trx2l1()
-
- assert(msg_l12trx_dec.fn == msg_trx2l1_ref.fn)
- assert(msg_l12trx_dec.tn == msg_trx2l1_ref.tn)
-
- assert(msg_trx2l1_dec.fn == msg_l12trx_ref.fn)
- assert(msg_trx2l1_dec.tn == msg_l12trx_ref.tn)
-
- assert(msg_l12trx_dec.burst == DATAMSG.sbit2ubit(msg_trx2l1_ref.burst))
- assert(msg_trx2l1_dec.burst == DATAMSG.ubit2sbit(msg_l12trx_ref.burst))
-
- log.info("Check L12TRX <-> TRX2L1 type transformations: OK")
-
- # Test header version coding
- for ver in DATAMSG.known_versions:
- # Create messages of both types
- msg_l12trx = DATAMSG_L12TRX(ver = ver)
- msg_trx2l1 = DATAMSG_TRX2L1(ver = ver)
-
- # Randomize message specific headers
- msg_l12trx.rand_hdr()
- msg_trx2l1.rand_hdr()
-
- # Randomize bursts
- msg_l12trx.rand_burst()
- msg_trx2l1.rand_burst()
-
- # Encode DATA messages
- msg_l12trx_enc = msg_l12trx.gen_msg()
- msg_trx2l1_enc = msg_trx2l1.gen_msg()
-
- # Parse generated DATA messages
- msg_l12trx_dec = DATAMSG_L12TRX()
- msg_trx2l1_dec = DATAMSG_TRX2L1()
- msg_l12trx_dec.parse_msg(msg_l12trx_enc)
- msg_trx2l1_dec.parse_msg(msg_trx2l1_enc)
-
- # Match the header version
- assert(msg_l12trx_dec.ver == ver)
- assert(msg_trx2l1_dec.ver == ver)
-
- # Match common TDMA fields
- assert(msg_l12trx_dec.tn == msg_l12trx.tn)
- assert(msg_trx2l1_dec.fn == msg_trx2l1.fn)
-
- # Match version specific fields
- if msg_trx2l1.ver >= 0x01:
- assert(msg_trx2l1_dec.nope_ind == msg_trx2l1.nope_ind)
- assert(msg_trx2l1_dec.mod_type == msg_trx2l1.mod_type)
- assert(msg_trx2l1_dec.tsc_set == msg_trx2l1.tsc_set)
- assert(msg_trx2l1_dec.tsc == msg_trx2l1.tsc)
- assert(msg_trx2l1_dec.ci == msg_trx2l1.ci)
-
- log.info("Check header version %u coding: OK" % ver)
-
- # Compare bursts
- assert(msg_l12trx_dec.burst == msg_l12trx.burst)
- assert(msg_trx2l1_dec.burst == msg_trx2l1.burst)
-
- msg_trx2l1_gen = msg_l12trx.gen_trx2l1()
- msg_l12trx_gen = msg_trx2l1.gen_l12trx()
-
- assert(msg_trx2l1_gen is not None)
- assert(msg_l12trx_gen is not None)
-
- # Match the header version
- assert(msg_trx2l1_gen.ver == ver)
- assert(msg_l12trx_gen.ver == ver)
-
- # Match common TDMA fields
- assert(msg_trx2l1_gen.tn == msg_l12trx.tn)
- assert(msg_l12trx_gen.fn == msg_trx2l1.fn)
-
- log.info("Verify version %u direct transformation: OK" % ver)
-
- # Verify NOPE indication coding
- if msg_trx2l1.ver >= 0x01:
- msg_trx2l1 = DATAMSG_TRX2L1(ver = ver)
- msg_trx2l1.nope_ind = True
- msg_trx2l1.rand_hdr()
-
- msg_trx2l1_dec = DATAMSG_TRX2L1()
- msg_trx2l1_dec.parse_msg(msg_trx2l1.gen_msg())
-
- assert(msg_trx2l1.nope_ind == msg_trx2l1_dec.nope_ind)
- assert(msg_trx2l1.burst == msg_trx2l1_dec.burst)
-
- log.info("Verify version %u NOPE indication coding: OK" % ver)