This is a historical archive covering the years 2008-2021, prior to the migration to mailman3.
The current, actively maintained list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.
Pau Espin Pedrol gerrit-no-reply at lists.osmocom.orgReview at https://gerrit.osmocom.org/7559 event_loop: Use glib as mainloop impl and move modem to use event_loop Several benefits: - We can add APIs to poll on fds in the future (for smpp socket for instance) instead of using busy polling. - During wait(), we now block in the glib mainloop instead of sleeping 0.1 secs and not handling events during that time. - We remove glib mainloop specific bits from modem.py Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767 --- M src/osmo_gsm_tester/event_loop.py M src/osmo_gsm_tester/modem.py 2 files changed, 115 insertions(+), 54 deletions(-) git pull ssh://gerrit.osmocom.org:29418/osmo-gsm-tester refs/changes/59/7559/1 diff --git a/src/osmo_gsm_tester/event_loop.py b/src/osmo_gsm_tester/event_loop.py index ebe6afb..068eca9 100644 --- a/src/osmo_gsm_tester/event_loop.py +++ b/src/osmo_gsm_tester/event_loop.py @@ -18,48 +18,129 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import time +from gi.repository import GLib, GObject + from . 
import log -poll_funcs = [] +class DeferredHandling: + defer_queue = [] + + def handle_queue(self): + while DeferredHandling.defer_queue: + handler, args, kwargs = self.defer_queue.pop(0) + handler(*args, **kwargs) + + def defer(self, handler, *args, **kwargs): + self.defer_queue.append((handler, args, kwargs)) + +class WaitRequest: + timeout_ack = False + condition_ack = False + + def __init__(self, condition, condition_args, condition_kwargs, timeout, timestep): + self.timeout_started = time.time() + self.timeout = timeout + self.condition = condition + self.condition_args = condition_args + self.condition_kwargs = condition_kwargs + + def condition_check(self): + #print("_wait_condition_check") + waited = time.time() - self.timeout_started + if self.condition(*self.condition_args, **self.condition_kwargs): + self.condition_ack = True + elif waited > self.timeout: + self.timeout_ack = True + +class EventLoop: + poll_funcs = [] + gloop = None + gctx = None + deferred_handling = None + + def __init__(self): + self.gloop = GLib.MainLoop() + self.gctx = self.gloop.get_context() + self.deferred_handling = DeferredHandling() + + def _trigger_cb_func(self, user_data): + self.defer(user_data) + return True #to retrigger the timeout + + def defer(self, handler, *args, **kwargs): + self.deferred_handling.defer(handler, *args, **kwargs) + + def register_poll_func(self, func, timestep=1): + id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, func) # in 1/1000th of a sec + self.poll_funcs.append((func, id)) + + def unregister_poll_func(self, func): + for pair in self.poll_funcs: + f, id = pair + if f == func: + GObject.source_remove(id) + self.poll_funcs.remove(pair) + return + + def poll(self, may_block=False): + self.gctx.iteration(may_block) + self.deferred_handling.handle_queue() + + def wait_no_raise(self, log_obj, condition, condition_args, condition_kwargs, timeout, timestep): + if not timeout or timeout < 0: + self = log_obj + raise log.Error('wait() *must* 
time out at some point.', timeout=timeout) + if timestep < 0.1: + timestep = 0.1 + + wait_req = WaitRequest(condition, condition_args, condition_kwargs, timeout, timestep) + wait_id = GObject.timeout_add(timestep*1000, self._trigger_cb_func, wait_req.condition_check) + while True: + self.poll(may_block=True) + if wait_req.condition_ack or wait_req.timeout_ack: + GObject.source_remove(wait_id) + success = wait_req.condition_ack + return success + + def wait(self, log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs): + if not self.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep): + log.ctx(log_obj) + raise log.Error('Wait timeout') + + def sleep(self, log_obj, seconds): + assert seconds > 0. + self.wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=seconds) + + +evloop = EventLoop() def register_poll_func(func): - global poll_funcs - poll_funcs.append(func) + global evloop + evloop.register_poll_func(func) def unregister_poll_func(func): - global poll_funcs - poll_funcs.remove(func) + global evloop + evloop.unregister_poll_func(func) def poll(): - global poll_funcs - for func in poll_funcs: - func() + global evloop + evloop.poll() def wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep): - if not timeout or timeout < 0: - self = log_obj - raise log.Error('wait() *must* time out at some point.', timeout=timeout) - if timestep < 0.1: - timestep = 0.1 - - started = time.time() - while True: - poll() - if condition(*condition_args, **condition_kwargs): - return True - waited = time.time() - started - if waited > timeout: - return False - time.sleep(timestep) + global evloop + evloop.wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep) def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs): - if not wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, 
timestep): - log.ctx(log_obj) - raise log.Error('Wait timeout') + global evloop + evloop.wait(log_obj, condition, *condition_args, timeout=timeout, timestep=timestep, **condition_kwargs) def sleep(log_obj, seconds): - assert seconds > 0. - wait_no_raise(log_obj, lambda: False, [], {}, timeout=seconds, timestep=min(seconds, 1)) + global evloop + evloop.sleep(log_obj, seconds) + +def defer(handler, *args, **kwargs): + global evloop + evloop.defer(handler, *args, **kwargs) # vim: expandtab tabstop=4 shiftwidth=4 diff --git a/src/osmo_gsm_tester/modem.py b/src/osmo_gsm_tester/modem.py index f50a291..8d41935 100644 --- a/src/osmo_gsm_tester/modem.py +++ b/src/osmo_gsm_tester/modem.py @@ -30,8 +30,6 @@ Gio = get_introspection_module('Gio') from gi.repository import GLib -glib_main_loop = GLib.MainLoop() -glib_main_ctx = glib_main_loop.get_context() bus = SystemBus() I_MODEM = 'org.ofono.Modem' @@ -49,24 +47,14 @@ NETREG_MAX_REGISTER_ATTEMPTS = 3 -class DeferredHandling: - defer_queue = [] +class DeferredDBus: def __init__(self, dbus_iface, handler): self.handler = handler self.subscription_id = dbus_iface.connect(self.receive_signal) def receive_signal(self, *args, **kwargs): - DeferredHandling.defer_queue.append((self.handler, args, kwargs)) - - @staticmethod - def handle_queue(): - while DeferredHandling.defer_queue: - handler, args, kwargs = DeferredHandling.defer_queue.pop(0) - handler(*args, **kwargs) - -def defer(handler, *args, **kwargs): - DeferredHandling.defer_queue.append((handler, args, kwargs)) + event_loop.defer(self.handler, *args, **kwargs) def dbus_connect(dbus_iface, handler): '''This function shall be used instead of directly connecting DBus signals. 
@@ -75,15 +63,7 @@ so that a signal handler is invoked only after the DBus polling is through by enlisting signals that should be handled in the DeferredHandling.defer_queue.''' - return DeferredHandling(dbus_iface, handler).subscription_id - -def poll_glib(): - global glib_main_ctx - while glib_main_ctx.pending(): - glib_main_ctx.iteration() - DeferredHandling.handle_queue() - -event_loop.register_poll_func(poll_glib) + return DeferredDBus(dbus_iface, handler).subscription_id def systembus_get(path): global bus @@ -493,8 +473,8 @@ # waiting for that. Make it async and try to register when the scan is # finished. register_func = self.scan_cb_register_automatic if mcc_mnc is None else self.scan_cb_register - result_handler = lambda obj, result, user_data: defer(register_func, result, user_data) - error_handler = lambda obj, e, user_data: defer(self.scan_cb_error_handler, e, mcc_mnc) + result_handler = lambda obj, result, user_data: event_loop.defer(register_func, result, user_data) + error_handler = lambda obj, e, user_data: event_loop.defer(self.scan_cb_error_handler, e, mcc_mnc) dbus_async_call(netreg, netreg.Scan, timeout=30, cancellable=self.cancellable, result_handler=result_handler, error_handler=error_handler, user_data=mcc_mnc) @@ -559,7 +539,7 @@ self.cancellable.cancel() # Cancel op is applied as a signal coming from glib mainloop, so we # need to run it and wait for the callbacks to handle cancellations. - poll_glib() + event_loop.poll() # once it has been triggered, create a new one for next operation: self.cancellable = Gio.Cancellable.new() -- To view, visit https://gerrit.osmocom.org/7559 To unsubscribe, visit https://gerrit.osmocom.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I8c3bc44bbe443703077110cdc67207e9cbb43767 Gerrit-PatchSet: 1 Gerrit-Project: osmo-gsm-tester Gerrit-Branch: master Gerrit-Owner: Pau Espin Pedrol <pespin at sysmocom.de>