laforge has submitted this change. ( https://gerrit.osmocom.org/c/python/pyosmocom/+/39162?usp=email )
Change subject: construct: add length steps to StripTrailerAdapter ......................................................................
construct: add length steps to StripTrailerAdapter
The class StripTrailerAdapter allows to remove trailing bytes that match a specified value from the encoding result of a sub-construct. The result is always the shortest possible remainder of bytes that do not match the secified value.
Unfortunately there are specifications that explicitly require the length of the result to fit into a limited set of possible length values. For example: A result may be either one byte or three byte long. To cover those cases as well, let's add an array parameter where we can configure the allowed length values of the encoding result.
Related: OS#6679 Change-Id: I86df064fa41db85923eeb0d83cc399504fdd4488 --- M src/osmocom/construct.py M tests/test_construct.py 2 files changed, 41 insertions(+), 3 deletions(-)
Approvals: laforge: Looks good to me, approved Jenkins Builder: Verified
diff --git a/src/osmocom/construct.py b/src/osmocom/construct.py index c69b44f..d4284d9 100644 --- a/src/osmocom/construct.py +++ b/src/osmocom/construct.py @@ -413,16 +413,20 @@ Encoder removes all trailing bytes matching the default_value Decoder pads input data up to total_length with default_value
+ In case the encoding restricts the length of the result to specific values, the API user may set those restrictions + using the steps parameter. (e.g. encoded result must be either 1 or 3 byte long, steps would be set to [1,3]) + This is used in constellations like "FlagsEnum(StripTrailerAdapter(GreedyBytes, 3), ..." where you have a bit-mask that may have 1, 2 or 3 bytes, depending on whether or not any of the LSBs are actually set. """ - def __init__(self, subcon, total_length:int, default_value=b'\x00', min_len=1): + def __init__(self, subcon, total_length:int, default_value=b'\x00', min_len=1, steps:typing.List[int]=[]): super().__init__(subcon) assert len(default_value) == 1 self.total_length = total_length self.default_value = default_value self.min_len = min_len + self.steps = steps
def _decode(self, obj, context, path): assert isinstance(obj, bytes) @@ -435,9 +439,17 @@ assert isinstance(obj, int) obj = obj.to_bytes(self.total_length, 'big') # remove trailing bytes if they are zero + + obj_step_aligned = obj while len(obj) > self.min_len and obj[-1] == self.default_value[0]: obj = obj[:-1] - return obj + if len(obj) in self.steps: + obj_step_aligned = obj + + if self.steps == []: + return obj + else: + return obj_step_aligned;
def filter_dict(d, exclude_prefix='_'): diff --git a/tests/test_construct.py b/tests/test_construct.py index bcb2cf8..e4234db 100755 --- a/tests/test_construct.py +++ b/tests/test_construct.py @@ -90,13 +90,39 @@ final_application=0x0200, global_service=0x0100, receipt_generation=0x80, ciphered_load_file_data_block=0x40, contactless_activation=0x20, contactless_self_activation=0x10) + PrivilegesSteps = FlagsEnum(StripTrailerAdapter(GreedyBytes, 3, steps = [1,3]), security_domain=0x800000, + dap_verification=0x400000, + delegated_management=0x200000, card_lock=0x100000, + card_terminate=0x080000, card_reset=0x040000, + cvm_management=0x020000, mandated_dap_verification=0x010000, + trusted_path=0x8000, authorized_management=0x4000, + token_management=0x2000, global_delete=0x1000, + global_lock=0x0800, global_registry=0x0400, + final_application=0x0200, global_service=0x0100, + receipt_generation=0x80, ciphered_load_file_data_block=0x40, + contactless_activation=0x20, contactless_self_activation=0x10) examples = ['00', '80', '8040', '400010'] - def test_examples(self): + def test_encdec(self): for e in self.examples: dec = self.Privileges.parse(h2b(e)) reenc = self.Privileges.build(dec) self.assertEqual(e, b2h(reenc))
+ def test_enc(self): + enc = self.Privileges.build({'dap_verification' : True}) + self.assertEqual(b2h(enc), '40') + enc = self.Privileges.build({'dap_verification' : True, 'global_service' : True}) + self.assertEqual(b2h(enc), '4001') + enc = self.Privileges.build({'dap_verification' : True, 'global_service' : True, 'contactless_self_activation' : True}) + self.assertEqual(b2h(enc), '400110') + + enc = self.PrivilegesSteps.build({'dap_verification' : True}) + self.assertEqual(b2h(enc), '40') + enc = self.PrivilegesSteps.build({'dap_verification' : True, 'global_service' : True}) + self.assertEqual(b2h(enc), '400100') + enc = self.PrivilegesSteps.build({'dap_verification' : True, 'global_service' : True, 'contactless_self_activation' : True}) + self.assertEqual(b2h(enc), '400110') +
class TestAdapters(unittest.TestCase): def test_dns_adapter(self):