aboutsummaryrefslogtreecommitdiffstats
path: root/osmopy/osmo_verify_transcript_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'osmopy/osmo_verify_transcript_common.py')
-rw-r--r--osmopy/osmo_verify_transcript_common.py372
1 files changed, 372 insertions, 0 deletions
diff --git a/osmopy/osmo_verify_transcript_common.py b/osmopy/osmo_verify_transcript_common.py
new file mode 100644
index 0000000..e4b5553
--- /dev/null
+++ b/osmopy/osmo_verify_transcript_common.py
@@ -0,0 +1,372 @@
+#!/usr/bin/env python3
+#
+# (C) 2017 by sysmocom s.f.m.c. GmbH <info@sysmocom.de>
+# All rights reserved.
+#
+# Author: Neels Hofmeyr <nhofmeyr@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+'''
+Common code for verify_transcript_vty.py and verify_transcript_ctrl.py.
+'''
+
+import argparse
+import sys
+import os
+import subprocess
+import time
+import traceback
+import socket
+import shlex
+
+
+class Interact:
+
+ class StepBase:
+ command = None
+ result = None
+ leading_blanks = None
+
+ def __init__(self):
+ self.result = []
+
+ def verify_interact_state(self, interact_instance):
+ # for example to verify that the last VTY prompt received shows the
+ # right node.
+ pass
+
+ def command_str(self, interact_instance=None):
+ return self.command
+
+ def __str__(self):
+ return '%s\n%s' % (self.command_str(), '\n'.join(self.result))
+
+ @staticmethod
+ def is_next_step(line, interact_instance):
+ assert not "implemented by InteractVty.VtyStep and InteractCtrl.CtrlStep"
+
+ socket = None
+
+ def __init__(self, step_class, port, host, verbose=False, update=False):
+ '''
+ host is the hostname to connect to.
+ port is the CTRL port to connect on.
+ '''
+ self.Step = step_class
+ self.port = port
+ self.host = host
+ self.verbose = verbose
+ self.update = update
+
+ def connect(self):
+ assert self.socket is None
+ retries = 30
+ took = 0
+ while True:
+ took += 1
+ try:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.setblocking(1)
+ self.socket.connect((self.host, int(self.port)))
+ except IOError:
+ retries -= 1
+ if retries <= 0:
+ raise
+ time.sleep(.1)
+ continue
+ break
+
+ def close(self):
+ if self.socket is None:
+ return
+ self.socket.close()
+ self.socket = None
+
+ def command(self, command):
+ assert not "implemented separately by InteractVty and InteractCtrl"
+
+ def verify_transcript_file(self, transcript_file):
+ with open(transcript_file, 'r') as f:
+ content = f.read()
+
+ try:
+ result = self.verify_transcript(content)
+ except:
+ print('Error while verifying transcript file %r' % transcript_file, file=sys.stderr)
+ sys.stderr.flush()
+ raise
+
+ if not self.update:
+ return
+ content = '\n'.join(result)
+ with open(transcript_file, 'w') as f:
+ f.write(content)
+
+ def verify_transcript(self, transcript):
+ ''''
+ transcript is a "screenshot" of a session, a multi-line string
+ including commands and expected results.
+ Feed commands to self.command() and verify the expected results.
+ '''
+
+ # parse steps
+ steps = []
+ step = None
+ blank_lines = 0
+ for line in transcript.splitlines():
+ if not line:
+ blank_lines += 1
+ continue
+ next_step_started = self.Step.is_next_step(line, self)
+ if next_step_started:
+ if step:
+ steps.append(step)
+ step = next_step_started
+ step.leading_blanks = blank_lines
+ blank_lines = 0
+ elif step:
+ # we only count blank lines directly preceding the start of a
+ # next step. Insert blank lines in the middle of a response
+ # back into the response:
+ if blank_lines:
+ step.result.extend([''] * blank_lines)
+ blank_lines = 0
+ step.result.append(line)
+ if step:
+ steps.append(step)
+ step = None
+
+ actual_result = []
+
+ # run steps
+ step_nr = 0
+ for step in steps:
+ step_nr += 1
+ try:
+ if self.verbose:
+ if step.leading_blanks:
+ print('\n' * step.leading_blanks, end='')
+ print(step.command_str())
+ sys.stdout.flush()
+
+ step.verify_interact_state(self)
+
+ res = self.command(step.command)
+
+ if self.verbose:
+ sys.stderr.flush()
+ sys.stdout.flush()
+ print('\n'.join(res))
+ sys.stdout.flush()
+
+ if step.leading_blanks:
+ actual_result.extend([''] * step.leading_blanks)
+ actual_result.append(step.command_str(self))
+
+ match_result = self.match_lines(step.result, res)
+
+ if self.update:
+ if match_result is True:
+ # preserve any wildcards
+ actual_result.extend(step.result)
+ else:
+ # mismatch, take exactly what came in
+ actual_result.extend(res)
+ continue
+ if match_result is not True:
+ raise Exception('Result mismatch:\n%s\n\nExpected:\n[\n%s\n]\n\nGot:\n[\n%s\n%s\n]'
+ % (match_result, step, step.command_str(), '\n'.join(res)))
+ except:
+ print('Error during transcript step %d:\n[\n%s\n]' % (step_nr, step),
+ file=sys.stderr)
+ sys.stderr.flush()
+ raise
+
+ # final line ending
+ actual_result.append('')
+ return actual_result
+
+ @staticmethod
+ def match_lines(expect, got):
+ '''
+ Match two lists of strings, allowing certain wildcards:
+ - In 'expect', if a line is exactly '...', it matches any number of
+ arbitrary lines in 'got'; the implementation is trivial and skips
+ lines to the first occurence in 'got' that continues after '...'.
+
+ Return 'True' on match, or a string describing the mismatch.
+ '''
+ def match_line(expect_line, got_line):
+ return expect_line == got_line
+
+ e = 0
+ g = 0
+ while e < len(expect):
+ if expect[e] == '...':
+ e += 1
+
+ if e >= len(expect):
+ # anything left in 'got' is accepted.
+ return True
+
+ # look for the next occurence of the expected line in 'got'
+ while g < len(got) and not match_line(expect[e], got[g]):
+ g += 1
+ continue
+
+ if g >= len(got):
+ return 'Cannot find line %r' % expect[e]
+
+ if not match_line(expect[e], got[g]):
+ return 'Mismatch:\nExpect:\n%r\nGot:\n%r' % (expect[e], got[g])
+
+ e += 1
+ g += 1
+
+ if g < len(got):
+ return 'Did not expect line %r' % got[g]
+ return True
+
+def end_process(proc):
+ if not proc:
+ return
+
+ rc = proc.poll()
+ if rc is not None:
+ print('Process has already terminated with', rc)
+ proc.wait()
+ return
+
+ proc.terminate()
+ time_to_wait_for_term = 5
+ wait_step = 0.001
+ waited_time = 0
+ while True:
+ # poll returns None if proc is still running
+ if proc.poll() is not None:
+ break
+ waited_time += wait_step
+ # make wait_step approach 1.0
+ wait_step = (1. + 5. * wait_step) / 6.
+ if waited_time >= time_to_wait_for_term:
+ break
+ time.sleep(wait_step)
+
+ if proc.poll() is None:
+ # termination seems to be slower than that, let's just kill
+ proc.kill()
+ print("Killed child process")
+ elif waited_time > .002:
+ print("Terminating took %.3fs" % waited_time)
+ proc.wait()
+
+class Application:
+ proc = None
+ _devnull = None
+
+ @staticmethod
+ def devnull():
+ if Application._devnull is None:
+ Application._devnull = open(os.devnull, 'w')
+ return Application._devnull
+
+ def __init__(self, command_tuple, purge_output=True):
+ self.command_tuple = command_tuple
+ self.purge_output = purge_output
+
+ def run(self):
+ out_err = None
+ if self.purge_output:
+ out_err = Application.devnull()
+
+ print('Launching: cd %r; %s' % (os.getcwd(), ' '.join(self.command_tuple)))
+ self.proc = subprocess.Popen(self.command_tuple, stdout=out_err, stderr=out_err)
+
+ def stop(self):
+ end_process(self.proc)
+
+def verify_application(command_tuple, interact, transcript_file, verbose):
+ passed = None
+ application = None
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ if command_tuple:
+ application = Application(command_tuple, purge_output=not verbose)
+ application.run()
+
+ try:
+ interact.connect()
+ interact.verify_transcript_file(transcript_file)
+ passed = True
+ except:
+ traceback.print_exc()
+ passed = False
+ interact.close()
+
+ if application:
+ application.stop()
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ return passed
+
+def common_parser():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-r', '--run', dest='command_str',
+ help='command to run to launch application to test,'
+ ' including command line arguments. If omitted, no'
+ ' application is launched.')
+ parser.add_argument('-p', '--port', dest='port',
+ help="Port that the application opens.")
+ parser.add_argument('-H', '--host', dest='host', default='localhost',
+ help="Host that the application opens the port on.")
+ parser.add_argument('-u', '--update', dest='update', action='store_true',
+ help='Do not verify, but OVERWRITE transcripts based on'
+ ' the applications current behavior. OVERWRITES TRANSCRIPT'
+ ' FILES.')
+ parser.add_argument('-v', '--verbose', action='store_true',
+ help='Print commands and application output')
+ parser.add_argument('transcript_files', nargs='*', help='transcript files to verify')
+ return parser
+
+def main(command_str, transcript_files, interact, verbose):
+
+ if command_str:
+ command_tuple = shlex.split(command_str)
+ else:
+ command_tuple = None
+
+ results = []
+ for t in transcript_files:
+ passed = verify_application(command_tuple=command_tuple,
+ interact=interact,
+ transcript_file=t,
+ verbose=verbose)
+ results.append((passed, t))
+
+ print('\nRESULTS:')
+ all_passed = True
+ for passed, t in results:
+ print('%s: %s' % ('pass' if passed else 'FAIL', t))
+ all_passed = all_passed and passed
+ print()
+
+ if not all_passed:
+ sys.exit(1)
+
+# vim: tabstop=4 shiftwidth=4 expandtab nocin ai