summaryrefslogtreecommitdiffstats
path: root/src/target/trx_toolkit/burst_fwd.py
blob: f3eedddac4acc2613821c005a5891fb788c14497 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# TRX Toolkit
# BTS <-> BB burst forwarding
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import random

from data_msg import *

class BurstForwarder:
	""" Performs burst forwarding and preprocessing between MS and BTS.

	== Pass-filtering parameters

	BurstForwarder may drop or pass an UL/DL burst depending
	on the following parameters:

	  - bts_freq / bb_freq - the current BTS / MS frequency
	    that was set using RXTUNE control command. By default,
	    both freq. values are set to None, so nothing is being
	    forwarded (i.e. bursts are getting dropped).

	    FIXME: currently, we don't care about TXTUNE command
	    and transmit frequencies. It would be great to distinguish
	    between RX and TX frequencies for both BTS and MS.

	  - ts_pass - currently active timeslot, configured by the MS.
	    It can be activated or deactivated using SETSLOT control
	    command from the MS. Only Downlink bursts are filtered
	    by timeslot number.

	    FIXME: only a single timeslot can be activated!
	    FIXME: there is no such list for the BTS side.

	== Preprocessing and measurement simulation

	Since this is a virtual environment, we can simulate different
	parameters of a virtual RF interface:

	  - ToA (Timing of Arrival) - measured difference between expected
	    and actual time of burst arrival in units of 1/256 of GSM symbol
	    periods. A pair of both base and threshold values defines a range
	    of ToA value randomization:

	      DL: from (toa256_dl_base - toa256_dl_threshold)
	            to (toa256_dl_base + toa256_dl_threshold),
	      UL: from (toa256_ul_base - toa256_ul_threshold)
	            to (toa256_ul_base + toa256_ul_threshold).

	  - RSSI (Received Signal Strength Indication) - measured "power" of
	    the signal (per burst) in dBm. A pair of both base and threshold
	    values defines a range of RSSI value randomization:

	      DL: from (rssi_dl_base - rssi_dl_threshold)
	            to (rssi_dl_base + rssi_dl_threshold),
	      UL: from (rssi_ul_base - rssi_ul_threshold)
	            to (rssi_ul_base + rssi_ul_threshold).

	Please note that the randomization of both RSSI and ToA
	is optional, and should be enabled manually by setting the
	corresponding flag to True:

	  - randomize_dl_rssi / randomize_ul_rssi,
	  - randomize_dl_toa256 / randomize_ul_toa256.

	When a flag is False (default), the base value is indicated as is.

	=== Timing Advance handling

	The BTS is using ToA measurements for UL bursts in order to calculate
	Timing Advance value, that is then indicated to a MS, which in its turn
	shall apply this value to the transmitted signal in order to compensate
	the delay. Basically, every burst is transmitted in advance defined by
	the indicated Timing Advance value. The valid range is 0..63, where
	each unit means one GSM symbol advance. The actual Timing Advance value
	is set using SETTA control command from MS. By default, it's set to 0.

	=== Path loss simulation - burst dropping

	In some cases, e.g. due to a weak signal or high interference, a burst
	can be lost, i.e. not detected by the receiver. This can also be
	simulated using FAKE_DROP command on both control interfaces:

	  - burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
	    to be dropped (i.e. not forwarded towards the MS/BTS),

	  - burst_{dl|ul}_drop_period - drop only those DL/UL bursts whose
	    frame number is a multiple of X, e.g. 1 - drop every consecutive
	    burst, 2 - drop bursts on every second frame number, etc.

	"""

	def __init__(self, bts_link, bb_link):
		""" Store both data links and initialize default parameters.

		bts_link and bb_link are the data interfaces towards the BTS
		and the BB (MS) respectively. Each one is expected to provide
		a 'sock' attribute (for recvfrom) and a send() method, as
		used by bts2bb() and bb2bts().
		"""
		self.bts_link = bts_link
		self.bb_link = bb_link

		# Randomization of RSSI (disabled by default).
		# These MUST be instance attributes, because the calc_*()
		# methods read them through self on every burst.
		self.randomize_dl_rssi = False
		self.randomize_ul_rssi = False

		# Randomization of ToA (disabled by default)
		self.randomize_dl_toa256 = False
		self.randomize_ul_toa256 = False

		# Init default parameters
		self.reset_dl()
		self.reset_ul()

	def reset_dl(self):
		""" Initialize (or reset to) default parameters for Downlink. """
		# Unset current DL freq.
		self.bts_freq = None

		# Indicated RSSI / ToA values
		self.toa256_dl_base = 0
		self.rssi_dl_base = -60

		# RSSI / ToA randomization threshold
		self.toa256_dl_threshold = 0
		self.rssi_dl_threshold = 0

		# Path loss simulation (burst dropping)
		self.burst_dl_drop_amount = 0
		self.burst_dl_drop_period = 1

	def reset_ul(self):
		""" Initialize (or reset to) default parameters for Uplink. """
		# Unset current UL freq.
		self.bb_freq = None

		# Indicated RSSI / ToA values
		self.rssi_ul_base = -70
		self.toa256_ul_base = 0

		# RSSI / ToA randomization threshold
		self.toa256_ul_threshold = 0
		self.rssi_ul_threshold = 0

		# Path loss simulation (burst dropping)
		self.burst_ul_drop_amount = 0
		self.burst_ul_drop_period = 1

		# Init timeslot filter (drop everything by default)
		self.ts_pass = None

		# Reset Timing Advance value
		self.ta = 0

	def _randomize(self, base, threshold):
		""" Return a random integer from [base - threshold, base + threshold]. """
		return random.randint(base - threshold, base + threshold)

	def calc_ta256(self):
		""" Convert the current Timing Advance value from symbols
		to units of 1/256 of GSM symbol periods. """
		return self.ta * 256

	def calc_dl_toa256(self):
		""" Return a ToA value for a Downlink burst: the base value,
		or a random one around it if randomization is enabled. """
		if not self.randomize_dl_toa256:
			return self.toa256_dl_base

		return self._randomize(self.toa256_dl_base,
			self.toa256_dl_threshold)

	def calc_ul_toa256(self):
		""" Return a ToA value for an Uplink burst: the base value,
		or a random one around it if randomization is enabled. """
		if not self.randomize_ul_toa256:
			return self.toa256_ul_base

		return self._randomize(self.toa256_ul_base,
			self.toa256_ul_threshold)

	def calc_dl_rssi(self):
		""" Return an RSSI value for a Downlink burst: the base value,
		or a random one around it if randomization is enabled. """
		if not self.randomize_dl_rssi:
			return self.rssi_dl_base

		return self._randomize(self.rssi_dl_base,
			self.rssi_dl_threshold)

	def calc_ul_rssi(self):
		""" Return an RSSI value for an Uplink burst: the base value,
		or a random one around it if randomization is enabled. """
		if not self.randomize_ul_rssi:
			return self.rssi_ul_base

		return self._randomize(self.rssi_ul_base,
			self.rssi_ul_threshold)

	def path_loss_sim_dl(self, msg):
		""" DL path loss simulation.

		Returns None if the given burst shall be dropped,
		otherwise returns the message itself (unchanged).
		"""
		# Burst dropping: the period is only evaluated while
		# there is still something left to drop
		if self.burst_dl_drop_amount > 0 \
		and msg.fn % self.burst_dl_drop_period == 0:
			print("[~] Simulation: dropping DL burst (fn=%u %% %u == 0)"
				% (msg.fn, self.burst_dl_drop_period))
			self.burst_dl_drop_amount -= 1
			return None

		return msg

	def path_loss_sim_ul(self, msg):
		""" UL path loss simulation.

		Returns None if the given burst shall be dropped,
		otherwise returns the message itself (unchanged).
		"""
		# Burst dropping: the period is only evaluated while
		# there is still something left to drop
		if self.burst_ul_drop_amount > 0 \
		and msg.fn % self.burst_ul_drop_period == 0:
			print("[~] Simulation: dropping UL burst (fn=%u %% %u == 0)"
				% (msg.fn, self.burst_ul_drop_period))
			self.burst_ul_drop_amount -= 1
			return None

		return msg

	def preprocess_dl_burst(self, msg):
		""" DL burst preprocessing: fill in (in place)
		both ToA and RSSI values of the given message. """
		msg.toa256 = self.calc_dl_toa256()
		msg.rssi = self.calc_dl_rssi()

	def preprocess_ul_burst(self, msg):
		""" UL burst preprocessing: fill in (in place) both ToA
		and RSSI values of the given message, applying the
		current Timing Advance to the indicated ToA. """
		msg.toa256 = self.calc_ul_toa256() - self.calc_ta256()
		msg.rssi = self.calc_ul_rssi()

	def transform_msg(self, msg_raw):
		""" Convert a raw L12TRX message to a TRX2L1 message.

		Returns None if the given data cannot be parsed.
		"""
		# Attempt to parse a message. Parsing errors shall not
		# break forwarding, but KeyboardInterrupt / SystemExit
		# must still propagate, hence no bare 'except'.
		try:
			msg_l12trx = DATAMSG_L12TRX()
			msg_l12trx.parse_msg(bytearray(msg_raw))
		except Exception:
			print("[!] Dropping unhandled DL message...")
			return None

		# Compose a new message for L1
		return msg_l12trx.gen_trx2l1()

	def bts2bb(self):
		""" Downlink handler: BTS -> BB.

		Reads a single message from the BTS socket and forwards
		it to the BB, unless it gets filtered out or dropped.
		Always returns None.
		"""
		# Read data from socket. This is done unconditionally,
		# so a pending datagram is consumed even if dropped.
		data, addr = self.bts_link.sock.recvfrom(512)

		# BB is not connected / tuned
		if self.bb_freq is None:
			return None

		# Freq. filter
		if self.bb_freq != self.bts_freq:
			return None

		# Process a message
		msg = self.transform_msg(data)
		if msg is None:
			return None

		# Timeslot filter
		if msg.tn != self.ts_pass:
			return None

		# Path loss simulation
		msg = self.path_loss_sim_dl(msg)
		if msg is None:
			return None

		# Burst preprocessing
		self.preprocess_dl_burst(msg)

		# Validate and generate the payload
		payload = msg.gen_msg()

		# Append two unused bytes at the end
		# in order to keep the compatibility
		payload += bytearray(2)

		# Send burst to BB
		self.bb_link.send(payload)

	def bb2bts(self):
		""" Uplink handler: BB -> BTS.

		Reads a single message from the BB socket and forwards
		it to the BTS, unless it gets filtered out or dropped.
		Always returns None.
		"""
		# Read data from socket. This is done unconditionally,
		# so a pending datagram is consumed even if dropped.
		data, addr = self.bb_link.sock.recvfrom(512)

		# BTS is not connected / tuned
		if self.bts_freq is None:
			return None

		# Freq. filter
		if self.bb_freq != self.bts_freq:
			return None

		# Process a message
		msg = self.transform_msg(data)
		if msg is None:
			return None

		# Path loss simulation
		msg = self.path_loss_sim_ul(msg)
		if msg is None:
			return None

		# Burst preprocessing
		self.preprocess_ul_burst(msg)

		# Validate and generate the payload
		payload = msg.gen_msg()

		# Append two unused bytes at the end
		# in order to keep the compatibility
		payload += bytearray(2)

		# Send burst to BTS
		self.bts_link.send(payload)