laforge has uploaded this change for review. ( https://gerrit.osmocom.org/c/pysim/+/36973?usp=email )
Change subject: add pySim.esim.es9p with definitions of the ES9+ HTTP Interface ......................................................................
add pySim.esim.es9p with definitions of the ES9+ HTTP Interface
Let's use the infrastructure of pySim.esim.http_json_api to define the ES9+ API Functions. This can in turn be used by clients or even osmo-smdpp can be ported over to using this infratructure rather than open-coding a lot of the encoding/decoding of API request/response parameters.
Change-Id: I194ef1d186391f36245c099cc70a4813185ecf9c --- A pySim/esim/es9p.py 1 file changed, 194 insertions(+), 0 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/73/36973/1
diff --git a/pySim/esim/es9p.py b/pySim/esim/es9p.py new file mode 100644 index 0000000..f936a9c --- /dev/null +++ b/pySim/esim/es9p.py @@ -0,0 +1,179 @@ +"""GSMA eSIM RSP ES9+ interface according ot SGP.22 v2.5""" + +# (C) 2024 by Harald Welte laforge@osmocom.org +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import abc +import requests +import logging +import json +from datetime import datetime +import time +import base64 + +import pySim.esim.rsp as rsp +from pySim.esim.http_json_api import * + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +class param: + class RspAsn1Par(ApiParamBase64): + """Generalized RSP ASN.1 parameter: base64-wrapped ASN.1 DER. Derived classes must provie + the asn1_type class variable to indicate the name of the ASN.1 type to use for encode/decode.""" + @classmethod + def _decode(cls, data): + data = ApiParamBase64.decode(data) + return rsp.asn1.decode(cls.asn1_type, data) + + @classmethod + def _encode(cls, data): + data = rsp.asn1.encode(cls.asn1_type, data) + return ApiParamBase64.encode(data) + + class EuiccInfo1(RspAsn1Par): + asn1_type = 'EUICCInfo1' + + class ServerSigned1(RspAsn1Par): + asn1_type = 'ServerSigned1' + + class PrepareDownloadResponse(RspAsn1Par): + asn1_type = 'PrepareDownloadResponse' + + class AuthenticateServerResponse(RspAsn1Par): + asn1_type = 'AuthenticateServerResponse' + + class SmdpSigned2(RspAsn1Par): + asn1_type = 'SmdpSigned2' + + class StoreMetadataRequest(RspAsn1Par): + asn1_type = 'StoreMetadataRequest' + + class PendingNotification(RspAsn1Par): + asn1_type = 'PendingNotification' + + class CancelSessionResponse(RspAsn1Par): + asn1_type = 'CancelSessionResponse' + + class TransactionId(ApiParamString): + pass + +class Es9PlusApiFunction(JsonHttpApiFunction): + pass + +# ES9+ InitiateAuthentication function (SGP.22 section 6.5.2.6) +class InitiateAuthentication(Es9PlusApiFunction): + path = '/gsma/rsp2/es9plus/initiateAuthentication' + input_params = { + 'euiccChallenge': ApiParamBase64, + 'euiccInfo1': param.EuiccInfo1, + 'smdpAddress': SmdpAddress, + } + input_mandatory = ['euiccChallenge', 'euiccInfo1', 'smdpAddress'] + output_params = { + 'header': JsonResponseHeader, + 'transactionId': param.TransactionId, + 'serverSigned1': param.ServerSigned1, + 'serverSignature1': ApiParamBase64, + 'euiccCiPKIdToBeUsed': ApiParamBase64, + 'serverCertificate': ApiParamBase64, + } + output_mandatory = ['header', 'transactionId', 'serverSigned1', 'serverSignature1', + 'euiccCiPKIdToBeUsed', 'serverCertificate'] + +# ES9+ GetBoundProfilePackage function (SGP.22 section 6.5.2.7) +class GetBoundProfilePackage(Es9PlusApiFunction): + path = '/gsma/rsp2/es9plus/getBoundProfilePackage' + input_params = { + 'transactionId': param.TransactionId, + 'prepareDownloadResponse': param.PrepareDownloadResponse, + } + input_mandatory = ['transactionId', 'prepareDownloadResponse'] + output_params = { + 'header': JsonResponseHeader, + 'transactionId': param.TransactionId, + 'boundProfilePackage': ApiParamBase64, + } + output_mandatory = ['header', 'transactionId', 'boundProfilePackage'] + +# ES9+ AuthenticateClient function (SGP.22 section 6.5.2.8) +class AuthenticateClient(Es9PlusApiFunction): + path= '/gsma/rsp2/es9plus/authenticateClient' + input_params = { + 'transactionId': param.TransactionId, + 'authenticateServerResponse': param.AuthenticateServerResponse, + } + input_mandatory = ['transactionId', 'authenticateServerResponse'] + output_params = { + 'header': JsonResponseHeader, + 'transactionId': param.TransactionId, + 'profileMetadata': param.StoreMetadataRequest, + 'smdpSigned2': param.SmdpSigned2, + 'smdpSignature2': ApiParamBase64, + 'smdpCertificate': ApiParamBase64, + } + output_mandatory = ['header', 'transactionId', 'profileMetadata', 'smdpSigned2', + 'smdpSignature2', 'smdpCertificate'] + +# ES9+ HandleNotification function (SGP.22 section 6.5.2.9) +class HandleNotification(Es9PlusApiFunction): + path = '/gsma/rsp2/es9plus/handleNotification' + input_params = { + 'pendingNotification': param.PendingNotification, + } + input_mandatory = ['pendingNotification'] + +# ES9+ CancelSession function (SGP.22 section 6.5.2.10) +class CancelSession(Es9PlusApiFunction): + path = '/gsma/rsp2/es9plus/cancelSession' + input_params = { + 'transactionId': param.TransactionId, + 'cancelSessionResponse': param.CancelSessionResponse, + } + input_mandatory = ['transactionId', 'cancelSessionResponse'] + +class Es9pApiClient: + def __init__(self, url_prefix:str, func_req_id:str, server_cert_verify: str = None): + self.func_id = 0 + self.session = requests.Session() + self.session.verify = False # FIXME HACK + if server_cert_verify: + self.session.verify = server_cert_verify + + self.initiateAuthentication = InitiateAuthentication(url_prefix, func_req_id, self.session) + self.authenticateClient = AuthenticateClient(url_prefix, func_req_id, self.session) + self.getBoundProfilePackage = GetBoundProfilePackage(url_prefix, func_req_id, self.session) + self.handleNotification = HandleNotification(url_prefix, func_req_id, self.session) + self.cancelSession = CancelSession(url_prefix, func_req_id, self.session) + + def _gen_func_id(self) -> str: + """Generate the next function call id.""" + self.func_id += 1 + return 'FCI-%u-%u' % (time.time(), self.func_id) + + def call_initiateAuthentication(self, data: dict) -> dict: + return self.initiateAuthentication.call(data, self._gen_func_id()) + + def call_authenticateClient(self, data: dict) -> dict: + return self.authenticateClient.call(data, self._gen_func_id()) + + def call_getBoundProfilePackage(self, data: dict) -> dict: + return self.getBoundProfilePackage.call(data, self._gen_func_id()) + + def call_handleNotification(self, data: dict) -> dict: + return self.handleNotification.call(data, self._gen_func_id()) + + def call_cancelSession(self, data: dict) -> dict: + return self.cancelSession.call(data, self._gen_func_id())