aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNeels Hofmeyr <neels@hofmeyr.de>2017-09-25 23:22:02 +0200
committerNeels Hofmeyr <neels@hofmeyr.de>2017-09-28 18:52:57 +0200
commitf95ce04cbda6caeedfc4d3d006313afff767f1af (patch)
treee86e8d994c54ae1bd1b24cb911be79885f954dbd
parent05c8b465ab2fe13edb67c95210a9b475f91ebeb3 (diff)
add basic CTRL interface tests
Prepare for adding tests of enable-/disable-/status-ps CTRL commands. Change-Id: Ie195169c574716b514da7e04a3ce9727ef70a55e
-rw-r--r--configure.ac15
-rwxr-xr-xcontrib/ipa.py278
-rwxr-xr-xcontrib/jenkins.sh2
-rw-r--r--tests/Makefile.am10
-rw-r--r--tests/ctrl_test_runner.py199
5 files changed, 503 insertions, 1 deletions
diff --git a/configure.ac b/configure.ac
index 6532940..167d7f3 100644
--- a/configure.ac
+++ b/configure.ac
@@ -47,6 +47,21 @@ AC_CONFIG_MACRO_DIR([m4])
dnl checks for header files
AC_HEADER_STDC
+AC_ARG_ENABLE([external_tests],
+ AC_HELP_STRING([--enable-external-tests],
+ [Include the VTY/CTRL tests in make check [default=no]]),
+ [enable_ext_tests="$enableval"],[enable_ext_tests="no"])
+if test "x$enable_ext_tests" = "xyes" ; then
+ AM_PATH_PYTHON
+ AC_CHECK_PROG(OSMOTESTEXT_CHECK,osmotestvty.py,yes)
+ if test "x$OSMOTESTEXT_CHECK" != "xyes" ; then
+ AC_MSG_ERROR([Please install git://osmocom.org/python/osmo-python-tests to run the VTY/CTRL tests.])
+ fi
+fi
+AC_MSG_CHECKING([whether to enable VTY/CTRL tests])
+AC_MSG_RESULT([$enable_ext_tests])
+AM_CONDITIONAL(ENABLE_EXT_TESTS, test "x$enable_ext_tests" = "xyes")
+
AC_OUTPUT(
Makefile
src/Makefile
diff --git a/contrib/ipa.py b/contrib/ipa.py
new file mode 100755
index 0000000..71cbf45
--- /dev/null
+++ b/contrib/ipa.py
@@ -0,0 +1,278 @@
+#!/usr/bin/python3
+# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
+"""
+/*
+ * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
+ *
+ * All Rights Reserved
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+"""
+
+import struct, random, sys
+
+class IPA(object):
+ """
+ Stateless IPA protocol multiplexer: add/remove/parse (extended) header
+ """
+ version = "0.0.5"
+ TCP_PORT_OML = 3002
+ TCP_PORT_RSL = 3003
+ # OpenBSC extensions: OSMO, MGCP_OLD
+ PROTO = dict(RSL=0x00, CCM=0xFE, SCCP=0xFD, OML=0xFF, OSMO=0xEE, MGCP_OLD=0xFC)
+ # ...OML Router Control, GSUP GPRS extension, Osmocom Authn Protocol
+ EXT = dict(CTRL=0, MGCP=1, LAC=2, SMSC=3, ORC=4, GSUP=5, OAP=6)
+ # OpenBSC extension: SCCP_OLD
+ MSGT = dict(PING=0x00, PONG=0x01, ID_GET=0x04, ID_RESP=0x05, ID_ACK=0x06, SCCP_OLD=0xFF)
+ _IDTAG = dict(SERNR=0, UNITNAME=1, LOCATION=2, TYPE=3, EQUIPVERS=4, SWVERSION=5, IPADDR=6, MACADDR=7, UNIT=8)
+ CTRL_GET = 'GET'
+ CTRL_SET = 'SET'
+ CTRL_REP = 'REPLY'
+ CTRL_ERR = 'ERR'
+ CTRL_TRAP = 'TRAP'
+
+ def _l(self, d, p):
+ """
+ Reverse dictionary lookup: return key for a given value
+ """
+ if p is None:
+ return 'UNKNOWN'
+ return list(d.keys())[list(d.values()).index(p)]
+
+ def _tag(self, t, v):
+ """
+ Create TAG as TLV data
+ """
+ return struct.pack(">HB", len(v) + 1, t) + v
+
+ def proto(self, p):
+ """
+ Lookup protocol name
+ """
+ return self._l(self.PROTO, p)
+
+ def ext(self, p):
+ """
+ Lookup protocol extension name
+ """
+ return self._l(self.EXT, p)
+
+ def msgt(self, p):
+ """
+ Lookup message type name
+ """
+ return self._l(self.MSGT, p)
+
+ def idtag(self, p):
+ """
+ Lookup ID tag name
+ """
+ return self._l(self._IDTAG, p)
+
+ def ext_name(self, proto, exten):
+ """
+ Return proper extension byte name depending on the protocol used
+ """
+ if self.PROTO['CCM'] == proto:
+ return self.msgt(exten)
+ if self.PROTO['OSMO'] == proto:
+ return self.ext(exten)
+ return None
+
+ def add_header(self, data, proto, ext=None):
+ """
+ Add IPA header (with extension if necessary), data must be represented as bytes
+ """
+ if ext is None:
+ return struct.pack(">HB", len(data) + 1, proto) + data
+ return struct.pack(">HBB", len(data) + 1, proto, ext) + data
+
+ def del_header(self, data):
+ """
+ Strip IPA protocol header correctly removing extension if present
+ Returns data length, IPA protocol, extension (or None if not defined for a give protocol) and the data without header
+ """
+ if not len(data):
+ return None, None, None, None
+ (dlen, proto) = struct.unpack('>HB', data[:3])
+ if self.PROTO['OSMO'] == proto or self.PROTO['CCM'] == proto: # there's extension which we have to unpack
+ return struct.unpack('>HBB', data[:4]) + (data[4:], ) # length, protocol, extension, data
+ return dlen, proto, None, data[3:] # length, protocol, _, data
+
+ def split_combined(self, data):
+ """
+ Split the data which contains multiple concatenated IPA messages into tuple (first, rest) where rest contains remaining messages, first is the single IPA message
+ """
+ (length, _, _, _) = self.del_header(data)
+ return data[:(length + 3)], data[(length + 3):]
+
+ def tag_serial(self, data):
+ """
+ Make TAG for serial number
+ """
+ return self._tag(self._IDTAG['SERNR'], data)
+
+ def tag_name(self, data):
+ """
+ Make TAG for unit name
+ """
+ return self._tag(self._IDTAG['UNITNAME'], data)
+
+ def tag_loc(self, data):
+ """
+ Make TAG for location
+ """
+ return self._tag(self._IDTAG['LOCATION'], data)
+
+ def tag_type(self, data):
+ """
+ Make TAG for unit type
+ """
+ return self._tag(self._IDTAG['TYPE'], data)
+
+ def tag_equip(self, data):
+ """
+ Make TAG for equipment version
+ """
+ return self._tag(self._IDTAG['EQUIPVERS'], data)
+
+ def tag_sw(self, data):
+ """
+ Make TAG for software version
+ """
+ return self._tag(self._IDTAG['SWVERSION'], data)
+
+ def tag_ip(self, data):
+ """
+ Make TAG for IP address
+ """
+ return self._tag(self._IDTAG['IPADDR'], data)
+
+ def tag_mac(self, data):
+ """
+ Make TAG for MAC address
+ """
+ return self._tag(self._IDTAG['MACADDR'], data)
+
+ def tag_unit(self, data):
+ """
+ Make TAG for unit ID
+ """
+ return self._tag(self._IDTAG['UNIT'], data)
+
+ def identity(self, unit=b'', mac=b'', location=b'', utype=b'', equip=b'', sw=b'', name=b'', serial=b''):
+ """
+ Make IPA IDENTITY tag list, by default returns empty concatenated bytes of tag list
+ """
+ return self.tag_unit(unit) + self.tag_mac(mac) + self.tag_loc(location) + self.tag_type(utype) + self.tag_equip(equip) + self.tag_sw(sw) + self.tag_name(name) + self.tag_serial(serial)
+
+ def ping(self):
+ """
+ Make PING message
+ """
+ return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PING'])
+
+ def pong(self):
+ """
+ Make PONG message
+ """
+ return self.add_header(b'', self.PROTO['CCM'], self.MSGT['PONG'])
+
+ def id_ack(self):
+ """
+ Make ID_ACK CCM message
+ """
+ return self.add_header(b'', self.PROTO['CCM'], self.MSGT['ID_ACK'])
+
+ def id_get(self):
+ """
+ Make ID_GET CCM message
+ """
+ return self.add_header(self.identity(), self.PROTO['CCM'], self.MSGT['ID_GET'])
+
+ def id_resp(self, data):
+ """
+ Make ID_RESP CCM message
+ """
+ return self.add_header(data, self.PROTO['CCM'], self.MSGT['ID_RESP'])
+
+class Ctrl(IPA):
+ """
+ Osmocom CTRL protocol implemented on top of IPA multiplexer
+ """
+ def __init__(self):
+ random.seed()
+
+ def add_header(self, data):
+ """
+ Add CTRL header
+ """
+ return super(Ctrl, self).add_header(data.encode('utf-8'), IPA.PROTO['OSMO'], IPA.EXT['CTRL'])
+
+ def rem_header(self, data):
+ """
+ Remove CTRL header, check for appropriate protocol and extension
+ """
+ (_, proto, ext, d) = super(Ctrl, self).del_header(data)
+ if self.PROTO['OSMO'] != proto or self.EXT['CTRL'] != ext:
+ return None
+ return d
+
+ def parse(self, data, op=None):
+ """
+ Parse Ctrl string returning (var, value) pair
+ var could be None in case of ERROR message
+ value could be None in case of GET message
+ """
+ (s, i, v) = data.split(' ', 2)
+ if s == self.CTRL_ERR:
+ return None, v
+ if s == self.CTRL_GET:
+ return v, None
+ (s, i, var, val) = data.split(' ', 3)
+ if s == self.CTRL_TRAP and i != '0':
+ return None, '%s with non-zero id %s' % (s, i)
+ if op is not None and i != op:
+ if s == self.CTRL_GET + '_' + self.CTRL_REP or s == self.CTRL_SET + '_' + self.CTRL_REP:
+ return None, '%s with unexpected id %s' % (s, i)
+ return var, val
+
+ def trap(self, var, val):
+ """
+ Make TRAP message with given (vak, val) pair
+ """
+ return self.add_header("%s 0 %s %s" % (self.CTRL_TRAP, var, val))
+
+ def cmd(self, var, val=None):
+ """
+ Make SET/GET command message: returns (r, m) tuple where r is random operation id and m is assembled message
+ """
+ r = random.randint(1, sys.maxsize)
+ if val is not None:
+ return r, self.add_header("%s %s %s %s" % (self.CTRL_SET, r, var, val))
+ return r, self.add_header("%s %s %s" % (self.CTRL_GET, r, var))
+
+ def verify(self, reply, r, var, val=None):
+ """
+ Verify reply to SET/GET command: returns (b, v) tuple where v is True/False verification result and v is the variable value
+ """
+ (k, v) = self.parse(reply)
+ if k != var or (val is not None and v != val):
+ return False, v
+ return True, v
+
+if __name__ == '__main__':
+ print("IPA multiplexer v%s loaded." % IPA.version)
diff --git a/contrib/jenkins.sh b/contrib/jenkins.sh
index e2abb60..b08c63b 100755
--- a/contrib/jenkins.sh
+++ b/contrib/jenkins.sh
@@ -35,7 +35,7 @@ set -x
cd "$base"
autoreconf --install --force
-./configure
+./configure --enable-external-tests
$MAKE $PARALLEL_MAKE
if [ "x$label" != "xFreeBSD_amd64" ]; then
$MAKE check || cat-testlogs.sh
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 0bd0820..d979fb6 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -25,6 +25,7 @@ EXTRA_DIST = \
testsuite.at \
$(srcdir)/package.m4 \
$(TESTSUITE) \
+ ctrl_test_runner.py \
$(NULL)
TESTSUITE = $(srcdir)/testsuite
@@ -33,8 +34,17 @@ DISTCLEANFILES = \
atconfig \
$(NULL)
+if ENABLE_EXT_TESTS
+python-tests: $(BUILT_SOURCES)
+ $(PYTHON) $(srcdir)/ctrl_test_runner.py -w $(abs_top_builddir) -v
+else
+python-tests: $(BUILT_SOURCES)
+ echo "Not running python-based tests (determined at configure-time)"
+endif
+
check-local: atconfig $(TESTSUITE)
$(SHELL) '$(TESTSUITE)' $(TESTSUITEFLAGS)
+ $(MAKE) $(AM_MAKEFLAGS) python-tests
installcheck-local: atconfig $(TESTSUITE)
$(SHELL) '$(TESTSUITE)' AUTOTEST_PATH='$(bindir)' \
diff --git a/tests/ctrl_test_runner.py b/tests/ctrl_test_runner.py
new file mode 100644
index 0000000..d453197
--- /dev/null
+++ b/tests/ctrl_test_runner.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python
+
+# (C) 2013 by Jacob Erlbeck <jerlbeck@sysmocom.de>
+# (C) 2014 by Holger Hans Peter Freyther
+# based on vty_test_runner.py:
+# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
+# (C) 2013 by Holger Hans Peter Freyther
+# based on bsc_control.py.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import time
+import unittest
+import socket
+import sys
+import struct
+import subprocess
+
+import osmopy.osmoutil as osmoutil
+
+# add $top_srcdir/contrib to find ipa.py
+sys.path.append(os.path.join(sys.path[0], '..', 'contrib'))
+
+from ipa import Ctrl, IPA
+
+# to be able to find $top_srcdir/doc/...
+confpath = os.path.join(sys.path[0], '..')
+verbose = False
+
+class TestCtrlBase(unittest.TestCase):
+
+ def ctrl_command(self):
+ raise Exception("Needs to be implemented by a subclass")
+
+ def ctrl_app(self):
+ raise Exception("Needs to be implemented by a subclass")
+
+ def setUp(self):
+ osmo_ctrl_cmd = self.ctrl_command()[:]
+ config_index = osmo_ctrl_cmd.index('-c')
+ if config_index:
+ cfi = config_index + 1
+ osmo_ctrl_cmd[cfi] = os.path.join(confpath, osmo_ctrl_cmd[cfi])
+
+ try:
+ self.proc = osmoutil.popen_devnull(osmo_ctrl_cmd)
+ except OSError:
+ print >> sys.stderr, "Current directory: %s" % os.getcwd()
+ print >> sys.stderr, "Consider setting -b"
+ time.sleep(2)
+
+ appstring = self.ctrl_app()[2]
+ appport = self.ctrl_app()[0]
+ self.connect("127.0.0.1", appport)
+ self.next_id = 1000
+
+ def tearDown(self):
+ self.disconnect()
+ osmoutil.end_proc(self.proc)
+
+ def disconnect(self):
+ if not (self.sock is None):
+ self.sock.close()
+
+ def connect(self, host, port):
+ if verbose:
+ print "Connecting to host %s:%i" % (host, port)
+
+ retries = 30
+ while True:
+ try:
+ sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sck.setblocking(1)
+ sck.connect((host, port))
+ except IOError:
+ retries -= 1
+ if retries <= 0:
+ raise
+ time.sleep(.1)
+ continue
+ break
+ self.sock = sck
+ return sck
+
+ def send(self, data):
+ if verbose:
+ print "Sending \"%s\"" %(data)
+ data = Ctrl().add_header(data)
+ return self.sock.send(data) == len(data)
+
+ def send_set(self, var, value, id):
+ setmsg = "SET %s %s %s" %(id, var, value)
+ return self.send(setmsg)
+
+ def send_get(self, var, id):
+ getmsg = "GET %s %s" %(id, var)
+ return self.send(getmsg)
+
+ def do_set(self, var, value):
+ id = self.next_id
+ self.next_id += 1
+ self.send_set(var, value, id)
+ return self.recv_msgs()[id]
+
+ def do_get(self, var):
+ id = self.next_id
+ self.next_id += 1
+ self.send_get(var, id)
+ return self.recv_msgs()[id]
+
+ def assert_reply(self, r, mtype, var, val):
+ expect = dict(mtype=mtype, var=var, value=val)
+ result_matches = all([r.get(k) == expect.get(k) for k in expect.keys()])
+ if not result_matches:
+ print('\nError details:\nGot reply: %r\nExpected reply: %r\n' % (r, expect))
+ self.assertTrue(result_matches)
+
+ def assert_set(self, var, val, result_val):
+ r = self.do_set(var, val)
+ self.assert_reply(r, 'SET_REPLY', var, result_val)
+
+ def assert_get(self, var, result_val):
+ r = self.do_get(var)
+ self.assert_reply(r, 'GET_REPLY', var, result_val)
+
+ def recv_msgs(self):
+ responses = {}
+ data = self.sock.recv(4096)
+ while (len(data)>0):
+ (head, data) = IPA().split_combined(data)
+ answer = Ctrl().rem_header(head)
+ if verbose:
+ print "Got message:", answer
+ (mtype, id, msg) = answer.split(None, 2)
+ id = int(id)
+ rsp = {'mtype': mtype, 'id': id}
+ if mtype == "ERROR":
+ rsp['error'] = msg
+ else:
+ split = msg.split(None, 1)
+ rsp['var'] = split[0]
+ if len(split) > 1:
+ rsp['value'] = split[1]
+ else:
+ rsp['value'] = None
+ responses[id] = rsp
+
+ if verbose:
+ print "Decoded replies: ", responses
+
+ return responses
+
+
+if __name__ == '__main__':
+ import argparse
+ import sys
+
+ workdir = '.'
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-v", "--verbose", dest="verbose",
+ action="store_true", help="verbose mode")
+ parser.add_argument("-p", "--pythonconfpath", dest="p",
+ help="searchpath for config")
+ parser.add_argument("-w", "--workdir", dest="w",
+ help="Working directory")
+ args = parser.parse_args()
+
+ verbose_level = 1
+ if args.verbose:
+ verbose_level = 2
+ verbose = True
+
+ if args.w:
+ workdir = args.w
+
+ if args.p:
+ confpath = args.p
+
+ print "confpath %s, workdir %s" % (confpath, workdir)
+ os.chdir(workdir)
+ print "Running tests for specific control commands"
+ suite = unittest.TestSuite()
+ res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
+ sys.exit(len(res.errors) + len(res.failures))
+
+# vim: tabstop=4 shiftwidth=4 expandtab