dexter has uploaded this change for review.

View Change

pySim/global_platform: make functionality available outside of cmd2

The nested class AddlShellCommands holds methods that encapsulate
the actual functionality of the related do_ methods (e.g.
do_store_data calls self.store_data). This is already a good level
of separation, but it does not allow us to call those methods from
programs that are not based on cmd2. Let's turn those methods into
module-level functions that take the logical channel (lchan) as
their first argument, so that non-cmd2 applications have easy access
to the functionality of pySim.global_platform.

Let's also add a PySimLogger instance, so that those functions do
not have to call self._cmd.poutput.

Related: SYS#6959
Change-Id: Idf4e4b58bf49ba62b2c22de4c49a2dcacfa872cb
---
M pySim/global_platform/__init__.py
1 file changed, 140 insertions(+), 116 deletions(-)

git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/41/42741/1
diff --git a/pySim/global_platform/__init__.py b/pySim/global_platform/__init__.py
index 306746f..e836e08 100644
--- a/pySim/global_platform/__init__.py
+++ b/pySim/global_platform/__init__.py
@@ -35,6 +35,10 @@
from pySim.profile import CardProfile
from pySim.ota import SimFileAccessAndToolkitAppSpecParams
from pySim.javacard import CapFile
+from pySim.runtime import RuntimeLchan
+from pySim.log import PySimLogger
+
+log = PySimLogger.get(__name__)

# GPCS Table 11-48 Load Parameter Tags
class NonVolatileCodeMinMemoryReq(BER_TLV_IE, tag=0xC6):
@@ -543,7 +547,8 @@
self._cmd.poutput('Unknown data object "%s", available options: %s' % (tlv_cls_name,
do_names))
return
- (data, _sw) = self._cmd.lchan.scc.get_data(cla=0x80, tag=tlv_cls.tag)
+
+ data = get_data(self._cmd.lchan, tag=tlv_cls.tag)
ie = tlv_cls()
ie.from_tlv(h2b(data))
self._cmd.poutput_json(ie.to_dict())
@@ -564,27 +569,7 @@
"""Perform the GlobalPlatform STORE DATA command in order to store some card-specific data.
See GlobalPlatform CardSpecification v2.3 Section 11.11 for details."""
response_permitted = opts.response == 'may_be_returned'
- self.store_data(h2b(opts.DATA), opts.data_structure, opts.encryption, response_permitted)
-
- def store_data(self, data: bytes, structure:str = 'none', encryption:str = 'none', response_permitted: bool = False) -> bytes:
- """Perform the GlobalPlatform STORE DATA command in order to store some card-specific data.
- See GlobalPlatform CardSpecification v2.3 Section 11.11 for details."""
- max_cmd_len = self._cmd.lchan.scc.max_cmd_len
- # Table 11-89 of GP Card Specification v2.3
- remainder = data
- block_nr = 0
- response = ''
- while len(remainder):
- chunk = remainder[:max_cmd_len]
- remainder = remainder[max_cmd_len:]
- p1b = build_construct(ADF_SD.StoreData,
- {'last_block': len(remainder) == 0, 'encryption': encryption,
- 'structure': structure, 'response': response_permitted})
- hdr = "80E2%02x%02x%02x" % (p1b[0], block_nr, len(chunk))
- data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(chunk) + "00")
- block_nr += 1
- response += data
- return h2b(response)
+ store_data(self._cmd.lchan, h2b(opts.DATA), opts.data_structure, opts.encryption, response_permitted)

put_key_parser = argparse.ArgumentParser()
put_key_parser.add_argument('--old-key-version-nr', type=auto_uint8, default=0, help='Old Key Version Number')
@@ -635,20 +620,8 @@
p2 = opts.key_id
if len(opts.key_type) > 1:
p2 |= 0x80
- self.put_key(opts.old_key_version_nr, opts.key_version_nr, p2, kdb)
+ put_key(self._cmd.lchan, opts.old_key_version_nr, opts.key_version_nr, p2, kdb)

- # Table 11-68: Key Data Field - Format 1 (Basic Format)
- KeyDataBasic = GreedyRange(Struct('key_type'/KeyType,
- 'kcb'/Prefixed(Int8ub, GreedyBytes),
- 'kcv'/Prefixed(Int8ub, GreedyBytes)))
-
- def put_key(self, old_kvn:int, kvn: int, kid: int, key_dict: dict) -> bytes:
- """Perform the GlobalPlatform PUT KEY command in order to store a new key on the card.
- See GlobalPlatform CardSpecification v2.3 Section 11.8 for details."""
- key_data = kvn.to_bytes(1, 'big') + build_construct(ADF_SD.AddlShellCommands.KeyDataBasic, key_dict)
- hdr = "80D8%02x%02x%02x" % (old_kvn, kid, len(key_data))
- data, _sw = self._cmd.lchan.scc.send_apdu_checksw(hdr + b2h(key_data) + "00")
- return data

get_status_parser = argparse.ArgumentParser()
get_status_parser.add_argument('subset', choices=list(StatusSubset.ksymapping.values()),
@@ -660,31 +633,10 @@
def do_get_status(self, opts):
"""Perform GlobalPlatform GET STATUS command in order to retrieve status information
on Issuer Security Domain, Executable Load File, Executable Module or Applications."""
- grd_list = self.get_status(opts.subset, opts.aid)
+ grd_list = get_status(self._cmd.lchan, opts.subset, opts.aid)
for grd in grd_list:
self._cmd.poutput_json(grd.to_dict())

- def get_status(self, subset:str, aid_search_qualifier:Hexstr = '') -> List[GpRegistryRelatedData]:
- subset_hex = b2h(build_construct(StatusSubset, subset))
- aid = ApplicationAID(decoded=aid_search_qualifier)
- cmd_data = aid.to_tlv() + h2b('5c054f9f70c5cc')
- p2 = 0x02 # TLV format according to Table 11-36
- grd_list = []
- while True:
- hdr = "80F2%s%02x%02x" % (subset_hex, p2, len(cmd_data))
- data, sw = self._cmd.lchan.scc.send_apdu(hdr + b2h(cmd_data) + "00")
- remainder = h2b(data)
- while len(remainder):
- # tlv sequence, each element is one GpRegistryRelatedData()
- grd = GpRegistryRelatedData()
- _dec, remainder = grd.from_tlv(remainder)
- grd_list.append(grd)
- if sw != '6310':
- return grd_list
- else:
- p2 |= 0x01
- return grd_list
-
set_status_parser = argparse.ArgumentParser()
set_status_parser.add_argument('scope', choices=list(SetStatusScope.ksymapping.values()),
help='Defines the scope of the requested status change')
@@ -698,14 +650,7 @@
"""Perform GlobalPlatform SET STATUS command in order to change the life cycle state of the
Issuer Security Domain, Supplementary Security Domain or Application. This normally requires
prior authentication with a Secure Channel Protocol."""
- self.set_status(opts.scope, opts.status, opts.aid)
-
- def set_status(self, scope:str, status:str, aid:Hexstr = ''):
- SetStatus = Struct(Const(0x80, Byte), Const(0xF0, Byte),
- 'scope'/SetStatusScope, 'status'/CLifeCycleState,
- 'aid'/Prefixed(Int8ub, COptional(GreedyBytes)))
- apdu = build_construct(SetStatus, {'scope':scope, 'status':status, 'aid':aid})
- _data, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(apdu))
+ set_status(self._cmd.lchan, opts.scope, opts.status, opts.aid)

inst_perso_parser = argparse.ArgumentParser()
inst_perso_parser.add_argument('application_aid', type=is_hexstr, help='Application AID')
@@ -715,7 +660,8 @@
"""Perform GlobalPlatform INSTALL [for personalization] command in order to inform a Security
Domain that the following STORE DATA commands are meant for a specific AID (specified here)."""
# Section 11.5.2.3.6 / Table 11-47
- self.install(0x20, 0x00, "0000%02x%s000000" % (len(opts.application_aid)//2, opts.application_aid))
+ install(self._cmd.lchan, 0x20, 0x00, "0000%02x%s000000" %
+ (len(opts.application_aid)//2, opts.application_aid))

inst_inst_parser = argparse.ArgumentParser()
inst_inst_parser.add_argument('--load-file-aid', type=is_hexstr, default='',
@@ -750,7 +696,7 @@
# convert from list to "true-dict" as required by construct.FlagsEnum
decoded['privileges'] = {x: True for x in decoded['privileges']}
ifi_bytes = build_construct(InstallForInstallCD, decoded)
- self.install(p1, 0x00, b2h(ifi_bytes))
+ install(self._cmd.lchan, p1, 0x00, b2h(ifi_bytes))

inst_load_parser = argparse.ArgumentParser()
inst_load_parser.add_argument('--load-file-aid', type=is_hexstr, required=True,
@@ -775,11 +721,7 @@
'load_parameters'/Prefixed(Int8ub, GreedyBytes),
'load_token'/Prefixed(Int8ub, GreedyBytes))
ifl_bytes = build_construct(InstallForLoadCD, vars(opts))
- self.install(0x02, 0x00, b2h(ifl_bytes))
-
- def install(self, p1:int, p2:int, data:Hexstr) -> ResTuple:
- cmd_hex = "80E6%02x%02x%02x%s00" % (p1, p2, len(data)//2, data)
- return self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
+ install(self._cmd.lchan, 0x02, 0x00, b2h(ifl_bytes))

del_cc_parser = argparse.ArgumentParser()
del_cc_parser.add_argument('aid', type=is_hexstr,
@@ -793,7 +735,7 @@
File, an Application or an Executable Load File and its related Applications."""
p2 = 0x80 if opts.delete_related_objects else 0x00
aid = ApplicationAID(decoded=opts.aid)
- self.delete(0x00, p2, b2h(aid.to_tlv()))
+ delete(self._cmd.lchan, 0x00, p2, b2h(aid.to_tlv()))

del_key_parser = argparse.ArgumentParser()
del_key_parser.add_argument('--key-id', type=auto_uint7, help='Key Identifier (KID)')
@@ -814,11 +756,7 @@
cmd += "d001%02x" % opts.key_id
if opts.key_ver is not None:
cmd += "d201%02x" % opts.key_ver
- self.delete(0x00, p2, cmd)
-
- def delete(self, p1:int, p2:int, data:Hexstr) -> ResTuple:
- cmd_hex = "80E4%02x%02x%02x%s00" % (p1, p2, len(data)//2, data)
- return self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
+ delete(self._cmd.lchan, 0x00, p2, cmd)

load_parser = argparse.ArgumentParser()
load_parser_from_grp = load_parser.add_mutually_exclusive_group(required=True)
@@ -831,33 +769,15 @@
"""Perform a GlobalPlatform LOAD command. (We currently only support loading without DAP and
without ciphering.)"""
if opts.from_hex is not None:
- self.load(h2b(opts.from_hex))
+ load(self._cmd.lchan, h2b(opts.from_hex))
elif opts.from_file is not None:
- self.load(opts.from_file.read())
+ load(self._cmd.lchan, opts.from_file.read())
elif opts.from_cap_file is not None:
cap = CapFile(opts.from_cap_file)
- self.load(cap.get_loadfile())
+ load(self._cmd.lchan, cap.get_loadfile())
else:
raise ValueError('load source not specified!')

- def load(self, contents:bytes, chunk_len:int = 240):
- # TODO:tune chunk_len based on the overhead of the used SCP?
- # build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
- remainder = b'\xC4' + bertlv_encode_len(len(contents)) + contents
- # transfer this in various chunks to the card
- total_size = len(remainder)
- block_nr = 0
- while len(remainder):
- block = remainder[:chunk_len]
- remainder = remainder[chunk_len:]
- # build LOAD command APDU according to GPC_SPE_034 section 11.6.2 / Table 11-56
- p1 = 0x00 if len(remainder) else 0x80
- p2 = block_nr % 256
- block_nr += 1
- cmd_hex = "80E8%02x%02x%02x%s00" % (p1, p2, len(block), b2h(block))
- _rsp_hex, _sw = self._cmd.lchan.scc.send_apdu_checksw(cmd_hex)
- self._cmd.poutput("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!" % (total_size, block_nr))
-
install_cap_parser = argparse.ArgumentParser(usage='%(prog)s FILE [--install-parameters | --install-parameters-*]')
install_cap_parser.add_argument('cap_file', type=str, metavar='FILE',
help='JAVA-CARD CAP file to install')
@@ -918,7 +838,7 @@
self._cmd.poutput("step #1: install for load...")
self.do_install_for_load("--load-file-aid %s --security-domain-aid %s" % (load_file_aid, security_domain_aid))
self._cmd.poutput("step #2: load...")
- self.load(load_file)
+ load(self._cmd.lchan, load_file)
self._cmd.poutput("step #3: install_for_install (and make selectable)...")
self.do_install_for_install("--load-file-aid %s --module-aid %s --application-aid %s --install-parameters %s --make-selectable" %
(load_file_aid, module_aid, application_aid, install_parameters))
@@ -958,7 +878,7 @@
host_challenge = h2b(opts.host_challenge) if opts.host_challenge else get_random_bytes(8)
kset = GpCardKeyset(opts.key_ver, h2b(opts.key_enc), h2b(opts.key_mac), h2b(opts.key_dek))
scp02 = SCP02(card_keys=kset)
- self._establish_scp(scp02, host_challenge, opts.security_level)
+ establish_scp(self._cmd.lchan, scp02, host_challenge, opts.security_level)

est_scp03_parser = deepcopy(est_scp02_parser)
est_scp03_parser.description = None
@@ -986,27 +906,14 @@
host_challenge = h2b(opts.host_challenge) if opts.host_challenge else get_random_bytes(s_mode)
kset = GpCardKeyset(opts.key_ver, h2b(opts.key_enc), h2b(opts.key_mac), h2b(opts.key_dek))
scp03 = SCP03(card_keys=kset, s_mode = s_mode)
- self._establish_scp(scp03, host_challenge, opts.security_level)
-
- def _establish_scp(self, scp, host_challenge, security_level):
- # perform the common functionality shared by SCP02 and SCP03 establishment
- init_update_apdu = scp.gen_init_update_apdu(host_challenge=host_challenge)
- init_update_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(init_update_apdu))
- scp.parse_init_update_resp(h2b(init_update_resp))
- ext_auth_apdu = scp.gen_ext_auth_apdu(security_level)
- _ext_auth_resp, _sw = self._cmd.lchan.scc.send_apdu_checksw(b2h(ext_auth_apdu))
- self._cmd.poutput("Successfully established a %s secure channel" % str(scp))
- # store a reference to the SCP instance
- self._cmd.lchan.scc.scp = scp
- self._cmd.update_prompt()
-
+ establish_scp(self._cmd.lchan, scp03, host_challenge, opts.security_level)

def do_release_scp(self, _opts):
"""Release a previously establiehed secure channel."""
if not self._cmd.lchan.scc.scp:
self._cmd.poutput("Cannot release SCP as none is established")
return
- self._cmd.lchan.scc.scp = None
+ release_scp(self._cmd.lchan)
self._cmd.update_prompt()

# Card Application of a Security Domain
@@ -1075,3 +982,120 @@
return None
else:
return kcv_calculator(key)[:3]
+
+def store_data(lchan: RuntimeLchan, data: bytes, structure:str = 'none', encryption:str = 'none',
+               response_permitted: bool = False) -> bytes:
+    """
+    Send `data` to the card using one or more GlobalPlatform STORE DATA commands.
+    See GlobalPlatform CardSpecification v2.3 Section 11.11 for details.
+
+    The payload is cut into blocks of at most lchan.scc.max_cmd_len bytes. The hex-string
+    responses of all blocks are concatenated and returned as bytes.
+    """
+    block_size = lchan.scc.max_cmd_len
+    pending = data
+    responses = []
+    block_nr = 0
+    while pending:
+        chunk, pending = pending[:block_size], pending[block_size:]
+        # P1 is encoded according to Table 11-89 of GP Card Specification v2.3
+        p1 = build_construct(ADF_SD.StoreData,
+                             {'last_block': not pending, 'encryption': encryption,
+                              'structure': structure, 'response': response_permitted})
+        apdu_hex = "80E2%02x%02x%02x" % (p1[0], block_nr, len(chunk)) + b2h(chunk) + "00"
+        rsp_hex, _sw = lchan.scc.send_apdu_checksw(apdu_hex)
+        responses.append(rsp_hex)
+        block_nr += 1
+    return h2b(''.join(responses))
+
+
+
+def get_data(lchan: RuntimeLchan, tag: int) -> Hexstr:
+    """
+    Perform a GET DATA command with CLA 0x80 for the data object identified by `tag`.
+
+    Returns the response data exactly as delivered by lchan.scc.get_data(); the status
+    word is discarded. The result is a hex string, not bytes: the cmd2 caller passes it
+    through h2b() before decoding the TLV.
+    """
+    data, _sw = lchan.scc.get_data(cla=0x80, tag=tag)
+    return data
+
+def put_key(lchan: RuntimeLchan, old_kvn:int, kvn: int, kid: int, key_dict: dict) -> Hexstr:
+    """
+    Perform the GlobalPlatform PUT KEY command in order to store a new key on the card.
+    See GlobalPlatform CardSpecification v2.3 Section 11.8 for details.
+
+    Args:
+        lchan: logical channel whose lchan.scc is used to send the APDU
+        old_kvn: value placed in P1 of the command
+        kvn: new key version number, sent as the first byte of the command data
+        kid: value placed in P2 of the command
+        key_dict: decoded key data handed to build_construct() for the KeyDataBasic construct
+
+    Returns:
+        the response data as returned by send_apdu_checksw() (presumably a hex string,
+        like the other send_apdu_checksw() responses in this module; it is not converted
+        to bytes here)
+    """
+
+    # Table 11-68: Key Data Field - Format 1 (Basic Format)
+    KeyDataBasic = GreedyRange(Struct('key_type'/KeyType,
+                                      'kcb'/Prefixed(Int8ub, GreedyBytes),
+                                      'kcv'/Prefixed(Int8ub, GreedyBytes)))
+
+    key_data = kvn.to_bytes(1, 'big') + build_construct(KeyDataBasic, key_dict)
+    hdr = "80D8%02x%02x%02x" % (old_kvn, kid, len(key_data))
+    data, _sw = lchan.scc.send_apdu_checksw(hdr + b2h(key_data) + "00")
+    return data
+
+def get_status(lchan: RuntimeLchan, subset:str, aid_search_qualifier:Hexstr = '') -> List[GpRegistryRelatedData]:
+    """
+    Perform the GlobalPlatform GET STATUS command and collect all returned registry entries.
+
+    The command is repeated for as long as the card answers with status word 6310; every
+    TLV element of every response is decoded into one GpRegistryRelatedData object.
+    Other status words are not checked: the entries collected so far are returned.
+    """
+    subset_hex = b2h(build_construct(StatusSubset, subset))
+    aid = ApplicationAID(decoded=aid_search_qualifier)
+    # search qualifier followed by a tag list (tag 5C) naming the tags 4F, 9F70, C5 and CC
+    cmd_data = aid.to_tlv() + h2b('5c054f9f70c5cc')
+    p2 = 0x02 # TLV format according to Table 11-36
+    grd_list = []
+    while True:
+        hdr = "80F2%s%02x%02x" % (subset_hex, p2, len(cmd_data))
+        data, sw = lchan.scc.send_apdu(hdr + b2h(cmd_data) + "00")
+        remainder = h2b(data)
+        while remainder:
+            # tlv sequence, each element is one GpRegistryRelatedData()
+            grd = GpRegistryRelatedData()
+            _dec, remainder = grd.from_tlv(remainder)
+            grd_list.append(grd)
+        if sw != '6310':
+            return grd_list
+        # SW 6310: the card has more entries; repeat the command with bit 0 of P2 set
+        p2 |= 0x01
+
+def set_status(lchan: RuntimeLchan, scope:str, status:str, aid:Hexstr = ''):
+    """
+    Perform the GlobalPlatform SET STATUS command (CLA 0x80, INS 0xF0).
+
+    `scope` and `status` are encoded into the two bytes following the instruction byte,
+    `aid` becomes the length-prefixed command data. The response of the card is discarded;
+    send_apdu_checksw() is used, so the status word is verified by that call.
+    """
+    SetStatus = Struct(Const(0x80, Byte), Const(0xF0, Byte),
+                       'scope'/SetStatusScope, 'status'/CLifeCycleState,
+                       'aid'/Prefixed(Int8ub, COptional(GreedyBytes)))
+    apdu = build_construct(SetStatus, {'scope':scope, 'status':status, 'aid':aid})
+    _data, _sw = lchan.scc.send_apdu_checksw(b2h(apdu))
+
+def install(lchan: RuntimeLchan, p1:int, p2:int, data:Hexstr) -> ResTuple:
+    """
+    Send an INSTALL command (CLA 0x80, INS 0xE6) with the given P1/P2 and the hex-string
+    command data `data`; returns the result of lchan.scc.send_apdu_checksw().
+    """
+    header = "80E6%02x%02x%02x" % (p1, p2, len(data)//2)
+    return lchan.scc.send_apdu_checksw("%s%s00" % (header, data))
+
+def delete(lchan: RuntimeLchan, p1:int, p2:int, data:Hexstr) -> ResTuple:
+    """
+    Send a DELETE command (CLA 0x80, INS 0xE4) with the given P1/P2 and the hex-string
+    command data `data`; returns the result of lchan.scc.send_apdu_checksw().
+    """
+    header = "80E4%02x%02x%02x" % (p1, p2, len(data)//2)
+    return lchan.scc.send_apdu_checksw("%s%s00" % (header, data))
+
+def load(lchan: RuntimeLchan, contents:bytes, chunk_len:int = 240):
+    """
+    Transfer a load file to the card using a sequence of GlobalPlatform LOAD commands.
+
+    `contents` is wrapped into a TLV with tag C4 and sent in blocks of at most `chunk_len`
+    bytes. A summary is written to the module logger once all blocks have been sent.
+    """
+    # TODO:tune chunk_len based on the overhead of the used SCP?
+    # build TLV according to GPC_SPE_034 section 11.6.2.3 / Table 11-58 for unencrypted case
+    pending = b'\xC4' + bertlv_encode_len(len(contents)) + contents
+    total_size = len(pending)
+    # transfer this in various chunks to the card
+    block_nr = 0
+    while pending:
+        block, pending = pending[:chunk_len], pending[chunk_len:]
+        # build LOAD command APDU according to GPC_SPE_034 section 11.6.2 / Table 11-56
+        p1 = 0x00 if pending else 0x80
+        p2 = block_nr % 256
+        block_nr += 1
+        _rsp_hex, _sw = lchan.scc.send_apdu_checksw(
+            "80E8%02x%02x%02x%s00" % (p1, p2, len(block), b2h(block)))
+    log.info("Loaded a total of %u bytes in %u blocks. Don't forget install_for_install (and make selectable) now!",
+             total_size, block_nr)
+
+def establish_scp(lchan: RuntimeLchan, scp, host_challenge, security_level):
+    """
+    Establish a secure channel; this is the part shared by SCP02 and SCP03 establishment.
+
+    Sends the INITIALIZE UPDATE and EXTERNAL AUTHENTICATE APDUs generated by `scp` and,
+    on success, stores `scp` in lchan.scc.scp. This function does not touch any cmd2
+    prompt; interactive callers have to refresh their prompt themselves.
+    """
+    # step 1: INITIALIZE UPDATE, the response is fed back into the SCP instance
+    iu_apdu = scp.gen_init_update_apdu(host_challenge=host_challenge)
+    iu_resp, _sw = lchan.scc.send_apdu_checksw(b2h(iu_apdu))
+    scp.parse_init_update_resp(h2b(iu_resp))
+    # step 2: EXTERNAL AUTHENTICATE with the requested security level
+    ea_apdu = scp.gen_ext_auth_apdu(security_level)
+    _ea_resp, _sw = lchan.scc.send_apdu_checksw(b2h(ea_apdu))
+    log.info("Successfully established a %s secure channel", str(scp))
+    # store a reference to the SCP instance
+    lchan.scc.scp = scp
+
+def release_scp(lchan: RuntimeLchan):
+    """
+    Drop the reference to the secure channel instance stored in lchan.scc.scp.
+    No APDU is sent to the card.
+    """
+    scc = lchan.scc
+    scc.scp = None

To view, visit change 42741. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: Idf4e4b58bf49ba62b2c22de4c49a2dcacfa872cb
Gerrit-Change-Number: 42741
Gerrit-PatchSet: 1
Gerrit-Owner: dexter <pmaier@sysmocom.de>