neels has uploaded this change for review. ( https://gerrit.osmocom.org/c/pysim/+/40208?usp=email )
Change subject: personalization: implement UppAudit and BatchAudit ......................................................................
personalization: implement UppAudit and BatchAudit
Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a --- M pySim/esim/saip/personalization.py 1 file changed, 212 insertions(+), 0 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/08/40208/1
diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py index 7217af3..3919e79 100644 --- a/pySim/esim/saip/personalization.py +++ b/pySim/esim/saip/personalization.py @@ -277,6 +277,13 @@ and ((not blacklist) or (c not in blacklist))) )
+ @classmethod + def is_super_of(cls, other_class): + try: + return issubclass(other_class, cls) + except TypeError: + return False + class DecimalParam(ConfigurableParameter): """Decimal digits. The input value may be a string of decimal digits like '012345', or an int. The output of validate_val() is a string with only decimal digits 0-9, in the required length with leading zeros if necessary. @@ -1156,3 +1163,208 @@ f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e
yield pes + + +class UppAudit(dict): + """ + Key-value pairs collected from a single UPP DER or PES. + + UppAudit itself is a dict, callers may use the standard python dict API to access key-value pairs read from the UPP. + """ + + @classmethod + def from_der(cls, der: bytes, params: List): + '''return a dict of parameter name and set of parameter values found in a DER encoded profile. + Read all parameters listed in params. This calls only classmethods, so each entry in params can either be a class or + an instance of a class, of a (non-abstract) ConfigurableParameter subclass. For example, params = [Imsi, ] is + equivalent to params = [Imsi(), ].''' + upp_audit = cls() + + upp_audit['der_size'] = set((len(der), )) + + pes = ProfileElementSequence.from_der(der) + for param in params: + key = param.get_name() + if key in upp_audit: + raise ValueError(f'UPP audit: there seem to be two conflicting parameters with the name {key!r}: ' + + ', '.join(f"{param.get_name()}={param.__name__}" for param in params)) + try: + for valdict in param.get_values_from_pes(pes): + upp_audit.add_values(valdict) + except (TypeError, ValueError) as e: + raise ValueError(f'Error during audit for parameter {key}: {e}') from e + return upp_audit + + def get_single_val(self, param, validate=True, allow_absent=False, absent_val=None): + """ + Return the audit's value for the given ConfigurableParameter class. + Any kind of value may occur multiple times in a profile. When all of these agree to the same unambiguous value, + return that value. When they do not agree, raise a ValueError. + """ + cp = None + if ConfigurableParameter.is_super_of(param): + cp = param + key = param.name + else: + key = param + assert isinstance(key, str) + v = self.get(key) + if v is None and allow_absent: + return absent_val + if not isinstance(v, set): + raise ValueError(f'audit value should be a set(), got {v!r}') + if len(v) != 1: + raise ValueError(f'expected a single value for {key}, got {v!r}') + v = tuple(v)[0] + if validate and cp: + # run value by the ConfigurableParameter's validation. + # (do not use the returned value, because the returned value is encoded for a PES) + cp.validate_val(v) + return v + + @staticmethod + def audit_val_to_str(v): + """ + Usually, we want to see a single value in an audit. Still, to be able to collect multiple ambiguous values, + audit values are always python sets. Turn it into a nice string representation: only the value when it is + unambiguous, otherwise a list of the ambiguous values. + A value may also be completely absent, then return 'not present'. + """ + def try_single_val(w): + 'change single-entry sets to just the single value' + if isinstance(w, set): + if len(w) == 1: + return tuple(w)[0] + if len(w) == 0: + return None + return w + + v = try_single_val(v) + if isinstance(v, bytes): + v = bytes_to_hexstr(v) + if v is None: + return 'not present' + return str(v) + + def get_val_str(self, key): + """Return a string of the value stored for the given key""" + return UppAudit.audit_val_to_str(self.get(key)) + + def add_values(self, src:dict): + """self and src are both a dict of sets. + For example from + self == { 'a': set((123,)) } + and + src == { 'a': set((456,)), 'b': set((789,)) } + then after this function call: + self == { 'a': set((123, 456,)), 'b': set((789,)) } + """ + assert isinstance(src, dict) + for key, srcvalset in src.items(): + dstvalset = self.get(key) + if dstvalset is None: + dstvalset = set() + self[key] = dstvalset + dstvalset.add(srcvalset) + + def __str__(self): + return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in sorted(self.keys())) + +class BatchAudit(list): + """ + Collect UppAudit instances for a batch of UPP, for example from a personalization.BatchPersonalization. + Produce an output CSV. + + Usage example: + + ba = BatchAudit(params=(personalization.Iccid, )) + for upp_der in upps: + ba.add_audit(upp_der) + print(ba.summarize()) + + with open('output.csv', 'wb') as csv_data: + csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='') + csv.writer(csv_str).writerows( ba.to_csv_rows() ) + csv_str.flush() + + BatchAudit itself is a list, callers may use the standard python list API to access the UppAudit instances. + """ + + def __init__(self, params:List=None): + if params is None: + params = ConfigurableParameter.get_all_implementations() + self.params = params + + def add_audit(self, upp_der:bytes): + audit = UppAudit.from_der(upp_der, self.params) + self.append(audit) + return audit + + def summarize(self): + batch_audit = UppAudit() + + audits = self + + if len(audits) > 2: + val_sep = ', ..., ' + else: + val_sep = ', ' + + first_audit = None + last_audit = None + if len(audits) >= 1: + first_audit = audits[0] + if len(audits) >= 2: + last_audit = audits[-1] + + if first_audit: + if last_audit: + for key in first_audit.keys(): + first_val = first_audit.get_val_str(key) + last_val = last_audit.get_val_str(key) + + if first_val == last_val: + val = first_val + else: + val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' * (len(key) + 2)}" + val = val_sep_with_newline.join((first_val, last_val)) + batch_audit[key] = val + else: + batch_audit.update(first_audit) + + return batch_audit + + def to_csv_rows(self, headers=True): + '''generator that yields all audits' values as rows, useful feed to a csv.writer.''' + params = tuple(sorted(self.params, key=lambda param: param.get_name())) + if headers: + yield (p.get_name() for p in params) + + for audit in self: + yield (audit.get_single_val(p, allow_absent=True, absent_val="") for p in params) + +def bytes_to_hexstr(b:bytes, sep=''): + return sep.join(f'{x:02x}' for x in b) + +def esim_profile_introspect(upp): + pes = ProfileElementSequence.from_der(upp.read()) + d = {} + d['upp'] = repr(pes) + + def show_bytes_as_hexdump(item): + if isinstance(item, bytes): + return bytes_to_hexstr(item) + if isinstance(item, list): + return list(show_bytes_as_hexdump(i) for i in item) + if isinstance(item, tuple): + return tuple(show_bytes_as_hexdump(i) for i in item) + if isinstance(item, dict): + d = {} + for k, v in item.items(): + d[k] = show_bytes_as_hexdump(v) + return d + return item + + l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes) + d['pp'] = pprint.pformat(l, width=120) + return d