[PATCH] osmo-gsm-tester[master]: event_loop: Use glib as mainloop impl and move modem to use ...

This is merely a historical archive of the years 2008-2021, from before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Pau Espin Pedrol gerrit-no-reply at lists.osmocom.org
Wed Mar 28 17:18:58 UTC 2018


Review at  https://gerrit.osmocom.org/7559

event_loop: Use glib as mainloop impl and move modem to use event_loop

Several benefits:
- We can add APIs to poll on file descriptors in the future (for the SMPP
  socket, for instance) instead of using busy polling.
- During wait(), we now block in the glib mainloop instead of sleeping for
  0.1 seconds without handling any events during that time.
- We remove the glib-mainloop-specific bits from modem.py

Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767
---
M src/osmo_gsm_tester/event_loop.py
M src/osmo_gsm_tester/modem.py
2 files changed, 115 insertions(+), 54 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmo-gsm-tester refs/changes/59/7559/1

diff --git a/src/osmo_gsm_tester/event_loop.py b/src/osmo_gsm_tester/event_loop.py
index ebe6afb..068eca9 100644
--- a/src/osmo_gsm_tester/event_loop.py
+++ b/src/osmo_gsm_tester/event_loop.py
@@ -18,48 +18,129 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import time
+from gi.repository import GLib, GObject
+
 from . import log
 
-poll_funcs = []
+class DeferredHandling:
+    defer_queue = []
+
+    def handle_queue(self):
+        while DeferredHandling.defer_queue:
+            handler, args, kwargs = self.defer_queue.pop(0)
+            handler(*args, **kwargs)
+
+    def defer(self, handler, *args, **kwargs):
+        self.defer_queue.append((handler, args, kwargs))
+
+class WaitRequest:
+    timeout_ack = False
+    condition_ack = False
+
+    def __init__(self, condition, condition_args, condition_kwargs, timeout, timestep):
+        self.timeout_started = time.time()
+        self.timeout = timeout
+        self.condition = condition
+        self.condition_args = condition_args
+        self.condition_kwargs = condition_kwargs
+
+    def condition_check(self):
+        #print("_wait_condition_check")
+        waited = time.time() - self.timeout_started
+        if self.condition(*self.condition_args, **self.condition_kwargs):
+            self.condition_ack = True
+        elif waited > self.timeout:
+            self.timeout_ack = True
+
+class EventLoop:
+    poll_funcs = []
+    gloop = None
+    gctx = None
+    deferred_handling = None
+
+    def __init__(self):
+        self.gloop = GLib.MainLoop()
+        self.gctx = self.gloop.get_context()
+        self.deferred_handling = DeferredHandling()
+
+    def _trigger_cb_func(self, user_data):
+            self.defer(user_data)
+            return True #to retrigger the timeout
+
+    def defer(self, handler, *args, **kwargs):
+        self.deferred_handling.defer(handler, *args, **kwargs)
+
+    def register_poll_func(self, func, timestep=1):
+        id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, func) # in 1/1000th of a sec
+        self.poll_funcs.append((func, id))
+
+    def unregister_poll_func(self, func):
+        for pair in self.poll_funcs:
+            f, id = pair
+            if f == func:
+                GObject.source_remove(id)
+                self.poll_funcs.remove(pair)
+                return
+
+    def poll(self, may_block=False):
+        self.gctx.iteration(may_block)
+        self.deferred_handling.handle_queue()
+
+    def wait_no_raise(self, log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
+        if not timeout or timeout < 0:
+            self = log_obj
+            raise log.Error('wait() *must* time out at some point.', timeout=timeout)
+        if timestep < 0.1:
+            timestep = 0.1
+
+        wait_req = WaitRequest(condition, condition_args, condition_kwargs, timeout, timestep)
+        wait_id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, wait_req.condition_check)
+        while True:
+            self.poll(may_block=True)
+            if wait_req.condition_ack or wait_req.timeout_ack:
+                GObject.source_remove(wait_id)
+                success = wait_req.condition_ack
+                return success
+
+    def wait(self, log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
+        if not self.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
+            log.ctx(log_obj)
+            raise log.Error('Wait timeout')
+
+    def sleep(self, log_obj, seconds):
+        assert seconds > 0.
+        self.wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=seconds)
+
+
+evloop = EventLoop()
 
 def register_poll_func(func):
-    global poll_funcs
-    poll_funcs.append(func)
+    global evloop
+    evloop.register_poll_func(func)
 
 def unregister_poll_func(func):
-    global poll_funcs
-    poll_funcs.remove(func)
+    global evloop
+    evloop.unregister_poll_func(func)
 
 def poll():
-    global poll_funcs
-    for func in poll_funcs:
-        func()
+    global evloop
+    evloop.poll()
 
 def wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
-    if not timeout or timeout < 0:
-        self = log_obj
-        raise log.Error('wait() *must* time out at some point.', timeout=timeout)
-    if timestep < 0.1:
-        timestep = 0.1
-
-    started = time.time()
-    while True:
-        poll()
-        if condition(*condition_args, **condition_kwargs):
-            return True
-        waited = time.time() - started
-        if waited > timeout:
-            return False
-        time.sleep(timestep)
+    global evloop
+    evloop.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep)
 
 def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
-    if not wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
-        log.ctx(log_obj)
-        raise log.Error('Wait timeout')
+    global evloop
+    evloop.wait(log_obj, condition, *condition_args, timeout=timeout, timestep=timestep, **condition_kwargs)
 
 def sleep(log_obj, seconds):
-    assert seconds > 0.
-    wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=min(seconds, 1))
+    global evloop
+    evloop.sleep(log_obj, seconds)
+
+def defer(handler, *args, **kwargs):
+    global evloop
+    evloop.defer(handler, *args, **kwargs)
 
 
 # vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/modem.py b/src/osmo_gsm_tester/modem.py
index f50a291..8d41935 100644
--- a/src/osmo_gsm_tester/modem.py
+++ b/src/osmo_gsm_tester/modem.py
@@ -30,8 +30,6 @@
 Gio = get_introspection_module('Gio')
 
 from gi.repository import GLib
-glib_main_loop = GLib.MainLoop()
-glib_main_ctx = glib_main_loop.get_context()
 bus = SystemBus()
 
 I_MODEM = 'org.ofono.Modem'
@@ -49,24 +47,14 @@
 
 NETREG_MAX_REGISTER_ATTEMPTS = 3
 
-class DeferredHandling:
-    defer_queue = []
+class DeferredDBus:
 
     def __init__(self, dbus_iface, handler):
         self.handler = handler
         self.subscription_id = dbus_iface.connect(self.receive_signal)
 
     def receive_signal(self, *args, **kwargs):
-        DeferredHandling.defer_queue.append((self.handler, args, kwargs))
-
-    @staticmethod
-    def handle_queue():
-        while DeferredHandling.defer_queue:
-            handler, args, kwargs = DeferredHandling.defer_queue.pop(0)
-            handler(*args, **kwargs)
-
-def defer(handler, *args, **kwargs):
-    DeferredHandling.defer_queue.append((handler, args, kwargs))
+        event_loop.defer(self.handler, *args, **kwargs)
 
 def dbus_connect(dbus_iface, handler):
     '''This function shall be used instead of directly connecting DBus signals.
@@ -75,15 +63,7 @@
     so that a signal handler is invoked only after the DBus polling is through
     by enlisting signals that should be handled in the
     DeferredHandling.defer_queue.'''
-    return DeferredHandling(dbus_iface, handler).subscription_id
-
-def poll_glib():
-    global glib_main_ctx
-    while glib_main_ctx.pending():
-        glib_main_ctx.iteration()
-    DeferredHandling.handle_queue()
-
-event_loop.register_poll_func(poll_glib)
+    return DeferredDBus(dbus_iface, handler).subscription_id
 
 def systembus_get(path):
     global bus
@@ -493,8 +473,8 @@
         # waiting for that. Make it async and try to register when the scan is
         # finished.
         register_func = self.scan_cb_register_automatic if mcc_mnc is None else self.scan_cb_register
-        result_handler = lambda obj, result, user_data: defer(register_func, result, user_data)
-        error_handler = lambda obj, e, user_data: defer(self.scan_cb_error_handler, e, mcc_mnc)
+        result_handler = lambda obj, result, user_data: event_loop.defer(register_func, result, user_data)
+        error_handler = lambda obj, e, user_data: event_loop.defer(self.scan_cb_error_handler, e, mcc_mnc)
         dbus_async_call(netreg, netreg.Scan, timeout=30, cancellable=self.cancellable,
                         result_handler=result_handler, error_handler=error_handler,
                         user_data=mcc_mnc)
@@ -559,7 +539,7 @@
         self.cancellable.cancel()
         # Cancel op is applied as a signal coming from glib mainloop, so we
         # need to run it and wait for the callbacks to handle cancellations.
-        poll_glib()
+        event_loop.poll()
         # once it has been triggered, create a new one for next operation:
         self.cancellable = Gio.Cancellable.new()
 

-- 
To view, visit https://gerrit.osmocom.org/7559
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767
Gerrit-PatchSet: 1
Gerrit-Project: osmo-gsm-tester
Gerrit-Branch: master
Gerrit-Owner: Pau Espin Pedrol <pespin at sysmocom.de>



More information about the gerrit-log mailing list