aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNeels Hofmeyr <neels@hofmeyr.de>2017-10-06 02:59:54 +0200
committerNeels Hofmeyr <neels@hofmeyr.de>2017-10-11 01:48:25 +0200
commit047b45beb63553a4566eb1fe73a25bc2cb5af519 (patch)
tree1037d6de2aef1c2125baae2da2dc87e0c086532f
parent063a935062f6309aa5286bd0075eb725d343ad14 (diff)
implement subscriber vty, tests
-rw-r--r--src/Makefile.am2
-rw-r--r--src/hlr_vty.c3
-rw-r--r--src/hlr_vty_subscr.c464
-rw-r--r--src/hlr_vty_subscr.h3
-rw-r--r--tests/Makefile.am5
-rw-r--r--tests/test_subscr_create_update_show_delete.vty64
-rw-r--r--tests/vty_test_runner.py298
7 files changed, 839 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index b410ff3..fc7c653 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ noinst_HEADERS = \
rand.h \
ctrl.h \
hlr_vty.h \
+ hlr_vty_subscr.h \
$(NULL)
bin_PROGRAMS = \
@@ -46,6 +47,7 @@ osmo_hlr_SOURCES = \
logging.c \
rand_urandom.c \
hlr_vty.c \
+ hlr_vty_subscr.c \
$(NULL)
osmo_hlr_LDADD = \
diff --git a/src/hlr_vty.c b/src/hlr_vty.c
index 946117e..a5eb26f 100644
--- a/src/hlr_vty.c
+++ b/src/hlr_vty.c
@@ -26,6 +26,7 @@
#include <osmocom/vty/logging.h>
#include "hlr_vty.h"
+#include "hlr_vty_subscr.h"
static struct hlr *g_hlr = NULL;
@@ -135,4 +136,6 @@ void hlr_vty_init(struct hlr *hlr, const struct log_info *cat)
install_default(GSUP_NODE);
install_element(GSUP_NODE, &cfg_hlr_gsup_bind_ip_cmd);
+
+ hlr_vty_subscriber_init(hlr);
}
diff --git a/src/hlr_vty_subscr.c b/src/hlr_vty_subscr.c
new file mode 100644
index 0000000..fbd603c
--- /dev/null
+++ b/src/hlr_vty_subscr.c
@@ -0,0 +1,464 @@
+/* OsmoHLR subscriber management VTY implementation */
+/* (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+ * All Rights Reserved
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdlib.h>
+#include <inttypes.h>
+#include <string.h>
+#include <errno.h>
+
+#include <osmocom/gsm/gsm23003.h>
+#include <osmocom/vty/vty.h>
+#include <osmocom/vty/command.h>
+#include <osmocom/core/utils.h>
+
+#include "hlr.h"
+#include "db.h"
+
+struct vty;
+
+#define osmo_hexdump_buf(buf) osmo_hexdump_nospc((void*)buf, sizeof(buf))
+
+static struct hlr *g_hlr = NULL;
+
+static void subscr_dump_full_vty(struct vty *vty, struct hlr_subscriber *subscr)
+{
+ int rc;
+ struct osmo_sub_auth_data aud2g;
+ struct osmo_sub_auth_data aud3g;
+
+ vty_out(vty, " ID: %"PRIu64"%s", subscr->id, VTY_NEWLINE);
+
+ vty_out(vty, " IMSI: %s%s", subscr->imsi ? subscr->imsi : "none", VTY_NEWLINE);
+ vty_out(vty, " MSISDN: %s%s", *subscr->msisdn ? subscr->msisdn : "none", VTY_NEWLINE);
+ if (*subscr->vlr_number)
+ vty_out(vty, " VLR number: %s%s", subscr->vlr_number, VTY_NEWLINE);
+ if (*subscr->sgsn_number)
+ vty_out(vty, " SGSN number: %s%s", subscr->sgsn_number, VTY_NEWLINE);
+ if (*subscr->sgsn_address)
+ vty_out(vty, " SGSN address: %s%s", subscr->sgsn_address, VTY_NEWLINE);
+ if (subscr->periodic_lu_timer)
+ vty_out(vty, " Periodic LU timer: %u%s", subscr->periodic_lu_timer, VTY_NEWLINE);
+ if (subscr->periodic_rau_tau_timer)
+ vty_out(vty, " Periodic RAU/TAU timer: %u%s", subscr->periodic_rau_tau_timer, VTY_NEWLINE);
+ if (subscr->lmsi)
+ vty_out(vty, " LMSI: %x%s", subscr->lmsi, VTY_NEWLINE);
+ if (!subscr->nam_cs)
+ vty_out(vty, " CS disabled%s", VTY_NEWLINE);
+ if (subscr->ms_purged_cs)
+ vty_out(vty, " CS purged%s", VTY_NEWLINE);
+ if (!subscr->nam_ps)
+ vty_out(vty, " PS disabled%s", VTY_NEWLINE);
+ if (subscr->ms_purged_ps)
+ vty_out(vty, " PS purged%s", VTY_NEWLINE);
+
+ if (!*subscr->imsi)
+ return;
+
+ OSMO_ASSERT(g_hlr);
+ rc = db_get_auth_data(g_hlr->dbc, subscr->imsi, &aud2g, &aud3g, NULL);
+
+ if (rc) {
+ if (rc == -ENOENT)
+ vty_out(vty, " No auth data%s", VTY_NEWLINE);
+ else
+ vty_out(vty, "%% Error retrieving data from database (%d)%s", rc, VTY_NEWLINE);
+ return;
+ }
+
+ if (aud2g.algo == OSMO_AUTH_ALG_NONE || aud2g.type == OSMO_AUTH_TYPE_NONE)
+ vty_out(vty, " No 2G auth data%s", VTY_NEWLINE);
+ else if (aud2g.type != OSMO_AUTH_TYPE_GSM)
+ vty_out(vty, "%% Error: 2G auth data is not of type 'GSM'%s", VTY_NEWLINE);
+ else
+ vty_out(vty, " 2G: KI=%s%s", osmo_hexdump_buf(aud2g.u.gsm.ki), VTY_NEWLINE);
+
+ if (aud3g.algo == OSMO_AUTH_ALG_NONE || aud3g.type == OSMO_AUTH_TYPE_NONE)
+ vty_out(vty, " No 3G auth data%s", VTY_NEWLINE);
+ else if (aud3g.type != OSMO_AUTH_TYPE_UMTS)
+ vty_out(vty, "%% Error: 3G auth data is not of type 'UMTS'%s", VTY_NEWLINE);
+ else {
+ vty_out(vty, " 3G:");
+ vty_out(vty, " K=%s", osmo_hexdump_buf(aud3g.u.umts.k));
+ vty_out(vty, " %s=%s", aud3g.u.umts.opc_is_op? "OP" : "OPC",
+ osmo_hexdump_buf(aud3g.u.umts.opc));
+ vty_out(vty, " IND-len=%u last-SQN=%"PRIu64, aud3g.u.umts.ind_bitlen, aud3g.u.umts.sqn);
+ vty_out(vty, VTY_NEWLINE);
+ }
+}
+
+static int get_subscr_by_argv(struct vty *vty, const char *type, const char *id, struct hlr_subscriber *subscr)
+{
+ int rc = -1;
+ if (strcmp(type, "imsi") == 0)
+ rc = db_subscr_get_by_imsi(g_hlr->dbc, id, subscr);
+ else if (strcmp(type, "msisdn") == 0)
+ rc = db_subscr_get_by_msisdn(g_hlr->dbc, id, subscr);
+ else if (strcmp(type, "id") == 0)
+ rc = db_subscr_get_by_id(g_hlr->dbc, atoll(id), subscr);
+ if (rc)
+ vty_out(vty, "%% No subscriber for %s = '%s'%s",
+ type, id, VTY_NEWLINE);
+ return rc;
+}
+
+#define SUBSCR "(imsi|msisdn|id) ID"
+#define SUBSCR_HELP "Operations on a Subscriber\n" \
+ "Identify subscriber by IMSI\n" \
+ "Identify subscriber by MSISDN (phone number)\n" \
+ "Identify subscriber by database ID\n" \
+ "Identifier for the subscriber\n"
+
+#define SUBSCR_CMD_HELP "Subscriber management commands\n"
+
+DEFUN(subscriber_show,
+ subscriber_show_cmd,
+ "subscriber show " SUBSCR,
+ SUBSCR_CMD_HELP "Show subscriber information\n" SUBSCR_HELP)
+{
+ struct hlr_subscriber subscr;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ subscr_dump_full_vty(vty, &subscr);
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_create,
+ subscriber_create_cmd,
+ "subscriber create imsi IMSI",
+ SUBSCR_CMD_HELP
+ "Create new subscriber\n"
+ "Create subscriber by IMSI\n"
+ "IMSI for the new subscriber\n")
+{
+ int rc;
+ struct hlr_subscriber subscr;
+ const char *imsi = argv[0];
+
+ if (!osmo_imsi_str_valid(imsi)) {
+ vty_out(vty, "%% Not a valid IMSI: %s%s", imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ rc = db_subscr_create(g_hlr->dbc, imsi);
+
+ if (rc) {
+ if (rc == -EEXIST)
+ vty_out(vty, "%% Subscriber already exists for IMSI = %s%s",
+ imsi, VTY_NEWLINE);
+ else
+ vty_out(vty, "%% Error (rc=%d): cannot create subscriber for IMSI = %s%s",
+ rc, imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ rc = db_subscr_get_by_imsi(g_hlr->dbc, imsi, &subscr);
+ vty_out(vty, "%% Created subscriber %s%s", imsi, VTY_NEWLINE);
+
+ subscr_dump_full_vty(vty, &subscr);
+
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_delete,
+ subscriber_delete_cmd,
+ "subscriber delete " SUBSCR,
+ SUBSCR_CMD_HELP "Delete subscriber from database\n" SUBSCR_HELP)
+{
+ struct hlr_subscriber subscr;
+ int rc;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+
+ /* Find out the IMSI regardless of which way the caller decided to
+ * identify the subscriber by. */
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ rc = db_subscr_delete_by_id(g_hlr->dbc, subscr.id);
+ if (rc) {
+ vty_out(vty, "%% Error: Failed to remove subscriber for IMSI '%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ vty_out(vty, "%% Deleted subscriber for IMSI '%s'%s", subscr.imsi, VTY_NEWLINE);
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_msisdn,
+ subscriber_msisdn_cmd,
+ "subscriber " SUBSCR " msisdn MSISDN",
+ SUBSCR_CMD_HELP SUBSCR_HELP
+ "Set MSISDN (phone number) of the subscriber\n"
+ "MSISDN (phone number)\n")
+{
+ struct hlr_subscriber subscr;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+ const char *msisdn = argv[2];
+
+ if (strlen(msisdn) > sizeof(subscr.msisdn) - 1) {
+ vty_out(vty, "%% MSISDN is too long, max. %zu characters are allowed%s",
+ sizeof(subscr.msisdn)-1, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ if (!osmo_msisdn_str_valid(msisdn)) {
+ vty_out(vty, "%% MSISDN invalid: '%s'%s", msisdn, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ if (db_subscr_update_msisdn_by_imsi(g_hlr->dbc, subscr.imsi, msisdn)) {
+ vty_out(vty, "%% Error: cannot update MSISDN for subscriber IMSI='%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ vty_out(vty, "%% Updated MSISDN for subscriber IMSI='%s' to '%s'%s",
+ subscr.imsi, msisdn, VTY_NEWLINE);
+ return CMD_SUCCESS;
+}
+
+static bool is_hexkey_valid(struct vty *vty, const char *label,
+ const char *hex_str, int minlen, int maxlen)
+{
+ if (osmo_is_hexstr(hex_str, minlen * 2, maxlen * 2, true))
+ return true;
+ vty_out(vty, "%% Invalid value for %s: '%s'%s", label, hex_str, VTY_NEWLINE);
+ return false;
+}
+
+#define AUTH_ALG_TYPES_2G "(comp128v1|comp128v2|comp128v3|xor)"
+#define AUTH_ALG_TYPES_2G_HELP \
+ "Use COMP128v1 algorithm\n" \
+ "Use COMP128v2 algorithm\n" \
+ "Use COMP128v3 algorithm\n" \
+ "Use XOR algorithm\n"
+
+#define AUTH_ALG_TYPES_3G "(milenage)"
+#define AUTH_ALG_TYPES_3G_HELP \
+ "Use Milenage algorithm\n"
+
+#define A38_XOR_MIN_KEY_LEN 12
+#define A38_XOR_MAX_KEY_LEN 16
+#define A38_COMP128_KEY_LEN 16
+
+#define MILENAGE_KEY_LEN 16
+
+static bool auth_algo_parse(const char *alg_str, enum osmo_auth_algo *algo,
+ int *minlen, int *maxlen)
+{
+ if (!strcasecmp(alg_str, "none")) {
+ *algo = OSMO_AUTH_ALG_NONE;
+ *minlen = *maxlen = 0;
+ } else if (!strcasecmp(alg_str, "comp128v1")) {
+ *algo = OSMO_AUTH_ALG_COMP128v1;
+ *minlen = *maxlen = A38_COMP128_KEY_LEN;
+ } else if (!strcasecmp(alg_str, "comp128v2")) {
+ *algo = OSMO_AUTH_ALG_COMP128v2;
+ *minlen = *maxlen = A38_COMP128_KEY_LEN;
+ } else if (!strcasecmp(alg_str, "comp128v3")) {
+ *algo = OSMO_AUTH_ALG_COMP128v3;
+ *minlen = *maxlen = A38_COMP128_KEY_LEN;
+ } else if (!strcasecmp(alg_str, "xor")) {
+ *algo = OSMO_AUTH_ALG_XOR;
+ *minlen = A38_XOR_MIN_KEY_LEN;
+ *maxlen = A38_XOR_MAX_KEY_LEN;
+ } else if (!strcasecmp(alg_str, "milenage")) {
+ *algo = OSMO_AUTH_ALG_MILENAGE;
+ *minlen = *maxlen = MILENAGE_KEY_LEN;
+ } else
+ return false;
+ return true;
+}
+
+DEFUN(subscriber_no_aud2g,
+ subscriber_no_aud2g_cmd,
+ "subscriber " SUBSCR " aud2g none",
+ SUBSCR_CMD_HELP SUBSCR_HELP "Set 2G authentication data\n"
+ "Delete 2G authentication data\n")
+{
+ struct hlr_subscriber subscr;
+ int rc;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+ struct sub_auth_data_str aud = {
+ .type = OSMO_AUTH_TYPE_GSM,
+ .algo = OSMO_AUTH_ALG_NONE,
+ };
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ rc = db_subscr_update_aud_by_id(g_hlr->dbc, subscr.id, &aud);
+
+ if (rc) {
+ vty_out(vty, "%% Error: cannot disable 2G auth data for IMSI='%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_aud2g,
+ subscriber_aud2g_cmd,
+ "subscriber " SUBSCR " aud2g " AUTH_ALG_TYPES_2G " KI",
+ SUBSCR_CMD_HELP SUBSCR_HELP "Set 2G authentication data\n"
+ AUTH_ALG_TYPES_2G_HELP "Encryption Key Ki\n")
+{
+ struct hlr_subscriber subscr;
+ int rc;
+ int minlen = 0;
+ int maxlen = 0;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+ const char *alg_type = argv[2];
+ const char *ki = argv[3];
+ struct sub_auth_data_str aud2g = {
+ .type = OSMO_AUTH_TYPE_GSM,
+ .u.gsm.ki = ki,
+ };
+
+ if (!auth_algo_parse(alg_type, &aud2g.algo, &minlen, &maxlen)) {
+ vty_out(vty, "%% Unknown auth algorithm: '%s'%s", alg_type, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ if (!is_hexkey_valid(vty, "KI", aud2g.u.gsm.ki, minlen, maxlen))
+ return CMD_WARNING;
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ rc = db_subscr_update_aud_by_id(g_hlr->dbc, subscr.id, &aud2g);
+
+ if (rc) {
+ vty_out(vty, "%% Error: cannot set 2G auth data for IMSI='%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_no_aud3g,
+ subscriber_no_aud3g_cmd,
+ "subscriber " SUBSCR " aud3g none",
+ SUBSCR_CMD_HELP SUBSCR_HELP
+ "Set UMTS authentication data (3G, and 2G with UMTS AKA)\n"
+ "Delete 3G authentication data\n")
+{
+ struct hlr_subscriber subscr;
+ int rc;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+ struct sub_auth_data_str aud = {
+ .type = OSMO_AUTH_TYPE_UMTS,
+ .algo = OSMO_AUTH_ALG_NONE,
+ };
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ rc = db_subscr_update_aud_by_id(g_hlr->dbc, subscr.id, &aud);
+
+ if (rc) {
+ vty_out(vty, "%% Error: cannot disable 3G auth data for IMSI='%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+ return CMD_SUCCESS;
+}
+
+DEFUN(subscriber_aud3g,
+ subscriber_aud3g_cmd,
+ "subscriber " SUBSCR " aud3g " AUTH_ALG_TYPES_3G
+ " K"
+ " (op|opc) OP_C"
+ " [ind_bitlen] [<0-" OSMO_STRINGIFY(OSMO_MILENAGE_IND_BITLEN_MAX) ">]",
+ SUBSCR_CMD_HELP SUBSCR_HELP "Set 3G authentication data\n"
+ AUTH_ALG_TYPES_3G_HELP
+ "Encryption Key K\n"
+ "Set OP key\n" "Set OPC key\n" "OP or OPC key\n"
+ "Set IND bit length\n" "IND bit length (default: 5)\n")
+{
+ struct hlr_subscriber subscr;
+ int minlen = 0;
+ int maxlen = 0;
+ int rc;
+ const char *id_type = argv[0];
+ const char *id = argv[1];
+ const char *alg_type = argv[2];
+ const char *k = argv[3];
+ bool opc_is_op = (strcasecmp("op", argv[4]) == 0);
+ const char *op_opc = argv[5];
+ int ind_bitlen = argc > 7? atoi(argv[7]) : 5;
+ struct sub_auth_data_str aud3g = {
+ .type = OSMO_AUTH_TYPE_UMTS,
+ .u.umts = {
+ .k = k,
+ .opc_is_op = opc_is_op,
+ .opc = op_opc,
+ .ind_bitlen = ind_bitlen,
+ },
+ };
+
+ if (!auth_algo_parse(alg_type, &aud3g.algo, &minlen, &maxlen)) {
+ vty_out(vty, "%% Unknown auth algorithm: '%s'%s", alg_type, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+
+ if (!is_hexkey_valid(vty, "K", aud3g.u.umts.k, minlen, maxlen))
+ return CMD_WARNING;
+
+ if (!is_hexkey_valid(vty, opc_is_op ? "OPC" : "OP", aud3g.u.umts.opc,
+ MILENAGE_KEY_LEN, MILENAGE_KEY_LEN))
+ return CMD_WARNING;
+
+ if (get_subscr_by_argv(vty, id_type, id, &subscr))
+ return CMD_WARNING;
+
+ rc = db_subscr_update_aud_by_id(g_hlr->dbc, subscr.id, &aud3g);
+
+ if (rc) {
+ vty_out(vty, "%% Error: cannot set 3G auth data for IMSI='%s'%s",
+ subscr.imsi, VTY_NEWLINE);
+ return CMD_WARNING;
+ }
+ return CMD_SUCCESS;
+}
+
+void hlr_vty_subscriber_init(struct hlr *hlr)
+{
+ g_hlr = hlr;
+
+ install_element_ve(&subscriber_show_cmd);
+ install_element(ENABLE_NODE, &subscriber_create_cmd);
+ install_element(ENABLE_NODE, &subscriber_delete_cmd);
+ install_element(ENABLE_NODE, &subscriber_msisdn_cmd);
+ install_element(ENABLE_NODE, &subscriber_no_aud2g_cmd);
+ install_element(ENABLE_NODE, &subscriber_aud2g_cmd);
+ install_element(ENABLE_NODE, &subscriber_no_aud3g_cmd);
+ install_element(ENABLE_NODE, &subscriber_aud3g_cmd);
+}
diff --git a/src/hlr_vty_subscr.h b/src/hlr_vty_subscr.h
new file mode 100644
index 0000000..841db5a
--- /dev/null
+++ b/src/hlr_vty_subscr.h
@@ -0,0 +1,3 @@
+#pragma once
+
+void hlr_vty_subscriber_init(struct hlr *hlr);
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 0b625f5..09370bd 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -27,6 +27,7 @@ EXTRA_DIST = \
$(srcdir)/package.m4 \
$(TESTSUITE) \
ctrl_test_runner.py \
+ vty_test_runner.py \
$(NULL)
TESTSUITE = $(srcdir)/testsuite
@@ -37,7 +38,11 @@ DISTCLEANFILES = \
if ENABLE_EXT_TESTS
python-tests: $(BUILT_SOURCES)
+ $(PYTHON) $(srcdir)/vty_test_runner.py -w $(abs_top_builddir) -v
$(PYTHON) $(srcdir)/ctrl_test_runner.py -w $(abs_top_builddir) -v
+
+vty-test-update: $(BUILT_SOURCES)
+ $(PYTHON) $(srcdir)/vty_test_runner.py -w $(abs_top_builddir) -v --update
else
python-tests: $(BUILT_SOURCES)
echo "Not running python-based tests (determined at configure-time)"
diff --git a/tests/test_subscr_create_update_show_delete.vty b/tests/test_subscr_create_update_show_delete.vty
new file mode 100644
index 0000000..f8bc483
--- /dev/null
+++ b/tests/test_subscr_create_update_show_delete.vty
@@ -0,0 +1,64 @@
+OsmoHLR> enable
+
+OsmoHLR# subscriber show imsi 123456789023000
+% No subscriber for imsi = '123456789023000'
+OsmoHLR# subscriber show id 1
+% No subscriber for id = '1'
+OsmoHLR# subscriber show msisdn 12345
+% No subscriber for msisdn = '12345'
+
+OsmoHLR# subscriber create imsi 1234567890230001
+% Not a valid IMSI: 1234567890230001
+OsmoHLR# subscriber create imsi 12345678902300x
+% Not a valid IMSI: 12345678902300x
+OsmoHLR# subscriber create imsi 12345
+% Not a valid IMSI: 12345
+
+OsmoHLR# subscriber create imsi 123456789023000
+% Created subscriber 123456789023000
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: none
+ No auth data
+
+OsmoHLR# subscriber show imsi 123456789023000
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: none
+ No auth data
+OsmoHLR# subscriber show id 1
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: none
+ No auth data
+OsmoHLR# subscriber show msisdn 12345
+% No subscriber for msisdn = '12345'
+
+OsmoHLR# subscriber imsi 123456789023000 msisdn 12345
+% Updated MSISDN for subscriber IMSI='123456789023000' to '12345'
+
+OsmoHLR# subscriber show imsi 123456789023000
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: 12345
+ No auth data
+OsmoHLR# subscriber show id 1
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: 12345
+ No auth data
+OsmoHLR# subscriber show msisdn 12345
+ ID: 1
+ IMSI: 123456789023000
+ MSISDN: 12345
+ No auth data
+
+OsmoHLR# subscriber delete imsi 123456789023000
+% Deleted subscriber for IMSI '123456789023000'
+
+OsmoHLR# subscriber show imsi 123456789023000
+% No subscriber for imsi = '123456789023000'
+OsmoHLR# subscriber show id 1
+% No subscriber for id = '1'
+OsmoHLR# subscriber show msisdn 12345
+% No subscriber for msisdn = '12345'
diff --git a/tests/vty_test_runner.py b/tests/vty_test_runner.py
new file mode 100644
index 0000000..d7f1e14
--- /dev/null
+++ b/tests/vty_test_runner.py
@@ -0,0 +1,298 @@
+#!/usr/bin/env python2
+
+# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
+# (C) 2013 by Holger Hans Peter Freyther
+# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os, sys
+import time
+import socket
+import subprocess
+import traceback
+import re
+
+import osmopy.obscvty as obscvty
+import osmopy.osmoutil as osmoutil
+
+# to be able to find VTY transcript files
+srcdir = sys.path[0]
+
+# to be able to find $top_srcdir/doc/...
+confdir = os.path.join(srcdir, '..')
+
+def run_tests(test_objects, pick_name_snippets=None, vty_transcript_update=False):
+ tests = []
+ for o in test_objects:
+ for name in dir(o):
+ if not name.startswith('test'):
+ continue
+ member = getattr(o, name)
+ if not hasattr(member, '__call__'):
+ continue
+ if pick_name_snippets:
+ if not any([n.lower() in name.lower() for n in pick_name_snippets]):
+ continue
+ tests.append((o, member, ()))
+
+ if hasattr(o, 'vty_transcripts'):
+ for transcript_file in o.vty_transcripts():
+ tests.append((o, o.verify_vty_transcript_file,
+ (transcript_file, vty_transcript_update)))
+
+ passed = []
+ failed = []
+ for o, t, args in tests:
+ name = '%s.%s(%s)' % (o.__class__.__name__, t.__name__, ', '.join([str(a) for a in args]))
+
+ o.setUp()
+ sys.stdout.flush()
+ sys.stderr.flush()
+ try:
+ t(*args)
+ passed.append(name)
+ except BaseException:
+ traceback.print_exc()
+ failed.append(name)
+ sys.stdout.flush()
+ sys.stderr.flush()
+ o.tearDown()
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ for p in passed:
+ print 'pass:', p
+ for f in failed:
+ print 'FAIL:', f
+ assert not failed
+
+
+class TestVTYBase(object):
+ bin_file = None
+ conf_file = None
+ vty_port = None
+ vty_prompt = None
+ bin_args = None
+
+ def checkForEndAndExit(self):
+ res = self.vty.command('list')
+ assert res.find(' exit\r') > 0
+ assert res.find(' end\r') > 0
+
+ def setUp(self):
+ global confdir
+ o = None
+ e = None
+
+ cmd = [self.bin_file,
+ '-c', os.path.join(confdir, self.conf_file)]
+ if self.bin_args:
+ cmd.extend(self.bin_args)
+ try:
+ print "Launching: cd %r; %s" % (os.getcwd(), ' '.join([repr(c) for c in cmd]))
+ self.proc = subprocess.Popen(cmd)
+ except OSError:
+ print >> sys.stderr, 'Current directory: %s' % os.getcwd()
+ raise
+
+ self.vty = obscvty.VTYInteract(self.vty_prompt, '127.0.0.1', self.vty_port)
+
+ def tearDown(self):
+ if self.vty:
+ self.vty._close_socket()
+ self.vty = None
+ osmoutil.end_proc(self.proc)
+
+ def verify_vty_transcript_file(self, transcript_file, update=False):
+ with open(transcript_file, 'r') as f:
+ content = f.read()
+
+ try:
+ result = self.verify_vty_transcript(content, ignore_errors=update);
+ except:
+ print >> sys.stderr, 'Error during VTY transcript file %r' % transcript_file
+ sys.stderr.flush()
+ raise
+
+ if not update:
+ return
+ content = '\n'.join(result)
+ with open(transcript_file, 'w') as f:
+ f.write(content)
+
+ def verify_vty_transcript(self, transcript, ignore_errors=False):
+ ''''transcript' is a screenshot of a VTY session. Detect prompts and
+ commands entered, feed commands to self.vty and verify that it
+ returns the expected results.'''
+ re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.vty_prompt)
+
+ class Step:
+ expect_node = None # e.g. '(config-net)'
+ expect_prompt = None # '>' or '#'
+ command = None # 'show running-config'
+ result = None # array of strings of expected response lines
+ leading_blanks = None # nr of blank lines before this prompt
+
+ def command_str(step):
+ return '%s%s%s %s' % (self.vty_prompt, step.expect_node or '',
+ step.expect_prompt, step.command)
+ def __str__(self):
+ return '%s\n%s' % (self.command_str(), '\n'.join(step.result))
+
+ # parse steps
+ steps = []
+ step = None
+ blank_lines = 0
+ for line in transcript.splitlines():
+ if not line:
+ blank_lines += 1
+ continue
+ m = re_prompt.match(line)
+ if m:
+ if step:
+ steps.append(step)
+ step = Step()
+ step.expect_node = m.group(1)
+ step.expect_prompt = m.group(2)
+ step.command = m.group(3)
+ step.result = []
+ step.leading_blanks = blank_lines
+ elif step:
+ step.result.append(line)
+ blank_lines = 0
+ if step:
+ steps.append(step)
+ step = None
+
+ actual_result = []
+
+ # run steps
+ step_nr = 0
+ for step in steps:
+ step_nr += 1
+ try:
+ if step.expect_node and not ignore_errors:
+ if '(%s)' % self.vty.node() != step.expect_node:
+ raise Exception('Node prompt mismatch:\n'
+ 'Expected %s\ngot %r'
+ % (step.expect_node, self.vty.node()))
+
+ res = self.vty.command(step.command)
+ res = res.splitlines()
+ if step.command.endswith('?') and res[-1] == '% Command incomplete.':
+ res = res[:-2]
+
+ if step.leading_blanks:
+ actual_result.extend([''] * step.leading_blanks)
+ actual_result.append(step.command_str())
+ actual_result.extend(res)
+
+ if ignore_errors:
+ continue
+ if res == step.result:
+ continue
+ raise Exception('Result mismatch:\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]'
+ % (step, step.command_str(), '\n'.join(res)))
+ except:
+ print >> sys.stderr, 'Error during VTY transcript step %d:\n[\n%s\n]' % (step_nr, step)
+ sys.stderr.flush()
+ raise
+
+ # final line ending
+ actual_result.append('')
+ return actual_result
+
+class TestVTYHLR(TestVTYBase):
+ HLR_DB = 'hlr_vty_test.db'
+ HLR_SQL = '%s/sql/hlr.sql' % confdir
+
+ bin_file = './src/osmo-hlr'
+ conf_file = 'doc/examples/osmo-hlr.cfg'
+ vty_port = 4258
+ vty_prompt = 'OsmoHLR'
+ bin_args = ('-l', HLR_DB)
+
+ def setUp(self):
+ print('\n')
+ print(os.getcwd())
+ subprocess.call(('rm', '-f', self.HLR_DB))
+ assert subprocess.call('sqlite3 %s < %s' % (self.HLR_DB, self.HLR_SQL), shell=True) == 0
+ super(TestVTYHLR, self).setUp()
+
+ def tearDown(self):
+ super(TestVTYHLR, self).tearDown()
+ os.unlink(self.HLR_DB)
+
+ def test_vty_tree(self):
+ self.vty.enable()
+ assert self.vty.verify('configure terminal', [''])
+ assert self.vty.node() == 'config'
+ self.checkForEndAndExit()
+ assert self.vty.verify('hlr', [''])
+ assert self.vty.node() == 'config-hlr'
+ self.checkForEndAndExit()
+ assert self.vty.verify('gsup', [''])
+ assert self.vty.node() == 'config-hlr-gsup'
+ self.checkForEndAndExit()
+ assert self.vty.verify('exit', [''])
+ assert self.vty.node() == 'config-hlr'
+ assert self.vty.verify('exit', [''])
+ assert self.vty.node() == 'config'
+ assert self.vty.verify('exit', [''])
+ assert self.vty.node() is None
+
+ def vty_transcripts(self):
+ for f in os.listdir(srcdir):
+ if f.endswith('.vty'):
+ yield os.path.relpath(os.path.join(srcdir, f))
+
+if __name__ == '__main__':
+ import argparse
+ import sys
+
+ workdir = '.'
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', dest='verbose',
+ action='store_true', help='verbose mode')
+ parser.add_argument('-p', '--pythonconfpath', dest='p',
+ help='searchpath for config')
+ parser.add_argument('-w', '--workdir', dest='w',
+ help='Working directory')
+ parser.add_argument('-u', '--update', dest='update', action='store_true',
+ help='Do not verify VTY transcripts, just show their results,'
+ ' useful to update the VTY transcript.')
+ parser.add_argument('test_name', nargs='*', help='(parts of) test names to run, case-insensitive')
+ args = parser.parse_args()
+
+ verbose_level = 1
+ if args.verbose:
+ verbose_level = 2
+
+ if args.w:
+ workdir = args.w
+
+ if args.p:
+ confdir = args.p
+
+ print 'confdir %s, workdir %s' % (confdir, workdir)
+ os.chdir(workdir)
+ print 'Running tests for specific VTY commands'
+
+ run_tests((TestVTYHLR(),
+ ),
+ pick_name_snippets=args.test_name,
+ vty_transcript_update=args.update)
+
+# vim: tabstop=4 shiftwidth=4 expandtab nocin ai