aboutsummaryrefslogtreecommitdiffstats
path: root/osmopy
diff options
context:
space:
mode:
Diffstat (limited to 'osmopy')
-rw-r--r--osmopy/__init__.py14
-rwxr-xr-xosmopy/osmo_ctrl.py120
-rw-r--r--osmopy/osmo_interact_common.py469
-rwxr-xr-xosmopy/osmo_interact_ctrl.py100
-rwxr-xr-xosmopy/osmo_interact_vty.py180
-rwxr-xr-xosmopy/osmo_verify_transcript_ctrl.py58
-rwxr-xr-xosmopy/osmo_verify_transcript_vty.py67
-rw-r--r--osmopy/osmodumpdoc.py96
-rw-r--r--osmopy/osmotestconfig.py220
-rw-r--r--osmopy/osmotestvty.py102
-rwxr-xr-xosmopy/soap.py188
-rwxr-xr-xosmopy/twisted_ipa.py384
12 files changed, 2 insertions, 1996 deletions
diff --git a/osmopy/__init__.py b/osmopy/__init__.py
index 3fd197f..e3bf016 100644
--- a/osmopy/__init__.py
+++ b/osmopy/__init__.py
@@ -1,14 +1,4 @@
#!/usr/bin/env python
-__version__ = '0.0.3'
+__version__ = '0.0.4'
-__all__ = ['obscvty', 'osmodumpdoc', 'osmotestconfig', 'osmotestvty',
- 'osmoutil',
- 'osmo_ipa',
- 'osmo_ctrl',
- 'soap',
- 'twisted_ipa',
- 'osmo_interact_common',
- 'osmo_interact_vty',
- 'osmo_interact_ctrl',
- 'osmo_verify_transcript_vty',
- 'osmo_verify_transcript_ctrl']
+__all__ = ['obscvty', 'osmoutil', 'osmo_ipa']
diff --git a/osmopy/osmo_ctrl.py b/osmopy/osmo_ctrl.py
deleted file mode 100755
index 2b8c4be..0000000
--- a/osmopy/osmo_ctrl.py
+++ /dev/null
@@ -1,120 +0,0 @@
-#!/usr/bin/env python2
-# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
-"""
-/*
- * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
- *
- * All Rights Reserved
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-"""
-
-from optparse import OptionParser
-from osmopy.osmo_ipa import Ctrl
-import socket
-
-verbose = False
-
-def connect(host, port):
- if verbose:
- print "Connecting to host %s:%i" % (host, port)
-
- sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sck.setblocking(1)
- sck.connect((host, port))
- return sck
-
-def do_set_get(sck, var, value = None):
- (r, c) = Ctrl().cmd(var, value)
- sck.send(c)
- answer = Ctrl().rem_header(sck.recv(4096))
- return (answer,) + Ctrl().verify(answer, r, var, value)
-
-def set_var(sck, var, val):
- (a, _, _) = do_set_get(sck, var, val)
- return a
-
-def get_var(sck, var):
- (_, _, v) = do_set_get(sck, var)
- return v
-
-def _leftovers(sck, fl):
- """
- Read outstanding data if any according to flags
- """
- try:
- data = sck.recv(1024, fl)
- except socket.error as (s_errno, strerror):
- return False
- if len(data) != 0:
- tail = data
- while True:
- (head, tail) = Ctrl().split_combined(tail)
- print "Got message:", Ctrl().rem_header(head)
- if len(tail) == 0:
- break
- return True
- return False
-
-if __name__ == '__main__':
- parser = OptionParser("Usage: %prog [options] var [value]")
- parser.add_option("-d", "--host", dest="host",
- help="connect to HOST", metavar="HOST")
- parser.add_option("-p", "--port", dest="port", type="int",
- help="use PORT", metavar="PORT", default=4249)
- parser.add_option("-g", "--get", action="store_true",
- dest="cmd_get", help="perform GET operation")
- parser.add_option("-s", "--set", action="store_true",
- dest="cmd_set", help="perform SET operation")
- parser.add_option("-v", "--verbose", action="store_true",
- dest="verbose", help="be verbose", default=False)
- parser.add_option("-m", "--monitor", action="store_true",
- dest="monitor", help="monitor the connection for traps", default=False)
-
- (options, args) = parser.parse_args()
-
- verbose = options.verbose
-
- if options.cmd_set and options.cmd_get:
- parser.error("Get and set options are mutually exclusive!")
-
- if not (options.cmd_get or options.cmd_set or options.monitor):
- parser.error("One of -m, -g, or -s must be set")
-
- if not (options.host):
- parser.error("Destination host and port required!")
-
- sock = connect(options.host, options.port)
-
- if options.cmd_set:
- if len(args) < 2:
- parser.error("Set requires var and value arguments")
- _leftovers(sock, socket.MSG_DONTWAIT)
- print "Got message:", set_var(sock, args[0], ' '.join(args[1:]))
-
- if options.cmd_get:
- if len(args) != 1:
- parser.error("Get requires the var argument")
- _leftovers(sock, socket.MSG_DONTWAIT)
- (a, _, _) = do_set_get(sock, args[0])
- print "Got message:", a
-
- if options.monitor:
- while True:
- if not _leftovers(sock, 0):
- print "Connection is gone."
- break
- sock.close()
diff --git a/osmopy/osmo_interact_common.py b/osmopy/osmo_interact_common.py
deleted file mode 100644
index 5efc22d..0000000
--- a/osmopy/osmo_interact_common.py
+++ /dev/null
@@ -1,469 +0,0 @@
-#!/usr/bin/env python3
-#
-# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
-# All rights reserved.
-#
-# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-Common code for osmo_interact_vty.py and osmo_interact_ctrl.py.
-This implements all of application interaction, piping and verification.
-osmo_interact_{vty,ctrl}.py plug VTY and CTRL interface specific bits.
-'''
-
-import argparse
-import sys
-import os
-import subprocess
-import time
-import traceback
-import socket
-import shlex
-import re
-
-
-class Interact:
-
- class StepBase:
- command = None
- result = None
- leading_blanks = None
-
- def __init__(self):
- self.result = []
-
- def verify_interact_state(self, interact_instance):
- # for example to verify that the last VTY prompt received shows the
- # right node.
- pass
-
- def command_str(self, interact_instance=None):
- return self.command
-
- def __str__(self):
- return '%s\n%s' % (self.command_str(), '\n'.join(self.result))
-
- @staticmethod
- def is_next_step(line, interact_instance):
- assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep"
-
- socket = None
-
- def __init__(self, step_class, port, host, verbose=False, update=False):
- '''
- host is the hostname to connect to.
- port is the CTRL port to connect on.
- '''
- self.Step = step_class
- self.port = port
- self.host = host
- self.verbose = verbose
- self.update = update
-
- if not port:
- raise Exception("You need to provide port number to connect to")
-
- def connect(self):
- assert self.socket is None
- retries = 30
- took = 0
- while True:
- took += 1
- try:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setblocking(1)
- self.socket.connect((self.host, int(self.port)))
- except IOError:
- retries -= 1
- if retries <= 0:
- raise
- time.sleep(.1)
- continue
- break
-
- def close(self):
- if self.socket is None:
- return
- self.socket.close()
- self.socket = None
-
- def command(self, command):
- assert not "implemented separately by InteractVty and InteractCtrl"
-
- def verify_transcript_file(self, transcript_file):
- with open(transcript_file, 'r') as f:
- content = f.read()
-
- try:
- result = self.verify_transcript(content)
- except:
- print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr)
- sys.stderr.flush()
- raise
-
- if not self.update:
- return
- content = '\n'.join(result)
- with open(transcript_file, 'w') as f:
- f.write(content)
-
- def verify_transcript(self, transcript):
- ''''
- transcript is a "screenshot" of a session, a multi-line string
- including commands and expected results.
- Feed commands to self.command() and verify the expected results.
- '''
-
- # parse steps
- steps = []
- step = None
- blank_lines = 0
- for line in transcript.splitlines():
- if not line:
- blank_lines += 1
- continue
- next_step_started = self.Step.is_next_step(line, self)
- if next_step_started:
- if step:
- steps.append(step)
- step = next_step_started
- step.leading_blanks = blank_lines
- blank_lines = 0
- elif step:
- # we only count blank lines directly preceding the start of a
- # next step. Insert blank lines in the middle of a response
- # back into the response:
- if blank_lines:
- step.result.extend([''] * blank_lines)
- blank_lines = 0
- step.result.append(line)
- if step:
- steps.append(step)
- step = None
-
- actual_result = []
-
- # run steps
- step_nr = 0
- for step in steps:
- step_nr += 1
- try:
- if self.verbose:
- if step.leading_blanks:
- print('\n' * step.leading_blanks, end='')
- print(step.command_str())
- sys.stdout.flush()
-
- step.verify_interact_state(self)
-
- res = self.command(step.command)
-
- if self.verbose:
- sys.stderr.flush()
- sys.stdout.flush()
- print('\n'.join(res))
- sys.stdout.flush()
-
- if step.leading_blanks:
- actual_result.extend([''] * step.leading_blanks)
- actual_result.append(step.command_str(self))
-
- match_result = self.match_lines(step.result, res)
-
- if self.update:
- if match_result is True:
- # preserve any wildcards
- actual_result.extend(step.result)
- else:
- # mismatch, take exactly what came in
- actual_result.extend(res)
- continue
- if match_result is not True:
- raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]'
- % (match_result, step, step.command_str(), '\n'.join(res)))
- except:
- print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step),
- file=sys.stderr)
- sys.stderr.flush()
- raise
-
- # final line ending
- actual_result.append('')
- return actual_result
-
- @staticmethod
- def match_lines(expect, got):
- '''
- Match two lists of strings, allowing certain wildcards:
- - In 'expect', if a line is exactly '...', it matches any number of
- arbitrary lines in 'got'; the implementation is trivial and skips
- lines to the first occurence in 'got' that continues after '...'.
- - If an 'expect' line is '... !regex', it matches any number of
- lines like '...', but the given regex must not match any of those
- lines.
-
- Return 'True' on match, or a string describing the mismatch.
- '''
- def match_line(expect_line, got_line):
- return expect_line == got_line
-
- ANY = '...'
- ANY_EXCEPT = '... !'
-
- e = 0
- g = 0
- while e < len(expect):
- if expect[e] == ANY or expect[e].startswith(ANY_EXCEPT):
- wildcard = expect[e]
- e += 1
- g_end = g
-
- if e >= len(expect):
- # anything left in 'got' is accepted.
- g_end = len(got)
-
- # look for the next occurence of the expected line in 'got'
- while g_end < len(got) and not match_line(expect[e], got[g_end]):
- g_end += 1
-
- if wildcard == ANY:
- # no restrictions on lines
- g = g_end
-
- elif wildcard.startswith(ANY_EXCEPT):
- except_re = re.compile(wildcard[len(ANY_EXCEPT):])
- while g < g_end:
- if except_re.search(got[g]):
- return ('Got forbidden line for wildcard %r:'
- ' did not expect %r in line %d of response'
- % (wildcard, got[g], g))
- g += 1
-
- continue
-
- if g >= len(got):
- return 'Cannot find line %r' % expect[e]
-
- if not match_line(expect[e], got[g]):
- return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g])
-
- e += 1
- g += 1
-
- if g < len(got):
- return 'Did not expect line %r' % got[g]
- return True
-
- def feed_commands(self, output, command_strs):
- for command_str in command_strs:
- for command in command_str.splitlines():
- res = self.command(command)
- output.write('\n'.join(res))
- output.write('\n')
-
-def end_process(proc, quiet=False):
- if not proc:
- return
-
- rc = proc.poll()
- if rc is not None:
- if not quiet:
- print('Process has already terminated with', rc)
- proc.wait()
- return
-
- proc.terminate()
- time_to_wait_for_term = 5
- wait_step = 0.001
- waited_time = 0
- while True:
- # poll returns None if proc is still running
- if proc.poll() is not None:
- break
- waited_time += wait_step
- # make wait_step approach 1.0
- wait_step = (1. + 5. * wait_step) / 6.
- if waited_time >= time_to_wait_for_term:
- break
- time.sleep(wait_step)
-
- if proc.poll() is None:
- # termination seems to be slower than that, let's just kill
- proc.kill()
- if not quiet:
- print("Killed child process")
- elif waited_time > .002:
- if not quiet:
- print("Terminating took %.3fs" % waited_time)
- proc.wait()
-
-class Application:
- proc = None
- _devnull = None
-
- @staticmethod
- def devnull():
- if Application._devnull is None:
- Application._devnull = open(os.devnull, 'w')
- return Application._devnull
-
- def __init__(self, run_app_str, purge_output=True, quiet=False):
- self.command_tuple = shlex.split(run_app_str)
- self.purge_output = purge_output
- self.quiet = quiet
-
- def run(self):
- out_err = None
- if self.purge_output:
- out_err = Application.devnull()
-
- if not self.quiet:
- print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple)))
- self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err)
-
- def stop(self):
- end_process(self.proc, self.quiet)
-
-def verify_application(run_app_str, interact, transcript_file, verbose):
- passed = None
- application = None
-
- sys.stdout.flush()
- sys.stderr.flush()
-
- if run_app_str:
- application = Application(run_app_str, purge_output=not verbose)
- application.run()
-
- try:
- interact.connect()
- interact.verify_transcript_file(transcript_file)
- passed = True
- except:
- traceback.print_exc()
- passed = False
- interact.close()
-
- if application:
- application.stop()
-
- sys.stdout.flush()
- sys.stderr.flush()
-
- return passed
-
-def common_parser():
- parser = argparse.ArgumentParser()
- parser.add_argument('-r', '--run', dest='run_app_str',
- help='command to run to launch application to test,'
- ' including command line arguments. If omitted, no'
- ' application is launched.')
- parser.add_argument('-p', '--port', dest='port',
- help="Port to reach the application at.")
- parser.add_argument('-H', '--host', dest='host', default='localhost',
- help="Host to reach the application at.")
- return parser
-
-def parser_add_verify_args(parser):
- parser.add_argument('-u', '--update', dest='update', action='store_true',
- help='Do not verify, but OVERWRITE transcripts based on'
- ' the application\'s current behavior. OVERWRITES TRANSCRIPT'
- ' FILES.')
- parser.add_argument('-v', '--verbose', action='store_true',
- help='Print commands and application output')
- parser.add_argument('transcript_files', nargs='*', help='transcript file(s) to verify')
- return parser
-
-def parser_add_run_args(parser):
- parser.add_argument('-O', '--output', dest='output_path',
- help="Write command results to a file instead of stdout."
- "('-O -' writes to stdout and is the default)")
- parser.add_argument('-c', '--command', dest='cmd_str',
- help="Run this command (before reading input files, if any)."
- " multiple commands may be separated by ';'")
- parser.add_argument('cmd_files', nargs='*', help='file(s) with plain commands to run')
- return parser
-
-def main_run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
- to_stdout = False
- if not output_path or output_path == '-':
- to_stdout = True
- output = sys.stdout
- else:
- output = open(output_path, 'w')
-
- application = None
-
- if run_app_str:
- application = Application(run_app_str, quiet=to_stdout)
- application.run()
-
- try:
- interact.connect()
-
- if cmd_str:
- interact.feed_commands(output, cmd_str.split(';'))
-
- for f_path in (cmd_files or []):
- with open(f_path, 'r') as f:
- interact.feed_commands(output, f.read().decode('utf-8').splitlines())
-
- if not (cmd_str or cmd_files):
- while True:
- line = sys.stdin.readline()
- if not line:
- break;
- interact.feed_commands(output, line.split(';'))
- except:
- traceback.print_exc()
- finally:
- if not to_stdout:
- try:
- output.close()
- except:
- traceback.print_exc()
-
- try:
- interact.close()
- except:
- traceback.print_exc()
-
- if application:
- try:
- application.stop()
- except:
- traceback.print_exc()
-
-def main_verify_transcripts(run_app_str, transcript_files, interact, verbose):
- results = []
- for t in transcript_files:
- passed = verify_application(run_app_str=run_app_str,
- interact=interact,
- transcript_file=t,
- verbose=verbose)
- results.append((passed, t))
-
- print('\nRESULTS:')
- all_passed = True
- for passed, t in results:
- print('%s: %s' % ('pass' if passed else 'FAIL', t))
- all_passed = all_passed and passed
- print()
-
- if not all_passed:
- sys.exit(1)
-
-# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_interact_ctrl.py b/osmopy/osmo_interact_ctrl.py
deleted file mode 100755
index 9b1a20b..0000000
--- a/osmopy/osmo_interact_ctrl.py
+++ /dev/null
@@ -1,100 +0,0 @@
-#!/usr/bin/env python3
-#
-# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
-# All rights reserved.
-#
-# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-Run CTRL commands or test transcripts against a given application. Commandline
-invocation exposes only direct command piping, the transcript verification code
-is exposed as commandline args by osmo_verify_transcript_ctrl.py.
-'''
-
-import re
-
-from osmopy.osmo_interact_common import *
-from osmopy.osmo_ipa import Ctrl, IPA
-
-class InteractCtrl(Interact):
- next_id = 1
- keep_ids = True
- re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$')
-
- class CtrlStep(Interact.StepBase):
-
- @staticmethod
- def is_next_step(line, interact_instance):
- m = InteractCtrl.re_command.match(line)
- if not m:
- return None
- next_step = InteractCtrl.CtrlStep()
-
- set_get = m.group(1)
- cmd_id = m.group(2)
- var_val = m.group(3)
- if not interact_instance.keep_ids:
- cmd_id = interact_instance.next_id
- interact_instance.next_id += 1
- next_step.command = '%s %s %s' % (set_get, cmd_id, var_val)
-
- return next_step
-
- def __init__(self, port, host, verbose=False, update=False, keep_ids=True):
- if not update:
- keep_ids = True
- self.keep_ids = keep_ids
- super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update)
-
- def connect(self):
- self.next_id = 1
- super().connect()
-
- def send(self, data):
- data = Ctrl().add_header(data)
- return self.socket.send(data) == len(data)
-
- def receive(self):
- responses = []
- data = self.socket.recv(4096)
- while (len(data)>0):
- (response_with_header, data) = IPA().split_combined(data)
- response = Ctrl().rem_header(response_with_header)
- responses.append(response.decode('utf-8'))
- return responses
-
- def command(self, command):
- assert self.send(command)
- res = self.receive()
- split_responses = []
- for r in res:
- split_responses.extend(r.splitlines())
- sys.stdout.flush()
- sys.stderr.flush()
- return split_responses
-
-if __name__ == '__main__':
- parser = common_parser()
- parser_add_run_args(parser)
- args = parser.parse_args()
-
- interact = InteractCtrl(args.port, args.host, verbose=False, update=False,
- keep_ids=True)
-
- main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
- args.cmd_files, interact)
-
-# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_interact_vty.py b/osmopy/osmo_interact_vty.py
deleted file mode 100755
index b57cd8c..0000000
--- a/osmopy/osmo_interact_vty.py
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/usr/bin/env python3
-#
-# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
-# All rights reserved.
-#
-# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-Run VTY commands or test transcripts against a given application. Commandline
-invocation exposes only direct command piping, the transcript verification code
-is exposed as commandline args by osmo_verify_transcript_vty.py.
-'''
-
-import re
-
-from osmopy.osmo_interact_common import *
-
-class InteractVty(Interact):
-
- class VtyStep(Interact.StepBase):
- expect_node = None # e.g. '(config-net)'
- expect_prompt_char = None # '>' or '#'
-
- def __init__(self, prompt):
- super().__init__()
- self.prompt = prompt
-
- def verify_interact_state(self, interact_instance):
- if interact_instance.update:
- return
- if interact_instance.this_node != self.expect_node:
- raise Exception('Mismatch: expected VTY node %r in the prompt, got %r'
- % (self.expect_node, interact_instance.this_node))
- if interact_instance.this_prompt_char != self.expect_prompt_char:
- raise Exception('Mismatch: expected VTY prompt character %r, got %r'
- % (self.expect_prompt_char, interact_instance.this_prompt_char))
-
- @staticmethod
- def is_next_step(line, interact_instance):
- m = interact_instance.re_prompt.match(line)
- if not m:
- return None
- next_step = InteractVty.VtyStep(interact_instance.prompt)
- next_step.expect_node = m.group(1)
- next_step.expect_prompt_char = m.group(2)
- next_step.command = m.group(3)
- return next_step
-
- def command_str(self, interact_instance=None):
- if interact_instance is None:
- node = self.expect_node
- prompt_char = self.expect_prompt_char
- else:
- node = interact_instance.last_node
- prompt_char = interact_instance.last_prompt_char
- if node:
- node = '(%s)' % node
- node = node or ''
- return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command)
-
- def __init__(self, prompt, port, host, verbose, update):
- self.prompt = prompt
- super().__init__(InteractVty.VtyStep, port, host, verbose, update)
-
- def connect(self):
- self.this_node = None
- self.this_prompt_char = '>' # slight cheat for initial prompt char
- self.last_node = None
- self.last_prompt_char = None
-
- super().connect()
- # receive the first welcome message and discard
- data = self.socket.recv(4096)
- if not self.prompt:
- b = data
- b = b[b.rfind(b'\n') + 1:]
- while b and (b[0] < ord('A') or b[0] > ord('z')):
- b = b[1:]
- prompt_str = b.decode('utf-8')
- if '>' in prompt_str:
- self.prompt = prompt_str[:prompt_str.find('>')]
- if not self.prompt:
- raise Exception('Could not find application name; needed to decode prompts.'
- ' Initial data was: %r' % data)
- self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
-
- def _command(self, command_str, timeout=10):
- self.socket.send(command_str.encode())
-
- waited_since = time.time()
- received_lines = []
- last_line = ''
-
- while True:
- new_data = self.socket.recv(4096).decode('utf-8')
-
- last_line = "%s%s" % (last_line, new_data)
-
- if last_line:
- lines = last_line.splitlines()
- received_lines.extend(lines[:-1])
- last_line = lines[-1]
-
- match = self.re_prompt.match(last_line)
- if not match:
- if time.time() - waited_since > timeout:
- raise IOError("Failed to read data (did the app crash?)")
- time.sleep(.1)
- continue
-
- self.last_node = self.this_node
- self.last_prompt_char = self.this_prompt_char
- self.this_node = match.group(1) or None
- self.this_prompt_char = match.group(2)
- break
-
- # expecting to have received the command we sent as echo, remove it
- clean_command_str = command_str.strip()
- if clean_command_str.endswith('?'):
- clean_command_str = clean_command_str[:-1]
- if received_lines and received_lines[0] == clean_command_str:
- received_lines = received_lines[1:]
- return received_lines
-
- def command(self, command_str, timeout=10):
- command_str = command_str or '\r'
- if command_str[-1] not in '?\r\t':
- command_str = command_str + '\r'
-
- received_lines = self._command(command_str, timeout)
-
- # send escape to cancel the '?' command line
- if command_str[-1] == '?':
- self._command('\x03', timeout)
-
- return received_lines
-
-def parser_add_vty_args(parser):
- parser.add_argument('-n', '--prompt-name', dest='prompt',
- help="Name used in application's telnet VTY prompt."
- " If omitted, will attempt to determine the name from"
- " the initial VTY prompt.")
- return parser
-
-if __name__ == '__main__':
- parser = common_parser()
- parser_add_vty_args(parser)
- parser_add_run_args(parser)
- parser.add_argument('-X', '--gen-xml-ref', dest='gen_xml', action='store_true',
- help="Equivalent to '-c \"show online-help\" -O -',"
- " can be used to generate the VTY reference file as"
- " required by osmo-gsm-manuals.git.")
- args = parser.parse_args()
-
- if args.gen_xml:
- if args.cmd_str:
- raise Exception('It does not make sense to pass both --command and'
- ' --gen-xml-ref.')
- args.cmd_str = 'show online-help'
-
- interact = InteractVty(args.prompt, args.port, args.host,
- verbose=False, update=False)
-
- main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
- args.cmd_files, interact)
-
-# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_verify_transcript_ctrl.py b/osmopy/osmo_verify_transcript_ctrl.py
deleted file mode 100755
index 3afbc62..0000000
--- a/osmopy/osmo_verify_transcript_ctrl.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python3
-#
-# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
-# All rights reserved.
-#
-# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-Run CTRL test transcripts against a given application.
-
-A CTRL transcript contains CTRL commands and their expected results.
-It looks like:
-
-"
-SET 1 var val
-SET_REPLY 1 var OK
-GET 2 var
-GET_REPLY 2 var val
-"
-
-The application to be tested is described by
-- a binary to run,
-- command line arguments to pass to the binary,
-- the CTRL port.
-
-This module can either be run directly to run or update a given CTRL transcript,
-or it can be imported as a module to run more complex setups.
-'''
-
-from osmopy.osmo_interact_ctrl import *
-
-if __name__ == '__main__':
- parser = common_parser()
- parser_add_verify_args(parser)
- parser.add_argument('-i', '--keep-ids', dest='keep_ids', action='store_true',
- help='With --update, default is to overwrite the command IDs'
- ' so that they are consecutive numbers starting from 1.'
- ' With --keep-ids, do not change these command IDs.')
- args = parser.parse_args()
-
- interact = InteractCtrl(args.port, args.host, args.verbose, args.update, args.keep_ids)
-
- main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
-
-# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_verify_transcript_vty.py b/osmopy/osmo_verify_transcript_vty.py
deleted file mode 100755
index e70c36c..0000000
--- a/osmopy/osmo_verify_transcript_vty.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/env python3
-#
-# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
-# All rights reserved.
-#
-# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-'''
-Run VTY test transcripts against a given application.
-
-A VTY transcript contains VTY commands and their expected results.
-It looks like:
-
-"
-OsmoHLR> enable
-
-OsmoHLR# subscriber show imsi 123456789023000
-% No subscriber for imsi = '123456789023000'
-OsmoHLR# subscriber show msisdn 12345
-% No subscriber for msisdn = '12345'
-
-OsmoHLR# subscriber create imsi 123456789023000
-% Created subscriber 123456789023000
- ID: 1
- IMSI: 123456789023000
- MSISDN: none
- No auth data
-"
-
-The application to be tested is described by
-- a binary to run,
-- command line arguments to pass to the binary,
-- the VTY telnet port,
-- the application name as printed in the VTY prompt.
-
-This module can either be run directly to run or update a given VTY transcript,
-or it can be imported as a module to run more complex setups.
-'''
-
-import re
-
-from osmopy.osmo_interact_vty import *
-
-if __name__ == '__main__':
- parser = common_parser()
- parser_add_vty_args(parser)
- parser_add_verify_args(parser)
- args = parser.parse_args()
-
- interact = InteractVty(args.prompt, args.port, args.host, args.verbose, args.update)
-
- main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
-
-# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmodumpdoc.py b/osmopy/osmodumpdoc.py
deleted file mode 100644
index 2464b05..0000000
--- a/osmopy/osmodumpdoc.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python
-
-# Make sure this code is in sync with the BTS directory.
-# Fixes may need to be applied to both.
-
-"""Start the process and dump the documentation to the doc dir."""
-
-import subprocess
-import time
-import os
-import sys
-
-import osmopy.obscvty as obscvty
-import osmopy.osmoutil as osmoutil
-
-
-def dump_doc(name, port, filename):
- vty = obscvty.VTYInteract(name, "127.0.0.1", port)
- xml = vty.command("show online-help")
- # Now write everything until the end to the file
- out = open(filename, 'w')
- out.write(xml)
- out.close()
- print 'generated %r' % filename
-
-
-"""Dump the config of all the apps.
-
-Returns the number of apps configs could not be dumped for."""
-
-
-def dump_configs(apps, configs, confpath):
- failures = 0
- successes = 0
-
- try: # make sure the doc directory exists
- os.mkdir('doc')
- except OSError: # it probably does
- pass
-
- for app in apps:
- appname = app[3]
- print "Starting app for %s" % appname
- proc = None
- cmd = [app[1], "-c", os.path.join(confpath, configs[appname][0])]
- print 'cd', os.path.abspath(os.path.curdir), ';', ' '.join(cmd)
- try:
- proc = subprocess.Popen(cmd, stdin=None, stdout=None)
- except OSError as e: # Probably a missing binary
- print >> sys.stderr, e
- print >> sys.stderr, "Skipping app %s" % appname
- failures += 1
- else:
- try:
- dump_doc(app[2], app[0], 'doc/%s_vty_reference.xml' % appname)
- successes += 1
- except IOError: # Generally a socket issue
- print >> sys.stderr, "%s: couldn't connect, skipping" % appname
- failures += 1
- finally:
- osmoutil.end_proc(proc)
-
- return (failures, successes)
-
-
-if __name__ == '__main__':
- import argparse
-
- confpath = "."
- workdir = "."
-
- parser = argparse.ArgumentParser()
- parser.add_argument("-p", "--pythonconfpath", dest="p",
- help="searchpath for config (osmoappdesc)")
- parser.add_argument("-w", "--workdir", dest="w",
- help="Working directory to run in")
- args = parser.parse_args()
-
- if args.p:
- confpath = args.p
-
- if args.w:
- workdir = args.w
-
- osmoappdesc = osmoutil.importappconf_or_quit(
- confpath, "osmoappdesc", args.p)
-
- confpath = os.path.relpath(confpath, workdir)
- os.chdir(workdir)
- num_fails, num_sucs = dump_configs(
- osmoappdesc.apps, osmoappdesc.app_configs, confpath)
- if num_fails > 0:
- print >> sys.stderr, "Warning: Skipped %s apps" % num_fails
- if 0 == num_sucs:
- print >> sys.stderr, "Nothing run, wrong working dir? Set with -w"
- sys.exit(num_fails)
diff --git a/osmopy/osmotestconfig.py b/osmopy/osmotestconfig.py
deleted file mode 100644
index 2132c43..0000000
--- a/osmopy/osmotestconfig.py
+++ /dev/null
@@ -1,220 +0,0 @@
-#!/usr/bin/env python
-
-# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import os.path
-import time
-import sys, shutil, stat
-import tempfile
-
-import osmopy.obscvty as obscvty
-import osmopy.osmoutil as osmoutil
-
-
-# Return true iff all the tests for the given config pass
-def test_config(app_desc, config, tmpdir, verbose=True):
- try:
- err = 0
- if test_config_atest(app_desc, config, verify_doc, verbose)[0] > 0:
- err += 1
-
- newconfig = copy_config(tmpdir, config)
- if test_config_atest(app_desc, newconfig, write_config, verbose) > 0:
- err += 1
-
- if test_config_atest(app_desc, newconfig, token_vty_command, verbose) > 0:
- err += 1
-
- return err
-
- # If there's a socket error, skip the rest of the tests for this config
- except IOError:
- return 1
-
-
-def test_config_atest(app_desc, config, run_test, verbose=True):
- proc = None
- ret = None
- vty = None
- try:
- cmd = app_desc[1].split(' ') + [ "-c", config]
- if verbose:
- print "Verifying %s, test %s" % (' '.join(cmd), run_test.__name__)
-
- proc = osmoutil.popen_devnull(cmd)
- end = app_desc[2]
- port = app_desc[0]
- vty = obscvty.VTYInteract(end, "127.0.0.1", port)
- ret = run_test(vty)
-
- except IOError as se:
- print >> sys.stderr, "Failed to verify %s" % ' '.join(cmd)
- print >> sys.stderr, "Current directory: %s" % os.getcwd()
- print >> sys.stderr, "Error was %s" % se
- print >> sys.stderr, "Config was\n%s" % open(config).read()
- raise se
-
- finally:
- if proc:
- osmoutil.end_proc(proc)
- if vty:
- vty._close_socket()
-
- return ret
-
-def copy_config(dirname, config):
- shutil.rmtree(dirname, True)
- ign = shutil.ignore_patterns('*.cfg')
- shutil.copytree(os.path.dirname(config), dirname, ignore=ign)
- os.chmod(dirname, stat.S_IRWXU)
-
- try:
- os.stat(dirname)
- except OSError:
- os.mkdir(dirname)
-
- prefix = os.path.basename(config)
- tmpfile = tempfile.NamedTemporaryFile(
- dir=dirname, prefix=prefix, delete=False)
- tmpfile.write(open(config).read())
- tmpfile.close()
- # This works around the precautions NamedTemporaryFile is made for...
- return tmpfile.name
-
-
-def write_config(vty):
- new_config = vty.enabled_command("write")
- if not new_config.startswith("Configuration saved to "):
- print(new_config)
- return 1, [new_config]
- return 0
-
-
-# The only purpose of this function is to verify a working vty
-def token_vty_command(vty):
- vty.command("help")
- return 0
-
-
-# This may warn about the same doc missing multiple times, by design
-def verify_doc(vty):
- xml = vty.command("show online-help")
- split_at = "<command"
- all_errs = []
- for command in xml.split(split_at):
- if "(null)" in command:
- lines = command.split("\n")
- cmd_line = split_at + lines[0]
- err_lines = []
- for line in lines:
- if '(null)' in line:
- err_lines.append(line)
-
- all_errs.append(err_lines)
-
- print >> sys.stderr, \
- "Documentation error (missing docs): \n%s\n%s\n" % (
- cmd_line, '\n'.join(err_lines))
-
- return (len(all_errs), all_errs)
-
-
-# Skip testing the configurations of anything that hasn't been compiled
-def app_exists(app_desc):
- cmd = app_desc[1].split(' ')[0]
- return os.path.exists(cmd)
-
-
-def remove_tmpdir(tmpdir):
- files = os.listdir(tmpdir)
- for f in files:
- os.unlink(os.path.join(tmpdir, f))
- os.rmdir(tmpdir)
-
-
-def check_configs_tested(basedir, app_configs, ignore_configs):
- configs = []
- for root, dirs, files in os.walk(basedir):
- for f in files:
- if f.endswith(".cfg") and f not in ignore_configs:
- configs.append(os.path.join(root, f))
- for config in configs:
- found = False
- for app in app_configs:
- if config in app_configs[app]:
- found = True
- if not found:
- print >> sys.stderr, "Warning: %s is not being tested" % config
-
-
-def test_all_apps(apps, app_configs, tmpdir="writtenconfig", verbose=True,
- confpath=".", rmtmp=False, ignore_configs=[]):
- check_configs_tested("doc/examples/", app_configs, ignore_configs)
- errors = 0
- for app in apps:
- if not app_exists(app):
- print >> sys.stderr, "Skipping app %s (not found)" % app[1]
- continue
-
- configs = app_configs[app[3]]
- for config in configs:
- config = os.path.join(confpath, config)
- errors += test_config(app, config, tmpdir, verbose)
-
- if rmtmp or not errors:
- remove_tmpdir(tmpdir)
-
- if errors:
- print >> sys.stderr, "ERRORS: %d" % errors
- return errors
-
-
-if __name__ == '__main__':
- import argparse
-
- confpath = "."
- wordir = "."
-
- parser = argparse.ArgumentParser()
- parser.add_argument("--e1nitb", action="store_true", dest="e1nitb")
- parser.add_argument("-v", "--verbose", dest="verbose",
- action="store_true", help="verbose mode")
- parser.add_argument("-p", "--pythonconfpath", dest="p",
- help="searchpath for config")
- parser.add_argument("-w", "--workdir", dest="w",
- help="Working directory to run in")
-
- args = parser.parse_args()
-
- if args.p:
- confpath = args.p
-
- if args.w:
- workdir = args.w
-
- osmoappdesc = osmoutil.importappconf_or_quit(confpath, "osmoappdesc",
- args.p)
-
- apps = osmoappdesc.apps
- configs = osmoappdesc.app_configs
- ignores = getattr(osmoappdesc, 'ignore_configs', [])
-
- if args.e1nitb:
- configs['nitb'].extend(osmoappdesc.nitb_e1_configs)
-
- os.chdir(workdir)
- sys.exit(test_all_apps(apps, configs, ignore_configs=ignores,
- confpath=confpath, verbose=args.verbose))
diff --git a/osmopy/osmotestvty.py b/osmopy/osmotestvty.py
deleted file mode 100644
index e513c05..0000000
--- a/osmopy/osmotestvty.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#!/usr/bin/env python
-
-# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com>
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import time
-import unittest
-
-import osmopy.obscvty as obscvty
-import osmopy.osmoutil as osmoutil
-
-confpath = '.'
-
-"""Test a VTY. Warning: osmoappdesc must be imported first."""
-
-
-class TestVTY(unittest.TestCase):
-
- def setUp(self):
- osmo_vty_cmd = osmoappdesc.vty_command[:]
- config_index = osmo_vty_cmd.index('-c')
- if config_index:
- cfi = config_index + 1
- osmo_vty_cmd[cfi] = os.path.join(confpath, osmo_vty_cmd[cfi])
-
- try:
- print "Launch: %s from %s" % (' '.join(osmo_vty_cmd), os.getcwd())
- self.proc = osmoutil.popen_devnull(osmo_vty_cmd)
- except OSError:
- print >> sys.stderr, "Current directory: %s" % os.getcwd()
- print >> sys.stderr, "Consider setting -b"
-
- appstring = osmoappdesc.vty_app[2]
- appport = osmoappdesc.vty_app[0]
- self.vty = obscvty.VTYInteract(appstring, "127.0.0.1", appport)
-
- def tearDown(self):
- self.vty._close_socket()
- self.vty = None
- osmoutil.end_proc(self.proc)
-
- def test_history(self):
- t1 = "show version"
- self.vty.command(t1)
- test_str = "show history"
- assert(self.vty.w_verify(test_str, [t1]))
-
- def test_unknown_command(self):
- test_str = "help show"
- assert(self.vty.verify(test_str, ['% Unknown command.']))
-
- def test_terminal_length(self):
- test_str = "terminal length 20"
- assert(self.vty.verify(test_str, ['']))
-
-
-if __name__ == '__main__':
- import argparse
- import os
- import sys
-
- workdir = '.'
-
- parser = argparse.ArgumentParser()
- parser.add_argument("-v", "--verbose", dest="verbose",
- action="store_true", help="verbose mode")
- parser.add_argument("-p", "--pythonconfpath", dest="p",
- help="searchpath for config")
- parser.add_argument("-w", "--workdir", dest="w",
- help="Working directory")
- args = parser.parse_args()
-
- verbose_level = 1
- if args.verbose:
- verbose_level = 2
-
- if args.w:
- workdir = args.w
-
- if args.p:
- confpath = args.p
- osmoappdesc = osmoutil.importappconf_or_quit(confpath, "osmoappdesc",
- args.p)
-
- print "confpath %s, workdir %s" % (confpath, workdir)
- os.chdir(workdir)
- print "Running tests for specific VTY commands"
- suite = unittest.TestLoader().loadTestsFromTestCase(TestVTY)
- res = unittest.TextTestRunner(verbosity=verbose_level).run(suite)
- sys.exit(len(res.errors) + len(res.failures))
diff --git a/osmopy/soap.py b/osmopy/soap.py
deleted file mode 100755
index f1da8f2..0000000
--- a/osmopy/soap.py
+++ /dev/null
@@ -1,188 +0,0 @@
-#!/usr/bin/python3
-# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
-"""
-/*
- * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
- *
- * All Rights Reserved
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-"""
-
-__version__ = "0.7.1" # bump this on every non-trivial change
-
-from twisted.internet import defer, reactor
-from twisted_ipa import CTRL, IPAFactory, __version__ as twisted_ipa_version
-from osmopy.osmo_ipa import Ctrl
-from treq import post, collect
-from suds.client import Client
-from functools import partial
-from distutils.version import StrictVersion as V # FIXME: use NormalizedVersion from PEP-386 when available
-import argparse, datetime, signal, sys, os, logging, logging.handlers
-
-# we don't support older versions of TwistedIPA module
-assert V(twisted_ipa_version) > V('0.4')
-
-# keys from OpenBSC openbsc/src/libbsc/bsc_rf_ctrl.c, values SOAP-specific
-oper = { 'inoperational' : 0, 'operational' : 1 }
-admin = { 'locked' : 0, 'unlocked' : 1 }
-policy = { 'off' : 0, 'on' : 1, 'grace' : 2, 'unknown' : 3 }
-
-# keys from OpenBSC openbsc/src/libbsc/bsc_vty.c
-fix = { 'invalid' : 0, 'fix2d' : 1, 'fix3d' : 1 } # SOAP server treats it as boolean but expects int
-
-
-def handle_reply(p, f, log, r):
- """
- Reply handler: takes function p to process raw SOAP server reply r, function f to run for each command and verbosity flag v
- """
- repl = p(r) # result is expected to have both commands[] array and error string (could be None)
- bsc_id = repl.commands[0].split()[0].split('.')[3] # we expect 1st command to have net.0.bsc.666.bts.2.trx.1 location prefix format
- log.info("Received SOAP response for BSC %s with %d commands, error status: %s" % (bsc_id, len(repl.commands), repl.error))
- log.debug("BSC %s commands: %s" % (bsc_id, repl.commands))
- for t in repl.commands: # Process OpenBscCommands format from .wsdl
- (_, m) = Ctrl().cmd(*t.split())
- f(m)
-
-
-class Trap(CTRL):
- """
- TRAP handler (agnostic to factory's client object)
- """
- def ctrl_TRAP(self, data, op_id, v):
- """
- Parse CTRL TRAP and dispatch to appropriate handler after normalization
- """
- (l, r) = v.split()
- loc = l.split('.')
- t_type = loc[-1]
- p = partial(lambda a, i: a[i] if len(a) > i else None, loc) # parse helper
- method = getattr(self, 'handle_' + t_type.replace('-', ''), lambda: "Unhandled %s trap" % t_type)
- method(p(1), p(3), p(5), p(7), r) # we expect net.0.bsc.666.bts.2.trx.1 format for trap prefix
-
- def ctrl_SET_REPLY(self, data, _, v):
- """
- Debug log for replies to our commands
- """
- self.factory.log.debug('SET REPLY %s' % v)
-
- def ctrl_ERROR(self, data, op_id, v):
- """
- We want to know if smth went wrong
- """
- self.factory.log.debug('CTRL ERROR [%s] %s' % (op_id, v))
-
- def connectionMade(self):
- """
- Logging wrapper, calling super() is necessary not to break reconnection logic
- """
- self.factory.log.info("Connected to CTRL@%s:%d" % (self.factory.host, self.factory.port))
- super(CTRL, self).connectionMade()
-
- @defer.inlineCallbacks
- def handle_locationstate(self, net, bsc, bts, trx, data):
- """
- Handle location-state TRAP: parse trap content, build SOAP context and use treq's routines to post it while setting up async handlers
- """
- (ts, fx, lat, lon, height, opr, adm, pol, mcc, mnc) = data.split(',')
- tstamp = datetime.datetime.fromtimestamp(float(ts)).isoformat()
- self.factory.log.debug('location-state@%s.%s.%s.%s (%s) [%s/%s] => %s' % (net, bsc, bts, trx, tstamp, mcc, mnc, data))
- ctx = self.factory.client.registerSiteLocation(bsc, float(lon), float(lat), fix.get(fx, 0), tstamp, oper.get(opr, 2), admin.get(adm, 2), policy.get(pol, 3))
- d = post(self.factory.location, ctx.envelope)
- d.addCallback(collect, partial(handle_reply, ctx.process_reply, self.transport.write, self.factory.log)) # treq's collect helper is handy to get all reply content at once using closure on ctx
- d.addErrback(lambda e, bsc: self.factory.log.critical("HTTP POST error %s while trying to register BSC %s" % (e, bsc)), bsc) # handle HTTP errors
- # Ensure that we run only limited number of requests in parallel:
- yield self.factory.semaphore.acquire()
- yield d # we end up here only if semaphore is available which means it's ok to fire the request without exceeding the limit
- self.factory.semaphore.release()
-
- def handle_notificationrejectionv1(self, net, bsc, bts, trx, data):
- """
- Handle notification-rejection-v1 TRAP: just an example to show how more message types can be handled
- """
- self.factory.log.debug('notification-rejection-v1@bsc-id %s => %s' % (bsc, data))
-
-
-class TrapFactory(IPAFactory):
- """
- Store SOAP client object so TRAP handler can use it for requests
- """
- location = None
- log = None
- semaphore = None
- client = None
- host = None
- port = None
- def __init__(self, host, port, proto, semaphore, log, wsdl=None, location=None):
- self.host = host # for logging only,
- self.port = port # seems to be no way to get it from ReconnectingClientFactory
- self.log = log
- self.semaphore = semaphore
- soap = Client(wsdl, location=location, nosend=True) # make async SOAP client
- self.location = location.encode() if location else soap.wsdl.services[0].ports[0].location # necessary for dispatching HTTP POST via treq
- self.client = soap.service
- level = self.log.getEffectiveLevel()
- self.log.setLevel(logging.WARNING) # we do not need excessive debug from lower levels
- super(TrapFactory, self).__init__(proto, self.log)
- self.log.setLevel(level)
- self.log.debug("Using IPA %s, SUDS client: %s" % (Ctrl.version, soap))
-
-
-def reloader(path, script, log, dbg1, dbg2, signum, _):
- """
- Signal handler: we have to use execl() because twisted's reactor is not restartable due to some bug in twisted implementation
- """
- log.info("Received Signal %d - restarting..." % signum)
- if signum == signal.SIGUSR1 and dbg1 not in sys.argv and dbg2 not in sys.argv:
- sys.argv.append(dbg1) # enforce debug
- if signum == signal.SIGUSR2 and (dbg1 in sys.argv or dbg2 in sys.argv): # disable debug
- if dbg1 in sys.argv:
- sys.argv.remove(dbg1)
- if dbg2 in sys.argv:
- sys.argv.remove(dbg2)
- os.execl(path, script, *sys.argv[1:])
-
-
-if __name__ == '__main__':
- p = argparse.ArgumentParser(description='Proxy between given SOAP service and Osmocom CTRL protocol.')
- p.add_argument('-v', '--version', action='version', version=("%(prog)s v" + __version__))
- p.add_argument('-p', '--port', type=int, default=4250, help="Port to use for CTRL interface, defaults to 4250")
- p.add_argument('-c', '--ctrl', default='localhost', help="Adress to use for CTRL interface, defaults to localhost")
- p.add_argument('-w', '--wsdl', required=True, help="WSDL URL for SOAP")
- p.add_argument('-n', '--num', type=int, default=5, help="Max number of concurrent HTTP requests to SOAP server")
- p.add_argument('-d', '--debug', action='store_true', help="Enable debug log")
- p.add_argument('-o', '--output', action='store_true', help="Log to STDOUT in addition to SYSLOG")
- p.add_argument('-l', '--location', help="Override location found in WSDL file (don't use unless you know what you're doing)")
- args = p.parse_args()
-
- log = logging.getLogger('CTRL2SOAP')
- if args.debug:
- log.setLevel(logging.DEBUG)
- else:
- log.setLevel(logging.INFO)
- log.addHandler(logging.handlers.SysLogHandler('/dev/log'))
- if args.output:
- log.addHandler(logging.StreamHandler(sys.stdout))
-
- reboot = partial(reloader, os.path.abspath(__file__), os.path.basename(__file__), log, '-d', '--debug') # keep in sync with add_argument() call above
- signal.signal(signal.SIGHUP, reboot)
- signal.signal(signal.SIGQUIT, reboot)
- signal.signal(signal.SIGUSR1, reboot) # restart and enabled debug output
- signal.signal(signal.SIGUSR2, reboot) # restart and disable debug output
-
- log.info("SOAP proxy %s starting with PID %d ..." % (__version__, os.getpid()))
- reactor.connectTCP(args.ctrl, args.port, TrapFactory(args.ctrl, args.port, Trap, defer.DeferredSemaphore(args.num), log, args.wsdl, args.location))
- reactor.run()
diff --git a/osmopy/twisted_ipa.py b/osmopy/twisted_ipa.py
deleted file mode 100755
index bb8323d..0000000
--- a/osmopy/twisted_ipa.py
+++ /dev/null
@@ -1,384 +0,0 @@
-#!/usr/bin/python3
-# -*- mode: python-mode; py-indent-tabs-mode: nil -*-
-"""
-/*
- * Copyright (C) 2016 sysmocom s.f.m.c. GmbH
- *
- * All Rights Reserved
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-"""
-
-__version__ = "0.7.0" # bump this on every non-trivial change
-
-from osmopy.osmo_ipa import Ctrl, IPA
-from twisted.internet.protocol import ReconnectingClientFactory
-from twisted.internet import reactor
-from twisted.protocols import basic
-import argparse, logging, sys
-
-class IPACommon(basic.Int16StringReceiver):
- """
- Generic IPA protocol handler: include some routines for simpler subprotocols.
- It's not intended as full implementation of all subprotocols, rather common ground and example code.
- """
- def dbg(self, line):
- """
- Debug print helper
- """
- self.factory.log.debug(line)
-
- def osmo_CTRL(self, data):
- """
- OSMO CTRL protocol
- Placeholder, see corresponding derived class
- """
- pass
-
- def osmo_MGCP(self, data):
- """
- OSMO MGCP extension
- """
- self.dbg('OSMO MGCP received %s' % data)
-
- def osmo_LAC(self, data):
- """
- OSMO LAC extension
- """
- self.dbg('OSMO LAC received %s' % data)
-
- def osmo_SMSC(self, data):
- """
- OSMO SMSC extension
- """
- self.dbg('OSMO SMSC received %s' % data)
-
- def osmo_ORC(self, data):
- """
- OSMO ORC extension
- """
- self.dbg('OSMO ORC received %s' % data)
-
- def osmo_GSUP(self, data):
- """
- OSMO GSUP extension
- """
- self.dbg('OSMO GSUP received %s' % data)
-
- def osmo_OAP(self, data):
- """
- OSMO OAP extension
- """
- self.dbg('OSMO OAP received %s' % data)
-
- def osmo_UNKNOWN(self, data):
- """
- OSMO defaul extension handler
- """
- self.dbg('OSMO unknown extension received %s' % data)
-
- def handle_RSL(self, data, proto, extension):
- """
- RSL protocol handler
- """
- self.dbg('IPA RSL received message with extension %s' % extension)
-
- def handle_CCM(self, data, proto, msgt):
- """
- CCM (IPA Connection Management)
- Placeholder, see corresponding derived class
- """
- pass
-
- def handle_SCCP(self, data, proto, extension):
- """
- SCCP protocol handler
- """
- self.dbg('IPA SCCP received message with extension %s' % extension)
-
- def handle_OML(self, data, proto, extension):
- """
- OML protocol handler
- """
- self.dbg('IPA OML received message with extension %s' % extension)
-
- def handle_OSMO(self, data, proto, extension):
- """
- Dispatcher point for OSMO subprotocols based on extension name, lambda default should never happen
- """
- method = getattr(self, 'osmo_' + IPA().ext(extension), lambda: "extension dispatch failure")
- method(data)
-
- def handle_MGCP(self, data, proto, extension):
- """
- MGCP protocol handler
- """
- self.dbg('IPA MGCP received message with attribute %s' % extension)
-
- def handle_UNKNOWN(self, data, proto, extension):
- """
- Default protocol handler
- """
- self.dbg('IPA received message for %s (%s) protocol with attribute %s' % (IPA().proto(proto), proto, extension))
-
- def process_chunk(self, data):
- """
- Generic message dispatcher for IPA (sub)protocols based on protocol name, lambda default should never happen
- """
- (_, proto, extension, content) = IPA().del_header(data)
- if content is not None:
- self.dbg('IPA received %s::%s [%d/%d] %s' % (IPA().proto(proto), IPA().ext_name(proto, extension), len(data), len(content), content))
- method = getattr(self, 'handle_' + IPA().proto(proto), lambda: "protocol dispatch failure")
- method(content, proto, extension)
-
- def dataReceived(self, data):
- """
- Override for dataReceived from Int16StringReceiver because of inherently incompatible interpretation of length
- If default handler is used than we would always get off-by-1 error (Int16StringReceiver use equivalent of l + 2)
- """
- if len(data):
- (head, tail) = IPA().split_combined(data)
- self.process_chunk(head)
- self.dataReceived(tail)
-
- def connectionMade(self):
- """
- We have to resetDelay() here to drop internal state to default values to make reconnection logic work
- Make sure to call this via super() if overriding to keep reconnection logic intact
- """
- addr = self.transport.getPeer()
- self.dbg('IPA connected to %s:%d peer' % (addr.host, addr.port))
- self.factory.resetDelay()
-
-
-class CCM(IPACommon):
- """
- Implementation of CCM protocol for IPA multiplex
- """
- def ack(self):
- self.transport.write(IPA().id_ack())
-
- def ping(self):
- self.transport.write(IPA().ping())
-
- def pong(self):
- self.transport.write(IPA().pong())
-
- def handle_CCM(self, data, proto, msgt):
- """
- CCM (IPA Connection Management)
- Only basic logic necessary for tests is implemented (ping-pong, id ack etc)
- """
- if msgt == IPA.MSGT['ID_GET']:
- self.transport.getHandle().sendall(IPA().id_resp(self.factory.ccm_id))
- # if we call
- # self.transport.write(IPA().id_resp(self.factory.test_id))
- # instead, than we would have to also call
- # reactor.callLater(1, self.ack)
- # instead of self.ack()
- # otherwise the writes will be glued together - hence the necessity for ugly hack with 1s timeout
- # Note: this still might work depending on the IPA implementation details on the other side
- self.ack()
- # schedule PING in 4s
- reactor.callLater(4, self.ping)
- if msgt == IPA.MSGT['PING']:
- self.pong()
-
-
-class CTRL(IPACommon):
- """
- Implementation of Osmocom control protocol for IPA multiplex
- """
- def ctrl_SET(self, data, op_id, v):
- """
- Handle CTRL SET command
- """
- self.dbg('CTRL SET [%s] %s' % (op_id, v))
-
- def ctrl_SET_REPLY(self, data, op_id, v):
- """
- Handle CTRL SET reply
- """
- self.dbg('CTRL SET REPLY [%s] %s' % (op_id, v))
-
- def ctrl_GET(self, data, op_id, v):
- """
- Handle CTRL GET command
- """
- self.dbg('CTRL GET [%s] %s' % (op_id, v))
-
- def ctrl_GET_REPLY(self, data, op_id, v):
- """
- Handle CTRL GET reply
- """
- self.dbg('CTRL GET REPLY [%s] %s' % (op_id, v))
-
- def ctrl_TRAP(self, data, op_id, v):
- """
- Handle CTRL TRAP command
- """
- self.dbg('CTRL TRAP [%s] %s' % (op_id, v))
-
- def ctrl_ERROR(self, data, op_id, v):
- """
- Handle CTRL ERROR reply
- """
- self.dbg('CTRL ERROR [%s] %s' % (op_id, v))
-
- def osmo_CTRL(self, data):
- """
- OSMO CTRL message dispatcher, lambda default should never happen
- For basic tests only, appropriate handling routines should be replaced: see CtrlServer for example
- """
- self.dbg('OSMO CTRL received %s::%s' % Ctrl().parse(data.decode('utf-8')))
- (cmd, op_id, v) = data.decode('utf-8').split(' ', 2)
- method = getattr(self, 'ctrl_' + cmd, lambda: "CTRL unknown command")
- method(data, op_id, v)
-
-
-class IPAServer(CCM):
- """
- Test implementation of IPA server
- Demonstrate CCM opearation by overriding necessary bits from CCM
- """
- def connectionMade(self):
- """
- Keep reconnection logic working by calling routine from CCM
- Initiate CCM upon connection
- """
- addr = self.transport.getPeer()
- self.factory.log.info('IPA server: connection from %s:%d client' % (addr.host, addr.port))
- super(IPAServer, self).connectionMade()
- self.transport.write(IPA().id_get())
-
-
-class CtrlServer(CTRL):
- """
- Test implementation of CTRL server
- Demonstarte CTRL handling by overriding simpler routines from CTRL
- """
- def connectionMade(self):
- """
- Keep reconnection logic working by calling routine from CTRL
- Send TRAP upon connection
- Note: we can't use sendString() because of it's incompatibility with IPA interpretation of length prefix
- """
- addr = self.transport.getPeer()
- self.factory.log.info('CTRL server: connection from %s:%d client' % (addr.host, addr.port))
- super(CtrlServer, self).connectionMade()
- self.transport.write(Ctrl().trap('LOL', 'what'))
- self.transport.write(Ctrl().trap('rulez', 'XXX'))
-
- def reply(self, r):
- self.transport.write(Ctrl().add_header(r))
-
- def ctrl_SET(self, data, op_id, v):
- """
- CTRL SET command: always succeed
- """
- self.dbg('SET [%s] %s' % (op_id, v))
- self.reply('SET_REPLY %s %s' % (op_id, v))
-
- def ctrl_GET(self, data, op_id, v):
- """
- CTRL GET command: always fail
- """
- self.dbg('GET [%s] %s' % (op_id, v))
- self.reply('ERROR %s No variable found' % op_id)
-
-
-class IPAFactory(ReconnectingClientFactory):
- """
- Generic IPA Client Factory which can be used to store state for various subprotocols and manage connections
- Note: so far we do not really need separate Factory for acting as a server due to protocol simplicity
- """
- protocol = IPACommon
- log = None
- ccm_id = IPA().identity(unit=b'1515/0/1', mac=b'b0:0b:fa:ce:de:ad:be:ef', utype=b'sysmoBTS', name=b'StingRay', location=b'hell', sw=IPA.version.encode('utf-8'))
-
- def __init__(self, proto=None, log=None, ccm_id=None):
- if proto:
- self.protocol = proto
- if ccm_id:
- self.ccm_id = ccm_id
- if log:
- self.log = log
- else:
- self.log = logging.getLogger('IPAFactory')
- self.log.setLevel(logging.CRITICAL)
- self.log.addHandler(logging.NullHandler)
-
- def clientConnectionFailed(self, connector, reason):
- """
- Only necessary for as debugging aid - if we can somehow set parent's class noisy attribute then we can omit this method
- """
- self.log.warning('IPAFactory connection failed: %s' % reason.getErrorMessage())
- ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
-
- def clientConnectionLost(self, connector, reason):
- """
- Only necessary for as debugging aid - if we can somehow set parent's class noisy attribute then we can omit this method
- """
- self.log.warning('IPAFactory connection lost: %s' % reason.getErrorMessage())
- ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
-
-
-if __name__ == '__main__':
- p = argparse.ArgumentParser("Twisted IPA (module v%s) app" % IPA.version)
- p.add_argument('-v', '--version', action='version', version="%(prog)s v" + __version__)
- p.add_argument('-p', '--port', type=int, default=4250, help="Port to use for CTRL interface")
- p.add_argument('-d', '--host', default='localhost', help="Adress to use for CTRL interface")
- cs = p.add_mutually_exclusive_group()
- cs.add_argument("-c", "--client", action='store_true', help="asume client role")
- cs.add_argument("-s", "--server", action='store_true', help="asume server role")
- ic = p.add_mutually_exclusive_group()
- ic.add_argument("--ipa", action='store_true', help="use IPA protocol")
- ic.add_argument("--ctrl", action='store_true', help="use CTRL protocol")
- args = p.parse_args()
- test = False
-
- log = logging.getLogger('TwistedIPA')
- log.setLevel(logging.DEBUG)
- log.addHandler(logging.StreamHandler(sys.stdout))
-
- if args.ctrl:
- if args.client:
- # Start osmo-bsc to receive TRAP messages when osmo-bts-* connects to it
- print('CTRL client, connecting to %s:%d' % (args.host, args.port))
- reactor.connectTCP(args.host, args.port, IPAFactory(CTRL, log))
- test = True
- if args.server:
- # Use bsc_control.py to issue set/get commands
- print('CTRL server, listening on port %d' % args.port)
- reactor.listenTCP(args.port, IPAFactory(CtrlServer, log))
- test = True
- if args.ipa:
- if args.client:
- # Start osmo-nitb which would initiate A-bis/IP session
- print('IPA client, connecting to %s ports %d and %d' % (args.host, IPA.TCP_PORT_OML, IPA.TCP_PORT_RSL))
- reactor.connectTCP(args.host, IPA.TCP_PORT_OML, IPAFactory(CCM, log))
- reactor.connectTCP(args.host, IPA.TCP_PORT_RSL, IPAFactory(CCM, log))
- test = True
- if args.server:
- # Start osmo-bts-* which would attempt to connect to us
- print('IPA server, listening on ports %d and %d' % (IPA.TCP_PORT_OML, IPA.TCP_PORT_RSL))
- reactor.listenTCP(IPA.TCP_PORT_RSL, IPAFactory(IPAServer, log))
- reactor.listenTCP(IPA.TCP_PORT_OML, IPAFactory(IPAServer, log))
- test = True
- if test:
- reactor.run()
- else:
- print("Please specify which protocol in which role you'd like to test.")