Hoernchen has uploaded this change for review. ( https://gerrit.osmocom.org/c/pysim/+/40880?usp=email )
Change subject: smdpp: cc support, proper checks, proper check order, test mode ......................................................................
smdpp: cc support, proper checks, proper check order, test mode
Change-Id: Ie4db65594b2eaf6c95ffc5e73ff9ae61c4d9d3a3
XX cc support
Change-Id: I7b22fe4eecb41d0f98e28a1d9390cd7fc8c5784c --- M osmo-smdpp.py M pySim/esim/x509_cert.py A pySim/esim/x509_err.py 3 files changed, 803 insertions(+), 78 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/80/40880/1
diff --git a/osmo-smdpp.py b/osmo-smdpp.py index 94e64e8..69fae15 100755 --- a/osmo-smdpp.py +++ b/osmo-smdpp.py @@ -110,6 +110,7 @@ from cryptography.hazmat.primitives import hashes # noqa: E402 from cryptography.hazmat.primitives.asymmetric import ec, dh # noqa: E402 from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption, ParameterFormat # noqa: E402 +from cryptography.x509.oid import ExtensionOID, NameOID # noqa: E402 from pathlib import Path # noqa: E402 import json # noqa: E402 import sys # noqa: E402 @@ -121,6 +122,7 @@ from pprint import pprint as pp # noqa: E402
import base64 # noqa: E402 +import time # noqa: E402 from base64 import b64decode # noqa: E402 from klein import Klein # noqa: E402 from twisted.web.iweb import IRequest # noqa: E402 @@ -131,7 +133,11 @@ from pySim.esim import saip, PMO # noqa: E402 from pySim.esim.es8p import ProfileMetadata,UnprotectedProfilePackage,ProtectedProfilePackage,BoundProfilePackage,BspInstance # noqa: E402 from pySim.esim.x509_cert import oid, cert_policy_has_oid, cert_get_auth_key_id # noqa: E402 -from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id, VerifyError # noqa: E402 +from cryptography.x509 import ExtensionNotFound # noqa: E402 +from pySim.esim.x509_cert import CertAndPrivkey, CertificateSet, cert_get_subject_key_id # noqa: E402 +from pySim.esim import x509_err # noqa: E402 +from datetime import datetime, timezone # noqa: E402 +import hashlib # noqa: E402
import logging # noqa: E402 logger = logging.getLogger(__name__) @@ -344,6 +350,141 @@ return encode_dss_signature(r, s)
+def compute_confirmation_code_hash(confirmation_code: str, transaction_id: bytes) -> bytes: + """Compute confirmation code hash according to SGP.22 specification. + Hashed Confirmation Code = SHA256(SHA256(Confirmation Code) | TransactionID) + """ + + # Convert confirmation code from hex string to bytes (like EID handling) + cc_bytes = h2b(confirmation_code) + + # Step 1: SHA256(Confirmation Code) + first_hash = hashlib.sha256(cc_bytes).digest() + + # Step 2: SHA256(SHA256(CC) | TransactionID) + return hashlib.sha256(first_hash + transaction_id).digest() + + +def validate_eum_certificate(eum_cert: x509.Certificate) -> None: + """Validate EUM certificate according to SGP.22 requirements. + Raises ApiError with appropriate error codes for different validation failures.""" + + # Check KeyUsage extension + try: + key_usage = eum_cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) + # EUM certificates are intermediate CAs that sign eUICC certificates, so they need keyCertSign + if not key_usage.value.key_cert_sign: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + # Check for critical flag + if not key_usage.critical: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + except x509.ExtensionNotFound: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + + # ExtendedKeyUsage not present in EUM certificates as they are intermediate CAs (?) + + # Check Certificate Policies + try: + cert_policies = eum_cert.extensions.get_extension_for_oid(ExtensionOID.CERTIFICATE_POLICIES) + # Check for required EUM policy OIDs + has_valid_policy = False + for policy in cert_policies.value: + policy_oid = policy.policy_identifier.dotted_string + # EUM policies: 2.23.146.1.2.1.2 (old) or 2.23.146.1.2.1.0.0.0 (new) + if policy_oid in ['2.23.146.1.2.1.2', '2.23.146.1.2.1.0.0.0']: + has_valid_policy = True + break + if not has_valid_policy: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + except x509.ExtensionNotFound: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + + # Check BasicConstraints - EUM is an intermediate CA so it should have CA:TRUE (?) + try: + basic_constraints = eum_cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) + if not basic_constraints.value.ca: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + # pathLenConstraint of 0 means it can only sign end-entity certificates + if basic_constraints.value.path_length is not None and basic_constraints.value.path_length != 0: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + except x509.ExtensionNotFound: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + + +def validate_euicc_certificate(euicc_cert: x509.Certificate) -> None: + """Validate eUICC certificate according to SGP.22 requirements. + Raises ApiError with appropriate error codes for different validation failures.""" + + # Check KeyUsage extension + try: + key_usage = euicc_cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) + if not key_usage.value.digital_signature: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + if key_usage.value.key_cert_sign: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except x509.ExtensionNotFound: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + + # ExtendedKeyUsage not present in eUICC certificates + + # Check Certificate Policies + try: + cert_policies = euicc_cert.extensions.get_extension_for_oid(ExtensionOID.CERTIFICATE_POLICIES) + # Check for required eUICC policy OIDs + has_valid_policy = False + for policy in cert_policies.value: + policy_oid = policy.policy_identifier.dotted_string + # eUICC policies would be under GSMA arc + if policy_oid.startswith('2.23.146.1.2.1'): + has_valid_policy = True + break + if not has_valid_policy: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except x509.ExtensionNotFound: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + + # Check Subject fields + subject = euicc_cert.subject + + # Verify Organization field + try: + org_attrs = subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME) + if not org_attrs: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except Exception: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + + # Verify SerialNumber field (contains EID) + try: + serial_attrs = subject.get_attributes_for_oid(NameOID.SERIAL_NUMBER) + if not serial_attrs: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + eid = serial_attrs[0].value + if len(eid) != 32 or not all(c in '0123456789ABCDEFabcdef' for c in eid): + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except Exception as e: + if not isinstance(e, ApiError): + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + + class ApiError(Exception): def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None, subject_id: Optional[str] = None): @@ -401,14 +542,19 @@ cs = CertificateSet(ci_cert) cs.verify_cert_chain(self.dp_auth.cert) return True - except VerifyError: + except x509_err.VerifyError: continue return False
- def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False): + def __init__(self, server_hostname: str, ci_certs_path: str, common_cert_path: str, use_brainpool: bool = False, in_memory: bool = False, test_mode: bool = False): self.server_hostname = server_hostname self.upp_dir = os.path.realpath(os.path.join(DATA_DIR, 'upp')) self.ci_certs = self.load_certs_from_path(ci_certs_path) + self.test_mode = test_mode + self.confirmation_codes = { + "CC_REQUIRED_TEST": "12345678" # Special matchingId for confirmation code tests + } if test_mode else {} + # load DPauth cert + key self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2) cert_dir = common_cert_path @@ -435,6 +581,230 @@ db_path = os.path.join(DATA_DIR, f"sm-dp-sessions-{session_db_suffix}") self.rss = rsp.RspSessionStore(filename=db_path, in_memory=False) logger.info(f"Using file-based session storage: {db_path}") + self.otpk_mapping = {} # Maps euicc_otpk -> (smdp_ot, smdp_otpk) for retry scenarios + + # Initialize profile configurations for test cases + if test_mode: + self._init_test_profiles() + else: + # Initialize empty profile dictionaries when not in test mode + self.activation_code_profiles = {} + self.event_based_profiles = {} + self.default_profiles = {} + + def _init_test_profiles(self): + """Initialize test profiles for different use cases.""" + # Activation code profiles + print("INIT: Initializing test profiles...") + self.activation_code_profiles = { + 'TEST123': { + 'matchingId': 'TEST123', + 'confirmationCode': '12345678', # 8-digit numeric code + 'iccid': '8900000000000000001F', + 'profileName': 'Test Profile 1', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'AC_NOT_RELEASED': { + 'matchingId': 'AC_NOT_RELEASED', + 'confirmationCode': '87654321', + 'iccid': '8900000000000000002F', + 'profileName': 'Not Released Profile', + 'state': 'not_released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'AC_WITH_CC': { + 'matchingId': 'AC_WITH_CC', + 'confirmationCode': '11223344', + 'iccid': '8900000000000000003F', + 'profileName': 'Profile Requiring CC', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'AC_NO_CC': { + 'matchingId': 'AC_NO_CC', + 'confirmationCode': None, # No confirmation code required + 'iccid': '8900000000000000004F', + 'profileName': 'Profile Without CC', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + # Error test profiles + 'AC_NO_ELIGIBLE': { + 'matchingId': 'AC_NO_ELIGIBLE', + 'confirmationCode': None, + 'iccid': '8900000000000000009F', + 'profileName': 'Profile with No Eligible Device', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None, + 'device_requirements': { + 'min_memory_mb': 999999, # Impossible requirement + 'required_features': ['IMPOSSIBLE_FEATURE'] + } + }, + 'AC_EXPIRED': { + 'matchingId': 'AC_EXPIRED', + 'confirmationCode': None, + 'iccid': '8900000000000000010F', + 'profileName': 'Expired Download Order', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': '2020-01-01T00:00:00Z' # Expired + }, + 'AC_MAX_RETRIES': { + 'matchingId': 'AC_MAX_RETRIES', + 'confirmationCode': None, + 'iccid': '8900000000000000011F', + 'profileName': 'Max Retries Exceeded', + 'state': 'released', + 'download_attempts': 5, # Already exceeded max attempts + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'AC_RESTRICTED_EID': { + 'matchingId': 'AC_RESTRICTED_EID', + 'confirmationCode': None, + 'iccid': '8900000000000000012F', + 'profileName': 'Profile Restricted to Different EID', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89999999999999999999999999999999', # Different EID + 'expiration': None + }, + 'AC_OTHER_EID': { + 'matchingId': 'AC_OTHER_EID', + 'confirmationCode': None, + 'iccid': '8900000000000000013F', + 'profileName': 'Profile for Other EID', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89888888888888888888888888888888', # Different EID + 'expiration': None + }, + 'MATCHING_ID_1': { + 'matchingId': 'MATCHING_ID_1', + 'confirmationCode': None, + 'iccid': '8900000000000000017F', + 'profileName': 'Test Activation Code Profile', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89049032123451234512345678901235', + 'expiration': None + }, + 'CC_REQUIRED_TEST': { + 'matchingId': 'CC_REQUIRED_TEST', + 'confirmationCode': '12345678', # Requires confirmation code + 'iccid': '8900000000000000019F', + 'profileName': 'CC Required Test Profile', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + } + } + + # SM-DS event-based profiles + self.event_based_profiles = { + 'EVENT_001': { + 'matchingId': 'EVENT_001', + 'confirmationCode': '55667788', + 'iccid': '8900000000000000005F', + 'profileName': 'Event-based Profile 1', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'UNMATCHED_EVENT': { + 'matchingId': 'UNMATCHED_EVENT', + 'confirmationCode': '99887766', + 'iccid': '8900000000000000006F', + 'profileName': 'Unmatched Event Profile', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89001012012341234012345678901224', # Different EID + 'expiration': None + }, + 'EVENT_NORMAL': { + 'matchingId': 'EVENT_NORMAL', + 'confirmationCode': None, + 'iccid': '8900000000000000014F', + 'profileName': 'Normal SM-DS Event', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None + }, + 'EVENT_RESTRICTED': { + 'matchingId': 'EVENT_RESTRICTED', + 'confirmationCode': None, + 'iccid': '8900000000000000015F', + 'profileName': 'SM-DS Event Restricted to Different EID', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89777777777777777777777777777777', # Different EID + 'expiration': None + }, + 'MATCHING_ID_EVENT': { + 'matchingId': 'MATCHING_ID_EVENT', + 'confirmationCode': None, + 'iccid': '8900000000000000018F', + 'profileName': 'Test Event-based Profile', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': '89049032123451234512345678901235', + 'expiration': None + } + } + + # Default SM-DP+ profiles (associated with specific EIDs) + self.default_profiles = { + '89001012012341234012345678901234': { # Test EID + 'confirmationCode': '12345678', + 'iccid': '8900000000000000007F', + 'profileName': 'Default Profile for Test EID', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'expiration': None + }, + '89049032123451234512345678901235': { # EID1 from test specs + 'confirmationCode': None, + 'iccid': '8900000000000000020F', + 'profileName': 'Default Profile for EID1', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'expiration': None + }, + }
@app.handle_errors(ApiError) def handle_apierror(self, request: IRequest, failure): @@ -475,12 +845,12 @@
@app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST']) @rsp_api_wrapper - def initiateAutentication(self, request: IRequest, content: dict) -> dict: + def initiateAuthentication(self, request: IRequest, content: dict) -> dict: """See ES9+ InitiateAuthentication SGP.22 Section 5.6.1""" - # Verify that the received address matches its own SM-DP+ address, where the comparison SHALL be - # case-insensitive. Otherwise, the SM-DP+ SHALL return a status code "SM-DP+ Address - Refused". - if content['smdpAddress'] != self.server_hostname: - raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address') + # SGP.22 v2.5 Section 5.6.1: Verify that the received address matches its own SM-DP+ address, + # where the comparison SHALL be case-insensitive. + if content['smdpAddress'].lower() != self.server_hostname.lower(): + raise ApiError('8.8.1', '3.8', 'Invalid SM-DP+ Address') # SGP.22 Table 46
euiccChallenge = b64decode(content['euiccChallenge']) if len(euiccChallenge) != 16: @@ -489,7 +859,22 @@ euiccInfo1_bin = b64decode(content['euiccInfo1']) euiccInfo1 = rsp.asn1.decode('EUICCInfo1', euiccInfo1_bin) logger.debug("Rx euiccInfo1: %s" % euiccInfo1) - #euiccInfo1['svn'] + + # Validate specification version (SVN) + svn = euiccInfo1.get('svn', b'\x02\x02\x00') # Default to v2.2.0 if not present + # Convert SVN bytes to version tuple (major, minor, revision) + svn_version = (svn[0], svn[1], svn[2]) + + # SM-DP+ supports versions from 2.0.0 to 2.3.x + min_version = (2, 0, 0) + max_version = (2, 3, 255) # Allow any 2.3.x version + + if svn_version < min_version: + # SGP.22 Table 46: Specification Version Number - Unsupported + raise ApiError('8.8.3', '3.1', 'The Specification Version Number indicated by the eUICC is not supported by the SM-DP+') + elif svn_version[:2] > max_version[:2]: # Compare major.minor only for upper bound + # SGP.22 Table 46: Specification Version Number - Unsupported + raise ApiError('8.8.3', '3.1', 'The Specification Version Number indicated by the eUICC is not supported by the SM-DP+')
# TODO: If euiccCiPKIdListForSigningV3 is present ...
@@ -497,25 +882,36 @@ if 'euiccCiPKIdListForSigningV3' in euiccInfo1: pkid_list = pkid_list + euiccInfo1['euiccCiPKIdListForSigningV3']
- # Validate that SM-DP+ supports certificate chains for verification - verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', []) - if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list): - raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPauth.SIG which chains to one of the eSIM CA Root CA Certificate with a Public Key supported by the eUICC') - - # verify it supports one of the keys indicated by euiccCiPKIdListForSigning + # First verify PKIDs for signing are supported (check this before verification PKIDs) ci_cert = None for x in pkid_list: ci_cert = self.ci_get_cert_for_pkid(x) + if not ci_cert: + # Skip if no certificate found for this PKID + continue # we already support multiple CI certificates but only one set of DPauth + DPpb keys. So we must # make sure we choose a CI key-id which has issued both the eUICC as well as our own SM-DP side # certs. - if ci_cert and cert_get_subject_key_id(ci_cert) == self.dp_auth.get_authority_key_identifier().key_identifier: + try: + ci_subject_key_id = cert_get_subject_key_id(ci_cert) + except Exception: + # For CI certs, we need to get the subject key identifier differently + subject_key_ext = ci_cert.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_KEY_IDENTIFIER) + ci_subject_key_id = subject_key_ext.value.key_identifier + + if ci_subject_key_id == self.dp_auth.get_authority_key_identifier().key_identifier: break else: ci_cert = None if not ci_cert: raise ApiError('8.8.2', '3.1', 'None of the proposed Public Key Identifiers is supported by the SM-DP+')
+ # After verifying signing PKIDs, validate certificate chains for verification + verification_pkid_list = euiccInfo1.get('euiccCiPKIdListForVerification', []) + if verification_pkid_list and not self.validate_certificate_chain_for_verification(verification_pkid_list): + # SGP.22 Table 46: SM-DP+ Certificate - Unavailable + raise ApiError('8.8.4', '3.7', 'The SM-DP+ has no CERT.DPAuth.ECDSA signed by one of the GSMA CI Public Key supported by the eUICC') + # Generate a TransactionID which is used to identify the ongoing RSP session. The TransactionID # SHALL be unique within the scope and lifetime of each SM-DP+. transactionId = uuid.uuid4().hex.upper() @@ -584,69 +980,265 @@ euicc_cert = x509.load_der_x509_certificate(euiccCertificate_bin) eum_cert = x509.load_der_x509_certificate(eumCertificate_bin)
- # Verify that the transactionId is known and relates to an ongoing RSP session. Otherwise, the SM-DP+ - # SHALL return a status code "TransactionId - Unknown" + # Verify that the transactionId is known and relates to an ongoing RSP session. ss = self.rss.get(transactionId, None) if ss is None: - raise ApiError('8.10.1', '3.9', 'Unknown') + # SGP.22 Table 49: TransactionId - Unknown + raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown') ss.euicc_cert = euicc_cert ss.eum_cert = eum_cert # TODO: do we need this in the state?
- # Verify that the Root Certificate of the eUICC certificate chain corresponds to the - # euiccCiPKIdToBeUsed or TODO: euiccCiPKIdToBeUsedV3 - if cert_get_auth_key_id(eum_cert) != ss.ci_cert_id: - raise ApiError('8.11.1', '3.9', 'Unknown') + # First verify that the Root Certificate of the eUICC certificate chain corresponds to the + # euiccCiPKIdToBeUsed - this check must come before detailed certificate validation + try: + eum_auth_key_id = cert_get_auth_key_id(eum_cert) + except (ExtensionNotFound, Exception) as e: + logger.error(f"Failed to get AuthorityKeyIdentifier from EUM certificate: {e}") + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + + if eum_auth_key_id != ss.ci_cert_id: + # SGP.22 Table 49: CI Public Key - Unknown + raise ApiError('8.11.1', '3.9', 'Unknown CI Public Key. The CI used by the EUM Certificate is not a trusted root for the SM-DP+') + + # Certificate validation checks - only do after CI key verification + try: + # Check if certificates are valid (not expired, proper format, etc.) + now = datetime.now(timezone.utc) + + # Check EUM certificate validity period + if eum_cert.not_valid_after_utc < now: + # SGP.22 Table 49: EUM Certificate - Expired + raise ApiError('8.1.2', '6.3', 'Certificate has expired') + if eum_cert.not_valid_before_utc > now: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + + # Check eUICC certificate validity period + if euicc_cert.not_valid_after_utc < now: + # SGP.22 Table 49: eUICC Certificate - Expired + raise ApiError('8.1.3', '6.3', 'Certificate has expired') + if euicc_cert.not_valid_before_utc > now: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + + # Perform detailed certificate validation + validate_eum_certificate(eum_cert) + validate_euicc_certificate(euicc_cert) + + except ApiError: + raise # Re-raise our API errors + except Exception as e: + logger.error(f"Certificate validation error: {e}") + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid')
# Verify the validity of the eUICC certificate chain - cs = CertificateSet(self.ci_get_cert_for_pkid(ss.ci_cert_id)) - cs.add_intermediate_cert(eum_cert) - # TODO v3: otherCertsInChain try: + cs = CertificateSet(self.ci_get_cert_for_pkid(ss.ci_cert_id)) + cs.add_intermediate_cert(eum_cert) + # TODO v3: otherCertsInChain cs.verify_cert_chain(euicc_cert) - except VerifyError: - raise ApiError('8.1.3', '6.1', 'Verification failed (certificate chain)') - # raise ApiError('8.1.3', '6.3', 'Expired') + except x509_err.MissingIntermediateCert as e: + # Check if the missing certificate is the EUM cert + if hasattr(e, 'auth_key_id') and e.auth_key_id == b2h(cert_get_auth_key_id(eum_cert)): + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + else: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except x509_err.CertificateRevoked as e: + eum_serial = format(eum_cert.serial_number, 'X') + euicc_serial = format(euicc_cert.serial_number, 'X') + + if e.cert_serial.upper() == eum_serial.upper(): + # SGP.22 Table 49: EUM Certificate - Expired (revoked uses expired code) + raise ApiError('8.1.2', '6.3', 'Certificate has expired') + elif e.cert_serial.upper() == euicc_serial.upper(): + # SGP.22 Table 49: eUICC Certificate - Expired (revoked uses expired code) + raise ApiError('8.1.3', '6.3', 'Certificate has expired') + except x509_err.MaxDepthExceeded: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except x509_err.SignatureVerification as e: + # Check which certificate's signature failed + error_str = str(e) + if 'EUM' in error_str or 'intermediate' in error_str: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + else: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except x509_err.VerifyError as e: + # Generic certificate chain error for any other x509_err subclasses + error_str = str(e) + if 'EUM' in error_str or 'intermediate' in error_str: + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + else: + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except ValueError as e: + # This could be raised by add_intermediate_cert for invalid EUM cert + if 'intermediate certificate' in str(e): + # SGP.22 Table 49: EUM Certificate - Verification Failed + raise ApiError('8.1.2', '6.1', 'Certificate is invalid') + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid') + except Exception as e: + logger.error(f"Certificate chain verification error: {e}") + # Check if it's a missing CI certificate issue + if 'ci_cert_id' in str(e) or 'Unknown' in str(e): + # SGP.22 Table 49: CI Public Key - Unknown + raise ApiError('8.11.1', '3.9', 'Unknown CI Public Key. The CI used by the EUM Certificate is not a trusted root for the SM-DP+') + # SGP.22 Table 49: eUICC Certificate - Verification Failed + raise ApiError('8.1.3', '6.1', 'Certificate is invalid')
# Verify euiccSignature1 over euiccSigned1 using pubkey from euiccCertificate. - # Otherwise, the SM-DP+ SHALL return a status code "eUICC - Verification failed" if not self._ecdsa_verify(euicc_cert, euiccSignature1_bin, euiccSigned1_bin): - raise ApiError('8.1', '6.1', 'Verification failed (euiccSignature1 over euiccSigned1)') + # SGP.22 Table 49: eUICC - Verification Failed + raise ApiError('8.1', '6.1', 'eUICC signature is invalid or serverChallenge is invalid')
ss.eid = ss.euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value logger.debug("EID (from eUICC cert): %s" % ss.eid)
# Verify EID is within permitted range of EUM certificate if not validate_eid_range(ss.eid, eum_cert): - raise ApiError('8.1.4', '6.1', 'EID is not within the permitted range of the EUM certificate') + # Use eUICC Certificate error since EID is from eUICC cert + raise ApiError('8.1.3', '6.1', 'Certificate is invalid')
# Verify that the serverChallenge attached to the ongoing RSP session matches the - # serverChallenge returned by the eUICC. Otherwise, the SM-DP+ SHALL return a status code "eUICC - - # Verification failed". + # serverChallenge returned by the eUICC. if euiccSigned1['serverChallenge'] != ss.serverChallenge: - raise ApiError('8.1', '6.1', 'Verification failed (serverChallenge)') + # SGP.22 Table 49: eUICC - Verification Failed + raise ApiError('8.1', '6.1', 'eUICC signature is invalid or serverChallenge is invalid') + + # Verify that the transactionId in euiccSigned1 matches the outer transactionId + if euiccSigned1['transactionId'] != h2b(transactionId): + # SGP.22 Table 49: TransactionId - Unknown + raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
# If ctxParams1 contains a ctxParamsForCommonAuthentication data object, the SM-DP+ Shall [...] - # TODO: We really do a very simplistic job here, this needs to be properly implemented later, - # considering all the various cases, profile state, etc. if euiccSigned1['ctxParams1'][0] == 'ctxParamsForCommonAuthentication': cpca = euiccSigned1['ctxParams1'][1] matchingId = cpca.get('matchingId', None) - if not matchingId: - # TODO: check if any pending profile downloads for the EID - raise ApiError('8.2.6', '3.8', 'Refused') - if matchingId: - # look up profile based on matchingID. We simply check if a given file exists for now.. - path = os.path.join(self.upp_dir, matchingId) + '.der' - # prevent directory traversal attack - if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir: - raise ApiError('8.2.6', '3.8', 'Refused') - if not os.path.isfile(path) or not os.access(path, os.R_OK): - raise ApiError('8.2.6', '3.8', 'Refused') - ss.matchingId = matchingId - with open(path, 'rb') as f: - pes = saip.ProfileElementSequence.from_der(f.read()) - iccid_str = b2h(pes.get_pe_for_type('header').decoded['iccid']) + logger.debug(f"Extracted matchingId from request: {matchingId}") + + # Determine use case and find profile + profile_info = None + iccid_str = None + + if not matchingId or matchingId == '': + # Default SM-DP+ address use case - check if EID has pending profile + logger.debug(f"Default SM-DP+ use case for EID: {ss.eid}") + if self.test_mode and ss.eid in self.default_profiles: + profile_info = self.default_profiles[ss.eid] + iccid_str = profile_info['iccid'] + ss.matchingId = None # No matchingId for default case + elif not self.test_mode: + # In production mode, try to load a default profile from file system + # This is where real implementation would check database/backend + # SGP.22 Table 49: EID - Refused + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + else: + # Test mode but no profile for this EID + # SGP.22 Table 49: EID - Refused + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + + elif self.test_mode and matchingId.startswith('EVENT_'): + # SM-DS event-based use case (test mode only) + logger.debug(f"SM-DS event use case with matchingId: {matchingId}") + if matchingId in self.event_based_profiles: + profile_info = self.event_based_profiles[matchingId] + # Check if profile is associated with specific EID + if profile_info.get('associated_eid') and profile_info['associated_eid'] != ss.eid: + # SGP.22 Table 49: EID - Refused + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + iccid_str = profile_info['iccid'] + ss.matchingId = matchingId + else: + # SGP.22 Table 49: MatchingID - Refused + raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + + else: + # Activation code use case + logger.debug(f"Activation code use case with matchingId: {matchingId}") + logger.debug(f"Available activation codes: {list(self.activation_code_profiles.keys())}") + if self.test_mode and matchingId in self.activation_code_profiles: + profile_info = self.activation_code_profiles[matchingId] + # Check if profile is associated with specific EID + if profile_info.get('associated_eid') and profile_info['associated_eid'] != ss.eid: + # SGP.22 Table 49: EID - Refused + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + iccid_str = profile_info['iccid'] + ss.matchingId = matchingId + else: + path = os.path.join(self.upp_dir, matchingId) + '.der' + # prevent directory traversal attack + if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir: + # SGP.22 Table 49: MatchingID - Refused + raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + if os.path.isfile(path) and os.access(path, os.R_OK): + with open(path, 'rb') as f: + pes = saip.ProfileElementSequence.from_der(f.read()) + iccid_str = b2h(pes.get_pe_for_type('header').decoded['iccid']) + ss.matchingId = matchingId + # temporary profile info for legacy files + profile_info = { + 'confirmationCode': self.confirmation_codes.get(matchingId), + 'state': 'released', + 'profileName': matchingId + } + else: + # SGP.22 Table 49: MatchingID - Refused + raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + + # Validate profile state and other conditions + if profile_info: + # Check if profile is released + if profile_info.get('state') == 'not_released': + # SGP.22 Table 49: Profile - Not allowed + raise ApiError('8.2', '1.2', 'Profile has not yet been released') + + # Check for expired download order + if profile_info.get('expiration'): + exp_date = datetime.fromisoformat(profile_info['expiration'].replace('Z', '+00:00')) + if datetime.now(timezone.utc) > exp_date: + # SGP.22 Table 49: Download order - Time to Live Expired + raise ApiError('8.8.5', '4.10', 'The Download order has expired') + + # Check maximum download attempts + if profile_info.get('download_attempts', 0) >= 3: + # SGP.22 Table 49: Download order - Maximum number of retries exceeded + raise ApiError('8.8.5', '6.4', 'The maximum number of retries for the Profile download order has been exceeded') + + # Check device eligibility (for Test #15 - test mode only) + if self.test_mode and profile_info.get('device_requirements'): + # fixme real implementation would check against actual device capabilities + if profile_info['device_requirements'].get('min_memory_mb', 0) > 1000: + # SGP.22 Table 49: Profile Type - Stopped on warning + raise ApiError('8.2.5', '4.3', 'No eligible Profile for this eUICC/Device') + + # Set confirmation code requirement + # Special handling: When matchingId is omitted or empty, don't require confirmation code + # per SGP.23 test sequences #14-#18 + if self.test_mode and (not matchingId or matchingId == ''): + ss.ccRequiredFlag = False + logger.info("ccRequiredFlag=False for omitted/empty matchingId (default SM-DP+ use case)") + elif profile_info.get('confirmationCode'): + ss.ccRequiredFlag = True + ss.expected_confirmation_code = profile_info['confirmationCode'] + logger.info(f"Set ccRequiredFlag=True with code: {ss.expected_confirmation_code}") + else: + ss.ccRequiredFlag = False + logger.info("ccRequiredFlag=False, no confirmation code required") + + # Use profile name from profile_info if available + profile_name = profile_info.get('profileName', matchingId or 'DefaultProfile') + else: + # Should not happen if all cases are handled above + raise ApiError('8.2.6', '3.8', 'Profile not found') else: # there's currently no other option in the ctxParams1 choice, so this cannot happen raise ApiError('1.3.1', '2.2', 'ctxParams1 missing mandatory ctxParamsForCommonAuthentication') @@ -654,19 +1246,25 @@ # FIXME: we actually want to perform the profile binding herr, and read the profile metadat from the profile
# Put together profileMetadata + _bin - ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=matchingId) + ss.profileMetadata = ProfileMetadata(iccid_bin=h2b(swap_nibbles(iccid_str)), spn="OsmocomSPN", profile_name=profile_name) # enable notifications for all operations for event in ['enable', 'disable', 'delete']: ss.profileMetadata.add_notification(event, self.server_hostname) profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
# Put together smdpSigned2 + _bin + cc_flag = getattr(ss, 'ccRequiredFlag', False) + logger.info(f"Setting ccRequiredFlag in SmdpSigned2: {cc_flag}") + logger.info(f"Session state has ccRequiredFlag: {hasattr(ss, 'ccRequiredFlag')}") + if hasattr(ss, 'ccRequiredFlag'): + logger.info(f"Session ccRequiredFlag value: {ss.ccRequiredFlag}") smdpSigned2 = { 'transactionId': h2b(ss.transactionId), - 'ccRequiredFlag': False, # whether the Confirmation Code is required + 'ccRequiredFlag': cc_flag, # whether the Confirmation Code is required #'bppEuiccOtpk': None, # whether otPK.EUICC.ECKA already used for binding the BPP, tag '5F49' } smdpSigned2_bin = rsp.asn1.encode('SmdpSigned2', smdpSigned2) + logger.info(f"Encoded smdpSigned2 with ccRequiredFlag={cc_flag}")
ss.smdpSignature2_do = b'\x5f\x37\x40' + self.dp_pb.ecdsa_sign(smdpSigned2_bin + b'\x5f\x37\x40' + euiccSignature1_bin)
@@ -689,6 +1287,7 @@ # Verify that the received transactionId is known and relates to an ongoing RSP session ss = self.rss.get(transactionId, None) if not ss: + # SGP.22 Table 51: TransactionId - Unknown raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
prepDownloadResp_bin = b64decode(content['prepareDownloadResponse']) @@ -699,7 +1298,9 @@ r_err = prepDownloadResp[1] #r_err['transactionId'] #r_err['downloadErrorCode'] - raise ValueError("downloadResponseError %s" % r_err) + # Download error from eUICC - use generic eUICC verification failed + # SGP.22 Table 51: eUICC - Verification Failed + raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
r_ok = prepDownloadResp[1]
@@ -707,11 +1308,13 @@ euiccSigned2 = r_ok['euiccSigned2'] euiccSigned2_bin = rsp.extract_euiccSigned2(prepDownloadResp_bin) if not self._ecdsa_verify(ss.euicc_cert, r_ok['euiccSignature2'], euiccSigned2_bin + ss.smdpSignature2_do): + # SGP.22 Table 51: eUICC - Verification Failed raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
# not in spec: Verify that signed TransactionID is outer transaction ID if h2b(transactionId) != euiccSigned2['transactionId']: - raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId') + # SGP.22 Table 51: TransactionId - Unknown + raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
# store otPK.EUICC.ECKA in session state ss.euicc_otpk = euiccSigned2['euiccOtpk'] @@ -720,11 +1323,30 @@ # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter # Reference value of CERT.DPpb.ECDDSA logger.debug("curve = %s" % self.dp_pb.get_curve()) - ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve()) - # extract the public key in (hopefully) the right format for the ES8+ interface - ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) - logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk)) - logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption()))) + + # Check if we've seen this euicc_otpk before (retry scenario) + euicc_otpk_hex = b2h(euiccSigned2['euiccOtpk']) + if euicc_otpk_hex in self.otpk_mapping: + # Retry scenario - reuse existing keys + existing_data = self.otpk_mapping[euicc_otpk_hex] + ss.smdp_ot = existing_data['smdp_ot'] + ss.smdp_otpk = existing_data['smdp_otpk'] + logger.debug("Retry scenario detected - reusing existing smdp_otpk") + logger.debug("smdpOtpk (reused): %s" % b2h(ss.smdp_otpk)) + else: + # Generate new keys + ss.smdp_ot = ec.generate_private_key(self.dp_pb.get_curve()) + # extract the public key in (hopefully) the right format for the ES8+ interface + ss.smdp_otpk = ss.smdp_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) + logger.debug("smdpOtpk: %s" % b2h(ss.smdp_otpk)) + logger.debug("smdpOtsk: %s" % b2h(ss.smdp_ot.private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption()))) + + # Store the mapping for retry scenarios + self.otpk_mapping[euicc_otpk_hex] = { + 'smdp_ot': ss.smdp_ot, + 'smdp_otpk': ss.smdp_otpk, + 'timestamp': time.time() + }
ss.host_id = b'mahlzeit'
@@ -733,9 +1355,26 @@ ss.shared_secret = ss.smdp_ot.exchange(ec.ECDH(), euicc_public_key) logger.debug("shared_secret: %s" % b2h(ss.shared_secret))
- # TODO: Check if this order requires a Confirmation Code verification + # Check if this order requires a Confirmation Code verification + if getattr(ss, 'ccRequiredFlag', False): + if 'hashCc' not in euiccSigned2: + # SGP.22 Table 51: Confirmation Code - Mandatory Element Missing + raise ApiError('8.2.7', '2.2', 'Confirmation Code is missing')
- # Perform actual protection + binding of profile package (or return pre-bound one) + received_hash = euiccSigned2['hashCc'] + expected_hash = compute_confirmation_code_hash( + getattr(ss, 'expected_confirmation_code', ''), + h2b(transactionId) + ) + + if received_hash != expected_hash: + logger.debug("Confirmation code verification failed") + logger.debug(f"Expected hash: {b2h(expected_hash)}") + logger.debug(f"Received hash: {b2h(received_hash)}") + # SGP.22 Table 51: Confirmation Code - Refused + raise ApiError('8.2.7', '3.8', 'Confirmation Code is refused') + + # Perform actual protection + binding of profile package (or return pre-bound one) with open(os.path.join(self.upp_dir, ss.matchingId)+'.der', 'rb') as f: upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata) # HACK: Use empty PPP as we're still debuggin the configureISDP step, and we want to avoid @@ -772,6 +1411,7 @@ transactionId = b2h(pird['transactionId']) ss = self.rss.get(transactionId, None) if ss is None: + # For unknown transactionId, terminate processing but still return 204 logger.warning(f"Unable to find session for transactionId: {transactionId}") return None # Will return HTTP 204 with empty body profileInstallRes['euiccSignPIR'] @@ -779,7 +1419,9 @@ pird_bin = rsp.asn1.encode('ProfileInstallationResultData', pird) # verify eUICC signature if not self._ecdsa_verify(ss.euicc_cert, profileInstallRes['euiccSignPIR'], pird_bin): - raise Exception('ECDSA signature verification failed on notification') + # Even on verification failure, acknowledge receipt with HTTP 204 + logger.error('ECDSA signature verification failed on notification') + return None # Will return HTTP 204 with empty body logger.debug("Profile Installation Final Result: %s", pird['finalResult']) # remove session state del self.rss[transactionId] @@ -798,7 +1440,9 @@ cs.verify_cert_chain(euicc_cert) tbs_bin = rsp.asn1.encode('NotificationMetadata', otherSignedNotif['tbsOtherNotification']) if not self._ecdsa_verify(euicc_cert, otherSignedNotif['euiccNotificationSignature'], tbs_bin): - raise Exception('ECDSA signature verification failed on notification') + # Even on verification failure, acknowledge receipt with HTTP 204 + logger.error('ECDSA signature verification failed on notification') + return None # Will return HTTP 204 with empty body other_notif = otherSignedNotif['tbsOtherNotification'] pmo = PMO.from_bitstring(other_notif['profileManagementOperation']) eid = euicc_cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value @@ -807,7 +1451,9 @@ iccid = swap_nibbles(b2h(iccid)) logger.debug("handleNotification: EID %s: %s of %s" % (eid, pmo, iccid)) else: - raise ValueError(pendingNotification) + # Unknown notification type - still acknowledge with HTTP 204 + logger.error(f"Unknown notification type: {pendingNotification[0]}") + return None # Always return HTTP 204 with empty body
#@app.route('/gsma/rsp3/es9plus/handleDeviceChangeRequest, methods=['POST']') #@rsp_api_wrapper @@ -824,7 +1470,8 @@ # Verify that the received transactionId is known and relates to an ongoing RSP session ss = self.rss.get(transactionId, None) if ss is None: - raise ApiError('8.10.1', '3.9', 'The RSP session identified by the transactionId is unknown') + # SGP.22 Table 52: TransactionId - Unknown + raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
cancelSessionResponse_bin = b64decode(content['cancelSessionResponse']) cancelSessionResponse = rsp.asn1.decode('CancelSessionResponse', cancelSessionResponse_bin) @@ -839,15 +1486,34 @@ ecsr_bin = rsp.asn1.encode('EuiccCancelSessionSigned', ecsr) # Verify the eUICC signature (euiccCancelSessionSignature) using the PK.EUICC.SIG attached to the ongoing RSP session if not self._ecdsa_verify(ss.euicc_cert, cancelSessionResponseOk['euiccCancelSessionSignature'], ecsr_bin): + # SGP.22 Table 52: eUICC - Verification Failed raise ApiError('8.1', '6.1', 'eUICC signature is invalid')
# Verify that the received smdpOid corresponds to the one in SM-DP+ CERT.DPauth.SIG subj_alt_name = self.dp_auth.get_subject_alt_name() - if x509.ObjectIdentifier(ecsr['smdpOid']) != subj_alt_name.oid: - raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid.') + + # Extract the SM-DP+ OID from the SubjectAlternativeName extension + smdp_oid_from_cert = None + for item in subj_alt_name: + if isinstance(item, x509.RegisteredID): + smdp_oid_from_cert = item.value + break + + if not smdp_oid_from_cert: + logger.error("No RegisteredID found in SM-DP+ certificate SubjectAlternativeName") + # SGP.22 Table 52: SM-DP+ - Invalid Association + raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid') + + received_oid = x509.ObjectIdentifier(ecsr['smdpOid']) + + if received_oid != smdp_oid_from_cert: + logger.error(f"OID mismatch: received {received_oid}, expected {smdp_oid_from_cert}") + # SGP.22 Table 52: SM-DP+ - Invalid Association + raise ApiError('8.8', '3.10', 'The provided SM-DP+ OID is invalid')
if ecsr['transactionId'] != h2b(transactionId): - raise ApiError('8.10.1', '3.9', 'The signed transactionId != outer transactionId') + # SGP.22 Table 52: TransactionId - Unknown + raise ApiError('8.10.1', '3.9', 'The RSP session identified by the TransactionID is unknown')
# TODO: 1. Notify the Operator using the function "ES2+.HandleNotification" function # TODO: 2. Terminate the corresponding pending download process. @@ -855,7 +1521,8 @@
# delete actual session data del self.rss[transactionId] - return { 'transactionId': transactionId } + # Per SGP.22 section 6.5.2.10, cancelSession returns an empty response (header only) + return {}
def main(argv): @@ -869,12 +1536,14 @@ action='store_true', default=False) parser.add_argument("-m", "--in-memory", help="Use ephermal in-memory session storage (for concurrent runs)", action='store_true', default=False) + parser.add_argument("-t", "--test", help="Enable test mode with hardcoded test profiles", + action='store_true', default=False) args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
common_cert_path = os.path.join(DATA_DIR, args.certdir) - hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool) + hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool, test_mode=args.test) if(args.nossl): hs.app.run(args.host, args.port) else: diff --git a/pySim/esim/x509_cert.py b/pySim/esim/x509_cert.py index 8139a09..027905d 100644 --- a/pySim/esim/x509_cert.py +++ b/pySim/esim/x509_cert.py @@ -25,6 +25,7 @@ from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from pySim.utils import b2h +from . import x509_err
def check_signed(signed: x509.Certificate, signer: x509.Certificate) -> bool: """Verify if 'signed' certificate was signed using 'signer'.""" @@ -64,9 +65,6 @@ id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6') id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7')
-class VerifyError(Exception): - """An error during certificate verification,""" - class CertificateSet: """A set of certificates consisting of a trusted [self-signed] CA root certificate, and an optional number of intermediate certificates. Can be used to verify the certificate chain @@ -135,7 +133,7 @@ # we cannot check if there's no CRL return if self.crl.get_revoked_certificate_by_serial_number(cert.serial_nr): - raise VerifyError('Certificate is present in CRL, verification failed') + raise x509_err.CertificateRevoked(cert.serial_nr)
def verify_cert_chain(self, cert: x509.Certificate, max_depth: int = 100): """Verify if a given certificate's signature chain can be traced back to the root CA of this @@ -150,13 +148,13 @@ return parent_cert = self.intermediate_certs.get(aki, None) if not parent_cert: - raise VerifyError('Could not find intermediate certificate for AuthKeyId %s' % b2h(aki)) + raise x509_err.MissingIntermediateCert(b2h(aki)) check_signed(c, parent_cert) # if we reach here, we passed (no exception raised) c = parent_cert depth += 1 if depth > max_depth: - raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth) + raise x509_err.MaxDepthExceeded(max_depth, depth)
def ecdsa_dss_to_tr03111(sig: bytes) -> bytes: diff --git a/pySim/esim/x509_err.py b/pySim/esim/x509_err.py new file mode 100644 index 0000000..35b4f41 --- /dev/null +++ b/pySim/esim/x509_err.py @@ -0,0 +1,58 @@ +"""X.509 certificate verification exceptions for GSMA eSIM.""" + +class VerifyError(Exception): + """Base class for certificate verification errors.""" + pass + + +class MissingIntermediateCert(VerifyError): + """Raised when an intermediate certificate in the chain cannot be found.""" + def __init__(self, auth_key_id: str): + self.auth_key_id = auth_key_id + super().__init__(f'Could not find intermediate certificate for AuthKeyId {auth_key_id}') + + +class CertificateRevoked(VerifyError): + """Raised when a certificate is found in the CRL.""" + def __init__(self, cert_serial: str = None): + self.cert_serial = cert_serial + msg = 'Certificate is present in CRL, verification failed' + if cert_serial: + msg += f' (serial: {cert_serial})' + super().__init__(msg) + + +class MaxDepthExceeded(VerifyError): + """Raised when certificate chain depth exceeds the maximum allowed.""" + def __init__(self, max_depth: int, actual_depth: int): + self.max_depth = max_depth + self.actual_depth = actual_depth + super().__init__(f'Maximum depth {max_depth} exceeded while verifying certificate chain (actual: {actual_depth})') + + +class SignatureVerification(VerifyError): + """Raised when certificate signature verification fails.""" + def __init__(self, cert_subject: str = None, signer_subject: str = None): + self.cert_subject = cert_subject + self.signer_subject = signer_subject + msg = 'Certificate signature verification failed' + if cert_subject and signer_subject: + msg += f': {cert_subject} not signed by {signer_subject}' + super().__init__(msg) + + +class InvalidCertificate(VerifyError): + """Raised when a certificate is invalid (missing required fields, wrong type, etc).""" + def __init__(self, reason: str): + self.reason = reason + super().__init__(f'Invalid certificate: {reason}') + + +class CertificateExpired(VerifyError): + """Raised when a certificate has expired.""" + def __init__(self, cert_subject: str = None): + self.cert_subject = cert_subject + msg = 'Certificate has expired' + if cert_subject: + msg += f': {cert_subject}' + super().__init__(msg) \ No newline at end of file