summaryrefslogtreecommitdiffstats
path: root/s1ap_reiniger.py
blob: 1c21f26997a509c7064df58b665af1456ab96205 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Reiniger: S1AP / NAS packet capture anonymizer.
#
# (C) 2019 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All rights reserved.
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Released under the terms of GNU General Public License,
# Version 2 or (at your option) any later version.
#
# Based on Scapy and pycrate:
# https://github.com/secdev/scapy
# https://github.com/P1sec/pycrate

import logging as log
import sys

from scapy.all import PcapReader, PcapWriter
from scapy.all import SCTPChunkData, NoPayload
from scapy.all import SCTP

from pycrate_asn1dir import S1AP
from pycrate_mobile import NAS

def handle_nas_pdu(pdu, dl, regen = False):
	"""Anonymize a single encoded NAS PDU.

	pdu   -- encoded NAS message (bytes-like, must provide .hex())
	dl    -- True to parse with NAS.parse_NASLTE_MT (downlink),
	         False to parse with NAS.parse_NASLTE_MO (uplink)
	regen -- re-encode the parsed message even if nothing was patched

	Returns the original PDU when nothing was changed, the re-encoded
	message when the IMSI was replaced (or regen is set), and None when
	the PDU could not be parsed or the IMSI could not be replaced.
	"""
	log.debug("Processing %s NAS PDU: %s", "Downlink" if dl else "Uplink", pdu.hex())
	(msg, code) = NAS.parse_NASLTE_MT(pdu) if dl else NAS.parse_NASLTE_MO(pdu)
	if code:
		log.error("Failed to parse NAS payload")
		return None

	# Try to find EPSID (may contain IMSI). The lookup is best-effort:
	# a message without this element is simply left alone.
	# TODO: also patch IMEI / IMEISV
	try:
		epsid = msg['EPSID'][1]
		# Check if EPSID contains exactly IMSI
		is_imsi = epsid[2].get_val() == 1
	except Exception:
		is_imsi = False

	if is_imsi:
		try:
			log.info("Cleaning %s", epsid.repr())
			# 262420000000000, Vodafone GmbH, Germany
			# NOTE: must be bytes, a str literal is rejected on Python 3
			epsid.from_bytes(b'\x29\x26\x24' + b'\x00' * 5)
		except Exception:
			# Do not hand back a PDU that still carries the real IMSI;
			# report failure the same way as a parsing error does.
			log.exception("Failed to replace IMSI in NAS payload")
			return None
		regen = True

	return msg.to_bytes() if regen else pdu

def handle_s1ap_imsi(imsi):
	"""Log the IMSI that is being removed and return its replacement.

	imsi -- original encoded IMSI, only used for the log message

	Returns the replacement value as bytes (not str), so that it has the
	same type as the binary value it replaces.
	"""
	log.info("Cleaning IMSI: %s", NAS.decode_bcd(imsi))
	# 262420000000000, Vodafone GmbH, Germany
	# NOTE(review): these are the same octets that are written into the
	# NAS EPSID element; confirm this layout is also right for S1AP.
	return b'\x29\x26\x24' + b'\x00' * 5

def handle_s1ap_tmsi(tmsi):
	"""Log the given TMSI (as hex) and return it as is.

	tmsi -- encoded TMSI (bytes-like, must provide .hex())
	"""
	tmsi_hex = tmsi.hex()
	log.info("Cleaning TMSI: %s", tmsi_hex)
	# NOTE: for now, keep TMSI unchanged
	return tmsi

def handle_s1ap_imeisv(imeisv):
	"""Log the IMEISV that is being removed and return its replacement.

	imeisv -- original encoded IMEISV, only used for the log message

	Returns the replacement value as bytes (not str).
	"""
	log.info("Cleaning IMEISV: %s", NAS.decode_bcd(imeisv))
	# 3555720187847840 (Motorola C113)
	# NOTE(review): the exact value type expected by the Masked-IMEISV IE
	# is not visible here; confirm that a byte string is what it takes.
	return b'\x33\x55\x75\x02\x81\x87\x74\x48\xf0'

def find_s1ap_ie(ie_list, ie_id):
	"""Return the first IE of ie_list whose 'id' equals ie_id, or None."""
	matching = (entry for entry in ie_list if entry['id'] == ie_id)
	return next(matching, None)

def find_s1ap_ies(ie_list, ie_ids):
	"""Return the first IE of ie_list whose 'id' is one of ie_ids, or None.

	Only the first match is returned, even if several IEs qualify.
	"""
	matching = (entry for entry in ie_list if entry['id'] in ie_ids)
	return next(matching, None)

def handle_s1ap_paging(msg):
	"""Anonymize the UE identity found in the UEPagingID IE, if present.

	msg -- decoded message value, a mapping with a 'protocolIEs' list

	The IE is modified in place; nothing is returned.
	"""
	ie = find_s1ap_ie(msg['protocolIEs'], S1AP.S1AP_Constants.id_UEPagingID.get_val())
	if ie is None:
		return

	# ie['value'] is a pair whose second element is the identity, itself
	# a (choice name, choice value) pair.
	ie_type = ie['value'][0]
	ueid = ie['value'][1]
	if ueid[0] == 's-TMSI':
		tmsi = handle_s1ap_tmsi(ueid[1]['m-TMSI'])
		ueid[1]['m-TMSI'] = tmsi
	elif ueid[0] == 'iMSI':
		imsi = handle_s1ap_imsi(ueid[1])
		# Keep the outer pair intact and only swap the identity inside,
		# mirroring the nesting the identity was read from above.
		ie['value'] = (ie_type, ('iMSI', imsi))
	else:
		log.warning("Unknown Paging identity type '%s'", ueid[0])

def find_and_handle_s1ap_imei_ie(msg):
	"""Replace the value of the Masked-IMEISV IE, if the message has one.

	msg -- pair whose second element is a mapping with 'protocolIEs'

	The IE is modified in place; nothing is returned.
	"""
	ie_list = msg[1]['protocolIEs']
	wanted_id = S1AP.S1AP_Constants.id_Masked_IMEISV.get_val()
	ie = find_s1ap_ie(ie_list, wanted_id)
	if ie is None:
		return

	replacement = handle_s1ap_imeisv(ie['value'][1])
	# FIXME: I am not 100% sure if this is correct
	type_name = ie['value'][0]
	ie['value'] = (type_name, replacement)

def find_and_handle_s1ap_nas_pdu(msg, dl):
	"""Run the NAS-PDU IE of a message (if any) through handle_nas_pdu().

	msg -- pair whose second element is a mapping with 'protocolIEs'
	dl  -- direction flag passed on to handle_nas_pdu()

	The IE is modified in place; nothing is returned.
	"""
	ie_list = msg[1]['protocolIEs']
	wanted_id = S1AP.S1AP_Constants.id_NAS_PDU.get_val()
	ie = find_s1ap_ie(ie_list, wanted_id)
	if ie is None:
		return

	original_pdu = ie['value'][1]
	ie['value'] = ('NAS-PDU', handle_nas_pdu(original_pdu, dl))

def find_and_handle_s1ap_nested_nas_pdu(msg):
	"""Anonymize NAS PDUs nested inside the items of an E-RAB list IE.

	msg -- pair whose second element is a mapping with 'protocolIEs'

	Only the first matching list IE is processed. Items are modified in
	place; nothing is returned. The nested payload is handled as downlink.
	"""
	# List IE id -> name of the item type expected inside that list
	IEs = {
		S1AP.S1AP_Constants.id_E_RABToBeSetupListCtxtSUReq.get_val() : 'E-RABToBeSetupItemCtxtSUReq',
		S1AP.S1AP_Constants.id_E_RABToBeSetupListBearerSUReq.get_val() : 'E-RABToBeSetupItemBearerSUReq',
		S1AP.S1AP_Constants.id_E_RABToBeModifiedListBearerModReq.get_val() : 'E-RABToBeModifiedItemBearerModReq'}
	ie = find_s1ap_ies(msg[1]['protocolIEs'], IEs.keys())
	if ie is None:
		return

	item_types = set(IEs.values())
	for item in ie['value'][1]:
		if item['value'][0] not in item_types:
			continue

		fields = item['value'][1]
		# An item does not have to carry a NAS PDU; skip it instead of
		# failing with KeyError and aborting the whole capture.
		if 'nAS-PDU' not in fields:
			continue

		fields['nAS-PDU'] = handle_nas_pdu(fields['nAS-PDU'], dl = True)

def handle_s1ap(msg):
	"""Anonymize a decoded S1AP PDU in place.

	msg -- decoded PDU: a pair of the PDU kind (e.g. 'initiatingMessage')
	       and a mapping with 'procedureCode' and 'value'

	Only initiating messages are inspected; anything else is left alone.
	"""
	log.info("Processing S1AP message '%s:%s'", msg[0], msg[1]['value'][0])

	# Only initiating messages are looked at
	if msg[0] != u'initiatingMessage':
		return

	consts = S1AP.S1AP_Constants
	proc_code = msg[1]['procedureCode']

	# Look for IMSI, TMSI and IMEISV in S1AP
	if proc_code == consts.id_Paging.get_val():
		handle_s1ap_paging(msg[1]['value'][1])
	elif proc_code == consts.id_InitialContextSetup.get_val():
		find_and_handle_s1ap_imei_ie(msg[1]['value'])
	elif proc_code == consts.id_HandoverResourceAllocation.get_val():
		find_and_handle_s1ap_imei_ie(msg[1]['value'])

	# Look for NAS payload (which may also contain sensitive info)
	procCodeListNested = ( # MME-originated NAS payload inside list items
		consts.id_E_RABSetup.get_val(),
		consts.id_E_RABModify.get_val(),
		consts.id_InitialContextSetup.get_val())
	procCodeListDL = ( # MME-originated NAS payload
		consts.id_E_RABRelease.get_val(),
		consts.id_downlinkNASTransport.get_val(),
		# Sent towards the MME, but the NAS PDU inside is the downlink
		# message that could not be delivered to the UE, so it has to
		# be parsed as MME-originated.
		consts.id_NASNonDeliveryIndication.get_val())
	procCodeListUL = ( # UE-originated payload
		consts.id_initialUEMessage.get_val(),
		consts.id_uplinkNASTransport.get_val())
	if proc_code in procCodeListNested:
		find_and_handle_s1ap_nested_nas_pdu(msg[1]['value'])
	elif proc_code in procCodeListDL:
		find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = True)
	elif proc_code in procCodeListUL:
		find_and_handle_s1ap_nas_pdu(msg[1]['value'], dl = False)

def handle_sctp_chunk(chunk):
	"""Decode, anonymize and re-encode the S1AP PDU of one SCTP DATA chunk.

	chunk -- chunk object whose .data holds the APER encoded S1AP PDU;
	         it is updated in place on success

	Returns True on success and False when the PDU could not be decoded,
	processed or encoded again (the caller then drops the packet).
	"""
	log.debug("Processing an ASN.1 encoded S1AP PDU")

	# Parse ASN.1 encoded S1AP PDU
	s1ap = S1AP.S1AP_PDU_Descriptions.S1AP_PDU
	# Guard against malformed packets
	try:
		s1ap.from_aper(chunk.data)
	except Exception:
		log.error("Malformed packet, skipping...")
		log.debug("Decoder error details", exc_info = True)
		return False

	# A message that trips the anonymizer must not abort the whole
	# capture, and must not be written out unmodified either.
	try:
		handle_s1ap(s1ap())
	except Exception:
		log.exception("Failed to process a S1AP message")
		return False

	# Encapsulate the new payload
	# TODO: reset checksum fields
	try:
		# Encode the S1AP payload
		payload = s1ap.to_aper()
		chunk.data = payload

		# Scapy will calculate the length and padding
		del chunk.len
	except Exception:
		log.error("Failed to encode a S1AP payload")
		log.debug("Encoder error details", exc_info = True)
		return False

	return True

def handle_sctp_pkt(pkt):
	"""Anonymize every S1AP DATA chunk of a packet.

	pkt -- captured packet; its payload is taken as the IP layer and the
	       payload of that as the SCTP layer

	Returns the packet itself (chunks are modified in place), or None as
	soon as one chunk could not be handled.
	"""
	# Find and decapsulate the S1AP payload
	ip_layer = pkt.payload
	sctp_layer = ip_layer.payload

	# There can be multiple chunks, look for Data with S1AP
	layer = sctp_layer.payload
	while not isinstance(layer, NoPayload):
		# Only DATA chunks with payload protocol id 0x12 are processed
		carries_s1ap = isinstance(layer, SCTPChunkData) and layer.proto_id == 0x12
		if carries_s1ap and not handle_sctp_chunk(layer):
			return None
		layer = layer.payload

	return pkt

def handle_pcap(src_path, dst_path = '/dev/null'):
	"""Copy a capture file, anonymizing the packets that contain SCTP.

	src_path -- capture to read (with sensitive info)
	dst_path -- capture to write; packets that could not be cleaned are
	            left out of it

	Both files are closed (and the output flushed) when the function
	returns or raises.
	"""
	# The source is opened first, so that a missing source does not
	# leave a freshly created destination behind.
	with PcapReader(src_path) as src_pcap, PcapWriter(dst_path) as dst_pcap:
		# Iterating the reader stops cleanly at the end of the file,
		# whether read_packet() signals it with None or with EOFError.
		for pkt in src_pcap:
			# TODO: S1AP can be carried over TCP too
			if pkt.haslayer(SCTP):
				pkt = handle_sctp_pkt(pkt)

			# Write potentially cleaned packet to the new file
			if pkt is not None:
				dst_pcap.write(pkt)

if __name__ == '__main__':
	# Layout of the log records
	# Example: [DEBUG] foo_bar.py:71 Mahlzeit!
	LOG_FMT_DEFAULT = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s"
	LOG_LEVEL_DEFAULT = log.INFO

	# Stream handler (stderr) that applies the default level and format
	stderr_handler = log.StreamHandler()
	stderr_handler.setFormatter(log.Formatter(LOG_FMT_DEFAULT))
	stderr_handler.setLevel(log.getLevelName(LOG_LEVEL_DEFAULT))

	# The root logger itself lets everything through (DEBUG);
	# filtering is done by the handler's own level
	log.root.setLevel(log.DEBUG)
	log.root.addHandler(stderr_handler)

	# TODO: use argparse
	if len(sys.argv) >= 3:
		handle_pcap(sys.argv[1], sys.argv[2])
	else:
		print("Usage: %s SRC_PCAP DST_PCAP" % sys.argv[0])
		sys.exit(1)