laforge has submitted this change. ( https://gerrit.osmocom.org/c/pysim/+/37498?usp=email )
Change subject: es9p_client: Move code into a class; do common steps in constructor ......................................................................
es9p_client: Move code into a class; do common steps in constructor
This is in preparation of supporting more than just 'download'
Change-Id: I5a165efcb97d9264369a9c6571cd92022cbcdfb0 --- M contrib/es9p_client.py 1 file changed, 175 insertions(+), 157 deletions(-)
Approvals: Jenkins Builder: Verified laforge: Looks good to me, approved fixeria: Looks good to me, but someone else must approve
diff --git a/contrib/es9p_client.py b/contrib/es9p_client.py index 987b85e..5e72e7b 100755 --- a/contrib/es9p_client.py +++ b/contrib/es9p_client.py @@ -41,19 +41,21 @@ parser.add_argument('--url', required=True, help='Base URL of ES9+ API endpoint') parser.add_argument('--server-ca-cert', help="""X.509 CA certificates acceptable for the server side. In production use cases, this would be the GSMA Root CA (CI) certificate.""") +parser.add_argument('--certificate-path', default='.', + help="Path in which to look for certificate and key files.") +parser.add_argument('--euicc-certificate', default='CERT_EUICC_ECDSA_NIST.der', + help="File name of DER-encoded eUICC certificate file.") +parser.add_argument('--euicc-private-key', default='SK_EUICC_ECDSA_NIST.pem', + help="File name of PEM-format eUICC secret key file.") +parser.add_argument('--eum-certificate', default='CERT_EUM_ECDSA_NIST.der', + help="File name of DER-encoded EUM certificate file.") +parser.add_argument('--ci-certificate', default='CERT_CI_ECDSA_NIST.der', + help="File name of DER-encoded CI certificate file.") + subparsers = parser.add_subparsers(dest='command',help="The command (API function) to call", required=True)
+# download parser_dl = subparsers.add_parser('download', help="ES9+ download") -parser_dl.add_argument('--certificate-path', default='.', - help="Path in which to look for certificate and key files.") -parser_dl.add_argument('--euicc-certificate', default='CERT_EUICC_ECDSA_NIST.der', - help="File name of DER-encoded eUICC certificate file.") -parser_dl.add_argument('--euicc-private-key', default='SK_EUICC_ECDSA_NIST.pem', - help="File name of PEM-format eUICC secret key file.") -parser_dl.add_argument('--eum-certificate', default='CERT_EUM_ECDSA_NIST.der', - help="File name of DER-encoded EUM certificate file.") -parser_dl.add_argument('--ci-certificate', default='CERT_CI_ECDSA_NIST.der', - help="File name of DER-encoded CI certificate file.") parser_dl.add_argument('--matchingId', required=True, help='MatchingID that shall be used by profile download') parser_dl.add_argument('--output-path', default='.', @@ -61,178 +63,183 @@ parser_dl.add_argument('--confirmation-code', help="Confirmation Code for the eSIM download")
+class Es9pClient: + def __init__(self, opts): + self.opts = opts + self.cert_and_key = CertAndPrivkey() + self.cert_and_key.cert_from_der_file(os.path.join(opts.certificate_path, opts.euicc_certificate)) + self.cert_and_key.privkey_from_pem_file(os.path.join(opts.certificate_path, opts.euicc_private_key))
-def do_download(opts): + with open(os.path.join(opts.certificate_path, opts.eum_certificate), 'rb') as f: + self.eum_cert = x509.load_der_x509_certificate(f.read())
- cert_and_key = CertAndPrivkey() - cert_and_key.cert_from_der_file(os.path.join(opts.certificate_path, opts.euicc_certificate)) - cert_and_key.privkey_from_pem_file(os.path.join(opts.certificate_path, opts.euicc_private_key)) + with open(os.path.join(opts.certificate_path, opts.ci_certificate), 'rb') as f: + self.ci_cert = x509.load_der_x509_certificate(f.read()) + subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), self.ci_cert.extensions)) + subject_pkid = subject_exts[0].value + self.ci_pkid = subject_pkid.key_identifier
- with open(os.path.join(opts.certificate_path, opts.eum_certificate), 'rb') as f: - eum_cert = x509.load_der_x509_certificate(f.read()) + print("EUICC: %s" % self.cert_and_key.cert.subject) + print("EUM: %s" % self.eum_cert.subject) + print("CI: %s" % self.ci_cert.subject)
- with open(os.path.join(opts.certificate_path, opts.ci_certificate), 'rb') as f: - ci_cert = x509.load_der_x509_certificate(f.read()) - subject_exts = list(filter(lambda x: isinstance(x.value, x509.SubjectKeyIdentifier), ci_cert.extensions)) - subject_pkid = subject_exts[0].value - ci_pkid = subject_pkid.key_identifier + self.eid = self.cert_and_key.cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value + print("EID: %s" % self.eid) + print("CI PKID: %s" % b2h(self.ci_pkid)) + print()
- print("EUICC: %s" % cert_and_key.cert.subject) - print("EUM: %s" % eum_cert.subject) - print("CI: %s" % ci_cert.subject) + self.peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert)
- eid = cert_and_key.cert.subject.get_attributes_for_oid(x509.oid.NameOID.SERIAL_NUMBER)[0].value - print("EID: %s" % eid) - print("CI PKID: %s" % b2h(ci_pkid)) - print()
- peer = es9p.Es9pApiClient(opts.url, server_cert_verify=opts.server_ca_cert) + def do_download(self):
- print("Step 1: InitiateAuthentication...") + print("Step 1: InitiateAuthentication...")
- euiccInfo1 = { - 'svn': b'\x02\x04\x00', - 'euiccCiPKIdListForVerification': [ - ci_pkid, - ], - 'euiccCiPKIdListForSigning': [ - ci_pkid, - ], - } + euiccInfo1 = { + 'svn': b'\x02\x04\x00', + 'euiccCiPKIdListForVerification': [ + self.ci_pkid, + ], + 'euiccCiPKIdListForSigning': [ + self.ci_pkid, + ], + }
- data = { - 'euiccChallenge': os.urandom(16), - 'euiccInfo1': euiccInfo1, - 'smdpAddress': urlparse(opts.url).netloc, - } - init_auth_res = peer.call_initiateAuthentication(data) - print(init_auth_res) + data = { + 'euiccChallenge': os.urandom(16), + 'euiccInfo1': euiccInfo1, + 'smdpAddress': urlparse(self.opts.url).netloc, + } + init_auth_res = self.peer.call_initiateAuthentication(data) + print(init_auth_res)
- print("Step 2: AuthenticateClient...") + print("Step 2: AuthenticateClient...")
- #res['serverSigned1'] - #res['serverSignature1'] - print("TODO: verify serverSignature1 over serverSigned1") - #res['transactionId'] - print("TODO: verify transactionId matches the signed one in serverSigned1") - #res['euiccCiPKIdToBeUsed'] - # TODO: select eUICC certificate based on CI - #res['serverCertificate'] - # TODO: verify server certificate against CI + #res['serverSigned1'] + #res['serverSignature1'] + print("TODO: verify serverSignature1 over serverSigned1") + #res['transactionId'] + print("TODO: verify transactionId matches the signed one in serverSigned1") + #res['euiccCiPKIdToBeUsed'] + # TODO: select eUICC certificate based on CI + #res['serverCertificate'] + # TODO: verify server certificate against CI
- euiccInfo2 = { - 'profileVersion': b'\x02\x03\x01', - 'svn': euiccInfo1['svn'], - 'euiccFirmwareVer': b'\x23\x42\x00', - 'extCardResource': b'\x81\x01\x00\x82\x04\x00\x04\x9ch\x83\x02"#', - 'uiccCapability': (b'k6\xd3\xc3', 32), - 'javacardVersion': b'\x11\x02\x00', - 'globalplatformVersion': b'\x02\x03\x00', - 'rspCapability': (b'\x9c', 6), - 'euiccCiPKIdListForVerification': euiccInfo1['euiccCiPKIdListForVerification'], - 'euiccCiPKIdListForSigning': euiccInfo1['euiccCiPKIdListForSigning'], - #'euiccCategory': - #'forbiddenProfilePolicyRules': - 'ppVersion': b'\x01\x00\x00', - 'sasAcreditationNumber': 'OSMOCOM-TEST-1', #TODO: make configurable - #'certificationDataObject': - } + euiccInfo2 = { + 'profileVersion': b'\x02\x03\x01', + 'svn': euiccInfo1['svn'], + 'euiccFirmwareVer': b'\x23\x42\x00', + 'extCardResource': b'\x81\x01\x00\x82\x04\x00\x04\x9ch\x83\x02"#', + 'uiccCapability': (b'k6\xd3\xc3', 32), + 'javacardVersion': b'\x11\x02\x00', + 'globalplatformVersion': b'\x02\x03\x00', + 'rspCapability': (b'\x9c', 6), + 'euiccCiPKIdListForVerification': euiccInfo1['euiccCiPKIdListForVerification'], + 'euiccCiPKIdListForSigning': euiccInfo1['euiccCiPKIdListForSigning'], + #'euiccCategory': + #'forbiddenProfilePolicyRules': + 'ppVersion': b'\x01\x00\x00', + 'sasAcreditationNumber': 'OSMOCOM-TEST-1', #TODO: make configurable + #'certificationDataObject': + }
- euiccSigned1 = { - 'transactionId': h2b(init_auth_res['transactionId']), - 'serverAddress': init_auth_res['serverSigned1']['serverAddress'], - 'serverChallenge': init_auth_res['serverSigned1']['serverChallenge'], - 'euiccInfo2': euiccInfo2, - 'ctxParams1': - ('ctxParamsForCommonAuthentication', { - 'matchingId': opts.matchingId, - 'deviceInfo': { - 'tac': b'\x35\x23\x01\x45', # same as lpac - 'deviceCapabilities': {}, - #imei: - } - }), - } - euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1) - euiccSignature1 = cert_and_key.ecdsa_sign(euiccSigned1_bin) - auth_clnt_req = { - 'transactionId': init_auth_res['transactionId'], - 'authenticateServerResponse': - ('authenticateResponseOk', { - 'euiccSigned1': euiccSigned1, - 'euiccSignature1': euiccSignature1, - 'euiccCertificate': rsp.asn1.decode('Certificate', cert_and_key.get_cert_as_der()), - 'eumCertificate': rsp.asn1.decode('Certificate', eum_cert.public_bytes(Encoding.DER)) - }) - } - auth_clnt_res = peer.call_authenticateClient(auth_clnt_req) - print(auth_clnt_res) - #auth_clnt_res['transactionId'] - print("TODO: verify transactionId matches previous ones") - #auth_clnt_res['profileMetadata'] - # TODO: what's in here? - #auth_clnt_res['smdpSigned2']['bppEuiccOtpk'] - #auth_clnt_res['smdpSignature2'] - print("TODO: verify serverSignature2 over smdpSigned2") + euiccSigned1 = { + 'transactionId': h2b(init_auth_res['transactionId']), + 'serverAddress': init_auth_res['serverSigned1']['serverAddress'], + 'serverChallenge': init_auth_res['serverSigned1']['serverChallenge'], + 'euiccInfo2': euiccInfo2, + 'ctxParams1': + ('ctxParamsForCommonAuthentication', { + 'matchingId': self.opts.matchingId, + 'deviceInfo': { + 'tac': b'\x35\x23\x01\x45', # same as lpac + 'deviceCapabilities': {}, + #imei: + } + }), + } + euiccSigned1_bin = rsp.asn1.encode('EuiccSigned1', euiccSigned1) + euiccSignature1 = self.cert_and_key.ecdsa_sign(euiccSigned1_bin) + auth_clnt_req = { + 'transactionId': init_auth_res['transactionId'], + 'authenticateServerResponse': + ('authenticateResponseOk', { + 'euiccSigned1': euiccSigned1, + 'euiccSignature1': euiccSignature1, + 'euiccCertificate': rsp.asn1.decode('Certificate', self.cert_and_key.get_cert_as_der()), + 'eumCertificate': rsp.asn1.decode('Certificate', self.eum_cert.public_bytes(Encoding.DER)) + }) + } + auth_clnt_res = self.peer.call_authenticateClient(auth_clnt_req) + print(auth_clnt_res) + #auth_clnt_res['transactionId'] + print("TODO: verify transactionId matches previous ones") + #auth_clnt_res['profileMetadata'] + # TODO: what's in here? + #auth_clnt_res['smdpSigned2']['bppEuiccOtpk'] + #auth_clnt_res['smdpSignature2'] + print("TODO: verify serverSignature2 over smdpSigned2")
- smdp_cert = x509.load_der_x509_certificate(auth_clnt_res['smdpCertificate']) + smdp_cert = x509.load_der_x509_certificate(auth_clnt_res['smdpCertificate'])
- print("Step 3: GetBoundProfilePackage...") - # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter - # Reference value of CERT.DPpb.ECDSA - euicc_ot = ec.generate_private_key(smdp_cert.public_key().public_numbers().curve) + print("Step 3: GetBoundProfilePackage...") + # Generate a one-time ECKA key pair (ot{PK,SK}.DP.ECKA) using the curve indicated by the Key Parameter + # Reference value of CERT.DPpb.ECDSA + euicc_ot = ec.generate_private_key(smdp_cert.public_key().public_numbers().curve)
- # extract the public key in (hopefully) the right format for the ES8+ interface - euicc_otpk = euicc_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint) + # extract the public key in (hopefully) the right format for the ES8+ interface + euicc_otpk = euicc_ot.public_key().public_bytes(Encoding.X962, PublicFormat.UncompressedPoint)
- euiccSigned2 = { - 'transactionId': h2b(auth_clnt_res['transactionId']), - 'euiccOtpk': euicc_otpk, - #hashCC - } - # check for smdpSigned2 ccRequiredFlag, and send it in PrepareDownloadRequest hashCc - if auth_clnt_res['smdpSigned2']['ccRequiredFlag']: - if not opts.confirmation_code: - raise ValueError('Confirmation Code required but not provided') - cc_hash = hashlib.sha256(opts.confirmation_code.encode('ascii')).digest() - euiccSigned2['hashCc'] = hashlib.sha256(cc_hash + euiccSigned2['transactionId']).digest() - euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2) - euiccSignature2 = cert_and_key.ecdsa_sign(euiccSigned2_bin + auth_clnt_res['smdpSignature2']) - gbp_req = { - 'transactionId': auth_clnt_res['transactionId'], - 'prepareDownloadResponse': - ('downloadResponseOk', { - 'euiccSigned2': euiccSigned2, - 'euiccSignature2': euiccSignature2, - }) - } - gbp_res = peer.call_getBoundProfilePackage(gbp_req) - print(gbp_res) - #gbp_res['transactionId'] - # TODO: verify transactionId - print("TODO: verify transactionId matches previous ones") - bpp_bin = gbp_res['boundProfilePackage'] - print("TODO: verify boundProfilePackage smdpSignature") + euiccSigned2 = { + 'transactionId': h2b(auth_clnt_res['transactionId']), + 'euiccOtpk': euicc_otpk, + #hashCC + } + # check for smdpSigned2 ccRequiredFlag, and send it in PrepareDownloadRequest hashCc + if auth_clnt_res['smdpSigned2']['ccRequiredFlag']: + if not self.opts.confirmation_code: + raise ValueError('Confirmation Code required but not provided') + cc_hash = hashlib.sha256(self.opts.confirmation_code.encode('ascii')).digest() + euiccSigned2['hashCc'] = hashlib.sha256(cc_hash + euiccSigned2['transactionId']).digest() + euiccSigned2_bin = rsp.asn1.encode('EUICCSigned2', euiccSigned2) + euiccSignature2 = self.cert_and_key.ecdsa_sign(euiccSigned2_bin + auth_clnt_res['smdpSignature2']) + gbp_req = { + 'transactionId': auth_clnt_res['transactionId'], + 'prepareDownloadResponse': + ('downloadResponseOk', { + 'euiccSigned2': euiccSigned2, + 'euiccSignature2': euiccSignature2, + }) + } + gbp_res = self.peer.call_getBoundProfilePackage(gbp_req) + print(gbp_res) + #gbp_res['transactionId'] + # TODO: verify transactionId + print("TODO: verify transactionId matches previous ones") + bpp_bin = gbp_res['boundProfilePackage'] + print("TODO: verify boundProfilePackage smdpSignature")
- bpp = BoundProfilePackage() - upp_bin = bpp.decode(euicc_ot, eid, bpp_bin) + bpp = BoundProfilePackage() + upp_bin = bpp.decode(euicc_ot, self.eid, bpp_bin)
- iccid = swap_nibbles(b2h(bpp.storeMetadataRequest['iccid'])) - base_name = os.path.join(opts.output_path, '%s' % iccid) + iccid = swap_nibbles(b2h(bpp.storeMetadataRequest['iccid'])) + base_name = os.path.join(self.opts.output_path, '%s' % iccid)
- print("SUCCESS: Storing files as %s.*.der" % base_name) + print("SUCCESS: Storing files as %s.*.der" % base_name)
- # write various output files - with open(base_name+'.upp.der', 'wb') as f: - f.write(bpp.upp) - with open(base_name+'.isdp.der', 'wb') as f: - f.write(bpp.encoded_configureISDPRequest) - with open(base_name+'.smr.der', 'wb') as f: - f.write(bpp.encoded_storeMetadataRequest) + # write various output files + with open(base_name+'.upp.der', 'wb') as f: + f.write(bpp.upp) + with open(base_name+'.isdp.der', 'wb') as f: + f.write(bpp.encoded_configureISDPRequest) + with open(base_name+'.smr.der', 'wb') as f: + f.write(bpp.encoded_storeMetadataRequest)
if __name__ == '__main__': opts = parser.parse_args()
+ c = Es9pClient(opts) + if opts.command == 'download': - do_download(opts) + c.do_download()