1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
|
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# TRX Toolkit
# BTS <-> BB burst forwarding
#
# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import random
from data_msg import *
class BurstForwarder:
""" Performs burst forwarding and preprocessing between MS and BTS.
== Pass-filtering parameters
BurstForwarder may drop or pass an UL/DL burst depending
on the following parameters:
- bts_freq / bb_freq - the current BTS / MS frequency
that was set using RXTUNE control command. By default,
both freq. values are set to None, so nothing is being
forwarded (i.e. bursts are getting dropped).
FIXME: currently, we don't care about TXTUNE command
and transmit frequencies. It would be great to distinguish
between RX and TX frequencies for both BTS and MS.
- ts_pass_list - the list of active (i.e. configured)
timeslot numbers for the MS. A timeslot can be activated
or deactivated using SETSLOT control command from the MS.
FIXME: there is no such list for the BTS side.
== Preprocessing and measurement simulation
Since this is a virtual environment, we can simulate different
parameters of a virtual RF interface:
- ToA (Timing of Arrival) - measured difference between expected
and actual time of burst arrival in units of 1/256 of GSM symbol
periods. A pair of both base and threshold values defines a range
of ToA value randomization:
DL: from (toa256_dl_base - toa256_dl_threshold)
to (toa256_dl_base + toa256_dl_threshold),
UL: from (toa256_ul_base - toa256_ul_threshold)
to (toa256_ul_base + toa256_ul_threshold).
- RSSI (Received Signal Strength Indication) - measured "power" of
the signal (per burst) in dBm. A pair of both base and threshold
values defines a range of RSSI value randomization:
DL: from (rssi_dl_base - rssi_dl_threshold)
to (rssi_dl_base + rssi_dl_threshold),
UL: from (rssi_ul_base - rssi_ul_threshold)
to (rssi_ul_base + rssi_ul_threshold).
Please note that the randomization of both RSSI and ToA
is optional, and should be enabled manually.
=== Timing Advance handling
The BTS is using ToA measurements for UL bursts in order to calculate
Timing Advance value, that is then indicated to a MS, which in its turn
shall apply this value to the transmitted signal in order to compensate
the delay. Basically, every burst is transmitted in advance defined by
the indicated Timing Advance value. The valid range is 0..63, where
each unit means one GSM symbol advance. The actual Timing Advance value
is set using SETTA control command from MS. By default, it's set to 0.
=== Path loss simulation - burst dropping
In some cases, e.g. due to a weak signal or high interference, a burst
can be lost, i.e. not detected by the receiver. This can also be
simulated using FAKE_DROP command on both control interfaces:
- burst_{dl|ul}_drop_amount - the amount of DL/UL bursts
to be dropped (i.e. not forwarded towards the MS/BTS),
- burst_{dl|ul}_drop_period - drop every X DL/UL burst, e.g.
1 - drop every consequent burst, 2 - drop every second burst, etc.
"""
def __init__(self, bts_link, bb_link):
self.bts_link = bts_link
self.bb_link = bb_link
# Randomization of RSSI
randomize_dl_rssi = False
randomize_ul_rssi = False
# Randomization of ToA
randomize_dl_toa256 = False
randomize_ul_toa256 = False
# Init default parameters
self.reset_dl()
self.reset_ul()
# Initialize (or reset to) default parameters for Downlink
def reset_dl(self):
# Unset current DL freq.
self.bts_freq = None
# Indicated RSSI / ToA values
self.toa256_dl_base = 0
self.rssi_dl_base = -60
# RSSI / ToA randomization threshold
self.toa256_dl_threshold = 0
self.rssi_dl_threshold = 0
# Path loss simulation (burst dropping)
self.burst_dl_drop_amount = 0
self.burst_dl_drop_period = 1
# Initialize (or reset to) default parameters for Uplink
def reset_ul(self):
# Unset current DL freq.
self.bb_freq = None
# Indicated RSSI / ToA values
self.rssi_ul_base = -70
self.toa256_ul_base = 0
# RSSI / ToA randomization threshold
self.toa256_ul_threshold = 0
self.rssi_ul_threshold = 0
# Path loss simulation (burst dropping)
self.burst_ul_drop_amount = 0
self.burst_ul_drop_period = 1
# Init timeslot filter (drop everything by default)
self.ts_pass_list = []
# Reset Timing Advance value
self.ta = 0
# Converts TA value from symbols to
# units of 1/256 of GSM symbol periods
def calc_ta256(self):
return self.ta * 256
# Calculates a random ToA value for Downlink bursts
def calc_dl_toa256(self):
# Check if randomization is required
if not self.randomize_dl_toa256:
return self.toa256_dl_base
# Calculate a range for randomization
toa256_min = self.toa256_dl_base - self.toa256_dl_threshold
toa256_max = self.toa256_dl_base + self.toa256_dl_threshold
# Generate a random ToA value
toa256 = random.randint(toa256_min, toa256_max)
return toa256
# Calculates a random ToA value for Uplink bursts
def calc_ul_toa256(self):
# Check if randomization is required
if not self.randomize_ul_toa256:
return self.toa256_ul_base
# Calculate a range for randomization
toa256_min = self.toa256_ul_base - self.toa256_ul_threshold
toa256_max = self.toa256_ul_base + self.toa256_ul_threshold
# Generate a random ToA value
toa256 = random.randint(toa256_min, toa256_max)
return toa256
# Calculates a random RSSI value for Downlink bursts
def calc_dl_rssi(self):
# Check if randomization is required
if not self.randomize_dl_rssi:
return self.rssi_dl_base
# Calculate a range for randomization
rssi_min = self.rssi_dl_base - self.rssi_dl_threshold
rssi_max = self.rssi_dl_base + self.rssi_dl_threshold
# Generate a random RSSI value
return random.randint(rssi_min, rssi_max)
# Calculates a random RSSI value for Uplink bursts
def calc_ul_rssi(self):
# Check if randomization is required
if not self.randomize_ul_rssi:
return self.rssi_ul_base
# Calculate a range for randomization
rssi_min = self.rssi_ul_base - self.rssi_ul_threshold
rssi_max = self.rssi_ul_base + self.rssi_ul_threshold
# Generate a random RSSI value
return random.randint(rssi_min, rssi_max)
# DL path loss simulation
def path_loss_sim_dl(self, msg):
# Burst dropping
if self.burst_dl_drop_amount > 0:
if msg.fn % self.burst_dl_drop_period == 0:
print("[~] Simulation: dropping DL burst (fn=%u %% %u == 0)"
% (msg.fn, self.burst_dl_drop_period))
self.burst_dl_drop_amount -= 1
return None
return msg
# UL path loss simulation
def path_loss_sim_ul(self, msg):
# Burst dropping
if self.burst_ul_drop_amount > 0:
if msg.fn % self.burst_ul_drop_period == 0:
print("[~] Simulation: dropping UL burst (fn=%u %% %u == 0)"
% (msg.fn, self.burst_ul_drop_period))
self.burst_ul_drop_amount -= 1
return None
return msg
# DL burst preprocessing
def preprocess_dl_burst(self, msg):
# Calculate both RSSI and ToA values
msg.toa256 = self.calc_dl_toa256()
msg.rssi = self.calc_dl_rssi()
# UL burst preprocessing
def preprocess_ul_burst(self, msg):
# Calculate both RSSI and ToA values,
# also apply Timing Advance
msg.toa256 = self.calc_ul_toa256()
msg.toa256 -= self.calc_ta256()
msg.rssi = self.calc_ul_rssi()
# Converts a L12TRX message to TRX2L1 message
def transform_msg(self, msg_raw):
# Attempt to parse a message
try:
msg_l12trx = DATAMSG_L12TRX()
msg_l12trx.parse_msg(bytearray(msg_raw))
except:
print("[!] Dropping unhandled DL message...")
return None
# Compose a new message for L1
return msg_l12trx.gen_trx2l1()
# Downlink handler: BTS -> BB
def bts2bb(self):
# Read data from socket
data, addr = self.bts_link.sock.recvfrom(512)
# BB is not connected / tuned
if self.bb_freq is None:
return None
# Freq. filter
if self.bb_freq != self.bts_freq:
return None
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Timeslot filter
if msg.tn not in self.ts_pass_list:
return None
# Path loss simulation
msg = self.path_loss_sim_dl(msg)
if msg is None:
return None
# Burst preprocessing
self.preprocess_dl_burst(msg)
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BB
self.bb_link.send(payload)
# Uplink handler: BB -> BTS
def bb2bts(self):
# Read data from socket
data, addr = self.bb_link.sock.recvfrom(512)
# BTS is not connected / tuned
if self.bts_freq is None:
return None
# Freq. filter
if self.bb_freq != self.bts_freq:
return None
# Process a message
msg = self.transform_msg(data)
if msg is None:
return None
# Timeslot filter
if msg.tn not in self.ts_pass_list:
print("[!] TS %u is not configured, dropping UL burst..." % msg.tn)
return None
# Path loss simulation
msg = self.path_loss_sim_ul(msg)
if msg is None:
return None
# Burst preprocessing
self.preprocess_ul_burst(msg)
# Validate and generate the payload
payload = msg.gen_msg()
# Append two unused bytes at the end
# in order to keep the compatibility
payload += bytearray(2)
# Send burst to BTS
self.bts_link.send(payload)
|