laforge submitted this change.

View Change

Approvals: laforge: Looks good to me, approved; Jenkins Builder: Verified
personalization: implement UppAudit and BatchAudit

Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
Jenkins: skip-card-test
---
M pySim/esim/saip/batch.py
M pySim/esim/saip/personalization.py
2 files changed, 225 insertions(+), 1 deletion(-)

diff --git a/pySim/esim/saip/batch.py b/pySim/esim/saip/batch.py
index 92e8b1e..a8db5c4 100644
--- a/pySim/esim/saip/batch.py
+++ b/pySim/esim/saip/batch.py
@@ -19,11 +19,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import copy
-from typing import Generator
+import pprint
+from typing import Generator, Union
+from osmocom.utils import b2h
from pySim.esim.saip.personalization import ConfigurableParameter
from pySim.esim.saip import param_source
from pySim.esim.saip import ProfileElementSequence

+# Type alias for a list of personalization parameters: each entry is either a ConfigurableParameter subclass
+# itself (type[ConfigurableParameter]) or an instance of such a class.
+ParamList = list[Union[type[ConfigurableParameter], ConfigurableParameter]]
+
class BatchPersonalization:
"""Produce a series of eSIM profiles from predefined parameters.
Personalization parameters are derived from pysim.esim.saip.param_source.ParamSource.
@@ -118,3 +123,215 @@
raise ValueError(f'{p.param_cls.get_name()} fed by {p.src.name}: {e}') from e

yield pes
+
+
class UppAudit(dict):
    """
    Key-value pairs collected from a single UPP DER or PES.

    UppAudit itself is a dict that maps an audit key (like 'IMSI') to the set of all values found for that key.
    Callers may use the standard python dict API to access the key-value pairs read from the UPP.
    """

    @classmethod
    def from_der(cls, der: bytes, params: ParamList, der_size=False):
        """Return an UppAudit of parameter names and the sets of values found for them in a DER encoded profile.

        Note: some ConfigurableParameter implementations return more than one key-value pair, for example, Imsi
        returns both 'IMSI' and 'IMSI-ACC' parameters.

        e.g.
            UppAudit.from_der(my_der, [Imsi, ])
            --> {'IMSI': {'001010000000023'}, 'IMSI-ACC': {'5'}}

        (where 'IMSI' == Imsi.name)

        Read all parameters listed in params. params is a list of either ConfigurableParameter classes or
        ConfigurableParameter class instances. Only get_values_from_pes() is called on each entry, so each entry
        can either be the class itself, or an instance of, a (non-abstract) ConfigurableParameter subclass.
        For example, params = [Imsi, ] is equivalent to params = [Imsi(), ].

        For der_size=True, also include a {'der_size': {12345}} entry holding len(der).

        Raises ValueError, naming the offending parameter, when reading a parameter raises TypeError or ValueError.
        """
        upp_audit = cls()

        if der_size:
            upp_audit['der_size'] = {len(der)}

        pes = ProfileElementSequence.from_der(der)
        for param in params:
            try:
                for valdict in param.get_values_from_pes(pes):
                    upp_audit.add_values(valdict)
            except (TypeError, ValueError) as e:
                raise ValueError(f'Error during audit for parameter {param}: {e}') from e
        return upp_audit

    def get_single_val(self, key, allow_absent=False, absent_val=None):
        """
        Return the audit's value for the given audit key (like 'IMSI' or 'IMSI-ACC').

        Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous
        value, return that value. When they do not agree, raise a ValueError.

        key is normally a str. A ConfigurableParameter class or instance is also accepted; its get_name() is then
        used as the key. Any other key type raises TypeError.

        When there is no value for key, return absent_val if allow_absent is True, else raise ValueError.
        """
        if not isinstance(key, str):
            # key should be a string, but if someone passes a ConfigurableParameter (class or instance), just use
            # its default name
            if ConfigurableParameter.is_super_of(key) or isinstance(key, ConfigurableParameter):
                key = key.get_name()
            if not isinstance(key, str):
                raise TypeError(f'audit key should be a str or a ConfigurableParameter, got {key!r}')

        v = self.get(key)
        if v is None:
            if allow_absent:
                return absent_val
            raise ValueError(f'no value present for {key}')
        if not isinstance(v, set):
            raise ValueError(f'audit value should be a set(), got {v!r}')
        if len(v) != 1:
            raise ValueError(f'expected a single value for {key}, got {v!r}')
        return next(iter(v))

    @staticmethod
    def audit_val_to_str(v):
        """
        Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values,
        audit values are always python sets. Turn it into a nice string representation: only the value when it is
        unambiguous, otherwise the ambiguous values in braces, in a stable (sorted) order.
        bytes values are shown as hex strings.
        A value may also be completely absent (None or an empty set), then return 'not present'.
        """
        def printable(item):
            'show bytes as a hex string, leave everything else as it is'
            if isinstance(item, bytes):
                return b2h(item)
            return item

        if isinstance(v, set):
            if not v:
                return 'not present'
            if len(v) > 1:
                items = [printable(item) for item in v]
                # sets have no defined order; sort so that the same audit always produces the same string
                try:
                    items.sort()
                except TypeError:
                    # values of mixed, not mutually comparable types
                    items.sort(key=repr)
                return '{' + ', '.join(repr(item) for item in items) + '}'
            # change single-entry sets to just the single value
            v = next(iter(v))

        if v is None:
            return 'not present'
        return str(printable(v))

    def get_val_str(self, key):
        """Return a string of the value stored for the given key, see audit_val_to_str()."""
        return UppAudit.audit_val_to_str(self.get(key))

    def add_values(self, src: dict):
        """Merge a plain dict of values into self, which is a dict of sets.
        For example from
            self == { 'a': {123} }
        and
            src == { 'a': 456, 'b': 789 }
        then after this function call:
            self == { 'a': {123, 456}, 'b': {789} }

        Raises TypeError when src is not a dict.
        """
        if not isinstance(src, dict):
            raise TypeError(f'values to add should be a dict, got {src!r}')
        for key, srcval in src.items():
            self.setdefault(key, set()).add(srcval)

    def __str__(self):
        """One 'key: value' line per audit key, sorted by key."""
        return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys()))
+
class BatchAudit(list):
    """
    Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization.
    Produce an output CSV.

    Usage example:

        ba = BatchAudit(params=(personalization.Iccid, ))
        for upp_der in upps:
            ba.add_audit(upp_der)
        print(ba.summarize())

        with open('output.csv', 'w', encoding='utf-8', newline='') as csv_file:
            csv.writer(csv_file).writerows(ba.to_csv_rows())

    BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances.
    """

    def __init__(self, params: ParamList):
        """params: non-empty list of the ConfigurableParameter classes or instances to read from each UPP.

        Raises ValueError when params is empty.
        """
        super().__init__()
        if not params:
            raise ValueError('BatchAudit requires a non-empty list of parameters to audit')
        self.params = params

    def add_audit(self, upp_der: bytes):
        """Audit one DER encoded UPP for self.params, append the resulting UppAudit to self and return it."""
        audit = UppAudit.from_der(upp_der, self.params)
        self.append(audit)
        return audit

    def summarize(self):
        """Return an UppAudit that gives an overview of the batch, based on the first and the last audit.

        - no audits: an empty UppAudit.
        - one audit: a copy of that audit (the value sets are copied, not shared).
        - two or more audits: for each key of the first or last audit, a string. When first and last value agree,
          just that value; otherwise 'first,\\n<indent>last' (with ', ...,' instead of ',' when there are more than
          two audits), indented so that the second line lines up below the first when printed as 'key: value'.
        """
        batch_audit = UppAudit()
        if not self:
            return batch_audit

        first_audit = self[0]
        if len(self) == 1:
            for key, val in first_audit.items():
                # copy the sets, so that modifying the summary does not modify the audit
                batch_audit[key] = set(val) if isinstance(val, set) else val
            return batch_audit

        last_audit = self[-1]
        val_sep = ', ..., ' if len(self) > 2 else ', '

        # all keys of the first audit, followed by keys that only the last audit has; dict keeps insertion order
        keys = dict.fromkeys((*first_audit, *last_audit))
        for key in keys:
            first_val = first_audit.get_val_str(key)
            last_val = last_audit.get_val_str(key)

            if first_val == last_val:
                batch_audit[key] = first_val
                continue
            # line up the last value below the first one, after the 'key: ' prefix
            val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}"
            batch_audit[key] = val_sep_with_newline.join((first_val, last_val))

        return batch_audit

    def to_csv_rows(self, headers=True, sort_key=None):
        """Generator that yields all audits' values as rows (tuples), useful to feed to a csv.writer.

        The columns are all keys found in any of the audits, sorted using sort_key (as in sorted(key=...)).
        With headers=True, the first row yielded is the tuple of column names.
        A value absent from an audit yields an empty string; an ambiguous value raises ValueError.
        """
        columns = set()
        for audit in self:
            columns.update(audit.keys())

        columns = tuple(sorted(columns, key=sort_key))

        if headers:
            yield columns

        for audit in self:
            # a tuple, not a generator: rows can be inspected and iterated more than once
            yield tuple(audit.get_single_val(col, allow_absent=True, absent_val="") for col in columns)
+
def esim_profile_introspect(upp):
    """Decode the DER profile read from the file-like object upp and return a dict of two strings:
    'upp' is the repr() of the decoded ProfileElementSequence, and 'pp' is a pretty-printed list of
    (type, decoded) tuples of all profile elements, in which bytes values are shown as hex strings."""

    def hexify(node):
        'recursively replace bytes by their hex string in nested lists, tuples and dicts'
        if isinstance(node, bytes):
            return b2h(node)
        if isinstance(node, list):
            return [hexify(entry) for entry in node]
        if isinstance(node, tuple):
            return tuple(hexify(entry) for entry in node)
        if isinstance(node, dict):
            return {name: hexify(val) for name, val in node.items()}
        return node

    pes = ProfileElementSequence.from_der(upp.read())
    result = {'upp': repr(pes)}

    elements = [(pe.type, hexify(pe.decoded)) for pe in pes]
    result['pp'] = pprint.pformat(elements, width=120)
    return result
diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py
index c633a9c..eb81934 100644
--- a/pySim/esim/saip/personalization.py
+++ b/pySim/esim/saip/personalization.py
@@ -260,6 +260,13 @@
'''
return cls.get_len_range()[1] or 16

+ @classmethod
+ def is_super_of(cls, other_class):
+ try:
+ return issubclass(other_class, cls)
+ except TypeError:
+ return False
+
class DecimalParam(ConfigurableParameter):
"""Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of
validate_val() is a string with only decimal digits 0-9, in the required length with leading zeros if necessary.

To view, visit change 40208. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: merged
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
Gerrit-Change-Number: 40208
Gerrit-PatchSet: 12
Gerrit-Owner: neels <nhofmeyr@sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <laforge@osmocom.org>
Gerrit-CC: fixeria <vyanitskiy@sysmocom.de>