laforge has submitted this change. ( https://gerrit.osmocom.org/c/pysim/+/41450?usp=email )
Change subject: card_key_provider: separate and refactor CSV column encryption ......................................................................
card_key_provider: separate and refactor CSV column encryption
The CardKeyProviderCsv class implements a column decryption scheme where columns are protected using a transport key. The CSV files are enrcypted using contrib/csv-encrypt-columns.py.
The current implementation has two main problems:
- The decryption code in CardKeyProviderCsv is not specific to CSV files. It could be re-used in other formats, for example to decrypt columns (fields) red from a database. So let's split the decryption code in a separate class.
- The encryption code in csv-encrypt-columns.py accesses methods and properties in CardKeyProviderCsv. Also having the coresponding encryption code somewhere out of tree may be confusing. Let's improve the design and put encryption and decryption functions in a single class. Let's also make sure the encryption/decryption is covered by unittests.
Related: SYS#7725 Change-Id: I180457d4938f526d227c81020e4e03c6b3a57dab --- M contrib/csv-encrypt-columns.py M pySim/card_key_provider.py M tests/unittests/test_card_key_provider.py 3 files changed, 165 insertions(+), 60 deletions(-)
Approvals: laforge: Looks good to me, approved Jenkins Builder: Verified
diff --git a/contrib/csv-encrypt-columns.py b/contrib/csv-encrypt-columns.py index 49340a2..9967005 100755 --- a/contrib/csv-encrypt-columns.py +++ b/contrib/csv-encrypt-columns.py @@ -24,20 +24,12 @@ from Cryptodome.Cipher import AES from osmocom.utils import h2b, b2h, Hexstr
-from pySim.card_key_provider import CardKeyProviderCsv +from pySim.card_key_provider import CardKeyFieldCryptor
-def dict_keys_to_upper(d: dict) -> dict: - return {k.upper():v for k,v in d.items()} - -class CsvColumnEncryptor: +class CsvColumnEncryptor(CardKeyFieldCryptor): def __init__(self, filename: str, transport_keys: dict): self.filename = filename - self.transport_keys = dict_keys_to_upper(transport_keys) - - def encrypt_col(self, colname:str, value: str) -> Hexstr: - key = self.transport_keys[colname] - cipher = AES.new(h2b(key), AES.MODE_CBC, CardKeyProviderCsv.IV) - return b2h(cipher.encrypt(h2b(value))) + self.crypt = CardKeyFieldCryptor(transport_keys)
def encrypt(self) -> None: with open(self.filename, 'r') as infile: @@ -49,9 +41,8 @@ cw.writeheader()
for row in cr: - for key_colname in self.transport_keys: - if key_colname in row: - row[key_colname] = self.encrypt_col(key_colname, row[key_colname]) + for fieldname in cr.fieldnames: + row[fieldname] = self.crypt.encrypt_field(fieldname, row[fieldname]) cw.writerow(row)
if __name__ == "__main__": @@ -71,9 +62,5 @@ print("You must specify at least one key!") sys.exit(1)
- csv_column_keys = CardKeyProviderCsv.process_transport_keys(csv_column_keys) - for name, key in csv_column_keys.items(): - print("Encrypting column %s using AES key %s" % (name, key)) - cce = CsvColumnEncryptor(opts.CSVFILE, csv_column_keys) cce.encrypt() diff --git a/pySim/card_key_provider.py b/pySim/card_key_provider.py index 9f9dc70..561418f 100644 --- a/pySim/card_key_provider.py +++ b/pySim/card_key_provider.py @@ -10,7 +10,7 @@ operation with pySim-shell. """
-# (C) 2021-2024 by Sysmocom s.f.m.c. GmbH +# (C) 2021-2025 by Sysmocom s.f.m.c. GmbH # All Rights Reserved # # Author: Philipp Maier, Harald Welte @@ -31,23 +31,104 @@ from typing import List, Dict, Optional from Cryptodome.Cipher import AES from osmocom.utils import h2b, b2h +from pySim.log import PySimLogger
import abc import csv +import logging + +log = PySimLogger.get("CARDKEY")
card_key_providers = [] # type: List['CardKeyProvider']
-# well-known groups of columns relate to a given functionality. This avoids having -# to specify the same transport key N number of times, if the same key is used for multiple -# fields of one group, like KIC+KID+KID of one SD. -CRYPT_GROUPS = { - 'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'], - 'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'], - 'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'], - 'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'], - 'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'], +class CardKeyFieldCryptor: + """ + A Card key field encryption class that may be used by Card key provider implementations to add support for + a column-based encryption to protect sensitive material (cryptographic key material, ADM keys, etc.). + The sensitive material is encrypted using a "key-encryption key", occasionally also known as "transport key" + before it is stored into a file or database (see also GSMA FS.28). The "transport key" is then used to decrypt + the key material on demand. + """ + + # well-known groups of columns relate to a given functionality. This avoids having + # to specify the same transport key N number of times, if the same key is used for multiple + # fields of one group, like KIC+KID+KID of one SD. + __CRYPT_GROUPS = { + 'UICC_SCP02': ['UICC_SCP02_KIC1', 'UICC_SCP02_KID1', 'UICC_SCP02_KIK1'], + 'UICC_SCP03': ['UICC_SCP03_KIC1', 'UICC_SCP03_KID1', 'UICC_SCP03_KIK1'], + 'SCP03_ISDR': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDR', 'SCP03_DEK_ISDR'], + 'SCP03_ISDA': ['SCP03_ENC_ISDR', 'SCP03_MAC_ISDA', 'SCP03_DEK_ISDA'], + 'SCP03_ECASD': ['SCP03_ENC_ECASD', 'SCP03_MAC_ECASD', 'SCP03_DEK_ECASD'], }
+ __IV = b'\x23' * 16 + + @staticmethod + def __dict_keys_to_upper(d: dict) -> dict: + return {k.upper():v for k,v in d.items()} + + @staticmethod + def __process_transport_keys(transport_keys: dict, crypt_groups: dict): + """Apply a single transport key to multiple fields/columns, if the name is a group.""" + new_dict = {} + for name, key in transport_keys.items(): + if name in crypt_groups: + for field in crypt_groups[name]: + new_dict[field] = key + else: + new_dict[name] = key + return new_dict + + def __init__(self, transport_keys: dict): + """ + Create new field encryptor/decryptor object and set transport keys, usually one for each column. In some cases + it is also possible to use a single key for multiple columns (see also __CRYPT_GROUPS) + + Args: + transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the + respective field (column) of the CSV. This is done so that different fields + (columns) can use different transport keys, which is strongly recommended by + GSMA FS.28 + """ + self.transport_keys = self.__process_transport_keys(self.__dict_keys_to_upper(transport_keys), + self.__CRYPT_GROUPS) + for name, key in self.transport_keys.items(): + log.debug("Encrypting/decrypting field %s using AES key %s" % (name, key)) + + def decrypt_field(self, field_name: str, encrypted_val: str) -> str: + """ + Decrypt a single field. The decryption is only applied if we have a transport key is known under the provided + field name, otherwise the field is treated as plaintext and passed through as it is. + + Args: + field_name : name of the field to decrypt (used to identify which key to use) + encrypted_val : encrypted field value + + Returns: + plaintext field value + """ + if not field_name.upper() in self.transport_keys: + return encrypted_val + cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV) + return b2h(cipher.decrypt(h2b(encrypted_val))) + + def encrypt_field(self, field_name: str, plaintext_val: str) -> str: + """ + Encrypt a single field. The encryption is only applied if we have a transport key is known under the provided + field name, otherwise the field is treated as non sensitive and passed through as it is. + + Args: + field_name : name of the field to decrypt (used to identify which key to use) + encrypted_val : encrypted field value + + Returns: + plaintext field value + """ + if not field_name.upper() in self.transport_keys: + return plaintext_val + cipher = AES.new(h2b(self.transport_keys[field_name.upper()]), AES.MODE_CBC, self.__IV) + return b2h(cipher.encrypt(h2b(plaintext_val))) + class CardKeyProvider(abc.ABC): """Base class, not containing any concrete implementation."""
@@ -89,13 +170,9 @@ dictionary of {field, value} strings for each requested field from 'fields' """
- class CardKeyProviderCsv(CardKeyProvider): - """Card key provider implementation that allows to query against a specified CSV file. - Supports column-based encryption as it is generally a bad idea to store cryptographic key material in - plaintext. Instead, the key material should be encrypted by a "key-encryption key", occasionally also - known as "transport key" (see GSMA FS.28).""" - IV = b'\x23' * 16 + """Card key provider implementation that allows to query against a specified CSV file.""" + csv_file = None filename = None
@@ -103,35 +180,13 @@ """ Args: filename : file name (path) of CSV file containing card-individual key/data - transport_keys : a dict indexed by field name, whose values are hex-encoded AES keys for the - respective field (column) of the CSV. This is done so that different fields - (columns) can use different transport keys, which is strongly recommended by - GSMA FS.28 + transport_keys : (see class CardKeyFieldCryptor) """ self.csv_file = open(filename, 'r') if not self.csv_file: raise RuntimeError("Could not open CSV file '%s'" % filename) self.filename = filename - self.transport_keys = self.process_transport_keys(transport_keys) - - @staticmethod - def process_transport_keys(transport_keys: dict): - """Apply a single transport key to multiple fields/columns, if the name is a group.""" - new_dict = {} - for name, key in transport_keys.items(): - if name in CRYPT_GROUPS: - for field in CRYPT_GROUPS[name]: - new_dict[field] = key - else: - new_dict[name] = key - return new_dict - - def _decrypt_field(self, field_name: str, encrypted_val: str) -> str: - """decrypt a single field, if we have a transport key for the field of that name.""" - if not field_name in self.transport_keys: - return encrypted_val - cipher = AES.new(h2b(self.transport_keys[field_name]), AES.MODE_CBC, self.IV) - return b2h(cipher.decrypt(h2b(encrypted_val))) + self.crypt = CardKeyFieldCryptor(transport_keys)
def get(self, fields: List[str], key: str, value: str) -> Dict[str, str]: super()._verify_get_data(fields, key, value) @@ -147,7 +202,7 @@ if row[key] == value: for f in fields: if f in row: - rc.update({f: self._decrypt_field(f, row[f])}) + rc.update({f: self.crypt.decrypt_field(f, row[f])}) else: raise RuntimeError("CSV-File '%s' lacks column '%s'" % (self.filename, f)) return rc diff --git a/tests/unittests/test_card_key_provider.py b/tests/unittests/test_card_key_provider.py index 03148e2..bbe76e8 100644 --- a/tests/unittests/test_card_key_provider.py +++ b/tests/unittests/test_card_key_provider.py @@ -4,7 +4,7 @@ import os from pySim.card_key_provider import *
-class TestCardKeyProvider(unittest.TestCase): +class TestCardKeyProviderCsv(unittest.TestCase):
def __init__(self, *args, **kwargs): column_keys = {"KI" : "000424252525535532532A0B0C0D0E0F", @@ -76,5 +76,68 @@ result = card_key_provider_get_field("KIC1", "ICCID", t.get('ICCID')) self.assertEqual(result, t.get('EXPECTED'))
+class TestCardKeyFieldCryptor(unittest.TestCase): + + def __init__(self, *args, **kwargs): + transport_keys = {"KI" : "000424252525535532532A0B0C0D0E0F", + "OPC" : "000102030405065545645645645D0E0F", + "KIC1" : "06410203546406456456450B0C0D0E0F", + "UICC_SCP03" : "00040267840507667609045645645E0F"} + self.crypt = CardKeyFieldCryptor(transport_keys) + super().__init__(*args, **kwargs) + + def test_encrypt_field(self): + test_data = [{'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "OPC"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIC1"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KID1"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "UICC_SCP03_KIK1"}, + {'EXPECTED' : "0b1e1e56cd62645aeb4c2d72a7c98f27", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "opc"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kic1"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kid1"}, + {'EXPECTED' : "00248276d2734f108f9761e2f98e2a9d", + 'PLAINTEXT_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "uicc_scp03_kik1"}] + + for t in test_data: + result = self.crypt.encrypt_field(t.get('FIELDNAME'), t.get('PLAINTEXT_VAL')) + self.assertEqual(result, t.get('EXPECTED')) + + def test_decrypt_field(self): + test_data = [{'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "OPC"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "NOCRYPT"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIC1"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KID1"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "UICC_SCP03_KIK1"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "0b1e1e56cd62645aeb4c2d72a7c98f27", 'FIELDNAME' : "opc"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "000102030405060708090a0b0c0d0e0f", 'FIELDNAME' : "nocrypt"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kic1"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kid1"}, + {'EXPECTED' : "000102030405060708090a0b0c0d0e0f", + 'ENCRYPTED_VAL' : "00248276d2734f108f9761e2f98e2a9d", 'FIELDNAME' : "uicc_scp03_kik1"}] + + for t in test_data: + result = self.crypt.decrypt_field(t.get('FIELDNAME'), t.get('ENCRYPTED_VAL')) + self.assertEqual(result, t.get('EXPECTED')) + + if __name__ == "__main__": unittest.main()