1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Reiniger: S1AP / NAS packet capture anonymizer.
#
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All rights reserved.
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Released under the terms of GNU General Public License,
# Version 2 or (at your option) any later version.
#
# Based on Scapy and pycrate:
# https://github.com/secdev/scapy
# https://github.com/P1sec/pycrate
import logging as log
import sys
from scapy.all import PcapReader, PcapWriter
from scapy.all import SCTPChunkData, NoPayload
from scapy.all import SCTP
from pycrate_asn1dir import S1AP
from pycrate_mobile import NAS
def handle_nas_pdu(pdu, dl, regen = False):
log.debug("Processing %s NAS PDU: %s" % ("Downlink" if dl else "Uplink", pdu.hex()))
(msg, code) = NAS.parse_NASLTE_MT(pdu) if dl else NAS.parse_NASLTE_MO(pdu)
if code:
log.error("Failed to parse NAS payload")
return None
# Try to find EPSID (may contain IMSI)
# TODO: also patch IMEI / IMEISV
try:
epsid = msg['EPSID'][1]
# Check if EPSID contains exactly IMSI
if epsid[2].get_val() == 1:
log.info("Cleaning %s" % epsid.repr())
# 262420000000000, Vodafone GmbH, Germany
epsid.from_bytes('\x29\x26\x24' + '\x00' * 5)
regen = True
except:
pass
return msg.to_bytes() if regen else pdu
def handle_s1ap_imsi(imsi):
log.info("Cleaning IMSI: %s" % NAS.decode_bcd(imsi))
# 262420000000000, Vodafone GmbH, Germany
return '\x29\x26\x24' + '\x00' * 5
def handle_s1ap_tmsi(tmsi):
log.info("Cleaning TMSI: %s" % tmsi.hex())
return tmsi # NOTE: for now, keep TMSI unchanged
def handle_s1ap_imeisv(imeisv):
log.info("Cleaning IMEISV: %s" % NAS.decode_bcd(imeisv))
# 3555720187847840 (Motorola C113)
return '\x33\x55\x75\x02\x81\x87\x74\x48\xf0'
def find_s1ap_ie(ie_list, ie_id):
for ie in ie_list:
if ie['id'] == ie_id:
return ie
return None
def find_s1ap_ies(ie_list, ie_ids):
for ie in ie_list:
if ie['id'] in ie_ids:
return ie
return None
def handle_s1ap_paging(msg):
ie = find_s1ap_ie(msg['protocolIEs'], S1AP.S1AP_Constants.id_UEPagingID.get_val())
if ie is None:
return
ueid = ie['value'][1]
if ueid[0] == 's-TMSI':
tmsi = handle_s1ap_tmsi(ueid[1]['m-TMSI'])
ueid[1]['m-TMSI'] = tmsi
elif ueid[0] == 'iMSI':
imsi = handle_s1ap_imsi(ueid[1])
# FIXME: I am not 100% sure if this is correct
ie['value'] = ('iMSI', imsi)
else:
log.warn("Unknown Paging identity type '%s'" % ueid[0])
def find_and_handle_s1ap_imei_ie(msg):
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_Masked_IMEISV.get_val())
if ie is not None:
imeisv = handle_s1ap_imeisv(ie['value'][1])
# FIXME: I am not 100% sure if this is correct
ie['value'] = (ie['value'][0], imeisv)
def find_and_handle_s1ap_nas_pdu(msg, dl):
ie = find_s1ap_ie(msg[1]['protocolIEs'], S1AP.S1AP_Constants.id_NAS_PDU.get_val())
if ie is not None:
pdu = handle_nas_pdu(ie['value'][1], dl)
ie['value'] = ('NAS-PDU', pdu)
def find_and_handle_s1ap_nested_nas_pdu(msg):
IEs = {
S1AP.S1AP_Constants.id_E_RABToBeSetupListCtxtSUReq.get_val() : 'E-RABToBeSetupItemCtxtSUReq',
S1AP.S1AP_Constants.id_E_RABToBeSetupListBearerSUReq.get_val() : 'E-RABToBeSetupItemBearerSUReq',
S1AP.S1AP_Constants.id_E_RABToBeModifiedListBearerModReq.get_val() : 'E-RABToBeModifiedItemBearerModReq'}
ie = find_s1ap_ies(msg[1]['protocolIEs'], IEs.keys())
if ie is None:
return
for item in ie['value'][1]:
if item['value'][0] in IEs.values():
pdu = handle_nas_pdu(item['value'][1]['nAS-PDU'], dl = True)
item['value'][1]['nAS-PDU'] = pdu
def handle_s1ap(msg):
log.info("Processing S1AP message '%s:%s'" % (msg[0], msg[1]['value'][0]))
# Paging may contain
if msg[0] != u'initiatingMessage':
return
# Look for IMSI, TMSI and IMEISV in S1AP
if msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_Paging.get_val():
handle_s1ap_paging(msg[1]['value'][1])
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_InitialContextSetup.get_val():
find_and_handle_s1ap_imei_ie(msg[1]['value'])
elif msg[1]['procedureCode'] == S1AP.S1AP_Constants.id_HandoverResourceAllocation.get_val():
find_and_handle_s1ap_imei_ie(msg[1]['value'])
# Look for NAS payload (which may also contain sensitive info)
procCodeListNested = ( # MME-originated NAS payload
S1AP.S1AP_Constants.id_E_RABSetup.get_val(),
S1AP.S1AP_Constants.id_E_RABModify.get_val(),
S1AP.S1AP_Constants.id_InitialContextSetup.get_val())
procCodeListDL = ( # MME-originated NAS payload
S1AP.S1AP_Constants.id_E_RABRelease.get_val(),
S1AP.S1AP_Constants.id_downlinkNASTransport.get_val())
procCodeListUL = ( # UE-originated payload
S1AP.S1AP_Constants.id_initialUEMessage.get_val(),
S1AP.S1AP_Constants.id_uplinkNASTransport.get_val(),
S1AP.S1AP_Constants.id_NASNonDeliveryIndication.get_val())
if msg[1]['procedureCode'] in procCodeListNested:
find_and_handle_s1ap_nested_nas_pdu(msg[1]['value'])
elif msg[1]['procedureCode'] in procCodeListDL:
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = True)
elif msg[1]['procedureCode'] in procCodeListUL:
find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = False)
def handle_sctp_chunk(chunk):
log.debug("Processing an ASN.1 encoded S1AP PDU")
# Parse ASN.1 encoded S1AP PDU
s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
# Guard against malformed packets
try:
s1ap.from_aper(chunk.data)
except:
log.error("Malformed packet, skipping...")
return False
handle_s1ap(s1ap())
# Encapsulate the new payload
# TODO: reset checksum fields
try:
# Encode the S1AP payload
payload = s1ap.to_aper()
chunk.data = payload
# Scapy will calculate the length and padding
del chunk.len
except:
log.error("Failed to encode a S1AP payload")
return False
return True
def handle_sctp_pkt(pkt):
# Find and decapsulate the S1AP payload
ip = pkt.payload
sctp = ip.payload
# There can be multiple chunks, look for Data with S1AP
chunk = sctp.payload
while not isinstance(chunk, NoPayload):
if isinstance(chunk, SCTPChunkData) and chunk.proto_id == 0x12:
success = handle_sctp_chunk(chunk)
if not success:
return None
chunk = chunk.payload
return pkt
def handle_pcap(src_path, dst_path = '/dev/null'):
# Open the destination capture
dst_pcap = PcapWriter(dst_path)
# Open source capture (with sensitive info)
src_pcap = PcapReader(src_path)
# Get the first packet
pkt = src_pcap.read_packet()
# Process all packets in loop
while pkt:
# TODO: S1AP can be carried over TCP too
if pkt.haslayer(SCTP):
pkt = handle_sctp_pkt(pkt)
# Write potentially cleaned packet to the new file
if pkt is not None:
dst_pcap.write(pkt)
# Get the next packet
pkt = src_pcap.read_packet()
if __name__ == '__main__':
# Configure logging format
# Example: [DEBUG] foo_bar.py:71 Mahlzeit!
LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s"
LOG_LEVEL_DEFAULT = log.INFO
# Default logging handler (stderr)
sh = log.StreamHandler()
sh.setLevel(log.getLevelName(LOG_LEVEL_DEFAULT))
sh.setFormatter(log.Formatter(LOG_FMT_DEFAULT))
log.root.addHandler(sh)
# Set DEBUG for the root logger
log.root.setLevel(log.DEBUG)
# TODO: use argparse
if len(sys.argv) < 3:
print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0])
sys.exit(1)
handle_pcap(sys.argv[1], sys.argv[2])
|