aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2022-08-07 19:42:02 +0200
committerHarald Welte <laforge@osmocom.org>2022-08-12 16:58:43 +0200
commit8df793a8bddf12e4ed2da2e82ae79a7e5142a5fb (patch)
tree104be71cd9a9753e9d42983db11a2b2b7aa7dfbe
parent2241e72ecd134279b3fafe77f09d685aa803ec78 (diff)
WIP: vpcd2smpp.py
-rwxr-xr-xvpcd2smpp.py301
1 files changed, 301 insertions, 0 deletions
diff --git a/vpcd2smpp.py b/vpcd2smpp.py
new file mode 100755
index 0000000..e51edfe
--- /dev/null
+++ b/vpcd2smpp.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python3
+#
+# This program receive APDUs via the VPCD protocol of Frank Morgner's
+# virtualsmartcard, encrypts them with OTA (over the air) keys and
+# forwards them via SMPP to a SMSC (SMS service centre).
+#
+# In other words, you can use it as a poor man's OTA server, to enable
+# you to use unmodified application software with PC/SC support to talk
+# securely via OTA with a remote SMS card.
+#
+# This is very much a work in progress at this point.
+
+#######################################################################
+# twisted VPCD Library
+#######################################################################
+
+import logging
+import struct
+import abc
+from typing import Union, Optional
+from construct import Struct, Int8ub, Int16ub, If, Enum, Bytes, this, len_, Rebuild
+from twisted.internet.protocol import Protocol, ReconnectingClientFactory
+from pySim.utils import b2h, h2b
+
+logger = logging.getLogger(__name__)
+
+class VirtualCard(abc.ABC):
+ """Abstract base class for a virtual smart card."""
+ def __init__(self, atr: Union[str, bytes]):
+ if isinstance(atr, str):
+ atr = h2b(atr)
+ self.atr = atr
+
+ @abc.abstractmethod
+ def power_change(self, new_state: bool):
+ """Power the card on or off."""
+ pass
+
+ @abc.abstractmethod
+ def reset(self):
+ """Reset the card."""
+ pass
+
+ @abc.abstractmethod
+ def rx_c_apdu(self, apdu: bytes):
+ """Receive a C-APDU from the reader/application."""
+ pass
+
+ def tx_r_apdu(self, apdu: Union[str, bytes]):
+ if isinstance(apdu, str):
+ apdu = h2b(apdu)
+ logger.info("R-APDU: %s" % b2h(apdu))
+ self.protocol.send_data(apdu)
+
+class VpcdProtocolBase(Protocol):
+ # Prefixed couldn't be used as the this.length wouldn't be available in this case
+ construct = Struct('length'/Rebuild(Int16ub, len_(this.data) + len_(this.ctrl)),
+ 'data'/If(this.length > 1, Bytes(this.length)),
+ 'ctrl'/If(this.length == 1, Enum(Int8ub, off=0, on=1, reset=2, atr=4)))
+ def __init__(self, vcard: VirtualCard):
+ self.recvBuffer = b''
+ self.connectionCorrupted = False
+ self.pduReadTimer = None
+ self.pduReadTimerSecs = 10
+ self.callLater = reactor.callLater
+ self.on = False
+ self.vcard = vcard
+ self.vcard.protocol = self
+
+ def dataReceived(self, data: bytes):
+ """entry point where twisted tells us data was received."""
+ #logger.debug('Data received: %s' % b2h(data))
+ self.recvBuffer = self.recvBuffer + data
+ while True:
+ if self.connectionCorrupted:
+ return
+ msg = self.readMessage()
+ if msg is None:
+ break
+ self.endPDURead()
+ self.rawMessageReceived(msg)
+
+ if len(self.recvBuffer) > 0:
+ self.incompletePDURead()
+
+ def incompletePDURead(self):
+ """We have an incomplete PDU in readBuffer, schedule pduReadTimer"""
+ if self.pduReadTimer and self.pduReadTimer.active():
+ return
+ self.pduReadTimer = self.callLater(self.pduReadTimerSecs, self.onPDUReadTimeout)
+
+ def endPDURead(self):
+ """We completed reading a PDU, cancel the pduReadTimer."""
+ if self.pduReadTimer and self.pduReadTimer.active():
+ self.pduReadTimer.cancel()
+
+ def readMessage(self) -> Optional[bytes]:
+ """read an entire [raw] message."""
+ pduLen = self._getMessageLength()
+ if pduLen is None:
+ return None
+ return self._getMessage(pduLen)
+
+ def _getMessageLength(self) -> Optional[int]:
+ if len(self.recvBuffer) < 2:
+ return None
+ return struct.unpack('!H', self.recvBuffer[:2])[0]
+
+ def _getMessage(self, pduLen: int) -> Optional[bytes]:
+ if len(self.recvBuffer) < pduLen+2:
+ return None
+
+ message = self.recvBuffer[:pduLen+2]
+ self.recvBuffer = self.recvBuffer[pduLen+2:]
+ return message
+
+ def onPDUReadTimeout(self):
+ logger.error('PDU read timed out. Buffer is now considered corrupt')
+ #self.coruptDataReceived
+
+ def rawMessageReceived(self, message: bytes):
+ """Called once a complete binary vpcd message has been received."""
+ pdu = None
+ try:
+ pdu = VpcdProtocolBase.construct.parse(message)
+ except Exception as e:
+ logger.exception(e)
+ logger.critical('Received corrupt PDU %s' % b2h(message))
+ #self.corupDataRecvd()
+ else:
+ self.PDUReceived(pdu)
+
+ def PDUReceived(self, pdu):
+ logger.debug("Rx PDU: %s" % pdu)
+ if pdu['data']:
+ return self.on_rx_data(pdu)
+ else:
+ method = getattr(self, 'on_rx_' + pdu['ctrl'])
+ return method(pdu)
+
+ def on_rx_atr(self, pdu):
+ self.send_data(self.vcard.atr)
+
+ def on_rx_on(self, pdu):
+ if self.on:
+ return
+ else:
+ self.on = True
+ self.vcard.power_change(self.on)
+
+ def on_rx_reset(self, pdu):
+ self.vcard.reset()
+
+ def on_rx_off(self, pdu):
+ if not self.on:
+ return
+ else:
+ self.on = False
+ self.vcard.power_change(self.on)
+
+ def on_rx_data(self, pdu):
+ self.vcard.rx_c_apdu(pdu['data'])
+
+ def send_pdu(self, pdu):
+ logger.debug("Sending PDU: %s" % pdu)
+ encoded = VpcdProtocolBase.construct.build(pdu)
+ #logger.debug("Sending binary: %s" % b2h(encoded))
+ self.transport.write(encoded)
+
+ def send_data(self, data: Union[str, bytes]):
+ if isinstance(data, str):
+ data = h2b(data)
+ return self.send_pdu({'length': 0, 'ctrl': '', 'data': data})
+
+ def send_ctrl(self, ctrl: str):
+ return self.send_pdu({'length': 0, 'ctrl': ctrl, 'data': ''})
+
+
+class VpcdProtocolClient(VpcdProtocolBase):
+ pass
+
+
+class VpcdClientFactory(ReconnectingClientFactory):
+ def __init__(self, vcard_class: VirtualCard):
+ self.vcard_class = vcard_class
+
+ def startedConnecting(self, connector):
+ logger.debug('Started to connect')
+
+ def buildProtocol(self, addr):
+ logger.info('Connection established to %s' % addr)
+ self.resetDelay()
+ return VpcdProtocolClient(vcard = self.vcard_class())
+
+ def clientConnectionLost(self, connector, reason):
+ logger.warning('Connection lost (reason: %s)' % reason)
+ super().clientConnectionLost(connector, reason)
+
+ def clientConnectionFailed(self, connector, reason):
+ logger.warning('Connection failed (reason: %s)' % reason)
+ super().clientConnectionFailed(connector, reason)
+
+#######################################################################
+# Application
+#######################################################################
+
+from pprint import pprint as pp
+
+from twisted.internet.protocol import Protocol, ReconnectingClientFactory, ClientCreator
+from twisted.internet import reactor
+
+from smpp.twisted.client import SMPPClientTransceiver, SMPPClientService
+from smpp.twisted.protocol import SMPPClientProtocol
+from smpp.twisted.config import SMPPClientConfig
+from smpp.pdu.operations import SubmitSM, DeliverSM
+from smpp.pdu import pdu_types
+
+from pySim.ota import OtaKeyset, OtaDialectSms
+from pySim.utils import b2h, h2b
+
+
+class MyVcard(VirtualCard):
+ def __init__(self, **kwargs):
+ super().__init__(atr='3B9F96801FC78031A073BE21136743200718000001A5', **kwargs)
+ self.smpp_client = None
+ # KIC1 + KID1 of 8988211000000467285
+ KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
+ KID1 = h2b('D24EB461799C5E035C77451FD9404463')
+ KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
+ KID3 = h2b('12110C78E678C25408233076AA033615')
+ self.ota_keyset = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
+ algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
+ self.ota_dialect = OtaDialectSms()
+ self.tar = h2b('B00011')
+ self.spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
+ 'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
+
+ def ensure_smpp(self):
+ config = SMPPClientConfig(host='localhost', port=2775, username='test', password='test')
+ if self.smpp_client:
+ return
+ self.smpp_client = SMPPClientTransceiver(config, self.handleSmpp)
+ smpp = self.smpp_client.connectAndBind()
+ #self.smpp = ClientCreator(reactor, SMPPClientProtocol, config, self.handleSmpp)
+ #d = self.smpp.connectTCP(config.host, config.port)
+ #d = self.smpp.connectAndBind()
+ #d.addCallback(self.forwardToClient, self.smpp)
+
+ def power_change(self, new_state: bool):
+ if new_state:
+ logger.info("POWER ON")
+ self.ensure_smpp()
+ else:
+ logger.info("POWER OFF")
+
+ def reset(self):
+ logger.info("RESET")
+
+ def rx_c_apdu(self, apdu: bytes):
+ pp(self.smpp_client.smpp)
+ logger.info("C-APDU: %s" % b2h(apdu))
+ # translate to Secured OTA RFM
+ secured = self.ota_dialect.encode_cmd(self.ota_keyset, self.tar, self.spi, apdu=apdu)
+ # add user data header
+ tpdu = b'\x02\x70\x00' + secured
+ # send via SMPP
+ self.tx_sms_tpdu(tpdu)
+ #self.tx_r_apdu('9000')
+
+ def tx_sms_tpdu(self, tpdu: bytes):
+ """Send a SMS TPDU via SMPP SubmitSM."""
+ dcs = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
+ esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT,
+ gsmFeatures=[pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET])
+ submit = SubmitSM(source_addr='12',destination_addr='23', data_coding=dcs, esm_class=esm_class,
+ protocol_id=0x7f, short_message=tpdu)
+ self.smpp_client.smpp.sendDataRequest(submit)
+
+ def handleSmpp(self, smpp, pdu):
+ #logger.info("Received SMPP %s" % pdu)
+ data = pdu.params['short_message']
+ #logger.info("Received SMS Data %s" % b2h(data))
+ r = self.ota_dialect.decode_resp(self.ota_keyset, self.spi, data)
+ logger.info("Decoded SMPP %s" % r)
+ self.tx_r_apdu(r['last_response_data'] + r['last_status_word'])
+
+
+if __name__ == '__main__':
+ import logging
+ logger = logging.getLogger(__name__)
+ import colorlog
+ log_format='%(log_color)s%(levelname)-8s%(reset)s %(name)s: %(message)s'
+ colorlog.basicConfig(level=logging.INFO, format = log_format)
+ logger = colorlog.getLogger()
+
+ from twisted.internet import reactor
+ host = 'localhost'
+ port = 35963
+ reactor.connectTCP(host, port, VpcdClientFactory(vcard_class=MyVcard))
+ reactor.run()