laforge has submitted this change. ( https://gerrit.osmocom.org/c/pysim/+/35634?usp=email )
Change subject: Move X.509 related code from osmo-smdpp to pySim.esim.x509_cert ......................................................................
Move X.509 related code from osmo-smdpp to pySim.esim.x509_cert
Change-Id: I230ba0537b702b0bf6e9da9a430908ed2a21ca61 --- M osmo-smdpp.py M pySim/esim/x509_cert.py 2 files changed, 86 insertions(+), 74 deletions(-)
Approvals: laforge: Looks good to me, approved Jenkins Builder: Verified
diff --git a/osmo-smdpp.py b/osmo-smdpp.py index 41cdfc8..7e0db27 100755 --- a/osmo-smdpp.py +++ b/osmo-smdpp.py @@ -36,6 +36,7 @@
import pySim.esim.rsp as rsp from pySim.esim.es8p import * +from pySim.esim.x509_cert import oid, cert_policy_has_oid, CertAndPrivkey
# HACK: make this configurable DATA_DIR = './smdpp-data' @@ -70,18 +71,12 @@ js['header']['functionExecutionStatus']['statusCodeData'] = status_code_data
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature -from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding, PublicFormat, PrivateFormat, NoEncryption +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import hashes from cryptography.exceptions import InvalidSignature from cryptography import x509
- -def ecdsa_dss_to_tr03111(sig: bytes) -> bytes: - """convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes.""" - r, s = decode_dss_signature(sig) - return r.to_bytes(32, 'big') + s.to_bytes(32, 'big') - def ecdsa_tr03111_to_dss(sig: bytes) -> bytes: """convert an ECDSA signature from BSI TR-03111 format to DER: first get long integers; then encode those.""" assert len(sig) == 64 @@ -90,52 +85,6 @@ return encode_dss_signature(r, s)
-class CertAndPrivkey: - """A pair of certificate and private key, as used for ECDSA signing.""" - def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = None, - cert: Optional[x509.Certificate] = None, priv_key = None): - self.required_policy_oid = required_policy_oid - self.cert = cert - self.priv_key = priv_key - - def cert_from_der_file(self, path: str): - with open(path, 'rb') as f: - cert = x509.load_der_x509_certificate(f.read()) - if self.required_policy_oid: - # verify it is the right type of certificate (id-rspRole-dp-auth, id-rspRole-dp-auth-v2, etc.) - assert cert_policy_has_oid(cert, self.required_policy_oid) - self.cert = cert - - def privkey_from_pem_file(self, path: str, password: Optional[str] = None): - with open(path, 'rb') as f: - self.priv_key = load_pem_private_key(f.read(), password) - - def ecdsa_sign(self, plaintext: bytes) -> bytes: - """Sign some input-data using an ECDSA signature compliant with SGP.22, - which internally refers to Global Platform 2.2 Annex E, which in turn points - to BSI TS-03111 which states "concatengated raw R + S values". """ - sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256())) - # convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes - return ecdsa_dss_to_tr03111(sig) - - def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier: - """Return the AuthorityKeyIdentifier X.509 extension of the certificate.""" - return list(filter(lambda x: isinstance(x.value, x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value - - def get_subject_alt_name(self) -> x509.SubjectAlternativeName: - """Return the SubjectAlternativeName X.509 extension of the certificate.""" - return list(filter(lambda x: isinstance(x.value, x509.SubjectAlternativeName), self.cert.extensions))[0].value - - def get_cert_as_der(self) -> bytes: - """Return certificate encoded as DER.""" - return self.cert.public_bytes(Encoding.DER) - - def get_curve(self) -> ec.EllipticCurve: - return self.cert.public_key().public_numbers().curve - - - - class ApiError(Exception): def __init__(self, subject_code: str, reason_code: str, message: Optional[str] = None, subject_id: Optional[str] = None): @@ -147,27 +96,6 @@ build_resp_header(js, 'Failed', self.status_code) return json.dumps(js)
-def cert_policy_has_oid(cert: x509.Certificate, match_oid: x509.ObjectIdentifier) -> bool: - """Determine if given certificate has a certificatePolicy extension of matching OID.""" - for policy_ext in filter(lambda x: isinstance(x.value, x509.CertificatePolicies), cert.extensions): - if any(policy.policy_identifier == match_oid for policy in policy_ext.value._policies): - return True - return False - -ID_RSP = "2.23.146.1" -ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2']) -ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1']) - -class oid: - id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0') - id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1') - id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2') - id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3') - id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4') - id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5') - id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6') - id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7') - class SmDppHttpServer: app = Klein()
diff --git a/pySim/esim/x509_cert.py b/pySim/esim/x509_cert.py index f6c6d43..9f6c8e6 100644 --- a/pySim/esim/x509_cert.py +++ b/pySim/esim/x509_cert.py @@ -23,6 +23,7 @@ from cryptography.hazmat.primitives import hashes from cryptography.exceptions import InvalidSignature from cryptography import x509 +from cryptography.hazmat.primitives.serialization import load_pem_private_key, Encoding
from pySim.utils import b2h
@@ -43,6 +44,27 @@ aki_ext = cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier).value return aki_ext.key_identifier
+def cert_policy_has_oid(cert: x509.Certificate, match_oid: x509.ObjectIdentifier) -> bool: + """Determine if given certificate has a certificatePolicy extension of matching OID.""" + for policy_ext in filter(lambda x: isinstance(x.value, x509.CertificatePolicies), cert.extensions): + if any(policy.policy_identifier == match_oid for policy in policy_ext.value._policies): + return True + return False + +ID_RSP = "2.23.146.1" +ID_RSP_CERT_OBJECTS = '.'.join([ID_RSP, '2']) +ID_RSP_ROLE = '.'.join([ID_RSP_CERT_OBJECTS, '1']) + +class oid: + id_rspRole_ci = x509.ObjectIdentifier(ID_RSP_ROLE + '.0') + id_rspRole_euicc_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.1') + id_rspRole_eum_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.2') + id_rspRole_dp_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.3') + id_rspRole_dp_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.4') + id_rspRole_dp_pb_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.5') + id_rspRole_ds_tls_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.6') + id_rspRole_ds_auth_v2 = x509.ObjectIdentifier(ID_RSP_ROLE + '.7') + class VerifyError(Exception): """An error during certificate verification,""" pass @@ -54,6 +76,8 @@ def __init__(self, root_cert: x509.Certificate): check_signed(root_cert, root_cert) # TODO: check other mandatory attributes for CA Cert + if not cert_policy_has_oid(root_cert, oid.id_rspRole_ci): + raise ValueError("Given root certificate doesn't have rspRole_ci OID") usage_ext = root_cert.extensions.get_extension_for_class(x509.KeyUsage).value if not usage_ext.key_cert_sign: raise ValueError('Given root certificate key usage does not permit signing of certificates') @@ -135,3 +159,54 @@ depth += 1 if depth > max_depth: raise VerifyError('Maximum depth %u exceeded while verifying certificate chain' % max_depth) + +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature + +def ecdsa_dss_to_tr03111(sig: bytes) -> bytes: + """convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes.""" + r, s = decode_dss_signature(sig) + return r.to_bytes(32, 'big') + s.to_bytes(32, 'big') + + +class CertAndPrivkey: + """A pair of certificate and private key, as used for ECDSA signing.""" + def __init__(self, required_policy_oid: Optional[x509.ObjectIdentifier] = None, + cert: Optional[x509.Certificate] = None, priv_key = None): + self.required_policy_oid = required_policy_oid + self.cert = cert + self.priv_key = priv_key + + def cert_from_der_file(self, path: str): + with open(path, 'rb') as f: + cert = x509.load_der_x509_certificate(f.read()) + if self.required_policy_oid: + # verify it is the right type of certificate (id-rspRole-dp-auth, id-rspRole-dp-auth-v2, etc.) + assert cert_policy_has_oid(cert, self.required_policy_oid) + self.cert = cert + + def privkey_from_pem_file(self, path: str, password: Optional[str] = None): + with open(path, 'rb') as f: + self.priv_key = load_pem_private_key(f.read(), password) + + def ecdsa_sign(self, plaintext: bytes) -> bytes: + """Sign some input-data using an ECDSA signature compliant with SGP.22, + which internally refers to Global Platform 2.2 Annex E, which in turn points + to BSI TS-03111 which states "concatengated raw R + S values". """ + sig = self.priv_key.sign(plaintext, ec.ECDSA(hashes.SHA256())) + # convert from DER format to BSI TR-03111; first get long integers; then convert those to bytes + return ecdsa_dss_to_tr03111(sig) + + def get_authority_key_identifier(self) -> x509.AuthorityKeyIdentifier: + """Return the AuthorityKeyIdentifier X.509 extension of the certificate.""" + return list(filter(lambda x: isinstance(x.value, x509.AuthorityKeyIdentifier), self.cert.extensions))[0].value + + def get_subject_alt_name(self) -> x509.SubjectAlternativeName: + """Return the SubjectAlternativeName X.509 extension of the certificate.""" + return list(filter(lambda x: isinstance(x.value, x509.SubjectAlternativeName), self.cert.extensions))[0].value + + def get_cert_as_der(self) -> bytes: + """Return certificate encoded as DER.""" + return self.cert.public_bytes(Encoding.DER) + + def get_curve(self) -> ec.EllipticCurve: + return self.cert.public_key().public_numbers().curve