diff options
Diffstat (limited to 'osmopy')
-rw-r--r-- | osmopy/__init__.py | 14 | ||||
-rwxr-xr-x | osmopy/osmo_ctrl.py | 120 | ||||
-rw-r--r-- | osmopy/osmo_interact_common.py | 469 | ||||
-rwxr-xr-x | osmopy/osmo_interact_ctrl.py | 100 | ||||
-rwxr-xr-x | osmopy/osmo_interact_vty.py | 180 | ||||
-rwxr-xr-x | osmopy/osmo_verify_transcript_ctrl.py | 58 | ||||
-rwxr-xr-x | osmopy/osmo_verify_transcript_vty.py | 67 | ||||
-rw-r--r-- | osmopy/osmodumpdoc.py | 96 | ||||
-rw-r--r-- | osmopy/osmotestconfig.py | 220 | ||||
-rw-r--r-- | osmopy/osmotestvty.py | 102 | ||||
-rwxr-xr-x | osmopy/soap.py | 188 | ||||
-rwxr-xr-x | osmopy/twisted_ipa.py | 384 |
12 files changed, 2 insertions, 1996 deletions
diff --git a/osmopy/__init__.py b/osmopy/__init__.py index 3fd197f..e3bf016 100644 --- a/osmopy/__init__.py +++ b/osmopy/__init__.py @@ -1,14 +1,4 @@ #!/usr/bin/env python -__version__ = '0.0.3' +__version__ = '0.0.4' -__all__ = ['obscvty', 'osmodumpdoc', 'osmotestconfig', 'osmotestvty', - 'osmoutil', - 'osmo_ipa', - 'osmo_ctrl', - 'soap', - 'twisted_ipa', - 'osmo_interact_common', - 'osmo_interact_vty', - 'osmo_interact_ctrl', - 'osmo_verify_transcript_vty', - 'osmo_verify_transcript_ctrl'] +__all__ = ['obscvty', 'osmoutil', 'osmo_ipa'] diff --git a/osmopy/osmo_ctrl.py b/osmopy/osmo_ctrl.py deleted file mode 100755 index 2b8c4be..0000000 --- a/osmopy/osmo_ctrl.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python2 -# -*- mode: python-mode; py-indent-tabs-mode: nil -*- -""" -/* - * Copyright (C) 2016 sysmocom s.f.m.c. GmbH - * - * All Rights Reserved - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ -""" - -from optparse import OptionParser -from osmopy.osmo_ipa import Ctrl -import socket - -verbose = False - -def connect(host, port): - if verbose: - print "Connecting to host %s:%i" % (host, port) - - sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sck.setblocking(1) - sck.connect((host, port)) - return sck - -def do_set_get(sck, var, value = None): - (r, c) = Ctrl().cmd(var, value) - sck.send(c) - answer = Ctrl().rem_header(sck.recv(4096)) - return (answer,) + Ctrl().verify(answer, r, var, value) - -def set_var(sck, var, val): - (a, _, _) = do_set_get(sck, var, val) - return a - -def get_var(sck, var): - (_, _, v) = do_set_get(sck, var) - return v - -def _leftovers(sck, fl): - """ - Read outstanding data if any according to flags - """ - try: - data = sck.recv(1024, fl) - except socket.error as (s_errno, strerror): - return False - if len(data) != 0: - tail = data - while True: - (head, tail) = Ctrl().split_combined(tail) - print "Got message:", Ctrl().rem_header(head) - if len(tail) == 0: - break - return True - return False - -if __name__ == '__main__': - parser = OptionParser("Usage: %prog [options] var [value]") - parser.add_option("-d", "--host", dest="host", - help="connect to HOST", metavar="HOST") - parser.add_option("-p", "--port", dest="port", type="int", - help="use PORT", metavar="PORT", default=4249) - parser.add_option("-g", "--get", action="store_true", - dest="cmd_get", help="perform GET operation") - parser.add_option("-s", "--set", action="store_true", - dest="cmd_set", help="perform SET operation") - parser.add_option("-v", "--verbose", action="store_true", - dest="verbose", help="be verbose", default=False) - parser.add_option("-m", "--monitor", action="store_true", - dest="monitor", help="monitor the connection for traps", default=False) - - (options, args) = parser.parse_args() - - verbose = options.verbose - - if options.cmd_set and options.cmd_get: - parser.error("Get and set options are mutually exclusive!") - - if not (options.cmd_get or options.cmd_set or options.monitor): - parser.error("One of -m, -g, or -s must be set") - - if not (options.host): - parser.error("Destination host and port required!") - - sock = connect(options.host, options.port) - - if options.cmd_set: - if len(args) < 2: - parser.error("Set requires var and value arguments") - _leftovers(sock, socket.MSG_DONTWAIT) - print "Got message:", set_var(sock, args[0], ' '.join(args[1:])) - - if options.cmd_get: - if len(args) != 1: - parser.error("Get requires the var argument") - _leftovers(sock, socket.MSG_DONTWAIT) - (a, _, _) = do_set_get(sock, args[0]) - print "Got message:", a - - if options.monitor: - while True: - if not _leftovers(sock, 0): - print "Connection is gone." - break - sock.close() diff --git a/osmopy/osmo_interact_common.py b/osmopy/osmo_interact_common.py deleted file mode 100644 index 5efc22d..0000000 --- a/osmopy/osmo_interact_common.py +++ /dev/null @@ -1,469 +0,0 @@ -#!/usr/bin/env python3 -# -# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> -# All rights reserved. -# -# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -Common code for osmo_interact_vty.py and osmo_interact_ctrl.py. -This implements all of application interaction, piping and verification. -osmo_interact_{vty,ctrl}.py plug VTY and CTRL interface specific bits. -''' - -import argparse -import sys -import os -import subprocess -import time -import traceback -import socket -import shlex -import re - - -class Interact: - - class StepBase: - command = None - result = None - leading_blanks = None - - def __init__(self): - self.result = [] - - def verify_interact_state(self, interact_instance): - # for example to verify that the last VTY prompt received shows the - # right node. - pass - - def command_str(self, interact_instance=None): - return self.command - - def __str__(self): - return '%s\n%s' % (self.command_str(), '\n'.join(self.result)) - - @staticmethod - def is_next_step(line, interact_instance): - assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep" - - socket = None - - def __init__(self, step_class, port, host, verbose=False, update=False): - ''' - host is the hostname to connect to. - port is the CTRL port to connect on. - ''' - self.Step = step_class - self.port = port - self.host = host - self.verbose = verbose - self.update = update - - if not port: - raise Exception("You need to provide port number to connect to") - - def connect(self): - assert self.socket is None - retries = 30 - took = 0 - while True: - took += 1 - try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setblocking(1) - self.socket.connect((self.host, int(self.port))) - except IOError: - retries -= 1 - if retries <= 0: - raise - time.sleep(.1) - continue - break - - def close(self): - if self.socket is None: - return - self.socket.close() - self.socket = None - - def command(self, command): - assert not "implemented separately by InteractVty and InteractCtrl" - - def verify_transcript_file(self, transcript_file): - with open(transcript_file, 'r') as f: - content = f.read() - - try: - result = self.verify_transcript(content) - except: - print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr) - sys.stderr.flush() - raise - - if not self.update: - return - content = '\n'.join(result) - with open(transcript_file, 'w') as f: - f.write(content) - - def verify_transcript(self, transcript): - '''' - transcript is a "screenshot" of a session, a multi-line string - including commands and expected results. - Feed commands to self.command() and verify the expected results. - ''' - - # parse steps - steps = [] - step = None - blank_lines = 0 - for line in transcript.splitlines(): - if not line: - blank_lines += 1 - continue - next_step_started = self.Step.is_next_step(line, self) - if next_step_started: - if step: - steps.append(step) - step = next_step_started - step.leading_blanks = blank_lines - blank_lines = 0 - elif step: - # we only count blank lines directly preceding the start of a - # next step. Insert blank lines in the middle of a response - # back into the response: - if blank_lines: - step.result.extend([''] * blank_lines) - blank_lines = 0 - step.result.append(line) - if step: - steps.append(step) - step = None - - actual_result = [] - - # run steps - step_nr = 0 - for step in steps: - step_nr += 1 - try: - if self.verbose: - if step.leading_blanks: - print('\n' * step.leading_blanks, end='') - print(step.command_str()) - sys.stdout.flush() - - step.verify_interact_state(self) - - res = self.command(step.command) - - if self.verbose: - sys.stderr.flush() - sys.stdout.flush() - print('\n'.join(res)) - sys.stdout.flush() - - if step.leading_blanks: - actual_result.extend([''] * step.leading_blanks) - actual_result.append(step.command_str(self)) - - match_result = self.match_lines(step.result, res) - - if self.update: - if match_result is True: - # preserve any wildcards - actual_result.extend(step.result) - else: - # mismatch, take exactly what came in - actual_result.extend(res) - continue - if match_result is not True: - raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]' - % (match_result, step, step.command_str(), '\n'.join(res))) - except: - print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step), - file=sys.stderr) - sys.stderr.flush() - raise - - # final line ending - actual_result.append('') - return actual_result - - @staticmethod - def match_lines(expect, got): - ''' - Match two lists of strings, allowing certain wildcards: - - In 'expect', if a line is exactly '...', it matches any number of - arbitrary lines in 'got'; the implementation is trivial and skips - lines to the first occurence in 'got' that continues after '...'. - - If an 'expect' line is '... !regex', it matches any number of - lines like '...', but the given regex must not match any of those - lines. - - Return 'True' on match, or a string describing the mismatch. - ''' - def match_line(expect_line, got_line): - return expect_line == got_line - - ANY = '...' - ANY_EXCEPT = '... !' - - e = 0 - g = 0 - while e < len(expect): - if expect[e] == ANY or expect[e].startswith(ANY_EXCEPT): - wildcard = expect[e] - e += 1 - g_end = g - - if e >= len(expect): - # anything left in 'got' is accepted. - g_end = len(got) - - # look for the next occurence of the expected line in 'got' - while g_end < len(got) and not match_line(expect[e], got[g_end]): - g_end += 1 - - if wildcard == ANY: - # no restrictions on lines - g = g_end - - elif wildcard.startswith(ANY_EXCEPT): - except_re = re.compile(wildcard[len(ANY_EXCEPT):]) - while g < g_end: - if except_re.search(got[g]): - return ('Got forbidden line for wildcard %r:' - ' did not expect %r in line %d of response' - % (wildcard, got[g], g)) - g += 1 - - continue - - if g >= len(got): - return 'Cannot find line %r' % expect[e] - - if not match_line(expect[e], got[g]): - return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g]) - - e += 1 - g += 1 - - if g < len(got): - return 'Did not expect line %r' % got[g] - return True - - def feed_commands(self, output, command_strs): - for command_str in command_strs: - for command in command_str.splitlines(): - res = self.command(command) - output.write('\n'.join(res)) - output.write('\n') - -def end_process(proc, quiet=False): - if not proc: - return - - rc = proc.poll() - if rc is not None: - if not quiet: - print('Process has already terminated with', rc) - proc.wait() - return - - proc.terminate() - time_to_wait_for_term = 5 - wait_step = 0.001 - waited_time = 0 - while True: - # poll returns None if proc is still running - if proc.poll() is not None: - break - waited_time += wait_step - # make wait_step approach 1.0 - wait_step = (1. + 5. * wait_step) / 6. - if waited_time >= time_to_wait_for_term: - break - time.sleep(wait_step) - - if proc.poll() is None: - # termination seems to be slower than that, let's just kill - proc.kill() - if not quiet: - print("Killed child process") - elif waited_time > .002: - if not quiet: - print("Terminating took %.3fs" % waited_time) - proc.wait() - -class Application: - proc = None - _devnull = None - - @staticmethod - def devnull(): - if Application._devnull is None: - Application._devnull = open(os.devnull, 'w') - return Application._devnull - - def __init__(self, run_app_str, purge_output=True, quiet=False): - self.command_tuple = shlex.split(run_app_str) - self.purge_output = purge_output - self.quiet = quiet - - def run(self): - out_err = None - if self.purge_output: - out_err = Application.devnull() - - if not self.quiet: - print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple))) - self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err) - - def stop(self): - end_process(self.proc, self.quiet) - -def verify_application(run_app_str, interact, transcript_file, verbose): - passed = None - application = None - - sys.stdout.flush() - sys.stderr.flush() - - if run_app_str: - application = Application(run_app_str, purge_output=not verbose) - application.run() - - try: - interact.connect() - interact.verify_transcript_file(transcript_file) - passed = True - except: - traceback.print_exc() - passed = False - interact.close() - - if application: - application.stop() - - sys.stdout.flush() - sys.stderr.flush() - - return passed - -def common_parser(): - parser = argparse.ArgumentParser() - parser.add_argument('-r', '--run', dest='run_app_str', - help='command to run to launch application to test,' - ' including command line arguments. If omitted, no' - ' application is launched.') - parser.add_argument('-p', '--port', dest='port', - help="Port to reach the application at.") - parser.add_argument('-H', '--host', dest='host', default='localhost', - help="Host to reach the application at.") - return parser - -def parser_add_verify_args(parser): - parser.add_argument('-u', '--update', dest='update', action='store_true', - help='Do not verify, but OVERWRITE transcripts based on' - ' the application\'s current behavior. OVERWRITES TRANSCRIPT' - ' FILES.') - parser.add_argument('-v', '--verbose', action='store_true', - help='Print commands and application output') - parser.add_argument('transcript_files', nargs='*', help='transcript file(s) to verify') - return parser - -def parser_add_run_args(parser): - parser.add_argument('-O', '--output', dest='output_path', - help="Write command results to a file instead of stdout." - "('-O -' writes to stdout and is the default)") - parser.add_argument('-c', '--command', dest='cmd_str', - help="Run this command (before reading input files, if any)." - " multiple commands may be separated by ';'") - parser.add_argument('cmd_files', nargs='*', help='file(s) with plain commands to run') - return parser - -def main_run_commands(run_app_str, output_path, cmd_str, cmd_files, interact): - to_stdout = False - if not output_path or output_path == '-': - to_stdout = True - output = sys.stdout - else: - output = open(output_path, 'w') - - application = None - - if run_app_str: - application = Application(run_app_str, quiet=to_stdout) - application.run() - - try: - interact.connect() - - if cmd_str: - interact.feed_commands(output, cmd_str.split(';')) - - for f_path in (cmd_files or []): - with open(f_path, 'r') as f: - interact.feed_commands(output, f.read().decode('utf-8').splitlines()) - - if not (cmd_str or cmd_files): - while True: - line = sys.stdin.readline() - if not line: - break; - interact.feed_commands(output, line.split(';')) - except: - traceback.print_exc() - finally: - if not to_stdout: - try: - output.close() - except: - traceback.print_exc() - - try: - interact.close() - except: - traceback.print_exc() - - if application: - try: - application.stop() - except: - traceback.print_exc() - -def main_verify_transcripts(run_app_str, transcript_files, interact, verbose): - results = [] - for t in transcript_files: - passed = verify_application(run_app_str=run_app_str, - interact=interact, - transcript_file=t, - verbose=verbose) - results.append((passed, t)) - - print('\nRESULTS:') - all_passed = True - for passed, t in results: - print('%s: %s' % ('pass' if passed else 'FAIL', t)) - all_passed = all_passed and passed - print() - - if not all_passed: - sys.exit(1) - -# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_interact_ctrl.py b/osmopy/osmo_interact_ctrl.py deleted file mode 100755 index 9b1a20b..0000000 --- a/osmopy/osmo_interact_ctrl.py +++ /dev/null @@ -1,100 +0,0 @@ -#!/usr/bin/env python3 -# -# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> -# All rights reserved. -# -# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -Run CTRL commands or test transcripts against a given application. Commandline -invocation exposes only direct command piping, the transcript verification code -is exposed as commandline args by osmo_verify_transcript_ctrl.py. -''' - -import re - -from osmopy.osmo_interact_common import * -from osmopy.osmo_ipa import Ctrl, IPA - -class InteractCtrl(Interact): - next_id = 1 - keep_ids = True - re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$') - - class CtrlStep(Interact.StepBase): - - @staticmethod - def is_next_step(line, interact_instance): - m = InteractCtrl.re_command.match(line) - if not m: - return None - next_step = InteractCtrl.CtrlStep() - - set_get = m.group(1) - cmd_id = m.group(2) - var_val = m.group(3) - if not interact_instance.keep_ids: - cmd_id = interact_instance.next_id - interact_instance.next_id += 1 - next_step.command = '%s %s %s' % (set_get, cmd_id, var_val) - - return next_step - - def __init__(self, port, host, verbose=False, update=False, keep_ids=True): - if not update: - keep_ids = True - self.keep_ids = keep_ids - super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update) - - def connect(self): - self.next_id = 1 - super().connect() - - def send(self, data): - data = Ctrl().add_header(data) - return self.socket.send(data) == len(data) - - def receive(self): - responses = [] - data = self.socket.recv(4096) - while (len(data)>0): - (response_with_header, data) = IPA().split_combined(data) - response = Ctrl().rem_header(response_with_header) - responses.append(response.decode('utf-8')) - return responses - - def command(self, command): - assert self.send(command) - res = self.receive() - split_responses = [] - for r in res: - split_responses.extend(r.splitlines()) - sys.stdout.flush() - sys.stderr.flush() - return split_responses - -if __name__ == '__main__': - parser = common_parser() - parser_add_run_args(parser) - args = parser.parse_args() - - interact = InteractCtrl(args.port, args.host, verbose=False, update=False, - keep_ids=True) - - main_run_commands(args.run_app_str, args.output_path, args.cmd_str, - args.cmd_files, interact) - -# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_interact_vty.py b/osmopy/osmo_interact_vty.py deleted file mode 100755 index b57cd8c..0000000 --- a/osmopy/osmo_interact_vty.py +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python3 -# -# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> -# All rights reserved. -# -# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -Run VTY commands or test transcripts against a given application. Commandline -invocation exposes only direct command piping, the transcript verification code -is exposed as commandline args by osmo_verify_transcript_vty.py. -''' - -import re - -from osmopy.osmo_interact_common import * - -class InteractVty(Interact): - - class VtyStep(Interact.StepBase): - expect_node = None # e.g. '(config-net)' - expect_prompt_char = None # '>' or '#' - - def __init__(self, prompt): - super().__init__() - self.prompt = prompt - - def verify_interact_state(self, interact_instance): - if interact_instance.update: - return - if interact_instance.this_node != self.expect_node: - raise Exception('Mismatch: expected VTY node %r in the prompt, got %r' - % (self.expect_node, interact_instance.this_node)) - if interact_instance.this_prompt_char != self.expect_prompt_char: - raise Exception('Mismatch: expected VTY prompt character %r, got %r' - % (self.expect_prompt_char, interact_instance.this_prompt_char)) - - @staticmethod - def is_next_step(line, interact_instance): - m = interact_instance.re_prompt.match(line) - if not m: - return None - next_step = InteractVty.VtyStep(interact_instance.prompt) - next_step.expect_node = m.group(1) - next_step.expect_prompt_char = m.group(2) - next_step.command = m.group(3) - return next_step - - def command_str(self, interact_instance=None): - if interact_instance is None: - node = self.expect_node - prompt_char = self.expect_prompt_char - else: - node = interact_instance.last_node - prompt_char = interact_instance.last_prompt_char - if node: - node = '(%s)' % node - node = node or '' - return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command) - - def __init__(self, prompt, port, host, verbose, update): - self.prompt = prompt - super().__init__(InteractVty.VtyStep, port, host, verbose, update) - - def connect(self): - self.this_node = None - self.this_prompt_char = '>' # slight cheat for initial prompt char - self.last_node = None - self.last_prompt_char = None - - super().connect() - # receive the first welcome message and discard - data = self.socket.recv(4096) - if not self.prompt: - b = data - b = b[b.rfind(b'\n') + 1:] - while b and (b[0] < ord('A') or b[0] > ord('z')): - b = b[1:] - prompt_str = b.decode('utf-8') - if '>' in prompt_str: - self.prompt = prompt_str[:prompt_str.find('>')] - if not self.prompt: - raise Exception('Could not find application name; needed to decode prompts.' - ' Initial data was: %r' % data) - self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt) - - def _command(self, command_str, timeout=10): - self.socket.send(command_str.encode()) - - waited_since = time.time() - received_lines = [] - last_line = '' - - while True: - new_data = self.socket.recv(4096).decode('utf-8') - - last_line = "%s%s" % (last_line, new_data) - - if last_line: - lines = last_line.splitlines() - received_lines.extend(lines[:-1]) - last_line = lines[-1] - - match = self.re_prompt.match(last_line) - if not match: - if time.time() - waited_since > timeout: - raise IOError("Failed to read data (did the app crash?)") - time.sleep(.1) - continue - - self.last_node = self.this_node - self.last_prompt_char = self.this_prompt_char - self.this_node = match.group(1) or None - self.this_prompt_char = match.group(2) - break - - # expecting to have received the command we sent as echo, remove it - clean_command_str = command_str.strip() - if clean_command_str.endswith('?'): - clean_command_str = clean_command_str[:-1] - if received_lines and received_lines[0] == clean_command_str: - received_lines = received_lines[1:] - return received_lines - - def command(self, command_str, timeout=10): - command_str = command_str or '\r' - if command_str[-1] not in '?\r\t': - command_str = command_str + '\r' - - received_lines = self._command(command_str, timeout) - - # send escape to cancel the '?' command line - if command_str[-1] == '?': - self._command('\x03', timeout) - - return received_lines - -def parser_add_vty_args(parser): - parser.add_argument('-n', '--prompt-name', dest='prompt', - help="Name used in application's telnet VTY prompt." - " If omitted, will attempt to determine the name from" - " the initial VTY prompt.") - return parser - -if __name__ == '__main__': - parser = common_parser() - parser_add_vty_args(parser) - parser_add_run_args(parser) - parser.add_argument('-X', '--gen-xml-ref', dest='gen_xml', action='store_true', - help="Equivalent to '-c \"show online-help\" -O -'," - " can be used to generate the VTY reference file as" - " required by osmo-gsm-manuals.git.") - args = parser.parse_args() - - if args.gen_xml: - if args.cmd_str: - raise Exception('It does not make sense to pass both --command and' - ' --gen-xml-ref.') - args.cmd_str = 'show online-help' - - interact = InteractVty(args.prompt, args.port, args.host, - verbose=False, update=False) - - main_run_commands(args.run_app_str, args.output_path, args.cmd_str, - args.cmd_files, interact) - -# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_verify_transcript_ctrl.py b/osmopy/osmo_verify_transcript_ctrl.py deleted file mode 100755 index 3afbc62..0000000 --- a/osmopy/osmo_verify_transcript_ctrl.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python3 -# -# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> -# All rights reserved. -# -# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -Run CTRL test transcripts against a given application. - -A CTRL transcript contains CTRL commands and their expected results. -It looks like: - -" -SET 1 var val -SET_REPLY 1 var OK -GET 2 var -GET_REPLY 2 var val -" - -The application to be tested is described by -- a binary to run, -- command line arguments to pass to the binary, -- the CTRL port. - -This module can either be run directly to run or update a given CTRL transcript, -or it can be imported as a module to run more complex setups. -''' - -from osmopy.osmo_interact_ctrl import * - -if __name__ == '__main__': - parser = common_parser() - parser_add_verify_args(parser) - parser.add_argument('-i', '--keep-ids', dest='keep_ids', action='store_true', - help='With --update, default is to overwrite the command IDs' - ' so that they are consecutive numbers starting from 1.' - ' With --keep-ids, do not change these command IDs.') - args = parser.parse_args() - - interact = InteractCtrl(args.port, args.host, args.verbose, args.update, args.keep_ids) - - main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose) - -# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmo_verify_transcript_vty.py b/osmopy/osmo_verify_transcript_vty.py deleted file mode 100755 index e70c36c..0000000 --- a/osmopy/osmo_verify_transcript_vty.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -# -# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de> -# All rights reserved. -# -# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -''' -Run VTY test transcripts against a given application. - -A VTY transcript contains VTY commands and their expected results. -It looks like: - -" -OsmoHLR> enable - -OsmoHLR# subscriber show imsi 123456789023000 -% No subscriber for imsi = '123456789023000' -OsmoHLR# subscriber show msisdn 12345 -% No subscriber for msisdn = '12345' - -OsmoHLR# subscriber create imsi 123456789023000 -% Created subscriber 123456789023000 - ID: 1 - IMSI: 123456789023000 - MSISDN: none - No auth data -" - -The application to be tested is described by -- a binary to run, -- command line arguments to pass to the binary, -- the VTY telnet port, -- the application name as printed in the VTY prompt. - -This module can either be run directly to run or update a given VTY transcript, -or it can be imported as a module to run more complex setups. -''' - -import re - -from osmopy.osmo_interact_vty import * - -if __name__ == '__main__': - parser = common_parser() - parser_add_vty_args(parser) - parser_add_verify_args(parser) - args = parser.parse_args() - - interact = InteractVty(args.prompt, args.port, args.host, args.verbose, args.update) - - main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose) - -# vim: tabstop=4 shiftwidth=4 expandtab nocin ai diff --git a/osmopy/osmodumpdoc.py b/osmopy/osmodumpdoc.py deleted file mode 100644 index 2464b05..0000000 --- a/osmopy/osmodumpdoc.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python - -# Make sure this code is in sync with the BTS directory. -# Fixes may need to be applied to both. - -"""Start the process and dump the documentation to the doc dir.""" - -import subprocess -import time -import os -import sys - -import osmopy.obscvty as obscvty -import osmopy.osmoutil as osmoutil - - -def dump_doc(name, port, filename): - vty = obscvty.VTYInteract(name, "127.0.0.1", port) - xml = vty.command("show online-help") - # Now write everything until the end to the file - out = open(filename, 'w') - out.write(xml) - out.close() - print 'generated %r' % filename - - -"""Dump the config of all the apps. - -Returns the number of apps configs could not be dumped for.""" - - -def dump_configs(apps, configs, confpath): - failures = 0 - successes = 0 - - try: # make sure the doc directory exists - os.mkdir('doc') - except OSError: # it probably does - pass - - for app in apps: - appname = app[3] - print "Starting app for %s" % appname - proc = None - cmd = [app[1], "-c", os.path.join(confpath, configs[appname][0])] - print 'cd', os.path.abspath(os.path.curdir), ';', ' '.join(cmd) - try: - proc = subprocess.Popen(cmd, stdin=None, stdout=None) - except OSError as e: # Probably a missing binary - print >> sys.stderr, e - print >> sys.stderr, "Skipping app %s" % appname - failures += 1 - else: - try: - dump_doc(app[2], app[0], 'doc/%s_vty_reference.xml' % appname) - successes += 1 - except IOError: # Generally a socket issue - print >> sys.stderr, "%s: couldn't connect, skipping" % appname - failures += 1 - finally: - osmoutil.end_proc(proc) - - return (failures, successes) - - -if __name__ == '__main__': - import argparse - - confpath = "." - workdir = "." - - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--pythonconfpath", dest="p", - help="searchpath for config (osmoappdesc)") - parser.add_argument("-w", "--workdir", dest="w", - help="Working directory to run in") - args = parser.parse_args() - - if args.p: - confpath = args.p - - if args.w: - workdir = args.w - - osmoappdesc = osmoutil.importappconf_or_quit( - confpath, "osmoappdesc", args.p) - - confpath = os.path.relpath(confpath, workdir) - os.chdir(workdir) - num_fails, num_sucs = dump_configs( - osmoappdesc.apps, osmoappdesc.app_configs, confpath) - if num_fails > 0: - print >> sys.stderr, "Warning: Skipped %s apps" % num_fails - if 0 == num_sucs: - print >> sys.stderr, "Nothing run, wrong working dir? Set with -w" - sys.exit(num_fails) diff --git a/osmopy/osmotestconfig.py b/osmopy/osmotestconfig.py deleted file mode 100644 index 2132c43..0000000 --- a/osmopy/osmotestconfig.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/env python - -# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com> -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import os.path -import time -import sys, shutil, stat -import tempfile - -import osmopy.obscvty as obscvty -import osmopy.osmoutil as osmoutil - - -# Return true iff all the tests for the given config pass -def test_config(app_desc, config, tmpdir, verbose=True): - try: - err = 0 - if test_config_atest(app_desc, config, verify_doc, verbose)[0] > 0: - err += 1 - - newconfig = copy_config(tmpdir, config) - if test_config_atest(app_desc, newconfig, write_config, verbose) > 0: - err += 1 - - if test_config_atest(app_desc, newconfig, token_vty_command, verbose) > 0: - err += 1 - - return err - - # If there's a socket error, skip the rest of the tests for this config - except IOError: - return 1 - - -def test_config_atest(app_desc, config, run_test, verbose=True): - proc = None - ret = None - vty = None - try: - cmd = app_desc[1].split(' ') + [ "-c", config] - if verbose: - print "Verifying %s, test %s" % (' '.join(cmd), run_test.__name__) - - proc = osmoutil.popen_devnull(cmd) - end = app_desc[2] - port = app_desc[0] - vty = obscvty.VTYInteract(end, "127.0.0.1", port) - ret = run_test(vty) - - except IOError as se: - print >> sys.stderr, "Failed to verify %s" % ' '.join(cmd) - print >> sys.stderr, "Current directory: %s" % os.getcwd() - print >> sys.stderr, "Error was %s" % se - print >> sys.stderr, "Config was\n%s" % open(config).read() - raise se - - finally: - if proc: - osmoutil.end_proc(proc) - if vty: - vty._close_socket() - - return ret - -def copy_config(dirname, config): - shutil.rmtree(dirname, True) - ign = shutil.ignore_patterns('*.cfg') - shutil.copytree(os.path.dirname(config), dirname, ignore=ign) - os.chmod(dirname, stat.S_IRWXU) - - try: - os.stat(dirname) - except OSError: - os.mkdir(dirname) - - prefix = os.path.basename(config) - tmpfile = tempfile.NamedTemporaryFile( - dir=dirname, prefix=prefix, delete=False) - tmpfile.write(open(config).read()) - tmpfile.close() - # This works around the precautions NamedTemporaryFile is made for... - return tmpfile.name - - -def write_config(vty): - new_config = vty.enabled_command("write") - if not new_config.startswith("Configuration saved to "): - print(new_config) - return 1, [new_config] - return 0 - - -# The only purpose of this function is to verify a working vty -def token_vty_command(vty): - vty.command("help") - return 0 - - -# This may warn about the same doc missing multiple times, by design -def verify_doc(vty): - xml = vty.command("show online-help") - split_at = "<command" - all_errs = [] - for command in xml.split(split_at): - if "(null)" in command: - lines = command.split("\n") - cmd_line = split_at + lines[0] - err_lines = [] - for line in lines: - if '(null)' in line: - err_lines.append(line) - - all_errs.append(err_lines) - - print >> sys.stderr, \ - "Documentation error (missing docs): \n%s\n%s\n" % ( - cmd_line, '\n'.join(err_lines)) - - return (len(all_errs), all_errs) - - -# Skip testing the configurations of anything that hasn't been compiled -def app_exists(app_desc): - cmd = app_desc[1].split(' ')[0] - return os.path.exists(cmd) - - -def remove_tmpdir(tmpdir): - files = os.listdir(tmpdir) - for f in files: - os.unlink(os.path.join(tmpdir, f)) - os.rmdir(tmpdir) - - -def check_configs_tested(basedir, app_configs, ignore_configs): - configs = [] - for root, dirs, files in os.walk(basedir): - for f in files: - if f.endswith(".cfg") and f not in ignore_configs: - configs.append(os.path.join(root, f)) - for config in configs: - found = False - for app in app_configs: - if config in app_configs[app]: - found = True - if not found: - print >> sys.stderr, "Warning: %s is not being tested" % config - - -def test_all_apps(apps, app_configs, tmpdir="writtenconfig", verbose=True, - confpath=".", rmtmp=False, ignore_configs=[]): - check_configs_tested("doc/examples/", app_configs, ignore_configs) - errors = 0 - for app in apps: - if not app_exists(app): - print >> sys.stderr, "Skipping app %s (not found)" % app[1] - continue - - configs = app_configs[app[3]] - for config in configs: - config = os.path.join(confpath, config) - errors += test_config(app, config, tmpdir, verbose) - - if rmtmp or not errors: - remove_tmpdir(tmpdir) - - if errors: - print >> sys.stderr, "ERRORS: %d" % errors - return errors - - -if __name__ == '__main__': - import argparse - - confpath = "." - wordir = "." - - parser = argparse.ArgumentParser() - parser.add_argument("--e1nitb", action="store_true", dest="e1nitb") - parser.add_argument("-v", "--verbose", dest="verbose", - action="store_true", help="verbose mode") - parser.add_argument("-p", "--pythonconfpath", dest="p", - help="searchpath for config") - parser.add_argument("-w", "--workdir", dest="w", - help="Working directory to run in") - - args = parser.parse_args() - - if args.p: - confpath = args.p - - if args.w: - workdir = args.w - - osmoappdesc = osmoutil.importappconf_or_quit(confpath, "osmoappdesc", - args.p) - - apps = osmoappdesc.apps - configs = osmoappdesc.app_configs - ignores = getattr(osmoappdesc, 'ignore_configs', []) - - if args.e1nitb: - configs['nitb'].extend(osmoappdesc.nitb_e1_configs) - - os.chdir(workdir) - sys.exit(test_all_apps(apps, configs, ignore_configs=ignores, - confpath=confpath, verbose=args.verbose)) diff --git a/osmopy/osmotestvty.py b/osmopy/osmotestvty.py deleted file mode 100644 index e513c05..0000000 --- a/osmopy/osmotestvty.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python - -# (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com> -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import time -import unittest - -import osmopy.obscvty as obscvty -import osmopy.osmoutil as osmoutil - -confpath = '.' - -"""Test a VTY. Warning: osmoappdesc must be imported first.""" - - -class TestVTY(unittest.TestCase): - - def setUp(self): - osmo_vty_cmd = osmoappdesc.vty_command[:] - config_index = osmo_vty_cmd.index('-c') - if config_index: - cfi = config_index + 1 - osmo_vty_cmd[cfi] = os.path.join(confpath, osmo_vty_cmd[cfi]) - - try: - print "Launch: %s from %s" % (' '.join(osmo_vty_cmd), os.getcwd()) - self.proc = osmoutil.popen_devnull(osmo_vty_cmd) - except OSError: - print >> sys.stderr, "Current directory: %s" % os.getcwd() - print >> sys.stderr, "Consider setting -b" - - appstring = osmoappdesc.vty_app[2] - appport = osmoappdesc.vty_app[0] - self.vty = obscvty.VTYInteract(appstring, "127.0.0.1", appport) - - def tearDown(self): - self.vty._close_socket() - self.vty = None - osmoutil.end_proc(self.proc) - - def test_history(self): - t1 = "show version" - self.vty.command(t1) - test_str = "show history" - assert(self.vty.w_verify(test_str, [t1])) - - def test_unknown_command(self): - test_str = "help show" - assert(self.vty.verify(test_str, ['% Unknown command.'])) - - def test_terminal_length(self): - test_str = "terminal length 20" - assert(self.vty.verify(test_str, [''])) - - -if __name__ == '__main__': - import argparse - import os - import sys - - workdir = '.' - - parser = argparse.ArgumentParser() - parser.add_argument("-v", "--verbose", dest="verbose", - action="store_true", help="verbose mode") - parser.add_argument("-p", "--pythonconfpath", dest="p", - help="searchpath for config") - parser.add_argument("-w", "--workdir", dest="w", - help="Working directory") - args = parser.parse_args() - - verbose_level = 1 - if args.verbose: - verbose_level = 2 - - if args.w: - workdir = args.w - - if args.p: - confpath = args.p - osmoappdesc = osmoutil.importappconf_or_quit(confpath, "osmoappdesc", - args.p) - - print "confpath %s, workdir %s" % (confpath, workdir) - os.chdir(workdir) - print "Running tests for specific VTY commands" - suite = unittest.TestLoader().loadTestsFromTestCase(TestVTY) - res = unittest.TextTestRunner(verbosity=verbose_level).run(suite) - sys.exit(len(res.errors) + len(res.failures)) diff --git a/osmopy/soap.py b/osmopy/soap.py deleted file mode 100755 index f1da8f2..0000000 --- a/osmopy/soap.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/python3 -# -*- mode: python-mode; py-indent-tabs-mode: nil -*- -""" -/* - * Copyright (C) 2016 sysmocom s.f.m.c. GmbH - * - * All Rights Reserved - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ -""" - -__version__ = "0.7.1" # bump this on every non-trivial change - -from twisted.internet import defer, reactor -from twisted_ipa import CTRL, IPAFactory, __version__ as twisted_ipa_version -from osmopy.osmo_ipa import Ctrl -from treq import post, collect -from suds.client import Client -from functools import partial -from distutils.version import StrictVersion as V # FIXME: use NormalizedVersion from PEP-386 when available -import argparse, datetime, signal, sys, os, logging, logging.handlers - -# we don't support older versions of TwistedIPA module -assert V(twisted_ipa_version) > V('0.4') - -# keys from OpenBSC openbsc/src/libbsc/bsc_rf_ctrl.c, values SOAP-specific -oper = { 'inoperational' : 0, 'operational' : 1 } -admin = { 'locked' : 0, 'unlocked' : 1 } -policy = { 'off' : 0, 'on' : 1, 'grace' : 2, 'unknown' : 3 } - -# keys from OpenBSC openbsc/src/libbsc/bsc_vty.c -fix = { 'invalid' : 0, 'fix2d' : 1, 'fix3d' : 1 } # SOAP server treats it as boolean but expects int - - -def handle_reply(p, f, log, r): - """ - Reply handler: takes function p to process raw SOAP server reply r, function f to run for each command and verbosity flag v - """ - repl = p(r) # result is expected to have both commands[] array and error string (could be None) - bsc_id = repl.commands[0].split()[0].split('.')[3] # we expect 1st command to have net.0.bsc.666.bts.2.trx.1 location prefix format - log.info("Received SOAP response for BSC %s with %d commands, error status: %s" % (bsc_id, len(repl.commands), repl.error)) - log.debug("BSC %s commands: %s" % (bsc_id, repl.commands)) - for t in repl.commands: # Process OpenBscCommands format from .wsdl - (_, m) = Ctrl().cmd(*t.split()) - f(m) - - -class Trap(CTRL): - """ - TRAP handler (agnostic to factory's client object) - """ - def ctrl_TRAP(self, data, op_id, v): - """ - Parse CTRL TRAP and dispatch to appropriate handler after normalization - """ - (l, r) = v.split() - loc = l.split('.') - t_type = loc[-1] - p = partial(lambda a, i: a[i] if len(a) > i else None, loc) # parse helper - method = getattr(self, 'handle_' + t_type.replace('-', ''), lambda: "Unhandled %s trap" % t_type) - method(p(1), p(3), p(5), p(7), r) # we expect net.0.bsc.666.bts.2.trx.1 format for trap prefix - - def ctrl_SET_REPLY(self, data, _, v): - """ - Debug log for replies to our commands - """ - self.factory.log.debug('SET REPLY %s' % v) - - def ctrl_ERROR(self, data, op_id, v): - """ - We want to know if smth went wrong - """ - self.factory.log.debug('CTRL ERROR [%s] %s' % (op_id, v)) - - def connectionMade(self): - """ - Logging wrapper, calling super() is necessary not to break reconnection logic - """ - self.factory.log.info("Connected to CTRL@%s:%d" % (self.factory.host, self.factory.port)) - super(CTRL, self).connectionMade() - - @defer.inlineCallbacks - def handle_locationstate(self, net, bsc, bts, trx, data): - """ - Handle location-state TRAP: parse trap content, build SOAP context and use treq's routines to post it while setting up async handlers - """ - (ts, fx, lat, lon, height, opr, adm, pol, mcc, mnc) = data.split(',') - tstamp = datetime.datetime.fromtimestamp(float(ts)).isoformat() - self.factory.log.debug('location-state@%s.%s.%s.%s (%s) [%s/%s] => %s' % (net, bsc, bts, trx, tstamp, mcc, mnc, data)) - ctx = self.factory.client.registerSiteLocation(bsc, float(lon), float(lat), fix.get(fx, 0), tstamp, oper.get(opr, 2), admin.get(adm, 2), policy.get(pol, 3)) - d = post(self.factory.location, ctx.envelope) - d.addCallback(collect, partial(handle_reply, ctx.process_reply, self.transport.write, self.factory.log)) # treq's collect helper is handy to get all reply content at once using closure on ctx - d.addErrback(lambda e, bsc: self.factory.log.critical("HTTP POST error %s while trying to register BSC %s" % (e, bsc)), bsc) # handle HTTP errors - # Ensure that we run only limited number of requests in parallel: - yield self.factory.semaphore.acquire() - yield d # we end up here only if semaphore is available which means it's ok to fire the request without exceeding the limit - self.factory.semaphore.release() - - def handle_notificationrejectionv1(self, net, bsc, bts, trx, data): - """ - Handle notification-rejection-v1 TRAP: just an example to show how more message types can be handled - """ - self.factory.log.debug('notification-rejection-v1@bsc-id %s => %s' % (bsc, data)) - - -class TrapFactory(IPAFactory): - """ - Store SOAP client object so TRAP handler can use it for requests - """ - location = None - log = None - semaphore = None - client = None - host = None - port = None - def __init__(self, host, port, proto, semaphore, log, wsdl=None, location=None): - self.host = host # for logging only, - self.port = port # seems to be no way to get it from ReconnectingClientFactory - self.log = log - self.semaphore = semaphore - soap = Client(wsdl, location=location, nosend=True) # make async SOAP client - self.location = location.encode() if location else soap.wsdl.services[0].ports[0].location # necessary for dispatching HTTP POST via treq - self.client = soap.service - level = self.log.getEffectiveLevel() - self.log.setLevel(logging.WARNING) # we do not need excessive debug from lower levels - super(TrapFactory, self).__init__(proto, self.log) - self.log.setLevel(level) - self.log.debug("Using IPA %s, SUDS client: %s" % (Ctrl.version, soap)) - - -def reloader(path, script, log, dbg1, dbg2, signum, _): - """ - Signal handler: we have to use execl() because twisted's reactor is not restartable due to some bug in twisted implementation - """ - log.info("Received Signal %d - restarting..." % signum) - if signum == signal.SIGUSR1 and dbg1 not in sys.argv and dbg2 not in sys.argv: - sys.argv.append(dbg1) # enforce debug - if signum == signal.SIGUSR2 and (dbg1 in sys.argv or dbg2 in sys.argv): # disable debug - if dbg1 in sys.argv: - sys.argv.remove(dbg1) - if dbg2 in sys.argv: - sys.argv.remove(dbg2) - os.execl(path, script, *sys.argv[1:]) - - -if __name__ == '__main__': - p = argparse.ArgumentParser(description='Proxy between given SOAP service and Osmocom CTRL protocol.') - p.add_argument('-v', '--version', action='version', version=("%(prog)s v" + __version__)) - p.add_argument('-p', '--port', type=int, default=4250, help="Port to use for CTRL interface, defaults to 4250") - p.add_argument('-c', '--ctrl', default='localhost', help="Adress to use for CTRL interface, defaults to localhost") - p.add_argument('-w', '--wsdl', required=True, help="WSDL URL for SOAP") - p.add_argument('-n', '--num', type=int, default=5, help="Max number of concurrent HTTP requests to SOAP server") - p.add_argument('-d', '--debug', action='store_true', help="Enable debug log") - p.add_argument('-o', '--output', action='store_true', help="Log to STDOUT in addition to SYSLOG") - p.add_argument('-l', '--location', help="Override location found in WSDL file (don't use unless you know what you're doing)") - args = p.parse_args() - - log = logging.getLogger('CTRL2SOAP') - if args.debug: - log.setLevel(logging.DEBUG) - else: - log.setLevel(logging.INFO) - log.addHandler(logging.handlers.SysLogHandler('/dev/log')) - if args.output: - log.addHandler(logging.StreamHandler(sys.stdout)) - - reboot = partial(reloader, os.path.abspath(__file__), os.path.basename(__file__), log, '-d', '--debug') # keep in sync with add_argument() call above - signal.signal(signal.SIGHUP, reboot) - signal.signal(signal.SIGQUIT, reboot) - signal.signal(signal.SIGUSR1, reboot) # restart and enabled debug output - signal.signal(signal.SIGUSR2, reboot) # restart and disable debug output - - log.info("SOAP proxy %s starting with PID %d ..." % (__version__, os.getpid())) - reactor.connectTCP(args.ctrl, args.port, TrapFactory(args.ctrl, args.port, Trap, defer.DeferredSemaphore(args.num), log, args.wsdl, args.location)) - reactor.run() diff --git a/osmopy/twisted_ipa.py b/osmopy/twisted_ipa.py deleted file mode 100755 index bb8323d..0000000 --- a/osmopy/twisted_ipa.py +++ /dev/null @@ -1,384 +0,0 @@ -#!/usr/bin/python3 -# -*- mode: python-mode; py-indent-tabs-mode: nil -*- -""" -/* - * Copyright (C) 2016 sysmocom s.f.m.c. GmbH - * - * All Rights Reserved - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ -""" - -__version__ = "0.7.0" # bump this on every non-trivial change - -from osmopy.osmo_ipa import Ctrl, IPA -from twisted.internet.protocol import ReconnectingClientFactory -from twisted.internet import reactor -from twisted.protocols import basic -import argparse, logging, sys - -class IPACommon(basic.Int16StringReceiver): - """ - Generic IPA protocol handler: include some routines for simpler subprotocols. - It's not intended as full implementation of all subprotocols, rather common ground and example code. - """ - def dbg(self, line): - """ - Debug print helper - """ - self.factory.log.debug(line) - - def osmo_CTRL(self, data): - """ - OSMO CTRL protocol - Placeholder, see corresponding derived class - """ - pass - - def osmo_MGCP(self, data): - """ - OSMO MGCP extension - """ - self.dbg('OSMO MGCP received %s' % data) - - def osmo_LAC(self, data): - """ - OSMO LAC extension - """ - self.dbg('OSMO LAC received %s' % data) - - def osmo_SMSC(self, data): - """ - OSMO SMSC extension - """ - self.dbg('OSMO SMSC received %s' % data) - - def osmo_ORC(self, data): - """ - OSMO ORC extension - """ - self.dbg('OSMO ORC received %s' % data) - - def osmo_GSUP(self, data): - """ - OSMO GSUP extension - """ - self.dbg('OSMO GSUP received %s' % data) - - def osmo_OAP(self, data): - """ - OSMO OAP extension - """ - self.dbg('OSMO OAP received %s' % data) - - def osmo_UNKNOWN(self, data): - """ - OSMO defaul extension handler - """ - self.dbg('OSMO unknown extension received %s' % data) - - def handle_RSL(self, data, proto, extension): - """ - RSL protocol handler - """ - self.dbg('IPA RSL received message with extension %s' % extension) - - def handle_CCM(self, data, proto, msgt): - """ - CCM (IPA Connection Management) - Placeholder, see corresponding derived class - """ - pass - - def handle_SCCP(self, data, proto, extension): - """ - SCCP protocol handler - """ - self.dbg('IPA SCCP received message with extension %s' % extension) - - def handle_OML(self, data, proto, extension): - """ - OML protocol handler - """ - self.dbg('IPA OML received message with extension %s' % extension) - - def handle_OSMO(self, data, proto, extension): - """ - Dispatcher point for OSMO subprotocols based on extension name, lambda default should never happen - """ - method = getattr(self, 'osmo_' + IPA().ext(extension), lambda: "extension dispatch failure") - method(data) - - def handle_MGCP(self, data, proto, extension): - """ - MGCP protocol handler - """ - self.dbg('IPA MGCP received message with attribute %s' % extension) - - def handle_UNKNOWN(self, data, proto, extension): - """ - Default protocol handler - """ - self.dbg('IPA received message for %s (%s) protocol with attribute %s' % (IPA().proto(proto), proto, extension)) - - def process_chunk(self, data): - """ - Generic message dispatcher for IPA (sub)protocols based on protocol name, lambda default should never happen - """ - (_, proto, extension, content) = IPA().del_header(data) - if content is not None: - self.dbg('IPA received %s::%s [%d/%d] %s' % (IPA().proto(proto), IPA().ext_name(proto, extension), len(data), len(content), content)) - method = getattr(self, 'handle_' + IPA().proto(proto), lambda: "protocol dispatch failure") - method(content, proto, extension) - - def dataReceived(self, data): - """ - Override for dataReceived from Int16StringReceiver because of inherently incompatible interpretation of length - If default handler is used than we would always get off-by-1 error (Int16StringReceiver use equivalent of l + 2) - """ - if len(data): - (head, tail) = IPA().split_combined(data) - self.process_chunk(head) - self.dataReceived(tail) - - def connectionMade(self): - """ - We have to resetDelay() here to drop internal state to default values to make reconnection logic work - Make sure to call this via super() if overriding to keep reconnection logic intact - """ - addr = self.transport.getPeer() - self.dbg('IPA connected to %s:%d peer' % (addr.host, addr.port)) - self.factory.resetDelay() - - -class CCM(IPACommon): - """ - Implementation of CCM protocol for IPA multiplex - """ - def ack(self): - self.transport.write(IPA().id_ack()) - - def ping(self): - self.transport.write(IPA().ping()) - - def pong(self): - self.transport.write(IPA().pong()) - - def handle_CCM(self, data, proto, msgt): - """ - CCM (IPA Connection Management) - Only basic logic necessary for tests is implemented (ping-pong, id ack etc) - """ - if msgt == IPA.MSGT['ID_GET']: - self.transport.getHandle().sendall(IPA().id_resp(self.factory.ccm_id)) - # if we call - # self.transport.write(IPA().id_resp(self.factory.test_id)) - # instead, than we would have to also call - # reactor.callLater(1, self.ack) - # instead of self.ack() - # otherwise the writes will be glued together - hence the necessity for ugly hack with 1s timeout - # Note: this still might work depending on the IPA implementation details on the other side - self.ack() - # schedule PING in 4s - reactor.callLater(4, self.ping) - if msgt == IPA.MSGT['PING']: - self.pong() - - -class CTRL(IPACommon): - """ - Implementation of Osmocom control protocol for IPA multiplex - """ - def ctrl_SET(self, data, op_id, v): - """ - Handle CTRL SET command - """ - self.dbg('CTRL SET [%s] %s' % (op_id, v)) - - def ctrl_SET_REPLY(self, data, op_id, v): - """ - Handle CTRL SET reply - """ - self.dbg('CTRL SET REPLY [%s] %s' % (op_id, v)) - - def ctrl_GET(self, data, op_id, v): - """ - Handle CTRL GET command - """ - self.dbg('CTRL GET [%s] %s' % (op_id, v)) - - def ctrl_GET_REPLY(self, data, op_id, v): - """ - Handle CTRL GET reply - """ - self.dbg('CTRL GET REPLY [%s] %s' % (op_id, v)) - - def ctrl_TRAP(self, data, op_id, v): - """ - Handle CTRL TRAP command - """ - self.dbg('CTRL TRAP [%s] %s' % (op_id, v)) - - def ctrl_ERROR(self, data, op_id, v): - """ - Handle CTRL ERROR reply - """ - self.dbg('CTRL ERROR [%s] %s' % (op_id, v)) - - def osmo_CTRL(self, data): - """ - OSMO CTRL message dispatcher, lambda default should never happen - For basic tests only, appropriate handling routines should be replaced: see CtrlServer for example - """ - self.dbg('OSMO CTRL received %s::%s' % Ctrl().parse(data.decode('utf-8'))) - (cmd, op_id, v) = data.decode('utf-8').split(' ', 2) - method = getattr(self, 'ctrl_' + cmd, lambda: "CTRL unknown command") - method(data, op_id, v) - - -class IPAServer(CCM): - """ - Test implementation of IPA server - Demonstrate CCM opearation by overriding necessary bits from CCM - """ - def connectionMade(self): - """ - Keep reconnection logic working by calling routine from CCM - Initiate CCM upon connection - """ - addr = self.transport.getPeer() - self.factory.log.info('IPA server: connection from %s:%d client' % (addr.host, addr.port)) - super(IPAServer, self).connectionMade() - self.transport.write(IPA().id_get()) - - -class CtrlServer(CTRL): - """ - Test implementation of CTRL server - Demonstarte CTRL handling by overriding simpler routines from CTRL - """ - def connectionMade(self): - """ - Keep reconnection logic working by calling routine from CTRL - Send TRAP upon connection - Note: we can't use sendString() because of it's incompatibility with IPA interpretation of length prefix - """ - addr = self.transport.getPeer() - self.factory.log.info('CTRL server: connection from %s:%d client' % (addr.host, addr.port)) - super(CtrlServer, self).connectionMade() - self.transport.write(Ctrl().trap('LOL', 'what')) - self.transport.write(Ctrl().trap('rulez', 'XXX')) - - def reply(self, r): - self.transport.write(Ctrl().add_header(r)) - - def ctrl_SET(self, data, op_id, v): - """ - CTRL SET command: always succeed - """ - self.dbg('SET [%s] %s' % (op_id, v)) - self.reply('SET_REPLY %s %s' % (op_id, v)) - - def ctrl_GET(self, data, op_id, v): - """ - CTRL GET command: always fail - """ - self.dbg('GET [%s] %s' % (op_id, v)) - self.reply('ERROR %s No variable found' % op_id) - - -class IPAFactory(ReconnectingClientFactory): - """ - Generic IPA Client Factory which can be used to store state for various subprotocols and manage connections - Note: so far we do not really need separate Factory for acting as a server due to protocol simplicity - """ - protocol = IPACommon - log = None - ccm_id = IPA().identity(unit=b'1515/0/1', mac=b'b0:0b:fa:ce:de:ad:be:ef', utype=b'sysmoBTS', name=b'StingRay', location=b'hell', sw=IPA.version.encode('utf-8')) - - def __init__(self, proto=None, log=None, ccm_id=None): - if proto: - self.protocol = proto - if ccm_id: - self.ccm_id = ccm_id - if log: - self.log = log - else: - self.log = logging.getLogger('IPAFactory') - self.log.setLevel(logging.CRITICAL) - self.log.addHandler(logging.NullHandler) - - def clientConnectionFailed(self, connector, reason): - """ - Only necessary for as debugging aid - if we can somehow set parent's class noisy attribute then we can omit this method - """ - self.log.warning('IPAFactory connection failed: %s' % reason.getErrorMessage()) - ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) - - def clientConnectionLost(self, connector, reason): - """ - Only necessary for as debugging aid - if we can somehow set parent's class noisy attribute then we can omit this method - """ - self.log.warning('IPAFactory connection lost: %s' % reason.getErrorMessage()) - ReconnectingClientFactory.clientConnectionLost(self, connector, reason) - - -if __name__ == '__main__': - p = argparse.ArgumentParser("Twisted IPA (module v%s) app" % IPA.version) - p.add_argument('-v', '--version', action='version', version="%(prog)s v" + __version__) - p.add_argument('-p', '--port', type=int, default=4250, help="Port to use for CTRL interface") - p.add_argument('-d', '--host', default='localhost', help="Adress to use for CTRL interface") - cs = p.add_mutually_exclusive_group() - cs.add_argument("-c", "--client", action='store_true', help="asume client role") - cs.add_argument("-s", "--server", action='store_true', help="asume server role") - ic = p.add_mutually_exclusive_group() - ic.add_argument("--ipa", action='store_true', help="use IPA protocol") - ic.add_argument("--ctrl", action='store_true', help="use CTRL protocol") - args = p.parse_args() - test = False - - log = logging.getLogger('TwistedIPA') - log.setLevel(logging.DEBUG) - log.addHandler(logging.StreamHandler(sys.stdout)) - - if args.ctrl: - if args.client: - # Start osmo-bsc to receive TRAP messages when osmo-bts-* connects to it - print('CTRL client, connecting to %s:%d' % (args.host, args.port)) - reactor.connectTCP(args.host, args.port, IPAFactory(CTRL, log)) - test = True - if args.server: - # Use bsc_control.py to issue set/get commands - print('CTRL server, listening on port %d' % args.port) - reactor.listenTCP(args.port, IPAFactory(CtrlServer, log)) - test = True - if args.ipa: - if args.client: - # Start osmo-nitb which would initiate A-bis/IP session - print('IPA client, connecting to %s ports %d and %d' % (args.host, IPA.TCP_PORT_OML, IPA.TCP_PORT_RSL)) - reactor.connectTCP(args.host, IPA.TCP_PORT_OML, IPAFactory(CCM, log)) - reactor.connectTCP(args.host, IPA.TCP_PORT_RSL, IPAFactory(CCM, log)) - test = True - if args.server: - # Start osmo-bts-* which would attempt to connect to us - print('IPA server, listening on ports %d and %d' % (IPA.TCP_PORT_OML, IPA.TCP_PORT_RSL)) - reactor.listenTCP(IPA.TCP_PORT_RSL, IPAFactory(IPAServer, log)) - reactor.listenTCP(IPA.TCP_PORT_OML, IPAFactory(IPAServer, log)) - test = True - if test: - reactor.run() - else: - print("Please specify which protocol in which role you'd like to test.") |