summaryrefslogtreecommitdiffstats
path: root/systemtest.py
blob: c81a35c0557eddca42be1676a17b350c5bc879e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python2

import subprocess
import socket
import sys
import time
import random
import struct
import timeout_decorator

verbose = True

def prefix_ipa_ctrl_header(data):
    return struct.pack(">HBB", len(data)+1, 0xee, 0) + data

def ipa_ctrl_header(header):
    (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", header)
    return None if (ipa_proto != 0xee or osmo_proto != 0) else plen

def remove_ipa_ctrl_header(data):
    if (len(data) < 4):
        raise BaseException("Answer too short!")
    plen = ipa_ctrl_header(data[:4])
    if (None == plen):
        raise BaseException("Wrong protocol in answer!")
    if (plen + 3 > len(data)):
        print "Warning: Wrong payload length (expected %i, got %i)" % (plen, len(data) - 3)
    return data[4:plen+3], data[plen+3:]

def connect(host, port):
    if verbose:
        print "Connecting to host %s:%i" % (host, port)

    sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sck.setblocking(1)
    sck.connect((host, port))
    return sck

def send(sck, data):
    if verbose:
	print "Sending \"%s\"" %(data)
    data = prefix_ipa_ctrl_header(data)
    sck.send(data)

def do_set(var, value, op_id, sck):
    setmsg = "SET %s %s %s" %(op_id, var, value)
    send(sck, setmsg)

def do_get(var, op_id, sck):
    getmsg = "GET %s %s" %(op_id, var)
    send(sck, getmsg)

def do_set_get(sck, var, value = None):
    r = random.randint(1, sys.maxint)
    if (value != None):
        s = 'SET_REPLY'
        do_set(var, value, r, sck)
    else:
        s = 'GET_REPLY'
        do_get(var, r, sck)
    (answer, data) = remove_ipa_ctrl_header(sck.recv(4096))
    x  = answer.split()
    if (s == x[0] and str(r) == x[1] and var == x[2]):
        return None if ('SET_REPLY' == s and value != x[3]) else x[3]
    return None

def set_var(sck, var, val):
    return do_set_get(sck, var, val)

@timeout_decorator.timeout(20)
def run_ussd_test():
    gst = subprocess.Popen(["gst", "ussd/USSDTest.st"])
    res = gst.wait()
    print(res)
    if res != 23:
        raise Exception("Non success error code")

# Start OpenBSCand give it some time to start
obsc = subprocess.Popen(["../openbsc/openbsc/src/osmo-nitb/osmo-nitb", "-c", "../openbsc/openbsc/doc/examples/osmo-nitb/nanobts/openbsc.cfg", "-e", "1"])
time.sleep(10)

# try to create and authorize subscriber
sck = connect("localhost", 4249)
res = set_var(sck, b"subscriber-modify-v1", b"901010000001111,445567")
sck.close()


# Okay now.. execute the tests..
run_ussd_test()



obsc.terminate()
obsc.wait()
res = proc.wait()
print(res)
sys.exit(1)