Change in python/osmo-python-tests[master]: Add initial version of asyncio trap2cgi script

This is merely a historical archive of years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Max gerrit-no-reply at lists.osmocom.org
Wed Jan 9 16:42:42 UTC 2019


Max has submitted this change and it was merged. ( https://gerrit.osmocom.org/12476 )

Change subject: Add initial version of asyncio trap2cgi script
......................................................................

Add initial version of asyncio trap2cgi script

It uses the same config file format to simplify migration from ctrl2cgi.py

Change-Id: I7428cbfbc9f1b80ce42a70be555a38a3497d1cf9
Related: SYS#4399
---
M README
A contrib/systemd/osmo-trap2cgi.service
M debian/rules
A scripts/osmo_trap2cgi.py
M setup.py
5 files changed, 247 insertions(+), 1 deletion(-)

Approvals:
  Jenkins Builder: Verified
  daniel: Looks good to me, but someone else must approve
  Neels Hofmeyr: Looks good to me, approved



diff --git a/README b/README
index e90e214..69eb764 100644
--- a/README
+++ b/README
@@ -9,7 +9,8 @@
 There are currently following scripts in this package:
 osmotestconfig.py - test that apps start/write with example configs
 soap.py - implementation of SOAP <-> Ctrl proxy implemented on top of Twisted (deprecated, unmaintained)
-ctrl2cgi.py - implementation of CGI <-> Ctrl proxy implemented on top of Twisted
+ctrl2cgi.py - implementation of CGI <-> Ctrl proxy implemented on top of Twisted (deprecated, unmaintained)
+osmo_trap2cgi.py - implementation of CGI <-> Ctrl proxy implemented on top of asyncio and aiohttp
 osmo_rate_ctr2csv.py - rate counter dumper on top of osmo_ipa
 osmo_interact_vty.py - pipe stdin/stdout to a VTY session
 osmo_interact_ctrl.py - pipe stdin/stdout to a CTRL port
diff --git a/contrib/systemd/osmo-trap2cgi.service b/contrib/systemd/osmo-trap2cgi.service
new file mode 100644
index 0000000..7a90813
--- /dev/null
+++ b/contrib/systemd/osmo-trap2cgi.service
@@ -0,0 +1,11 @@
+[Unit]
+Description=Proxy between given GCI service and Osmocom CTRL protocol
+
+[Service]
+Type=simple
+Restart=always
+ExecStart=/usr/bin/osmo_trap2cgi.py -d -c %E/osmocom/%N.ini
+RestartSec=2
+
+[Install]
+WantedBy=multi-user.target
diff --git a/debian/rules b/debian/rules
index b33b599..04b59f6 100755
--- a/debian/rules
+++ b/debian/rules
@@ -17,3 +17,4 @@
 	# Install service file with different name than package name:
 	# https://unix.stackexchange.com/questions/306234/is-it-possible-to-install-two-services-for-one-package-using-dh-installinit-how
 	dh_installinit --name=osmo-ctrl2cgi
+	dh_installinit --name=osmo-trap2cgi
diff --git a/scripts/osmo_trap2cgi.py b/scripts/osmo_trap2cgi.py
new file mode 100755
index 0000000..ad66e7b
--- /dev/null
+++ b/scripts/osmo_trap2cgi.py
@@ -0,0 +1,232 @@
+#!/usr/bin/python3
+# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
+"""
+/*
+ * Copyright (C) 2019 sysmocom s.f.m.c. GmbH
+ *
+ * All Rights Reserved
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+"""
+
+__version__ = "0.0.1" # bump this on every non-trivial change
+
+from functools import partial
+import configparser, argparse, time, os, asyncio, aiohttp
+from osmopy.trap_helper import make_params, gen_hash, log_init, comm_proc
+from osmopy.osmo_ipa import Ctrl
+
+
+def log_bsc_time(l, rq, task, ts, bsc, msg, *args, **kwargs):
+    """
+    Logging contextual wrapper.
+    FIXME: remove task parameter once we bump requirements to Python 3.7+
+    """
+    # FIXME: following function is deprecated and will be removed in Python 3.9
+    # Use the asyncio.all_tasks() function instead when available (Python 3.7+).
+    num_tasks = len(task.all_tasks())
+    num_req = len(rq)
+    delta = time.perf_counter() - ts
+    if delta < 1:
+        l('[%d/%d] BSC %s: ' + msg, num_req, num_tasks, bsc, *args, **kwargs)
+    else:
+        l('[%d/%d] BSC %s, %.2f sec: ' + msg, num_req, num_tasks, bsc, time.perf_counter() - ts, *args, **kwargs)
+
+def check_h_val(ctrl, h, v, t, exp):
+    """
+    Check for header inconsistencies.
+    """
+    if v != exp:
+        ctrl.log.error('Unexpected %s value %x (instead of %x) in |%s| header', t, v, exp, h.hex())
+
+def get_ctrl_len(ctrl, header):
+    """
+    Obtain expected message length.
+    """
+    (dlen, p, e, _) = ctrl.del_header(header)
+    check_h_val(ctrl, header, p, "protocol", ctrl.PROTO['OSMO'])
+    check_h_val(ctrl, header, e, "extension", ctrl.EXT['CTRL'])
+    return dlen - 1
+
+
+class Proxy(Ctrl):
+    """
+    Wrapper class to implement per-type message dispatch and keep BSC <-> http Task mapping.
+    N. B: keep async/await semantics out of it.
+    """
+    def __init__(self, log):
+        super().__init__()
+        self.req = {}
+        self.log = log
+        self.conf = configparser.ConfigParser(interpolation = None)
+        self.conf.read(self.config_file)
+        self.timeout = self.conf['main'].getint('timeout', 30)
+        self.location = self.conf['main'].get('location')
+        self.ctrl_addr = self.conf['main'].get('addr_ctrl', 'localhost')
+        self.ctrl_port = self.conf['main'].getint('port_ctrl', 4250)
+        self.concurrency = self.conf['main'].getint('num_max_conn', 5)
+        # FIXME: use timeout parameter when available (aiohttp version 3.3) as follows
+        #self.http_client = aiohttp.ClientSession(connector = aiohttp.TCPConnector(limit = self.concurrency), timeout = self.timeout)
+        self.http_client = aiohttp.ClientSession(connector = aiohttp.TCPConnector(limit = self.concurrency))
+
+    def dispatch(self, w, data):
+        """
+        Basic dispatcher: the expected entry point for CTRL messages.
+        """
+        (cmd, _, v) = data.decode('utf-8').split(' ', 2)
+        method = getattr(self, cmd, lambda *_: self.log.info('CTRL %s is unhandled by dispatch: ignored.', cmd))
+        method(w, v.split())
+
+    def ERROR(self, _, k):
+        """
+        Handle CTRL ERROR messages.
+        """
+        self.log_ignore('ERROR', k)
+
+    def SET_REPLY(self, _, k):
+        """
+        Handle CTRL SET_REPLY messages.
+        """
+        self.log_ignore('SET_REPLY', k)
+
+    def TRAP(self, w, k):
+        """
+        Handle incoming TRAPs.
+        """
+        p = k[0].split('.')
+        if p[-1] == 'location-state':
+            self.handle_locationstate(w, p[1], p[3], p[5], k[1])
+        else:
+            self.log_ignore('TRAP', k[0])
+
+    def handle_locationstate(self, w, net, bsc, bts, data):
+        """
+        Handle location-state TRAP: parse trap content, build HTTP request and setup async handlers.
+        """
+        ts = time.perf_counter()
+        self.cleanup_task(bsc)
+        params = make_params(bsc, data)
+        params['h'] = gen_hash(params, self.conf['main'].get('secret_key'))
+        # FIXME: use asyncio.create_task() when available (Python 3.7+).
+        t = asyncio.ensure_future(self.http_client.post(self.location, data = params))
+        log_bsc_time(self.log.info, self.req, t, ts, bsc, 'location-state@%s => %s', params['time_stamp'], data)
+        t.add_done_callback(partial(self.reply_callback, w, bsc, ts))
+        self.req[bsc] = (t, ts)
+        log_bsc_time(self.log.info, self.req, t, ts, bsc, 'request added (net %s, BTS %s)', net, bts)
+
+    def cleanup_task(self, bsc):
+        """
+        It's ok to cancel() task which is done()
+        but if either of the checks above fires it means that Proxy() is in inconsistent state
+        which should never happen as long as we keep async/await semantics out of it.
+        """
+        if bsc in self.req:
+            (task, ts) = self.req[bsc]
+            log_bsc = partial(log_bsc_time, self.log.error, self.req, task, ts, bsc)
+            if task.done():
+                log_bsc('task is done but not removed')
+            if task.cancelled():
+                log_bsc('task is cancelled without update')
+            task.cancel()
+
+    def log_ignore(self, kind, m):
+        """
+        Log ignored CTRL message.
+        """
+        self.log.error('Ignoring CTRL %s: %s', kind, ' '.join(m) if type(m) is list else m)
+
+    def reply_callback(self, w, bsc, ts, task):
+        """
+        Process per-BSC response status and prepare async handler if necessary.
+        We don't have to delete cancel()ed task from self.req explicitly because it will be replaced by new one in handle_locationstate()
+        """
+        log_bsc = partial(log_bsc_time, self.log.info, self.req, task, ts, bsc)
+        if task.cancelled():
+            log_bsc('request cancelled')
+        else:
+            exp = task.exception()
+            if exp:
+                log_bsc('exception %s triggered', repr(exp))
+            else:
+                resp = task.result()
+                if resp.status != 200:
+                    log_bsc('unexpected HTTP response %d', resp.status)
+                else:
+                    log_bsc('request completed')
+                    # FIXME: use asyncio.create_task() when available (Python 3.7+).
+                    asyncio.ensure_future(recv_response(self.log, w, bsc, resp.json()))
+            del self.req[bsc]
+
+
+async def recv_response(log, w, bsc, resp):
+    """
+    Process json response asynchronously.
+    """
+    js = await resp
+    if js.get('error'):
+        log.info('BSC %s response error: %s', bsc, repr(js.get('error')))
+    else:
+        comm_proc(js.get('commands'), bsc, w.write, log)
+        await w.drain() # Trigger Writer's flow control
+
+async def recon_reader(proxy, reader, num_bytes):
+    """
+    Read requested amount of bytes, reconnect if necessary.
+    """
+    try:
+        return await reader.readexactly(num_bytes)
+    except asyncio.IncompleteReadError:
+        proxy.log.info('Failed to read %d bytes reconnecting to %s:%d...', num_bytes, proxy.ctrl_addr, proxy.ctrl_port)
+        await conn_client(proxy)
+
+async def ctrl_client(proxy, rd, wr):
+    """
+    Recursively read CTRL stream and handle selected messages.
+    """
+    header = await recon_reader(proxy, rd, 4)
+    data = await recon_reader(proxy, rd, get_ctrl_len(proxy, header))
+    proxy.dispatch(wr, data)
+    await ctrl_client(proxy, rd, wr)
+
+async def conn_client(proxy):
+    """
+    (Re)establish connection with CTRL server and pass Reader/Writer to CTRL handler.
+    """
+    try:
+        reader, writer = await asyncio.open_connection(proxy.ctrl_addr, proxy.ctrl_port)
+        proxy.log.info('Connected to %s:%d', proxy.ctrl_addr, proxy.ctrl_port)
+        await ctrl_client(proxy, reader, writer)
+    except OSError as e:
+        proxy.log.info('%s: %d seconds delayed retrying...', e, proxy.timeout)
+        await asyncio.sleep(proxy.timeout)
+        await conn_client(proxy)
+
+
+if __name__ == '__main__':
+    a = argparse.ArgumentParser(description = 'Proxy between given GCI service and Osmocom CTRL protocol.')
+    a.add_argument('-v', '--version', action = 'version', version = ("%(prog)s v" + __version__))
+    a.add_argument('-d', '--debug', action = 'store_true', help = "Enable debug log")
+    a.add_argument('-c', '--config-file', required = True, help = "Path to mandatory config file (in INI format).")
+
+    P = Proxy(log_init('TRAP2CGI', a.parse_args(namespace=Proxy).debug))
+
+    P.log.info('CGI proxy v%s starting with PID %d:', __version__, os.getpid())
+    P.log.info('Destination %s (concurrency %d)', P.location, P.concurrency)
+    P.log.info('Connecting to TRAP source %s:%d...', P.ctrl_addr, P.ctrl_port)
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(conn_client(P))
+    # FIXME: use loop.run() function instead when available (Python 3.7+).
diff --git a/setup.py b/setup.py
index 061e5df..69e18b0 100755
--- a/setup.py
+++ b/setup.py
@@ -31,6 +31,7 @@
         "scripts/osmo_rate_ctr2csv.py",
         "scripts/soap.py",
         "scripts/ctrl2cgi.py",
+        "scripts/osmo_trap2cgi.py",
         "scripts/osmo_interact_vty.py",
         "scripts/osmo_interact_ctrl.py",
         "scripts/osmo_verify_transcript_vty.py",

-- 
To view, visit https://gerrit.osmocom.org/12476
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: python/osmo-python-tests
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I7428cbfbc9f1b80ce42a70be555a38a3497d1cf9
Gerrit-Change-Number: 12476
Gerrit-PatchSet: 4
Gerrit-Owner: Max <msuraev at sysmocom.de>
Gerrit-Reviewer: Harald Welte <laforge at gnumonks.org>
Gerrit-Reviewer: Jenkins Builder (1000002)
Gerrit-Reviewer: Max <msuraev at sysmocom.de>
Gerrit-Reviewer: Neels Hofmeyr <nhofmeyr at sysmocom.de>
Gerrit-Reviewer: Pau Espin Pedrol <pespin at sysmocom.de>
Gerrit-Reviewer: Vadim Yanitskiy <axilirator at gmail.com>
Gerrit-Reviewer: daniel <dwillmann at sysmocom.de>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.osmocom.org/pipermail/gerrit-log/attachments/20190109/c2e0a628/attachment.htm>


More information about the gerrit-log mailing list