summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/burst_fwd.py
blob: 8f69fbae6a568c05d557ee6fffbedb4e2d7efc9a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# TRX Toolkit
# BTS <-> BB burst forwarding
#
# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import random

from data_msg import *

class BurstForwarder:
	# Timeslot filter (drop everything by default)
	ts_pass = None

	# Freq. filter
	bts_freq = None
	bb_freq = None

	# Randomization of RSSI
	randomize_dl_rssi = False
	randomize_ul_rssi = False

	# Randomization of ToA
	randomize_dl_toa256 = False
	randomize_ul_toa256 = False

	# Timing Advance value indicated by MS (0 by default)
	# Valid range: 0..63, where each unit means
	# one GSM symbol advance.
	ta = 0

	# Timing of Arrival values indicated by transceiver
	# in units of 1/256 of GSM symbol periods. A pair of
	# base and threshold values defines a range of ToA value
	# randomization: from (base - threshold) to (base + threshold).
	toa256_dl_base = 0
	toa256_ul_base = 0

	toa256_dl_threshold = 128
	toa256_ul_threshold = 128

	# RSSI values indicated by transceiver in dBm.
	# A pair of base and threshold values defines a range of RSSI
	# randomization: from (base - threshold) to (base + threshold).
	rssi_dl_base = -60
	rssi_ul_base = -70

	rssi_dl_threshold = 10
	rssi_ul_threshold = 5

	# Path loss simulation: DL/UL burst dropping
	# Indicates how many bursts should be dropped
	# and which dropping period is used. By default,
	# period is 1, i.e. every burst (fn % 1 is always 0)
	burst_dl_drop_amount = 0
	burst_ul_drop_amount = 0
	burst_dl_drop_period = 1
	burst_ul_drop_period = 1

	def __init__(self, bts_link, bb_link):
		self.bts_link = bts_link
		self.bb_link = bb_link

	# Converts TA value from symbols to
	# units of 1/256 of GSM symbol periods
	def calc_ta256(self):
		return self.ta * 256

	# Calculates a random ToA value for Downlink bursts
	def calc_dl_toa256(self):
		# Check if randomization is required
		if not self.randomize_dl_toa256:
			return self.toa256_dl_base

		# Calculate a range for randomization
		toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
		toa256_max = self.toa256_dl_base + self.toa256_dl_threshold

		# Generate a random ToA value
		toa256 = random.randint(toa256_min, toa256_max)

		return toa256

	# Calculates a random ToA value for Uplink bursts
	def calc_ul_toa256(self):
		# Check if randomization is required
		if not self.randomize_ul_toa256:
			return self.toa256_ul_base

		# Calculate a range for randomization
		toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
		toa256_max = self.toa256_ul_base + self.toa256_ul_threshold

		# Generate a random ToA value
		toa256 = random.randint(toa256_min, toa256_max)

		return toa256

	# Calculates a random RSSI value for Downlink bursts
	def calc_dl_rssi(self):
		# Check if randomization is required
		if not self.randomize_dl_rssi:
			return self.rssi_dl_base

		# Calculate a range for randomization
		rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
		rssi_max = self.rssi_dl_base + self.rssi_dl_threshold

		# Generate a random RSSI value
		return random.randint(rssi_min, rssi_max)

	# Calculates a random RSSI value for Uplink bursts
	def calc_ul_rssi(self):
		# Check if randomization is required
		if not self.randomize_ul_rssi:
			return self.rssi_ul_base

		# Calculate a range for randomization
		rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
		rssi_max = self.rssi_ul_base + self.rssi_ul_threshold

		# Generate a random RSSI value
		return random.randint(rssi_min, rssi_max)

	# DL path loss simulation
	def path_loss_sim_dl(self, msg):
		# Burst dropping
		if self.burst_dl_drop_amount > 0:
			if msg.fn % self.burst_dl_drop_period == 0:
				print("[~] Simulation: dropping DL burst (fn=%u %% %u == 0)"
					% (msg.fn, self.burst_dl_drop_period))
				self.burst_dl_drop_amount -= 1
				return None

		return msg

	# UL path loss simulation
	def path_loss_sim_ul(self, msg):
		# Burst dropping
		if self.burst_ul_drop_amount > 0:
			if msg.fn % self.burst_ul_drop_period == 0:
				print("[~] Simulation: dropping UL burst (fn=%u %% %u == 0)"
					% (msg.fn, self.burst_ul_drop_period))
				self.burst_ul_drop_amount -= 1
				return None

		return msg

	# DL burst preprocessing
	def preprocess_dl_burst(self, msg):
		# Calculate both RSSI and ToA values
		msg.toa256 = self.calc_dl_toa256()
		msg.rssi = self.calc_dl_rssi()

	# UL burst preprocessing
	def preprocess_ul_burst(self, msg):
		# Calculate both RSSI and ToA values,
		# also apply Timing Advance
		msg.toa256 = self.calc_ul_toa256()
		msg.toa256 -= self.calc_ta256()
		msg.rssi = self.calc_ul_rssi()
		print("[d] preprocess msg: toa256=%d rssi=%d (rand? %r)" % (msg.toa256, msg.rssi, repr(self.randomize_ul_rssi)))

	# Converts a L12TRX message to TRX2L1 message
	def transform_msg(self, msg_raw):
		# Attempt to parse a message
		try:
			msg_l12trx = DATAMSG_L12TRX()
			msg_l12trx.parse_msg(bytearray(msg_raw))
		except:
			print("[!] Dropping unhandled DL message...")
			return None

		# Compose a new message for L1
		return msg_l12trx.gen_trx2l1()

	# Downlink handler: BTS -> BB
	def bts2bb(self):
		# Read data from socket
		data, addr = self.bts_link.sock.recvfrom(512)

		# BB is not connected / tuned
		if self.bb_freq is None:
			return None

		# Freq. filter
		if self.bb_freq != self.bts_freq:
			return None

		# Process a message
		msg = self.transform_msg(data)
		if msg is None:
			return None

		# Timeslot filter
		if msg.tn != self.ts_pass:
			return None

		# Path loss simulation
		msg = self.path_loss_sim_dl(msg)
		if msg is None:
			return None

		# Burst preprocessing
		self.preprocess_dl_burst(msg)

		# Validate and generate the payload
		payload = msg.gen_msg()

		# Append two unused bytes at the end
		# in order to keep the compatibility
		payload += bytearray(2)

		# Send burst to BB
		self.bb_link.send(payload)

	# Uplink handler: BB -> BTS
	def bb2bts(self):
		# Read data from socket
		data, addr = self.bb_link.sock.recvfrom(512)

		# BTS is not connected / tuned
		if self.bts_freq is None:
			return None

		# Freq. filter
		if self.bb_freq != self.bts_freq:
			return None

		# Process a message
		msg = self.transform_msg(data)
		if msg is None:
			return None

		# Path loss simulation
		msg = self.path_loss_sim_ul(msg)
		if msg is None:
			return None

		# Burst preprocessing
		self.preprocess_ul_burst(msg)

		# Validate and generate the payload
		payload = msg.gen_msg()

		# Append two unused bytes at the end
		# in order to keep the compatibility
		payload += bytearray(2)

		# Send burst to BTS
		self.bts_link.send(payload)