aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2022-08-06 19:24:52 +0200
committerHarald Welte <laforge@osmocom.org>2022-08-12 16:58:43 +0200
commit2241e72ecd134279b3fafe77f09d685aa803ec78 (patch)
treed82f64220f954649a30c16cbb18b7d45466a5147
parent755bb2dcfc3eecdaaff7258616e7f3bf7ba26928 (diff)
WIP: smpp2sim
-rwxr-xr-xsmpp2sim.py245
1 files changed, 245 insertions, 0 deletions
diff --git a/smpp2sim.py b/smpp2sim.py
new file mode 100755
index 0000000..0922b40
--- /dev/null
+++ b/smpp2sim.py
@@ -0,0 +1,245 @@
+#!/usr/bin/env python3
+#
+# Program to emulate the entire communication path SMSC-MSC-BSC-BTS-ME
+# that is usually between an OTA backend and the SIM card. This allows
+# to play with SIM OTA technology without using a mobile network or even
+# a mobile phone.
+#
+# An external application must encode (and encrypt/sign) the OTA SMS
+# and submit them via SMPP to this program, just like it would submit
+# it normally to a SMSC (SMS Service Centre). The program then re-formats
+# the SMPP-SUBMIT into a SMS DELIVER TPDU and passes it via an ENVELOPE
+# APDU to the SIM card that is locally inserted into a smart card reader.
+#
+# The path from SIM to external OTA application works the opposite way.
+
+import argparse
+import logging
+import colorlog
+from pprint import pprint as pp
+
+from twisted.protocols import basic
+from twisted.internet import defer, endpoints, protocol, reactor, task
+from twisted.cred.portal import IRealm
+from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
+from twisted.cred.portal import Portal
+from zope.interface import implementer
+
+from smpp.twisted.config import SMPPServerConfig
+from smpp.twisted.server import SMPPServerFactory, SMPPBindManager
+from smpp.twisted.protocol import SMPPSessionStates, DataHandlerResponse
+
+from smpp.pdu import pdu_types, operations, pdu_encoding
+
+from pySim.sms import SMS_DELIVER, AddressField
+
+from pySim.transport import LinkBase, ProactiveHandler, argparse_add_reader_args, init_reader
+from pySim.commands import SimCardCommands
+from pySim.cards import UsimCard
+from pySim.exceptions import *
+from pySim.cat import ProactiveCommand, SendShortMessage, SMS_TPDU, SMSPPDownload
+from pySim.cat import DeviceIdentities, Address
+from pySim.utils import b2h, h2b
+
+logger = logging.getLogger(__name__)
+
+# MSISDNs to use when generating proactive SMS messages
+SIM_MSISDN='23'
+ESME_MSISDN='12'
+
+# HACK: we need some kind of mapping table between system_id and card-reader
+# or actually route based on MSISDNs
+hackish_global_smpp = None
+
+class Proact(ProactiveHandler):
+ def __init__(self, smpp_factory):
+ self.smpp_factory = smpp_factory
+
+ @staticmethod
+ def _find_first_element_of_type(instlist, cls):
+ for i in instlist:
+ if isinstance(i, cls):
+ return i
+ return None
+
+ """Call-back which the pySim transport core calls whenever it receives a
+ proactive command from the SIM."""
+ def handle_SendShortMessage(self, data):
+ """Card requests sending a SMS."""
+ pp(data)
+ # Relevant parts in data: Address, SMS_TPDU
+ addr_ie = _find_first_element_of_type(data.children, Address)
+ sms_tpdu_ie = _find_first_element_of_type(data.children, SMS_TPDU)
+ raw_tpdu = sms_tpdu_ie.decoded['tpdu']
+ submit = SMS_SUBMIT.fromBytes(raw_tpdu)
+ self.send_sms_via_smpp(data)
+ def handle_OpenChannel(self, data):
+ """Card requests opening a new channel via a UDP/TCP socket."""
+ pp(data)
+ pass
+ def handle_CloseChannel(self, data):
+ """Close a channel."""
+ pp(data)
+ pass
+ def handleReceiveData(self, data):
+ """Receive/read data from the socket."""
+ pp(data)
+ pass
+ def handleSendData(self, data):
+ """Send/write data to the socket."""
+ pp(data)
+ pass
+ def getChannelStatus(self, data):
+ pp(data)
+ pass
+
+ def send_sms_via_smpp(self, data):
+ # while in a normal network the phone/ME would *submit* a message to the SMSC,
+ # we are actually emulating the SMSC itself, so we must *deliver* the message
+ # to the ESME
+ dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
+ esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
+ gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
+ deliver = operations.DeliverSM(source_addr=SIM_MSISDN,
+ destination_addr=ESME_MSISDN,
+ esm_class=esm_class,
+ protocol_id=0x7F,
+ data_coding=dcs,
+ short_message=h2b(data))
+ hackish_global_smpp.sendDataRequest(deliver)
+# # obtain the connection/binding of system_id to be used for delivering MO-SMS to the ESME
+# connection = smpp_server.getBoundConnections[system_id].getNextBindingForDelivery()
+# connection.sendDataRequest(deliver)
+
+
+
+def dcs_is_8bit(dcs):
+ if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
+ return True
+ if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
+ return True
+ if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
+ return True
+ else:
+ return False
+
+
+class MyServer:
+
+ @implementer(IRealm)
+ class SmppRealm:
+ def requestAvatar(self, avatarId, mind, *interfaces):
+ return ('SMPP', avatarId, lambda: None)
+
+ def __init__(self, tcp_port:int = 2775, bind_ip = '::'):
+ smpp_config = SMPPServerConfig(msgHandler=self._msgHandler,
+ systems={'test': {'max_bindings': 2}})
+ portal = Portal(self.SmppRealm())
+ credential_checker = InMemoryUsernamePasswordDatabaseDontUse()
+ credential_checker.addUser('test', 'test')
+ portal.registerChecker(credential_checker)
+ self.factory = SMPPServerFactory(smpp_config, auth_portal=portal)
+ logger.info('Binding Virtual SMSC to TCP Port %u at %s' % (tcp_port, bind_ip))
+ smppEndpoint = endpoints.TCP6ServerEndpoint(reactor, tcp_port, interface=bind_ip)
+ smppEndpoint.listen(self.factory)
+ self.tp = self.scc = self.card = None
+
+ def connect_to_card(self, tp: LinkBase):
+ self.tp = tp
+ self.scc = SimCardCommands(self.tp)
+ self.card = UsimCard(self.scc)
+ # this should be part of UsimCard, but FairewavesSIM breaks with that :/
+ self.scc.cla_byte = "00"
+ self.scc.sel_ctrl = "0004"
+ self.card.read_aids()
+ self.card.select_adf_by_aid(adf='usim')
+ # FIXME: create a more realistic profile than ffffff
+ self.scc.terminal_profile('ffffff')
+
+ def _msgHandler(self, system_id, smpp, pdu):
+ # HACK: we need some kind of mapping table between system_id and card-reader
+ # or actually route based on MSISDNs
+ global hackish_global_smpp
+ hackish_global_smpp = smpp
+ #pp(pdu)
+ if pdu.id == pdu_types.CommandId.submit_sm:
+ return self.handle_submit_sm(system_id, smpp, pdu)
+ else:
+ logging.warning('Rejecting non-SUBMIT commandID')
+ return pdu_types.CommandStatus.ESME_RINVCMDID
+
+ def handle_submit_sm(self, system_id, smpp, pdu):
+ # check for valid data coding scheme + PID
+ if not dcs_is_8bit(pdu.params['data_coding']):
+ logging.warning('Rejecting non-8bit DCS')
+ return pdu_types.CommandStatus.ESME_RINVDCS
+ if pdu.params['protocol_id'] != 0x7f:
+ logging.warning('Rejecting non-SIM PID')
+ return pdu_types.CommandStatus.ESME_RINVDCS
+
+ # 1) build a SMS-DELIVER (!) from the SMPP-SUBMIT
+ tpdu = SMS_DELIVER.fromSmppSubmit(pdu)
+ print(tpdu)
+ # 2) wrap into the CAT ENVELOPE for SMS-PP-Download
+ tpdu_ie = SMS_TPDU(decoded={'tpdu': b2h(tpdu.toBytes())})
+ dev_ids = DeviceIdentities(decoded={'source_dev_id': 'network', 'dest_dev_id': 'uicc'})
+ sms_dl = SMSPPDownload(children=[dev_ids, tpdu_ie])
+ # 3) send to the card
+ envelope_hex = b2h(sms_dl.to_tlv())
+ print("ENVELOPE: %s" % envelope_hex)
+ (data, sw) = self.scc.envelope(envelope_hex)
+ print("SW %s: %s" % (sw, data))
+ if sw == '9300':
+ # TODO send back RP-ERROR message with TP-FCS == 'SIM Application Toolkit Busy'
+ return pdu_types.CommandStatus.ESME_RSUBMITFAIL
+ elif sw == '9000' or sw[0:2] in ['6f', '62', '63']:
+ # data something like 027100000e0ab000110000000000000001612f or
+ # 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
+ # which is the user-data portion of the SMS starting with the UDH (027100)
+ # TODO: return the response back to the sender in an RP-ACK; PID/DCS like in CMD
+ deliver = operations.DeliverSM(service_type=pdu.params['service_type'],
+ source_addr_ton=pdu.params['dest_addr_ton'],
+ source_addr_npi=pdu.params['dest_addr_npi'],
+ source_addr=pdu.params['destination_addr'],
+ dest_addr_ton=pdu.params['source_addr_ton'],
+ dest_addr_npi=pdu.params['source_addr_npi'],
+ destination_addr=pdu.params['source_addr'],
+ esm_class=pdu.params['esm_class'],
+ protocol_id=pdu.params['protocol_id'],
+ priority_flag=pdu.params['priority_flag'],
+ data_coding=pdu.params['data_coding'],
+ short_message=h2b(data))
+ smpp.sendDataRequest(deliver)
+ return pdu_types.CommandStatus.ESME_ROK
+ else:
+ return pdu_types.CommandStatus.ESME_RSUBMITFAIL
+
+
+option_parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+argparse_add_reader_args(option_parser)
+smpp_group = option_parser.add_argument_group('SMPP Options')
+smpp_group.add_argument('--smpp-bind-port', type=int, default=2775,
+ help='TCP Port to bind the SMPP socket to')
+smpp_group.add_argument('--smpp-bind-ip', default='::',
+ help='IPv4/IPv6 address to bind the SMPP socket to')
+
+if __name__ == '__main__':
+ log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
+ colorlog.basicConfig(level=logging.INFO, format = log_format)
+ logger = colorlog.getLogger()
+
+ opts = option_parser.parse_args()
+
+ #tp = init_reader(opts, proactive_handler = Proact())
+ tp = init_reader(opts)
+ if tp is None:
+ exit(1)
+ tp.connect()
+
+ ms = MyServer(opts.smpp_bind_port, opts.smpp_bind_ip)
+ ms.connect_to_card(tp)
+ reactor.run()
+