Hoernchen has uploaded this change for review. ( https://gerrit.osmocom.org/c/pysim/+/40883?usp=email )
Change subject: smdpp: add es2p interface + tests ......................................................................
smdpp: add es2p interface + tests
This adds the es2p interface, integrated with es9p, with some workarounds to make the test mode work. It comes with a few basic tests to ensure it does not immediately break.
This required some minor refactoring because the single file grew too large and the linter and type checker kept complaining.
Change-Id: I8c7cf192695fdf5c37cd39effffcd9056085a2e5 --- A contrib/generate_self_signed_operator_cert.py M osmo-smdpp.py A pySim/esim/es2plus_commons.py A pySim/esim/smdpp_common.py A smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned.pem A smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_combined.pem A smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_key.pem A tests/unittests/test_es2plus.py A tests/unittests/test_es2plus_es9plus_http.py A tests/unittests/test_unified_9p_2p.py 10 files changed, 2,312 insertions(+), 266 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/83/40883/1
diff --git a/contrib/generate_self_signed_operator_cert.py b/contrib/generate_self_signed_operator_cert.py new file mode 100755 index 0000000..a1474e6 --- /dev/null +++ b/contrib/generate_self_signed_operator_cert.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 + +""" +Generate a self-signed operator certificate for ES2+ testing. +""" + +import os +from datetime import datetime, timedelta +from cryptography import x509 +from cryptography.x509.oid import NameOID, ExtensionOID +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.backends import default_backend + +SELFPATH=os.path.abspath(os.path.dirname(__file__)) + +def generate_self_signed_operator_cert(operator_name, output_dir): + """Generate a self-signed operator certificate.""" + + # Generate key pair (using NIST P-256 curve) + private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) + + # Build subject (and issuer, since self-signed) + subject = issuer = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, f"{operator_name} ES2+ Client"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, operator_name), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "eSIM Operations"), + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + ]) + + # Build certificate + builder = x509.CertificateBuilder() + builder = builder.subject_name(subject) + builder = builder.issuer_name(issuer) + builder = builder.not_valid_before(datetime.now() - timedelta(days=1)) + builder = builder.not_valid_after(datetime.now() + timedelta(days=365)) # 1 year + builder = builder.serial_number(x509.random_serial_number()) + builder = builder.public_key(private_key.public_key()) + + # Add extensions + # Subject Key Identifier - this is what SM-DP+ will use to identify the operator + builder = builder.add_extension( + x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), + critical=False + ) + + # Authority Key Identifier 
(points to itself since self-signed) + builder = builder.add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key(private_key.public_key()), + critical=False + ) + + # Basic Constraints + builder = builder.add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True + ) + + # Key Usage + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=True, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False + ), + critical=True + ) + + # Extended Key Usage - Client Authentication + builder = builder.add_extension( + x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH, + ]), + critical=True + ) + + # Sign the certificate + certificate = builder.sign(private_key, hashes.SHA256(), default_backend()) + + # Create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # Save certificate in PEM format + cert_pem_path = os.path.join(output_dir, f"{operator_name}_selfsigned.pem") + with open(cert_pem_path, 'wb') as f: + f.write(certificate.public_bytes(serialization.Encoding.PEM)) + + # Save private key in PEM format + key_pem_path = os.path.join(output_dir, f"{operator_name}_selfsigned_key.pem") + with open(key_pem_path, 'wb') as f: + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + )) + + # Save combined cert+key for easy use with es2p_client.py + combined_path = os.path.join(output_dir, f"{operator_name}_selfsigned_combined.pem") + with open(combined_path, 'wb') as f: + f.write(certificate.public_bytes(serialization.Encoding.PEM)) + f.write(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + )) + + 
ski_ext = certificate.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) + ski_hex = ski_ext.value.key_identifier.hex() + ski_formatted = ':'.join(ski_hex[i:i+2].upper() for i in range(0, len(ski_hex), 2)) + + print(f"Generated self-signed certificate for {operator_name}:") + print(f" Certificate: {cert_pem_path}") + print(f" Private Key: {key_pem_path}") + print(f" Combined (for es2p_client.py): {combined_path}") + print(f" Subject Key Identifier (SKI): {ski_formatted}") + print("\nThis SKI can be added to the SM-DP+ trusted operators list.") + + return combined_path, ski_formatted + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Generate self-signed operator certificate for ES2+ testing") + parser.add_argument("--operator", default="TEST_OPERATOR", help="Operator name") + parser.add_argument("--output-dir", default=os.path.abspath(SELFPATH+"/../smdpp-data/certs/SelfSignedOperators"), help="Output directory") + + args = parser.parse_args() + + generate_self_signed_operator_cert(args.operator, args.output_dir) \ No newline at end of file diff --git a/osmo-smdpp.py b/osmo-smdpp.py index d88c05c..3d9926c 100755 --- a/osmo-smdpp.py +++ b/osmo-smdpp.py @@ -3,6 +3,7 @@ # Early proof-of-concept towards a SM-DP+ HTTP service for GSMA consumer eSIM RSP # # (C) 2023-2024 by Harald Welte laforge@osmocom.org +# (C) 2025 by Eric Wild ewild@sysmocom.de # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -126,6 +127,10 @@ from base64 import b64decode # noqa: E402 from klein import Klein # noqa: E402 from twisted.web.iweb import IRequest # noqa: E402 +from twisted.internet import ssl, reactor # noqa: E402 +from twisted.internet.endpoints import SSL4ServerEndpoint # noqa: E402 +from twisted.web.server import Site # noqa: E402 +from OpenSSL import SSL # noqa: E402
from osmocom.utils import h2b, b2h, swap_nibbles # noqa: E402
@@ -138,6 +143,8 @@ from pySim.esim import x509_err # noqa: E402 from datetime import datetime, timezone # noqa: E402 import hashlib # noqa: E402 +from pySim.esim.es2plus_commons import Es2PlusHelpers, Es2PlusProfileState, Es2PlusProfileStore # noqa: E402 +from pySim.esim.smdpp_common import ApiError, build_resp_header,validate_eid_range # noqa: E402
import logging # noqa: E402 logger = logging.getLogger(__name__) @@ -146,7 +153,6 @@ DATA_DIR = './smdpp-data' HOSTNAME = 'testsmdpplus1.example.com' # must match certificates!
- def b64encode2str(req: bytes) -> str: """Encode given input bytes as base64 and return result as string.""" return base64.b64encode(req).decode('ascii') @@ -166,182 +172,6 @@ if admin_protocol and not admin_protocol.startswith('gsma/rsp/v'): raise ApiError('1.2.2', '2.1', 'Unsupported X-Admin-Protocol version')
-def get_eum_certificate_variant(eum_cert) -> str: - """Determine EUM certificate variant by checking Certificate Policies extension. - Returns 'O' for old variant, or 'NEW' for Ov3/A/B/C variants.""" - - try: - cert_policies_ext = eum_cert.extensions.get_extension_for_oid( - x509.oid.ExtensionOID.CERTIFICATE_POLICIES - ) - - for policy in cert_policies_ext.value: - policy_oid = policy.policy_identifier.dotted_string - logger.debug(f"Found certificate policy: {policy_oid}") - - if policy_oid == '2.23.146.1.2.1.2': - logger.debug("Detected EUM certificate variant: O (old)") - return 'O' - elif policy_oid == '2.23.146.1.2.1.0.0.0': - logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)") - return 'NEW' - except x509.ExtensionNotFound: - logger.debug("No Certificate Policies extension found") - except Exception as e: - logger.debug(f"Error checking certificate policies: {e}") - -def parse_permitted_eins_from_cert(eum_cert) -> List[str]: - """Extract permitted IINs from EUM certificate using the appropriate method - based on certificate variant (O vs Ov3/A/B/C). - Returns list of permitted IINs (basically prefixes that valid EIDs must start with).""" - - # Determine certificate variant first - cert_variant = get_eum_certificate_variant(eum_cert) - permitted_iins = [] - - if cert_variant == 'O': - # Old variant - use nameConstraints extension - permitted_iins.extend(_parse_name_constraints_eins(eum_cert)) - - else: - # New variants (Ov3, A, B, C) - use GSMA permittedEins extension - permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert)) - - unique_iins = list(set(permitted_iins)) - - logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}") - return unique_iins - -def _parse_gsma_permitted_eins(eum_cert) -> List[str]: - """Parse the GSMA permittedEins extension using correct ASN.1 structure. 
- PermittedEins ::= SEQUENCE OF PrintableString - Each string contains an IIN (Issuer Identification Number) - a prefix of valid EIDs.""" - permitted_iins = [] - - try: - permitted_eins_oid = x509.ObjectIdentifier('2.23.146.1.2.2.0') # sgp26: 2.23.146.1.2.2.0 = ASN1:SEQUENCE:permittedEins - - for ext in eum_cert.extensions: - if ext.oid == permitted_eins_oid: - logger.debug(f"Found GSMA permittedEins extension: {ext.oid}") - - # Get the DER-encoded extension value - ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value - - if isinstance(ext_der, bytes): - try: - permitted_eins_schema = """ - PermittedEins DEFINITIONS ::= BEGIN - PermittedEins ::= SEQUENCE OF PrintableString - END - """ - decoder = asn1tools.compile_string(permitted_eins_schema) - decoded_strings = decoder.decode('PermittedEins', ext_der) - - for iin_string in decoded_strings: - # Each string contains an IIN -> prefix of euicc EID - iin_clean = iin_string.strip().upper() - - # IINs is 8 chars per sgp22, var len according to sgp29, fortunately we don't care - if (len(iin_clean) == 8 and - all(c in '0123456789ABCDEF' for c in iin_clean) and - len(iin_clean) % 2 == 0): - permitted_iins.append(iin_clean) - logger.debug(f"Found permitted IIN (GSMA): {iin_clean}") - else: - logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})") - except Exception as e: - logger.debug(f"Error parsing GSMA permittedEins extension: {e}") - - except Exception as e: - logger.debug(f"Error accessing GSMA certificate extensions: {e}") - - return permitted_iins - - -def _parse_name_constraints_eins(eum_cert) -> List[str]: - """Parse permitted IINs from nameConstraints extension (variant O).""" - permitted_iins = [] - - try: - # Look for nameConstraints extension - name_constraints_ext = eum_cert.extensions.get_extension_for_oid( - x509.oid.ExtensionOID.NAME_CONSTRAINTS - ) - - name_constraints = name_constraints_ext.value - - # Check permittedSubtrees for IIN constraints - if 
name_constraints.permitted_subtrees: - for subtree in name_constraints.permitted_subtrees: - - if isinstance(subtree, x509.DirectoryName): - for attribute in subtree.value: - # IINs for O in serialNumber - if attribute.oid == x509.oid.NameOID.SERIAL_NUMBER: - serial_value = attribute.value.upper() - # sgp22 8, sgp29 var len, fortunately we don't care - if (len(serial_value) == 8 and - all(c in '0123456789ABCDEF' for c in serial_value) and - len(serial_value) % 2 == 0): - permitted_iins.append(serial_value) - logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}") - - except x509.ExtensionNotFound: - logger.debug("No nameConstraints extension found") - except Exception as e: - logger.debug(f"Error parsing nameConstraints: {e}") - - return permitted_iins - - -def validate_eid_range(eid: str, eum_cert) -> bool: - """Validate that EID is within the permitted EINs of the EUM certificate.""" - if not eid or len(eid) != 32: - logger.debug(f"Invalid EID format: {eid}") - return False - - try: - permitted_eins = parse_permitted_eins_from_cert(eum_cert) - - if not permitted_eins: - logger.debug("Warning: No permitted EINs found in EUM certificate") - return False - - eid_normalized = eid.upper() - logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs") - - for permitted_ein in permitted_eins: - if eid_normalized.startswith(permitted_ein): - logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}") - return True - - logger.debug(f"EID {eid_normalized} is not in any permitted EIN list") - return False - - except Exception as e: - logger.debug(f"Error validating EID: {e}") - return False - -def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict: - r = {'subjectCode': subject_code, 'reasonCode': reason_code } - if subject_id: - r['subjectIdentifier'] = subject_id - if message: - r['message'] = message - return r - -def build_resp_header(js: 
dict, status: str = 'Executed-Success', status_code_data = None) -> None: - # SGP.22 v3.0 6.5.1.4 - js['header'] = { - 'functionExecutionStatus': { - 'status': status, - } - } - if status_code_data: - js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data - - def ecdsa_tr03111_to_dss(sig: bytes) -> bytes: """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those.""" assert len(sig) == 64 @@ -485,20 +315,24 @@ raise ApiError('8.1.3', '6.1', 'Certificate is invalid')
-class ApiError(Exception): - def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None, - subject_id: Optional[str] = None): - self.status_code = build_status_code(subject_code, reason_code, subject_id, message) - - def encode(self) -> str: - """Encode the API Error into a responseHeader string.""" - js = {} - build_resp_header(js, 'Failed', self.status_code) - return json.dumps(js) - class SmDppHttpServer: app = Klein()
+ def update_es2plus_profile_state(self, matching_id: str, new_state: str, eid: Optional[str] = None): + """Update ES2+ profile state based on ES9+ operations.""" + + profile = self.profile_store.find_by_matching_id(matching_id) + if profile: + profile.state = new_state + if eid and not profile.eid: + profile.eid = eid + if new_state == 'downloaded': + profile.download_attempts += 1 + self.profile_store[profile.iccid] = profile + logger.info(f"Updated ES2+ profile {profile.iccid} to state: {new_state}") + else: + logger.error(f"FAILED to updated ES2+ profile {matching_id} to state: {new_state}, eid {eid} - not found!") + @staticmethod def load_certs_from_path(path: str) -> List[x509.Certificate]: """Load all DER + PEM files from given directory path and return them as list of x509.Certificate @@ -555,6 +389,15 @@ "CC_REQUIRED_TEST": "12345678" # Special matchingId for confirmation code tests } if test_mode else {}
+ # ES2+ trusted operators: SKI -> operator info mapping + self.trusted_operators = {} + if test_mode: + logger.info("ES2+ test mode: will dynamically trust operator certificates") + else: + # Production mode would load from config, tbd + # self.trusted_operators = self._load_trusted_operators() or something like that + pass + # load DPauth cert + key self.dp_auth = CertAndPrivkey(oid.id_rspRole_dp_auth_v2) cert_dir = common_cert_path @@ -583,6 +426,19 @@ logger.info(f"Using file-based session storage: {db_path}") self.otpk_mapping = {} # Maps euicc_otpk -> (smdp_ot, smdp_otpk) for retry scenarios
+ # ES2+ specific: Profile inventory management using shelve-based store + if in_memory: + self.profile_store = Es2PlusProfileStore(in_memory=True) + logger.info("Using in-memory ES2+ profile storage") + else: + # Use different profile database files for BRP and NIST + profile_db_suffix = "BRP" if use_brainpool else "NIST" + profile_db_path = os.path.join(DATA_DIR, f"es2plus-profiles-{profile_db_suffix}") + self.profile_store = Es2PlusProfileStore(filename=profile_db_path, in_memory=False) + logger.info(f"Using file-based ES2+ profile storage: {profile_db_path}") + + self.sm_ds_events = {} # For tracking SM-DS event registrations + # Initialize profile configurations for test cases if test_mode: self._init_test_profiles() @@ -593,21 +449,13 @@ self.default_profiles = {}
def _init_test_profiles(self): - """Initialize test profiles for different use cases.""" + """Initialize test profiles for different use cases. + NOTE: In the unified es2p/es9p architecture ES2+ profiles in profile_store always + take precedence when looking up by matching ID. Static profiles are only used as fallback. + """ # Activation code profiles - print("INIT: Initializing test profiles...") + print("INIT: Initializing static test profiles (secondary to ES2+ profiles)...") self.activation_code_profiles = { - 'TEST123': { - 'matchingId': 'TEST123', - 'confirmationCode': '12345678', # 8-digit numeric code - 'iccid': '8900000000000000001F', - 'profileName': 'Test Profile 1', - 'state': 'released', - 'download_attempts': 0, - 'cc_attempts': 0, - 'associated_eid': None, - 'expiration': None - }, 'AC_NOT_RELEASED': { 'matchingId': 'AC_NOT_RELEASED', 'confirmationCode': '87654321', @@ -701,17 +549,6 @@ 'associated_eid': '89888888888888888888888888888888', # Different EID 'expiration': None }, - 'MATCHING_ID_1': { - 'matchingId': 'MATCHING_ID_1', - 'confirmationCode': None, - 'iccid': '8900000000000000017F', - 'profileName': 'Test Activation Code Profile', - 'state': 'released', - 'download_attempts': 0, - 'cc_attempts': 0, - 'associated_eid': '89049032123451234512345678901235', - 'expiration': None - }, 'CC_REQUIRED_TEST': { 'matchingId': 'CC_REQUIRED_TEST', 'confirmationCode': '12345678', # Requires confirmation code @@ -739,28 +576,6 @@ 'associated_eid': None, 'expiration': None }, - 'UNMATCHED_EVENT': { - 'matchingId': 'UNMATCHED_EVENT', - 'confirmationCode': '99887766', - 'iccid': '8900000000000000006F', - 'profileName': 'Unmatched Event Profile', - 'state': 'released', - 'download_attempts': 0, - 'cc_attempts': 0, - 'associated_eid': '89001012012341234012345678901224', # Different EID - 'expiration': None - }, - 'EVENT_NORMAL': { - 'matchingId': 'EVENT_NORMAL', - 'confirmationCode': None, - 'iccid': '8900000000000000014F', - 'profileName': 'Normal SM-DS Event', 
- 'state': 'released', - 'download_attempts': 0, - 'cc_attempts': 0, - 'associated_eid': None, - 'expiration': None - }, 'EVENT_RESTRICTED': { 'matchingId': 'EVENT_RESTRICTED', 'confirmationCode': None, @@ -787,15 +602,6 @@
# Default SM-DP+ profiles (associated with specific EIDs) self.default_profiles = { - '89001012012341234012345678901234': { # Test EID - 'confirmationCode': '12345678', - 'iccid': '8900000000000000007F', - 'profileName': 'Default Profile for Test EID', - 'state': 'released', - 'download_attempts': 0, - 'cc_attempts': 0, - 'expiration': None - }, '89049032123451234512345678901235': { # EID1 from test specs 'confirmationCode': None, 'iccid': '8900000000000000020F', @@ -807,6 +613,21 @@ }, }
+ # Initialize ES2+ profile inventory for test mode + test_profiles = [ + ('8900000000000000001', 'Test', 'S_MNO'), # ICCID_OP_PROF1 + ('8900000000000000002', 'Test', 'S_MNO'), # ICCID_OP_PROF2 + ('8900000000000000003', 'Test', 'S_MNO'), # Additional test profiles + ('8900000000000000004', 'Test', 'S_MNO'), + ('8900000000000000005', 'Test', 'S_MNO'), + ] + + for iccid, profile_type, owner in test_profiles: + if iccid not in self.profile_store: + profile = Es2PlusProfileState(iccid, profile_type, owner) + self.profile_store[iccid] = profile + logger.debug(f"Initialized ES2+ test profile: {iccid}") + @app.handle_errors(ApiError) def handle_apierror(self, request: IRequest, failure): request.setResponseCode(200) @@ -836,7 +657,7 @@ set_headers(request)
output = func(self, request, content) - if output == None: + if output is None: return ''
build_resp_header(output) @@ -844,6 +665,78 @@ return json.dumps(output) return _api_wrapper
+ @staticmethod + def es2plus_api_wrapper(func): + """Wrapper for ES2+ API endpoints that require mutual TLS authentication.""" + @functools.wraps(func) + def _api_wrapper(self, request: IRequest): + # Check if we're running with SSL (client certs only work with SSL, duh) + transport = request.transport + authenticated = False + + if hasattr(transport, 'getPeerCertificate'): + peer_cert = transport.getPeerCertificate() + if peer_cert: + subject = peer_cert.get_subject() + logger.debug(f"ES2+ Client certificate subject: CN={subject.CN}, O={subject.O}") + + ski = None + try: + ski_ext = peer_cert.to_cryptography().extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) + ski_bytes = ski_ext.value.key_identifier + ski = ':'.join(f'{b:02X}' for b in ski_bytes) + logger.info(f"Client certificate SKI: {ski}") + except Exception as e: + logger.debug(f"Could not extract SKI: {e}") + + # In test mode, dynamically trust any certificate + if self.test_mode and ski: + if ski not in self.trusted_operators: + operator_name = subject.O if subject.O else subject.CN + self.trusted_operators[ski] = { + 'name': operator_name, + 'subject': str(subject), + 'added': datetime.now() + } + logger.info(f"Test mode: dynamically trusted operator {operator_name} with SKI {ski}") + + # Check if operator is trusted + if ski and ski in self.trusted_operators: + operator_info = self.trusted_operators[ski] + request.authenticated_entity = operator_info['name'] + request.operator_ski = ski + logger.info(f"ES2+ authenticated entity: {request.authenticated_entity}") + authenticated = True + else: + logger.error(f"Client certificate not trusted (SKI: {ski})") + raise ApiError('1.1', '1.2', 'Client certificate not trusted') + else: + logger.error("No client certificate provided") + raise ApiError('1.1', '1.2', 'Client certificate required for ES2+ API') + else: + logger.error("SSL transport does not support getPeerCertificate") + # Still might not be authenticated at this point, wtf am I 
doing here? + + if not authenticated: + logger.error("No SSL transport available for client certificate") + raise ApiError('1.1', '1.2', 'Client certificate required for ES2+ API') + + # Continue normal request processing + validate_request_headers(request) + + content = json.loads(request.content.read()) + logger.debug("ES2+ Rx JSON: %s" % json.dumps(content)) + set_headers(request) + + output = func(self, request, content) + if output is None: + return '' + + build_resp_header(output) + logger.debug("ES2+ Tx JSON: %s" % json.dumps(output)) + return json.dumps(output) + return _api_wrapper + @app.route('/gsma/rsp2/es9plus/initiateAuthentication', methods=['POST']) @rsp_api_wrapper def initiateAuthentication(self, request: IRequest, content: dict) -> dict: @@ -1150,7 +1043,32 @@ elif self.test_mode and matchingId.startswith('EVENT_'): # SM-DS event-based use case (test mode only) logger.debug(f"SM-DS event use case with matchingId: {matchingId}") - if matchingId in self.event_based_profiles: + + # Check ES2+ profiles first + es2plus_profile = None + profile_info = None + iccid_str = None + + es2plus_profile = self.profile_store.find_by_matching_id(matchingId) + if es2plus_profile and es2plus_profile.state in ['released', 'confirmed']: + logger.info(f"Found ES2+ event profile for matchingId {matchingId}") + # Check EID association if exists + if es2plus_profile.eid and es2plus_profile.eid != ss.eid: + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + iccid_str = es2plus_profile.iccid + ss.matchingId = matchingId + profile_info = { + 'matchingId': matchingId, + 'confirmationCode': None, + 'iccid': es2plus_profile.iccid, + 'state': es2plus_profile.state, + 'download_attempts': es2plus_profile.download_attempts, + 'associated_eid': es2plus_profile.eid, + 'profile_path': 'TS48v4_SAIP2.3_BERTLV' + } + + # Fallback to static event profiles + if not profile_info and matchingId in self.event_based_profiles: profile_info = 
self.event_based_profiles[matchingId] # Check if profile is associated with specific EID if profile_info.get('associated_eid') and profile_info['associated_eid'] != ss.eid: @@ -1158,15 +1076,53 @@ raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') iccid_str = profile_info['iccid'] ss.matchingId = matchingId - else: + + if not profile_info: # SGP.22 Table 49: MatchingID - Refused - raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + raise ApiError('8.2.6', '3.8', f'MatchingID (AC_Token or EventID) {matchingId} is refused')
else: # Activation code use case logger.debug(f"Activation code use case with matchingId: {matchingId}") - logger.debug(f"Available activation codes: {list(self.activation_code_profiles.keys())}") - if self.test_mode and matchingId in self.activation_code_profiles: + + # Always check ES2+ profile store first + es2plus_profile = None + profile_info = None + iccid_str = None + + es2plus_profile = self.profile_store.find_by_matching_id(matchingId) + if es2plus_profile: + logger.info(f"Found ES2+ profile for matchingId {matchingId}: ICCID={es2plus_profile.iccid}, state={es2plus_profile.state}") + + # Validate profile state - both 'released' and 'confirmed' are valid for download + if es2plus_profile.state not in ['released', 'confirmed']: + logger.warning(f"ES2+ profile {es2plus_profile.iccid} in state {es2plus_profile.state}, not ready for download") + # Don't raise yet, fall through to check static profiles + else: + if es2plus_profile.eid and es2plus_profile.eid != ss.eid: + # SGP.22 Table 49: EID - Refused + raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') + + # Use the ES2+ profile + iccid_str = es2plus_profile.iccid + ss.matchingId = matchingId + + # Wrap as profile_info + profile_info = { + 'matchingId': matchingId, + 'confirmationCode': None, + 'iccid': es2plus_profile.iccid, + 'profileName': f'ES2+ Profile {es2plus_profile.iccid}', + 'state': es2plus_profile.state, + 'download_attempts': es2plus_profile.download_attempts, + 'associated_eid': es2plus_profile.eid, + 'profile_path': 'TS48v4_SAIP2.3_BERTLV' # Use test profile file + } + logger.info(f"Using ES2+ profile {es2plus_profile.iccid} for ES9+ download") + + # Fallback to static test profiles if no ES2+ profile found + if not profile_info and self.test_mode and matchingId in self.activation_code_profiles: + logger.debug("No ES2+ profile ready, checking static test profiles") profile_info = self.activation_code_profiles[matchingId] # Check if profile is associated with specific EID if 
profile_info.get('associated_eid') and profile_info['associated_eid'] != ss.eid: @@ -1174,12 +1130,14 @@ raise ApiError('8.1.1', '3.8', 'EID doesn't match the expected value') iccid_str = profile_info['iccid'] ss.matchingId = matchingId - else: + logger.info(f"Using static test profile for matchingId {matchingId}") + + # If still no profile found, check filesystem (prod mode) + if not profile_info: path = os.path.join(self.upp_dir, matchingId) + '.der' - # prevent directory traversal attack if os.path.commonprefix((os.path.realpath(path),self.upp_dir)) != self.upp_dir: # SGP.22 Table 49: MatchingID - Refused - raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + raise ApiError('8.2.6', '3.8', f'MatchingID {matchingId} not found (no ES2+ profile or static profile)') if os.path.isfile(path) and os.access(path, os.R_OK): with open(path, 'rb') as f: pes = saip.ProfileElementSequence.from_der(f.read()) @@ -1189,11 +1147,12 @@ profile_info = { 'confirmationCode': self.confirmation_codes.get(matchingId), 'state': 'released', - 'profileName': matchingId + 'profileName': matchingId, + 'iccid': iccid_str } else: # SGP.22 Table 49: MatchingID - Refused - raise ApiError('8.2.6', '3.8', 'MatchingID (AC_Token or EventID) is refused') + raise ApiError('8.2.6', '3.8', f'MatchingID {matchingId} not found (no profile file)')
# Validate profile state and other conditions if profile_info: @@ -1227,6 +1186,10 @@ if self.test_mode and (not matchingId or matchingId == ''): ss.ccRequiredFlag = False logger.info("ccRequiredFlag=False for omitted/empty matchingId (default SM-DP+ use case)") + elif es2plus_profile and es2plus_profile.confirmation_code_hash: + ss.ccRequiredFlag = True + ss.es2plus_cc_hash = es2plus_profile.confirmation_code_hash + logger.info("Set ccRequiredFlag=True for ES2+ profile with CC hash") elif profile_info.get('confirmationCode'): ss.ccRequiredFlag = True ss.expected_confirmation_code = profile_info['confirmationCode'] @@ -1251,8 +1214,37 @@ # enable notifications for all operations for event in ['enable', 'disable', 'delete']: ss.profileMetadata.add_notification(event, self.server_hostname) + + # Check if we need larger metadata for test purposes + # SGP.23 Test Sequence #06 requires metadata split over 2 segments for PPK tests + need_large_metadata = False + + # heuristic: if the profile is one of our test profiles and the ICCID ends in certain patterns embiggen it + if self.test_mode and iccid_str and ( + iccid_str.endswith('100') or # Common test ICCID pattern + iccid_str.endswith('20F') or # Another test pattern + 'test' in profile_name.lower() # Test profile name + ): + # This doesn't hurt non-split tests as they'll just use the full metadata + need_large_metadata = True + logger.info(f"Test profile detected (ICCID: {iccid_str}) - creating large metadata for potential split testing") + + # Add a large icon to ensure metadata can split + # Create garbage dummy icon data that will push metadata over 1008 bytes + icon_data = b'\x89PNG\r\n\x1a\n' + b'\x00' * 950 # PNG header + padding + ss.profileMetadata.set_icon(True, icon_data) + + # Also add more notifications because we can + for i in range(10): + ss.profileMetadata.add_notification('install', f'le-test-notification-serveur-{i}.example.fr') + if i % 3 == 0: + ss.profileMetadata.add_notification('delete', 
f'test-paglilinis-server-{i}.example.ph') + profileMetadata_bin = ss.profileMetadata.gen_store_metadata_request()
+ if need_large_metadata: + logger.info(f"Metadata size with test padding: {len(profileMetadata_bin)} bytes (will split if needed)") + # Put together smdpSigned2 + _bin cc_flag = getattr(ss, 'ccRequiredFlag', False) logger.info(f"Setting ccRequiredFlag in SmdpSigned2: {cc_flag}") @@ -1271,6 +1263,11 @@
# update non-volatile state with updated ss object self.rss[transactionId] = ss + + # Update ES2+ profile state if applicable + if hasattr(ss, 'matchingId') and ss.matchingId: + self.update_es2plus_profile_state(ss.matchingId, 'downloading', ss.eid) + return { 'transactionId': transactionId, 'profileMetadata': b64encode2str(profileMetadata_bin), @@ -1363,10 +1360,18 @@ raise ApiError('8.2.7', '2.2', 'Confirmation Code is missing')
received_hash = euiccSigned2['hashCc'] - expected_hash = compute_confirmation_code_hash( - getattr(ss, 'expected_confirmation_code', ''), - h2b(transactionId) - ) + + if hasattr(ss, 'es2plus_cc_hash'): + # ES2+ verification: ES2+ stores SHA256(confirmationCode) + # Needs some seasoning: SHA256(stored_hash | TransactionID) + expected_hash = hashlib.sha256(ss.es2plus_cc_hash + h2b(transactionId)).digest() + logger.debug("ES2+ CC verification using stored hash with transactionId") + else: + # Regular ES9+ verification with expected_confirmation_code + expected_hash = compute_confirmation_code_hash( + getattr(ss, 'expected_confirmation_code', ''), + h2b(transactionId) + )
if received_hash != expected_hash: logger.debug("Confirmation code verification failed") @@ -1379,9 +1384,28 @@ upp_fname = ss.matchingId if self.test_mode: try: - profile_info = self.activation_code_profiles[ss.matchingId] - upp_fname = profile_info.get('profile_path', ss.matchingId) - except: + # Unified profile lookup for determining profile file path + if ss.matchingId: + es2plus_profile = self.profile_store.find_by_matching_id(ss.matchingId) + if es2plus_profile: + # ES2+ profiles always use test profile in test mode + upp_fname = 'TS48v4_SAIP2.3_BERTLV' + logger.debug(f"Using test profile file for ES2+ matchingId {ss.matchingId}") + elif ss.matchingId in self.activation_code_profiles: + # Fallback to static profiles + profile_info = self.activation_code_profiles[ss.matchingId] + upp_fname = profile_info.get('profile_path', ss.matchingId) + logger.debug(f"Using static profile path for matchingId {ss.matchingId}") + else: + # Default to matchingId as filename + upp_fname = ss.matchingId + logger.debug(f"Using default path (matchingId) for {ss.matchingId}") + else: + # No no matchingId?! 
+ profile_info = self.activation_code_profiles.get(ss.matchingId, {}) + upp_fname = profile_info.get('profile_path', ss.matchingId) if profile_info else ss.matchingId + except Exception as e: + logger.debug(f"Error getting profile path: {e}") pass with open(os.path.join(self.upp_dir, upp_fname)+'.der', 'rb') as f: upp = UnprotectedProfilePackage.from_der(f.read(), metadata=ss.profileMetadata) @@ -1431,6 +1455,15 @@ logger.error('ECDSA signature verification failed on notification') return None # Will return HTTP 204 with empty body logger.debug("Profile Installation Final Result: %s", pird['finalResult']) + + # Update ES2+ profile state based on final result + if hasattr(ss, 'matchingId') and ss.matchingId: + if pird['finalResult']['profileInstallationResult'] == 'profileInstallOk': + self.update_es2plus_profile_state(ss.matchingId, 'installed', ss.eid) + else: + # Installation failed -> profile can be downloaded again + self.update_es2plus_profile_state(ss.matchingId, 'released', ss.eid) + # remove session state del self.rss[transactionId] elif pendingNotification[0] == 'otherSignedNotification': @@ -1470,7 +1503,7 @@
@app.route('/gsma/rsp2/es9plus/cancelSession', methods=['POST']) @rsp_api_wrapper - def cancelSession(self, request: IRequest, content: dict) -> dict: + def cancelSession(self, request: IRequest, content: dict) -> dict | None: """See ES9+ CancelSession in SGP.22 Section 5.6.5""" logger.debug("Rx JSON: %s" % content) transactionId = content['transactionId'] @@ -1527,11 +1560,306 @@ # TODO: 2. Terminate the corresponding pending download process. # TODO: 3. If required, execute the SM-DS Event Deletion procedure described in section 3.6.3.
+ # Update ES2+ profile state back to released if applicable + if hasattr(ss, 'matchingId') and ss.matchingId: + es2plus_profile = self.profile_store.find_by_matching_id(ss.matchingId) + if es2plus_profile: + # Execute SM-DS Event Deletion procedure (SGP.22 section 3.6.3) if needed + Es2PlusHelpers.handle_sm_ds_event_deletion(es2plus_profile, self.sm_ds_events) + + # Update ES2+ profile state back to 'released' since session was cancelled + self.update_es2plus_profile_state(ss.matchingId, 'released', ss.eid) + else: + logger.warning(f"Could not find ES2+ profile for matchingId {ss.matchingId} during cancelSession") + # delete actual session data del self.rss[transactionId] # Per SGP.22 section 6.5.2.10, cancelSession returns an empty response (header only) return {}
+ # ES2+ Interface Implementation + + @app.route('/gsma/rsp2/es2plus/downloadOrder', methods=['POST']) + @es2plus_api_wrapper + def downloadOrder(self, request: IRequest, content: dict) -> dict: + """See ES2+ DownloadOrder in SGP.22 Section 5.3.1""" + # Extract parameters + eid = content.get('eid') + iccid = content.get('iccid') + profileType = content.get('profileType') + + # Validate that at least one of iccid or profileType is provided + if not iccid and not profileType: + raise ApiError('2.1', '2.2', 'Either iccid or profileType must be provided') + + # If ICCID is provided, validate and reserve it + if iccid: + iccid, profile = Es2PlusHelpers.normalize_and_validate_iccid(iccid, self.profile_store) + + Es2PlusHelpers.check_authorization(request, profile, iccid, self.test_mode, self.profile_store) + + # already in use? + if profile.state != 'available': + # SGP.22 Table: Profile ICCID - Already in Use + raise ApiError('8.2.1', '3.3', 'Already in use', iccid) + + # Reserve the ICCID + if eid: + profile.state = 'linked' + profile.eid = eid + else: + profile.state = 'allocated' + + # Save updated profile + self.profile_store[iccid] = profile + reserved_iccid = iccid + + else: + # ProfileType provided without ICCID -> find available profile + authenticated_entity = getattr(request, 'authenticated_entity', 'S_MNO') + + # First try to find existing available profile of this type owned by this operator + profile = self.profile_store.find_available_by_type(profileType, authenticated_entity) + + if profile: + if eid: + profile.state = 'linked' + profile.eid = eid + else: + profile.state = 'allocated' + self.profile_store[profile.iccid] = profile + reserved_iccid = profile.iccid + elif profileType == 'Test' and self.test_mode: + # Create new profile for this type in test mode + test_iccid_base = 8900000000000000100 + reserved_iccid = None + for i in range(100): + candidate_iccid = str(test_iccid_base + i) + if candidate_iccid not in self.profile_store: + # Create new 
profile owned by the authenticated operator + new_profile = Es2PlusProfileState(candidate_iccid, profileType, authenticated_entity) + new_profile.state = 'allocated' if not eid else 'linked' + if eid: + new_profile.eid = eid + self.profile_store[candidate_iccid] = new_profile + reserved_iccid = candidate_iccid + break + + if not reserved_iccid: + # SGP.22 Table: Profile Type - Unavailable + raise ApiError('8.2.5', '3.7', 'No profiles available for type', profileType) + else: + # SGP.22 Table: Profile Type - Unknown + raise ApiError('8.2.5', '3.9', 'Unknown profile type', profileType) + + return { + 'iccid': reserved_iccid + } + + @app.route('/gsma/rsp2/es2plus/confirmOrder', methods=['POST']) + @es2plus_api_wrapper + def confirmOrder(self, request: IRequest, content: dict) -> dict: + """See ES2+ ConfirmOrder in SGP.22 Section 5.3.2""" + # mandatory parameters + releaseFlag = content['releaseFlag'] + + # optional parameters + eid = content.get('eid') + matchingId = content.get('matchingId') + confirmationCode = content.get('confirmationCode') + smdsAddress = content.get('smdsAddress') + + iccid, profile = Es2PlusHelpers.normalize_and_validate_iccid(content['iccid'], self.profile_store) + + Es2PlusHelpers.check_authorization(request, profile, iccid, self.test_mode, self.profile_store) + + # Validate profile state - must be allocated or linked + if profile.state not in ['allocated', 'linked']: + # Profile already confirmed/released or in wrong state + if profile.state in ['confirmed', 'released']: + # For idempotency, if parameters match, return success + if (profile.matching_id == matchingId and + profile.eid == eid): + response = {'matchingId': profile.matching_id} + if profile.eid: + response['eid'] = profile.eid + return response + # Otherwise it's an error + raise ApiError('8.2.1', '3.5', 'Invalid state transition', iccid) + + # If SM-DS address provided, EID is mandatory + if smdsAddress and not eid: + # SGP.22 Table: EID - Mandatory Element Missing + raise 
ApiError('8.1.1', '2.2', 'EID required when SM-DS address provided') + + # If both DownloadOrder and ConfirmOrder have EID, they must match + if profile.eid and eid and profile.eid != eid: + # SGP.22 Table: EID - Invalid Association + raise ApiError('8.1.1', '3.10', 'Different EID already associated') + + # Check for empty matchingId with no EID - error case per SGP.23 test + # fix bug for ES2+ tests: empty matchingId requires EID to be present + if 'matchingId' in content and matchingId == '' and not eid and not profile.eid: + # SGP.22 Table: EID - Mandatory Element Missing (when matchingId is empty) + raise ApiError('8.1.1', '2.2', 'EID required when matchingId is empty') + + # Generate matchingId if not provided + if not matchingId: + import uuid + matchingId = f"MID-{uuid.uuid4().hex[:12].upper()}" + + # Check matchingId uniqueness + if content.get('matchingId'): + for other_iccid, other_profile in self.profile_store.items(): + if (other_iccid != iccid and + other_profile.matching_id == matchingId and + other_profile.state in ['confirmed', 'released']): + # SGP.22 Table: Matching ID - Already in Use + raise ApiError('8.2.6', '3.3', 'MatchingID already in use') + + profile.matching_id = matchingId + if eid: + profile.eid = eid + + if confirmationCode: + cc_bytes = h2b(confirmationCode) + cc_hash = hashlib.sha256(cc_bytes).digest() + profile.confirmation_code_hash = cc_hash + + # Handle SM-DS reg if needed + if smdsAddress: + if not matchingId or matchingId == '': + # Can't register with SM-DS without valid matchingId + raise ApiError('8.2.6', '2.2', 'MatchingID cannot be empty with SM-DS') + + # Validate SM-DS address format and accessibility + # In test mode, pretend - reject specific invalid addresses + # fix bug for ES2+ tests: validate SM-DS address format + # TODO: uh.. do this. 
+ if smdsAddress == 'invalid.smds.address' or not smdsAddress or smdsAddress.startswith('.') or smdsAddress.endswith('.'): + # SGP.22 Table: SM-DS Address - Inaccessible + raise ApiError('8.9', '5.1', f'SM-DS address inaccessible: {smdsAddress}') + + profile.sm_ds_address = smdsAddress + + if releaseFlag: + Es2PlusHelpers.handle_sm_ds_event_registration(profile, self.sm_ds_events, self.server_hostname) + + profile.state = 'released' if releaseFlag else 'confirmed' + self.profile_store[iccid] = profile + + if releaseFlag: + logger.info(f"ES2+ profile {iccid} released with matchingId {matchingId}, ready for ES9+ download") + + response = { + 'matchingId': matchingId + } + + # Include EID if bound to order + if profile.eid: + response['eid'] = profile.eid + + # smdpAddress is optional in response which is great because one way or another this will break something + # response['smdpAddress'] = self.server_hostname + + return response + + @app.route('/gsma/rsp2/es2plus/cancelOrder', methods=['POST']) + @es2plus_api_wrapper + def cancelOrder(self, request: IRequest, content: dict) -> dict: + """See ES2+ CancelOrder in SGP.22 Section 5.3.3""" + # mandatory parameters + finalProfileStatusIndicator = content['finalProfileStatusIndicator'] + + # conditional parameters + eid = content.get('eid') + matchingId = content.get('matchingId') + + iccid, profile = Es2PlusHelpers.normalize_and_validate_iccid(content['iccid'], self.profile_store) + + Es2PlusHelpers.check_authorization(request, profile, iccid, self.test_mode, self.profile_store) + + # Check if profile already downloaded + if profile.state in ['downloaded', 'installed']: + # SGP.22 Table: Profile ICCID - Already in Use + raise ApiError('8.2.1', '3.3', 'Profile already downloaded', iccid) + + # Check if profile is in a cancellable state + if profile.state == 'available': + # Can't cancel a profile that hasn't been ordered + raise ApiError('8.2.1', '3.5', 'Invalid state transition - profile not ordered', iccid) + + 
Es2PlusHelpers.validate_eid_association(profile, eid, iccid) + + Es2PlusHelpers.validate_matching_id_association(profile, matchingId) + + Es2PlusHelpers.handle_sm_ds_event_deletion(profile, self.sm_ds_events) + + if finalProfileStatusIndicator == 'Available': + Es2PlusHelpers.reset_profile_to_available(profile) + else: # 'Unavailable' + profile.state = 'unavailable' + + self.profile_store[iccid] = profile + + return {} + + @app.route('/gsma/rsp2/es2plus/releaseProfile', methods=['POST']) + @es2plus_api_wrapper + def releaseProfile(self, request: IRequest, content: dict) -> dict: + """See ES2+ ReleaseProfile in SGP.22 Section 5.3.4""" + # spec marks this as FFS (For Further Study) + + iccid, profile = Es2PlusHelpers.normalize_and_validate_iccid(content['iccid'], self.profile_store) + + Es2PlusHelpers.check_authorization(request, profile, iccid, self.test_mode, self.profile_store) + + # Verify profile has been through DownloadOrder and ConfirmOrder + if profile.state not in ['confirmed']: + if profile.state == 'released': + # Already released - idempotent + return {} + # SGP.22 Table: Profile ICCID - Invalid transition + raise ApiError('8.2.1', '3.5', 'Profile cannot be released from current state', iccid) + + profile.state = 'released' + + Es2PlusHelpers.handle_sm_ds_event_registration(profile, self.sm_ds_events, self.server_hostname) + + self.profile_store[iccid] = profile + + return {} + + @app.route('/gsma/rsp2/es2plus/handleDownloadProgressInfo', methods=['POST']) + @es2plus_api_wrapper + def handleDownloadProgressInfo(self, request: IRequest, content: dict) -> dict: + """See ES2+ HandleDownloadProgressInfo in SGP.22 Section 5.3.5""" + # spec marks this as FFS (For Further Study) + # This is a one way notification from SM-DP+ to Operator, no response + + iccid = content['iccid'] + profileType = content['profileType'] + timestamp = content['timestamp'] + notificationPointId = content['notificationPointId'] + notificationPointStatus = 
content['notificationPointStatus'] + + # Optional parameters + eid = content.get('eid') + resultData = content.get('resultData') + + logger.info(f"Download Progress Notification - ICCID: {iccid}, EID: {eid}, " + f"ProfileType: {profileType}, Timestamp: {timestamp}, " + f"Point: {notificationPointId}, Status: {notificationPointStatus}") + + # OP should process resultData.. probably + if resultData: + logger.debug(f"Result data received: {resultData}") + + # probably something involving es12 here + + # Per SGP.22, this returns standard success response (empty body) + return {} +
def main(argv): parser = argparse.ArgumentParser() @@ -1551,7 +1879,7 @@ logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
common_cert_path = os.path.join(DATA_DIR, args.certdir) - hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool, test_mode=args.test) + hs = SmDppHttpServer(server_hostname=HOSTNAME, ci_certs_path=os.path.join(common_cert_path, 'CertificateIssuer'), common_cert_path=common_cert_path, use_brainpool=args.brainpool, in_memory=args.in_memory, test_mode=args.test) if(args.nossl): hs.app.run(args.host, args.port) else: @@ -1580,10 +1908,84 @@ with open(cert_pempath, 'wb') as pem_file: pem_file.write(pem_cert)
- SERVER_STRING = f'ssl:{args.port}:privateKey={cert_skpath}:certKey={cert_pempath}:dhParameters={dhparam_path}' - print(SERVER_STRING)
- hs.app.run(host=HOSTNAME, port=args.port, endpoint_description=SERVER_STRING) + # Create custom SSL context factory with client certificate verification + class ClientCertContextFactory(ssl.DefaultOpenSSLContextFactory): + """SSL Context Factory for ES2+ with SKI-based client certificate trust.""" + + def __init__(self, key_path, cert_path, sm_dp_server): + super().__init__(key_path, cert_path) + self.sm_dp_server = sm_dp_server + + def getContext(self): + ctx = SSL.Context(SSL.TLS_METHOD) # Supports TLS 1.2+ + + # Load server certificate and private key + ctx.use_certificate_file(self.certificateFileName) + ctx.use_privatekey_file(self.privateKeyFileName) + + # For ES2+, we request client certificates but don't require them at SSL level + # The actual requirement is enforced in the es2plus_api_wrapper + flags = SSL.VERIFY_PEER # Request client certificate + + ctx.set_verify(flags, self._verify_callback) + ctx.set_verify_depth(10) + + # moah segguriddy + ctx.set_options( + SSL.OP_NO_SSLv2 | + SSL.OP_NO_SSLv3 | + SSL.OP_NO_COMPRESSION + ) + + return ctx + + def _verify_callback(self, connection, x509, errnum, errdepth, ok): + """ + Verify callback for ES2+ client certificates. + We don't validate against CAs - instead we check SKI-based trust. 
+ """ + subject = x509.get_subject() + cn = getattr(subject, 'commonName', None) + org = getattr(subject, 'organizationName', None) + + if errdepth == 0: # Client certificate (not intermediate/root) + logger.info(f"[ES2+ VERIFY] Client cert: CN={cn}, O={org}, ok={ok}, errnum={errnum}") + + ski = None + try: + ski_ext = x509.to_cryptography().extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) + ski_bytes = ski_ext.value.key_identifier + ski = ':'.join(f'{b:02X}' for b in ski_bytes) + logger.info(f"[ES2+ VERIFY] Client SKI: {ski}") + except Exception as e: + logger.debug(f"[ES2+ VERIFY] Could not extract SKI: {e}") + + # For ES2+ accept all certificates at SSL level, the validation happens in es2plus_api_wrapper so we can respond with proper API error responses + if not ok: + logger.debug(f"[ES2+ VERIFY] Certificate verification failed (errnum={errnum}), but accepting for API-level handling") + + return True # Always accept at SSL level! I mean it. + + # For intermediate/root certs, accept based on OpenSSL validation + return ok + + # Create the SSL context factory following klssl.py pattern + contextFactory = ClientCertContextFactory( + str(cert_skpath), # key path as string + str(cert_pempath), # cert path as string + hs # sm_dp_server instance + ) + + endpoint = SSL4ServerEndpoint(reactor, int(args.port), contextFactory, interface=HOSTNAME) + + site = Site(hs.app.resource()) + endpoint.listen(site) + + print(f"SM-DP+ server listening on https://{HOSTNAME}:{args.port}") + print("ES2+ endpoints require client certificates (SKI-based trust)") + + reactor.run()
if __name__ == "__main__": main(sys.argv) diff --git a/pySim/esim/es2plus_commons.py b/pySim/esim/es2plus_commons.py new file mode 100644 index 0000000..ca73fe7 --- /dev/null +++ b/pySim/esim/es2plus_commons.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +# Common validation and utility functions for ES2+ API endpoints. +# +# (C) 2025 by Eric Wild ewild@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import shelve +import logging +import time +from typing import Optional, Tuple +from .smdpp_common import ApiError + +logger = logging.getLogger(__name__) + +class Es2PlusProfileState: + """Encapsulates the state of an ES2+ profile. 
Tracks profile lifecycle from available through installed.""" + def __init__(self, iccid: str, profile_type: str = 'Generic', owner: str = 'Unknown'): + self.iccid = iccid + self.profile_type = profile_type + self.owner = owner + self.state = 'available' # available, allocated, linked, confirmed, released, downloaded, installed, unavailable + self.eid: Optional[str] = None + self.matching_id: Optional[str] = None + self.confirmation_code_hash: Optional[bytes] = None + self.sm_ds_address: Optional[str] = None + self.download_attempts: int = 0 + self.created_timestamp = time.time() + self.last_modified = time.time() + + def __getstate__(self): + """Helper for pickling to persistent storage.""" + state = self.__dict__.copy() + # All current attributes are pickle-able, but let's be prepared + return state + + def __setstate__(self, state): + """Helper for unpickling from persistent storage.""" + self.__dict__.update(state) + +class Es2PlusProfileStore: + """Database-backed storage for ES2+ profile states using shelve, similar to RspSessionStore.""" + + def __init__(self, filename: Optional[str] = None, in_memory: bool = False): + self._in_memory = in_memory + + if in_memory: + self._shelf = shelve.Shelf(dict()) + else: + if filename is None: + raise ValueError("filename is required for file-based profile store") + self._shelf = shelve.open(filename) + + # Dictionary-like interface + def __getitem__(self, key): + return self._shelf[key] + + def __setitem__(self, key, value): + value.last_modified = time.time() + self._shelf[key] = value + + def __delitem__(self, key): + del self._shelf[key] + + def __contains__(self, key): + return key in self._shelf + + def __iter__(self): + return iter(self._shelf) + + def __len__(self): + return len(self._shelf) + + # everything else + def __getattr__(self, name): + """Delegate attribute access to the underlying shelf object.""" + return getattr(self._shelf, name) + + def close(self): + """Close the session store.""" + if 
hasattr(self._shelf, 'close'): + self._shelf.close() + if self._in_memory: + # For in-memory store, clear the reference + self._shelf = None + + def sync(self): + """Synchronize the cache with the underlying storage.""" + if hasattr(self._shelf, 'sync'): + self._shelf.sync() + + def find_by_matching_id(self, matching_id: str) -> Optional[Es2PlusProfileState]: + """Find a profile by its matching ID.""" + for iccid, profile in self.items(): + if profile.matching_id == matching_id: + return profile + return None + + def find_available_by_type(self, profile_type: str, owner: str) -> Optional[Es2PlusProfileState]: + """Find first available profile of given type for given owner.""" + for iccid, profile in self.items(): + if (profile.state == 'available' and + profile.profile_type == profile_type and + profile.owner == owner): + return profile + return None + +class Es2PlusHelpers: + """Common validation methods for ES2+ operations.""" + + @staticmethod + def normalize_and_validate_iccid(iccid: str, profile_store) -> Tuple[str, Es2PlusProfileState]: + """ + Normalize ICCID and retrieve profile with validation. + + Returns: + Tuple of (normalized_iccid, profile_object) + Raises: + ApiError if profile not found + """ + # Normalize ICCID (remove F padding) + iccid = iccid.rstrip('F') + + # Retrieve profile + profile = profile_store.get(iccid) + if not profile: + # SGP.22 Table: Profile ICCID - Unknown + raise ApiError('8.2.1', '3.9', 'Profile unknown', iccid) + + return iccid, profile + + @staticmethod + def check_authorization(request, profile, iccid: str, test_mode: bool, + profile_store) -> str: + """ + Check authorization and handle ownership assignment. 
+ + Returns: + authenticated_entity + Raises: + ApiError if not authorized + """ + # Get authenticated entity from client certificate + authenticated_entity = getattr(request, 'authenticated_entity', None) + if not authenticated_entity: + # Should not happen if es2plus_api_wrapper is working correctly but this is python, anything is NaN NaN Nan Batman. + raise ApiError('8.2.1', '1.2', 'Not authorized - no authenticated entity', iccid) + + # For ES2+, profile ownership is based on the authenticated operator + # In test mode, we're kinda flexible with ownership + if test_mode: + if profile.owner == 'S_MNO' or not profile.owner: + profile.owner = authenticated_entity + profile_store[iccid] = profile + logger.info(f"Test mode: assigned profile {iccid} to {authenticated_entity}") + else: + # Less lenient in production, strictly check ownership + if profile.owner != authenticated_entity: + # SGP.22 Table: Profile ICCID - Not Allowed (Authorization) + raise ApiError('8.2.1', '1.2', f'Not authorized - profile owned by {profile.owner}', iccid) + + return authenticated_entity + + @staticmethod + def validate_eid_association(profile, eid: Optional[str], iccid: str): + """ + Validate EID association with profile. + + Raises: + ApiError if EID validation fails + """ + if profile.eid: + if not eid: + # EID should be provided if associated + raise ApiError('8.1.1', '2.2', 'EID required for this order') + if eid != profile.eid: + # SGP.22 Table: Profile ICCID - Invalid Association + raise ApiError('8.2.1', '3.10', 'Different EID associated', iccid) + + @staticmethod + def validate_matching_id_association(profile, matching_id: Optional[str]): + """ + Validate matching ID association with profile. 
+ + Raises: + ApiError if matching ID validation fails + """ + if profile.matching_id: + if matching_id and matching_id != profile.matching_id: + # SGP.22 Table: Matching ID - Invalid Association + raise ApiError('8.2.6', '3.10', 'Different matchingID associated') + + @staticmethod + def handle_sm_ds_event_deletion(profile, sm_ds_events): + """Handle SM-DS Event Deletion if needed.""" + if profile.sm_ds_address and profile.matching_id: + # would call ES12.DeleteEvent + if profile.matching_id in sm_ds_events: + del sm_ds_events[profile.matching_id] + logger.info(f"Simulated SM-DS Event Deletion for matchingId: {profile.matching_id}") + + @staticmethod + def handle_sm_ds_event_registration(profile, sm_ds_events, server_hostname): + """Handle SM-DS Event Registration if needed.""" + import time + + if (profile.sm_ds_address and + profile.matching_id and + profile.matching_id not in sm_ds_events): + # Simulate ES12.RegisterEvent for now + sm_ds_events[profile.matching_id] = { + 'eid': profile.eid, + 'smdpAddress': server_hostname, + 'timestamp': time.time() + } + logger.info(f"Simulated SM-DS Event Registration for matchingId: {profile.matching_id}") + + @staticmethod + def reset_profile_to_available(profile): + """Reset profile to available state, clearing all associations.""" + profile.state = 'available' + profile.eid = None + profile.matching_id = None + profile.confirmation_code_hash = None + profile.sm_ds_address = None + profile.download_attempts = 0 \ No newline at end of file diff --git a/pySim/esim/smdpp_common.py b/pySim/esim/smdpp_common.py new file mode 100644 index 0000000..e1d035e --- /dev/null +++ b/pySim/esim/smdpp_common.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# Common validation and utility functions for the smdpp. 
+# +# (C) 2025 by Eric Wild ewild@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import json +from typing import Dict, List, Optional +import logging +import asn1tools +from cryptography import x509 + +logger = logging.getLogger(__name__) + +class ApiError(Exception): + def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None, + subject_id: Optional[str] = None): + self.status_code = build_status_code(subject_code, reason_code, subject_id, message) + + def encode(self) -> str: + """Encode the API Error into a responseHeader string.""" + js = {} + build_resp_header(js, 'Failed', self.status_code) + return json.dumps(js) + +def get_eum_certificate_variant(eum_cert) -> str: + """Determine EUM certificate variant by checking Certificate Policies extension. 
+ Returns 'O' for old variant, or 'NEW' for Ov3/A/B/C variants.""" + + try: + cert_policies_ext = eum_cert.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.CERTIFICATE_POLICIES + ) + + for policy in cert_policies_ext.value: + policy_oid = policy.policy_identifier.dotted_string + logger.debug(f"Found certificate policy: {policy_oid}") + + if policy_oid == '2.23.146.1.2.1.2': + logger.debug("Detected EUM certificate variant: O (old)") + return 'O' + elif policy_oid == '2.23.146.1.2.1.0.0.0': + logger.debug("Detected EUM certificate variant: Ov3/A/B/C (new)") + return 'NEW' + except x509.ExtensionNotFound: + logger.debug("No Certificate Policies extension found") + except Exception as e: + logger.debug(f"Error checking certificate policies: {e}") + +def parse_permitted_eins_from_cert(eum_cert) -> List[str]: + """Extract permitted IINs from EUM certificate using the appropriate method + based on certificate variant (O vs Ov3/A/B/C). + Returns list of permitted IINs (basically prefixes that valid EIDs must start with).""" + + # Determine certificate variant first + cert_variant = get_eum_certificate_variant(eum_cert) + permitted_iins = [] + + if cert_variant == 'O': + # Old variant - use nameConstraints extension + permitted_iins.extend(_parse_name_constraints_eins(eum_cert)) + + else: + # New variants (Ov3, A, B, C) - use GSMA permittedEins extension + permitted_iins.extend(_parse_gsma_permitted_eins(eum_cert)) + + unique_iins = list(set(permitted_iins)) + + logger.debug(f"Total unique permitted IINs found: {len(unique_iins)}") + return unique_iins + +def _parse_gsma_permitted_eins(eum_cert) -> List[str]: + """Parse the GSMA permittedEins extension using correct ASN.1 structure. 
+ PermittedEins ::= SEQUENCE OF PrintableString + Each string contains an IIN (Issuer Identification Number) - a prefix of valid EIDs.""" + permitted_iins = [] + + try: + permitted_eins_oid = x509.ObjectIdentifier('2.23.146.1.2.2.0') # sgp26: 2.23.146.1.2.2.0 = ASN1:SEQUENCE:permittedEins + + for ext in eum_cert.extensions: + if ext.oid == permitted_eins_oid: + logger.debug(f"Found GSMA permittedEins extension: {ext.oid}") + + # Get the DER-encoded extension value + ext_der = ext.value.value if hasattr(ext.value, 'value') else ext.value + + if isinstance(ext_der, bytes): + try: + permitted_eins_schema = """ + PermittedEins DEFINITIONS ::= BEGIN + PermittedEins ::= SEQUENCE OF PrintableString + END + """ + decoder = asn1tools.compile_string(permitted_eins_schema) + decoded_strings = decoder.decode('PermittedEins', ext_der) + + for iin_string in decoded_strings: + # Each string contains an IIN -> prefix of euicc EID + iin_clean = iin_string.strip().upper() + + # IINs is 8 chars per sgp22, var len according to sgp29, fortunately we don't care + if (len(iin_clean) == 8 and + all(c in '0123456789ABCDEF' for c in iin_clean) and + len(iin_clean) % 2 == 0): + permitted_iins.append(iin_clean) + logger.debug(f"Found permitted IIN (GSMA): {iin_clean}") + else: + logger.debug(f"Invalid IIN format: {iin_string} (cleaned: {iin_clean})") + except Exception as e: + logger.debug(f"Error parsing GSMA permittedEins extension: {e}") + + except Exception as e: + logger.debug(f"Error accessing GSMA certificate extensions: {e}") + + return permitted_iins + + +def _parse_name_constraints_eins(eum_cert) -> List[str]: + """Parse permitted IINs from nameConstraints extension (variant O).""" + permitted_iins = [] + + try: + # Look for nameConstraints extension + name_constraints_ext = eum_cert.extensions.get_extension_for_oid( + x509.oid.ExtensionOID.NAME_CONSTRAINTS + ) + + name_constraints = name_constraints_ext.value + + # Check permittedSubtrees for IIN constraints + if 
name_constraints.permitted_subtrees: + for subtree in name_constraints.permitted_subtrees: + + if isinstance(subtree, x509.DirectoryName): + for attribute in subtree.value: + # IINs for O in serialNumber + if attribute.oid == x509.oid.NameOID.SERIAL_NUMBER: + serial_value = attribute.value.upper() + # sgp22 8, sgp29 var len, fortunately we don't care + if (len(serial_value) == 8 and + all(c in '0123456789ABCDEF' for c in serial_value) and + len(serial_value) % 2 == 0): + permitted_iins.append(serial_value) + logger.debug(f"Found permitted IIN (nameConstraints/DN): {serial_value}") + + except x509.ExtensionNotFound: + logger.debug("No nameConstraints extension found") + except Exception as e: + logger.debug(f"Error parsing nameConstraints: {e}") + + return permitted_iins + + +def validate_eid_range(eid: str, eum_cert) -> bool: + """Validate that EID is within the permitted EINs of the EUM certificate.""" + if not eid or len(eid) != 32: + logger.debug(f"Invalid EID format: {eid}") + return False + + try: + permitted_eins = parse_permitted_eins_from_cert(eum_cert) + + if not permitted_eins: + logger.debug("Warning: No permitted EINs found in EUM certificate") + return False + + eid_normalized = eid.upper() + logger.debug(f"Validating EID {eid_normalized} against {len(permitted_eins)} permitted EINs") + + for permitted_ein in permitted_eins: + if eid_normalized.startswith(permitted_ein): + logger.debug(f"EID {eid_normalized} matches permitted EIN {permitted_ein}") + return True + + logger.debug(f"EID {eid_normalized} is not in any permitted EIN list") + return False + + except Exception as e: + logger.debug(f"Error validating EID: {e}") + return False + +def build_status_code(subject_code: str, reason_code: str, subject_id: Optional[str], message: Optional[str]) -> Dict: + r = {'subjectCode': subject_code, 'reasonCode': reason_code } + if subject_id: + r['subjectIdentifier'] = subject_id + if message: + r['message'] = message + return r + +def build_resp_header(js: 
dict, status: str = 'Executed-Success', status_code_data = None) -> None: + # SGP.22 v3.0 6.5.1.4 + js['header'] = { + 'functionExecutionStatus': { + 'status': status, + } + } + if status_code_data: + js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data diff --git a/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned.pem b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned.pem new file mode 100644 index 0000000..02f69e0 --- /dev/null +++ b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICQTCCAeagAwIBAgIUBH9i6K/qKONp3qK84jbPOeYJ3q8wCgYIKoZIzj0EAwIw +YzEiMCAGA1UEAwwZVEVTVF9PUEVSQVRPUiBFUzIrIENsaWVudDEWMBQGA1UECgwN +VEVTVF9PUEVSQVRPUjEYMBYGA1UECwwPZVNJTSBPcGVyYXRpb25zMQswCQYDVQQG +EwJVUzAeFw0yNTA3MjcyMDAyNTBaFw0yNjA3MjgyMDAyNTBaMGMxIjAgBgNVBAMM +GVRFU1RfT1BFUkFUT1IgRVMyKyBDbGllbnQxFjAUBgNVBAoMDVRFU1RfT1BFUkFU +T1IxGDAWBgNVBAsMD2VTSU0gT3BlcmF0aW9uczELMAkGA1UEBhMCVVMwWTATBgcq +hkjOPQIBBggqhkjOPQMBBwNCAAQsjlrne8IklFEW3BGu3FEJ0EEdwkE/vf2oqr+T +l8EFqkbKIDsu0yH+LmGYxXAvQjEXa8GvTwG9aln/VUcdt/oto3gwdjAdBgNVHQ4E +FgQUGZRJ3dJ/Ks7Q6BIjLBoKcgueJWowHwYDVR0jBBgwFoAUGZRJ3dJ/Ks7Q6BIj +LBoKcgueJWowDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0lAQH/ +BAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwIDSQAwRgIhAMFb61Bya2aiF7X7TTXm +alo0rffrzPdeDL0I4HeHEddQAiEA1kfH4AdlBYl0u+tgLqqHMcjiI6OrT6ywUsYD +p6q408U= +-----END CERTIFICATE----- diff --git a/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_combined.pem b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_combined.pem new file mode 100644 index 0000000..2ffec8b --- /dev/null +++ b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_combined.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIICQTCCAeagAwIBAgIUBH9i6K/qKONp3qK84jbPOeYJ3q8wCgYIKoZIzj0EAwIw +YzEiMCAGA1UEAwwZVEVTVF9PUEVSQVRPUiBFUzIrIENsaWVudDEWMBQGA1UECgwN +VEVTVF9PUEVSQVRPUjEYMBYGA1UECwwPZVNJTSBPcGVyYXRpb25zMQswCQYDVQQG 
+EwJVUzAeFw0yNTA3MjcyMDAyNTBaFw0yNjA3MjgyMDAyNTBaMGMxIjAgBgNVBAMM +GVRFU1RfT1BFUkFUT1IgRVMyKyBDbGllbnQxFjAUBgNVBAoMDVRFU1RfT1BFUkFU +T1IxGDAWBgNVBAsMD2VTSU0gT3BlcmF0aW9uczELMAkGA1UEBhMCVVMwWTATBgcq +hkjOPQIBBggqhkjOPQMBBwNCAAQsjlrne8IklFEW3BGu3FEJ0EEdwkE/vf2oqr+T +l8EFqkbKIDsu0yH+LmGYxXAvQjEXa8GvTwG9aln/VUcdt/oto3gwdjAdBgNVHQ4E +FgQUGZRJ3dJ/Ks7Q6BIjLBoKcgueJWowHwYDVR0jBBgwFoAUGZRJ3dJ/Ks7Q6BIj +LBoKcgueJWowDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0lAQH/ +BAwwCgYIKwYBBQUHAwIwCgYIKoZIzj0EAwIDSQAwRgIhAMFb61Bya2aiF7X7TTXm +alo0rffrzPdeDL0I4HeHEddQAiEA1kfH4AdlBYl0u+tgLqqHMcjiI6OrT6ywUsYD +p6q408U= +-----END CERTIFICATE----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIA8rDic3h4Vc4aM8U81daYRxr5UphTWpEn3mWQPeViTSoAoGCCqGSM49 +AwEHoUQDQgAELI5a53vCJJRRFtwRrtxRCdBBHcJBP739qKq/k5fBBapGyiA7LtMh +/i5hmMVwL0IxF2vBr08BvWpZ/1VHHbf6LQ== +-----END EC PRIVATE KEY----- diff --git a/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_key.pem b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_key.pem new file mode 100644 index 0000000..687a609 --- /dev/null +++ b/smdpp-data/certs/SelfSignedOperators/TEST_OPERATOR_selfsigned_key.pem @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIA8rDic3h4Vc4aM8U81daYRxr5UphTWpEn3mWQPeViTSoAoGCCqGSM49 +AwEHoUQDQgAELI5a53vCJJRRFtwRrtxRCdBBHcJBP739qKq/k5fBBapGyiA7LtMh +/i5hmMVwL0IxF2vBr08BvWpZ/1VHHbf6LQ== +-----END EC PRIVATE KEY----- diff --git a/tests/unittests/test_es2plus.py b/tests/unittests/test_es2plus.py new file mode 100644 index 0000000..da38c02 --- /dev/null +++ b/tests/unittests/test_es2plus.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +# Integrated test for ES2+ functionality using module imports of our existing cli scripts (es2p client, cert gen). 
+# +# (C) 2025 by Eric Wild ewild@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import os +import socket +import sys +import tempfile +import unittest +import subprocess +import time +import shutil +from pathlib import Path + +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) +sys.path.insert(0, str(project_root / 'contrib')) + +from generate_self_signed_operator_cert import generate_self_signed_operator_cert # noqa: E402 +from pySim.esim.es2p import Es2pApiClient # noqa: E402 + +class HostnameResolutionMixin: + """Mixin to fix cert addresses to localhost""" + + def setUp(self): + super().setUp() + + self.hostname_mappings = getattr(self, 'hostname_mappings', { + 'testsmdpplus1.example.com': 'localhost' + }) + + self._original_getaddrinfo = socket.getaddrinfo + def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): + if host in self.hostname_mappings: + target_host = self.hostname_mappings[host] + return self._original_getaddrinfo(target_host, port, family, type, proto, flags) + return self._original_getaddrinfo(host, port, family, type, proto, flags) + socket.getaddrinfo = patched_getaddrinfo + + def tearDown(self): + socket.getaddrinfo = self._original_getaddrinfo + super().tearDown() + +class TestES2PlusDemo(HostnameResolutionMixin, unittest.TestCase): + """Test ES2+ mutual TLS 
authentication with self-signed certificates.""" + + @classmethod + def setUpClass(cls): + """Set up test environment once for all tests.""" + cls.project_root = Path(__file__).parent.parent.parent + cls.test_dir = tempfile.mkdtemp(prefix='es2p_test_') + cls.cert_dir = os.path.join(cls.test_dir, 'certs') + os.makedirs(cls.cert_dir, exist_ok=True) + + cls.server_ca_cert = str(cls.project_root / 'smdpp-data' / 'certs' / + 'CertificateIssuer' / 'CERT_CI_ECDSA_NIST.pem') + + @classmethod + def tearDownClass(cls): + """Clean up test environment.""" + if hasattr(cls, 'test_dir') and os.path.exists(cls.test_dir): + shutil.rmtree(cls.test_dir) + + def setUp(self): + """Set up each test.""" + super().setUp() + self.server_process = None + + def tearDown(self): + """Clean up after each test.""" + if self.server_process: + self.server_process.terminate() + try: + self.server_process.wait(timeout=2) + except subprocess.TimeoutExpired: + self.server_process.kill() + self.server_process.wait() + + if self.server_process.stdout: + self.server_process.stdout.close() + if self.server_process.stderr: + self.server_process.stderr.close() + super().tearDown() + + + def start_smdpp_server(self): + """Start the SM-DP+ server as a subprocess (like in shell script).""" + cmd = [ + 'python3', + str(self.project_root / 'osmo-smdpp.py'), + '-t', # test mode + '-m', # in-memory storage + ] + + self.server_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=str(self.project_root) + ) + + # Wait for server to start because.. python + time.sleep(3) + + def test_self_signed_certificate_flow(self): + """Test ES2+ downloadOrder with self-signed operator certificate. + Steps: + 1. Generation of self-signed operator certificates + 2. Direct use of Es2pApiClient from imported cli module + 3. Mutual TLS authentication with self-signed certificates + 4. 
Trust establishment through SKI at app layer + """ + # Step 1: Generate self-signed operator certificate using imported function + operator_name = 'TEST_OPERATOR' + cert_path, ski = generate_self_signed_operator_cert(operator_name, self.cert_dir) + + # Verify certificate was created + self.assertTrue(os.path.exists(cert_path)) + self.assertIsNotNone(ski) + self.assertIn(':', ski) # SKI should be formatted with colons + + # Step 2: Start SM-DP+ server + self.start_smdpp_server() + + try: + client = Es2pApiClient( + url_prefix='https://testsmdpplus1.example.com:8000/gsma/rsp2/es2plus', + func_req_id=operator_name, + server_cert_verify=self.server_ca_cert, + client_cert=cert_path + ) + + # Step 4: Make downloadOrder request + request_data = { + 'profileType': 'Test' + } + + result = client.call_downloadOrder(request_data) + + self.assertIsInstance(result, dict) + + # In test mode, the server should accept the self-signed certificate and return a response + # - even if it's an error due to no matching profiles, the whole point is that the TLS handshake succeeds + + # Check if we got a functionExecutionStatus (expected in response) + if 'functionExecutionStatus' in result: + status = result['functionExecutionStatus'] + self.assertIn('status', status) + # May be 'Executed-Success' or 'Failed' depending on profile availability, don't care + self.assertIn(status['status'], ['Executed-Success', 'Failed']) + + except Exception as e: + # If we get an SSL error, the test failed + if 'SSL' in str(e) or 'certificate' in str(e).lower(): + self.fail(f"SSL/Certificate error occurred: {e}") + + def test_multiple_operator_certificates(self): + """Test that different operators can use different self-signed certificates.""" + operators = ['OPERATOR_A', 'OPERATOR_B', 'OPERATOR_C'] + certificates = {} + + # Generate certificates for each operator + for operator in operators: + cert_path, ski = generate_self_signed_operator_cert(operator, self.cert_dir) + certificates[operator] = { + 
'cert_path': cert_path, + 'ski': ski + } + self.assertTrue(os.path.exists(cert_path)) + + self.start_smdpp_server() + + # Test that each operator can authenticate with their own certificate + for operator, cert_info in certificates.items(): + client = Es2pApiClient( + url_prefix='https://testsmdpplus1.example.com:8000/gsma/rsp2/es2plus', + func_req_id=operator, + server_cert_verify=self.server_ca_cert, + client_cert=cert_info['cert_path'] + ) + + request_data = {'profileType': f'Test_{operator}'} + + try: + result = client.call_downloadOrder(request_data) + self.assertIsInstance(result, dict) + # As above only TLS handshake matters + except Exception as e: + if 'SSL' in str(e) or 'certificate' in str(e).lower(): + self.fail(f"SSL/Certificate error for {operator}: {e}") + + def test_client_without_certificate_fails(self): + """Test that clients without certificates are rejected when mutual TLS is required.""" + self.start_smdpp_server() + + client = Es2pApiClient( + url_prefix='https://testsmdpplus1.example.com:8000/gsma/rsp2/es2plus', + func_req_id='NO_CERT_CLIENT', + server_cert_verify=self.server_ca_cert, + client_cert=None # No client certificate + ) + + request_data = {'profileType': 'Test'} + + # This should fail due to missing client certificate + # In test mode, the server may not enforce client certificates at TLS layer but rejects at application layer + try: + result = client.call_downloadOrder(request_data) + # If we get here, check if server rejected at application layer + if 'functionExecutionStatus' in result: + status = result['functionExecutionStatus'] + # Should be rejected due to missing authentication + self.assertEqual(status.get('status'), 'Failed', "Expected failure due to missing client certificate") + else: + # Got some response but no clear rejection (?!) 
+ self.fail("Server should reject requests without client certificates") + except Exception as e: + # SSL/TLS layer rejection is also acceptable + error_msg = str(e) + self.assertTrue( + 'certificate' in error_msg.lower() or + 'SSL' in error_msg or + 'handshake' in error_msg.lower() or + '404' in error_msg, # Server might return 404 for unauthenticated requests + f"Expected certificate or authentication error, got: {error_msg}" + ) + + +if __name__ == '__main__': + unittest.main(verbosity=2) \ No newline at end of file diff --git a/tests/unittests/test_es2plus_es9plus_http.py b/tests/unittests/test_es2plus_es9plus_http.py new file mode 100644 index 0000000..8324ab9 --- /dev/null +++ b/tests/unittests/test_es2plus_es9plus_http.py @@ -0,0 +1,582 @@ +#!/usr/bin/env python3 +# Consolidated test suite for ES2+/ES9+ integration. +# HTTP endpoint testing with actual profile storage testing +# +# (C) 2025 by Eric Wild ewild@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. 
+ +import json +import base64 +import sys +import os +import tempfile +import hashlib +import unittest +import importlib.util +from unittest.mock import patch, MagicMock +from pathlib import Path + +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from pySim.utils import h2b # noqa: E402 + + +def import_osmo_smdpp(): + """Import osmo-smdpp module dynamically.""" + project_root = Path(__file__).parent.parent.parent + module_path = project_root / 'osmo-smdpp.py' + if 'osmo_smdpp' in sys.modules: + return sys.modules['osmo_smdpp'] + + spec = importlib.util.spec_from_file_location('osmo_smdpp', module_path) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load module from {module_path}") + + module = importlib.util.module_from_spec(spec) + + # Register it in sys.modules with underscores so pickle will look for 'osmo_smdpp' not 'osmo-smdpp' + sys.modules['osmo_smdpp'] = module + spec.loader.exec_module(module) + return module + + +osmo_smdpp = import_osmo_smdpp() + +Es2PlusProfileState = osmo_smdpp.Es2PlusProfileState +Es2PlusProfileStore = osmo_smdpp.Es2PlusProfileStore +SmDppHttpServer = osmo_smdpp.SmDppHttpServer + + +class ES2PlusHTTPEndpointTest(unittest.TestCase): + """Test ES2+ and ES9+ HTTP endpoints with mocked responses.""" + + def setUp(self): + """Set up test environment.""" + self.smdp_host = "localhost" + self.smdp_port = 8000 + self.base_url = f"http://{self.smdp_host}:{self.smdp_port}" + + self.test_eid = "89001012012341234012345678901234" + self.test_iccid = "8900000000000000001" + self.test_matching_id = None # Will be set by mock confirmOrder + self.test_confirmation_code = "12345678" + + def _create_mock_response(self, status_code=200, json_data=None): + """Create a mock HTTP response.""" + mock_response = MagicMock() + mock_response.status_code = status_code + mock_response.raise_for_status.return_value = None + if json_data: + mock_response.json.return_value = json_data + 
return mock_response + + @patch('requests.post') + def test_es2plus_download_order(self, mock_post): + """Test ES2+ DownloadOrder HTTP endpoint.""" + # Mock successful response + mock_response_data = { + "header": { + "functionExecutionStatus": { + "status": "Executed-Success" + } + }, + "iccid": self.test_iccid + } + mock_post.return_value = self._create_mock_response(200, mock_response_data) + + # Import requests here since we're mocking it + import requests + + data = { + "eid": self.test_eid, + "iccid": self.test_iccid, + "profileType": "Test" + } + + url = f"{self.base_url}/gsma/rsp2/es2plus/downloadOrder" + headers = {"Content-Type": "application/json"} + response = requests.post(url, json=data, headers=headers) + result = response.json() + + mock_post.assert_called_once_with(url, json=data, headers=headers) + + self.assertEqual(result["header"]["functionExecutionStatus"]["status"], "Executed-Success") + self.assertEqual(result["iccid"], self.test_iccid) + + @patch('requests.post') + def test_es2plus_confirm_order(self, mock_post): + """Test ES2+ ConfirmOrder HTTP endpoint.""" + test_matching_id = "TEST_MATCHING_ID_12345" + + # Mock successful response + mock_response_data = { + "header": { + "functionExecutionStatus": { + "status": "Executed-Success" + } + }, + "matchingId": test_matching_id + } + mock_post.return_value = self._create_mock_response(200, mock_response_data) + + import requests + + data = { + "iccid": self.test_iccid, + "releaseFlag": True, + "eid": self.test_eid, + "confirmationCode": self.test_confirmation_code + } + + url = f"{self.base_url}/gsma/rsp2/es2plus/confirmOrder" + headers = {"Content-Type": "application/json"} + response = requests.post(url, json=data, headers=headers) + result = response.json() + + mock_post.assert_called_once_with(url, json=data, headers=headers) + + self.assertEqual(result["header"]["functionExecutionStatus"]["status"], "Executed-Success") + self.assertEqual(result["matchingId"], test_matching_id) + + 
@patch('requests.post') + def test_es9plus_initiate_authentication(self, mock_post): + """Test ES9+ InitiateAuthentication HTTP endpoint.""" + test_transaction_id = "TXN_12345" + + # Mock successful response + mock_response_data = { + "header": { + "functionExecutionStatus": { + "status": "Executed-Success" + } + }, + "transactionId": test_transaction_id + } + mock_post.return_value = self._create_mock_response(200, mock_response_data) + + import requests + + euicc_info1 = { + "svn": "2.2.0", + "euiccCiPKIdListForVerification": [ + {"SubjectKeyIdentifier": "F54172BDF98A95D65CBEB88A38A1C11D800A85C3"} + ], + "euiccCiPKIdListForSigning": [ + {"SubjectKeyIdentifier": "F54172BDF98A95D65CBEB88A38A1C11D800A85C3"} + ] + } + + euicc_info1_b64 = base64.b64encode(json.dumps(euicc_info1).encode()).decode() + data = { + "euiccChallenge": base64.b64encode(b"test_challenge_123").decode(), + "euiccInfo1": euicc_info1_b64, + "smdpAddress": f"{self.smdp_host}:{self.smdp_port}" + } + + url = f"{self.base_url}/gsma/rsp2/es9plus/initiateAuthentication" + headers = {"Content-Type": "application/json"} + response = requests.post(url, json=data, headers=headers) + result = response.json() + + mock_post.assert_called_once_with(url, json=data, headers=headers) + + self.assertEqual(result["header"]["functionExecutionStatus"]["status"], "Executed-Success") + self.assertEqual(result["transactionId"], test_transaction_id) + + @patch('requests.post') + def test_es9plus_authenticate_client_with_matching_id(self, mock_post): + """Test ES9+ AuthenticateClient with matchingId.""" + test_transaction_id = "TXN_12345" + test_matching_id = "TEST_MATCHING_ID_12345" + + # mock a failure due to certificate validation but ensure that the matchingId was processed + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.raise_for_status.side_effect = Exception("Certificate validation failed") + mock_response.text = "matchingId processed but certificate validation failed" + 
mock_post.return_value = mock_response + + import requests + + authenticate_server_response = { + "authenticateResponseOk": { + "euiccSigned1": { + "transactionId": test_transaction_id, + "serverChallenge": "mock_server_challenge", + "ctxParams1": { + "ctxParamsForCommonAuthentication": { + "matchingId": test_matching_id + } + } + }, + "euiccSignature1": base64.b64encode(b"mock_signature").decode(), + "euiccCertificate": "mock_certificate", + "eumCertificate": "mock_eum_certificate" + } + } + + data = { + "transactionId": test_transaction_id, + "authenticateServerResponse": base64.b64encode( + json.dumps(authenticate_server_response).encode() + ).decode() + } + + # Make request and expect it to fail (due to mock certificates) + url = f"{self.base_url}/gsma/rsp2/es9plus/authenticateClient" + headers = {"Content-Type": "application/json"} + + with self.assertRaises(Exception) as context: + requests.post(url, json=data, headers=headers) + response = context.exception.response if hasattr(context.exception, 'response') else mock_response + response.raise_for_status() + + mock_post.assert_called_once_with(url, json=data, headers=headers) + + # Verify the matchingId was included in the request + call_args = mock_post.call_args + request_data = call_args[1]['json'] + + encoded_response = request_data['authenticateServerResponse'] + decoded_response = json.loads(base64.b64decode(encoded_response).decode()) + + matching_id_in_request = (decoded_response['authenticateResponseOk'] + ['euiccSigned1']['ctxParams1'] + ['ctxParamsForCommonAuthentication']['matchingId']) + + self.assertEqual(matching_id_in_request, test_matching_id) + + +class ES2PlusProfileIntegrationTest(unittest.TestCase): + """Test ES2+ profile storage and ES9+ integration.""" + + def setUp(self): + """Set up test environment.""" + self.test_iccid = "8900000000000000099" + self.test_matching_id = "TEST_ES2PLUS_MID_123" + self.test_confirmation_code = "87654321" + self.test_eid = 
"89001012012341234012345678901234" + + self.temp_dir = tempfile.mkdtemp() + self.profile_store_path = os.path.join(self.temp_dir, "test-es2plus-profiles") + self.profile_store = Es2PlusProfileStore(filename=self.profile_store_path) + + def test_es2plus_profile_creation_and_storage(self): + """Test creating and storing ES2+ profiles.""" + # Create an ES2+ profile as if it was provisioned via downloadOrder/confirmOrder + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.matching_id = self.test_matching_id + profile.state = 'released' + profile.eid = self.test_eid + + # Store confirmation code hash (SHA256 of the code) + cc_bytes = h2b(self.test_confirmation_code) + profile.confirmation_code_hash = hashlib.sha256(cc_bytes).digest() + + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + self.assertIn(self.test_iccid, self.profile_store) + stored_profile = self.profile_store[self.test_iccid] + self.assertEqual(stored_profile.state, 'released') + self.assertEqual(stored_profile.matching_id, self.test_matching_id) + self.assertIsNotNone(stored_profile.confirmation_code_hash) + + def test_profile_lookup_by_matching_id(self): + """Test profile lookup by matching ID.""" + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.matching_id = self.test_matching_id + profile.state = 'released' + profile.eid = self.test_eid + + cc_bytes = h2b(self.test_confirmation_code) + profile.confirmation_code_hash = hashlib.sha256(cc_bytes).digest() + + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + # Test lookup by matching_id + found_profile = self.profile_store.find_by_matching_id(self.test_matching_id) + + self.assertIsNotNone(found_profile, "Profile not found by matching_id") + self.assertEqual(found_profile.iccid, self.test_iccid) + self.assertEqual(found_profile.state, 'released') + self.assertEqual(found_profile.eid, self.test_eid) + 
self.assertIsNotNone(found_profile.confirmation_code_hash) + + def test_server_integration_with_es2plus_profiles(self): + """Test server's ability to find ES2+ profiles.""" + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.matching_id = self.test_matching_id + profile.state = 'released' + profile.eid = self.test_eid + + cc_bytes = h2b(self.test_confirmation_code) + profile.confirmation_code_hash = hashlib.sha256(cc_bytes).digest() + + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + server = SmDppHttpServer( + server_hostname='testsmdpplus1.example.com', + ci_certs_path=str(project_root / 'smdpp-data/certs/CertificateIssuer'), + common_cert_path=str(project_root / 'smdpp-data/certs'), + use_brainpool=False, + in_memory=True, + test_mode=True + ) + + # Replace server's profile store with our test store + server.profile_store = self.profile_store + + # Test server's ability to find the profile + es2plus_profile = server.profile_store.find_by_matching_id(self.test_matching_id) + + self.assertIsNotNone(es2plus_profile, "Server cannot find ES2+ profile") + self.assertEqual(es2plus_profile.iccid, self.test_iccid) + self.assertEqual(es2plus_profile.state, 'released') + self.assertEqual(es2plus_profile.matching_id, self.test_matching_id) + self.assertEqual(es2plus_profile.eid, self.test_eid) + + def test_es9plus_would_accept_profile(self): + """Test that ES9+ logic would accept the ES2+ profile.""" + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.matching_id = self.test_matching_id + profile.state = 'released' + profile.eid = self.test_eid + + cc_bytes = h2b(self.test_confirmation_code) + profile.confirmation_code_hash = hashlib.sha256(cc_bytes).digest() + + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + # Find the profile (simulating ES9+ authenticateClient logic) + found_profile = self.profile_store.find_by_matching_id(self.test_matching_id) + + 
self.assertIsNotNone(found_profile) + self.assertEqual(found_profile.state, 'released', + "Profile must be in 'released' state for ES9+ download") + self.assertIsNotNone(found_profile.confirmation_code_hash, + "Profile should have confirmation code hash") + + expected_hash = hashlib.sha256(cc_bytes).digest() + self.assertEqual(found_profile.confirmation_code_hash, expected_hash, + "Confirmation code hash mismatch") + + def test_activation_code_profiles_sync(self): + """Test that ES2+ profiles are synced to activation_code_profiles.""" + server = SmDppHttpServer( + server_hostname='testsmdpplus1.example.com', + ci_certs_path=str(project_root / 'smdpp-data/certs/CertificateIssuer'), + common_cert_path=str(project_root / 'smdpp-data/certs'), + use_brainpool=False, + in_memory=True, + test_mode=True + ) + + test_matching_id = "ES2PLUS_SYNC_TEST" + test_iccid = "8900000000000000088" + + profile = Es2PlusProfileState(test_iccid, 'Test', 'TestOp') + profile.matching_id = test_matching_id + profile.state = 'released' + server.profile_store[profile.iccid] = profile + + # In test mode, manually trigger sync (simulating confirmOrder behavior) + if server.test_mode: + server.activation_code_profiles[test_matching_id] = { + 'matchingId': test_matching_id, + 'confirmationCode': None, + 'iccid': profile.iccid, + 'profileName': f'ES2+ Profile {profile.iccid}', + 'state': 'released', + 'download_attempts': 0, + 'cc_attempts': 0, + 'associated_eid': None, + 'expiration': None, + 'profile_path': 'TS48v4_SAIP2.3_BERTLV' + } + + self.assertIn(test_matching_id, server.activation_code_profiles, + "ES2+ profile should be synced to activation_code_profiles") + + synced_profile = server.activation_code_profiles[test_matching_id] + self.assertEqual(synced_profile['iccid'], test_iccid) + self.assertEqual(synced_profile['state'], 'released') + + def tearDown(self): + """Clean up test environment.""" + if hasattr(self, 'profile_store'): + self.profile_store.close() + + if hasattr(self, 
'temp_dir'): + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + +class ES2PlusProfileStoreTest(unittest.TestCase): + """Test ES2PlusProfileStore functionality.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.store_path = os.path.join(self.temp_dir, "test-store") + self.store = Es2PlusProfileStore(filename=self.store_path) + + def test_profile_store_basic_operations(self): + """Test basic profile store operations.""" + iccid = "8900000000000000001" + profile = Es2PlusProfileState(iccid, "Test", "TestOp") + profile.matching_id = "TEST_BASIC" + profile.state = "allocated" + + self.store[iccid] = profile + + self.assertIn(iccid, self.store) + retrieved = self.store[iccid] + self.assertEqual(retrieved.iccid, iccid) + self.assertEqual(retrieved.matching_id, "TEST_BASIC") + self.assertEqual(retrieved.state, "allocated") + + def test_find_by_matching_id_functionality(self): + """Test find_by_matching_id method.""" + profiles = [ + ("8900000000000000001", "MATCH_ID_1", "released"), + ("8900000000000000002", "MATCH_ID_2", "confirmed"), + ("8900000000000000003", "MATCH_ID_3", "allocated"), + ] + + for iccid, match_id, state in profiles: + profile = Es2PlusProfileState(iccid, "Test", "TestOp") + profile.matching_id = match_id + profile.state = state + self.store[iccid] = profile + + for iccid, match_id, state in profiles: + with self.subTest(matching_id=match_id): + found = self.store.find_by_matching_id(match_id) + self.assertIsNotNone(found) + self.assertEqual(found.iccid, iccid) + self.assertEqual(found.matching_id, match_id) + self.assertEqual(found.state, state) + + not_found = self.store.find_by_matching_id("NON_EXISTENT") + self.assertIsNone(not_found) + + def tearDown(self): + """Clean up test environment.""" + self.store.close() + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + +class ES2PlusWorkflowTest(unittest.TestCase): + """Test complete ES2+ to ES9+ workflow.""" + + def 
setUp(self): + """Set up test environment for workflow tests.""" + self.temp_dir = tempfile.mkdtemp() + self.profile_store_path = os.path.join(self.temp_dir, "workflow-es2plus-profiles") + self.profile_store = Es2PlusProfileStore(filename=self.profile_store_path) + + self.test_eid = "89001012012341234012345678901234" + self.test_iccid = "8900000000000000001" + self.test_confirmation_code = "12345678" + self.test_matching_id = "WORKFLOW_TEST_MID" + + def test_complete_workflow_with_profile_store(self): + """Test complete workflow using actual profile store.""" + # Step 1: Simulate DownloadOrder - create profile + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.state = 'allocated' + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + stored_profile = self.profile_store[self.test_iccid] + self.assertEqual(stored_profile.state, 'allocated') + + # Step 2: Simulate ConfirmOrder - update profile with matching_id + stored_profile.matching_id = self.test_matching_id + stored_profile.state = 'released' + stored_profile.eid = self.test_eid + + cc_bytes = h2b(self.test_confirmation_code) + stored_profile.confirmation_code_hash = hashlib.sha256(cc_bytes).digest() + + self.profile_store[self.test_iccid] = stored_profile + self.profile_store.sync() + + confirmed_profile = self.profile_store[self.test_iccid] + self.assertEqual(confirmed_profile.state, 'released') + self.assertEqual(confirmed_profile.matching_id, self.test_matching_id) + + # Step 3: Simulate ES9+ lookup by matching_id + es9_profile = self.profile_store.find_by_matching_id(self.test_matching_id) + + self.assertIsNotNone(es9_profile) + self.assertEqual(es9_profile.iccid, self.test_iccid) + self.assertEqual(es9_profile.state, 'released') + self.assertEqual(es9_profile.eid, self.test_eid) + + # Step 4: Verify complete data integrity + self.assertIsNotNone(es9_profile.confirmation_code_hash) + expected_hash = hashlib.sha256(cc_bytes).digest() + 
self.assertEqual(es9_profile.confirmation_code_hash, expected_hash) + + def test_error_handling_workflow(self): + """Test error handling in the integration workflow.""" + # Test 1: Profile not found by matching_id + non_existent_profile = self.profile_store.find_by_matching_id("NON_EXISTENT_MID") + self.assertIsNone(non_existent_profile) + + # Test 2: Profile in wrong state for ES9+ download + profile = Es2PlusProfileState(self.test_iccid, 'Test', 'TestOperator') + profile.matching_id = "ERROR_TEST_MID" + profile.state = 'allocated' # Wrong state - should be 'released' + self.profile_store[self.test_iccid] = profile + self.profile_store.sync() + + error_profile = self.profile_store.find_by_matching_id("ERROR_TEST_MID") + self.assertIsNotNone(error_profile) + self.assertNotEqual(error_profile.state, 'released', + "Profile should not be in released state for error test") + + # Test 3: Missing confirmation code hash + profile2 = Es2PlusProfileState("8900000000000000002", 'Test', 'TestOperator') + profile2.matching_id = "NO_CC_HASH_MID" + profile2.state = 'released' + # Intentionally not setting confirmation_code_hash + self.profile_store[profile2.iccid] = profile2 + self.profile_store.sync() + + no_cc_profile = self.profile_store.find_by_matching_id("NO_CC_HASH_MID") + self.assertIsNotNone(no_cc_profile) + self.assertIsNone(getattr(no_cc_profile, 'confirmation_code_hash', None), + "Profile should not have confirmation code hash for error test") + + def tearDown(self): + """Clean up test environment.""" + if hasattr(self, 'profile_store'): + self.profile_store.close() + + if hasattr(self, 'temp_dir'): + import shutil + shutil.rmtree(self.temp_dir, ignore_errors=True) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/unittests/test_unified_9p_2p.py b/tests/unittests/test_unified_9p_2p.py new file mode 100644 index 0000000..34c2eb9 --- /dev/null +++ b/tests/unittests/test_unified_9p_2p.py @@ -0,0 +1,194 @@ +#!/usr/bin/env 
#!/usr/bin/env python3
# Test script to verify the unified ES2+/ES9+ architecture.
#
# (C) 2025 by Eric Wild ewild@sysmocom.de
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

import sys
import logging
import unittest
import importlib.util
from pathlib import Path

# Add project root to path for imports
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))


def import_osmo_smdpp():
    """Load ``osmo-smdpp.py`` from the project root as module ``osmo_smdpp``.

    The script's file name contains a dash, so it cannot be imported with a
    plain ``import`` statement; it is loaded from its path instead.

    Returns:
        The loaded module. If ``'osmo_smdpp'`` is already present in
        ``sys.modules`` that instance is returned and nothing is re-executed.

    Raises:
        ImportError: if no import spec/loader can be created for the file.
        Exception: whatever executing the module raises. In that case the
            partially initialised module is removed from ``sys.modules`` again
            so that a later call retries instead of returning a broken module.
    """
    cached = sys.modules.get('osmo_smdpp')
    if cached is not None:
        return cached

    module_path = project_root / 'osmo-smdpp.py'
    spec = importlib.util.spec_from_file_location('osmo_smdpp', module_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {module_path}")

    module = importlib.util.module_from_spec(spec)

    # Register it in sys.modules with underscores so pickle will look for
    # 'osmo_smdpp' not 'osmo-smdpp'. This has to happen before the module
    # body is executed.
    sys.modules['osmo_smdpp'] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind.
        sys.modules.pop('osmo_smdpp', None)
        raise
    return module


osmo_smdpp = import_osmo_smdpp()

SmDppHttpServer = osmo_smdpp.SmDppHttpServer
Es2PlusProfileState = osmo_smdpp.Es2PlusProfileState


class UnifiedArchitectureTest(unittest.TestCase):
    """Test cases for unified ES2+/ES9+ architecture."""

    def setUp(self):
        """Create a fresh ``SmDppHttpServer`` (``in_memory=True``, ``test_mode=True``) per test."""
        # basicConfig() only configures the root logger if it has no handlers
        # yet, so calling it for every test is harmless.
        logging.basicConfig(level=logging.DEBUG)
        self.logger = logging.getLogger(__name__)

        self.server = SmDppHttpServer(
            server_hostname='test.example.com',
            ci_certs_path=str(project_root / 'smdpp-data/certs/CertificateIssuer'),
            common_cert_path=str(project_root / 'smdpp-data/certs'),
            use_brainpool=False,
            in_memory=True,
            test_mode=True,
        )

    def tearDown(self):
        """Close the server's profile store if it offers a ``close`` method."""
        server = getattr(self, 'server', None)
        if not server:
            return
        close = getattr(server.profile_store, 'close', None)
        if callable(close):
            close()

    def test_server_initialization(self):
        """Test that server initializes correctly in test mode."""
        self.assertIsNotNone(self.server)
        self.assertTrue(self.server.test_mode)
        self.assertIsNotNone(self.server.profile_store)

    def test_static_profiles_loaded(self):
        """Test that static test profiles are loaded."""
        self.assertGreater(len(self.server.activation_code_profiles), 0,
                           "No static test profiles loaded")

    def test_es2plus_profile_creation(self):
        """Test creating and storing ES2+ profiles."""
        test_matching_id = 'TEST123'
        test_iccid = '8900000000000099999F'

        es2_profile = Es2PlusProfileState(test_iccid, 'Test', 'TestOperator')
        es2_profile.matching_id = test_matching_id
        es2_profile.state = 'released'
        self.server.profile_store[test_iccid] = es2_profile

        self.assertIn(test_iccid, self.server.profile_store)
        stored_profile = self.server.profile_store[test_iccid]
        self.assertEqual(stored_profile.matching_id, test_matching_id)
        self.assertEqual(stored_profile.state, 'released')

    def test_es2plus_profile_lookup_by_matching_id(self):
        """Test looking up ES2+ profiles by matching ID."""
        test_matching_id = 'TEST_LOOKUP_001'
        test_iccid = '8900000000000088888F'

        profile = Es2PlusProfileState(test_iccid, 'Test', 'TestOperator')
        profile.matching_id = test_matching_id
        profile.state = 'confirmed'
        self.server.profile_store[test_iccid] = profile

        found_profile = self.server.profile_store.find_by_matching_id(test_matching_id)
        self.assertIsNotNone(found_profile, "Profile not found by matching ID")
        self.assertEqual(found_profile.iccid, test_iccid)
        self.assertEqual(found_profile.matching_id, test_matching_id)

    def test_profile_state_validation(self):
        """Test that profiles keep their state when stored and looked up.

        Covers the states 'released' and 'confirmed' as well as 'allocated';
        the store is expected to return each profile with its state unchanged.
        """
        # States the original test labels as valid
        for index, state in enumerate(('released', 'confirmed'), start=1):
            with self.subTest(state=state):
                # Use a digits-only ICCID (plus 'F' padding) like the other
                # tests, rather than embedding letters of the state name.
                iccid = f'89000000000000{index:05d}F'
                profile = Es2PlusProfileState(iccid, 'Test', 'TestOp')
                profile.matching_id = f'TEST_{state.upper()}'
                profile.state = state
                self.server.profile_store[iccid] = profile

                found = self.server.profile_store.find_by_matching_id(profile.matching_id)
                self.assertIsNotNone(found)
                self.assertEqual(found.state, state)

        # Test invalid state (should still be stored but not ready for download)
        invalid_state = 'allocated'
        iccid = '8900000000000077777F'
        profile = Es2PlusProfileState(iccid, 'Test', 'TestOp')
        profile.matching_id = 'TEST_INVALID_STATE'
        profile.state = invalid_state
        self.server.profile_store[iccid] = profile

        found = self.server.profile_store.find_by_matching_id('TEST_INVALID_STATE')
        self.assertIsNotNone(found)
        self.assertEqual(found.state, invalid_state)

    def test_es2plus_takes_precedence_over_static(self):
        """Test that an ES2+ profile is found for a matching ID that is also static.

        NOTE(review): only ``profile_store.find_by_matching_id()`` is queried
        here; the server's own lookup order between the profile store and
        ``activation_code_profiles`` is not exercised by this test.
        """
        static_profiles = self.server.activation_code_profiles
        if not static_profiles:
            self.skipTest("No static profiles available for precedence test")

        # Pick any matching ID that exists in the static profiles
        static_matching_id = next(iter(static_profiles))
        static_iccid = static_profiles[static_matching_id]['iccid']

        # Create ES2+ profile with same matching ID but different ICCID
        es2_iccid = '8900000000000099999F'
        es2_profile = Es2PlusProfileState(es2_iccid, 'Test', 'TestOperator')
        es2_profile.matching_id = static_matching_id
        es2_profile.state = 'released'
        self.server.profile_store[es2_iccid] = es2_profile

        # Verify ES2+ profile is found (takes precedence)
        found_profile = self.server.profile_store.find_by_matching_id(static_matching_id)
        self.assertIsNotNone(found_profile)
        self.assertEqual(found_profile.iccid, es2_iccid,
                         "ES2+ profile should take precedence over static profile")
        self.assertNotEqual(found_profile.iccid, static_iccid,
                            "Should return ES2+ ICCID, not static ICCID")

    def test_multiple_profiles_coexistence(self):
        """Test that multiple ES2+ profiles can coexist with different states."""
        profiles_data = [
            ('8900000000000011111F', 'TEST_MULTI_1', 'released'),
            ('8900000000000022222F', 'TEST_MULTI_2', 'confirmed'),
            ('8900000000000033333F', 'TEST_MULTI_3', 'allocated'),
        ]

        # Create and store multiple profiles
        for iccid, matching_id, state in profiles_data:
            profile = Es2PlusProfileState(iccid, 'Test', 'TestOperator')
            profile.matching_id = matching_id
            profile.state = state
            self.server.profile_store[iccid] = profile

        # Verify all profiles can be found by their matching IDs
        for iccid, matching_id, state in profiles_data:
            with self.subTest(matching_id=matching_id):
                found = self.server.profile_store.find_by_matching_id(matching_id)
                self.assertIsNotNone(found)
                self.assertEqual(found.iccid, iccid)
                self.assertEqual(found.state, state)


if __name__ == '__main__':
    unittest.main()