diff options
-rwxr-xr-x | contrib/es2p_client.py | 79 | ||||
-rwxr-xr-x | contrib/jenkins.sh | 3 | ||||
-rw-r--r-- | pySim/esim/__init__.py | 2 | ||||
-rw-r--r-- | pySim/esim/es2p.py | 458 |
4 files changed, 540 insertions, 2 deletions
diff --git a/contrib/es2p_client.py b/contrib/es2p_client.py new file mode 100755 index 0000000..1353006 --- /dev/null +++ b/contrib/es2p_client.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + +# (C) 2024 by Harald Welte <laforge@osmocom.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import copy +import argparse +from pySim.esim import es2p + +EID_HELP='EID of the eUICC for which eSIM shall be made available' +ICCID_HELP='The ICCID of the eSIM that shall be made available' +MATCHID_HELP='MatchingID that shall be used by profile download' + +parser = argparse.ArgumentParser(description=""" +Utility to manuall issue requests against the ES2+ API of an SM-DP+ according to GSMA SGP.22.""") +parser.add_argument('--url', required=True, help='Base URL of ES2+ API endpoint') +parser.add_argument('--id', required=True, help='Entity identifier passed to SM-DP+') +parser.add_argument('--client-cert', help='X.509 client certificate used to authenticate to server') +parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In + production use cases, this would be the GSMA Root CA (CI) certificate.""") +subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call") + +parser_dlo = subparsers.add_parser('download-order', help="ES2+ DownloadOrder function") +parser_dlo.add_argument('--eid', help=EID_HELP) +parser_dlo.add_argument('--iccid', help=ICCID_HELP) +parser_dlo.add_argument('--profileType', help='The profile type of which one eSIM shall be made available') + +parser_cfo = subparsers.add_parser('confirm-order', help="ES2+ ConfirmOrder function") +parser_cfo.add_argument('--iccid', required=True, help=ICCID_HELP) +parser_cfo.add_argument('--eid', help=EID_HELP) +parser_cfo.add_argument('--matchingId', help=MATCHID_HELP) +parser_cfo.add_argument('--confirmationCode', help='Confirmation code that shall be used by profile download') +parser_cfo.add_argument('--smdsAddress', help='SM-DS Address') +parser_cfo.add_argument('--releaseFlag', action='store_true', help='Shall the profile be immediately released?') + +parser_co = subparsers.add_parser('cancel-order', help="ES2+ CancelOrder function") +parser_co.add_argument('--iccid', required=True, help=ICCID_HELP) +parser_co.add_argument('--eid', help=EID_HELP) +parser_co.add_argument('--matchingId', help=MATCHID_HELP) +parser_co.add_argument('--finalProfileStatusIndicator', required=True, choices=['Available','Unavailable']) + +parser_rp = subparsers.add_parser('release-profile', help='ES2+ ReleaseProfile function') +parser_rp.add_argument('--iccid', required=True, help=ICCID_HELP) + +if __name__ == '__main__': + opts = parser.parse_args() + #print(opts) + + peer = es2p.Es2pApiClient(opts.url, opts.id, server_cert_verify=opts.server_ca_cert, client_cert=opts.client_cert) + + data = {} + for k, v in vars(opts).items(): + if k in ['url', 'id', 'client_cert', 'server_ca_cert', 'command']: + # remove keys from dict that shold not end up in JSON... + continue + if v is not None: + data[k] = v + + print(data) + if opts.command == 'download-order': + res = peer.call_downloadOrder(data) + elif opts.command == 'confirm-order': + res = peer.call_confirmOrder(data) + elif opts.command == 'cancel-order': + res = peer.call_cancelOrder(data) + elif opts.command == 'release-profile': + res = peer.call_releaseProfile(data) diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh index 3f32e9e..a5993da 100755 --- a/contrib/jenkins.sh +++ b/contrib/jenkins.sh @@ -45,7 +45,8 @@ case "$JOB_TYPE" in --disable E1102 \ --disable E0401 \ --enable W0301 \ - pySim tests/*.py *.py + pySim tests/*.py *.py \ + contrib/es2p_client.py ;; "docs") rm -rf docs/_build diff --git a/pySim/esim/__init__.py b/pySim/esim/__init__.py index dfacb83..1f7ea16 100644 --- a/pySim/esim/__init__.py +++ b/pySim/esim/__init__.py @@ -2,10 +2,10 @@ import sys from typing import Optional from importlib import resources -import asn1tools def compile_asn1_subdir(subdir_name:str): """Helper function that compiles ASN.1 syntax from all files within given subdir""" + import asn1tools asn_txt = '' __ver = sys.version_info if (__ver.major, __ver.minor) >= (3, 9): diff --git a/pySim/esim/es2p.py b/pySim/esim/es2p.py new file mode 100644 index 0000000..32d1ef8 --- /dev/null +++ b/pySim/esim/es2p.py @@ -0,0 +1,458 @@ +"""GSMA eSIM RSP ES2+ interface according to SGP.22 v2.5""" + +# (C) 2024 by Harald Welte <laforge@osmocom.org> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import abc +import requests +import logging +import json +from datetime import datetime +import time +import base64 + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +class ApiParam(abc.ABC): + """A class reprsenting a single parameter in the ES2+ API.""" + @classmethod + def verify_decoded(cls, data): + """Verify the decoded reprsentation of a value. Should raise an exception if somthing is odd.""" + pass + + @classmethod + def verify_encoded(cls, data): + """Verify the encoded reprsentation of a value. Should raise an exception if somthing is odd.""" + pass + + @classmethod + def encode(cls, data): + """[Validate and] Encode the given value.""" + cls.verify_decoded(data) + encoded = cls._encode(data) + cls.verify_decoded(encoded) + return encoded + + @classmethod + def _encode(cls, data): + """encoder function, typically [but not always] overridden by derived class.""" + return data + + @classmethod + def decode(cls, data): + """[Validate and] Decode the given value.""" + cls.verify_encoded(data) + decoded = cls._decode(data) + cls.verify_decoded(decoded) + return decoded + + @classmethod + def _decode(cls, data): + """decoder function, typically [but not always] overridden by derived class.""" + return data + +class ApiParamString(ApiParam): + """Base class representing an API parameter of 'string' type.""" + pass + + +class ApiParamInteger(ApiParam): + """Base class representing an API parameter of 'integer' type.""" + @classmethod + def _decode(cls, data): + return int(data) + + @classmethod + def _encode(cls, data): + return str(data) + + @classmethod + def verify_decoded(cls, data): + if not isinstance(data, int): + raise TypeError('Expected an integer input data type') + + @classmethod + def verify_encoded(cls, data): + if not data.isdecimal(): + raise ValueError('integer (%s) contains non-decimal characters' % data) + assert str(int(data)) == data + +class ApiParamBoolean(ApiParam): + """Base class representing an API parameter of 'boolean' type.""" + @classmethod + def _encode(cls, data): + return bool(data) + +class ApiParamFqdn(ApiParam): + """String, as a list of domain labels concatenated using the full stop (dot, period) character as + separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5 + of ISO/IEC 18004""" + @classmethod + def verify_encoded(cls, data): + # FIXME + pass + +class param: + class Iccid(ApiParamString): + """String representation of 19 or 20 digits, where the 20th digit MAY optionally be the padding + character F.""" + @classmethod + def _encode(cls, data): + data = str(data) + # SGP.22 version prior to 2.2 do not require support for 19-digit ICCIDs, so let's always + # encode it with padding F at the end. + if len(data) == 19: + data += 'F' + return data + + @classmethod + def verify_encoded(cls, data): + if len(data) not in [19, 20]: + raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) + + @classmethod + def _decode(cls, data): + # strip trailing padding (if it's 20 digits) + if len(data) == 20 and data[-1] in ['F', 'f']: + data = data[:-1] + return data + + @classmethod + def verify_decoded(cls, data): + data = str(data) + if len(data) not in [19, 20]: + raise ValueError('ICCID (%s) length (%u) invalid' % (data, len(data))) + if len(data) == 19: + decimal_part = data + else: + decimal_part = data[:-1] + final_part = data[-1:] + if final_part not in ['F', 'f'] and not final_part.isdecimal(): + raise ValueError('ICCID (%s) contains non-decimal characters' % data) + if not decimal_part.isdecimal(): + raise ValueError('ICCID (%s) contains non-decimal characters' % data) + + + class Eid(ApiParamString): + """String of 32 decimal characters""" + @classmethod + def verify_encoded(cls, data): + if len(data) != 32: + raise ValueError('EID length invalid: "%s" (%u)' % (data, len(data))) + + @classmethod + def verify_decoded(cls, data): + if not data.isdecimal(): + raise ValueError('EID (%s) contains non-decimal characters' % data) + + class ProfileType(ApiParamString): + pass + + class MatchingId(ApiParamString): + pass + + class ConfirmationCode(ApiParamString): + pass + + class SmdsAddress(ApiParamFqdn): + pass + + class SmdpAddress(ApiParamFqdn): + pass + + class ReleaseFlag(ApiParamBoolean): + pass + + class FinalProfileStatusIndicator(ApiParamString): + pass + + class Timestamp(ApiParamString): + """String format as specified by W3C: YYYY-MM-DDThh:mm:ssTZD""" + @classmethod + def _decode(cls, data): + return datetime.fromisoformat(data) + + @classmethod + def _encode(cls, data): + return datetime.toisoformat(data) + + class NotificationPointId(ApiParamInteger): + pass + + class NotificationPointStatus(ApiParam): + pass + + class ResultData(ApiParam): + @classmethod + def _decode(cls, data): + return base64.b64decode(data) + + @classmethod + def _encode(cls, data): + return base64.b64encode(data) + + class JsonResponseHeader(ApiParam): + """SGP.22 section 6.5.1.4.""" + @classmethod + def verify_decoded(cls, data): + fe_status = data.get('functionExecutionStatus') + if not fe_status: + raise ValueError('Missing mandatory functionExecutionStatus in header') + status = fe_status.get('status') + if not status: + raise ValueError('Missing mandatory status in header functionExecutionStatus') + if status not in ['Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired']: + raise ValueError('Unknown/unspecified status "%s"' % status) + + +class HttpStatusError(Exception): + pass + +class HttpHeaderError(Exception): + pass + +class Es2PlusApiError(Exception): + """Exception representing an error at the ES2+ API level (status != Executed).""" + def __init__(self, func_ex_status: dict): + self.status = func_ex_status['status'] + sec = { + 'subjectCode': None, + 'reasonCode': None, + 'subjectIdentifier': None, + 'message': None, + } + actual_sec = func_ex_status.get('statusCodeData', None) + sec.update(actual_sec) + self.subject_code = sec['subjectCode'] + self.reason_code = sec['reasonCode'] + self.subject_id = sec['subjectIdentifier'] + self.message = sec['message'] + + def __str__(self): + return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")' + +class Es2PlusApiFunction(abc.ABC): + """Base classs for representing an ES2+ API Function.""" + # the below class variables are expected to be overridden in derived classes + + path = None + # dictionary of input parameters. key is parameter name, value is ApiParam class + input_params = {} + # list of mandatory input parameters + input_mandatory = [] + # dictionary of output parameters. key is parameter name, value is ApiParam class + output_params = {} + # list of mandatory output parameters (for successful response) + output_mandatory = [] + # expected HTTP status code of the response + expected_http_status = 200 + + def __init__(self, url_prefix: str, func_req_id: str, session): + self.url_prefix = url_prefix + self.func_req_id = func_req_id + self.session = session + + def encode(self, data: dict, func_call_id: str) -> dict: + """Validate an encode input dict into JSON-serializable dict for request body.""" + output = { + 'header': { + 'functionRequesterIdentifier': self.func_req_id, + 'functionCallIdentifier': func_call_id + } + } + for p in self.input_mandatory: + if not p in data: + raise ValueError('Mandatory input parameter %s missing' % p) + for p, v in data.items(): + p_class = self.input_params.get(p) + if not p_class: + logger.warning('Unexpected/unsupported input parameter %s=%s', p, v) + output[p] = v + else: + output[p] = p_class.encode(v) + return output + + + def decode(self, data: dict) -> dict: + """[further] Decode and validate the JSON-Dict of the respnse body.""" + output = {} + # let's first do the header, it's special + if not 'header' in data: + raise ValueError('Mandatory output parameter "header" missing') + hdr_class = self.output_params.get('header') + output['header'] = hdr_class.decode(data['header']) + + if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']: + raise Es2PlusApiError(output['header']['functionExecutionStatus']) + # we can only expect mandatory parameters to be present in case of successful execution + for p in self.output_mandatory: + if p == 'header': + continue + if not p in data: + raise ValueError('Mandatory output parameter "%s" missing' % p) + for p, v in data.items(): + p_class = self.output_params.get(p) + if not p_class: + logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v) + output[p] = v + else: + output[p] = p_class.decode(v) + return output + + def call(self, data: dict, func_call_id:str, timeout=10) -> dict: + """Make an API call to the ES2+ API endpoint represented by this object. + Input data is passed in `data` as json-serializable dict. Output data + is returned as json-deserialized dict.""" + url = self.url_prefix + self.path + encoded = json.dumps(self.encode(data, func_call_id)) + headers = { + 'Content-Type': 'application/json', + 'X-Admin-Protocol': 'gsma/rsp/v2.5.0', + } + + logger.debug("HTTP REQ %s - '%s'" % (url, encoded)) + response = self.session.post(url, data=encoded, headers=headers, timeout=timeout) + logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code, response.headers)) + logger.debug("HTTP RSP: %s" % (response.content)) + + if response.status_code != self.expected_http_status: + raise HttpStatusError(response) + if not response.headers.get('Content-Type').startswith(headers['Content-Type']): + raise HttpHeaderError(response) + if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'): + raise HttpHeaderError(response) + + return self.decode(response.json()) + + +# ES2+ DownloadOrder function (SGP.22 section 5.3.1) +class DownloadOrder(Es2PlusApiFunction): + path = '/gsma/rsp2/es2plus/downloadOrder' + input_params = { + 'eid': param.Eid, + 'iccid': param.Iccid, + 'profileType': param.ProfileType + } + output_params = { + 'header': param.JsonResponseHeader, + 'iccid': param.Iccid, + } + output_mandatory = ['header', 'iccid'] + +# ES2+ ConfirmOrder function (SGP.22 section 5.3.2) +class ConfirmOrder(Es2PlusApiFunction): + path = '/gsma/rsp2/es2plus/confirmOrder' + input_params = { + 'iccid': param.Iccid, + 'eid': param.Eid, + 'matchingId': param.MatchingId, + 'confirmationCode': param.ConfirmationCode, + 'smdsAddress': param.SmdsAddress, + 'releaseFlag': param.ReleaseFlag, + } + input_mandatory = ['iccid', 'releaseFlag'] + output_params = { + 'header': param.JsonResponseHeader, + 'eid': param.Eid, + 'matchingId': param.MatchingId, + 'smdpAddress': param.SmdpAddress, + } + output_mandatory = ['header', 'matchingId'] + +# ES2+ CancelOrder function (SGP.22 section 5.3.3) +class CancelOrder(Es2PlusApiFunction): + path = '/gsma/rsp2/es2plus/cancelOrder' + input_params = { + 'iccid': param.Iccid, + 'eid': param.Eid, + 'matchingId': param.MatchingId, + 'finalProfileStatusIndicator': param.FinalProfileStatusIndicator, + } + input_mandatory = ['finalProfileStatusIndicator', 'iccid'] + output_params = { + 'header': param.JsonResponseHeader, + } + output_mandatory = ['header'] + +# ES2+ ReleaseProfile function (SGP.22 section 5.3.4) +class ReleaseProfile(Es2PlusApiFunction): + path = '/gsma/rsp2/es2plus/releaseProfile' + input_params = { + 'iccid': param.Iccid, + } + input_mandatory = ['iccid'] + output_params = { + 'header': param.JsonResponseHeader, + } + output_mandatory = ['header'] + +# ES2+ HandleDownloadProgress function (SGP.22 section 5.3.5) +class HandleDownloadProgressInfo(Es2PlusApiFunction): + path = '/gsma/rsp2/es2plus/handleDownloadProgressInfo' + input_params = { + 'eid': param.Eid, + 'iccid': param.Iccid, + 'profileType': param.ProfileType, + 'timestamp': param.Timestamp, + 'notificationPointId': param.NotificationPointId, + 'notificationPointStatus': param.NotificationPointStatus, + 'resultData': param.ResultData, + } + input_mandatory = ['iccid', 'profileType', 'timestamp', 'notificationPointId', 'notificationPointStatus'] + expected_http_status = 204 + + +class Es2pApiClient: + """Main class representing a full ES2+ API client. Has one method for each API function.""" + def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None, client_cert: str = None): + self.func_id = 0 + self.session = requests.Session() + if server_cert_verify: + self.session.verify = server_cert_verify + if client_cert: + self.session.cert = client_cert + + self.downloadOrder = DownloadOrder(url_prefix, func_req_id, self.session) + self.confirmOrder = ConfirmOrder(url_prefix, func_req_id, self.session) + self.cancelOrder = CancelOrder(url_prefix, func_req_id, self.session) + self.releaseProfile = ReleaseProfile(url_prefix, func_req_id, self.session) + self.handleDownloadProgressInfo = HandleDownloadProgressInfo(url_prefix, func_req_id, self.session) + + def _gen_func_id(self) -> str: + """Generate the next function call id.""" + self.func_id += 1 + return 'FCI-%u-%u' % (time.time(), self.func_id) + + + def call_downloadOrder(self, data: dict) -> dict: + """Perform ES2+ DownloadOrder function (SGP.22 section 5.3.1).""" + return self.downloadOrder.call(data, self._gen_func_id()) + + def call_confirmOrder(self, data: dict) -> dict: + """Perform ES2+ ConfirmOrder function (SGP.22 section 5.3.2).""" + return self.confirmOrder.call(data, self._gen_func_id()) + + def call_cancelOrder(self, data: dict) -> dict: + """Perform ES2+ CancelOrder function (SGP.22 section 5.3.3).""" + return self.cancelOrder.call(data, self._gen_func_id()) + + def call_releaseProfile(self, data: dict) -> dict: + """Perform ES2+ CancelOrder function (SGP.22 section 5.3.4).""" + return self.releaseProfile.call(data, self._gen_func_id()) + + def call_handleDownloadProgressInfo(self, data: dict) -> dict: + """Perform ES2+ HandleDownloadProgressInfo function (SGP.22 section 5.3.5).""" + return self.handleDownloadProgressInfo.call(data, self._gen_func_id()) |