# path: root/contrib/manual_test_server.py
# blob: 7338129cd8e3360a9411a0cd9c394b8b48889550 (plain)
import mncc
import mncc_sock
import ctypes
import socket

import logging as log

# Numbering-plan and type-of-number values that
# MnccMessageBuilder.build_mncc_number() stores in the `plan` and `type`
# fields of every gsm_mncc_number it creates.
# NOTE(review): presumably the GSM 03.40 address-field encodings -- confirm.
GSM340_PLAN_ISDN = 1
GSM340_TYPE_NATIONAL = 2

class MnccMessageBuilder(object):
    """
    Factory for the MNCC messages sent by this manual test server.

    Every builder is a static method that returns a freshly populated
    structure from the ``mncc`` module; nothing is sent from here.
    """

    @staticmethod
    def build_hello():
        """Return a MNCC_SOCKET_HELLO message.

        The message carries the socket version plus the sizes and field
        offsets of the local ``gsm_mncc`` / ``gsm_data_frame`` structures.
        """
        layout = mncc.gsm_mncc

        msg = mncc.gsm_mncc_hello()
        msg.msg_type = mncc.MNCC_SOCKET_HELLO
        msg.version = mncc.MNCC_SOCK_VERSION

        # Structure sizes as seen by ctypes on this side.
        msg.mncc_size = ctypes.sizeof(layout)
        msg.data_frame_size = ctypes.sizeof(mncc.gsm_data_frame)

        # Byte offsets of selected gsm_mncc members.
        msg.called_offset = layout.called.offset
        msg.signal_offset = layout.signal.offset
        msg.emergency_offset = layout.emergency.offset
        msg.lchan_type_offset = layout.lchan_type.offset
        return msg

    @staticmethod
    def build_mncc_number(number):
        """Wrap *number* in a gsm_mncc_number (national type, ISDN plan)."""
        return mncc.gsm_mncc_number(
            plan=GSM340_PLAN_ISDN,
            type=GSM340_TYPE_NATIONAL,
            number=number,
        )

    @staticmethod
    def build_setup_ind(calling, called, callref=1):
        """Return a MNCC_SETUP_IND with both the called and calling number set."""
        make_number = MnccMessageBuilder.build_mncc_number
        return mncc.gsm_mncc(
            msg_type=mncc.MNCC_SETUP_IND,
            callref=callref,
            fields=mncc.MNCC_F_CALLED | mncc.MNCC_F_CALLING,
            called=make_number(called),
            calling=make_number(calling),
        )

    @staticmethod
    def build_setup_cmpl_ind(callref=1):
        """Return a MNCC_SETUP_COMPL_IND for *callref*."""
        return mncc.gsm_mncc(
            msg_type=mncc.MNCC_SETUP_COMPL_IND,
            callref=callref,
        )

    @staticmethod
    def build_rtp_msg(msg_type, callref, addr, port):
        """Return a gsm_mncc_rtp of *msg_type* carrying *addr* and *port*.

        The payload is fixed to payload type 98 with GSM_TCH_FRAME_AMR.
        """
        # Alternative that used to be selected here:
        #   payload_type=3, payload_msg_type=mncc.GSM_TCHF_FRAME
        rtp_fields = dict(
            msg_type=msg_type,
            callref=callref,
            ip=addr,
            port=port,
            payload_type=98,
            payload_msg_type=mncc.GSM_TCH_FRAME_AMR,
        )
        return mncc.gsm_mncc_rtp(**rtp_fields)

    @staticmethod
    def build_dtmf_start(callref, data):
        """Return a MNCC_START_DTMF_IND whose keypad is the character *data*."""
        msg = mncc.gsm_mncc()
        msg.msg_type = mncc.MNCC_START_DTMF_IND
        msg.callref = callref
        msg.fields = mncc.MNCC_F_KEYPAD
        msg.keypad = ord(data)
        return msg

    @staticmethod
    def build_dtmf_stop(callref, data):
        """Return a MNCC_STOP_DTMF_IND for *callref*; *data* is not used."""
        msg = mncc.gsm_mncc()
        msg.callref = callref
        msg.msg_type = mncc.MNCC_STOP_DTMF_IND
        return msg

def send_dtmf(callref):
    """Send DTMF start/stop pairs for the keys '1' and '2' on *callref*.

    The messages go out over the module-level ``conn`` in the order
    start '1', stop '1', start '2', stop '2'.
    """
    builders = (MnccMessageBuilder.build_dtmf_start,
                MnccMessageBuilder.build_dtmf_stop)

    for key in ('1', '2'):
        for build in builders:
            conn.send_msg(build(callref, key))


log.basicConfig(level=log.DEBUG,
                format="%(levelname)s %(filename)s:%(lineno)d %(message)s")


def _recv_expected(connection, expected_type, description):
    """Receive one message and verify its type.

    An explicit check is used instead of ``assert`` so the verification still
    happens under ``python -O`` and a failure reports what was received.

    Returns the received message.
    Raises RuntimeError if its ``msg_type`` differs from *expected_type*.
    """
    received = connection.recv()
    if received.msg_type != expected_type:
        raise RuntimeError(
            "Expected %s (msg_type=%r) but received msg_type=%r"
            % (description, expected_type, received.msg_type))
    return received


# Block until a client connects to the MNCC socket.
server = mncc_sock.MnccSocketServer()
conn = server.accept()

# Say hello and set up a call.
conn.send_msg(MnccMessageBuilder.build_hello())
conn.send_msg(MnccMessageBuilder.build_setup_ind("1234", "5000"))
print("=> Sent hello + setup indication")

# Wait for the RTP create... and acknowledge it.
msg = _recv_expected(conn, mncc.MNCC_RTP_CREATE, "MNCC_RTP_CREATE")
print("<= Received request to create a RTP socket")
# The reply advertises INADDR_ANY:4000; socket.INADDR_LOOPBACK was the
# alternative tried here before.
conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CREATE,
                                                msg.callref,
                                                socket.INADDR_ANY, 4000))
print("=> Claimed socket was created...")

msg = _recv_expected(conn, mncc.MNCC_CALL_PROC_REQ, "MNCC_CALL_PROC_REQ")
print("<= Received proceeding...")


# Serve the rest of the call: react to the requests handled below and dump
# every other message to stdout.
while True:
    msg = conn.recv()
    if msg.msg_type == mncc.MNCC_ALERT_REQ:
        print("=> I should alert...")
    elif msg.msg_type == mncc.MNCC_RTP_CONNECT:
        conn.send_msg(MnccMessageBuilder.build_rtp_msg(mncc.MNCC_RTP_CONNECT,
                                                msg.callref,
                                                socket.INADDR_LOOPBACK, 4000))
        print("=> I needed to connect RTP...")
    elif msg.msg_type == mncc.MNCC_SETUP_RSP:
        print("=> Call is connected?")
        # Complete the setup, then exercise DTMF on the connected call.
        conn.send_msg(MnccMessageBuilder.build_setup_cmpl_ind(msg.callref))
        send_dtmf(msg.callref)
    else:
        print(msg)