osmith has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmo-ttcn3-hacks/+/41420?usp=email )
Change subject: testenv: wait_for_port: add --protocol=sctp ......................................................................
testenv: wait_for_port: add --protocol=sctp
Prepare to use the script in a follow-up patch to check whether the diameter port of PyHSS is ready.
Change-Id: I6e75728b2e9b67c85d1ea2ae5ab15890074db272 --- M _testenv/data/scripts/wait_for_port.py 1 file changed, 38 insertions(+), 4 deletions(-)
git pull ssh://gerrit.osmocom.org:29418/osmo-ttcn3-hacks refs/changes/20/41420/1
diff --git a/_testenv/data/scripts/wait_for_port.py b/_testenv/data/scripts/wait_for_port.py index 2a85319..2670fa5 100755 --- a/_testenv/data/scripts/wait_for_port.py +++ b/_testenv/data/scripts/wait_for_port.py @@ -2,15 +2,19 @@ # Copyright 2024 sysmocom - s.f.m.c. GmbH # SPDX-License-Identifier: GPL-3.0-or-later # Wait until a port is available, useful for waiting until a SUT has started -import socket import argparse -import time +import contextlib +import socket import sys +import time + +with contextlib.suppress(ImportError): + import sctp
args = None
-def wait_for_port(): +def wait_for_port_tcp(): start_time = time.time() while True: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -25,6 +29,26 @@ time.sleep(0.1)
+def wait_for_port_sctp(): + if "sctp" not in sys.modules: + print("python module sctp is not installed, sleeping 3s instead...") + time.sleep(3) + return + + start_time = time.time() + while True: + try: + sk = sctp.sctpsocket_tcp(socket.AF_INET) + sk.connect((args.hostname, args.port)) + sk.close() + sys.exit(0) + except ConnectionRefusedError: + if time.time() - start_time >= args.timeout: + print(f"ERROR: {args.hostname}:{args.port} did not become available within {args.timeout}s!") + sys.exit(1) + time.sleep(0.1) + + def parse_args(): global args
@@ -48,9 +72,19 @@ default=5, help="timeout in seconds (default: 5).", ) + parser.add_argument( + "-P", + "--protocol", + choices=["tcp", "sctp"], + default="tcp", + ) args = parser.parse_args()
if __name__ == "__main__": parse_args() - wait_for_port() + match args.protocol: + case "tcp": + wait_for_port_tcp() + case "sctp": + wait_for_port_sctp()