neels has uploaded this change for review. (
https://gerrit.osmocom.org/c/pysim/+/40208?usp=email )
Change subject: personalization: implement UppAudit and BatchAudit
......................................................................
personalization: implement UppAudit and BatchAudit
Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
---
M pySim/esim/saip/personalization.py
1 file changed, 212 insertions(+), 0 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/08/40208/1
diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py
index 7217af3..3919e79 100644
--- a/pySim/esim/saip/personalization.py
+++ b/pySim/esim/saip/personalization.py
@@ -277,6 +277,13 @@
and ((not blacklist) or (c not in blacklist)))
)
+ @classmethod
+ def is_super_of(cls, other_class):
+ try:
+ return issubclass(other_class, cls)
+ except TypeError:
+ return False
+
class DecimalParam(ConfigurableParameter):
"""Decimal digits. The input value may be a string of decimal digits
like '012345', or an int. The output of
validate_val() is a string with only decimal digits 0-9, in the required length with
leading zeros if necessary.
@@ -1156,3 +1163,208 @@
f' (input_value={p.param.input_value!r}
value={p.param.value!r})') from e
yield pes
+
+
+class UppAudit(dict):
+ """
+ Key-value pairs collected from a single UPP DER or PES.
+
+ UppAudit itself is a dict, callers may use the standard python dict API to access
key-value pairs read from the UPP.
+ """
+
+ @classmethod
+ def from_der(cls, der: bytes, params: List):
+ '''return a dict of parameter name and set of parameter values found
in a DER encoded profile.
+ Read all parameters listed in params. This calls only classmethods, so each entry
in params can either be a class or
+ an instance of a class, of a (non-abstract) ConfigurableParameter subclass. For
example, params = [Imsi, ] is
+ equivalent to params = [Imsi(), ].'''
+ upp_audit = cls()
+
+ upp_audit['der_size'] = set((len(der), ))
+
+ pes = ProfileElementSequence.from_der(der)
+ for param in params:
+ key = param.get_name()
+ if key in upp_audit:
+ raise ValueError(f'UPP audit: there seem to be two conflicting
parameters with the name {key!r}: '
+ + ',
'.join(f"{param.get_name()}={param.__name__}" for param in params))
+ try:
+ for valdict in param.get_values_from_pes(pes):
+ upp_audit.add_values(valdict)
+ except (TypeError, ValueError) as e:
+ raise ValueError(f'Error during audit for parameter {key}: {e}')
from e
+ return upp_audit
+
+ def get_single_val(self, param, validate=True, allow_absent=False, absent_val=None):
+ """
+ Return the audit's value for the given ConfigurableParameter class.
+ Any kind of value may occur multiple times in a profile. When all of these agree
to the same unambiguous value,
+ return that value. When they do not agree, raise a ValueError.
+ """
+ cp = None
+ if ConfigurableParameter.is_super_of(param):
+ cp = param
+ key = param.name
+ else:
+ key = param
+ assert isinstance(key, str)
+ v = self.get(key)
+ if v is None and allow_absent:
+ return absent_val
+ if not isinstance(v, set):
+ raise ValueError(f'audit value should be a set(), got {v!r}')
+ if len(v) != 1:
+ raise ValueError(f'expected a single value for {key}, got {v!r}')
+ v = tuple(v)[0]
+ if validate and cp:
+ # run value by the ConfigurableParameter's validation.
+ # (do not use the returned value, because the returned value is encoded for a
PES)
+ cp.validate_val(v)
+ return v
+
+ @staticmethod
+ def audit_val_to_str(v):
+ """
+ Usually, we want to see a single value in an audit. Still, to be able to collect
multiple ambiguous values,
+ audit values are always python sets. Turn it into a nice string representation:
only the value when it is
+ unambiguous, otherwise a list of the ambiguous values.
+ A value may also be completely absent, then return 'not present'.
+ """
+ def try_single_val(w):
+ 'change single-entry sets to just the single value'
+ if isinstance(w, set):
+ if len(w) == 1:
+ return tuple(w)[0]
+ if len(w) == 0:
+ return None
+ return w
+
+ v = try_single_val(v)
+ if isinstance(v, bytes):
+ v = bytes_to_hexstr(v)
+ if v is None:
+ return 'not present'
+ return str(v)
+
+ def get_val_str(self, key):
+ """Return a string of the value stored for the given
key"""
+ return UppAudit.audit_val_to_str(self.get(key))
+
+ def add_values(self, src:dict):
+ """self and src are both a dict of sets.
+ For example from
+ self == { 'a': set((123,)) }
+ and
+ src == { 'a': set((456,)), 'b': set((789,)) }
+ then after this function call:
+ self == { 'a': set((123, 456,)), 'b': set((789,)) }
+ """
+ assert isinstance(src, dict)
+ for key, srcvalset in src.items():
+ dstvalset = self.get(key)
+ if dstvalset is None:
+ dstvalset = set()
+ self[key] = dstvalset
+ dstvalset.add(srcvalset)
+
+ def __str__(self):
+ return '\n'.join(f'{key}: {self.get_val_str(key)}' for key in
sorted(self.keys()))
+
+class BatchAudit(list):
+ """
+ Collect UppAudit instances for a batch of UPP, for example from a
personalization.BatchPersonalization.
+ Produce an output CSV.
+
+ Usage example:
+
+ ba = BatchAudit(params=(personalization.Iccid, ))
+ for upp_der in upps:
+ ba.add_audit(upp_der)
+ print(ba.summarize())
+
+ with open('output.csv', 'wb') as csv_data:
+ csv_str = io.TextIOWrapper(csv_data, 'utf-8', newline='')
+ csv.writer(csv_str).writerows( ba.to_csv_rows() )
+ csv_str.flush()
+
+ BatchAudit itself is a list, callers may use the standard python list API to access
the UppAudit instances.
+ """
+
+ def __init__(self, params:List=None):
+ if params is None:
+ params = ConfigurableParameter.get_all_implementations()
+ self.params = params
+
+ def add_audit(self, upp_der:bytes):
+ audit = UppAudit.from_der(upp_der, self.params)
+ self.append(audit)
+ return audit
+
+ def summarize(self):
+ batch_audit = UppAudit()
+
+ audits = self
+
+ if len(audits) > 2:
+ val_sep = ', ..., '
+ else:
+ val_sep = ', '
+
+ first_audit = None
+ last_audit = None
+ if len(audits) >= 1:
+ first_audit = audits[0]
+ if len(audits) >= 2:
+ last_audit = audits[-1]
+
+ if first_audit:
+ if last_audit:
+ for key in first_audit.keys():
+ first_val = first_audit.get_val_str(key)
+ last_val = last_audit.get_val_str(key)
+
+ if first_val == last_val:
+ val = first_val
+ else:
+ val_sep_with_newline = f"{val_sep.rstrip()}\n{' ' *
(len(key) + 2)}"
+ val = val_sep_with_newline.join((first_val, last_val))
+ batch_audit[key] = val
+ else:
+ batch_audit.update(first_audit)
+
+ return batch_audit
+
+ def to_csv_rows(self, headers=True):
+ '''generator that yields all audits' values as rows, useful feed
to a csv.writer.'''
+ params = tuple(sorted(self.params, key=lambda param: param.get_name()))
+ if headers:
+ yield (p.get_name() for p in params)
+
+ for audit in self:
+ yield (audit.get_single_val(p, allow_absent=True, absent_val="")
for p in params)
+
+def bytes_to_hexstr(b:bytes, sep=''):
+ return sep.join(f'{x:02x}' for x in b)
+
+def esim_profile_introspect(upp):
+ pes = ProfileElementSequence.from_der(upp.read())
+ d = {}
+ d['upp'] = repr(pes)
+
+ def show_bytes_as_hexdump(item):
+ if isinstance(item, bytes):
+ return bytes_to_hexstr(item)
+ if isinstance(item, list):
+ return list(show_bytes_as_hexdump(i) for i in item)
+ if isinstance(item, tuple):
+ return tuple(show_bytes_as_hexdump(i) for i in item)
+ if isinstance(item, dict):
+ d = {}
+ for k, v in item.items():
+ d[k] = show_bytes_as_hexdump(v)
+ return d
+ return item
+
+ l = list((pe.type, show_bytes_as_hexdump(pe.decoded)) for pe in pes)
+ d['pp'] = pprint.pformat(l, width=120)
+ return d
--
To view, visit
https://gerrit.osmocom.org/c/pysim/+/40208?usp=email
To unsubscribe, or for help writing mail filters, visit
https://gerrit.osmocom.org/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: Iaab336ca91b483ecdddd5c6c8e08dc475dc6bd0a
Gerrit-Change-Number: 40208
Gerrit-PatchSet: 1
Gerrit-Owner: neels <nhofmeyr(a)sysmocom.de>