diff options
Diffstat (limited to 'src/target/trx_toolkit/data_msg.py')
-rw-r--r-- | src/target/trx_toolkit/data_msg.py | 395 |
1 files changed, 362 insertions, 33 deletions
diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py index 5bd04cac..a2996abe 100644 --- a/src/target/trx_toolkit/data_msg.py +++ b/src/target/trx_toolkit/data_msg.py @@ -25,8 +25,37 @@ import random import struct +from enum import Enum from gsm_shared import * +class Modulation(Enum): + """ Modulation types defined in 3GPP TS 45.002 """ + ModGMSK = (0b0000, 148) + Mod8PSK = (0b0100, 444) + ModAQPSK = (0b0110, 296) + Mod16QAM = (0b1000, 592) + Mod32QAM = (0b1010, 740) + + def __init__(self, coding, bl): + # Coding in TRXD header + self.coding = coding + # Burst length + self.bl = bl + + @classmethod + def pick(self, coding): + for mod in list(self): + if mod.coding == coding: + return mod + return None + + @classmethod + def pick_by_bl(self, bl): + for mod in list(self): + if mod.bl == bl: + return mod + return None + class DATAMSG: """ TRXD (DATA) message codec (common part). @@ -92,7 +121,7 @@ class DATAMSG: # NOTE: up to 16 versions can be encoded CHDR_VERSION_MAX = 0b1111 - known_versions = [0x00] + known_versions = [0x00, 0x01] # Common constructor def __init__(self, fn = None, tn = None, burst = None, ver = 0): @@ -101,6 +130,12 @@ class DATAMSG: self.fn = fn self.tn = tn + # The common header length + @property + def CHDR_LEN(self): + # (VER + TN) + FN + return 1 + 4 + # Generates message specific header def gen_hdr(self): raise NotImplementedError @@ -197,12 +232,6 @@ class DATAMSG: if not self.ver in self.known_versions: return False - if self.burst is None: - return False - - if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): - return False - if self.fn is None: return False @@ -237,7 +266,8 @@ class DATAMSG: buf += hdr # Generate burst - buf += self.gen_burst() + if self.burst is not None: + buf += self.gen_burst() # This is a rudiment from (legacy) OpenBTS transceiver, # some L1 implementations still expect two dummy bytes. @@ -248,11 +278,8 @@ class DATAMSG: # Parses a TRX DATA message def parse_msg(self, msg): - # Calculate message length - length = len(msg) - - # Check length - if length < (self.HDR_LEN + GSM_BURST_LEN): + # Make sure we have at least header + if len(msg) < self.HDR_LEN: raise ValueError("Message is to short") # Parse version and TDMA TN @@ -267,7 +294,10 @@ class DATAMSG: # Copy burst, skipping header msg_burst = msg[self.HDR_LEN:] - self.parse_burst(msg_burst) + if len(msg_burst) > 0: + self.parse_burst(msg_burst) + else: + self.burst = None class DATAMSG_L12TRX(DATAMSG): """ L12TRX (L1 -> TRX) message codec. @@ -276,6 +306,8 @@ class DATAMSG_L12TRX(DATAMSG): or an Uplink burst on the MS side, and has the following message specific fixed-size header preceding the burst bits: + == Versions 0x00, 0x01 + +-----+--------------------+ | PWR | hard-bits (1 or 0) | +-----+--------------------+ @@ -290,13 +322,26 @@ class DATAMSG_L12TRX(DATAMSG): """ # Constants - HDR_LEN = 6 PWR_MIN = 0x00 PWR_MAX = 0xff # Specific message fields pwr = None + # Calculates header length depending on its version + @property + def HDR_LEN(self): + # Common header length + length = self.CHDR_LEN + + # Message specific header length + if self.ver in (0x00, 0x01): + length += 1 # PWR + else: + raise IndexError("Unhandled version %u" % self.ver) + + return length + # Validates the message fields def validate(self): # Validate common fields @@ -309,6 +354,14 @@ class DATAMSG_L12TRX(DATAMSG): if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: return False + # FIXME: properly handle IDLE / NOPE indications + if self.burst is None: + return False + + # FIXME: properly handle IDLE / NOPE indications + if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): + return False + return True # Generates a random power level @@ -394,16 +447,78 @@ class DATAMSG_TRX2L1(DATAMSG): or a Downlink burst on the MS side, and has the following message specific fixed-size header preceding the burst bits: + == Version 0x00 + +------+-----+--------------------+ | RSSI | ToA | soft-bits (254..0) | +------+-----+--------------------+ + == Version 0x01 + + +------+-----+-----+-----+--------------------+ + | RSSI | ToA | MTS | C/I | soft-bits (254..0) | + +------+-----+-----+-----+--------------------+ + where: - RSSI (1 octet) - Received Signal Strength Indication encoded without the negative sign. - ToA (2 octets) - Timing of Arrival in units of 1/256 of symbol (big endian). + - MTS (1 octet) - Modulation and Training Sequence info. + - C/I (2 octets) - Carrier-to-Interference ratio (big endian). + + == Coding of MTS: Modulation and Training Sequence info + + 3GPP TS 45.002 version 15.1.0 defines several modulation types, + and a few sets of training sequences for each type. The most + common are GMSK and 8-PSK (which is used in EDGE). + + +-----------------+---------------------------------------+ + | 7 6 5 4 3 2 1 0 | bit numbers (value range) | + +-----------------+---------------------------------------+ + | . . . . . X X X | Training Sequence Code (0..7) | + +-----------------+---------------------------------------+ + | . X X X X . . . | Modulation, TS set number (see below) | + +-----------------+---------------------------------------+ + | X . . . . . . . | IDLE / nope frame indication (0 or 1) | + +-----------------+---------------------------------------+ + + The bit number 7 (MSB) is set to high when either nothing has been + detected, or during IDLE frames, so we can deliver noise levels, + and avoid clock gaps on the L1 side. Other bits are ignored, + and should be set to low (0) in this case. L16 shall be set to 0x00. + + == Coding of modulation and TS set number + + GMSK has 4 sets of training sequences (see tables 5.2.3a-d), + while 8-PSK (see tables 5.2.3f-g) and the others have 2 sets. + Access and Synchronization bursts also have several synch. + sequences. + + +-----------------+---------------------------------------+ + | 7 6 5 4 3 2 1 0 | bit numbers (value range) | + +-----------------+---------------------------------------+ + | . 0 0 X X . . . | GMSK, 4 TS sets (0..3) | + +-----------------+---------------------------------------+ + | . 0 1 0 X . . . | 8-PSK, 2 TS sets (0..1) | + +-----------------+---------------------------------------+ + | . 0 1 1 X . . . | AQPSK, 2 TS sets (0..1) | + +-----------------+---------------------------------------+ + | . 1 0 0 X . . . | 16QAM, 2 TS sets (0..1) | + +-----------------+---------------------------------------+ + | . 1 0 1 X . . . | 32QAM, 2 TS sets (0..1) | + +-----------------+---------------------------------------+ + | . 1 1 1 X . . . | RESERVED (0) | + +-----------------+---------------------------------------+ + + == C/I: Carrier-to-Interference ratio + + The C/I value can be computed from the training sequence of each + burst, where we can compare the "ideal" training sequence with + the actual training sequence and then express that in centiBels. + + == Coding of the burst bits Unlike to be transmitted bursts, the received bursts are designated using the soft-bits notation, so the receiver can indicate its @@ -416,9 +531,6 @@ class DATAMSG_TRX2L1(DATAMSG): """ - # Constants - HDR_LEN = 8 - # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise RSSI_MIN = -120 RSSI_MAX = -47 @@ -427,11 +539,81 @@ class DATAMSG_TRX2L1(DATAMSG): TOA256_MIN = -32768 TOA256_MAX = 32767 + # TSC (Training Sequence Code) range + TSC_RANGE = range(0, 8) + + # C/I range (in centiBels) + CI_MIN = -1280 + CI_MAX = 1280 + + # IDLE frame / nope detection indicator + NOPE_IND = (1 << 7) + # Specific message fields rssi = None toa256 = None - # Validates the message fields + # Version 0x01 specific (default values) + mod_type = Modulation.ModGMSK + nope_ind = False + + tsc_set = None + tsc = None + ci = None + + # Calculates header length depending on its version + @property + def HDR_LEN(self): + # Common header length + length = self.CHDR_LEN + + # Message specific header length + if self.ver == 0x00: + # RSSI + ToA + length += 1 + 2 + elif self.ver == 0x01: + # RSSI + ToA + TS + C/I + length += 1 + 2 + 1 + 2 + else: + raise IndexError("Unhandled version %u" % self.ver) + + return length + + def _validate_burst_v0(self): + # Burst is mandatory + if self.burst is None: + return False + + # ... and can be either of GSM (GMSK) or EDGE (8-PSK) + if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): + return False + + return True + + def _validate_burst_v1(self): + # Burst is omitted in case of an IDLE / NOPE indication + if self.nope_ind and self.burst is None: + return True + if self.nope_ind and self.burst is not None: + return False + + if self.burst is None: + return False + + # Burst length depends on modulation type + if len(self.burst) != self.mod_type.bl: + return False + + return True + + # Validates the burst + def validate_burst(self): + if self.ver == 0x00: + return self._validate_burst_v0() + elif self.ver >= 0x01: + return self._validate_burst_v1() + + # Validates the message header fields def validate(self): # Validate common fields if not DATAMSG.validate(self): @@ -449,6 +631,35 @@ class DATAMSG_TRX2L1(DATAMSG): if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: return False + if self.ver >= 0x01: + if type(self.mod_type) is not Modulation: + return False + + if self.tsc_set is None: + return False + + if self.mod_type is Modulation.ModGMSK: + if self.tsc_set not in range(0, 4): + return False + else: + if self.tsc_set not in range(0, 2): + return False + + if self.tsc is None: + return False + + if self.tsc not in self.TSC_RANGE: + return False + + if self.ci is None: + return False + + if self.ci < self.CI_MIN or self.ci > self.CI_MAX: + return False + + if not self.validate_burst(): + return False + return True # Generates a random RSSI value @@ -477,6 +688,17 @@ class DATAMSG_TRX2L1(DATAMSG): self.rssi = self.rand_rssi() self.toa256 = self.rand_toa256() + if self.ver >= 0x01: + self.mod_type = random.choice(list(Modulation)) + if self.mod_type is Modulation.ModGMSK: + self.tsc_set = random.randint(0, 3) + else: + self.tsc_set = random.randint(0, 1) + self.tsc = random.choice(self.TSC_RANGE) + + # C/I: Carrier-to-Interference ratio + self.ci = random.randint(self.CI_MIN, self.CI_MAX) + # Generates human-readable header description def desc_hdr(self): # Describe the common part @@ -488,9 +710,61 @@ class DATAMSG_TRX2L1(DATAMSG): if self.toa256 is not None: result += ("toa256=%d " % self.toa256) + if self.ver >= 0x01: + if not self.nope_ind: + if self.mod_type is not None: + result += ("%s " % self.mod_type) + if self.tsc_set is not None: + result += ("set=%u " % self.tsc_set) + if self.tsc is not None: + result += ("tsc=%u " % self.tsc) + if self.ci is not None: + result += ("C/I=%d cB " % self.ci) + else: + result += "(IDLE / NOPE IND) " + # Strip useless whitespace and return return result.strip() + # Encodes Modulation and Training Sequence info + def gen_mts(self): + # IDLE / nope indication has no MTS info + if self.nope_ind: + return self.NOPE_IND + + # TSC: . . . . . X X X + mts = self.tsc & 0b111 + + # MTS: . X X X X . . . + mts |= self.mod_type.coding << 3 + mts |= self.tsc_set << 3 + + return mts + + # Parses Modulation and Training Sequence info + def parse_mts(self, mts): + # IDLE / nope indication has no MTS info + self.nope_ind = (mts & self.NOPE_IND) > 0 + if self.nope_ind: + self.mod_type = None + self.tsc_set = None + self.tsc = None + return + + # TSC: . . . . . X X X + self.tsc = mts & 0b111 + + # MTS: . X X X X . . . + mts = (mts >> 3) & 0b1111 + if (mts & 0b1100) > 0: + # Mask: . . . . M M M S + self.mod_type = Modulation.pick(mts & 0b1110) + self.tsc_set = mts & 0b1 + else: + # GMSK: . . . . 0 0 S S + self.mod_type = Modulation.ModGMSK + self.tsc_set = mts & 0b11 + # Generates message specific header part def gen_hdr(self): # Allocate an empty byte-array @@ -503,6 +777,17 @@ class DATAMSG_TRX2L1(DATAMSG): # Big endian, 2 bytes (int32_t) buf += struct.pack(">h", self.toa256) + if self.ver >= 0x01: + # Modulation and Training Sequence info + mts = self.gen_mts() + buf.append(mts) + + # C/I: Carrier-to-Interference ratio (in centiBels) + if not self.nope_ind: + buf += struct.pack(">h", self.ci) + else: + buf += bytearray(2) + return buf # Parses message specific header part @@ -513,6 +798,16 @@ class DATAMSG_TRX2L1(DATAMSG): # Parse ToA (Time of Arrival) self.toa256 = struct.unpack(">h", hdr[6:8])[0] + if self.ver >= 0x01: + # Modulation and Training Sequence info + self.parse_mts(hdr[8]) + + # C/I: Carrier-to-Interference ratio (in centiBels) + if not self.nope_ind: + self.ci = struct.unpack(">h", hdr[9:11])[0] + else: + self.ci = None + # Generates message specific burst def gen_burst(self): # Convert soft-bits to unsigned soft-bits @@ -521,26 +816,38 @@ class DATAMSG_TRX2L1(DATAMSG): # Encode to bytes return bytearray(burst_usbits) + # Parses message specific burst for header version 0 + def _parse_burst_v0(self, burst): + bl = len(burst) + + # We need to guess modulation by the length of burst + self.mod_type = Modulation.pick_by_bl(bl) + if self.mod_type is None: + # Some old transceivers append two dummy bytes + self.mod_type = Modulation.pick_by_bl(bl - 2) + + if self.mod_type is None: + raise ValueError("Odd burst length") + + return burst[:self.mod_type.bl] + # Parses message specific burst def parse_burst(self, burst): - length = len(burst) + burst = list(burst) - # Distinguish between GSM and EDGE - if length >= EDGE_BURST_LEN: - burst_usbits = list(burst[:EDGE_BURST_LEN]) - else: - burst_usbits = list(burst[:GSM_BURST_LEN]) + if self.ver == 0x00: + burst = self._parse_burst_v0(burst) # Convert unsigned soft-bits to soft-bits - burst_sbits = self.usbit2sbit(burst_usbits) - - # Save - self.burst = burst_sbits + self.burst = self.usbit2sbit(burst) # Generate a random message specific burst - def rand_burst(self, length = GSM_BURST_LEN): + def rand_burst(self, length = None): self.burst = [] + if length is None: + length = self.mod_type.bl + for i in range(length): sbit = random.randint(-127, 127) self.burst.append(sbit) @@ -697,12 +1004,20 @@ if __name__ == '__main__': assert(msg_l12trx_dec.tn == msg_l12trx.tn) assert(msg_trx2l1_dec.fn == msg_trx2l1.fn) + # Match version specific fields + if msg_trx2l1.ver >= 0x01: + assert(msg_trx2l1_dec.nope_ind == msg_trx2l1.nope_ind) + assert(msg_trx2l1_dec.mod_type == msg_trx2l1.mod_type) + assert(msg_trx2l1_dec.tsc_set == msg_trx2l1.tsc_set) + assert(msg_trx2l1_dec.tsc == msg_trx2l1.tsc) + assert(msg_trx2l1_dec.ci == msg_trx2l1.ci) + + log.info("Check header version %u coding: OK" % ver) + # Compare bursts assert(msg_l12trx_dec.burst == msg_l12trx.burst) assert(msg_trx2l1_dec.burst == msg_trx2l1.burst) - log.info("Check header version %u coding: OK" % ver) - msg_trx2l1_gen = msg_l12trx.gen_trx2l1() msg_l12trx_gen = msg_trx2l1.gen_l12trx() @@ -717,4 +1032,18 @@ if __name__ == '__main__': assert(msg_trx2l1_gen.tn == msg_l12trx.tn) assert(msg_l12trx_gen.fn == msg_trx2l1.fn) - log.info("Verify direct transformation: OK") + log.info("Verify version %u direct transformation: OK" % ver) + + # Verify NOPE indication coding + if msg_trx2l1.ver >= 0x01: + msg_trx2l1 = DATAMSG_TRX2L1(ver = ver) + msg_trx2l1.nope_ind = True + msg_trx2l1.rand_hdr() + + msg_trx2l1_dec = DATAMSG_TRX2L1() + msg_trx2l1_dec.parse_msg(msg_trx2l1.gen_msg()) + + assert(msg_trx2l1.nope_ind == msg_trx2l1_dec.nope_ind) + assert(msg_trx2l1.burst == msg_trx2l1_dec.burst) + + log.info("Verify version %u NOPE indication coding: OK" % ver) |