aboutsummaryrefslogtreecommitdiffstats
path: root/osmopy/osmo_interact
diff options
context:
space:
mode:
authorNeels Hofmeyr <neels@hofmeyr.de>2017-12-19 14:12:16 +0100
committerNeels Hofmeyr <nhofmeyr@sysmocom.de>2017-12-19 15:11:31 +0000
commit56aa4785c028a72211913b6c1e11a0173abce062 (patch)
treed1e7125063a4f48c3e1e41f85b29e7c1e47ad824 /osmopy/osmo_interact
parent1a5364696eaf66a382e6d451bc4255622d845a92 (diff)
fix osmo_interact_* and osmo_verify_transcript_* after dir split
After I30cdf0f85b2a60a235960911c9827f4129da40db, * the osmo_interact_{vty,ctrl}.py can no longer import osmo_interact_common, since it was moved to scripts/ in error. * the osmo_verify_{vty,ctrl} scripts can no longer import osmo_interact_{vty,ctrl}, since it is also in scripts/. Notably, the osmo_interact_{vty,ctrl}.py also served as scripts while being modules at the same time, which is not good. Fix these issues by adding a new osmopy/osmo_interact/ submodule with osmopy/osmo_interact/common.py, /vty.py and /ctrl.py as modules, and add in scripts thin wrappers that invoke the modules' main(). Change-Id: I40a37b212274cb70ebb1e1d9d1b3743eb2d64d05
Diffstat (limited to 'osmopy/osmo_interact')
-rw-r--r--osmopy/osmo_interact/__init__.py2
-rw-r--r--osmopy/osmo_interact/common.py474
-rwxr-xr-xosmopy/osmo_interact/ctrl.py114
-rwxr-xr-xosmopy/osmo_interact/vty.py190
4 files changed, 780 insertions, 0 deletions
diff --git a/osmopy/osmo_interact/__init__.py b/osmopy/osmo_interact/__init__.py
new file mode 100644
index 0000000..4fc4fac
--- /dev/null
+++ b/osmopy/osmo_interact/__init__.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python
+__all__ = ['common', 'vty', 'ctrl']
diff --git a/osmopy/osmo_interact/common.py b/osmopy/osmo_interact/common.py
new file mode 100644
index 0000000..f7070ae
--- /dev/null
+++ b/osmopy/osmo_interact/common.py
@@ -0,0 +1,474 @@
+#!/usr/bin/env python3
+#
+# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+# All rights reserved.
+#
+# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+'''
+Common code for osmo_interact_vty.py and osmo_interact_ctrl.py.
+This implements all of application interaction, piping and verification.
+osmo_interact_{vty,ctrl}.py plug VTY and CTRL interface specific bits.
+'''
+
+# Our setup.py currently wants everything to be parsable by both py2 and py3.
+# IMHO that is not a good idea, but until that changes, let's just keep this
+# py2 legacy shim in here so we can syntax-check this py3 module with py2.
+from __future__ import print_function
+
+import argparse
+import sys
+import os
+import subprocess
+import time
+import traceback
+import socket
+import shlex
+import re
+
+
+class Interact:
+
+ class StepBase:
+ command = None
+ result = None
+ leading_blanks = None
+
+ def __init__(self):
+ self.result = []
+
+ def verify_interact_state(self, interact_instance):
+ # for example to verify that the last VTY prompt received shows the
+ # right node.
+ pass
+
+ def command_str(self, interact_instance=None):
+ return self.command
+
+ def __str__(self):
+ return '%s\n%s' % (self.command_str(), '\n'.join(self.result))
+
+ @staticmethod
+ def is_next_step(line, interact_instance):
+ assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep"
+
+ socket = None
+
+ def __init__(self, step_class, port, host, verbose=False, update=False):
+ '''
+ host is the hostname to connect to.
+ port is the CTRL port to connect on.
+ '''
+ self.Step = step_class
+ self.port = port
+ self.host = host
+ self.verbose = verbose
+ self.update = update
+
+ if not port:
+ raise Exception("You need to provide port number to connect to")
+
+ def connect(self):
+ assert self.socket is None
+ retries = 30
+ took = 0
+ while True:
+ took += 1
+ try:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setblocking(1)
+ self.socket.connect((self.host, int(self.port)))
+ except IOError:
+ retries -= 1
+ if retries <= 0:
+ raise
+ time.sleep(.1)
+ continue
+ break
+
+ def close(self):
+ if self.socket is None:
+ return
+ self.socket.close()
+ self.socket = None
+
+ def command(self, command):
+ assert not "implemented separately by InteractVty and InteractCtrl"
+
+ def verify_transcript_file(self, transcript_file):
+ with open(transcript_file, 'r') as f:
+ content = f.read()
+
+ try:
+ result = self.verify_transcript(content)
+ except:
+ print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr)
+ sys.stderr.flush()
+ raise
+
+ if not self.update:
+ return
+ content = '\n'.join(result)
+ with open(transcript_file, 'w') as f:
+ f.write(content)
+
+ def verify_transcript(self, transcript):
+ ''''
+ transcript is a "screenshot" of a session, a multi-line string
+ including commands and expected results.
+ Feed commands to self.command() and verify the expected results.
+ '''
+
+ # parse steps
+ steps = []
+ step = None
+ blank_lines = 0
+ for line in transcript.splitlines():
+ if not line:
+ blank_lines += 1
+ continue
+ next_step_started = self.Step.is_next_step(line, self)
+ if next_step_started:
+ if step:
+ steps.append(step)
+ step = next_step_started
+ step.leading_blanks = blank_lines
+ blank_lines = 0
+ elif step:
+ # we only count blank lines directly preceding the start of a
+ # next step. Insert blank lines in the middle of a response
+ # back into the response:
+ if blank_lines:
+ step.result.extend([''] * blank_lines)
+ blank_lines = 0
+ step.result.append(line)
+ if step:
+ steps.append(step)
+ step = None
+
+ actual_result = []
+
+ # run steps
+ step_nr = 0
+ for step in steps:
+ step_nr += 1
+ try:
+ if self.verbose:
+ if step.leading_blanks:
+ print('\n' * step.leading_blanks, end='')
+ print(step.command_str())
+ sys.stdout.flush()
+
+ step.verify_interact_state(self)
+
+ res = self.command(step.command)
+
+ if self.verbose:
+ sys.stderr.flush()
+ sys.stdout.flush()
+ print('\n'.join(res))
+ sys.stdout.flush()
+
+ if step.leading_blanks:
+ actual_result.extend([''] * step.leading_blanks)
+ actual_result.append(step.command_str(self))
+
+ match_result = self.match_lines(step.result, res)
+
+ if self.update:
+ if match_result is True:
+ # preserve any wildcards
+ actual_result.extend(step.result)
+ else:
+ # mismatch, take exactly what came in
+ actual_result.extend(res)
+ continue
+ if match_result is not True:
+ raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]'
+ % (match_result, step, step.command_str(), '\n'.join(res)))
+ except:
+ print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step),
+ file=sys.stderr)
+ sys.stderr.flush()
+ raise
+
+ # final line ending
+ actual_result.append('')
+ return actual_result
+
+ @staticmethod
+ def match_lines(expect, got):
+ '''
+ Match two lists of strings, allowing certain wildcards:
+ - In 'expect', if a line is exactly '...', it matches any number of
+ arbitrary lines in 'got'; the implementation is trivial and skips
+ lines to the first occurence in 'got' that continues after '...'.
+ - If an 'expect' line is '... !regex', it matches any number of
+ lines like '...', but the given regex must not match any of those
+ lines.
+
+ Return 'True' on match, or a string describing the mismatch.
+ '''
+ def match_line(expect_line, got_line):
+ return expect_line == got_line
+
+ ANY = '...'
+ ANY_EXCEPT = '... !'
+
+ e = 0
+ g = 0
+ while e < len(expect):
+ if expect[e] == ANY or expect[e].startswith(ANY_EXCEPT):
+ wildcard = expect[e]
+ e += 1
+ g_end = g
+
+ if e >= len(expect):
+ # anything left in 'got' is accepted.
+ g_end = len(got)
+
+ # look for the next occurence of the expected line in 'got'
+ while g_end < len(got) and not match_line(expect[e], got[g_end]):
+ g_end += 1
+
+ if wildcard == ANY:
+ # no restrictions on lines
+ g = g_end
+
+ elif wildcard.startswith(ANY_EXCEPT):
+ except_re = re.compile(wildcard[len(ANY_EXCEPT):])
+ while g < g_end:
+ if except_re.search(got[g]):
+ return ('Got forbidden line for wildcard %r:'
+ ' did not expect %r in line %d of response'
+ % (wildcard, got[g], g))
+ g += 1
+
+ continue
+
+ if g >= len(got):
+ return 'Cannot find line %r' % expect[e]
+
+ if not match_line(expect[e], got[g]):
+ return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g])
+
+ e += 1
+ g += 1
+
+ if g < len(got):
+ return 'Did not expect line %r' % got[g]
+ return True
+
+ def feed_commands(self, output, command_strs):
+ for command_str in command_strs:
+ for command in command_str.splitlines():
+ res = self.command(command)
+ output.write('\n'.join(res))
+ output.write('\n')
+
+def end_process(proc, quiet=False):
+ if not proc:
+ return
+
+ rc = proc.poll()
+ if rc is not None:
+ if not quiet:
+ print('Process has already terminated with', rc)
+ proc.wait()
+ return
+
+ proc.terminate()
+ time_to_wait_for_term = 5
+ wait_step = 0.001
+ waited_time = 0
+ while True:
+ # poll returns None if proc is still running
+ if proc.poll() is not None:
+ break
+ waited_time += wait_step
+ # make wait_step approach 1.0
+ wait_step = (1. + 5. * wait_step) / 6.
+ if waited_time >= time_to_wait_for_term:
+ break
+ time.sleep(wait_step)
+
+ if proc.poll() is None:
+ # termination seems to be slower than that, let's just kill
+ proc.kill()
+ if not quiet:
+ print("Killed child process")
+ elif waited_time > .002:
+ if not quiet:
+ print("Terminating took %.3fs" % waited_time)
+ proc.wait()
+
+class Application:
+ proc = None
+ _devnull = None
+
+ @staticmethod
+ def devnull():
+ if Application._devnull is None:
+ Application._devnull = open(os.devnull, 'w')
+ return Application._devnull
+
+ def __init__(self, run_app_str, purge_output=True, quiet=False):
+ self.command_tuple = shlex.split(run_app_str)
+ self.purge_output = purge_output
+ self.quiet = quiet
+
+ def run(self):
+ out_err = None
+ if self.purge_output:
+ out_err = Application.devnull()
+
+ if not self.quiet:
+ print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple)))
+ self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err)
+
+ def stop(self):
+ end_process(self.proc, self.quiet)
+
+def verify_application(run_app_str, interact, transcript_file, verbose):
+ passed = None
+ application = None
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ if run_app_str:
+ application = Application(run_app_str, purge_output=not verbose)
+ application.run()
+
+ try:
+ interact.connect()
+ interact.verify_transcript_file(transcript_file)
+ passed = True
+ except:
+ traceback.print_exc()
+ passed = False
+ interact.close()
+
+ if application:
+ application.stop()
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ return passed
+
+def common_parser():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-r', '--run', dest='run_app_str',
+ help='command to run to launch application to test,'
+ ' including command line arguments. If omitted, no'
+ ' application is launched.')
+ parser.add_argument('-p', '--port', dest='port',
+ help="Port to reach the application at.")
+ parser.add_argument('-H', '--host', dest='host', default='localhost',
+ help="Host to reach the application at.")
+ return parser
+
+def parser_add_verify_args(parser):
+ parser.add_argument('-u', '--update', dest='update', action='store_true',
+ help='Do not verify, but OVERWRITE transcripts based on'
+ ' the application\'s current behavior. OVERWRITES TRANSCRIPT'
+ ' FILES.')
+ parser.add_argument('-v', '--verbose', action='store_true',
+ help='Print commands and application output')
+ parser.add_argument('transcript_files', nargs='*', help='transcript file(s) to verify')
+ return parser
+
+def parser_add_run_args(parser):
+ parser.add_argument('-O', '--output', dest='output_path',
+ help="Write command results to a file instead of stdout."
+ "('-O -' writes to stdout and is the default)")
+ parser.add_argument('-c', '--command', dest='cmd_str',
+ help="Run this command (before reading input files, if any)."
+ " multiple commands may be separated by ';'")
+ parser.add_argument('cmd_files', nargs='*', help='file(s) with plain commands to run')
+ return parser
+
+def main_run_commands(run_app_str, output_path, cmd_str, cmd_files, interact):
+ to_stdout = False
+ if not output_path or output_path == '-':
+ to_stdout = True
+ output = sys.stdout
+ else:
+ output = open(output_path, 'w')
+
+ application = None
+
+ if run_app_str:
+ application = Application(run_app_str, quiet=to_stdout)
+ application.run()
+
+ try:
+ interact.connect()
+
+ if cmd_str:
+ interact.feed_commands(output, cmd_str.split(';'))
+
+ for f_path in (cmd_files or []):
+ with open(f_path, 'r') as f:
+ interact.feed_commands(output, f.read().decode('utf-8').splitlines())
+
+ if not (cmd_str or cmd_files):
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ break;
+ interact.feed_commands(output, line.split(';'))
+ except:
+ traceback.print_exc()
+ finally:
+ if not to_stdout:
+ try:
+ output.close()
+ except:
+ traceback.print_exc()
+
+ try:
+ interact.close()
+ except:
+ traceback.print_exc()
+
+ if application:
+ try:
+ application.stop()
+ except:
+ traceback.print_exc()
+
+def main_verify_transcripts(run_app_str, transcript_files, interact, verbose):
+ results = []
+ for t in transcript_files:
+ passed = verify_application(run_app_str=run_app_str,
+ interact=interact,
+ transcript_file=t,
+ verbose=verbose)
+ results.append((passed, t))
+
+ print('\nRESULTS:')
+ all_passed = True
+ for passed, t in results:
+ print('%s: %s' % ('pass' if passed else 'FAIL', t))
+ all_passed = all_passed and passed
+ print()
+
+ if not all_passed:
+ sys.exit(1)
+
+# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_interact/ctrl.py b/osmopy/osmo_interact/ctrl.py
new file mode 100755
index 0000000..b752351
--- /dev/null
+++ b/osmopy/osmo_interact/ctrl.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python3
+#
+# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+# All rights reserved.
+#
+# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+'''
+Run CTRL commands or test transcripts against a given application. Commandline
+invocation exposes only direct command piping, the transcript verification code
+is exposed as commandline args by osmo_verify_transcript_ctrl.py.
+'''
+
+import re
+
+from .common import *
+from osmopy.osmo_ipa import Ctrl, IPA
+
+class InteractCtrl(Interact):
+ next_id = 1
+ keep_ids = True
+ re_command = re.compile('^(SET|GET) ([^ ]*) (.*)$')
+
+ class CtrlStep(Interact.StepBase):
+
+ @staticmethod
+ def is_next_step(line, interact_instance):
+ m = InteractCtrl.re_command.match(line)
+ if not m:
+ return None
+ next_step = InteractCtrl.CtrlStep()
+
+ set_get = m.group(1)
+ cmd_id = m.group(2)
+ var_val = m.group(3)
+ if not interact_instance.keep_ids:
+ cmd_id = interact_instance.next_id
+ interact_instance.next_id += 1
+ next_step.command = '%s %s %s' % (set_get, cmd_id, var_val)
+
+ return next_step
+
+ def __init__(self, port, host, verbose=False, update=False, keep_ids=True):
+ if not update:
+ keep_ids = True
+ self.keep_ids = keep_ids
+ super().__init__(InteractCtrl.CtrlStep, port=port, host=host, verbose=verbose, update=update)
+
+ def connect(self):
+ self.next_id = 1
+ super().connect()
+
+ def send(self, data):
+ data = Ctrl().add_header(data)
+ return self.socket.send(data) == len(data)
+
+ def receive(self):
+ responses = []
+ data = self.socket.recv(4096)
+ while (len(data)>0):
+ (response_with_header, data) = IPA().split_combined(data)
+ response = Ctrl().rem_header(response_with_header)
+ responses.append(response.decode('utf-8'))
+ return responses
+
+ def command(self, command):
+ assert self.send(command)
+ res = self.receive()
+ split_responses = []
+ for r in res:
+ split_responses.extend(r.splitlines())
+ sys.stdout.flush()
+ sys.stderr.flush()
+ return split_responses
+
+def main_interact_ctrl():
+ parser = common_parser()
+ parser_add_run_args(parser)
+ args = parser.parse_args()
+
+ interact = InteractCtrl(args.port, args.host, verbose=False, update=False,
+ keep_ids=True)
+
+ main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
+ args.cmd_files, interact)
+
+
+def main_verify_transcript_ctrl():
+ parser = common_parser()
+ parser_add_verify_args(parser)
+ parser.add_argument('-i', '--keep-ids', dest='keep_ids', action='store_true',
+ help='With --update, default is to overwrite the command IDs'
+ ' so that they are consecutive numbers starting from 1.'
+ ' With --keep-ids, do not change these command IDs.')
+ args = parser.parse_args()
+
+ interact = InteractCtrl(args.port, args.host, args.verbose, args.update, args.keep_ids)
+
+ main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
+
+# vim: tabstop=4 shiftwidth=4 expandtab nocin ai
diff --git a/osmopy/osmo_interact/vty.py b/osmopy/osmo_interact/vty.py
new file mode 100755
index 0000000..f34e87e
--- /dev/null
+++ b/osmopy/osmo_interact/vty.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+#
+# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+# All rights reserved.
+#
+# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+'''
+Run VTY commands or test transcripts against a given application. Commandline
+invocation exposes only direct command piping, the transcript verification code
+is exposed as commandline args by osmo_verify_transcript_vty.py.
+'''
+
+import re
+
+from .common import *
+
+class InteractVty(Interact):
+
+ class VtyStep(Interact.StepBase):
+ expect_node = None # e.g. '(config-net)'
+ expect_prompt_char = None # '>' or '#'
+
+ def __init__(self, prompt):
+ super().__init__()
+ self.prompt = prompt
+
+ def verify_interact_state(self, interact_instance):
+ if interact_instance.update:
+ return
+ if interact_instance.this_node != self.expect_node:
+ raise Exception('Mismatch: expected VTY node %r in the prompt, got %r'
+ % (self.expect_node, interact_instance.this_node))
+ if interact_instance.this_prompt_char != self.expect_prompt_char:
+ raise Exception('Mismatch: expected VTY prompt character %r, got %r'
+ % (self.expect_prompt_char, interact_instance.this_prompt_char))
+
+ @staticmethod
+ def is_next_step(line, interact_instance):
+ m = interact_instance.re_prompt.match(line)
+ if not m:
+ return None
+ next_step = InteractVty.VtyStep(interact_instance.prompt)
+ next_step.expect_node = m.group(1)
+ next_step.expect_prompt_char = m.group(2)
+ next_step.command = m.group(3)
+ return next_step
+
+ def command_str(self, interact_instance=None):
+ if interact_instance is None:
+ node = self.expect_node
+ prompt_char = self.expect_prompt_char
+ else:
+ node = interact_instance.last_node
+ prompt_char = interact_instance.last_prompt_char
+ if node:
+ node = '(%s)' % node
+ node = node or ''
+ return '%s%s%s %s' % (self.prompt, node, prompt_char, self.command)
+
+ def __init__(self, prompt, port, host, verbose, update):
+ self.prompt = prompt
+ super().__init__(InteractVty.VtyStep, port, host, verbose, update)
+
+ def connect(self):
+ self.this_node = None
+ self.this_prompt_char = '>' # slight cheat for initial prompt char
+ self.last_node = None
+ self.last_prompt_char = None
+
+ super().connect()
+ # receive the first welcome message and discard
+ data = self.socket.recv(4096)
+ if not self.prompt:
+ b = data
+ b = b[b.rfind(b'\n') + 1:]
+ while b and (b[0] < ord('A') or b[0] > ord('z')):
+ b = b[1:]
+ prompt_str = b.decode('utf-8')
+ if '>' in prompt_str:
+ self.prompt = prompt_str[:prompt_str.find('>')]
+ if not self.prompt:
+ raise Exception('Could not find application name; needed to decode prompts.'
+ ' Initial data was: %r' % data)
+ self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)
+
+ def _command(self, command_str, timeout=10):
+ self.socket.send(command_str.encode())
+
+ waited_since = time.time()
+ received_lines = []
+ last_line = ''
+
+ while True:
+ new_data = self.socket.recv(4096).decode('utf-8')
+
+ last_line = "%s%s" % (last_line, new_data)
+
+ if last_line:
+ lines = last_line.splitlines()
+ received_lines.extend(lines[:-1])
+ last_line = lines[-1]
+
+ match = self.re_prompt.match(last_line)
+ if not match:
+ if time.time() - waited_since > timeout:
+ raise IOError("Failed to read data (did the app crash?)")
+ time.sleep(.1)
+ continue
+
+ self.last_node = self.this_node
+ self.last_prompt_char = self.this_prompt_char
+ self.this_node = match.group(1) or None
+ self.this_prompt_char = match.group(2)
+ break
+
+ # expecting to have received the command we sent as echo, remove it
+ clean_command_str = command_str.strip()
+ if clean_command_str.endswith('?'):
+ clean_command_str = clean_command_str[:-1]
+ if received_lines and received_lines[0] == clean_command_str:
+ received_lines = received_lines[1:]
+ return received_lines
+
+ def command(self, command_str, timeout=10):
+ command_str = command_str or '\r'
+ if command_str[-1] not in '?\r\t':
+ command_str = command_str + '\r'
+
+ received_lines = self._command(command_str, timeout)
+
+ # send escape to cancel the '?' command line
+ if command_str[-1] == '?':
+ self._command('\x03', timeout)
+
+ return received_lines
+
+def parser_add_vty_args(parser):
+ parser.add_argument('-n', '--prompt-name', dest='prompt',
+ help="Name used in application's telnet VTY prompt."
+ " If omitted, will attempt to determine the name from"
+ " the initial VTY prompt.")
+ return parser
+
+def main_interact_vty():
+ parser = common_parser()
+ parser_add_vty_args(parser)
+ parser_add_run_args(parser)
+ parser.add_argument('-X', '--gen-xml-ref', dest='gen_xml', action='store_true',
+ help="Equivalent to '-c \"show online-help\" -O -',"
+ " can be used to generate the VTY reference file as"
+ " required by osmo-gsm-manuals.git.")
+ args = parser.parse_args()
+
+ if args.gen_xml:
+ if args.cmd_str:
+ raise Exception('It does not make sense to pass both --command and'
+ ' --gen-xml-ref.')
+ args.cmd_str = 'show online-help'
+
+ interact = InteractVty(args.prompt, args.port, args.host,
+ verbose=False, update=False)
+
+ main_run_commands(args.run_app_str, args.output_path, args.cmd_str,
+ args.cmd_files, interact)
+
+def main_verify_transcript_vty():
+ parser = common_parser()
+ parser_add_vty_args(parser)
+ parser_add_verify_args(parser)
+ args = parser.parse_args()
+
+ interact = InteractVty(args.prompt, args.port, args.host, args.verbose, args.update)
+
+ main_verify_transcripts(args.run_app_str, args.transcript_files, interact, args.verbose)
+
+# vim: tabstop=4 shiftwidth=4 expandtab nocin ai