diff options
author | Neels Hofmeyr <neels@hofmeyr.de> | 2017-09-25 23:22:02 +0200 |
---|---|---|
committer | Neels Hofmeyr <neels@hofmeyr.de> | 2017-09-28 18:52:57 +0200 |
commit | f95ce04cbda6caeedfc4d3d006313afff767f1af (patch) | |
tree | e86e8d994c54ae1bd1b24cb911be79885f954dbd | |
parent | 05c8b465ab2fe13edb67c95210a9b475f91ebeb3 (diff) |
add basic CTRL interface tests
Prepare for adding tests of enable-/disable-/status-ps CTRL commands.
Change-Id: Ie195169c574716b514da7e04a3ce9727ef70a55e
-rw-r--r-- | configure.ac | 15 | ||||
-rwxr-xr-x | contrib/ipa.py | 278 | ||||
-rwxr-xr-x | contrib/jenkins.sh | 2 | ||||
-rw-r--r-- | tests/Makefile.am | 10 | ||||
-rw-r--r-- | tests/ctrl_test_runner.py | 199 |
5 files changed, 503 insertions, 1 deletions
diff --git a/configure.ac b/configure.ac index 6532940..167d7f3 100644 --- a/configure.ac +++ b/configure.ac @@ -47,6 +47,21 @@ AC_CONFIG_MACRO_DIR([m4]) dnl checks for header files AC_HEADER_STDC +AC_ARG_ENABLE([external_tests], + AC_HELP_STRING([--enable-external-tests], + [Include the VTY/CTRL tests in make check [default=no]]), + [enable_ext_tests="$enableval"],[enable_ext_tests="no"]) +if test "x$enable_ext_tests" = "xyes" ; then + AM_PATH_PYTHON + AC_CHECK_PROG(OSMOTESTEXT_CHECK,osmotestvty.py,yes) + if test "x$OSMOTESTEXT_CHECK" != "xyes" ; then + AC_MSG_ERROR([Please install git://osmocom.org/python/osmo-python-tests to run the VTY/CTRL tests.]) + fi +fi +AC_MSG_CHECKING([whether to enable VTY/CTRL tests]) +AC_MSG_RESULT([$enable_ext_tests]) +AM_CONDITIONAL(ENABLE_EXT_TESTS, test "x$enable_ext_tests" = "xyes") + AC_OUTPUT( Makefile src/Makefile diff --git a/contrib/ipa.py b/contrib/ipa.py new file mode 100755 index 0000000..71cbf45 --- /dev/null +++ b/contrib/ipa.py @@ -0,0 +1,278 @@ +#!/usr/bin/python3 +# -*- mode: python-mode; py-indent-tabs-mode: nil -*- +""" +/* + * Copyright (C) 2016 sysmocom s.f.m.c. GmbH + * + * All Rights Reserved + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +""" + +import struct, random, sys + +class IPA(object): + """ + Stateless IPA protocol multiplexer: add/remove/parse (extended) header + """ + version = "0.0.5" + TCP_PORT_OML = 3002 + TCP_PORT_RSL = 3003 + # OpenBSC extensions: OSMO, MGCP_OLD + PROTO = dict(RSL=0x00, CCM=0xFE, SCCP=0xFD, OML=0xFF, OSMO=0xEE, MGCP_OLD=0xFC) + # ...OML Router Control, GSUP GPRS extension, Osmocom Authn Protocol + EXT = dict(CTRL=0, MGCP=1, LAC=2, SMSC=3, ORC=4, GSUP=5, OAP=6) + # OpenBSC extension: SCCP_OLD + MSGT = dict(PING=0x00, PONG=0x01, ID_GET=0x04, ID_RESP=0x05, ID_ACK=0x06, SCCP_OLD=0xFF) + _IDTAG = dict(SERNR=0, UNITNAME=1, LOCATION=2, TYPE=3, EQUIPVERS=4, SWVERSION=5, IPADDR=6, MACADDR=7, UNIT=8) + CTRL_GET = 'GET' + CTRL_SET = 'SET' + CTRL_REP = 'REPLY' + CTRL_ERR = 'ERR' + CTRL_TRAP = 'TRAP' + + def _l(self, d, p): + """ + Reverse dictionary lookup: return key for a given value + """ + if p is None: + return 'UNKNOWN' + return list(d.keys())[list(d.values()).index(p)] + + def _tag(self, t, v): + """ + Create TAG as TLV data + """ + return struct.pack(">HB", len(v) + 1, t) + v + + def proto(self, p): + """ + Lookup protocol name + """ + return self._l(self.PROTO, p) + + def ext(self, p): + """ + Lookup protocol extension name + """ + return self._l(self.EXT, p) + + def msgt(self, p): + """ + Lookup message type name + """ + return self._l(self.MSGT, p) + + def idtag(self, p): + """ + Lookup ID tag name + """ + return self._l(self._IDTAG, p) + + def ext_name(self, proto, exten): + """ + Return proper extension byte name depending on the protocol used + """ + if self.PROTO['CCM'] == proto: + return self.msgt(exten) + if self.PROTO['OSMO'] == proto: + return self.ext(exten) + return None + + def add_header(self, data, proto, ext=None): + """ + Add IPA header (with extension if necessary), data must be represented as bytes + """ + if ext is None: + return struct.pack(">HB", len(data) + 1, proto) + data + return struct.pack(">HBB", len(data) + 1, proto, ext) + data + + def del_header(self, data): + """ + Strip IPA protocol header correctly removing extension if present + Returns data length, IPA protocol, extension (or None if not defined for a give protocol) and the data without header + """ + if not len(data): + return None, None, None, None + (dlen, proto) = struct.unpack('>HB', data[:3]) + if self.PROTO['OSMO'] == proto or self.PROTO['CCM'] == proto: # there's extension which we have to unpack + return struct.unpack('>HBB', data[:4]) + (data[4:], ) # length, protocol, extension, data + return dlen, proto, None, data[3:] # length, protocol, _, data + + def split_combined(self, data): + """ + Split the data which contains multiple concatenated IPA messages into tuple (first, rest) where rest contains remaining messages, first is the single IPA message + """ + (length, _, _, _) = self.del_header(data) + return data[:(length + 3)], data[(length + 3):] + + def tag_serial(self, data): + """ + Make TAG for serial number + """ + return self._tag(self._IDTAG['SERNR'], data) + + def tag_name(self, data): + """ + Make TAG for unit name + """ + return self._tag(self._IDTAG['UNITNAME'], data) + + def tag_loc(self, data): + """ + Make TAG for location + """ + return self._tag(self._IDTAG['LOCATION'], data) + + def tag_type(self, data): + """ + Make TAG for unit type + """ + return self._tag(self._IDTAG['TYPE'], data) + + def tag_equip(self, data): + """ + Make TAG for equipment version + """ + return self._tag(self._IDTAG['EQUIPVERS'], data) + + def tag_sw(self, data): + """ + Make TAG for software version + """ + return self._tag(self._IDTAG['SWVERSION'], data) + + def tag_ip(self, data): + """ + Make TAG for IP address + """ + return self._tag(self._IDTAG['IPADDR'], data) + + def tag_mac(self, data): + """ + Make TAG for MAC address + """ + return self._tag(self._IDTAG['MACADDR'], data) + + def tag_unit(self, data): + """ + Make TAG for unit ID + """ + return self._tag(self._IDTAG['UNIT'], data) + + def identity(self, unit=b'', mac=b'', location=b'', utype=b'', equip=b'', sw=b'', name=b'', serial=b''): + """ + Make IPA IDENTITY tag list, by default returns empty concatenated bytes of tag list + """ + return self.tag_unit(unit) + self.tag_mac(mac) + self.tag_loc(location) + self.tag_type(utype) + self.tag_equip(equip) + self.tag_sw(sw) + self.tag_name(name) + self.tag_serial(serial) + + def ping(self): + """ + Make PING message + """ + return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PING']) + + def pong(self): + """ + Make PONG message + """ + return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PONG']) + + def id_ack(self): + """ + Make ID_ACK CCM message + """ + return self.add_header(b'', self.PROTO['CCM'], self.MSGT['ID_ACK']) + + def id_get(self): + """ + Make ID_GET CCM message + """ + return self.add_header(self.identity(), self.PROTO['CCM'], self.MSGT['ID_GET']) + + def id_resp(self, data): + """ + Make ID_RESP CCM message + """ + return self.add_header(data, self.PROTO['CCM'], self.MSGT['ID_RESP']) + +class Ctrl(IPA): + """ + Osmocom CTRL protocol implemented on top of IPA multiplexer + """ + def __init__(self): + random.seed() + + def add_header(self, data): + """ + Add CTRL header + """ + return super(Ctrl, self).add_header(data.encode('utf-8'), IPA.PROTO['OSMO'], IPA.EXT['CTRL']) + + def rem_header(self, data): + """ + Remove CTRL header, check for appropriate protocol and extension + """ + (_, proto, ext, d) = super(Ctrl, self).del_header(data) + if self.PROTO['OSMO'] != proto or self.EXT['CTRL'] != ext: + return None + return d + + def parse(self, data, op=None): + """ + Parse Ctrl string returning (var, value) pair + var could be None in case of ERROR message + value could be None in case of GET message + """ + (s, i, v) = data.split(' ', 2) + if s == self.CTRL_ERR: + return None, v + if s == self.CTRL_GET: + return v, None + (s, i, var, val) = data.split(' ', 3) + if s == self.CTRL_TRAP and i != '0': + return None, '%s with non-zero id %s' % (s, i) + if op is not None and i != op: + if s == self.CTRL_GET + '_' + self.CTRL_REP or s == self.CTRL_SET + '_' + self.CTRL_REP: + return None, '%s with unexpected id %s' % (s, i) + return var, val + + def trap(self, var, val): + """ + Make TRAP message with given (vak, val) pair + """ + return self.add_header("%s 0 %s %s" % (self.CTRL_TRAP, var, val)) + + def cmd(self, var, val=None): + """ + Make SET/GET command message: returns (r, m) tuple where r is random operation id and m is assembled message + """ + r = random.randint(1, sys.maxsize) + if val is not None: + return r, self.add_header("%s %s %s %s" % (self.CTRL_SET, r, var, val)) + return r, self.add_header("%s %s %s" % (self.CTRL_GET, r, var)) + + def verify(self, reply, r, var, val=None): + """ + Verify reply to SET/GET command: returns (b, v) tuple where v is True/False verification result and v is the variable value + """ + (k, v) = self.parse(reply) + if k != var or (val is not None and v != val): + return False, v + return True, v + +if __name__ == '__main__': + print("IPA multiplexer v%s loaded." % IPA.version) diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh index e2abb60..b08c63b 100755 --- a/contrib/jenkins.sh +++ b/contrib/jenkins.sh @@ -35,7 +35,7 @@ set -x cd "$base" autoreconf --install --force -./configure +./configure --enable-external-tests $MAKE $PARALLEL_MAKE if [ "x$label" != "xFreeBSD_amd64" ]; then $MAKE check || cat-testlogs.sh diff --git a/tests/Makefile.am b/tests/Makefile.am index 0bd0820..d979fb6 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -25,6 +25,7 @@ EXTRA_DIST = \ testsuite.at \ $(srcdir)/package.m4 \ $(TESTSUITE) \ + ctrl_test_runner.py \ $(NULL) TESTSUITE = $(srcdir)/testsuite @@ -33,8 +34,17 @@ DISTCLEANFILES = \ atconfig \ $(NULL) +if ENABLE_EXT_TESTS +python-tests: $(BUILT_SOURCES) + $(PYTHON) $(srcdir)/ctrl_test_runner.py -w $(abs_top_builddir) -v +else +python-tests: $(BUILT_SOURCES) + echo "Not running python-based tests (determined at configure-time)" +endif + check-local: atconfig $(TESTSUITE) $(SHELL) '$(TESTSUITE)' $(TESTSUITEFLAGS) + $(MAKE) $(AM_MAKEFLAGS) python-tests installcheck-local: atconfig $(TESTSUITE) $(SHELL) '$(TESTSUITE)' AUTOTEST_PATH='$(bindir)' \ diff --git a/tests/ctrl_test_runner.py b/tests/ctrl_test_runner.py new file mode 100644 index 0000000..d453197 --- /dev/null +++ b/tests/ctrl_test_runner.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python + +# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de> +# (C) 2014 by Holger Hans Peter Freyther +# based on vty_test_runner.py: +# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com> +# (C) 2013 by Holger Hans Peter Freyther +# based on bsc_control.py. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import unittest +import socket +import sys +import struct +import subprocess + +import osmopy.osmoutil as osmoutil + +# add $top_srcdir/contrib to find ipa.py +sys.path.append(os.path.join(sys.path[0], '..', 'contrib')) + +from ipa import Ctrl, IPA + +# to be able to find $top_srcdir/doc/... +confpath = os.path.join(sys.path[0], '..') +verbose = False + +class TestCtrlBase(unittest.TestCase): + + def ctrl_command(self): + raise Exception("Needs to be implemented by a subclass") + + def ctrl_app(self): + raise Exception("Needs to be implemented by a subclass") + + def setUp(self): + osmo_ctrl_cmd = self.ctrl_command()[:] + config_index = osmo_ctrl_cmd.index('-c') + if config_index: + cfi = config_index + 1 + osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi]) + + try: + self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd) + except OSError: + print >> sys.stderr, "Current directory: %s" % os.getcwd() + print >> sys.stderr, "Consider setting -b" + time.sleep(2) + + appstring = self.ctrl_app()[2] + appport = self.ctrl_app()[0] + self.connect("127.0.0.1", appport) + self.next_id = 1000 + + def tearDown(self): + self.disconnect() + osmoutil.end_proc(self.proc) + + def disconnect(self): + if not (self.sock is None): + self.sock.close() + + def connect(self, host, port): + if verbose: + print "Connecting to host %s:%i" % (host, port) + + retries = 30 + while True: + try: + sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sck.setblocking(1) + sck.connect((host, port)) + except IOError: + retries -= 1 + if retries <= 0: + raise + time.sleep(.1) + continue + break + self.sock = sck + return sck + + def send(self, data): + if verbose: + print "Sending \"%s\"" %(data) + data = Ctrl().add_header(data) + return self.sock.send(data) == len(data) + + def send_set(self, var, value, id): + setmsg = "SET %s %s %s" %(id, var, value) + return self.send(setmsg) + + def send_get(self, var, id): + getmsg = "GET %s %s" %(id, var) + return self.send(getmsg) + + def do_set(self, var, value): + id = self.next_id + self.next_id += 1 + self.send_set(var, value, id) + return self.recv_msgs()[id] + + def do_get(self, var): + id = self.next_id + self.next_id += 1 + self.send_get(var, id) + return self.recv_msgs()[id] + + def assert_reply(self, r, mtype, var, val): + expect = dict(mtype=mtype, var=var, value=val) + result_matches = all([r.get(k) == expect.get(k) for k in expect.keys()]) + if not result_matches: + print('\nError details:\nGot reply: %r\nExpected reply: %r\n' % (r, expect)) + self.assertTrue(result_matches) + + def assert_set(self, var, val, result_val): + r = self.do_set(var, val) + self.assert_reply(r, 'SET_REPLY', var, result_val) + + def assert_get(self, var, result_val): + r = self.do_get(var) + self.assert_reply(r, 'GET_REPLY', var, result_val) + + def recv_msgs(self): + responses = {} + data = self.sock.recv(4096) + while (len(data)>0): + (head, data) = IPA().split_combined(data) + answer = Ctrl().rem_header(head) + if verbose: + print "Got message:", answer + (mtype, id, msg) = answer.split(None, 2) + id = int(id) + rsp = {'mtype': mtype, 'id': id} + if mtype == "ERROR": + rsp['error'] = msg + else: + split = msg.split(None, 1) + rsp['var'] = split[0] + if len(split) > 1: + rsp['value'] = split[1] + else: + rsp['value'] = None + responses[id] = rsp + + if verbose: + print "Decoded replies: ", responses + + return responses + + +if __name__ == '__main__': + import argparse + import sys + + workdir = '.' + + parser = argparse.ArgumentParser() + parser.add_argument("-v", "--verbose", dest="verbose", + action="store_true", help="verbose mode") + parser.add_argument("-p", "--pythonconfpath", dest="p", + help="searchpath for config") + parser.add_argument("-w", "--workdir", dest="w", + help="Working directory") + args = parser.parse_args() + + verbose_level = 1 + if args.verbose: + verbose_level = 2 + verbose = True + + if args.w: + workdir = args.w + + if args.p: + confpath = args.p + + print "confpath %s, workdir %s" % (confpath, workdir) + os.chdir(workdir) + print "Running tests for specific control commands" + suite = unittest.TestSuite() + res = unittest.TextTestRunner(verbosity=verbose_level).run(suite) + sys.exit(len(res.errors) + len(res.failures)) + +# vim: tabstop=4 shiftwidth=4 expandtab |