aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2022-08-11 17:37:46 +0200
committerHarald Welte <laforge@osmocom.org>2022-08-12 16:58:43 +0200
commit66717dfc45507e50dec1c6febebb8a8b7ff96d0b (patch)
tree998f49b9f6ea147020f3994e0edcf8b088da739a
parent63054b0d3681c4218e8305fc8c0f01efebe8e018 (diff)
WIP SMS
-rwxr-xr-xcontrib/jenkins.sh2
-rwxr-xr-xota_test.py57
-rwxr-xr-xpySim-shell.py2
-rw-r--r--pySim/sms.py355
-rw-r--r--requirements.txt1
-rw-r--r--setup.py3
-rwxr-xr-xsms_test.py23
-rw-r--r--tests/test_sms.py105
8 files changed, 539 insertions, 9 deletions
diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh
index 76e14c4..7252423 100755
--- a/contrib/jenkins.sh
+++ b/contrib/jenkins.sh
@@ -27,6 +27,8 @@ pip install gsm0338
pip install termcolor
pip install colorlog
pip install pycryptodome
+# we need this direct git install, as pypi only lists the python2.7 only release 0.3 from 2013 :(
+pip install git+https://github.com/hologram-io/smpp.pdu
# Execute automatically discovered unit tests first
python -m unittest discover -v -s tests/
diff --git a/ota_test.py b/ota_test.py
new file mode 100755
index 0000000..dad4de1
--- /dev/null
+++ b/ota_test.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python3
+
+from pySim.ota import *
+from pySim.sms import SMS_SUBMIT, SMS_DELIVER, AddressField
+from pySim.utils import h2b, h2b
+
+# KIC1 + KID1 of 8988211000000515398
+#KIC1 = h2b('C039ED58F7B81446105E79EBFD373038')
+#KID1 = h2b('1799B93FE53F430BD7FD4810C77E1FDF')
+#KIC3 = h2b('167F2576D64C8D41862954875C8D7979')
+#KID3 = h2b('ECAE122B0E6AE4186D6487D50FDC0922')
+
+# KIC1 + KID1 of 8988211000000467285
+KIC1 = h2b('D0FDA31990D8D64178601317191669B4')
+KID1 = h2b('D24EB461799C5E035C77451FD9404463')
+KIC3 = h2b('C21DD66ACAC13CB3BC8B331B24AFB57B')
+KID3 = h2b('12110C78E678C25408233076AA033615')
+
+od = OtaKeyset(algo_crypt='triple_des_cbc2', kic_idx=3, kic=KIC3,
+ algo_auth='triple_des_cbc2', kid_idx=3, kid=KID3)
+print(od.crypt)
+print(od.auth)
+
+dialect = OtaDialectSms()
+
+# RAM: B00000
+# SIM RFM: B00010
+# USIM RFM: B00011
+tar = h2b('B00011')
+
+spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
+ 'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
+outp = dialect.encode_cmd(od, tar, spi, apdu=b'\x00\xa4\x00\x04\x02\x3f\x00')
+print("result: %s" % b2h(outp))
+
+with_udh = b'\x02\x70\x00' + outp
+print("with_udh: %s" % b2h(with_udh))
+
+
+da = AddressField('12345678', 'unknown', 'isdn_e164')
+#tpdu = SMS_SUBMIT(tp_udhi=True, tp_mr=0x23, tp_da=da, tp_pid=0x7F, tp_dcs=0xF6, tp_udl=3, tp_ud=with_udh)
+tpdu = SMS_DELIVER(tp_udhi=True, tp_oa=da, tp_pid=0x7F, tp_dcs=0xF6, tp_scts=h2b('22705200000000'), tp_udl=3, tp_ud=with_udh)
+print(tpdu)
+print("tpdu: %s" % b2h(tpdu.toBytes()))
+
+spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
+ 'por_shall_be_ciphered':True, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
+dialect.decode_resp(od, spi, '027100001c12b000119660ebdb81be189b5e4389e9e7ab2bc0954f963ad869ed7c')
+
+spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
+ 'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'cc', 'por': 'por_required'}
+dialect.decode_resp(od, spi, '027100001612b000110000000000000055f47118381175fb01612f')
+
+spi = {'counter':'no_counter', 'ciphering':True, 'rc_cc_ds': 'cc', 'por_in_submit':False,
+ 'por_shall_be_ciphered':False, 'por_rc_cc_ds': 'no_rc_cc_ds', 'por': 'por_required'}
+dialect.decode_resp(od, spi, '027100000e0ab000110000000000000001612f')
+
diff --git a/pySim-shell.py b/pySim-shell.py
index ef6a2b1..686968f 100755
--- a/pySim-shell.py
+++ b/pySim-shell.py
@@ -189,7 +189,7 @@ class PysimApp(cmd2.Cmd):
self.register_command_set(Iso7816Commands())
self.register_command_set(Ts102222Commands())
self.register_command_set(PySimCommands())
- self.iccid, sw = self.card.read_iccid()
+ #self.iccid, sw = self.card.read_iccid()
self.lchan.select('MF', self)
rc = True
else:
diff --git a/pySim/sms.py b/pySim/sms.py
index a953ba6..f5747e4 100644
--- a/pySim/sms.py
+++ b/pySim/sms.py
@@ -18,20 +18,25 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import typing
-from construct import Int8ub, Bytes
-from construct import Struct, Tell, this, RepeatUntil
+import abc
+from pprint import pprint as pp
+from construct import Int8ub, Byte, Bytes, Bit, Flag, BitsInteger, Flag
+from construct import Struct, Enum, Tell, BitStruct, this, Padding
+from construct import Prefixed, GreedyRange, GreedyBytes
+from pySim.construct import HexAdapter, BcdAdapter, TonNpi
from pySim.utils import Hexstr, h2b, b2h
+from smpp.pdu import pdu_types
+
BytesOrHex = typing.Union[Hexstr, bytes]
class UserDataHeader:
# a single IE in the user data header
- ie_c = Struct('offset'/Tell, 'iei'/Int8ub, 'length'/Int8ub, 'data'/Bytes(this.length))
+ ie_c = Struct('iei'/Int8ub, 'length'/Int8ub, 'value'/Bytes(this.length))
# parser for the full UDH: Length octet followed by sequence of IEs
- _construct = Struct('udhl'/Int8ub,
- # FIXME: somehow the below lambda is not working, we always only get the first IE?
- 'ies'/RepeatUntil(lambda obj,lst,ctx: ctx._io.tell() > 1+this.udhl, ie_c))
+ _construct = Struct('ies'/Prefixed(Int8ub, GreedyRange(ie_c)),
+ 'data'/GreedyBytes)
def __init__(self, ies=[]):
self.ies = ies
@@ -50,4 +55,340 @@ class UserDataHeader:
if isinstance(inb, str):
inb = h2b(inb)
res = cls._construct.parse(inb)
- return cls(res['ies']), inb[1+res['udhl']:]
+ return cls(res['ies']), res['data']
+
+ def toBytes(self) -> bytes:
+ return self._construct.build({'ies':self.ies, 'data':b''})
+
+
+def smpp_dcs_is_8bit(dcs: pdu_types.DataCoding) -> bool:
+ if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED):
+ return True
+ if dcs == pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT,
+ pdu_types.DataCodingDefault.OCTET_UNSPECIFIED_COMMON):
+ return True
+ if dcs.scheme == pdu_types.DataCodingScheme.GSM_MESSAGE_CLASS and dcs.schemeData['msgCoding'] == pdu_types.DataCodingGsmMsgCoding.DATA_8BIT:
+ return True
+ else:
+ return False
+
+def ensure_smpp_is_8bit(dcs: pdu_types.DataCoding):
+ if not smpp_dcs_is_8bit(smpp_pdu.params['data_coding']):
+ raise ValueError('We only support 8bit coded SMS for now')
+
+class AddressField:
+ """Representation of an address field as used in SMS T-PDU."""
+ _construct = Struct('addr_len'/Int8ub,
+ 'type_of_addr'/TonNpi,
+ 'digits'/BcdAdapter(Bytes(this.addr_len//2 + this.addr_len%2)),
+ 'tell'/Tell)
+
+ def __init__(self, digits, ton='unknown', npi='unknown'):
+ self.ton = ton
+ self.npi = npi
+ self.digits = digits
+
+ def __str__(self):
+ return 'AddressField(TON=%s, NPI=%s, %s)' % (self.ton, self.npi, self.digits)
+
+ @classmethod
+ def fromBytes(cls, inb: BytesOrHex) -> typing.Tuple['AddressField', bytes]:
+ """Construct an AddressField instance from the binary T-PDU address format."""
+ if isinstance(inb, str):
+ inb = h2b(inb)
+ res = cls._construct.parse(inb)
+ #pp(res)
+ #print("size: %s" % cls._construct.sizeof())
+ ton = res['type_of_addr']['type_of_number']
+ npi = res['type_of_addr']['numbering_plan_id']
+ # return resulting instance + remainder bytes
+ return cls(res['digits'][:res['addr_len']], ton, npi), inb[res['tell']:]
+
+ @classmethod
+ def fromSmpp(cls, addr, ton, npi) -> 'AddressField':
+ """Construct an AddressField from {source,dest}_addr_{,ton,npi} attributes of smpp.pdu."""
+ smpp_map_npi = {
+ 'UNKNOWN': 'unknown',
+ 'ISDN': 'isdn_e164',
+ 'DATA': 'data_x121',
+ 'TELEX': 'telex_f69',
+ 'LAND_MOBILE': 'sc_specific6',
+ 'NATIONAL': 'national',
+ 'PRIVATE': 'private',
+ 'ERMES': 'ermes',
+ }
+ smpp_map_ton = {
+ 'UNKNOWN': 'unknown',
+ 'INTERNATIONAL': 'international',
+ 'NATIONAL': 'national',
+ 'NETWORK_SPECIFIC': 'network_specific',
+ 'SUBSCRIBER_NUMBER': 'short_code',
+ 'ALPHANUMERIC': 'alphanumeric',
+ 'ABBREVIATED': 'abbreviated',
+ }
+ # return the resulting instance
+ return cls(addr.decode('ascii'), smpp_map_ton[ton.name], smpp_map_npi[npi.name])
+
+
+ def toBytes(self) -> bytes:
+ """Encode the AddressField into the binary representation as used in T-PDU."""
+ num_digits = len(self.digits)
+ if num_digits % 2:
+ self.digits += 'f'
+ d = {
+ 'addr_len': num_digits,
+ 'type_of_addr': {
+ 'ext': True,
+ 'type_of_number': self.ton,
+ 'numbering_plan_id': self.npi,
+ },
+ 'digits': self.digits,
+ }
+ return self._construct.build(d)
+
+
+class SMS_TPDU(abc.ABC):
+ """Base class for a SMS T-PDU."""
+ def __init__(self, **kwargs):
+ self.tp_mti = kwargs.get('tp_mti', None)
+ self.tp_rp = kwargs.get('tp_rp', False)
+ self.tp_udhi = kwargs.get('tp_udhi', False)
+ self.tp_pid = kwargs.get('tp_pid', None)
+ self.tp_dcs = kwargs.get('tp_dcs', None)
+ self.tp_udl = kwargs.get('tp_udl', None)
+ self.tp_ud = kwargs.get('tp_ud', None)
+
+
+
+class SMS_DELIVER(SMS_TPDU):
+ """Representation of a SMS-DELIVER T-PDU."""
+ flags_construct = BitStruct('tp_rp'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag, 'tp_sri'/Flag,
+ Padding(1), 'tp_mms'/Flag, 'tp_mti'/BitsInteger(2))
+ def __init__(self, **kwargs):
+ kwargs['tp_mti'] = 0
+ super().__init__(**kwargs)
+ self.tp_lp = kwargs.get('tp_lp', False)
+ self.tp_mms = kwargs.get('tp_mms', False)
+ self.tp_oa = kwargs.get('tp_oa', None)
+ self.tp_scts = kwargs.get('tp_scts', None)
+ self.tp_sri = kwargs.get('tp_sri', False)
+
+ def __repr__(self):
+ return '%s(MTI=%s, MMS=%s, LP=%s, RP=%s, UDHI=%s, SRI=%s, OA=%s, PID=%2x, DCS=%x, SCTS=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_mms, self.tp_lp, self.tp_rp, self.tp_udhi, self.tp_sri, self.tp_oa, self.tp_pid, self.tp_dcs, self.tp_scts, self.tp_udl, self.tp_ud)
+
+ @classmethod
+ def fromBytes(cls, inb: BytesOrHex) -> 'SMS_DELIVER':
+ """Construct a SMS_DELIVER instance from the binary encoded format as used in T-PDU."""
+ if isinstance(inb, str):
+ inb = h2b(inb)
+ flags = inb[0]
+ d = SMS_DELIVER.flags_construct.parse(inb)
+ oa, remainder = AddressField.fromBytes(inb[1:])
+ d['tp_oa'] = oa
+ offset = 0
+ d['tp_pid'] = remainder[offset]
+ offset += 1
+ d['tp_dcs'] = remainder[offset]
+ offset += 1
+ # TODO: further decode
+ d['tp_scts'] = remainder[offset:offset+7]
+ offset += 7
+ d['tp_udl'] = remainder[offset]
+ offset += 1
+ d['tp_ud'] = remainder[offset:]
+ return cls(**d)
+
+ def toBytes(self) -> bytes:
+ """Encode a SMS_DELIVER instance to the binary encoded format as used in T-PDU."""
+ outb = bytearray()
+ d = {
+ 'tp_mti': self.tp_mti, 'tp_mms': self.tp_mms, 'tp_lp': self.tp_lp,
+ 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_sri': self.tp_sri,
+ }
+ flags = SMS_DELIVER.flags_construct.build(d)
+ outb.extend(flags)
+ outb.extend(self.tp_oa.toBytes())
+ outb.append(self.tp_pid)
+ outb.append(self.tp_dcs)
+ outb.extend(self.tp_scts)
+ outb.append(self.tp_udl)
+ outb.extend(self.tp_ud)
+
+ return outb
+
+ @classmethod
+ def fromSmpp(cls, smpp_pdu) -> 'SMS_DELIVER':
+ """Construct a SMS_DELIVER instance from the deliver format used by smpp.pdu."""
+ if smpp_pdu.id == pdu_types.CommandId.submit_sm:
+ return cls.fromSmppSubmit(cls, smpp_pdu)
+ else:
+ raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
+
+ @classmethod
+ def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_DELIVER':
+ """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
+ ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
+ tp_oa = AddressField.fromSmpp(smpp_pdu.params['source_addr'],
+ smpp_pdu.params['source_addr_ton'],
+ smpp_pdu.params['source_addr_npi'])
+ tp_ud = smpp_pdu.params['short_message']
+ d = {
+ 'tp_lp': False,
+ 'tp_mms': False,
+ 'tp_oa': tp_oa,
+ 'tp_scts': h2b('22705200000000'), # FIXME
+ 'tp_sri': False,
+ 'tp_rp': False,
+ 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
+ 'tp_pid': smpp_pdu.params['protocol_id'],
+ 'tp_dcs': 0xF6, # we only deal with binary SMS here
+ 'tp_udl': len(tp_ud),
+ 'tp_ud': tp_ud,
+ }
+ return cls(**d)
+
+
+
+class SMS_SUBMIT(SMS_TPDU):
+ """Representation of a SMS-DELIVER T-PDU."""
+ flags_construct = BitStruct('tp_srr'/Flag, 'tp_udhi'/Flag, 'tp_rp'/Flag,
+ 'tp_vpf'/Enum(BitsInteger(2), none=0, relative=2, enhanced=1, absolute=3),
+ 'tp_rd'/Flag, 'tp_mti'/BitsInteger(2))
+ def __init__(self, **kwargs):
+ kwargs['tp_mti'] = 1
+ super().__init__(**kwargs)
+ self.tp_rd = kwargs.get('tp_rd', False)
+ self.tp_vpf = kwargs.get('tp_vpf', 'none')
+ self.tp_srr = kwargs.get('tp_srr', False)
+ self.tp_mr = kwargs.get('tp_mr', None)
+ self.tp_da = kwargs.get('tp_da', None)
+ self.tp_vp = kwargs.get('tp_vp', None)
+
+ def __repr__(self):
+ return '%s(MTI=%s, RD=%s, VPF=%u, RP=%s, UDHI=%s, SRR=%s, DA=%s, PID=%2x, DCS=%x, VP=%s, UDL=%u, UD=%s)' % (self.__class__.__name__, self.tp_mti, self.tp_rd, self.tp_vpf, self.tp_rp, self.tp_udhi, self.tp_srr, self.tp_da, self.tp_pid, self.tp_dcs, self.tp_vp, self.tp_udl, self.tp_ud)
+
+ @classmethod
+ def fromBytes(cls, inb:BytesOrHex) -> 'SMS_SUBMIT':
+ """Construct a SMS_SUBMIT instance from the binary encoded format as used in T-PDU."""
+ offset = 0
+ if isinstance(inb, str):
+ inb = h2b(inb)
+ d = SMS_SUBMIT.flags_construct.parse(inb)
+ offset += 1
+ d['tp_mr']= inb[offset]
+ offset += 1
+ da, remainder = AddressField.fromBytes(inb[2:])
+ d['tp_da'] = da
+
+ offset = 0
+ d['tp_pid'] = remainder[offset]
+ offset += 1
+ d['tp_dcs'] = remainder[offset]
+ offset += 1
+ if d['tp_vpf'] == 'none':
+ pass
+ elif d['tp_vpf'] == 'relative':
+ # TODO: further decode
+ d['tp_vp'] = remainder[offset:offset+1]
+ offset += 1
+ elif d['tp_vpf'] == 'enhanced':
+ # TODO: further decode
+ d['tp_vp'] = remainder[offset:offset+7]
+ offset += 7
+ pass
+ elif d['tp_vpf'] == 'absolute':
+ # TODO: further decode
+ d['tp_vp'] = remainder[offset:offset+7]
+ offset += 7
+ pass
+ else:
+ raise ValueError('Invalid VPF: %s' % d['tp_vpf'])
+ d['tp_udl'] = remainder[offset]
+ offset += 1
+ d['tp_ud'] = remainder[offset:]
+ return cls(**d)
+
+ def toBytes(self) -> bytes:
+ """Encode a SMS_SUBMIT instance to the binary encoded format as used in T-PDU."""
+ outb = bytearray()
+ d = {
+ 'tp_mti': self.tp_mti, 'tp_rd': self.tp_rd, 'tp_vpf': self.tp_vpf,
+ 'tp_rp': self.tp_rp, 'tp_udhi': self.tp_udhi, 'tp_srr': self.tp_srr,
+ }
+ flags = SMS_SUBMIT.flags_construct.build(d)
+ outb.extend(flags)
+ outb.append(self.tp_mr)
+ outb.extend(self.tp_da.toBytes())
+ outb.append(self.tp_pid)
+ outb.append(self.tp_dcs)
+ if self.tp_vpf != 'none':
+ outb.extend(self.tp_vp)
+ outb.append(self.tp_udl)
+ outb.extend(self.tp_ud)
+ return outb
+
+ @classmethod
+ def fromSmpp(cls, smpp_pdu) -> 'SMS_SUBMIT':
+ """Construct a SMS_DELIVER instance from the format used by smpp.pdu."""
+ if smpp_pdu.id == pdu_types.CommandId.submit_sm:
+ return cls.fromSmppSubmit(cls, smpp_pdu)
+ else:
+ raise ValueError('Unsupported SMPP commandId %s' % smpp_pdu.id)
+
+ @classmethod
+ def fromSmppSubmit(cls, smpp_pdu) -> 'SMS_SUBMIT':
+ """Construct a SMS_DELIVER instance from the submit format used by smpp.pdu."""
+ ensure_smpp_is_8bit(smpp_pdu.params['data_coding'])
+ tp_da = AddressField.fromSmpp(smpp_pdu.params['destination_addr'],
+ smpp_pdu.params['dest_addr_ton'],
+ smpp_pdu.params['dest_addr_npi'])
+ tp_ud = smpp_pdu.params['short_message']
+ #vp_smpp = smpp_pdu.params['validity_period']
+ #if not vp_smpp:
+ # vpf = 'none'
+ d = {
+ 'tp_rd': True if smpp_pdu.params['replace_if_present_flag'].name == 'REPLACE' else False,
+ 'tp_vpf': None, # vpf,
+ 'tp_rp': False, # related to ['registered_delivery'] ?
+ 'tp_udhi': pdu_types.EsmClassGsmFeatures.UDHI_INDICATOR_SET in smpp_pdu.params['esm_class'].gsmFeatures,
+ 'tp_srr': True if smpp_pdu.params['registered_delivery'] else False,
+ 'tp_mr': 0, # FIXME: sm_default_msg_id ?
+ 'tp_da': tp_da,
+ 'tp_pid': smpp_pdu.params['protocol_id'],
+ 'tp_dcs': 0xF6, # FIXME: we only deal with binary SMS here
+ 'tp_vp': None, # FIXME: implement VPF conversion
+ 'tp_udl': len(tp_ud),
+ 'tp_ud': tp_ud,
+ }
+ return cls(**d)
+
+ def toSmpp(self) -> pdu_types.PDU:
+ """Translate a SMS_DELIVER instance to a smpp.pdu.pdu_types.SubmitSM instance."""
+ esm_class = pdu_types.EsmClass(pdu_types.EsmClassMode.DEFAULT, pdu_types.EsmClassType.DEFAULT)
+ reg_del = pdu_types.RegisteredDelivery(pdu_types.RegisteredDeliveryReceipt.NO_SMSC_DELIVERY_RECEIPT_REQUESTED)
+ if self.tp_rp:
+ repl_if = pdu_types.ReplaceIfPresentFlag.REPLACE
+ else:
+ repl_if = pdu_types.ReplaceIfPresentFlag.DO_NOT_REPLACE
+ # we only deal with binary SMS here:
+ if self.tp_dcs != 0xF6:
+ raise ValueError('Unsupported DCS: We only support DCS=0xF6 for now')
+ dc = pdu_types.DataCoding(pdu_types.DataCodingScheme.DEFAULT, pdu_types.DataCodingDefault.OCTET_UNSPECIFIED)
+ return pdu_types.SubmitSM(service_type='',
+ source_addr_ton=pdu_types.AddrTon.ALPHANUMERIC,
+ source_addr_npi=pdu_types.AddrNpi.UNKNOWN,
+ source_addr='simcard',
+ dest_addr_ton=FIXME(self.tp_da.ton),
+ dest_addr_npi=FIXME(self.tp_da.npi),
+ destination_addr=self.tp_da.digits,
+ esm_class=esm_class,
+ protocol_id=self.tp_pid,
+ priority_flag=pdu_types.PriorityFlag.LEVEL_0,
+ #schedule_delivery_time,
+ #validity_period,
+ registered_delivery=reg_del,
+ replace_if_present_flag=repl_if,
+ data_coding=dc,
+ #sm_default_msg_id,
+ short_message=self.tp_ud)
diff --git a/requirements.txt b/requirements.txt
index b5677de..819d8bc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,3 +10,4 @@ pyyaml>=5.1
termcolor
colorlog
pycryptodome
+git+https://github.com/hologram-io/smpp.pdu
diff --git a/setup.py b/setup.py
index d73e67e..c6ec982 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,8 @@ setup(
"gsm0338",
"termcolor",
"colorlog",
- "pycryptodome"
+ "pycryptodome",
+ "smpp.pdu @ git+https://github.com/hologram-io/smpp.pdu",
],
scripts=[
'pySim-prog.py',
diff --git a/sms_test.py b/sms_test.py
new file mode 100755
index 0000000..a163f1a
--- /dev/null
+++ b/sms_test.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+
+from pySim.sms import *
+from pprint import pprint as pp
+from construct import setGlobalPrintPrivateEntries
+
+
+print(UserDataHeader.fromBytes('027100'))
+print(UserDataHeader.fromBytes('027100abcdef'))
+print(UserDataHeader.fromBytes('03710110'))
+print(UserDataHeader.fromBytes('0571007001ffabcd'))
+
+setGlobalPrintPrivateEntries(True)
+pp(AddressField.fromBytes('0480214399'))
+
+s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
+pp(s)
+print(s.tp_da)
+pp(b2h(s.toBytes()))
+
+d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
+pp(d)
+pp(b2h(d.toBytes()))
diff --git a/tests/test_sms.py b/tests/test_sms.py
new file mode 100644
index 0000000..8355224
--- /dev/null
+++ b/tests/test_sms.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python3
+
+import unittest
+from pySim.utils import h2b, b2h
+from pySim.sms import *
+
+class Test_SMS_UDH(unittest.TestCase):
+ def test_single_ie(self):
+ udh, tail = UserDataHeader.fromBytes('027100')
+ self.assertEqual(len(udh.ies), 1)
+ ie = udh.ies[0]
+ self.assertEqual(ie.iei, 0x71)
+ self.assertEqual(ie.length, 0)
+ self.assertEqual(ie.value, b'')
+ self.assertEqual(tail, b'')
+
+ def test_single_ie_tail(self):
+ udh, tail = UserDataHeader.fromBytes('027100abcdef')
+ self.assertEqual(len(udh.ies), 1)
+ ie = udh.ies[0]
+ self.assertEqual(ie.iei, 0x71)
+ self.assertEqual(ie.length, 0)
+ self.assertEqual(ie.value, b'')
+ self.assertEqual(tail, b'\xab\xcd\xef')
+
+ def test_single_ie_value(self):
+ udh, tail = UserDataHeader.fromBytes('03710110')
+ self.assertEqual(len(udh.ies), 1)
+ ie = udh.ies[0]
+ self.assertEqual(ie.iei, 0x71)
+ self.assertEqual(ie.length, 1)
+ self.assertEqual(ie.value, b'\x10')
+ self.assertEqual(tail, b'')
+
+ def test_two_ie_data_tail(self):
+ udh, tail = UserDataHeader.fromBytes('0571007001ffabcd')
+ self.assertEqual(len(udh.ies), 2)
+ ie = udh.ies[0]
+ self.assertEqual(ie.iei, 0x71)
+ self.assertEqual(ie.length, 0)
+ self.assertEqual(ie.value, b'')
+ ie = udh.ies[1]
+ self.assertEqual(ie.iei, 0x70)
+ self.assertEqual(ie.length, 1)
+ self.assertEqual(ie.value, b'\xff')
+ self.assertEqual(tail, b'\xab\xcd')
+
+ def test_toBytes(self):
+ indata = h2b('0571007001ff')
+ udh, tail = UserDataHeader.fromBytes(indata)
+ encoded = udh.toBytes()
+ self.assertEqual(encoded, indata)
+
+class Test_AddressField(unittest.TestCase):
+ def test_fromBytes(self):
+ encoded = h2b('0480214399')
+ af, trailer = AddressField.fromBytes(encoded)
+ self.assertEqual(trailer, b'\x99')
+ self.assertEqual(af.ton, 'unknown')
+ self.assertEqual(af.npi, 'unknown')
+ self.assertEqual(af.digits, '1234')
+
+ def test_fromBytes_odd(self):
+ af, trailer = AddressField.fromBytes('038021f399')
+ self.assertEqual(trailer, b'\x99')
+ self.assertEqual(af.ton, 'unknown')
+ self.assertEqual(af.npi, 'unknown')
+ self.assertEqual(af.digits, '123')
+
+ def test_toBytes(self):
+ encoded = h2b('04802143')
+ af, trailer = AddressField.fromBytes(encoded)
+ self.assertEqual(af.toBytes(), encoded)
+
+ def test_toBytes_odd(self):
+ af = AddressField('12345', 'international', 'isdn_e164')
+ encoded = af.toBytes()
+ self.assertEqual(encoded, h2b('05912143f5'))
+
+
+class Test_SUBMIT(unittest.TestCase):
+ def test_fromBytes(self):
+ s = SMS_SUBMIT.fromBytes('550d0b911614261771f000f5a78c0b050423f423f40003010201424547494e3a56434152440d0a56455253494f4e3a322e310d0a4e3a4d650d0a54454c3b505245463b43454c4c3b564f4943453a2b36313431363237313137300d0a54454c3b484f4d453b564f4943453a2b36313339353337303437310d0a54454c3b574f524b3b564f4943453a2b36313339363734373031350d0a454e443a')
+ self.assertEqual(s.tp_mti, 1)
+ self.assertEqual(s.tp_rd, True)
+ self.assertEqual(s.tp_vpf, 'relative')
+ self.assertEqual(s.tp_rp, False)
+ self.assertEqual(s.tp_udhi, True)
+ self.assertEqual(s.tp_srr, False)
+ self.assertEqual(s.tp_pid, 0)
+ self.assertEqual(s.tp_dcs, 0xf5)
+ self.assertEqual(s.tp_udl, 140)
+
+class Test_DELIVER(unittest.TestCase):
+ def test_fromBytes(self):
+ d = SMS_DELIVER.fromBytes('0408D0E5759A0E7FF6907090307513000824010101BB400101')
+ self.assertEqual(d.tp_mti, 0)
+ self.assertEqual(d.tp_mms, True)
+ self.assertEqual(d.tp_lp, False)
+ self.assertEqual(d.tp_rp, False)
+ self.assertEqual(d.tp_udhi, False)
+ self.assertEqual(d.tp_sri, False)
+ self.assertEqual(d.tp_pid, 0x7f)
+ self.assertEqual(d.tp_dcs, 0xf6)
+ self.assertEqual(d.tp_udl, 8)