laforge has uploaded this change for review. (
https://gerrit.osmocom.org/c/pysim/+/36972?usp=email )
Change subject: esim.es2p: Split generic part of HTTP/REST API from ES2+
......................................................................
esim.es2p: Split generic part of HTTP/REST API from ES2+
This way we can reuse it for other eSIM RSP HTTP interfaces like
ES9+, ES11, ...
Change-Id: I468041da40a88875e8df15b04d3ad508e06f16f7
---
M pySim/esim/es2p.py
A pySim/esim/http_json_api.py
2 files changed, 276 insertions(+), 230 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/72/36972/1
diff --git a/pySim/esim/es2p.py b/pySim/esim/es2p.py
index fa21d2c..b026f9e 100644
--- a/pySim/esim/es2p.py
+++ b/pySim/esim/es2p.py
@@ -23,90 +23,11 @@
import time
import base64
+from pySim.esim.http_json_api import *
+
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
-class ApiParam(abc.ABC):
- """A class reprsenting a single parameter in the ES2+
API."""
- @classmethod
- def verify_decoded(cls, data):
- """Verify the decoded reprsentation of a value. Should raise an
exception if somthing is odd."""
- pass
-
- @classmethod
- def verify_encoded(cls, data):
- """Verify the encoded reprsentation of a value. Should raise an
exception if somthing is odd."""
- pass
-
- @classmethod
- def encode(cls, data):
- """[Validate and] Encode the given value."""
- cls.verify_decoded(data)
- encoded = cls._encode(data)
- cls.verify_decoded(encoded)
- return encoded
-
- @classmethod
- def _encode(cls, data):
- """encoder function, typically [but not always] overridden by
derived class."""
- return data
-
- @classmethod
- def decode(cls, data):
- """[Validate and] Decode the given value."""
- cls.verify_encoded(data)
- decoded = cls._decode(data)
- cls.verify_decoded(decoded)
- return decoded
-
- @classmethod
- def _decode(cls, data):
- """decoder function, typically [but not always] overridden by
derived class."""
- return data
-
-class ApiParamString(ApiParam):
- """Base class representing an API parameter of 'string'
type."""
- pass
-
-
-class ApiParamInteger(ApiParam):
- """Base class representing an API parameter of 'integer'
type."""
- @classmethod
- def _decode(cls, data):
- return int(data)
-
- @classmethod
- def _encode(cls, data):
- return str(data)
-
- @classmethod
- def verify_decoded(cls, data):
- if not isinstance(data, int):
- raise TypeError('Expected an integer input data type')
-
- @classmethod
- def verify_encoded(cls, data):
- if isinstance(data, int):
- return
- if not data.isdecimal():
- raise ValueError('integer (%s) contains non-decimal characters' %
data)
- assert str(int(data)) == data
-
-class ApiParamBoolean(ApiParam):
- """Base class representing an API parameter of 'boolean'
type."""
- @classmethod
- def _encode(cls, data):
- return bool(data)
-
-class ApiParamFqdn(ApiParam):
- """String, as a list of domain labels concatenated using the full stop
(dot, period) character as
- separator between labels. Labels are restricted to the Alphanumeric mode character
set defined in table 5
- of ISO/IEC 18004"""
- @classmethod
- def verify_encoded(cls, data):
- # FIXME
- pass
-
class param:
class Iccid(ApiParamString):
"""String representation of 19 or 20 digits, where the 20th digit
MAY optionally be the padding
@@ -172,9 +93,6 @@
class SmdsAddress(ApiParamFqdn):
pass
- class SmdpAddress(ApiParamFqdn):
- pass
-
class ReleaseFlag(ApiParamBoolean):
pass
@@ -197,150 +115,12 @@
class NotificationPointStatus(ApiParam):
pass
- class ResultData(ApiParam):
- @classmethod
- def _decode(cls, data):
- return base64.b64decode(data)
+ class ResultData(ApiParamBase64):
+ pass
- @classmethod
- def _encode(cls, data):
- return base64.b64encode(data)
-
- class JsonResponseHeader(ApiParam):
- """SGP.22 section 6.5.1.4."""
- @classmethod
- def verify_decoded(cls, data):
- fe_status = data.get('functionExecutionStatus')
- if not fe_status:
- raise ValueError('Missing mandatory functionExecutionStatus in
header')
- status = fe_status.get('status')
- if not status:
- raise ValueError('Missing mandatory status in header
functionExecutionStatus')
- if status not in ['Executed-Success', 'Executed-WithWarning',
'Failed', 'Expired']:
- raise ValueError('Unknown/unspecified status "%s"' %
status)
-
-
-class HttpStatusError(Exception):
- pass
-
-class HttpHeaderError(Exception):
- pass
-
-class Es2PlusApiError(Exception):
- """Exception representing an error at the ES2+ API level (status !=
Executed)."""
- def __init__(self, func_ex_status: dict):
- self.status = func_ex_status['status']
- sec = {
- 'subjectCode': None,
- 'reasonCode': None,
- 'subjectIdentifier': None,
- 'message': None,
- }
- actual_sec = func_ex_status.get('statusCodeData', None)
- sec.update(actual_sec)
- self.subject_code = sec['subjectCode']
- self.reason_code = sec['reasonCode']
- self.subject_id = sec['subjectIdentifier']
- self.message = sec['message']
-
- def __str__(self):
- return
f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
-
-class Es2PlusApiFunction(abc.ABC):
+class Es2PlusApiFunction(JsonHttpApiFunction):
"""Base classs for representing an ES2+ API
Function."""
- # the below class variables are expected to be overridden in derived classes
-
- path = None
- # dictionary of input parameters. key is parameter name, value is ApiParam class
- input_params = {}
- # list of mandatory input parameters
- input_mandatory = []
- # dictionary of output parameters. key is parameter name, value is ApiParam class
- output_params = {}
- # list of mandatory output parameters (for successful response)
- output_mandatory = []
- # expected HTTP status code of the response
- expected_http_status = 200
- # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
- http_method = 'POST'
-
- def __init__(self, url_prefix: str, func_req_id: str, session):
- self.url_prefix = url_prefix
- self.func_req_id = func_req_id
- self.session = session
-
- def encode(self, data: dict, func_call_id: str) -> dict:
- """Validate an encode input dict into JSON-serializable dict for
request body."""
- output = {
- 'header': {
- 'functionRequesterIdentifier': self.func_req_id,
- 'functionCallIdentifier': func_call_id
- }
- }
- for p in self.input_mandatory:
- if not p in data:
- raise ValueError('Mandatory input parameter %s missing' % p)
- for p, v in data.items():
- p_class = self.input_params.get(p)
- if not p_class:
- logger.warning('Unexpected/unsupported input parameter %s=%s', p,
v)
- output[p] = v
- else:
- output[p] = p_class.encode(v)
- return output
-
-
- def decode(self, data: dict) -> dict:
- """[further] Decode and validate the JSON-Dict of the respnse
body."""
- output = {}
- # let's first do the header, it's special
- if not 'header' in data:
- raise ValueError('Mandatory output parameter "header"
missing')
- hdr_class = self.output_params.get('header')
- output['header'] = hdr_class.decode(data['header'])
-
- if output['header']['functionExecutionStatus']['status']
not in ['Executed-Success','Executed-WithWarning']:
- raise
Es2PlusApiError(output['header']['functionExecutionStatus'])
- # we can only expect mandatory parameters to be present in case of successful
execution
- for p in self.output_mandatory:
- if p == 'header':
- continue
- if not p in data:
- raise ValueError('Mandatory output parameter "%s"
missing' % p)
- for p, v in data.items():
- p_class = self.output_params.get(p)
- if not p_class:
- logger.warning('Unexpected/unsupported output parameter
"%s"="%s"', p, v)
- output[p] = v
- else:
- output[p] = p_class.decode(v)
- return output
-
- def call(self, data: dict, func_call_id:str, timeout=10) -> dict:
- """Make an API call to the ES2+ API endpoint represented by this
object.
- Input data is passed in `data` as json-serializable dict. Output data
- is returned as json-deserialized dict."""
- url = self.url_prefix + self.path
- encoded = json.dumps(self.encode(data, func_call_id))
- headers = {
- 'Content-Type': 'application/json',
- 'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
- }
-
- logger.debug("HTTP REQ %s - '%s'" % (url, encoded))
- response = self.session.request(self.http_method, url, data=encoded,
headers=headers, timeout=timeout)
- logger.debug("HTTP RSP-STS: [%u] hdr: %s" % (response.status_code,
response.headers))
- logger.debug("HTTP RSP: %s" % (response.content))
-
- if response.status_code != self.expected_http_status:
- raise HttpStatusError(response)
- if not
response.headers.get('Content-Type').startswith(headers['Content-Type']):
- raise HttpHeaderError(response)
- if not response.headers.get('X-Admin-Protocol',
'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
- raise HttpHeaderError(response)
-
- return self.decode(response.json())
-
+ pass
# ES2+ DownloadOrder function (SGP.22 section 5.3.1)
class DownloadOrder(Es2PlusApiFunction):
@@ -351,7 +131,7 @@
'profileType': param.ProfileType
}
output_params = {
- 'header': param.JsonResponseHeader,
+ 'header': JsonResponseHeader,
'iccid': param.Iccid,
}
output_mandatory = ['header', 'iccid']
@@ -369,7 +149,7 @@
}
input_mandatory = ['iccid', 'releaseFlag']
output_params = {
- 'header': param.JsonResponseHeader,
+ 'header': JsonResponseHeader,
'eid': param.Eid,
'matchingId': param.MatchingId,
- 'smdpAddress': param.SmdpAddress,
+ 'smdpAddress': SmdpAddress,
@@ -387,7 +167,7 @@
}
input_mandatory = ['finalProfileStatusIndicator', 'iccid']
output_params = {
- 'header': param.JsonResponseHeader,
+ 'header': JsonResponseHeader,
}
output_mandatory = ['header']
@@ -399,7 +179,7 @@
}
input_mandatory = ['iccid']
output_params = {
- 'header': param.JsonResponseHeader,
+ 'header': JsonResponseHeader,
}
output_mandatory = ['header']
diff --git a/pySim/esim/http_json_api.py b/pySim/esim/http_json_api.py
new file mode 100644
index 0000000..ec9cd00
--- /dev/null
+++ b/pySim/esim/http_json_api.py
@@ -0,0 +1,254 @@
+"""GSMA eSIM RSP HTTP/REST/JSON interface according to SGP.22 v2.5"""
+
+# (C) 2024 by Harald Welte <laforge(a)osmocom.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import abc
+import requests
+import logging
+import json
+from datetime import datetime
+import time
+import base64
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+class ApiParam(abc.ABC):
+    """A class representing a single parameter in the API.
+
+    A value exists in a *decoded* form (what the Python caller works with) and an
+    *encoded* form (what encode() returns / decode() accepts).  Derived classes
+    override the verify_* and _encode / _decode hooks as needed; the defaults pass
+    every value through unmodified and unverified."""
+    @classmethod
+    def verify_decoded(cls, data):
+        """Verify the decoded representation of a value. Should raise an exception if something is odd."""
+
+    @classmethod
+    def verify_encoded(cls, data):
+        """Verify the encoded representation of a value. Should raise an exception if something is odd."""
+
+    @classmethod
+    def encode(cls, data):
+        """[Validate and] Encode the given value.
+
+        Returns the encoded value; propagates whatever the verify hooks raise."""
+        cls.verify_decoded(data)
+        encoded = cls._encode(data)
+        # the result of _encode() is in *encoded* form, so it has to be checked by
+        # verify_encoded(); checking it with verify_decoded() rejects every value whose
+        # encoded type differs from its decoded type (e.g. int -> str)
+        cls.verify_encoded(encoded)
+        return encoded
+
+    @classmethod
+    def _encode(cls, data):
+        """encoder function, typically [but not always] overridden by derived class."""
+        return data
+
+    @classmethod
+    def decode(cls, data):
+        """[Validate and] Decode the given value.
+
+        Returns the decoded value; propagates whatever the verify hooks raise."""
+        cls.verify_encoded(data)
+        decoded = cls._decode(data)
+        cls.verify_decoded(decoded)
+        return decoded
+
+    @classmethod
+    def _decode(cls, data):
+        """decoder function, typically [but not always] overridden by derived class."""
+        return data
+
+class ApiParamString(ApiParam):
+    """Base class representing an API parameter of 'string' type.
+
+    Adds nothing on top of ApiParam; it only marks the parameter as a string."""
+
+
+class ApiParamInteger(ApiParam):
+    """Base class representing an API parameter of 'integer' type.
+
+    Decoded form: Python int.  Encoded form: decimal string as produced by str()."""
+    @classmethod
+    def _decode(cls, data):
+        return int(data)
+
+    @classmethod
+    def _encode(cls, data):
+        return str(data)
+
+    @classmethod
+    def verify_decoded(cls, data):
+        """Raise TypeError unless the decoded value is an int."""
+        if not isinstance(data, int):
+            raise TypeError('Expected an integer input data type')
+
+    @classmethod
+    def verify_encoded(cls, data):
+        """Accept an int as-is; otherwise require a canonical decimal string.
+
+        Raises ValueError for non-decimal characters and for spellings that do not
+        round-trip through int() (e.g. leading zeroes)."""
+        if isinstance(data, int):
+            return
+        if not data.isdecimal():
+            raise ValueError('integer (%s) contains non-decimal characters' % data)
+        # a real exception instead of an assert: asserts are stripped when running with
+        # `python -O`, which would silently disable this check
+        if str(int(data)) != data:
+            raise ValueError('integer (%s) is not in canonical decimal notation' % data)
+
+class ApiParamBoolean(ApiParam):
+    """Base class representing an API parameter of 'boolean' type.
+
+    Encoding coerces the value to a bool by its truthiness; decoding is inherited."""
+    @classmethod
+    def _encode(cls, data):
+        return True if data else False
+
+class ApiParamFqdn(ApiParam):
+    """String, as a list of domain labels concatenated using the full stop (dot, period) character as
+    separator between labels. Labels are restricted to the Alphanumeric mode character set defined in table 5
+    of ISO/IEC 18004"""
+    @classmethod
+    def verify_encoded(cls, data):
+        # FIXME: validation is not implemented yet; every value is currently accepted
+        return None
+
+class ApiParamBase64(ApiParam):
+    """Base class representing an API parameter that carries base64-encoded binary data.
+
+    Decoded form: bytes.  Encoded form: base64 text as an ASCII str."""
+    @classmethod
+    def _decode(cls, data):
+        raw = base64.b64decode(data)
+        return raw
+
+    @classmethod
+    def _encode(cls, data):
+        b64_bytes = base64.b64encode(data)
+        return b64_bytes.decode('ascii')
+
+class SmdpAddress(ApiParamFqdn):
+    """SM-DP+ address parameter, represented as an FQDN (see ApiParamFqdn)."""
+
+class JsonResponseHeader(ApiParam):
+    """SGP.22 section 6.5.1.4."""
+    @classmethod
+    def verify_decoded(cls, data):
+        """Check that the header carries a functionExecutionStatus with a known status.
+
+        Raises ValueError if functionExecutionStatus or its status is missing/empty, or
+        if the status is none of the four accepted values."""
+        fe_status = data.get('functionExecutionStatus')
+        if not fe_status:
+            raise ValueError('Missing mandatory functionExecutionStatus in header')
+        status = fe_status.get('status')
+        if not status:
+            raise ValueError('Missing mandatory status in header functionExecutionStatus')
+        if status in ('Executed-Success', 'Executed-WithWarning', 'Failed', 'Expired'):
+            return
+        raise ValueError('Unknown/unspecified status "%s"' % status)
+
+
+class HttpStatusError(Exception):
+    """Raised when a response does not carry the expected HTTP status code."""
+
+class HttpHeaderError(Exception):
+    """Raised when a response carries an unexpected HTTP header value."""
+
+class ApiError(Exception):
+    """Exception representing an error at the API level (status != Executed).
+
+    Built from the functionExecutionStatus dict of a response header.  Exposes `status`
+    plus `subject_code`, `reason_code`, `subject_id` and `message`, which are taken from
+    the statusCodeData member and are None where that does not provide them."""
+    def __init__(self, func_ex_status: dict):
+        super().__init__(func_ex_status)
+        self.status = func_ex_status['status']
+        sec = {
+            'subjectCode': None,
+            'reasonCode': None,
+            'subjectIdentifier': None,
+            'message': None,
+        }
+        # statusCodeData may be absent (or null); dict.update(None) would raise a
+        # TypeError and thereby mask the API error that is being reported
+        actual_sec = func_ex_status.get('statusCodeData') or {}
+        sec.update(actual_sec)
+        self.subject_code = sec['subjectCode']
+        self.reason_code = sec['reasonCode']
+        self.subject_id = sec['subjectIdentifier']
+        self.message = sec['message']
+
+    def __str__(self):
+        return f'{self.status}("{self.subject_code}","{self.reason_code}","{self.subject_id}","{self.message}")'
+
+class JsonHttpApiFunction(abc.ABC):
+    """Base class for representing an HTTP[s] API Function.
+
+    A derived class describes one API function by overriding the class variables below.
+    An instance binds that description to a URL prefix, a function requester identifier
+    and a HTTP session; call() then performs the request."""
+    # the below class variables are expected to be overridden in derived classes
+
+    path = None
+    # dictionary of input parameters. key is parameter name, value is ApiParam class
+    input_params = {}
+    # list of mandatory input parameters
+    input_mandatory = []
+    # dictionary of output parameters. key is parameter name, value is ApiParam class
+    output_params = {}
+    # list of mandatory output parameters (for successful response)
+    output_mandatory = []
+    # expected HTTP status code of the response
+    expected_http_status = 200
+    # the HTTP method used (GET, OPTIONS, HEAD, POST, PUT, PATCH or DELETE)
+    http_method = 'POST'
+
+    def __init__(self, url_prefix: str, func_req_id: str, session: requests.Session):
+        """Store the URL prefix (prepended to `path`), the functionRequesterIdentifier
+        placed into every request header, and the session used to issue requests."""
+        self.url_prefix = url_prefix
+        self.func_req_id = func_req_id
+        self.session = session
+
+    def encode(self, data: dict, func_call_id: str) -> dict:
+        """Validate and encode the input dict into a JSON-serializable dict for the request body.
+
+        Parameters listed in input_params are encoded by their ApiParam class; unknown
+        parameters are passed through unmodified (with a warning).  Raises ValueError if
+        a parameter listed in input_mandatory is absent from `data`."""
+        output = {
+            'header': {
+                'functionRequesterIdentifier': self.func_req_id,
+                'functionCallIdentifier': func_call_id
+            }
+        }
+        for p in self.input_mandatory:
+            if p not in data:
+                raise ValueError('Mandatory input parameter %s missing' % p)
+        for p, v in data.items():
+            p_class = self.input_params.get(p)
+            if not p_class:
+                logger.warning('Unexpected/unsupported input parameter %s=%s', p, v)
+                output[p] = v
+            else:
+                output[p] = p_class.encode(v)
+        return output
+
+    def decode(self, data: dict) -> dict:
+        """[further] Decode and validate the JSON-Dict of the response body.
+
+        Raises ValueError if the header or a mandatory output parameter is missing, and
+        ApiError if the header reports a status other than Executed-Success or
+        Executed-WithWarning."""
+        output = {}
+        # let's first do the header, it's special
+        if 'header' not in data:
+            raise ValueError('Mandatory output parameter "header" missing')
+        hdr_class = self.output_params.get('header')
+        output['header'] = hdr_class.decode(data['header'])
+
+        if output['header']['functionExecutionStatus']['status'] not in ['Executed-Success','Executed-WithWarning']:
+            raise ApiError(output['header']['functionExecutionStatus'])
+        # we can only expect mandatory parameters to be present in case of successful execution
+        for p in self.output_mandatory:
+            if p == 'header':
+                continue
+            if p not in data:
+                raise ValueError('Mandatory output parameter "%s" missing' % p)
+        for p, v in data.items():
+            if p == 'header':
+                # already decoded above; no need to decode it a second time
+                continue
+            p_class = self.output_params.get(p)
+            if not p_class:
+                logger.warning('Unexpected/unsupported output parameter "%s"="%s"', p, v)
+                output[p] = v
+            else:
+                output[p] = p_class.decode(v)
+        return output
+
+    def call(self, data: dict, func_call_id: str, timeout=10) -> dict:
+        """Make an API call to the HTTP API endpoint represented by this object.
+        Input data is passed in `data` as json-serializable dict. Output data
+        is returned as json-deserialized dict.
+
+        Raises HttpStatusError on an unexpected HTTP status code, and HttpHeaderError on a
+        missing/unexpected Content-Type or an unexpected X-Admin-Protocol response header."""
+        url = self.url_prefix + self.path
+        encoded = json.dumps(self.encode(data, func_call_id))
+        headers = {
+            'Content-Type': 'application/json',
+            'X-Admin-Protocol': 'gsma/rsp/v2.5.0',
+        }
+
+        # pass arguments to the logger instead of pre-formatting, so the strings are only
+        # built if debug logging is actually enabled
+        logger.debug("HTTP REQ %s - '%s'", url, encoded)
+        response = self.session.request(self.http_method, url, data=encoded, headers=headers, timeout=timeout)
+        logger.debug("HTTP RSP-STS: [%u] hdr: %s", response.status_code, response.headers)
+        logger.debug("HTTP RSP: %s", response.content)
+
+        if response.status_code != self.expected_http_status:
+            raise HttpStatusError(response)
+        # a response without any Content-Type header must yield HttpHeaderError, not an
+        # AttributeError from calling .startswith() on None
+        if not response.headers.get('Content-Type', '').startswith(headers['Content-Type']):
+            raise HttpHeaderError(response)
+        if not response.headers.get('X-Admin-Protocol', 'gsma/rsp/v2.unknown').startswith('gsma/rsp/v2.'):
+            raise HttpHeaderError(response)
+
+        return self.decode(response.json())
--
To view, visit
https://gerrit.osmocom.org/c/pysim/+/36972?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: I468041da40a88875e8df15b04d3ad508e06f16f7
Gerrit-Change-Number: 36972
Gerrit-PatchSet: 1
Gerrit-Owner: laforge <laforge(a)osmocom.org>
Gerrit-MessageType: newchange