Change in osmo-gsm-tester[master]: Move all obj/ references in suite.py to testenv.py

This is merely a historical archive of years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

pespin gerrit-no-reply at lists.osmocom.org
Tue May 5 14:45:40 UTC 2020


pespin has submitted this change. ( https://gerrit.osmocom.org/c/osmo-gsm-tester/+/18042 )

Change subject: Move all obj/ references in suite.py to testenv.py
......................................................................

Move all obj/ references in suite.py to testenv.py

Change-Id: If4ab39be7a97d33e82c5a34e2a10dfec38613a4e
---
M selftest/suite_test/test_suite/hello_world.py
M selftest/suite_test/test_suite/test_error.py
M selftest/suite_test/test_suite/test_fail.py
M src/osmo_gsm_tester/core/test.py
M src/osmo_gsm_tester/suite.py
M src/osmo_gsm_tester/testenv.py
M src/osmo_gsm_tester/trial.py
7 files changed, 315 insertions(+), 271 deletions(-)

Approvals:
  Jenkins Builder: Verified
  pespin: Looks good to me, approved



diff --git a/selftest/suite_test/test_suite/hello_world.py b/selftest/suite_test/test_suite/hello_world.py
index 073d07f..a69f95a 100644
--- a/selftest/suite_test/test_suite/hello_world.py
+++ b/selftest/suite_test/test_suite/hello_world.py
@@ -1,5 +1,5 @@
 from osmo_gsm_tester.testenv import *
 
 print('hello world')
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
 print('one\ntwo\nthree')
diff --git a/selftest/suite_test/test_suite/test_error.py b/selftest/suite_test/test_suite/test_error.py
index c0583ff..70d14a1 100755
--- a/selftest/suite_test/test_suite/test_error.py
+++ b/selftest/suite_test/test_suite/test_error.py
@@ -1,5 +1,5 @@
 from osmo_gsm_tester.testenv import *
 
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
 
 assert False
diff --git a/selftest/suite_test/test_suite/test_fail.py b/selftest/suite_test/test_suite/test_fail.py
index cbaeded..ffb7218 100755
--- a/selftest/suite_test/test_suite/test_fail.py
+++ b/selftest/suite_test/test_suite/test_fail.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 from osmo_gsm_tester.testenv import *
 
-print('I am %r / %r' % (suite.name(), test.name()))
+print('I am %r / %r' % (suite.suite().name(), test.name()))
 
 test.set_fail('EpicFail', 'This failure is expected')
diff --git a/src/osmo_gsm_tester/core/test.py b/src/osmo_gsm_tester/core/test.py
index 93dbf6a..76c9ce9 100644
--- a/src/osmo_gsm_tester/core/test.py
+++ b/src/osmo_gsm_tester/core/test.py
@@ -21,9 +21,13 @@
 import sys
 import time
 import traceback
-from .. import testenv
 
-from . import log, util, resource
+from . import log
+from . import  util
+from . import resource
+from .event_loop import MainLoop
+
+from .. import testenv
 
 class Test(log.Origin):
     UNKNOWN = 'UNKNOWN' # matches junit 'error'
@@ -51,12 +55,13 @@
         return self._run_dir
 
     def run(self):
+        testenv_obj = None
         try:
             self.log_target = log.FileLogTarget(self.get_run_dir().new_child('log')).set_all_levels(log.L_DBG).style_change(trace=True)
             log.large_separator(self.suite_run.trial.name(), self.suite_run.name(), self.name(), sublevel=3)
             self.status = Test.UNKNOWN
             self.start_timestamp = time.time()
-            testenv.setup(self.suite_run, self)
+            testenv_obj = testenv.setup(self.suite_run, self)
             with self.redirect_stdout():
                 util.run_python_file('%s.%s' % (self.suite_run.definition.name(), self.basename),
                                      self.path)
@@ -83,6 +88,8 @@
             self.err('TEST RUN ABORTED: %s' % type(e).__name__)
             raise
         finally:
+            if testenv_obj:
+                testenv_obj.stop()
             if self.log_target:
                 self.log_target.remove()
 
diff --git a/src/osmo_gsm_tester/suite.py b/src/osmo_gsm_tester/suite.py
index 6952fd2..3416ff5 100644
--- a/src/osmo_gsm_tester/suite.py
+++ b/src/osmo_gsm_tester/suite.py
@@ -21,18 +21,12 @@
 import sys
 import time
 import pprint
-from .core import config, log, util, process, schema, resource
+from .core import config
+from .core import log
+from .core import util
+from .core import schema
+from .core import resource
 from .core import test
-from .core.event_loop import MainLoop
-from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
-from .obj import run_node
-from .obj import epc
-from .obj import enb
-from .obj import bts
-from .obj import ms
-
-class Timeout(Exception):
-    pass
 
 class SuiteDefinition(log.Origin):
     '''A test suite reserves resources for a number of tests.
@@ -74,12 +68,9 @@
         self.start_timestamp = None
         self.duration = None
         self.reserved_resources = None
-        self.objects_to_clean_up = None
-        self.test_import_modules_to_clean_up = []
         self._resource_requirements = None
         self._resource_modifiers = None
         self._config = None
-        self._processes = []
         self._run_dir = None
         self.trial = trial
         self.definition = suite_definition
@@ -93,40 +84,6 @@
         for test_basename in self.definition.test_basenames:
             self.tests.append(test.Test(self, test_basename))
 
-    def register_for_cleanup(self, *obj):
-        assert all([hasattr(o, 'cleanup') for o in obj])
-        self.objects_to_clean_up = self.objects_to_clean_up or []
-        self.objects_to_clean_up.extend(obj)
-
-    def objects_cleanup(self):
-        while self.objects_to_clean_up:
-            obj = self.objects_to_clean_up.pop()
-            try:
-                obj.cleanup()
-            except Exception:
-                log.log_exn()
-
-    def test_import_modules_register_for_cleanup(self, mod):
-        '''
-        Tests are required to call this API for any module loaded from its own
-        lib subdir, because they are loaded in the global namespace. Otherwise
-        later tests importing modules with the same name will re-use an already
-        loaded module.
-        '''
-        if mod not in self.test_import_modules_to_clean_up:
-            self.dbg('registering module %r for cleanup' % mod)
-            self.test_import_modules_to_clean_up.append(mod)
-
-    def test_import_modules_cleanup(self):
-        while self.test_import_modules_to_clean_up:
-            mod = self.test_import_modules_to_clean_up.pop()
-            try:
-                self.dbg('Cleaning up module %r' % mod)
-                del sys.modules[mod.__name__]
-                del mod
-            except Exception:
-                log.log_exn()
-
     def mark_start(self):
         self.start_timestamp = time.time()
         self.duration = 0
@@ -155,11 +112,6 @@
             self._run_dir = util.Dir(self.trial.get_run_dir().new_dir(self.name()))
         return self._run_dir
 
-    def get_test_run_dir(self):
-        if self.current_test:
-            return self.current_test.get_run_dir()
-        return self.get_run_dir()
-
     def resource_requirements(self):
         if self._resource_requirements is None:
             self._resource_requirements = self.combined('resources')
@@ -175,19 +127,24 @@
             self._config = self.combined('config', False)
         return self._config
 
+    def resource_pool(self):
+        return self.resources_pool
+
     def reserve_resources(self):
         if self.reserved_resources:
             raise RuntimeError('Attempt to reserve resources twice for a SuiteRun')
         self.log('reserving resources in', self.resources_pool.state_dir, '...')
         self.reserved_resources = self.resources_pool.reserve(self, self.resource_requirements(), self.resource_modifiers())
 
+    def get_reserved_resource(self, resource_class_str, specifics):
+        return self.reserved_resources.get(resource_class_str, specifics=specifics)
+
     def run_tests(self, names=None):
         suite_libdir = os.path.join(self.definition.suite_dir, 'lib')
         try:
             log.large_separator(self.trial.name(), self.name(), sublevel=2)
             self.mark_start()
             util.import_path_prepend(suite_libdir)
-            MainLoop.register_poll_func(self.poll)
             if not self.reserved_resources:
                 self.reserve_resources()
             for t in self.tests:
@@ -196,9 +153,6 @@
                     continue
                 self.current_test = t
                 t.run()
-                self.stop_processes()
-                self.objects_cleanup()
-                self.reserved_resources.put_all()
         except Exception:
             log.log_exn()
         except BaseException as e:
@@ -206,14 +160,7 @@
             self.err('SUITE RUN ABORTED: %s' % type(e).__name__)
             raise
         finally:
-            # if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
-            # base exception is raised. Make sure to stop processes in this
-            # finally section. Resources are automatically freed with 'atexit'.
-            self.stop_processes()
-            self.objects_cleanup()
             self.free_resources()
-            MainLoop.unregister_poll_func(self.poll)
-            self.test_import_modules_cleanup()
             util.import_path_remove(suite_libdir)
             self.duration = time.time() - self.start_timestamp
 
@@ -245,203 +192,11 @@
                 errors += 1
         return (passed, skipped, failed, errors)
 
-    def remember_to_stop(self, process, respawn=False):
-        '''Ask suite to monitor and manage lifecycle of the Process object. If a
-        process managed by suite finishes before cleanup time, the current test
-        will be marked as FAIL and end immediatelly. If respwan=True, then suite
-        will respawn() the process instead.'''
-        self._processes.insert(0, (process, respawn))
-
-    def stop_processes(self):
-        if len(self._processes) == 0:
-            return
-        strategy = process.ParallelTerminationStrategy()
-        while self._processes:
-            proc, _ = self._processes.pop()
-            strategy.add_process(proc)
-        strategy.terminate_all()
-
-    def stop_process(self, process):
-        'Remove process from monitored list and stop it'
-        for proc_respawn in self._processes:
-            proc, respawn = proc_respawn
-            if proc == process:
-                self._processes.remove(proc_respawn)
-                proc.terminate()
-
     def free_resources(self):
         if self.reserved_resources is None:
             return
         self.reserved_resources.free()
 
-    def ip_address(self, specifics=None):
-        return self.reserved_resources.get(resource.R_IP_ADDRESS, specifics=specifics)
-
-    def nitb(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return nitb_osmo.OsmoNitb(self, ip_address)
-
-    def hlr(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return hlr_osmo.OsmoHlr(self, ip_address)
-
-    def ggsn(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return ggsn_osmo.OsmoGgsn(self, ip_address)
-
-    def sgsn(self, hlr, ggsn, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
-
-    def mgcpgw(self, ip_address=None, bts_ip=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
-
-    def mgw(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return mgw_osmo.OsmoMgw(self, ip_address)
-
-    def msc(self, hlr, mgcpgw, stp, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
-
-    def bsc(self, msc, mgw, stp, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
-
-    def stp(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        return stp_osmo.OsmoStp(self, ip_address)
-
-    def ms_driver(self):
-        ms = ms_driver.MsDriver(self)
-        self.register_for_cleanup(ms)
-        return ms
-
-    def bts(self, specifics=None):
-        bts_obj = bts.Bts.get_instance_by_type(self, self.reserved_resources.get(resource.R_BTS, specifics=specifics))
-        bts_obj.set_lac(self.lac())
-        bts_obj.set_rac(self.rac())
-        bts_obj.set_cellid(self.cellid())
-        bts_obj.set_bvci(self.bvci())
-        self.register_for_cleanup(bts_obj)
-        return bts_obj
-
-    def modem(self, specifics=None):
-        conf = self.reserved_resources.get(resource.R_MODEM, specifics=specifics)
-        ms_obj = ms.MS.get_instance_by_type(self, conf)
-        self.register_for_cleanup(ms_obj)
-        return ms_obj
-
-    def modems(self, count):
-        l = []
-        for i in range(count):
-            l.append(self.modem())
-        return l
-
-    def all_resources(self, resource_func):
-        """Returns all yielded resource."""
-        l = []
-        while True:
-            try:
-                l.append(resource_func())
-            except resource.NoResourceExn:
-                return l
-
-    def esme(self):
-        esme_obj = esme.Esme(self.msisdn())
-        self.register_for_cleanup(esme_obj)
-        return esme_obj
-
-    def run_node(self, specifics=None):
-        return run_node.RunNode.from_conf(self.reserved_resources.get(resource.R_RUN_NODE, specifics=specifics))
-
-    def enb(self, specifics=None):
-        enb_obj = enb.eNodeB.get_instance_by_type(self, self.reserved_resources.get(resource.R_ENB, specifics=specifics))
-        self.register_for_cleanup(enb_obj)
-        return enb_obj
-
-    def epc(self, run_node=None):
-        if run_node is None:
-            run_node = self.run_node()
-        epc_obj = epc.EPC.get_instance_by_type(self, run_node)
-        self.register_for_cleanup(epc_obj)
-        return epc_obj
-
-    def osmocon(self, specifics=None):
-        conf = self.reserved_resources.get(resource.R_OSMOCON, specifics=specifics)
-        osmocon_obj = osmocon.Osmocon(self, conf=conf)
-        self.register_for_cleanup(osmocon_obj)
-        return osmocon_obj
-
-    def iperf3srv(self, ip_address=None):
-        if ip_address is None:
-            ip_address = self.ip_address()
-        iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
-        return iperf3srv_obj
-
-    def msisdn(self):
-        msisdn = self.resources_pool.next_msisdn(self)
-        self.log('using MSISDN', msisdn)
-        return msisdn
-
-    def lac(self):
-        lac = self.resources_pool.next_lac(self)
-        self.log('using LAC', lac)
-        return lac
-
-    def rac(self):
-        rac = self.resources_pool.next_rac(self)
-        self.log('using RAC', rac)
-        return rac
-
-    def cellid(self):
-        cellid = self.resources_pool.next_cellid(self)
-        self.log('using CellId', cellid)
-        return cellid
-
-    def bvci(self):
-        bvci = self.resources_pool.next_bvci(self)
-        self.log('using BVCI', bvci)
-        return bvci
-
-    def poll(self):
-        for proc, respawn in self._processes:
-            if proc.terminated():
-                if respawn == True:
-                    proc.respawn()
-                else:
-                    proc.log_stdout_tail()
-                    proc.log_stderr_tail()
-                    log.ctx(proc)
-                    raise log.Error('Process ended prematurely: %s' % proc.name())
-
-    def prompt(self, *msgs, **msg_details):
-        'ask for user interaction. Do not use in tests that should run automatically!'
-        if msg_details:
-            msgs = list(msgs)
-            msgs.append('{%s}' %
-                        (', '.join(['%s=%r' % (k,v)
-                                    for k,v in sorted(msg_details.items())])))
-        msg = ' '.join(msgs) or 'Hit Enter to continue'
-        self.log('prompt:', msg)
-        sys.__stdout__.write('\n\n--- PROMPT ---\n')
-        sys.__stdout__.write(msg)
-        sys.__stdout__.write('\n')
-        sys.__stdout__.flush()
-        entered = util.input_polling('> ', MainLoop.poll)
-        self.log('prompt entered:', repr(entered))
-        return entered
-
     def resource_status_str(self):
         return '\n'.join(('',
             'SUITE RUN: %s' % self.origin_id(),
diff --git a/src/osmo_gsm_tester/testenv.py b/src/osmo_gsm_tester/testenv.py
index 8c4743a..aa94f3c 100644
--- a/src/osmo_gsm_tester/testenv.py
+++ b/src/osmo_gsm_tester/testenv.py
@@ -20,6 +20,22 @@
 # These will be initialized before each test run.
 # A test script can thus establish its context by doing:
 # from osmo_gsm_tester.testenv import *
+
+import sys
+
+from .core import process
+from .core import log as log_module
+from .core import process as process_module
+from .core import resource
+from .core.event_loop import MainLoop
+
+from .obj import nitb_osmo, hlr_osmo, mgcpgw_osmo, mgw_osmo, msc_osmo, bsc_osmo, stp_osmo, ggsn_osmo, sgsn_osmo, esme, osmocon, ms_driver, iperf3
+from .obj import run_node
+from .obj import epc
+from .obj import enb
+from .obj import bts
+from .obj import ms
+
 trial = None
 suite = None
 test = None
@@ -32,22 +48,284 @@
 sleep = None
 poll = None
 prompt = None
-Timeout = None
 Sms = None
 process = None
 
+class Timeout(Exception):
+    pass
+
+class TestEnv(log_module.Origin):
+    def __init__(self, suite_run, test):
+        super().__init__(log_module.C_TST, test.name())
+        self.suite_run = suite_run
+        self._test = test
+        self.trial = suite_run.trial # backward compat with objects
+        self.resources_pool = suite_run.resource_pool() # backward compat with objects
+        self._processes = []
+        self.test_import_modules_to_clean_up = []
+        self.objects_to_clean_up = None
+        MainLoop.register_poll_func(self.poll)
+
+    def suite(self):
+        return self.suite_run
+
+    # backward compat with objects
+    def get_test_run_dir(self):
+        return self._test.get_run_dir()
+
+    # backward compat with objects
+    def config(self):
+        return self.suite_run.config()
+
+    def remember_to_stop(self, process, respawn=False):
+        '''Ask suite to monitor and manage lifecycle of the Process object. If a
+        process managed by suite finishes before cleanup time, the current test
+        will be marked as FAIL and end immediatelly. If respwan=True, then suite
+        will respawn() the process instead.'''
+        self._processes.insert(0, (process, respawn))
+
+    def stop_processes(self):
+        if len(self._processes) == 0:
+            return
+        strategy = process_module.ParallelTerminationStrategy()
+        while self._processes:
+            proc, _ = self._processes.pop()
+            strategy.add_process(proc)
+        strategy.terminate_all()
+
+    def stop_process(self, process):
+        'Remove process from monitored list and stop it'
+        for proc_respawn in self._processes:
+            proc, respawn = proc_respawn
+            if proc == process:
+                self._processes.remove(proc_respawn)
+                proc.terminate()
+
+    def register_for_cleanup(self, *obj):
+        assert all([hasattr(o, 'cleanup') for o in obj])
+        self.objects_to_clean_up = self.objects_to_clean_up or []
+        self.objects_to_clean_up.extend(obj)
+
+    def objects_cleanup(self):
+        while self.objects_to_clean_up:
+            obj = self.objects_to_clean_up.pop()
+            try:
+                obj.cleanup()
+            except Exception:
+                log_module.log_exn()
+
+    def test_import_modules_register_for_cleanup(self, mod):
+        '''
+        Tests are required to call this API for any module loaded from its own
+        lib subdir, because they are loaded in the global namespace. Otherwise
+        later tests importing modules with the same name will re-use an already
+        loaded module.
+        '''
+        if mod not in self.test_import_modules_to_clean_up:
+            self.dbg('registering module %r for cleanup' % mod)
+            self.test_import_modules_to_clean_up.append(mod)
+
+    def test_import_modules_cleanup(self):
+        while self.test_import_modules_to_clean_up:
+            mod = self.test_import_modules_to_clean_up.pop()
+            try:
+                self.dbg('Cleaning up module %r' % mod)
+                del sys.modules[mod.__name__]
+                del mod
+            except Exception:
+                log_module.log_exn()
+
+    def poll(self):
+        for proc, respawn in self._processes:
+            if proc.terminated():
+                if respawn == True:
+                    proc.respawn()
+                else:
+                    proc.log_stdout_tail()
+                    proc.log_stderr_tail()
+                    log_module.ctx(proc)
+                    raise log_module.Error('Process ended prematurely: %s' % proc.name())
+
+    def stop(self):
+        # if sys.exit() called from signal handler (e.g. SIGINT), SystemExit
+        # base exception is raised. Make sure to stop processes in this
+        # finally section. Resources are automatically freed with 'atexit'.
+        self.stop_processes()
+        self.objects_cleanup()
+        self.suite_run.reserved_resources.put_all()
+        MainLoop.unregister_poll_func(self.poll)
+        self.test_import_modules_cleanup()
+
+    def prompt(self, *msgs, **msg_details):
+        'ask for user interaction. Do not use in tests that should run automatically!'
+        if msg_details:
+            msgs = list(msgs)
+            msgs.append('{%s}' %
+                        (', '.join(['%s=%r' % (k,v)
+                                    for k,v in sorted(msg_details.items())])))
+        msg = ' '.join(msgs) or 'Hit Enter to continue'
+        self.log('prompt:', msg)
+        sys.__stdout__.write('\n\n--- PROMPT ---\n')
+        sys.__stdout__.write(msg)
+        sys.__stdout__.write('\n')
+        sys.__stdout__.flush()
+        entered = util.input_polling('> ', MainLoop.poll)
+        self.log('prompt entered:', repr(entered))
+        return entered
+
+    def get_reserved_resource(self, resource_class_str, specifics=None):
+        return self.suite_run.get_reserved_resource(resource_class_str, specifics)
+
+    def ip_address(self, specifics=None):
+        return self.get_reserved_resource(resource.R_IP_ADDRESS, specifics)
+
+    def nitb(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return nitb_osmo.OsmoNitb(self, ip_address)
+
+    def hlr(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return hlr_osmo.OsmoHlr(self, ip_address)
+
+    def ggsn(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return ggsn_osmo.OsmoGgsn(self, ip_address)
+
+    def sgsn(self, hlr, ggsn, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return sgsn_osmo.OsmoSgsn(self, hlr, ggsn, ip_address)
+
+    def mgcpgw(self, ip_address=None, bts_ip=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return mgcpgw_osmo.OsmoMgcpgw(self, ip_address, bts_ip)
+
+    def mgw(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return mgw_osmo.OsmoMgw(self, ip_address)
+
+    def msc(self, hlr, mgcpgw, stp, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return msc_osmo.OsmoMsc(self, hlr, mgcpgw, stp, ip_address)
+
+    def bsc(self, msc, mgw, stp, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return bsc_osmo.OsmoBsc(self, msc, mgw, stp, ip_address)
+
+    def stp(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        return stp_osmo.OsmoStp(self, ip_address)
+
+    def ms_driver(self):
+        ms = ms_driver.MsDriver(self)
+        self.register_for_cleanup(ms)
+        return ms
+
+    def bts(self, specifics=None):
+        bts_obj = bts.Bts.get_instance_by_type(self, self.get_reserved_resource(resource.R_BTS, specifics=specifics))
+        bts_obj.set_lac(self.lac())
+        bts_obj.set_rac(self.rac())
+        bts_obj.set_cellid(self.cellid())
+        bts_obj.set_bvci(self.bvci())
+        self.register_for_cleanup(bts_obj)
+        return bts_obj
+
+    def modem(self, specifics=None):
+        conf = self.get_reserved_resource(resource.R_MODEM, specifics=specifics)
+        ms_obj = ms.MS.get_instance_by_type(self, conf)
+        self.register_for_cleanup(ms_obj)
+        return ms_obj
+
+    def modems(self, count):
+        l = []
+        for i in range(count):
+            l.append(self.modem())
+        return l
+
+    def all_resources(self, resource_func):
+        """Returns all yielded resource."""
+        l = []
+        while True:
+            try:
+                l.append(resource_func())
+            except resource.NoResourceExn:
+                return l
+
+    def esme(self):
+        esme_obj = esme.Esme(self.msisdn())
+        self.register_for_cleanup(esme_obj)
+        return esme_obj
+
+    def run_node(self, specifics=None):
+        return run_node.RunNode.from_conf(self.get_reserved_resource(resource.R_RUN_NODE, specifics=specifics))
+
+    def enb(self, specifics=None):
+        enb_obj = enb.eNodeB.get_instance_by_type(self, self.get_reserved_resource(resource.R_ENB, specifics=specifics))
+        self.register_for_cleanup(enb_obj)
+        return enb_obj
+
+    def epc(self, run_node=None):
+        if run_node is None:
+            run_node = self.run_node()
+        epc_obj = epc.EPC.get_instance_by_type(self, run_node)
+        self.register_for_cleanup(epc_obj)
+        return epc_obj
+
+    def osmocon(self, specifics=None):
+        conf = self.get_reserved_resource(resource.R_OSMOCON, specifics=specifics)
+        osmocon_obj = osmocon.Osmocon(self, conf=conf)
+        self.register_for_cleanup(osmocon_obj)
+        return osmocon_obj
+
+    def iperf3srv(self, ip_address=None):
+        if ip_address is None:
+            ip_address = self.ip_address()
+        iperf3srv_obj = iperf3.IPerf3Server(self, ip_address)
+        return iperf3srv_obj
+
+    def msisdn(self):
+        msisdn = self.suite_run.resource_pool().next_msisdn(self)
+        self.log('using MSISDN', msisdn)
+        return msisdn
+
+    def lac(self):
+        lac = self.suite_run.resource_pool().next_lac(self)
+        self.log('using LAC', lac)
+        return lac
+
+    def rac(self):
+        rac = self.suite_run.resource_pool().next_rac(self)
+        self.log('using RAC', rac)
+        return rac
+
+    def cellid(self):
+        cellid = self.suite_run.resource_pool().next_cellid(self)
+        self.log('using CellId', cellid)
+        return cellid
+
+    def bvci(self):
+        bvci = self.suite_run.resource_pool().next_bvci(self)
+        self.log('using BVCI', bvci)
+        return bvci
+
+
 def setup(suite_run, _test):
-    from .core import process as process_module
     from .core.event_loop import MainLoop
     from .obj.sms import Sms as Sms_class
-    from . import suite as suite_module
 
-    global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Timeout, Sms, process
+    global trial, suite, test, resources, log, dbg, err, wait, wait_no_raise, sleep, poll, prompt, Sms, process
 
     trial = suite_run.trial
-    suite = suite_run
     test = _test
-    resources = suite_run.reserved_resources
+    resources = suite_run.reserved_resources # TODO: remove this global, only used in selftest
     log = test.log
     dbg = test.dbg
     err = test.err
@@ -55,9 +333,10 @@
     wait_no_raise = lambda *args, **kwargs: MainLoop.wait_no_raise(suite_run, *args, **kwargs)
     sleep = lambda *args, **kwargs: MainLoop.sleep(suite_run, *args, **kwargs)
     poll = MainLoop.poll
-    prompt = suite_run.prompt
-    Timeout = suite_module.Timeout
     Sms = Sms_class
     process = process_module
+    suite = TestEnv(suite_run, _test) # stored in "suite" for backward compatibility
+    prompt = suite.prompt
+    return suite
 
 # vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/trial.py b/src/osmo_gsm_tester/trial.py
index fb94a59..08b7d8f 100644
--- a/src/osmo_gsm_tester/trial.py
+++ b/src/osmo_gsm_tester/trial.py
@@ -22,7 +22,10 @@
 import shutil
 import tarfile
 
-from .core import log, util, report
+from .core import log
+from .core import util
+from .core import report
+
 from . import suite
 
 FILE_MARK_TAKEN = 'taken'

-- 
To view, visit https://gerrit.osmocom.org/c/osmo-gsm-tester/+/18042
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-gsm-tester
Gerrit-Branch: master
Gerrit-Change-Id: If4ab39be7a97d33e82c5a34e2a10dfec38613a4e
Gerrit-Change-Number: 18042
Gerrit-PatchSet: 3
Gerrit-Owner: pespin <pespin at sysmocom.de>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: pespin <pespin at sysmocom.de>
Gerrit-MessageType: merged
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20200505/54395503/attachment.htm>


More information about the gerrit-log mailing list