aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2022-07-31 15:51:19 +0200
committerHarald Welte <laforge@osmocom.org>2022-08-12 12:46:05 +0200
commit63054b0d3681c4218e8305fc8c0f01efebe8e018 (patch)
tree0366c436502ef615316ed07a9551ab3929781346
parentcab26c728c0544db398ab0d1ba78d123bf134c30 (diff)
Add new pySim.ota library, implement SIM OTA crypto
This introduces a hierarchy of classes implementing * ETS TS 102 225 (general command structure) * 3GPP TS 31.115 (dialects for SMS-PP) In this initial patch only the SMS "dialect" is supported, but it is foreseen that USSD/SMSCB/HTTPS dialects can be added at a later point. Change-Id: I193ff4712c8503279c017b4b1324f0c3d38b9f84
-rwxr-xr-xcontrib/jenkins.sh1
-rw-r--r--pySim/ota.py452
-rw-r--r--pySim/sms.py53
-rw-r--r--requirements.txt1
-rw-r--r--setup.py3
-rw-r--r--tests/test_ota.py85
6 files changed, 594 insertions, 1 deletions
diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh
index ac5fba8..76e14c4 100755
--- a/contrib/jenkins.sh
+++ b/contrib/jenkins.sh
@@ -26,6 +26,7 @@ pip install bidict
pip install gsm0338
pip install termcolor
pip install colorlog
+pip install pycryptodome
# Execute automatically discovered unit tests first
python -m unittest discover -v -s tests/
diff --git a/pySim/ota.py b/pySim/ota.py
new file mode 100644
index 0000000..c2b475e
--- /dev/null
+++ b/pySim/ota.py
@@ -0,0 +1,452 @@
+"""Code related to SIM/UICC OTA according to TS 102 225 + TS 31.115."""
+
+# (C) 2021-2022 by Harald Welte <laforge@osmocom.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from pySim.construct import *
+from pySim.utils import b2h
+from pySim.sms import UserDataHeader
+from construct import *
+from bidict import bidict
+import zlib
+import abc
+import struct
+
+# ETS TS 102 225 gives the general command structure and the dialects for CAT_TP, TCP/IP and HTTPS
+# 3GPP TS 31.115 gives the dialects for SMS-PP, SMS-CB, USSD and HTTP
+
+# CPI CPL CHI CHL SPI KIc KID TAR CNTR PCNTR RC/CC/DS data
+
+# CAT_TP TCP/IP SMS
+# CPI 0x01 0x01 =IEIa=70,len=0
+# CHI NULL NULL NULL
+# CPI, CPL and CHL included in RC/CC/DS true true
+# RPI 0x02 0x02 =IEIa=71,len=0
+# RHI NULL NULL
+# RPI, RPL and RHL included in RC/CC/DS true true
+# packet-id 0-bf,ff 0-bf,ff
+# identification packet false 102 225 tbl 6
+
+# KVN 1..f; KI1=KIc, KI2=KID, KI3=DEK
+
+# TS 102 225 Table 5
+ota_status_codes = bidict({
+ 0x00: 'PoR OK',
+ 0x01: 'RC/CC/DS failed',
+ 0x02: 'CNTR low',
+ 0x03: 'CNTR high',
+ 0x04: 'CNTR blocked',
+ 0x05: 'Ciphering error',
+ 0x06: 'Unidentified security error',
+ 0x07: 'Insufficient memory',
+ 0x08: 'more time',
+ 0x09: 'TAR unknown',
+ 0x0a: 'Insufficient security level',
+ 0x0b: 'Actual Response in SMS-SUBMIT', # 31.115
+ 0x0c: 'Actual Response in USSD', # 31.115
+})
+
+# ETSI TS 102 225 Table 5 + 3GPP TS 31.115 Section 7
+ResponseStatus = Enum(Int8ub, por_ok=0, rc_cc_ds_failed=1, cntr_low=2, cntr_high=3,
+ cntr_blocked=4, ciphering_error=5, undefined_security_error=6,
+ insufficient_memory=7, more_time_needed=8, tar_unknown=9,
+ insufficient_security_level=0x0A,
+ actual_response_sms_submit=0x0B,
+ actual_response_ussd=0x0C)
+
+# ETSI TS 102 226 Section 5.1.2
+CompactRemoteResp = Struct('number_of_commands'/Int8ub,
+ 'last_status_word'/HexAdapter(Bytes(2)),
+ 'last_response_data'/HexAdapter(GreedyBytes))
+
+RC_CC_DS = Enum(BitsInteger(2), no_rc_cc_ds=0, rc=1, cc=2, ds=3)
+
+# TS 102 225 Section 5.1.1 + TS 31.115 Section 4.2
+SPI = BitStruct( # first octet
+ Padding(3),
+ 'counter'/Enum(BitsInteger(2), no_counter=0, counter_no_replay_or_seq=1,
+ counter_must_be_higher=2, counter_must_be_lower=3),
+ 'ciphering'/Flag,
+ 'rc_cc_ds'/RC_CC_DS,
+ # second octet
+ Padding(2),
+ 'por_in_submit'/Flag,
+ 'por_shall_be_ciphered'/Flag,
+ 'por_rc_cc_ds'/RC_CC_DS,
+ 'por'/Enum(BitsInteger(2), no_por=0,
+ por_required=1, por_only_when_error=2)
+)
+
+# TS 102 225 Section 5.1.2
+KIC = BitStruct('key'/BitsInteger(4),
+ 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
+ aes_cbc=2)
+ )
+
+# TS 102 225 Section 5.1.3.1
+KID_CC = BitStruct('key'/BitsInteger(4),
+ 'algo'/Enum(BitsInteger(4), implicit=0, single_des=1, triple_des_cbc2=5, triple_des_cbc3=9,
+ aes_cmac=2)
+ )
+
+# TS 102 225 Section 5.1.3.2
+KID_RC = BitStruct('key'/BitsInteger(4),
+ 'algo'/Enum(BitsInteger(4), implicit=0, crc16=1, crc32=5, proprietary=3)
+ )
+
+SmsCommandPacket = Struct('cmd_pkt_len'/Int16ub,
+ 'cmd_hdr_len'/Int8ub,
+ 'spi'/SPI,
+ 'kic'/KIC,
+ 'kid'/Switch(this.spi.rc_cc_ds, {'cc': KID_CC, 'rc': KID_RC }),
+ 'tar'/Bytes(3),
+ 'secured_data'/GreedyBytes)
+
+class OtaKeyset:
+ """The OTA related data (key material, counter) to be used in encrypt/decrypt."""
+ def __init__(self, algo_crypt: str, kic_idx: int, kic: bytes,
+ algo_auth: str, kid_idx: int, kid: bytes, cntr: int = 0):
+ self.algo_crypt = algo_crypt
+ self.kic = bytes(kic)
+ self.kic_idx = kic_idx
+ self.algo_auth = algo_auth
+ self.kid = bytes(kid)
+ self.kid_idx = kid_idx
+ self.cntr = cntr
+
+ @property
+ def auth(self):
+ """Return an instance of the matching OtaAlgoAuth."""
+ return OtaAlgoAuth.fromKeyset(self)
+
+ @property
+ def crypt(self):
+ """Return an instance of the matching OtaAlgoCrypt."""
+ return OtaAlgoCrypt.fromKeyset(self)
+
+class OtaCheckError(Exception):
+ pass
+
+class OtaDialect(abc.ABC):
+ """Base Class for OTA dialects such as SMS, BIP, ..."""
+
+ def _compute_sig_len(self, spi:SPI):
+ if spi['rc_cc_ds'] == 'no_rc_cc_ds':
+ return 0
+ elif spi['rc_cc_ds'] == 'rc': # CRC-32
+ return 4
+ elif spi['rc_cc_ds'] == 'cc': # Cryptographic Checksum (CC)
+ # TODO: this is not entirely correct, as in AES case it could be 4 or 8
+ return 8
+ else:
+ raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
+
+ @abc.abstractmethod
+ def encode_cmd(self, otak: OtaKeyset, tar: bytes, apdu: bytes) -> bytes:
+ pass
+
+ @abc.abstractmethod
+ def decode_resp(self, otak: OtaKeyset, apdu: bytes) -> bytes:
+ pass
+
+
+from Crypto.Cipher import DES, DES3, AES
+from Crypto.Hash import CMAC
+
+class OtaAlgo(abc.ABC):
+ iv = b'\x00\x00\x00\x00\x00\x00\x00\x00'
+ blocksize = None
+ enum_name = None
+
+ @staticmethod
+ def _get_padding(in_len: int, multiple: int, padding: int = 0):
+ """Return padding bytes towards multiple of N."""
+ if in_len % multiple == 0:
+ return b''
+ pad_cnt = multiple - (in_len % multiple)
+ return b'\x00' * pad_cnt
+
+ @staticmethod
+ def _pad_to_multiple(indat: bytes, multiple: int, padding: int = 0):
+ """Pad input bytes to multiple of N."""
+ return indat + OtaAlgo._get_padding(len(indat), multiple, padding)
+
+ def pad_to_blocksize(self, indat: bytes, padding: int = 0):
+ """Pad the given input data to multiple of the cipher block size."""
+ return self._pad_to_multiple(indat, self.blocksize, padding)
+
+ def __init__(self, otak: OtaKeyset):
+ self.otak = otak
+
+ def __str__(self):
+ return self.__class__.__name__
+
+class OtaAlgoCrypt(OtaAlgo, abc.ABC):
+ def __init__(self, otak: OtaKeyset):
+ if self.enum_name != otak.algo_crypt:
+ raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
+ super().__init__(otak)
+
+ def encrypt(self, data:bytes) -> bytes:
+ """Encrypt given input bytes using the key material given in constructor."""
+ padded_data = self.pad_to_blocksize(data)
+ return self._encrypt(data)
+
+ def decrypt(self, data:bytes) -> bytes:
+ """Decrypt given input bytes using the key material given in constructor."""
+ return self._decrypt(data)
+
+ @abc.abstractmethod
+ def _encrypt(self, data:bytes) -> bytes:
+ """Actual implementation, to be implemented by derived class."""
+ pass
+
+ @abc.abstractmethod
+ def _decrypt(self, data:bytes) -> bytes:
+ """Actual implementation, to be implemented by derived class."""
+ pass
+
+ @classmethod
+ def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoCrypt':
+ """Resolve the class for the encryption algorithm of otak and instantiate it."""
+ for subc in cls.__subclasses__():
+ if subc.enum_name == otak.algo_crypt:
+ return subc(otak)
+ raise ValueError('No implementation for crypt algorithm %s' % otak.algo_auth)
+
+class OtaAlgoAuth(OtaAlgo, abc.ABC):
+ def __init__(self, otak: OtaKeyset):
+ if self.enum_name != otak.algo_auth:
+ raise ValueError('Cannot use algorithm %s with key for %s' % (self.enum_name, otak.algo_crypt))
+ super().__init__(otak)
+
+ def sign(self, data:bytes) -> bytes:
+ """Compute the CC/CR check bytes for the input data using key material
+ given in constructor."""
+ padded_data = self.pad_to_blocksize(data)
+ sig = self._sign(padded_data)
+ return sig
+
+ def check_sig(self, data:bytes, cc_received:bytes):
+ """Compute the CC/CR check bytes for the input data and compare against cc_received."""
+ cc = self.sign(data)
+ if cc_received != cc:
+ raise OtaCheckError('Received CC (%s) != Computed CC (%s)' % (b2h(cc_received), b2h(cc)))
+
+ @abc.abstractmethod
+ def _sign(self, data:bytes) -> bytes:
+ """Actual implementation, to be implemented by derived class."""
+ pass
+
+ @classmethod
+ def fromKeyset(cls, otak: OtaKeyset) -> 'OtaAlgoAuth':
+ """Resolve the class for the authentication algorithm of otak and instantiate it."""
+ for subc in cls.__subclasses__():
+ if subc.enum_name == otak.algo_auth:
+ return subc(otak)
+ raise ValueError('No implementation for auth algorithm %s' % otak.algo_auth)
+
+class OtaAlgoCryptDES(OtaAlgoCrypt):
+ """DES is insecure. For backwards compatibility with pre-Rel8"""
+ name = 'DES'
+ enum_name = 'single_des'
+ blocksize = 8
+ def _encrypt(self, data:bytes) -> bytes:
+ cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
+ return cipher.encrypt(data)
+
+ def _decrypt(self, data:bytes) -> bytes:
+ cipher = DES.new(self.otak.kic, DES.MODE_CBC, self.iv)
+ return cipher.decrypt(data)
+
+class OtaAlgoAuthDES(OtaAlgoAuth):
+ """DES is insecure. For backwards compatibility with pre-Rel8"""
+ name = 'DES'
+ enum_name = 'single_des'
+ blocksize = 8
+ def _sign(self, data:bytes) -> bytes:
+ cipher = DES.new(self.otak.kid, DES.MODE_CBC, self.iv)
+ ciph = cipher.encrypt(data)
+ return ciph[len(ciph) - 8:]
+
+class OtaAlgoCryptDES3(OtaAlgoCrypt):
+ name = '3DES'
+ enum_name = 'triple_des_cbc2'
+ blocksize = 8
+ def _encrypt(self, data:bytes) -> bytes:
+ cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
+ return cipher.encrypt(data)
+
+ def _decrypt(self, data:bytes) -> bytes:
+ cipher = DES3.new(self.otak.kic, DES3.MODE_CBC, self.iv)
+ return cipher.decrypt(data)
+
+class OtaAlgoAuthDES3(OtaAlgoAuth):
+ name = '3DES'
+ enum_name = 'triple_des_cbc2'
+ blocksize = 8
+ def _sign(self, data:bytes) -> bytes:
+ cipher = DES3.new(self.otak.kid, DES3.MODE_CBC, self.iv)
+ ciph = cipher.encrypt(data)
+ return ciph[len(ciph) - 8:]
+
+class OtaAlgoCryptAES(OtaAlgoCrypt):
+ name = 'AES'
+ enum_name = 'aes_cbc'
+ blocksize = 16 # TODO: is this needed?
+ def _encrypt(self, data:bytes) -> bytes:
+ cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
+ return cipher.encrypt(data)
+
+ def _decrypt(self, data:bytes) -> bytes:
+ cipher = AES.new(self.otak.kic, AES.MODE_CBC, self.iv)
+ return cipher.decrypt(data)
+
+class OtaAlgoAuthAES(OtaAlgoAuth):
+ name = 'AES'
+ enum_name = 'aes_cmac'
+ blocksize = 16 # TODO: is this needed?
+ def _sign(self, data:bytes) -> bytes:
+ cmac = CMAC.new(self.otak.kid, ciphermod=AES, mac_len=8)
+ cmac.update(data)
+ ciph = cmac.digest()
+ return ciph[len(ciph) - 8:]
+
+
+
+class OtaDialectSms(OtaDialect):
+ """OTA dialect for SMS based transport, as described in 3GPP TS 31.115."""
+ SmsResponsePacket = Struct('rpl'/Int16ub,
+ 'rhl'/Int8ub,
+ 'tar'/Bytes(3),
+ 'cntr'/Bytes(5),
+ 'pcntr'/Int8ub,
+ 'response_status'/ResponseStatus,
+ 'cc_rc'/Bytes(this.rhl-10),
+ 'secured_data'/GreedyBytes)
+
+ def encode_cmd(self, otak: OtaKeyset, tar: bytes, spi: dict, apdu: bytes) -> bytes:
+ # length of signature in octets
+ len_sig = self._compute_sig_len(spi)
+ pad_cnt = 0
+ if spi['ciphering']: # ciphering is requested
+ # append padding bytes to end up with blocksize
+ len_cipher = 6 + len_sig + len(apdu)
+ apdu += otak.crypt._get_padding(len_cipher, otak.crypt.blocksize)
+
+ kic = {'key': otak.kic_idx, 'algo': otak.algo_crypt}
+ kid = {'key': otak.kid_idx, 'algo': otak.algo_auth}
+
+ # CHL = number of octets from (and including) SPI to the end of RC/CC/DS
+ # 13 == SPI(2) + KIc(1) + KId(1) + TAR(3) + CNTR(5) + PCNTR(1)
+ chl = 13 + len_sig
+
+ # CHL + SPI (+ KIC + KID)
+ c = Struct('chl'/Int8ub, 'spi'/SPI, 'kic'/KIC, 'kid'/KID_CC, 'tar'/Bytes(3))
+ part_head = c.build({'chl': chl, 'spi':spi, 'kic':kic, 'kid':kid, 'tar':tar})
+ #print("part_head: %s" % b2h(part_head))
+
+ # CNTR + PCNTR (CNTR not used)
+ part_cnt = otak.cntr.to_bytes(5, 'big') + pad_cnt.to_bytes(1, 'big')
+ #print("part_cnt: %s" % b2h(part_cnt))
+
+ envelope_data = part_head + part_cnt + apdu
+ #print("envelope_data: %s" % b2h(envelope_data))
+
+ # 2-byte CPL. CPL is part of RC/CC/CPI to end of secured data, including any padding for ciphering
+ # CPL from and including CPI to end of secured data, including any padding for ciphering
+ cpl = len(envelope_data) + len_sig
+ envelope_data = cpl.to_bytes(2, 'big') + envelope_data
+ #print("envelope_data with cpl: %s" % b2h(envelope_data))
+
+ if spi['rc_cc_ds'] == 'cc':
+ cc = otak.auth.sign(envelope_data)
+ envelope_data = part_cnt + cc + apdu
+ elif spi['rc_cc_ds'] == 'rc':
+ # CRC32
+ crc32 = zlib.crc32(envelope_data) & 0xffffffff
+ envelope_data = part_cnt + crc32.to_bytes(4, 'big') + apdu
+ elif spi['rc_cc_ds'] == 'no_rc_cc_ds':
+ envelope_data = part_cnt + apdu
+ else:
+ raise ValueError("Invalid rc_cc_ds: %s" % spi['rc_cc_ds'])
+
+ #print("envelope_data with sig: %s" % b2h(envelope_data))
+
+ # encrypt as needed
+ if spi['ciphering']: # ciphering is requested
+ ciph = otak.crypt.encrypt(envelope_data)
+ envelope_data = part_head + ciph
+ # prefix with another CPL
+ cpl = len(envelope_data)
+ envelope_data = cpl.to_bytes(2, 'big') + envelope_data
+ else:
+ envelope_data = part_head + envelope_data
+
+ #print("envelope_data: %s" % b2h(envelope_data))
+
+ return envelope_data
+
+ def decode_resp(self, otak: OtaKeyset, spi: dict, data: bytes) -> bytes:
+ if isinstance(data, str):
+ data = h2b(data)
+ # plain-text POR: 027100000e0ab000110000000000000001612f
+ # UDHL RPI IEDLa RPL RHL TAR CNTR PCNTR STS
+ # 02 71 00 000e 0a b00011 0000000000 00 00 01 612f
+ # POR with CC: 027100001612b000110000000000000055f47118381175fb01612f
+ # POR with CC+CIPH: 027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c
+ if data[0] != 0x02:
+ raise ValueError('Unexpected UDL=0x%02x' % data[0])
+ udhd, remainder = UserDataHeader.fromBytes(data)
+ if not udhd.has_ie(0x71):
+ raise ValueError('RPI 0x71 not found in UDH')
+ rph_rhl_tar = remainder[:6] # RPH+RHL+TAR; not ciphered
+ res = self.SmsResponsePacket.parse(remainder)
+
+ if spi['por_shall_be_ciphered']:
+ # decrypt
+ ciphered_part = remainder[6:]
+ deciph = otak.crypt.decrypt(ciphered_part)
+ temp_data = rph_rhl_tar + deciph
+ res = self.SmsResponsePacket.parse(temp_data)
+ # remove specified number of padding bytes, if any
+ if res['pcntr'] != 0:
+ # this conditional is needed as python [:-0] renders an empty return!
+ res['secured_data'] = res['secured_data'][:-res['pcntr']]
+ remainder = temp_data
+
+ # is there a CC/RC present?
+ len_sig = res['rhl'] - 10
+ if spi['por_rc_cc_ds'] == 'no_rc_cc_ds':
+ if len_sig:
+ raise OtaCheckError('No RC/CC/DS requested, but len_sig=%u' % len_sig)
+ elif spi['por_rc_cc_ds'] == 'cc':
+ # verify signature
+ # UDH is part of CC/RC!
+ udh = data[:3]
+ # RPL, RHL, TAR, CNTR, PCNTR and STSare part of CC/RC
+ rpl_rhl_tar_cntr_pcntr_sts = remainder[:13]
+ # remove the CC/RC bytes
+ temp_data = udh + rpl_rhl_tar_cntr_pcntr_sts + remainder[13+len_sig:]
+ cc = otak.auth.check_sig(temp_data, res['cc_rc'])
+ # TODO: CRC
+ else:
+ raise OtaCheckError('Unknown por_rc_cc_ds: %s' % spi['por_rc_cc_ds'])
+
+ # TODO: ExpandedRemoteResponse according to TS 102 226 5.2.2
+ dec = CompactRemoteResp.parse(res['secured_data'])
+ dec['tar'] = res['tar']
+ dec['response_status'] = res['response_status']
+ return dec
diff --git a/pySim/sms.py b/pySim/sms.py
new file mode 100644
index 0000000..a953ba6
--- /dev/null
+++ b/pySim/sms.py
@@ -0,0 +1,53 @@
+"""Code related to SMS Encoding/Decoding"""
+# simplistic SMS T-PDU code, as unfortunately nobody bothered to port the python smspdu
+# module to python3, and I gave up after >= 3 hours of trying and failing to do so
+
+# (C) 2022 by Harald Welte <laforge@osmocom.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import typing
+from construct import Int8ub, Bytes
+from construct import Struct, Tell, this, RepeatUntil
+
+from pySim.utils import Hexstr, h2b, b2h
+
+BytesOrHex = typing.Union[Hexstr, bytes]
+
+class UserDataHeader:
+ # a single IE in the user data header
+ ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length))
+ # parser for the full UDH: Length octet followed by sequence of IEs
+ _construct = Struct('udhl'/Int8ub,
+ # FIXME: somehow the below lambda is not working, we always only get the first IE?
+ 'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c))
+
+ def __init__(self, ies=[]):
+ self.ies = ies
+
+ def __str__(self) -> str:
+ return 'UDH(%s)' % self.ies
+
+ def has_ie(self, iei:int) -> bool:
+ for ie in self.ies:
+ if ie['iei'] == iei:
+ return True
+ return False
+
+ @classmethod
+ def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['UserDataHeader', bytes]:
+ if isinstance(inb, str):
+ inb = h2b(inb)
+ res = cls._construct.parse(inb)
+ return cls(res['ies']), inb[1+res['udhl']:]
diff --git a/requirements.txt b/requirements.txt
index 320cc7a..b5677de 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ gsm0338
pyyaml>=5.1
termcolor
colorlog
+pycryptodome
diff --git a/setup.py b/setup.py
index ec1a76a..d73e67e 100644
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,8 @@ setup(
"bidict",
"gsm0338",
"termcolor",
- "colorlog"
+ "colorlog",
+ "pycryptodome"
],
scripts=[
'pySim-prog.py',
diff --git a/tests/test_ota.py b/tests/test_ota.py
new file mode 100644
index 0000000..e8f56f8
--- /dev/null
+++ b/tests/test_ota.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3
+
+import unittest
+from pySim.utils import h2b, b2h
+from pySim.ota import *
+
+class Test_SMS_3DES(unittest.TestCase):
+ tar = h2b('b00000')
+ """Test the OtaDialectSms for 3DES algorithms."""
+ def __init__(self, foo, **kwargs):
+ super().__init__(foo, **kwargs)
+ # KIC1 + KID1 of 8988211000000467285
+ KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
+ KID1 = h2b('D24EB461799C5E035C77451FD9404463')
+ KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
+ KID3 = h2b('12110C78E678C25408233076AA033615')
+ self.od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
+ algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
+ self.dialect = OtaDialectSms()
+ self.spi_base = {
+ 'counter':'no_counter',
+ 'ciphering': True,
+ 'rc_cc_ds': 'cc',
+ 'por_in_submit':False,
+ 'por': 'por_required',
+ 'por_shall_be_ciphered': True,
+ 'por_rc_cc_ds': 'cc',
+ }
+
+ def _check_response(self, r):
+ self.assertEqual(r['number_of_commands'], 1)
+ self.assertEqual(r['last_status_word'], '612f')
+ self.assertEqual(r['last_response_data'], u'')
+ self.assertEqual(r['response_status'], 'por_ok')
+
+ def test_resp_3des_ciphered(self):
+ spi = self.spi_base
+ spi['por_shall_be_ciphered'] = True
+ spi['por_rc_cc_ds'] = 'cc'
+ r = self.dialect.decode_resp(self.od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
+ self._check_response(r)
+
+ def test_resp_3des_signed(self):
+ spi = self.spi_base
+ spi['por_shall_be_ciphered'] = False
+ spi['por_rc_cc_ds'] = 'cc'
+ r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
+ self._check_response(r)
+
+ def test_resp_3des_signed_err(self):
+ """Expect an OtaCheckError exception if the computed CC != received CC"""
+ spi = self.spi_base
+ spi['por_shall_be_ciphered'] = False
+ spi['por_rc_cc_ds'] = 'cc'
+ with self.assertRaises(OtaCheckError) as context:
+ r = self.dialect.decode_resp(self.od, spi, '027100001612b000110000000000000055f47118381175fb02612f')
+ self.assertTrue('!= Computed CC' in str(context.exception))
+
+ def test_resp_3des_none(self):
+ spi = self.spi_base
+ spi['por_shall_be_ciphered'] = False
+ spi['por_rc_cc_ds'] = 'no_rc_cc_ds'
+ r = self.dialect.decode_resp(self.od, spi, '027100000e0ab000110000000000000001612f')
+ self._check_response(r)
+
+ def test_cmd_3des_ciphered(self):
+ spi = self.spi_base
+ spi['ciphering'] = True
+ spi['rc_cc_ds'] = 'no_rc_cc_ds'
+ r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
+ self.assertEqual(b2h(r), '00180d04193535b000000c8478b552a4ffc5a8f099b83cad7123')
+
+ def test_cmd_3des_signed(self):
+ spi = self.spi_base
+ spi['ciphering'] = False
+ spi['rc_cc_ds'] = 'cc'
+ r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
+ self.assertEqual(b2h(r), '1502193535b00000000000000000072ea17bdb72060e00a40000023f00')
+
+ def test_cmd_3des_none(self):
+ spi = self.spi_base
+ spi['ciphering'] = False
+ spi['rc_cc_ds'] = 'no_rc_cc_ds'
+ r = self.dialect.encode_cmd(self.od, self.tar, spi, h2b('00a40000023f00'))
+ self.assertEqual(b2h(r), '0d00193535b0000000000000000000a40000023f00')