[PATCH] osmo-sip-connector[master]: tests: Add a basic test to open a single call and send DTMF

This is merely a historical archive of years 2008-2021, before the migration to mailman3.

A maintained and still updated list archive can be found at https://lists.osmocom.org/hyperkitty/list/gerrit-log@lists.osmocom.org/.

Holger Freyther gerrit-no-reply at lists.osmocom.org
Tue Mar 7 17:08:33 UTC 2017


Review at  https://gerrit.osmocom.org/1994

tests: Add a basic test to open a single call and send DTMF

Add a simple/non-complete manual test to open a call and send
DTMF. It doesn't handle hangup yet and can't detect if the conn
is gone.

Change-Id: Iec65a3490dd2a6c91f0a573aea482f3d113b0c6a
---
M tests/Makefile.am
A tests/manual_test.py
M tests/mncc_sock.py
3 files changed, 159 insertions(+), 14 deletions(-)


  git pull ssh://gerrit.osmocom.org:29418/osmo-sip-connector refs/changes/94/1994/1

diff --git a/tests/Makefile.am b/tests/Makefile.am
index 73f0bad..428e297 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -11,4 +11,4 @@
 	$(MAKE) $(AM_MAKEFLAGS) python-tests
 
 
-EXTRA_DIST = mncc.py mncc_sock.py
+EXTRA_DIST = mncc.py mncc_sock.py manual_test.py
diff --git a/tests/manual_test.py b/tests/manual_test.py
new file mode 100644
index 0000000..6cd1551
--- /dev/null
+++ b/tests/manual_test.py
@@ -0,0 +1,126 @@
+import mncc
+import mncc_sock
+import ctypes
+import socket
+
+GSM340_PLAN_ISDN = 1
+GSM340_TYPE_NATIONAL = 2
+
+class MnccMessageBuilder(object):
+    """
+    I help in creating messages...
+    """
+    @staticmethod
+    def build_hello():
+        hello = mncc.gsm_mncc_hello()
+        hello.msg_type = mncc.MNCC_SOCKET_HELLO
+        hello.version = mncc.MNCC_SOCK_VERSION
+        hello.mncc_size = ctypes.sizeof(mncc.gsm_mncc)
+        hello.data_frame_size = ctypes.sizeof(mncc.gsm_data_frame)
+        hello.called_offset = mncc.gsm_mncc.called.offset
+        hello.signal_offset = mncc.gsm_mncc.signal.offset
+        hello.emergency_offset = mncc.gsm_mncc.emergency.offset
+        hello.lchan_type_offset = mncc.gsm_mncc.lchan_type.offset
+        return hello
+
+    @staticmethod
+    def build_mncc_number(number):
+        return mncc.gsm_mncc_number(
+                type=GSM340_TYPE_NATIONAL,
+                plan=GSM340_PLAN_ISDN,
+                number=number)
+
+    @staticmethod
+    def build_setup_ind(calling, called, callref=1):
+        setup = mncc.gsm_mncc()
+        setup.msg_type = mncc.MNCC_SETUP_IND
+        setup.callref = callref
+        setup.fields = mncc.MNCC_F_CALLED | mncc.MNCC_F_CALLING
+        setup.called = MnccMessageBuilder.build_mncc_number(called)
+        setup.calling = MnccMessageBuilder.build_mncc_number(calling)
+        return setup
+
+    @staticmethod
+    def build_setup_cmpl_ind(callref=1):
+        setup = mncc.gsm_mncc()
+        setup.msg_type = mncc.MNCC_SETUP_COMPL_IND
+        setup.callref = callref
+        return setup
+
+    @staticmethod
+    def build_rtp_msg(msg_type, callref, addr, port):
+        return mncc.gsm_mncc_rtp(
+                    msg_type=msg_type, callref=callref,
+                    ip=addr, port=port,
+                    #payload_type=3,
+                    #payload_msg_type=mncc.GSM_TCHF_FRAME) 
+                    payload_type=98,
+                    payload_msg_type=mncc.GSM_TCH_FRAME_AMR) 
+
+    @staticmethod
+    def build_dtmf_start(callref, data):
+        return mncc.gsm_mncc(
+                    msg_type=mncc.MNCC_START_DTMF_IND,
+                    callref=callref,
+                    fields=mncc.MNCC_F_KEYPAD,
+                    keypad=ord(data))
+
+    @staticmethod
+    def build_dtmf_stop(callref, data):
+        return mncc.gsm_mncc(
+                    callref=callref,
+                    msg_type=mncc.MNCC_STOP_DTMF_IND)
+
+def send_dtmf(callref):
+    global conn
+
+    conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '1'))
+    conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '1'))
+    conn.send_msg(MnccMessageBuilder.build_dtmf_start(callref, '2'))
+    conn.send_msg(MnccMessageBuilder.build_dtmf_stop(callref, '2'))
+    
+
+server = mncc_sock.MnccSocketServer()
+conn = server.accept()
+
+# Say hello and set-up a call
+conn.send_msg(MnccMessageBuilder.build_hello())
+conn.send_msg(MnccMessageBuilder.build_setup_ind("1234", "5000"))
+print("=> Sent hello + setup indication")
+
+# Wait for the RTP crate.. and actknowledge it..
+msg = conn.recv()
+assert msg.msg_type == mncc.MNCC_RTP_CREATE
+print("<= Received request to create a RTP socket")
+conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CREATE,
+                                                msg.callref,
+                                                #socket.INADDR_LOOPBACK, 4000))
+                                                socket.INADDR_ANY, 4000))
+print("=> Claimed socket was created...")
+
+msg = conn.recv()
+assert msg.msg_type == mncc.MNCC_CALL_PROC_REQ
+print("<= Received proceeding...")
+
+
+
+while True:
+    msg = conn.recv()
+    if msg.msg_type == mncc.MNCC_ALERT_REQ:
+        print("=> I should alert...")
+        continue
+    if msg.msg_type == mncc.MNCC_RTP_CONNECT:
+        conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CONNECT,
+                                                msg.callref,
+                                                socket.INADDR_LOOPBACK, 4000))
+        print("=> I needed to connect RTP...") 
+        continue
+    if msg.msg_type == mncc.MNCC_SETUP_RSP:
+        print("=> Call is connected?")
+        conn.send_msg(MnccMessageBuilder.build_setup_cmpl_ind(msg.callref))
+        send_dtmf(msg.callref)
+        continue
+    
+    print(msg)
+    print(msg.msg_type)
+print(type(msg))
diff --git a/tests/mncc_sock.py b/tests/mncc_sock.py
index 8d3e2b9..3d29691 100644
--- a/tests/mncc_sock.py
+++ b/tests/mncc_sock.py
@@ -52,21 +52,13 @@
                                 plan = num_plan, present = num_present,
                                 screen = num_screen)
 
-class MnccSocket(object):
-    def __init__(self, address = '/tmp/bsc_mncc'):
-        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
-        print 'connectiong to %s' % address
-        try:
-            self.sock.connect(address)
-        except socket.error, errmsg:
-            print >>sys.stderr, errmsg
-            sys.exit(1)
-
-        # FIXME: parse the HELLO message
-        msg = self.recv()
-
+class MnccSocketBase(object):
     def send(self, msg):
         return self.sock.sendall(msg.send())
+
+    def send_msg(self, msg):
+        data = buffer(msg)[:]
+        return self.sock.sendall(data)
 
     def recv(self):
         data = self.sock.recv(1500)
@@ -76,3 +68,30 @@
                ms = mncc_rtp_msg()
                ms.receive(data)
         return ms
+
+class MnccSocket(MnccSocketBase):
+    def __init__(self, address = '/tmp/bsc_mncc'):
+        super(MnccSocketBase, self).__init__()
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        print('connecting to %s' % address)
+        try:
+            self.sock.connect(address)
+        except socket.error as errmsg:
+            sys.stderr.write("%s\n" % errmsg)
+            sys.exit(1)
+
+        # FIXME: parse the HELLO message
+        msg = self.recv()
+
+class MnccSocketServer(object):
+    def __init__(self, address = '/tmp/bsc_mncc'):
+        os.unlink(address)
+        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
+        self.sock.bind(address)
+        self.sock.listen(5)
+
+    def accept(self):
+        (fd,_) = self.sock.accept()
+        sock = MnccSocketBase()
+        sock.sock = fd
+        return sock

-- 
To view, visit https://gerrit.osmocom.org/1994
To unsubscribe, visit https://gerrit.osmocom.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iec65a3490dd2a6c91f0a573aea482f3d113b0c6a
Gerrit-PatchSet: 1
Gerrit-Project: osmo-sip-connector
Gerrit-Branch: master
Gerrit-Owner: Holger Freyther <holger at freyther.de>



More information about the gerrit-log mailing list