<p>neels has uploaded this change for <strong>review</strong>.</p><p><a href="https://gerrit.osmocom.org/c/osmo-gsm-tester/+/21504">View Change</a></p><pre style="font-family: monospace,monospace; white-space: pre-wrap;">add osmo_vty.py<br><br>To trigger manual handovers, I need a VTY interface. The non-trivial<br>parts of this are copied from osmo-python-tests osmo_interact_vty.py.<br><br>Will be used in the upcoming handover_2G test suite in<br>I0b2671304165a1aaae2b386af46fbd8b098e3bd8.<br><br>Change-Id: I7c17b143b7c690b8c4105ee7c6272670046fa91d<br>---<br>A src/osmo_gsm_tester/obj/osmo_vty.py<br>1 file changed, 181 insertions(+), 0 deletions(-)<br><br></pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;">git pull ssh://gerrit.osmocom.org:29418/osmo-gsm-tester refs/changes/04/21504/1</pre><pre style="font-family: monospace,monospace; white-space: pre-wrap;"><span>diff --git a/src/osmo_gsm_tester/obj/osmo_vty.py b/src/osmo_gsm_tester/obj/osmo_vty.py</span><br><span>new file mode 100644</span><br><span>index 0000000..88deb61</span><br><span>--- /dev/null</span><br><span>+++ b/src/osmo_gsm_tester/obj/osmo_vty.py</span><br><span>@@ -0,0 +1,181 @@</span><br><span style="color: hsl(120, 100%, 40%);">+# osmo_gsm_tester: VTY connection</span><br><span style="color: hsl(120, 100%, 40%);">+#</span><br><span style="color: hsl(120, 100%, 40%);">+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH</span><br><span style="color: hsl(120, 100%, 40%);">+#</span><br><span style="color: hsl(120, 100%, 40%);">+# Author: Neels Hofmeyr <neels@hofmeyr.de></span><br><span style="color: hsl(120, 100%, 40%);">+#</span><br><span style="color: hsl(120, 100%, 40%);">+# This program is free software: you can redistribute it and/or modify</span><br><span style="color: hsl(120, 100%, 40%);">+# it under the terms of the GNU General Public License as</span><br><span style="color: hsl(120, 100%, 40%);">+# published by the Free Software Foundation, either version 3 of the</span><br><span style="color: hsl(120, 100%, 40%);">+# License, or (at your option) any later version.</span><br><span style="color: hsl(120, 100%, 40%);">+#</span><br><span style="color: hsl(120, 100%, 40%);">+# This program is distributed in the hope that it will be useful,</span><br><span style="color: hsl(120, 100%, 40%);">+# but WITHOUT ANY WARRANTY; without even the implied warranty of</span><br><span style="color: hsl(120, 100%, 40%);">+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the</span><br><span style="color: hsl(120, 100%, 40%);">+# GNU General Public License for more details.</span><br><span style="color: hsl(120, 100%, 40%);">+#</span><br><span style="color: hsl(120, 100%, 40%);">+# You should have received a copy of the GNU General Public License</span><br><span style="color: hsl(120, 100%, 40%);">+# along with this program.  If not, see <http://www.gnu.org/licenses/>.</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+import socket</span><br><span style="color: hsl(120, 100%, 40%);">+import struct</span><br><span style="color: hsl(120, 100%, 40%);">+import re</span><br><span style="color: hsl(120, 100%, 40%);">+import time</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+from ..core import log</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+class VtyInterfaceExn(Exception):</span><br><span style="color: hsl(120, 100%, 40%);">+    pass</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+class OsmoVty(log.Origin):</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def __init__(self, host, port, prompt=None):</span><br><span style="color: hsl(120, 100%, 40%);">+        super().__init__(log.C_BUS, 'Vty', host=host, port=port)</span><br><span style="color: hsl(120, 100%, 40%);">+        self.host = host</span><br><span style="color: hsl(120, 100%, 40%);">+        self.port = port</span><br><span style="color: hsl(120, 100%, 40%);">+        self.sck = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.prompt = prompt</span><br><span style="color: hsl(120, 100%, 40%);">+        self.re_prompt = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.this_node = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.this_prompt_char = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.last_node = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.last_prompt_char = None</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def connect(self):</span><br><span style="color: hsl(120, 100%, 40%);">+        attempts = 10;</span><br><span style="color: hsl(120, 100%, 40%);">+        while True:</span><br><span style="color: hsl(120, 100%, 40%);">+            try:</span><br><span style="color: hsl(120, 100%, 40%);">+                self.dbg('Connecting')</span><br><span style="color: hsl(120, 100%, 40%);">+                self.sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)</span><br><span style="color: hsl(120, 100%, 40%);">+                self.sck.connect((self.host, self.port))</span><br><span style="color: hsl(120, 100%, 40%);">+            except ConnectionRefusedError:</span><br><span style="color: hsl(120, 100%, 40%);">+                attempts -= 1</span><br><span style="color: hsl(120, 100%, 40%);">+                if attempts < 1:</span><br><span style="color: hsl(120, 100%, 40%);">+                    raise</span><br><span style="color: hsl(120, 100%, 40%);">+                self.dbg('Connection Refused, trying again (attempts left: %d)' % attempts)</span><br><span style="color: hsl(120, 100%, 40%);">+                time.sleep(3)</span><br><span style="color: hsl(120, 100%, 40%);">+                continue</span><br><span style="color: hsl(120, 100%, 40%);">+            break</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        self.sck.setblocking(1)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        # read first prompt</span><br><span style="color: hsl(120, 100%, 40%);">+        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)</span><br><span style="color: hsl(120, 100%, 40%);">+        self.this_node = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.this_prompt_char = '>' # slight cheat for initial prompt char</span><br><span style="color: hsl(120, 100%, 40%);">+        self.last_node = None</span><br><span style="color: hsl(120, 100%, 40%);">+        self.last_prompt_char = None</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        data = self.sck.recv(4096)</span><br><span style="color: hsl(120, 100%, 40%);">+        if not self.prompt:</span><br><span style="color: hsl(120, 100%, 40%);">+            b = data</span><br><span style="color: hsl(120, 100%, 40%);">+            b = b[b.rfind(b'\n') + 1:]</span><br><span style="color: hsl(120, 100%, 40%);">+            while b and (b[0] < ord('A') or b[0] > ord('z')):</span><br><span style="color: hsl(120, 100%, 40%);">+                b = b[1:]</span><br><span style="color: hsl(120, 100%, 40%);">+            prompt_str = b.decode('utf-8')</span><br><span style="color: hsl(120, 100%, 40%);">+            if '>' in prompt_str:</span><br><span style="color: hsl(120, 100%, 40%);">+                self.prompt = prompt_str[:prompt_str.find('>')]</span><br><span style="color: hsl(120, 100%, 40%);">+            self.dbg(prompt=self.prompt)</span><br><span style="color: hsl(120, 100%, 40%);">+        if not self.prompt:</span><br><span style="color: hsl(120, 100%, 40%);">+            raise VtyInterfaceExn('Could not find application name; needed to decode prompts.'</span><br><span style="color: hsl(120, 100%, 40%);">+                    ' Initial data was: %r' % data)</span><br><span style="color: hsl(120, 100%, 40%);">+        self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def disconnect(self):</span><br><span style="color: hsl(120, 100%, 40%);">+        self.dbg('Disconnecting')</span><br><span style="color: hsl(120, 100%, 40%);">+        if self.sck is not None:</span><br><span style="color: hsl(120, 100%, 40%);">+            self.sck.close()</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def _command(self, command_str, timeout=10, strict=True):</span><br><span style="color: hsl(120, 100%, 40%);">+        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)</span><br><span style="color: hsl(120, 100%, 40%);">+        self.dbg('Sending', command_str=command_str)</span><br><span style="color: hsl(120, 100%, 40%);">+        self.sck.send(command_str.encode())</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        waited_since = time.time()</span><br><span style="color: hsl(120, 100%, 40%);">+        received_lines = []</span><br><span style="color: hsl(120, 100%, 40%);">+        last_line = ''</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        while True:</span><br><span style="color: hsl(120, 100%, 40%);">+            new_data = self.sck.recv(4096).decode('utf-8')</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+            last_line = "%s%s" % (last_line, new_data)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+            if last_line:</span><br><span style="color: hsl(120, 100%, 40%);">+                # Separate the received response into lines.</span><br><span style="color: hsl(120, 100%, 40%);">+                # But note: the VTY logging currently separates with '\n\r', not '\r\n',</span><br><span style="color: hsl(120, 100%, 40%);">+                # see _vty_output() in libosmocore logging_vty.c.</span><br><span style="color: hsl(120, 100%, 40%);">+                # So we need to jump through hoops to not separate 'abc\n\rdef' as</span><br><span style="color: hsl(120, 100%, 40%);">+                # [ 'abc', '', 'def' ]; but also not to convert '\r\n\r\n' to '\r\n\n' ('\r{\r\n}\n')</span><br><span style="color: hsl(120, 100%, 40%);">+                # Simplest is to just drop all the '\r' and only care about the '\n'.</span><br><span style="color: hsl(120, 100%, 40%);">+                last_line = last_line.replace('\r', '')</span><br><span style="color: hsl(120, 100%, 40%);">+                lines = last_line.splitlines()</span><br><span style="color: hsl(120, 100%, 40%);">+                if last_line.endswith('\n'):</span><br><span style="color: hsl(120, 100%, 40%);">+                    received_lines.extend(lines)</span><br><span style="color: hsl(120, 100%, 40%);">+                    last_line = ""</span><br><span style="color: hsl(120, 100%, 40%);">+                else:</span><br><span style="color: hsl(120, 100%, 40%);">+                    # if pkt buffer ends in the middle of a line, we need to keep</span><br><span style="color: hsl(120, 100%, 40%);">+                    # last non-finished line:</span><br><span style="color: hsl(120, 100%, 40%);">+                    received_lines.extend(lines[:-1])</span><br><span style="color: hsl(120, 100%, 40%);">+                    last_line = lines[-1]</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+            match = self.re_prompt.match(last_line)</span><br><span style="color: hsl(120, 100%, 40%);">+            if not match:</span><br><span style="color: hsl(120, 100%, 40%);">+                if time.time() - waited_since > timeout:</span><br><span style="color: hsl(120, 100%, 40%);">+                    raise IOError("Failed to read data (did the app crash?)")</span><br><span style="color: hsl(120, 100%, 40%);">+                time.sleep(.1)</span><br><span style="color: hsl(120, 100%, 40%);">+                continue</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+            self.last_node = self.this_node</span><br><span style="color: hsl(120, 100%, 40%);">+            self.last_prompt_char = self.this_prompt_char</span><br><span style="color: hsl(120, 100%, 40%);">+            self.this_node = match.group(1) or None</span><br><span style="color: hsl(120, 100%, 40%);">+            self.this_prompt_char = match.group(2)</span><br><span style="color: hsl(120, 100%, 40%);">+            break</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        # expecting to have received the command we sent as echo, remove it</span><br><span style="color: hsl(120, 100%, 40%);">+        clean_command_str = command_str.strip()</span><br><span style="color: hsl(120, 100%, 40%);">+        if clean_command_str.endswith('?'):</span><br><span style="color: hsl(120, 100%, 40%);">+            clean_command_str = clean_command_str[:-1]</span><br><span style="color: hsl(120, 100%, 40%);">+        if received_lines and received_lines[0] == clean_command_str:</span><br><span style="color: hsl(120, 100%, 40%);">+            received_lines = received_lines[1:]</span><br><span style="color: hsl(120, 100%, 40%);">+        if len(received_lines) > 1:</span><br><span style="color: hsl(120, 100%, 40%);">+            self.dbg('Received\n|', '\n| '.join(received_lines), '\n')</span><br><span style="color: hsl(120, 100%, 40%);">+        elif len(received_lines) == 1:</span><br><span style="color: hsl(120, 100%, 40%);">+            self.dbg('Received', repr(received_lines[0]))</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        if received_lines == ['% Unknown command.']:</span><br><span style="color: hsl(120, 100%, 40%);">+            errmsg = 'VTY reports unknown command: %r' % command_str</span><br><span style="color: hsl(120, 100%, 40%);">+            if strict:</span><br><span style="color: hsl(120, 100%, 40%);">+                raise VtyInterfaceExn(errmsg)</span><br><span style="color: hsl(120, 100%, 40%);">+            else:</span><br><span style="color: hsl(120, 100%, 40%);">+                self.log('ignoring error:', errmsg)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        return received_lines</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def cmd(self, command_str, timeout=10, strict=True):</span><br><span style="color: hsl(120, 100%, 40%);">+        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)</span><br><span style="color: hsl(120, 100%, 40%);">+        command_str = command_str or '\r'</span><br><span style="color: hsl(120, 100%, 40%);">+        if command_str[-1] not in '?\r\t':</span><br><span style="color: hsl(120, 100%, 40%);">+            command_str = command_str + '\r'</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        received_lines = self._command(command_str, timeout, strict)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        # send escape to cancel the '?' command line</span><br><span style="color: hsl(120, 100%, 40%);">+        if command_str[-1] == '?':</span><br><span style="color: hsl(120, 100%, 40%);">+            self._command('\x03', timeout)</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+        return received_lines</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def cmds(self, *cmds, timeout=10, strict=True):</span><br><span style="color: hsl(120, 100%, 40%);">+        responses = []</span><br><span style="color: hsl(120, 100%, 40%);">+        for cmd in cmds:</span><br><span style="color: hsl(120, 100%, 40%);">+            responses.append(self.cmd(cmd, timeout, strict))</span><br><span style="color: hsl(120, 100%, 40%);">+        return responses</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def __enter__(self):</span><br><span style="color: hsl(120, 100%, 40%);">+        self.connect()</span><br><span style="color: hsl(120, 100%, 40%);">+        return self</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+    def __exit__(self, *exc_info):</span><br><span style="color: hsl(120, 100%, 40%);">+        self.disconnect()</span><br><span style="color: hsl(120, 100%, 40%);">+</span><br><span style="color: hsl(120, 100%, 40%);">+# vim: expandtab tabstop=4 shiftwidth=4</span><br><span></span><br></pre><p>To view, visit <a href="https://gerrit.osmocom.org/c/osmo-gsm-tester/+/21504">change 21504</a>. To unsubscribe, or for help writing mail filters, visit <a href="https://gerrit.osmocom.org/settings">settings</a>.</p><div itemscope itemtype="http://schema.org/EmailMessage"><div itemscope itemprop="action" itemtype="http://schema.org/ViewAction"><link itemprop="url" href="https://gerrit.osmocom.org/c/osmo-gsm-tester/+/21504"/><meta itemprop="name" content="View Change"/></div></div>

<div style="display:none"> Gerrit-Project: osmo-gsm-tester </div>
<div style="display:none"> Gerrit-Branch: master </div>
<div style="display:none"> Gerrit-Change-Id: I7c17b143b7c690b8c4105ee7c6272670046fa91d </div>
<div style="display:none"> Gerrit-Change-Number: 21504 </div>
<div style="display:none"> Gerrit-PatchSet: 1 </div>
<div style="display:none"> Gerrit-Owner: neels <nhofmeyr@sysmocom.de> </div>
<div style="display:none"> Gerrit-MessageType: newchange </div>