neels has uploaded this change for review.

View Change

personalization: add param_source.py, implement batch personalization

Implement pySim.esim.saip.personalization.BatchPersonalization,
generating N eSIM profiles from a preset configuration.

Batch parameters can be fed by a constant, incrementing, random or from
CSV rows: add pySim.esim.saip.param_source.* classes to feed such input
to each of the BatchPersonalization's ConfigurableParameter instances.

Related: SYS#6768
Change-Id: I497c60c101ea0eea980e8b1a4b1f36c0eda39002
---
A pySim/esim/saip/param_source.py
M pySim/esim/saip/personalization.py
2 files changed, 230 insertions(+), 1 deletion(-)

git pull ssh://gerrit.osmocom.org:29418/pysim refs/changes/96/40096/1
diff --git a/pySim/esim/saip/param_source.py b/pySim/esim/saip/param_source.py
new file mode 100644
index 0000000..8fce881
--- /dev/null
+++ b/pySim/esim/saip/param_source.py
@@ -0,0 +1,167 @@
+# Implementation of SimAlliance/TCA Interoperable Profile handling: parameter sources for batch personalization.
+#
+# (C) 2025 by sysmocom - s.f.m.c. GmbH <info@sysmocom.de>
+#
+# Author: nhofmeyr@sysmocom.de
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import random
+from pySim.utils import all_subclasses_of
+
+class ParamSourceExn(Exception):
+ pass
+
+class ParamSourceExhaustedExn(ParamSourceExn):
+ pass
+
+class ParamSourceUndefinedExn(ParamSourceExn):
+ pass
+
+class ParamSource:
+ 'abstract parameter source'
+
+ # This name should be short but descriptive, useful for a user interface, like 'random decimal digits'.
+ name = 'none'
+
+ @classmethod
+ def get_all_implementations(cls, blacklist=None):
+ # return a set() so that multiple inheritance does not return dups
+ return set(c
+ for c in all_subclasses_of(cls)
+ if ((not blacklist) or (c not in blacklist))
+ )
+
+ @classmethod
+ def from_str(cls, s:str):
+ '''if a parameter source defines some string input magic, override this function.
+ For example, a RandomDigitSource derives the number of digits from the string length,
+ so the user can enter '0000' to get a four digit random number.'''
+ return cls(s)
+
+ def get_next(self, csv_row:dict=None):
+ '''return the next value from the parameter source.
+ When there are no more values from the source, raise a ParamSourceExhaustedExn.'''
+ raise ParamSourceExhaustedExn()
+
+
+class ConstantSource(ParamSource):
+ 'one value for all'
+ name = 'constant'
+
+ def __init__(self, val:str):
+ self.val = val
+
+ def get_next(self, csv_row:dict=None):
+ return self.val
+
+class RandomDigitSource(ParamSource):
+ 'return a different sequence of random decimal digits each'
+ name = 'random decimal digits'
+
+ def __init__(self, num_digits, first_value, last_value):
+ 'see from_str()'
+ num_digits = int(num_digits)
+ first_value = int(first_value)
+ last_value = int(last_value)
+ assert num_digits > 0
+ assert first_value <= last_value
+ self.num_digits = num_digits
+ self.val_first_last = (first_value, last_value)
+
+ def get_next(self, csv_row:dict=None):
+ val = random.randint(*self.val_first_last) # TODO secure random source?
+ return self.val_to_digit(val)
+
+ def val_to_digit(self, val:int):
+ return '%0*d' % (self.num_digits, val) # pylint: disable=consider-using-f-string
+
+ @classmethod
+ def from_str(cls, s:str):
+ if '..' in s:
+ first_str, last_str = s.split('..')
+ first_str = first_str.strip()
+ last_str = last_str.strip()
+ else:
+ first_str = s.strip()
+ last_str = None
+
+ first_value = int(first_str)
+ last_value = int(last_str) if last_str is not None else '9' * len(first_str)
+ return cls(num_digits=len(first_str), first_value=first_value, last_value=last_value)
+
+class RandomHexDigitSource(ParamSource):
+ 'return a different sequence of random hexadecimal digits each'
+ name = 'random hexadecimal digits'
+
+ def __init__(self, num_digits):
+ 'see from_str()'
+ num_digits = int(num_digits)
+ if num_digits < 1:
+ raise ValueError('zero number of digits')
+ # hex digits always come in two
+ if (num_digits & 1) != 0:
+ raise ValueError(f'hexadecimal value should have even number of digits, not {num_digits}')
+ self.num_digits = num_digits
+
+ def get_next(self, csv_row:dict=None):
+ val = random.randbytes(self.num_digits // 2) # TODO secure random source?
+ return val
+
+ @classmethod
+ def from_str(cls, s:str):
+ return cls(num_digits=len(s.strip()))
+
+class IncDigitSource(RandomDigitSource):
+ 'incrementing sequence of digits'
+ name = 'incrementing decimal digits'
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.next_val = None
+ self.reset()
+
+ def reset(self):
+ self.next_val = self.val_first_last[0]
+
+ def get_next(self, csv_row:dict=None):
+ val = self.next_val
+ if val is None:
+ raise ParamSourceExhaustedExn()
+
+ returnval = self.val_to_digit(val)
+
+ val += 1
+ if val > self.val_first_last[1]:
+ self.next_val = None
+ else:
+ self.next_val = val
+
+ return returnval
+
+class CsvSource(ParamSource):
+ 'apply a column from a CSV row, as passed in to ParamSource.get_next(csv_row)'
+ name = 'from CSV'
+
+ def __init__(self, csv_column, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.csv_column = csv_column
+
+ def get_next(self, csv_row:dict=None):
+ val = None
+ if csv_row:
+ val = csv_row.get(self.csv_column)
+ if not val:
+ raise ParamSourceUndefinedExn(f'no value for CSV column {self.csv_column!r}')
+ return val
diff --git a/pySim/esim/saip/personalization.py b/pySim/esim/saip/personalization.py
index 26fcbf7..160962f 100644
--- a/pySim/esim/saip/personalization.py
+++ b/pySim/esim/saip/personalization.py
@@ -17,11 +17,13 @@

import abc
import io
-from typing import List, Tuple
+import copy
+from typing import List, Tuple, Generator

from osmocom.tlv import camel_to_snake
from pySim.utils import enc_iccid, enc_imsi, h2b, rpad, sanitize_iccid, all_subclasses_of
from pySim.esim.saip import ProfileElement, ProfileElementSequence
+from pySim.esim.saip import param_source

def remove_unwanted_tuples_from_list(l: List[Tuple], unwanted_keys: List[str]) -> List[Tuple]:
"""In a list of tuples, remove all tuples whose first part equals 'unwanted_key'."""
@@ -608,3 +610,63 @@
class Opc(K):
name = 'OPc'
algo_config_key = 'opc'
+
+
+class BatchPersonalization:
+
+ class ParamAndSrc:
+ 'tie a ConfigurableParameter to a source of actual values'
+ def __init__(self, param:ConfigurableParameter, src:param_source.ParamSource):
+ self.param = param
+ self.src = src
+
+ def __init__(self,
+ n:int,
+ src_pes:ProfileElementSequence,
+ params:list[ParamAndSrc]=None,
+ csv_rows:Generator=None,
+ ):
+ self.n = n
+ self.params = params or []
+ self.src_pes = src_pes
+ self.csv_rows = csv_rows
+
+ def add_param_and_src(self, param:ConfigurableParameter, src:param_source.ParamSource):
+ self.params.append(BatchPersonalization.ParamAndSrc(param=param, src=src))
+
+ def generate_profiles(self):
+ # get first row of CSV: column names
+ csv_columns = None
+ if self.csv_rows:
+ try:
+ csv_columns = next(self.csv_rows)
+ except StopIteration as e:
+ raise ValueError('the input CSV file appears to be empty') from e
+
+ for i in range(self.n):
+ csv_row = None
+ if self.csv_rows and csv_columns:
+ try:
+ csv_row_list = next(self.csv_rows)
+ except StopIteration as e:
+ raise ValueError(f'not enough rows in the input CSV for eSIM nr {i+1} of {self.n}') from e
+
+ csv_row = dict(zip(csv_columns, csv_row_list))
+
+ pes = copy.deepcopy(self.src_pes)
+
+ for p in self.params:
+ try:
+ input_value = p.src.get_next(csv_row=csv_row)
+ assert input_value is not None
+ value = p.param.__class__.validate_val(input_value)
+ p.param.__class__.apply_val(pes, value)
+ except (
+ TypeError,
+ ValueError,
+ KeyError,
+ ) as e:
+ raise ValueError(f'{p.param.name} fed by {p.src.name}: {e}'
+ f' (input_value={p.param.input_value!r} value={p.param.value!r})') from e
+
+ yield pes

To view, visit change 40096. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-MessageType: newchange
Gerrit-Project: pysim
Gerrit-Branch: master
Gerrit-Change-Id: I497c60c101ea0eea980e8b1a4b1f36c0eda39002
Gerrit-Change-Number: 40096
Gerrit-PatchSet: 1
Gerrit-Owner: neels <nhofmeyr@sysmocom.de>