summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/data_msg.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/target/trx_toolkit/data_msg.py')
-rw-r--r--src/target/trx_toolkit/data_msg.py395
1 files changed, 362 insertions, 33 deletions
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py
index 5bd04cac..a2996abe 100644
--- a/src/target/trx_toolkit/data_msg.py
+++ b/src/target/trx_toolkit/data_msg.py
@@ -25,8 +25,37 @@
import random
import struct
+from enum import Enum
from gsm_shared import *
+class Modulation(Enum):
+ """ Modulation types defined in 3GPP TS 45.002 """
+ ModGMSK = (0b0000, 148)
+ Mod8PSK = (0b0100, 444)
+ ModAQPSK = (0b0110, 296)
+ Mod16QAM = (0b1000, 592)
+ Mod32QAM = (0b1010, 740)
+
+ def __init__(self, coding, bl):
+ # Coding in TRXD header
+ self.coding = coding
+ # Burst length
+ self.bl = bl
+
+ @classmethod
+ def pick(self, coding):
+ for mod in list(self):
+ if mod.coding == coding:
+ return mod
+ return None
+
+ @classmethod
+ def pick_by_bl(self, bl):
+ for mod in list(self):
+ if mod.bl == bl:
+ return mod
+ return None
+
class DATAMSG:
""" TRXD (DATA) message codec (common part).
@@ -92,7 +121,7 @@ class DATAMSG:
# NOTE: up to 16 versions can be encoded
CHDR_VERSION_MAX = 0b1111
- known_versions = [0x00]
+ known_versions = [0x00, 0x01]
# Common constructor
def __init__(self, fn = None, tn = None, burst = None, ver = 0):
@@ -101,6 +130,12 @@ class DATAMSG:
self.fn = fn
self.tn = tn
+ # The common header length
+ @property
+ def CHDR_LEN(self):
+ # (VER + TN) + FN
+ return 1 + 4
+
# Generates message specific header
def gen_hdr(self):
raise NotImplementedError
@@ -197,12 +232,6 @@ class DATAMSG:
if not self.ver in self.known_versions:
return False
- if self.burst is None:
- return False
-
- if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
- return False
-
if self.fn is None:
return False
@@ -237,7 +266,8 @@ class DATAMSG:
buf += hdr
# Generate burst
- buf += self.gen_burst()
+ if self.burst is not None:
+ buf += self.gen_burst()
# This is a rudiment from (legacy) OpenBTS transceiver,
# some L1 implementations still expect two dummy bytes.
@@ -248,11 +278,8 @@ class DATAMSG:
# Parses a TRX DATA message
def parse_msg(self, msg):
- # Calculate message length
- length = len(msg)
-
- # Check length
- if length < (self.HDR_LEN + GSM_BURST_LEN):
+ # Make sure we have at least header
+ if len(msg) < self.HDR_LEN:
raise ValueError("Message is to short")
# Parse version and TDMA TN
@@ -267,7 +294,10 @@ class DATAMSG:
# Copy burst, skipping header
msg_burst = msg[self.HDR_LEN:]
- self.parse_burst(msg_burst)
+ if len(msg_burst) > 0:
+ self.parse_burst(msg_burst)
+ else:
+ self.burst = None
class DATAMSG_L12TRX(DATAMSG):
""" L12TRX (L1 -> TRX) message codec.
@@ -276,6 +306,8 @@ class DATAMSG_L12TRX(DATAMSG):
or an Uplink burst on the MS side, and has the following
message specific fixed-size header preceding the burst bits:
+ == Versions 0x00, 0x01
+
+-----+--------------------+
| PWR | hard-bits (1 or 0) |
+-----+--------------------+
@@ -290,13 +322,26 @@ class DATAMSG_L12TRX(DATAMSG):
"""
# Constants
- HDR_LEN = 6
PWR_MIN = 0x00
PWR_MAX = 0xff
# Specific message fields
pwr = None
+ # Calculates header length depending on its version
+ @property
+ def HDR_LEN(self):
+ # Common header length
+ length = self.CHDR_LEN
+
+ # Message specific header length
+ if self.ver in (0x00, 0x01):
+ length += 1 # PWR
+ else:
+ raise IndexError("Unhandled version %u" % self.ver)
+
+ return length
+
# Validates the message fields
def validate(self):
# Validate common fields
@@ -309,6 +354,14 @@ class DATAMSG_L12TRX(DATAMSG):
if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX:
return False
+ # FIXME: properly handle IDLE / NOPE indications
+ if self.burst is None:
+ return False
+
+ # FIXME: properly handle IDLE / NOPE indications
+ if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
+ return False
+
return True
# Generates a random power level
@@ -394,16 +447,78 @@ class DATAMSG_TRX2L1(DATAMSG):
or a Downlink burst on the MS side, and has the following
message specific fixed-size header preceding the burst bits:
+ == Version 0x00
+
+------+-----+--------------------+
| RSSI | ToA | soft-bits (254..0) |
+------+-----+--------------------+
+ == Version 0x01
+
+ +------+-----+-----+-----+--------------------+
+ | RSSI | ToA | MTS | C/I | soft-bits (254..0) |
+ +------+-----+-----+-----+--------------------+
+
where:
- RSSI (1 octet) - Received Signal Strength Indication
encoded without the negative sign.
- ToA (2 octets) - Timing of Arrival in units of 1/256
of symbol (big endian).
+ - MTS (1 octet) - Modulation and Training Sequence info.
+ - C/I (2 octets) - Carrier-to-Interference ratio (big endian).
+
+ == Coding of MTS: Modulation and Training Sequence info
+
+ 3GPP TS 45.002 version 15.1.0 defines several modulation types,
+ and a few sets of training sequences for each type. The most
+ common are GMSK and 8-PSK (which is used in EDGE).
+
+ +-----------------+---------------------------------------+
+ | 7 6 5 4 3 2 1 0 | bit numbers (value range) |
+ +-----------------+---------------------------------------+
+ | . . . . . X X X | Training Sequence Code (0..7) |
+ +-----------------+---------------------------------------+
+ | . X X X X . . . | Modulation, TS set number (see below) |
+ +-----------------+---------------------------------------+
+ | X . . . . . . . | IDLE / nope frame indication (0 or 1) |
+ +-----------------+---------------------------------------+
+
+ The bit number 7 (MSB) is set to high when either nothing has been
+ detected, or during IDLE frames, so we can deliver noise levels,
+ and avoid clock gaps on the L1 side. Other bits are ignored,
+ and should be set to low (0) in this case. L16 shall be set to 0x00.
+
+ == Coding of modulation and TS set number
+
+ GMSK has 4 sets of training sequences (see tables 5.2.3a-d),
+ while 8-PSK (see tables 5.2.3f-g) and the others have 2 sets.
+ Access and Synchronization bursts also have several synch.
+ sequences.
+
+ +-----------------+---------------------------------------+
+ | 7 6 5 4 3 2 1 0 | bit numbers (value range) |
+ +-----------------+---------------------------------------+
+ | . 0 0 X X . . . | GMSK, 4 TS sets (0..3) |
+ +-----------------+---------------------------------------+
+ | . 0 1 0 X . . . | 8-PSK, 2 TS sets (0..1) |
+ +-----------------+---------------------------------------+
+ | . 0 1 1 X . . . | AQPSK, 2 TS sets (0..1) |
+ +-----------------+---------------------------------------+
+ | . 1 0 0 X . . . | 16QAM, 2 TS sets (0..1) |
+ +-----------------+---------------------------------------+
+ | . 1 0 1 X . . . | 32QAM, 2 TS sets (0..1) |
+ +-----------------+---------------------------------------+
+ | . 1 1 1 X . . . | RESERVED (0) |
+ +-----------------+---------------------------------------+
+
+ == C/I: Carrier-to-Interference ratio
+
+ The C/I value can be computed from the training sequence of each
+ burst, where we can compare the "ideal" training sequence with
+ the actual training sequence and then express that in centiBels.
+
+ == Coding of the burst bits
Unlike to be transmitted bursts, the received bursts are designated
using the soft-bits notation, so the receiver can indicate its
@@ -416,9 +531,6 @@ class DATAMSG_TRX2L1(DATAMSG):
"""
- # Constants
- HDR_LEN = 8
-
# rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise
RSSI_MIN = -120
RSSI_MAX = -47
@@ -427,11 +539,81 @@ class DATAMSG_TRX2L1(DATAMSG):
TOA256_MIN = -32768
TOA256_MAX = 32767
+ # TSC (Training Sequence Code) range
+ TSC_RANGE = range(0, 8)
+
+ # C/I range (in centiBels)
+ CI_MIN = -1280
+ CI_MAX = 1280
+
+ # IDLE frame / nope detection indicator
+ NOPE_IND = (1 << 7)
+
# Specific message fields
rssi = None
toa256 = None
- # Validates the message fields
+ # Version 0x01 specific (default values)
+ mod_type = Modulation.ModGMSK
+ nope_ind = False
+
+ tsc_set = None
+ tsc = None
+ ci = None
+
+ # Calculates header length depending on its version
+ @property
+ def HDR_LEN(self):
+ # Common header length
+ length = self.CHDR_LEN
+
+ # Message specific header length
+ if self.ver == 0x00:
+ # RSSI + ToA
+ length += 1 + 2
+ elif self.ver == 0x01:
+ # RSSI + ToA + TS + C/I
+ length += 1 + 2 + 1 + 2
+ else:
+ raise IndexError("Unhandled version %u" % self.ver)
+
+ return length
+
+ def _validate_burst_v0(self):
+ # Burst is mandatory
+ if self.burst is None:
+ return False
+
+ # ... and can be either of GSM (GMSK) or EDGE (8-PSK)
+ if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN):
+ return False
+
+ return True
+
+ def _validate_burst_v1(self):
+ # Burst is omitted in case of an IDLE / NOPE indication
+ if self.nope_ind and self.burst is None:
+ return True
+ if self.nope_ind and self.burst is not None:
+ return False
+
+ if self.burst is None:
+ return False
+
+ # Burst length depends on modulation type
+ if len(self.burst) != self.mod_type.bl:
+ return False
+
+ return True
+
+ # Validates the burst
+ def validate_burst(self):
+ if self.ver == 0x00:
+ return self._validate_burst_v0()
+ elif self.ver >= 0x01:
+ return self._validate_burst_v1()
+
+ # Validates the message header fields
def validate(self):
# Validate common fields
if not DATAMSG.validate(self):
@@ -449,6 +631,35 @@ class DATAMSG_TRX2L1(DATAMSG):
if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX:
return False
+ if self.ver >= 0x01:
+ if type(self.mod_type) is not Modulation:
+ return False
+
+ if self.tsc_set is None:
+ return False
+
+ if self.mod_type is Modulation.ModGMSK:
+ if self.tsc_set not in range(0, 4):
+ return False
+ else:
+ if self.tsc_set not in range(0, 2):
+ return False
+
+ if self.tsc is None:
+ return False
+
+ if self.tsc not in self.TSC_RANGE:
+ return False
+
+ if self.ci is None:
+ return False
+
+ if self.ci < self.CI_MIN or self.ci > self.CI_MAX:
+ return False
+
+ if not self.validate_burst():
+ return False
+
return True
# Generates a random RSSI value
@@ -477,6 +688,17 @@ class DATAMSG_TRX2L1(DATAMSG):
self.rssi = self.rand_rssi()
self.toa256 = self.rand_toa256()
+ if self.ver >= 0x01:
+ self.mod_type = random.choice(list(Modulation))
+ if self.mod_type is Modulation.ModGMSK:
+ self.tsc_set = random.randint(0, 3)
+ else:
+ self.tsc_set = random.randint(0, 1)
+ self.tsc = random.choice(self.TSC_RANGE)
+
+ # C/I: Carrier-to-Interference ratio
+ self.ci = random.randint(self.CI_MIN, self.CI_MAX)
+
# Generates human-readable header description
def desc_hdr(self):
# Describe the common part
@@ -488,9 +710,61 @@ class DATAMSG_TRX2L1(DATAMSG):
if self.toa256 is not None:
result += ("toa256=%d " % self.toa256)
+ if self.ver >= 0x01:
+ if not self.nope_ind:
+ if self.mod_type is not None:
+ result += ("%s " % self.mod_type)
+ if self.tsc_set is not None:
+ result += ("set=%u " % self.tsc_set)
+ if self.tsc is not None:
+ result += ("tsc=%u " % self.tsc)
+ if self.ci is not None:
+ result += ("C/I=%d cB " % self.ci)
+ else:
+ result += "(IDLE / NOPE IND) "
+
# Strip useless whitespace and return
return result.strip()
+ # Encodes Modulation and Training Sequence info
+ def gen_mts(self):
+ # IDLE / nope indication has no MTS info
+ if self.nope_ind:
+ return self.NOPE_IND
+
+ # TSC: . . . . . X X X
+ mts = self.tsc & 0b111
+
+ # MTS: . X X X X . . .
+ mts |= self.mod_type.coding << 3
+ mts |= self.tsc_set << 3
+
+ return mts
+
+ # Parses Modulation and Training Sequence info
+ def parse_mts(self, mts):
+ # IDLE / nope indication has no MTS info
+ self.nope_ind = (mts & self.NOPE_IND) > 0
+ if self.nope_ind:
+ self.mod_type = None
+ self.tsc_set = None
+ self.tsc = None
+ return
+
+ # TSC: . . . . . X X X
+ self.tsc = mts & 0b111
+
+ # MTS: . X X X X . . .
+ mts = (mts >> 3) & 0b1111
+ if (mts & 0b1100) > 0:
+ # Mask: . . . . M M M S
+ self.mod_type = Modulation.pick(mts & 0b1110)
+ self.tsc_set = mts & 0b1
+ else:
+ # GMSK: . . . . 0 0 S S
+ self.mod_type = Modulation.ModGMSK
+ self.tsc_set = mts & 0b11
+
# Generates message specific header part
def gen_hdr(self):
# Allocate an empty byte-array
@@ -503,6 +777,17 @@ class DATAMSG_TRX2L1(DATAMSG):
# Big endian, 2 bytes (int32_t)
buf += struct.pack(">h", self.toa256)
+ if self.ver >= 0x01:
+ # Modulation and Training Sequence info
+ mts = self.gen_mts()
+ buf.append(mts)
+
+ # C/I: Carrier-to-Interference ratio (in centiBels)
+ if not self.nope_ind:
+ buf += struct.pack(">h", self.ci)
+ else:
+ buf += bytearray(2)
+
return buf
# Parses message specific header part
@@ -513,6 +798,16 @@ class DATAMSG_TRX2L1(DATAMSG):
# Parse ToA (Time of Arrival)
self.toa256 = struct.unpack(">h", hdr[6:8])[0]
+ if self.ver >= 0x01:
+ # Modulation and Training Sequence info
+ self.parse_mts(hdr[8])
+
+ # C/I: Carrier-to-Interference ratio (in centiBels)
+ if not self.nope_ind:
+ self.ci = struct.unpack(">h", hdr[9:11])[0]
+ else:
+ self.ci = None
+
# Generates message specific burst
def gen_burst(self):
# Convert soft-bits to unsigned soft-bits
@@ -521,26 +816,38 @@ class DATAMSG_TRX2L1(DATAMSG):
# Encode to bytes
return bytearray(burst_usbits)
+ # Parses message specific burst for header version 0
+ def _parse_burst_v0(self, burst):
+ bl = len(burst)
+
+ # We need to guess modulation by the length of burst
+ self.mod_type = Modulation.pick_by_bl(bl)
+ if self.mod_type is None:
+ # Some old transceivers append two dummy bytes
+ self.mod_type = Modulation.pick_by_bl(bl - 2)
+
+ if self.mod_type is None:
+ raise ValueError("Odd burst length")
+
+ return burst[:self.mod_type.bl]
+
# Parses message specific burst
def parse_burst(self, burst):
- length = len(burst)
+ burst = list(burst)
- # Distinguish between GSM and EDGE
- if length >= EDGE_BURST_LEN:
- burst_usbits = list(burst[:EDGE_BURST_LEN])
- else:
- burst_usbits = list(burst[:GSM_BURST_LEN])
+ if self.ver == 0x00:
+ burst = self._parse_burst_v0(burst)
# Convert unsigned soft-bits to soft-bits
- burst_sbits = self.usbit2sbit(burst_usbits)
-
- # Save
- self.burst = burst_sbits
+ self.burst = self.usbit2sbit(burst)
# Generate a random message specific burst
- def rand_burst(self, length = GSM_BURST_LEN):
+ def rand_burst(self, length = None):
self.burst = []
+ if length is None:
+ length = self.mod_type.bl
+
for i in range(length):
sbit = random.randint(-127, 127)
self.burst.append(sbit)
@@ -697,12 +1004,20 @@ if __name__ == '__main__':
assert(msg_l12trx_dec.tn == msg_l12trx.tn)
assert(msg_trx2l1_dec.fn == msg_trx2l1.fn)
+ # Match version specific fields
+ if msg_trx2l1.ver >= 0x01:
+ assert(msg_trx2l1_dec.nope_ind == msg_trx2l1.nope_ind)
+ assert(msg_trx2l1_dec.mod_type == msg_trx2l1.mod_type)
+ assert(msg_trx2l1_dec.tsc_set == msg_trx2l1.tsc_set)
+ assert(msg_trx2l1_dec.tsc == msg_trx2l1.tsc)
+ assert(msg_trx2l1_dec.ci == msg_trx2l1.ci)
+
+ log.info("Check header version %u coding: OK" % ver)
+
# Compare bursts
assert(msg_l12trx_dec.burst == msg_l12trx.burst)
assert(msg_trx2l1_dec.burst == msg_trx2l1.burst)
- log.info("Check header version %u coding: OK" % ver)
-
msg_trx2l1_gen = msg_l12trx.gen_trx2l1()
msg_l12trx_gen = msg_trx2l1.gen_l12trx()
@@ -717,4 +1032,18 @@ if __name__ == '__main__':
assert(msg_trx2l1_gen.tn == msg_l12trx.tn)
assert(msg_l12trx_gen.fn == msg_trx2l1.fn)
- log.info("Verify direct transformation: OK")
+ log.info("Verify version %u direct transformation: OK" % ver)
+
+ # Verify NOPE indication coding
+ if msg_trx2l1.ver >= 0x01:
+ msg_trx2l1 = DATAMSG_TRX2L1(ver = ver)
+ msg_trx2l1.nope_ind = True
+ msg_trx2l1.rand_hdr()
+
+ msg_trx2l1_dec = DATAMSG_TRX2L1()
+ msg_trx2l1_dec.parse_msg(msg_trx2l1.gen_msg())
+
+ assert(msg_trx2l1.nope_ind == msg_trx2l1_dec.nope_ind)
+ assert(msg_trx2l1.burst == msg_trx2l1_dec.burst)
+
+ log.info("Verify version %u NOPE indication coding: OK" % ver)