From 23914b9cf855de8e847d450c70494626da2216e6 Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Tue, 13 Mar 2018 01:09:56 +0700 Subject: Rename 'fake_trx' to 'trx_toolkit' This toolkit has branched out into several different tools for TRX interface hacking, and creating a virtual Um-interface (FakeTRX) is only one of its potential applications. Change-Id: I56bcbc76b9c273d6b469a2bb68ddc46f3980e835 --- src/target/trx_toolkit/data_msg.py | 545 +++++++++++++++++++++++++++++++++++++ 1 file changed, 545 insertions(+) create mode 100644 src/target/trx_toolkit/data_msg.py (limited to 'src/target/trx_toolkit/data_msg.py') diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py new file mode 100644 index 00000000..ea415ab9 --- /dev/null +++ b/src/target/trx_toolkit/data_msg.py @@ -0,0 +1,545 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# DATA interface message definitions and helpers +# +# (C) 2018 by Vadim Yanitskiy +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import random +import struct + +from gsm_shared import * + +class DATAMSG: + # Common message fields + burst = None + fn = None + tn = None + + # Common constructor + def __init__(self, fn = None, tn = None, burst = None): + self.burst = burst + self.fn = fn + self.tn = tn + + # Generates message specific header + def gen_hdr(self): + raise NotImplementedError + + # Parses message specific header + def parse_hdr(self, hdr): + raise NotImplementedError + + # Generates message specific burst + def gen_burst(self): + raise NotImplementedError + + # Parses message specific burst + def parse_burst(self, burst): + raise NotImplementedError + + # Generates a random frame number + def rand_fn(self): + return random.randint(0, GSM_HYPERFRAME) + + # Generates a random timeslot number + def rand_tn(self): + return random.randint(0, 7) + + # Randomizes the message header + def rand_hdr(self): + self.fn = self.rand_fn() + self.tn = self.rand_tn() + + # Generates human-readable header description + def desc_hdr(self): + result = "" + + if self.fn is not None: + result += ("fn=%u " % self.fn) + + if self.tn is not None: + result += ("tn=%u " % self.tn) + + return result + + # Converts unsigned soft-bits {254..0} to soft-bits {-127..127} + def usbit2sbit(self, bits): + buf = [] + + for bit in bits: + if bit == 0xff: + buf.append(-127) + else: + buf.append(127 - bit) + + return buf + + # Converts soft-bits {-127..127} to unsigned soft-bits {254..0} + def sbit2usbit(self, bits): + buf = [] + + for bit in bits: + buf.append(127 - bit) + + return buf + + # Converts soft-bits {-127..127} to bits {1..0} + def sbit2ubit(self, bits): + buf = [] + + for bit in bits: + buf.append(1 if bit < 0 else 0) + + return buf + + # Converts bits {1..0} to soft-bits {-127..127} + def ubit2sbit(self, bits): + buf = [] + + for bit in bits: + buf.append(-127 if bit else 127) + + return buf + + # Validates the message fields + def validate(self): + if self.burst is None: + return False + + if len(self.burst) not in (GSM_BURST_LEN, EDGE_BURST_LEN): + return False + + if self.fn is None: + return False + + if self.fn < 0 or self.fn > GSM_HYPERFRAME: + return False + + if self.tn is None: + return False + + if self.tn < 0 or self.tn > 7: + return False + + return True + + # Generates frame number to bytes + def gen_fn(self, fn): + # Allocate an empty byte-array + buf = bytearray() + + # Big endian, 4 bytes + buf.append((fn >> 24) & 0xff) + buf.append((fn >> 16) & 0xff) + buf.append((fn >> 8) & 0xff) + buf.append((fn >> 0) & 0xff) + + return buf + + # Parses frame number from bytes + def parse_fn(self, buf): + # Big endian, 4 bytes + return (buf[0] << 24) \ + | (buf[1] << 16) \ + | (buf[2] << 8) \ + | (buf[3] << 0) + + # Generates a TRX DATA message + def gen_msg(self): + # Validate all the fields + if not self.validate(): + raise ValueError("Message incomplete or incorrect") + + # Allocate an empty byte-array + buf = bytearray() + + # Put timeslot index + buf.append(self.tn) + + # Put frame number + fn = self.gen_fn(self.fn) + buf += fn + + # Generate message specific header part + hdr = self.gen_hdr() + buf += hdr + + # Generate burst + buf += self.gen_burst() + + return buf + + # Parses a TRX DATA message + def parse_msg(self, msg): + # Calculate message length + length = len(msg) + + # Check length + if length < (self.HDR_LEN + GSM_BURST_LEN): + raise ValueError("Message is to short") + + # Parse both fn and tn + self.fn = self.parse_fn(msg[1:]) + self.tn = msg[0] + + # Specific message part + self.parse_hdr(msg) + + # Copy burst, skipping header + msg_burst = msg[self.HDR_LEN:] + self.parse_burst(msg_burst) + +class DATAMSG_L12TRX(DATAMSG): + # Constants + HDR_LEN = 6 + PWR_MIN = 0x00 + PWR_MAX = 0xff + + # Specific message fields + pwr = None + + # Validates the message fields + def validate(self): + # Validate common fields + if not DATAMSG.validate(self): + return False + + if self.pwr is None: + return False + + if self.pwr < self.PWR_MIN or self.pwr > self.PWR_MAX: + return False + + return True + + # Generates a random power level + def rand_pwr(self, min = None, max = None): + if min is None: + min = self.PWR_MIN + + if max is None: + max = self.PWR_MAX + + return random.randint(min, max) + + # Randomizes message specific header + def rand_hdr(self): + DATAMSG.rand_hdr(self) + self.pwr = self.rand_pwr() + + # Generates human-readable header description + def desc_hdr(self): + # Describe the common part + result = DATAMSG.desc_hdr(self) + + if self.pwr is not None: + result += ("pwr=%u " % self.pwr) + + # Strip useless whitespace and return + return result.strip() + + # Generates message specific header part + def gen_hdr(self): + # Allocate an empty byte-array + buf = bytearray() + + # Put power + buf.append(self.pwr) + + return buf + + # Parses message specific header part + def parse_hdr(self, hdr): + # Parse power level + self.pwr = hdr[5] + + # Generates message specific burst + def gen_burst(self): + # Copy burst 'as is' + return bytearray(self.burst) + + # Parses message specific burst + def parse_burst(self, burst): + length = len(burst) + + # Distinguish between GSM and EDGE + if length >= EDGE_BURST_LEN: + self.burst = list(burst[:EDGE_BURST_LEN]) + else: + self.burst = list(burst[:GSM_BURST_LEN]) + + # Transforms this message to TRX2L1 message + def gen_trx2l1(self): + # Allocate a new message + msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn) + + # Convert burst bits + if self.burst is not None: + msg.burst = self.ubit2sbit(self.burst) + + return msg + +class DATAMSG_TRX2L1(DATAMSG): + # Constants + HDR_LEN = 8 + RSSI_MIN = -120 + RSSI_MAX = -50 + + # TODO: verify this range + TOA256_MIN = -256 * 200 + TOA256_MAX = 256 * 200 + + # Specific message fields + rssi = None + toa256 = None + + # Validates the message fields + def validate(self): + # Validate common fields + if not DATAMSG.validate(self): + return False + + if self.rssi is None: + return False + + if self.rssi < self.RSSI_MIN or self.rssi > self.RSSI_MAX: + return False + + if self.toa256 is None: + return False + + if self.toa256 < self.TOA256_MIN or self.toa256 > self.TOA256_MAX: + return False + + return True + + # Generates a random RSSI value + def rand_rssi(self, min = None, max = None): + if min is None: + min = self.RSSI_MIN + + if max is None: + max = self.RSSI_MAX + + return random.randint(min, max) + + # Generates a ToA (Time of Arrival) value + def rand_toa256(self, min = None, max = None): + if min is None: + min = self.TOA256_MIN + + if max is None: + max = self.TOA256_MAX + + return random.randint(min, max) + + # Randomizes message specific header + def rand_hdr(self): + DATAMSG.rand_hdr(self) + self.rssi = self.rand_rssi() + self.toa256 = self.rand_toa256() + + # Generates human-readable header description + def desc_hdr(self): + # Describe the common part + result = DATAMSG.desc_hdr(self) + + if self.rssi is not None: + result += ("rssi=%d " % self.rssi) + + if self.toa256 is not None: + result += ("toa256=%d " % self.toa256) + + # Strip useless whitespace and return + return result.strip() + + # Generates message specific header part + def gen_hdr(self): + # Allocate an empty byte-array + buf = bytearray() + + # Put RSSI + buf.append(-self.rssi) + + # Encode ToA (Time of Arrival) + # Big endian, 2 bytes (int32_t) + buf.append((self.toa256 >> 8) & 0xff) + buf.append(self.toa256 & 0xff) + + return buf + + # Parses message specific header part + def parse_hdr(self, hdr): + # Parse RSSI + self.rssi = -(hdr[5]) + + # Parse ToA (Time of Arrival) + self.toa256 = struct.unpack(">h", hdr[6:8])[0] + + # Generates message specific burst + def gen_burst(self): + # Convert soft-bits to unsigned soft-bits + burst_usbits = self.sbit2usbit(self.burst) + + # Encode to bytes + return bytearray(burst_usbits) + + # Parses message specific burst + def parse_burst(self, burst): + length = len(burst) + + # Distinguish between GSM and EDGE + if length >= EDGE_BURST_LEN: + burst_usbits = list(burst[:EDGE_BURST_LEN]) + else: + burst_usbits = list(burst[:GSM_BURST_LEN]) + + # Convert unsigned soft-bits to soft-bits + burst_sbits = self.usbit2sbit(burst_usbits) + + # Save + self.burst = burst_sbits + + # Transforms this message to L12TRX message + def gen_l12trx(self): + # Allocate a new message + msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn) + + # Convert burst bits + if self.burst is not None: + msg.burst = self.sbit2ubit(self.burst) + + return msg + +# Regression test +if __name__ == '__main__': + # Common reference data + fn = 1024 + tn = 0 + + # Generate two random bursts + burst_l12trx_ref = [] + burst_trx2l1_ref = [] + + for i in range(0, GSM_BURST_LEN): + ubit = random.randint(0, 1) + burst_l12trx_ref.append(ubit) + + sbit = random.randint(-127, 127) + burst_trx2l1_ref.append(sbit) + + print("[i] Generating the reference messages") + + # Create messages of both types + msg_l12trx_ref = DATAMSG_L12TRX(fn = fn, tn = tn) + msg_trx2l1_ref = DATAMSG_TRX2L1(fn = fn, tn = tn) + + # Fill in message specific fields + msg_trx2l1_ref.rssi = -88 + msg_l12trx_ref.pwr = 0x33 + msg_trx2l1_ref.toa256 = -256 + + # Specify the reference bursts + msg_l12trx_ref.burst = burst_l12trx_ref + msg_trx2l1_ref.burst = burst_trx2l1_ref + + print("[i] Encoding the reference messages") + + # Encode DATA messages + l12trx_raw = msg_l12trx_ref.gen_msg() + trx2l1_raw = msg_trx2l1_ref.gen_msg() + + print("[i] Parsing generated messages back") + + # Parse generated DATA messages + msg_l12trx_dec = DATAMSG_L12TRX() + msg_trx2l1_dec = DATAMSG_TRX2L1() + msg_l12trx_dec.parse_msg(l12trx_raw) + msg_trx2l1_dec.parse_msg(trx2l1_raw) + + print("[i] Comparing decoded messages with the reference") + + # Compare bursts + assert(msg_l12trx_dec.burst == burst_l12trx_ref) + assert(msg_trx2l1_dec.burst == burst_trx2l1_ref) + + print("[?] Compare bursts: OK") + + # Compare both parsed messages with the reference data + assert(msg_l12trx_dec.fn == fn) + assert(msg_trx2l1_dec.fn == fn) + assert(msg_l12trx_dec.tn == tn) + assert(msg_trx2l1_dec.tn == tn) + + print("[?] Compare FN / TN: OK") + + # Compare message specific parts + assert(msg_trx2l1_dec.rssi == msg_trx2l1_ref.rssi) + assert(msg_l12trx_dec.pwr == msg_l12trx_ref.pwr) + assert(msg_trx2l1_dec.toa256 == msg_trx2l1_ref.toa256) + + print("[?] Compare message specific data: OK") + + # Validate header randomization + for i in range(0, 100): + msg_l12trx_ref.rand_hdr() + msg_trx2l1_ref.rand_hdr() + + assert(msg_l12trx_ref.validate()) + assert(msg_trx2l1_ref.validate()) + + print("[?] Validate header randomization: OK") + + # Bit conversation test + usbits_ref = list(range(0, 256)) + sbits_ref = list(range(-127, 128)) + + # Test both usbit2sbit() and sbit2usbit() + sbits = msg_trx2l1_ref.usbit2sbit(usbits_ref) + usbits = msg_trx2l1_ref.sbit2usbit(sbits) + assert(usbits[:255] == usbits_ref[:255]) + assert(usbits[255] == 254) + + print("[?] Check both usbit2sbit() and sbit2usbit(): OK") + + # Test both sbit2ubit() and ubit2sbit() + ubits = msg_trx2l1_ref.sbit2ubit(sbits_ref) + assert(ubits == ([1] * 127 + [0] * 128)) + + sbits = msg_trx2l1_ref.ubit2sbit(ubits) + assert(sbits == ([-127] * 127 + [127] * 128)) + + print("[?] Check both sbit2ubit() and ubit2sbit(): OK") + + # Test message transformation + msg_l12trx_dec = msg_trx2l1_ref.gen_l12trx() + msg_trx2l1_dec = msg_l12trx_ref.gen_trx2l1() + + assert(msg_l12trx_dec.fn == msg_trx2l1_ref.fn) + assert(msg_l12trx_dec.tn == msg_trx2l1_ref.tn) + + assert(msg_trx2l1_dec.fn == msg_l12trx_ref.fn) + assert(msg_trx2l1_dec.tn == msg_l12trx_ref.tn) + + assert(msg_l12trx_dec.burst == msg_l12trx_dec.sbit2ubit(burst_trx2l1_ref)) + assert(msg_trx2l1_dec.burst == msg_trx2l1_dec.ubit2sbit(burst_l12trx_ref)) + + print("[?] Check L12TRX <-> TRX2L1 type transformations: OK") -- cgit v1.2.3