aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Welte <laforge@osmocom.org>2024-01-10 19:59:04 +0100
committerHarald Welte <laforge@osmocom.org>2024-01-10 19:59:04 +0100
commitbef85dbc281cf33490717079e86aa7b96aba72d1 (patch)
tree9f369219a4e1574355e4a19fe97fd815c05561e8
parentd1cc8d0c1d02bb87c9073d621e6ffc63fb08dfd8 (diff)
WIP: osmo-smdpp ES9+ support for ASN.1 endpointlaforge/smdp_asn1
-rwxr-xr-xosmo-smdpp.py178
1 files changed, 149 insertions, 29 deletions
diff --git a/osmo-smdpp.py b/osmo-smdpp.py
index 6ae5be8..6e370fd 100755
--- a/osmo-smdpp.py
+++ b/osmo-smdpp.py
@@ -243,6 +243,36 @@ class SmDppHttpServer:
return False
@staticmethod
+ def b64decode_members(indict: Dict, keys: List[str]):
+ """base64-decoder all members of 'indict' whose key is in 'keys'."""
+ for key in keys:
+ if key in indict:
+ indict[key] = b64decode(indict[key])
+
+ @staticmethod
+ def b64encode_members(indict: Dict, keys: List[str]):
+ """base64-encoder (to string!) all members of 'indict' whose key is in 'keys'."""
+ for key in keys:
+ if key in indict:
+ indict[key] = b64encode2str(indict[key])
+
+ @staticmethod
+ def asn1decode_member(indict: Dict, key: str, typename: Optional[str]):
+ """decode indict[key] using RSP ASN.1 decoder for type 'typename'."""
+ if not typename:
+ typename = capitalize_first_char(key)
+ if key in indict:
+ indict[key] = rsp.asn1.decode(typename, indict[key])
+
+ @staticmethod
+ def asn1encode_member(indict: Dict, key: str, typename: Optional[str]):
+ """encode indict[key] using RSP ASN.1 decoder for type 'typename'."""
+ if not typename:
+ typename = capitalize_first_char(key)
+ if key in indict:
+ indict[key] = rsp.asn1.encode(typename, indict[key])
+
+ @staticmethod
def rsp_api_wrapper(func):
"""Wrapper that can be used as decorator in order to perform common REST API endpoint entry/exit
functionality, such as JSON decoding/encoding and debug-printing."""
@@ -262,8 +292,6 @@ class SmDppHttpServer:
return json.dumps(output)
return _api_wrapper
- @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
- @rsp_api_wrapper
def initiateAutentication(self, request: IRequest, content: dict) -> dict:
"""See ES9+ InitiateAuthentication SGP.22 Section 5.6.1"""
# Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be
@@ -271,12 +299,11 @@ class SmDppHttpServer:
if content['smdpAddress'] != self.server_hostname:
raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address')
- euiccChallenge = b64decode(content['euiccChallenge'])
+ euiccChallenge = content['euiccChallenge']
if len(euiccChallenge) != 16:
raise ValueError
- euiccInfo1_bin = b64decode(content['euiccInfo1'])
- euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin)
+ euiccInfo1 = content['euiccInfo1']
print("Rx euiccInfo1: %s" % euiccInfo1)
#euiccInfo1['svn']
@@ -315,32 +342,46 @@ class SmDppHttpServer:
serverSigned1_bin = rsp.asn1.encode('ServerSigned1', serverSigned1)
print("Tx serverSigned1: %s" % rsp.asn1.decode('ServerSigned1', serverSigned1_bin))
output = {}
- output['serverSigned1'] = b64encode2str(serverSigned1_bin)
+ output['serverSigned1'] = serverSigned1
# Generate a signature (serverSignature1) as described in section 5.7.13 "ES10b.AuthenticateServer" using the SK related to the selected CERT.DPauth.SIG.
# serverSignature1 SHALL be created using the private key associated to the RSP Server Certificate for authentication, and verified by the eUICC using the contained public key as described in section 2.6.9. serverSignature1 SHALL apply on serverSigned1 data object.
- output['serverSignature1'] = b64encode2str(b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin))
+ output['serverSignature1'] = b'\x5f\x37\x40' + self.dp_auth.ecdsa_sign(serverSigned1_bin)
output['transactionId'] = transactionId
server_cert_aki = self.dp_auth.get_authority_key_identifier()
- output['euiccCiPKIdToBeUsed'] = b64encode2str(b'\x04\x14' + server_cert_aki.key_identifier)
- output['serverCertificate'] = b64encode2str(self.dp_auth.get_cert_as_der()) # CERT.DPauth.SIG
+ output['euiccCiPKIdToBeUsed'] = b'\x04\x14' + server_cert_aki.key_identifier
+ output['serverCertificate'] = self.dp_auth.get_cert_as_der() # CERT.DPauth.SIG
# FIXME: add those certificate
- #output['otherCertsInChain'] = b64encode2str()
+ #output['otherCertsInChain'] = ...
# create SessionState and store it in rss
self.rss[transactionId] = rsp.RspSessionState(transactionId, serverChallenge)
return output
- @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
+ @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST'])
@rsp_api_wrapper
+ def json_initiateAuthentication(self, request: IRequest, content: dict):
+ """Transform from JSON binding to generic function and back."""
+ # convert from JSON/BASE64 to decoded ASN.1
+ b64decode_members(content, ['euiccChallenge', 'euiccInfo1', 'lpaRspCapability'])
+ asn1decode_member(content, 'eUICCInfo1')
+
+ # do the actual processing
+ output = self.initiateAuthentication(request, content)
+
+ # convert from decoded ASN.1 to base64 to JSON
+ asn1encode_member(output, 'serverSigned1')
+ b64encode_members(output, ['serverSigned1', 'serverSignature1', 'euiccCiPKIdToBeUsed',
+ 'serverCertificate', 'otherCertsInChain', 'crlList'])
+ return output
+
def authenticateClient(self, request: IRequest, content: dict) -> dict:
"""See ES9+ AuthenticateClient in SGP.22 Section 5.6.3"""
transactionId = content['transactionId']
- authenticateServerResp_bin = b64decode(content['authenticateServerResponse'])
- authenticateServerResp = rsp.asn1.decode('AuthenticateServerResponse', authenticateServerResp_bin)
+ authenticateServerResp = content['authenticateServerResponse']
print("Rx %s: %s" % authenticateServerResp)
if authenticateServerResp[0] == 'authenticateResponseError':
r_err = authenticateServerResp[1]
@@ -416,14 +457,26 @@ class SmDppHttpServer:
self.rss[transactionId] = ss
return {
'transactionId': transactionId,
- 'profileMetadata': b64encode2str(profileMetadata_bin),
- 'smdpSigned2': b64encode2str(smdpSigned2_bin),
- 'smdpSignature2': b64encode2str(ss.smdpSignature2_do),
- 'smdpCertificate': b64encode2str(self.dp_pb.get_cert_as_der()), # CERT.DPpb.SIG
+ 'profileMetadata': profileMetadata,
+ 'smdpSigned2': smdpSigned2,
+ 'smdpSignature2': ss.smdpSignature2_do,
+ 'smdpCertificate': self.dp_pb.get_cert_as_der(), # CERT.DPpb.SIG
}
- @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
+ @app.route('/gsma/rsp2/es9plus/authenticateClient', methods=['POST'])
@rsp_api_wrapper
+ def json_authenticateClient(self, request: IRequest, content: dict):
+ """Transform from JSON binding to generic function and back."""
+ b64decode_members(content, ['authenticateServerResponse', 'deleteNotifciationForDc'])
+ asn1decode_member(content, 'authenticateServerResponse')
+
+ output = self.authenticateClient(requeset, content)
+
+ asn1encode_member(output, 'smdpSigned2')
+ b64encode_members(output, ['profileMetadata', 'smdpSigned2', 'smdpSignature2', 'smdpCertificate'])
+ return output
+
+
def getBoundProfilePackage(self, request: IRequest, content: dict) -> dict:
"""See ES9+ GetBoundProfilePackage SGP.22 Section 5.6.2"""
transactionId = content['transactionId']
@@ -433,8 +486,7 @@ class SmDppHttpServer:
if not ss:
raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
- prepDownloadResp_bin = b64decode(content['prepareDownloadResponse'])
- prepDownloadResp = rsp.asn1.decode('PrepareDownloadResponse', prepDownloadResp_bin)
+ prepDownloadResp = content['prepareDownloadResponse']
print("Rx %s: %s" % prepDownloadResp)
if prepDownloadResp[0] == 'downloadResponseError':
@@ -496,15 +548,26 @@ class SmDppHttpServer:
self.rss[transactionId] = ss
return {
'transactionId': transactionId,
- 'boundProfilePackage': b64encode2str(bpp.encode(ss, self.dp_pb)),
+ 'boundProfilePackage': bpp.encode(ss, self.dp_pb),
}
- @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
+
+ @app.route('/gsma/rsp2/es9plus/getBoundProfilePackage', methods=['POST'])
@rsp_api_wrapper
+ def json_getBoundProfilePackage(self, request: IRequest, content: dict):
+ """Transform from JSON binding to generic function and back."""
+ b64decode_members(content, ['prepareDownloadResponse'])
+ asn1decode_member(content, 'prepareDownlaodResponse')
+
+ output = self.getBoundProfilePackage(request, content)
+
+ asn1encode_member(output, 'boundProfilePackage')
+ b64encode_members(output, ['boundProfilePackage'])
+ return output
+
def handleNotification(self, request: IRequest, content: dict) -> dict:
"""See ES9+ HandleNotification in SGP.22 Section 5.6.4"""
- pendingNotification_bin = b64decode(content['pendingNotification'])
- pendingNotification = rsp.asn1.decode('PendingNotification', pendingNotification_bin)
+ pendingNotification = content['pendingNotification']
print("Rx %s: %s" % pendingNotification)
if pendingNotification[0] == 'profileInstallationResult':
profileInstallRes = pendingNotification[1]
@@ -528,17 +591,27 @@ class SmDppHttpServer:
pass
else:
raise ValueError(pendingNotification)
+ return None
+
+ @app.route('/gsma/rsp2/es9plus/handleNotification', methods=['POST'])
+ @rsp_api_wrapper
+ def json_handleNotification(self, request: IRequest, content: dict):
+ """Transform from JSON binding to generic function and back."""
+ b64decode_members(content, ['pendingNotification'])
+ asn1decode_member(content, 'pendingNotification')
+
+ output = self.handleNotification(request, content)
+
+ return output
+
#@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']')
#@rsp_api_wrapper
#"""See ES9+ ConfirmDeviceChange in SGP.22 Section 5.6.6"""
# TODO: implement this
- @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
- @rsp_api_wrapper
def cancelSession(self, request: IRequest, content: dict) -> dict:
"""See ES9+ CancelSession in SGP.22 Section 5.6.5"""
- print("Rx JSON: %s" % content)
transactionId = content['transactionId']
# Verify that the received transactionId is known and relates to an ongoing RSP session
@@ -546,8 +619,7 @@ class SmDppHttpServer:
if ss is None:
raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown')
- cancelSessionResponse_bin = b64decode(content['cancelSessionResponse'])
- cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin)
+ cancelSessionResponse = content['cancelSessionResponse']
print("Rx %s: %s" % cancelSessionResponse)
if cancelSessionResponse[0] == 'cancelSessionResponseError':
@@ -577,6 +649,54 @@ class SmDppHttpServer:
del self.rss[transactionId]
return { 'transactionId': transactionId }
+ @app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST'])
+ @rsp_api_wrapper
+ def json_cancelSessionn(self, request: IRequest, content: dict) -> dict:
+ """Transform from JSON binding to generic function and back."""
+ b64decode_members(content, ['cancelSessionResponse'])
+ asn1decode_member(content, 'cancelSessionResponse')
+
+ output = self.cancelSession(request, content)
+
+ return output
+
+
+ @app.route('/gsma/rsp2/asn1', methods=['POST'])
+ def asn1(self, request: IRequest) -> dict:
+ # TODO: evaluate User-Agent + X-Admin-Protocol header
+ # TODO: reject any non-ASN.1 Content-type
+
+ content = rsp.asn1.decode('RemoteProfileProvisioningRequest', request.content.read())
+ print("Rx ASN.1: %s" % content)
+ request.setHeader('Content-Type', 'application/x-gsma-rsp-asn1')
+ request.setHeader('X-Admin-Protocol', 'gsma/rsp/v2.1.0')
+
+ operation = content[0]
+ if operation == 'initiateAuthenticationRequest':
+ method = self.initiateAuthentication
+ ochoice = 'initiateAuthenticationResponse'
+ elif operation == 'authenticateClientRequest':
+ method = self.authenticateClient
+ ochoice = 'authenticateClientResponseEs9'
+ elif operation == 'getBoundProfilePackageRequest':
+ method = self.getBoundProfilePackage
+ ochoice = 'getBoundProfilePackageResponse'
+ elif operation == 'cancelSessionRequestEs9':
+ method = self.cancelSession
+ ochoice = 'cancelSessionResponseEs9'
+ elif operation == 'handleNotification':
+ method = self.handleNotification
+ ochoice = None
+
+ output = method(self, request, content[1])
+
+ if ochoice:
+ output = (ochoice, output)
+ print("Tx ASN.1: %s" % output)
+ return rsp.asn1.encode('RemoteProfileProvisioningResponse', output)
+ else:
+ return None
+
def main(argv):
parser = argparse.ArgumentParser()