aboutsummaryrefslogtreecommitdiffstats
path: root/test/subprocesstest.py
diff options
context:
space:
mode:
authorGerald Combs <gerald@wireshark.org>2018-04-02 17:12:23 -0700
committerGerald Combs <gerald@wireshark.org>2018-04-26 19:27:19 +0000
commit0ad423924992f8504b3e75980e1e9efb65d84214 (patch)
tree5b9039f1ba272037c10528860b28a99272908620 /test/subprocesstest.py
parentf9522d8a23a375ddc8bd39cf556002cdec346ab1 (diff)
Start porting our test scripts to Python. Add ctest support.
Create Python versions of our various test shell scripts. Add CMake tests for each suite. Tests can now be run directly via test.py, via the "test" target, or via ctest, e.g. ctest --verbose --jobs 3 Add a testing chapter to the Developer's Guide. Add a way to disable ctest in dpkg-buildpackage. Suites completed: - capture - clopts - decryption - dissection Remaining suites: - fileformats - io - mergecap - nameres - text2pcap - unittests - wslua Change-Id: I8936e05edefc76a86b6a7a5da302e7461bbdda0f Reviewed-on: https://code.wireshark.org/review/27134 Petri-Dish: Gerald Combs <gerald@wireshark.org> Tested-by: Petri Dish Buildbot Reviewed-by: Peter Wu <peter@lekensteyn.nl> Reviewed-by: Gerald Combs <gerald@wireshark.org>
Diffstat (limited to 'test/subprocesstest.py')
-rw-r--r--test/subprocesstest.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
new file mode 100644
index 0000000000..18313db795
--- /dev/null
+++ b/test/subprocesstest.py
@@ -0,0 +1,206 @@
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Subprocess test case superclass'''
+
+import io
+import os
+import os.path
+import re
+import subprocess
+import sys
+import unittest
+
+# To do:
+# - Add a subprocesstest.SkipUnlessCapture decorator?
+# - Try to catch crashes? See the comments below in waitProcess.
+
+# XXX This should probably be in config.py and settable from
+# the command line.
+if sys.version_info[0] >= 3:
+ process_timeout = 300 # Seconds
+
+class LoggingPopen(subprocess.Popen):
+ '''Run a process using subprocess.Popen. Capture and log its output.
+
+ Stdout and stderr are captured to memory and decoded as UTF-8. The
+ program command and output is written to log_fd.
+ '''
+ def __init__(self, proc_args, *args, **kwargs):
+ self.log_fd = kwargs.pop('log_fd', None)
+ kwargs['stdout'] = subprocess.PIPE
+ kwargs['stderr'] = subprocess.PIPE
+ # Make sure communicate() gives us bytes.
+ kwargs['universal_newlines'] = False
+ self.cmd_str = 'command ' + repr(proc_args)
+ super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
+ self.stdout_str = ''
+ self.stderr_str = ''
+
+ def wait_and_log(self):
+ '''Wait for the process to finish and log its output.'''
+ # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
+ if sys.version_info[0] >= 3:
+ out_data, err_data = self.communicate(timeout=process_timeout)
+ out_log = out_data.decode('UTF-8', 'replace')
+ err_log = err_data.decode('UTF-8', 'replace')
+ else:
+ out_data, err_data = self.communicate()
+ out_log = unicode(out_data, 'UTF-8', 'replace')
+ err_log = unicode(err_data, 'UTF-8', 'replace')
+ # Throwing a UnicodeDecodeError exception here is arguably a good thing.
+ self.stdout_str = out_data.decode('UTF-8', 'strict')
+ self.stderr_str = err_data.decode('UTF-8', 'strict')
+ self.log_fd.flush()
+ self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
+ self.log_fd.write(out_log)
+ self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
+ self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
+ self.log_fd.write(err_log)
+ self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
+ self.log_fd.flush()
+
+ def stop_process(self, kill=False):
+ '''Stop the process immediately.'''
+ if kill:
+ super(LoggingPopen, self).kill()
+ else:
+ super(LoggingPopen, self).terminate()
+
+ def terminate(self):
+ '''Terminate the process. Do not log its output.'''
+ # XXX Currently unused.
+ self.stop_process(kill=False)
+
+ def kill(self):
+ '''Kill the process. Do not log its output.'''
+ self.stop_process(kill=True)
+
+class SubprocessTestCase(unittest.TestCase):
+ '''Run a program and gather its stdout and stderr.'''
+
+ def __init__(self, *args, **kwargs):
+ super(SubprocessTestCase, self).__init__(*args, **kwargs)
+ self.exit_ok = 0
+ self.exit_command_line = 1
+ self.exit_error = 2
+ self.exit_code = None
+ self.log_fname = None
+ self.log_fd = None
+ self.processes = []
+ self.cleanup_files = []
+ self.dump_files = []
+
+ def log_fd_write_bytes(self, log_data):
+ if sys.version_info[0] >= 3:
+ self.log_fd.write(log_data)
+ else:
+ self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))
+
+ def filename_from_id(self, filename):
+ '''Generate a filename prefixed with our test ID.'''
+ return self.id() + '.' + filename
+
+ def kill_processes(self):
+ '''Kill any processes we've opened so far'''
+ for proc in self.processes:
+ try:
+ proc.kill()
+ except:
+ pass
+
+ def run(self, result=None):
+ # Subclass run() so that we can do the following:
+ # - Open our log file and add it to the cleanup list.
+ # - Check our result before and after the run so that we can tell
+ # if the current test was successful.
+
+ # Probably not needed, but shouldn't hurt.
+ self.kill_processes()
+ self.processes = []
+ self.log_fname = self.filename_from_id('log')
+ # Our command line utilities generate UTF-8. The log file endcoding
+ # needs to match that.
+ self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8')
+ self.cleanup_files.append(self.log_fname)
+ pre_run_problem_count = 0
+ if result:
+ pre_run_problem_count = len(result.failures) + len(result.errors)
+ try:
+ super(SubprocessTestCase, self).run(result=result)
+ except KeyboardInterrupt:
+ # XXX This doesn't seem to work on Windows, which is where we need it the most.
+ self.kill_processes()
+
+ # Tear down our test. We don't do this in tearDown() because Python 3
+ # updates "result" after calling tearDown().
+ self.kill_processes()
+ self.log_fd.close()
+ if result:
+ post_run_problem_count = len(result.failures) + len(result.errors)
+ if pre_run_problem_count != post_run_problem_count:
+ self.dump_files.append(self.log_fname)
+ # Leave some evidence behind.
+ self.cleanup_files = []
+ print('\nProcess output for {}:'.format(self.id()))
+ with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
+ for line in log_fd:
+ sys.stdout.write(line)
+ for filename in self.cleanup_files:
+ try:
+ os.unlink(filename)
+ except OSError:
+ pass
+ self.cleanup_files = []
+
+ def countOutput(self, search_pat, proc=None):
+ '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
+ match_count = 0
+ if proc is None:
+ proc = self.processes[-1]
+ # We might want to let the caller decide what we're searching.
+ out_data = proc.stdout_str + proc.stderr_str
+ search_re = re.compile(search_pat)
+ for line in out_data.splitlines():
+ if search_re.search(line):
+ match_count += 1
+ return match_count
+
+ def grepOutput(self, search_pat, proc=None):
+ return self.countOutput(search_pat, proc) > 0
+
+ def startProcess(self, proc_args, env=None, shell=False):
+ '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().'''
+ proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
+ self.processes.append(proc)
+ return proc
+
+ def waitProcess(self, process):
+ '''Wait for a process to finish.'''
+ process.wait_and_log()
+ # XXX The shell version ran processes using a script called run_and_catch_crashes
+ # which looked for core dumps and printed stack traces if found. We might want
+ # to do something similar here. This may not be easy on modern Ubuntu systems,
+ # which default to using Apport: https://wiki.ubuntu.com/Apport
+
+ def assertWaitProcess(self, process, expected_return=0):
+ '''Wait for a process to finish and check its exit code.'''
+ process.wait_and_log()
+ self.assertEqual(process.returncode, expected_return)
+
+ def runProcess(self, args, env=None, shell=False):
+ '''Start a process and wait for it to finish.'''
+ process = self.startProcess(args, env=env, shell=shell)
+ process.wait_and_log()
+ return process
+
+ def assertRun(self, args, env=None, shell=False, expected_return=0):
+ '''Start a process and wait for it to finish. Check its return code.'''
+ process = self.runProcess(args, env=env, shell=shell)
+ self.assertEqual(process.returncode, expected_return)
+ return process