[PATCH] osmo-gsm-tester[master]: ofono_client: Discover modem path from imsi

This is a historical archive covering the years 2008-2021, from before the migration to mailman3.

The current, actively maintained list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Pau Espin Pedrol gerrit-no-reply at lists.osmocom.org
Thu May 18 16:09:25 UTC 2017


Hello Neels Hofmeyr, Jenkins Builder,

I'd like you to reexamine a change.  Please visit

    https://gerrit.osmocom.org/2676

to look at the new patch set (#2).

ofono_client: Discover modem path from imsi

Change-Id: Ib9f4de81abc18e8db0c15df965e4811b6513e1b1
---
M example/resources.conf
M src/osmo_gsm_tester/ofono_client.py
2 files changed, 87 insertions(+), 15 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmo-gsm-tester refs/changes/76/2676/2

diff --git a/example/resources.conf b/example/resources.conf
index cd0216e..b7ac5a3 100644
--- a/example/resources.conf
+++ b/example/resources.conf
@@ -60,21 +60,17 @@
 
 modem:
 - label: sierra_1
-  path: '/sierra_1'
   imsi: '901700000009031'
   ki: '80A37E6FDEA931EAC92FFA5F671EFEAD'
 
 - label: sierra_2
-  path: '/sierra_2'
   imsi: '901700000009029'
   ki: '00969E283349D354A8239E877F2E0866'
 
 - label: gobi_0
-  path: '/gobi_0'
   imsi: '901700000009030'
   ki: 'BB70807226393CDBAC8DD3439FF54252'
 
 - label: gobi_3
-  path: '/gobi_3'
   imsi: '901700000009032'
   ki: '2F70DCA43C45ACB97E947FDD0C7CA30A'
diff --git a/src/osmo_gsm_tester/ofono_client.py b/src/osmo_gsm_tester/ofono_client.py
index faa9192..70cdeb6 100644
--- a/src/osmo_gsm_tester/ofono_client.py
+++ b/src/osmo_gsm_tester/ofono_client.py
@@ -28,9 +28,12 @@
 glib_main_ctx = glib_main_loop.get_context()
 bus = SystemBus()
 
+imsi_modem_map = None
+
 I_MODEM = 'org.ofono.Modem'
 I_NETREG = 'org.ofono.NetworkRegistration'
 I_SMS = 'org.ofono.MessageManager'
+I_SIM = 'org.ofono.SimManager'
 
 class DeferredHandling:
     defer_queue = []
@@ -57,12 +60,33 @@
     DeferredHandling.defer_queue.'''
     return DeferredHandling(dbus_iface, handler).subscription_id
 
-
 def poll():
     global glib_main_ctx
     while glib_main_ctx.pending():
         glib_main_ctx.iteration()
     DeferredHandling.handle_queue()
+
+
+def _wait(condition, condition_args, condition_kwargs, timeout, timestep):
+    if not timeout or timeout < 0:
+        raise RuntimeError('wait() *must* time out at some point. timeout=%r' % timeout)
+    if timestep < 0.1:
+        timestep = 0.1
+
+    started = time.time()
+    while True:
+        poll()
+        if condition(*condition_args, **condition_kwargs):
+            return True
+        waited = time.time() - started
+        if waited > timeout:
+            return False
+        time.sleep(timestep)
+
+def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
+    if not _wait(condition, condition_args, condition_kwargs, timeout, timestep):
+        log_obj.raise_exn('Wait timeout')
+
 
 def systembus_get(path):
     global bus
@@ -73,6 +97,51 @@
     return sorted(root.GetModems())
 
 
+def modem_is_powered(dbus_obj, val):
+    modem_prop = dbus_obj[I_MODEM].GetProperties()
+    return modem_prop.get('Powered') == val
+
+def modem_has_val_in_property(dbus_obj, iface, prop_name, val):
+    modem_prop = dbus_obj[iface].GetProperties()
+    return val in modem_prop.get(prop_name)
+
+def get_imsi_modem_map(log_obj):
+    global imsi_modem_map
+    if imsi_modem_map is not None:
+        return imsi_modem_map
+    imsi_modem_map = {}
+    poll()
+    modems = list_modems()
+    for modem_path, prop in modems:
+        log_obj.dbg('Discovering imsi for modem %s' % modem_path)
+        dbus_obj = systembus_get(modem_path)
+        modem_prop = dbus_obj[I_MODEM].GetProperties()
+        powered = modem_prop.get('Powered')
+        if not powered:
+            print('Powering on %s' % modem_path)
+            dbus_obj[I_MODEM].SetProperty('Powered', Variant('b', True))
+            wait(log_obj, modem_is_powered, dbus_obj, True)
+
+        try:
+            has_sim_feature = True
+            wait(log_obj, modem_has_val_in_property, dbus_obj, I_MODEM, 'Features', 'sim')
+        except RuntimeError:
+            has_sim_feature = False
+            log_obj.dbg('modem %s has no sim feature, skipping' % modem_path)
+        if has_sim_feature:
+            wait(log_obj, modem_has_val_in_property, dbus_obj, I_MODEM, 'Interfaces', I_SIM)
+            sim_prop = dbus_obj[I_SIM].GetProperties()
+            imsi = sim_prop.get('SubscriberIdentity')
+            if imsi:
+                imsi_modem_map[imsi] = modem_path
+
+        #Leave in same status as it was:
+        if not powered:
+            dbus_obj[I_MODEM].SetProperty('Powered', Variant('b', False))
+            wait(log_obj, modem_is_powered, dbus_obj, False)
+    log_obj.log('imsi->modem map:', imsi_modem_map)
+
+
 class Modem(log.Origin):
     'convenience for ofono Modem interaction'
     msisdn = None
@@ -80,39 +149,46 @@
 
     def __init__(self, conf):
         self.conf = conf
-        self.path = conf.get('path')
-        self.set_name(self.path)
+        self.set_name(self.label() if self.label() else self.imsi())
         self.set_log_category(log.C_BUS)
+        if not self.imsi():
+            self.raise_exn('No IMSI')
+        imsi_map = get_imsi_modem_map(self)
         self._dbus_obj = None
         self._interfaces = set()
         self._connected_signals = util.listdict()
         self.sms_received_list = []
+        # Try to find path if not provided:
+        self.path = conf.get('path')
+        if not self.path:
+            self.path = imsi_map.get(self.imsi())
+            if not self.path:
+                self.raise_exn('Unable to find modem with IMSI %s (%s)' % (self.imsi(), self.label()))
         # init interfaces and connect to signals:
         self.dbus_obj()
-        test.poll()
+        poll()
 
     def set_msisdn(self, msisdn):
         self.msisdn = msisdn
 
+    def label(self):
+        return self.conf.get('label')
+
     def imsi(self):
-        imsi = self.conf.get('imsi')
-        if not imsi:
-            with self:
-                raise RuntimeError('No IMSI')
-        return imsi
+        return self.conf.get('imsi')
 
     def ki(self):
         return self.conf.get('ki')
 
     def _dbus_set_bool(self, name, bool_val, iface=I_MODEM):
         # to make sure any pending signals are received before we send out more DBus requests
-        test.poll()
+        poll()
 
         val = bool(bool_val)
         self.log('Setting', name, val)
         self.dbus_obj()[iface].SetProperty(name, Variant('b', val))
 
-        test.wait(self.property_is, name, bool_val)
+        wait(self, self.property_is, name, bool_val)
 
     def property_is(self, name, val):
         is_val = self.properties().get(name)

-- 
To view, visit https://gerrit.osmocom.org/2676
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ib9f4de81abc18e8db0c15df965e4811b6513e1b1
Gerrit-PatchSet: 2
Gerrit-Project: osmo-gsm-tester
Gerrit-Branch: master
Gerrit-Owner: Pau Espin Pedrol <pespin at sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: Neels Hofmeyr <nhofmeyr at sysmocom.de>
Gerrit-Reviewer: Pau Espin Pedrol <pespin at sysmocom.de>



More information about the gerrit-log mailing list