neels has uploaded this change for review. ( https://gerrit.osmocom.org/c/pysim/+/40096?usp=email )
Change subject: personalization: add param_source.py, implement batch personalization
......................................................................
personalization: add param_source.py, implement batch personalization
Implement pySim.esim.saip.personalization.BatchPersonalization, generating N eSIM profiles from a preset configuration.
Batch parameter values can come from a constant, an incrementing sequence, random values, or the rows of a CSV file. To that end, add pySim.esim.saip.param_source.* classes that feed such input to each of the BatchPersonalization's ConfigurableParameter instances.
Related: SYS#6768
Change-Id: I497c60c101ea0eea980e8b1a4b1f36c0eda39002
---
A pySim/esim/saip/param_source.py
M pySim/esim/saip/personalization.py
2 files changed, 230 insertions(+), 1 deletion(-)
git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/96/40096/1
diff --git a/pySim/esim/saip/param_source.py b/pySim/esim/saip/param_source.py new file mode 100644 index 0000000..8fce881 --- /dev/null +++ b/pySim/esim/saip/param_source.py @@ -0,0 +1,167 @@ +# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization. +# +# (C) 2025 by sysmocom - s.f.m.c. GmbH info@sysmocom.de +# +# Author: nhofmeyr@sysmocom.de +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import random +from pySim.utils import all_subclasses_of + +class ParamSourceExn(Exception): + pass + +class ParamSourceExhaustedExn(ParamSourceExn): + pass + +class ParamSourceUndefinedExn(ParamSourceExn): + pass + +class ParamSource: + 'abstract parameter source' + + # This name should be short but descriptive, useful for a user interface, like 'random decimal digits'. + name = 'none' + + @classmethod + def get_all_implementations(cls, blacklist=None): + # return a set() so that multiple inheritance does not return dups + return set(c + for c in all_subclasses_of(cls) + if ((not blacklist) or (c not in blacklist)) + ) + + @classmethod + def from_str(cls, s:str): + '''if a parameter source defines some string input magic, override this function. 
+ For example, a RandomDigitSource derives the number of digits from the string length, + so the user can enter '0000' to get a four digit random number.''' + return cls(s) + + def get_next(self, csv_row:dict=None): + '''return the next value from the parameter source. + When there are no more values from the source, raise a ParamSourceExhaustedExn.''' + raise ParamSourceExhaustedExn() + + +class ConstantSource(ParamSource): + 'one value for all' + name = 'constant' + + def __init__(self, val:str): + self.val = val + + def get_next(self, csv_row:dict=None): + return self.val + +class RandomDigitSource(ParamSource): + 'return a different sequence of random decimal digits each' + name = 'random decimal digits' + + def __init__(self, num_digits, first_value, last_value): + 'see from_str()' + num_digits = int(num_digits) + first_value = int(first_value) + last_value = int(last_value) + assert num_digits > 0 + assert first_value <= last_value + self.num_digits = num_digits + self.val_first_last = (first_value, last_value) + + def get_next(self, csv_row:dict=None): + val = random.randint(*self.val_first_last) # TODO secure random source? + return self.val_to_digit(val) + + def val_to_digit(self, val:int): + return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string + + @classmethod + def from_str(cls, s:str): + if '..' 
in s: + first_str, last_str = s.split('..') + first_str = first_str.strip() + last_str = last_str.strip() + else: + first_str = s.strip() + last_str = None + + first_value = int(first_str) + last_value = int(last_str) if last_str is not None else '9' * len(first_str) + return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value) + +class RandomHexDigitSource(ParamSource): + 'return a different sequence of random hexadecimal digits each' + name = 'random hexadecimal digits' + + def __init__(self, num_digits): + 'see from_str()' + num_digits = int(num_digits) + if num_digits < 1: + raise ValueError('zero number of digits') + # hex digits always come in two + if (num_digits & 1) != 0: + raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}') + self.num_digits = num_digits + + def get_next(self, csv_row:dict=None): + val = random.randbytes(self.num_digits // 2) # TODO secure random source? + return val + + @classmethod + def from_str(cls, s:str): + return cls(num_digits=len(s.strip())) + +class IncDigitSource(RandomDigitSource): + 'incrementing sequence of digits' + name = 'incrementing decimal digits' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.next_val = None + self.reset() + + def reset(self): + self.next_val = self.val_first_last[0] + + def get_next(self, csv_row:dict=None): + val = self.next_val + if val is None: + raise ParamSourceExhaustedExn() + + returnval = self.val_to_digit(val) + + val += 1 + if val > self.val_first_last[1]: + self.next_val = None + else: + self.next_val = val + + return returnval + +class CsvSource(ParamSource): + 'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)' + name = 'from CSV' + + def __init__(self, csv_column, *args, **kwargs): + super().__init__(*args, **kwargs) + self.csv_column = csv_column + + def get_next(self, csv_row:dict=None): + val = None + if csv_row: + val = csv_row.get(self.csv_column) + if 
not val: + raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}') + return val diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py index 26fcbf7..160962f 100644 --- a/pySim/esim/saip/personalization.py +++ b/pySim/esim/saip/personalization.py @@ -17,11 +17,13 @@
import abc import io -from typing import List, Tuple +import copy +from typing import List, Tuple, Generator
from osmocom.tlv import camel_to_snake from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid, all_subclasses_of from pySim.esim.saip import ProfileElement, ProfileElementSequence +from pySim.esim.saip import param_source
def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]: """In a list of tuples, remove all tuples whose first part equals 'unwanted_key'.""" @@ -608,3 +610,63 @@ class Opc(K): name = 'OPc' algo_config_key = 'opc' + + +class BatchPersonalization: + + class ParamAndSrc: + 'tie a ConfigurableParameter to a source of actual values' + def __init__(self, param:ConfigurableParameter, src:param_source.ParamSource): + self.param = param + self.src = src + + def __init__(self, + n:int, + src_pes:ProfileElementSequence, + params:list[ParamAndSrc]=None, + csv_rows:Generator=None, + ): + self.n = n + self.params = params or [] + self.src_pes = src_pes + self.csv_rows = csv_rows + + def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource): + self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src)) + + def generate_profiles(self): + # get first row of CSV: column names + csv_columns = None + if self.csv_rows: + try: + csv_columns = next(self.csv_rows) + except StopIteration as e: + raise ValueError('the input CSV file appears to be empty') from e + + for i in range(self.n): + csv_row = None + if self.csv_rows and csv_columns: + try: + csv_row_list = next(self.csv_rows) + except StopIteration as e: + raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e + + csv_row = dict(zip(csv_columns, csv_row_list)) + + pes = copy.deepcopy(self.src_pes) + + for p in self.params: + try: + input_value = p.src.get_next(csv_row=csv_row) + assert input_value is not None + value = p.param.__class__.validate_val(input_value) + p.param.__class__.apply_val(pes, value) + except ( + TypeError, + ValueError, + KeyError, + ) as e: + raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}' + f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e + + yield pes