aboutsummaryrefslogtreecommitdiffstats
path: root/test/subprocesstest.py
blob: 18313db795d371f2d49285b9ff43db2b3b73574c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Subprocess test case superclass'''

import io
import os
import os.path
import re
import subprocess
import sys
import unittest

# To do:
# - Add a subprocesstest.SkipUnlessCapture decorator?
# - Try to catch crashes? See the comments below in waitProcess.

# XXX This should probably be in config.py and settable from
# the command line.
if sys.version_info[0] >= 3:
    process_timeout = 300 # Seconds

class LoggingPopen(subprocess.Popen):
    '''Run a process using subprocess.Popen. Capture and log its output.

    Stdout and stderr are captured to memory and decoded as UTF-8. The
    program command and output is written to log_fd.
    '''
    def __init__(self, proc_args, *args, **kwargs):
        self.log_fd = kwargs.pop('log_fd', None)
        kwargs['stdout'] = subprocess.PIPE
        kwargs['stderr'] = subprocess.PIPE
        # Make sure communicate() gives us bytes.
        kwargs['universal_newlines'] = False
        self.cmd_str = 'command ' + repr(proc_args)
        super(LoggingPopen, self).__init__(proc_args, *args, **kwargs)
        self.stdout_str = ''
        self.stderr_str = ''

    def wait_and_log(self):
        '''Wait for the process to finish and log its output.'''
        # Wherein we navigate the Python 2 and 3 Unicode compatibility maze.
        if sys.version_info[0] >= 3:
            out_data, err_data = self.communicate(timeout=process_timeout)
            out_log = out_data.decode('UTF-8', 'replace')
            err_log = err_data.decode('UTF-8', 'replace')
        else:
            out_data, err_data = self.communicate()
            out_log = unicode(out_data, 'UTF-8', 'replace')
            err_log = unicode(err_data, 'UTF-8', 'replace')
        # Throwing a UnicodeDecodeError exception here is arguably a good thing.
        self.stdout_str = out_data.decode('UTF-8', 'strict')
        self.stderr_str = err_data.decode('UTF-8', 'strict')
        self.log_fd.flush()
        self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str))
        self.log_fd.write(out_log)
        self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str))
        self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str))
        self.log_fd.write(err_log)
        self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str))
        self.log_fd.flush()

    def stop_process(self, kill=False):
        '''Stop the process immediately.'''
        if kill:
            super(LoggingPopen, self).kill()
        else:
            super(LoggingPopen, self).terminate()

    def terminate(self):
        '''Terminate the process. Do not log its output.'''
        # XXX Currently unused.
        self.stop_process(kill=False)

    def kill(self):
        '''Kill the process. Do not log its output.'''
        self.stop_process(kill=True)

class SubprocessTestCase(unittest.TestCase):
    '''Run a program and gather its stdout and stderr.'''

    def __init__(self, *args, **kwargs):
        super(SubprocessTestCase, self).__init__(*args, **kwargs)
        self.exit_ok = 0
        self.exit_command_line = 1
        self.exit_error = 2
        self.exit_code = None
        self.log_fname = None
        self.log_fd = None
        self.processes = []
        self.cleanup_files = []
        self.dump_files = []

    def log_fd_write_bytes(self, log_data):
        if sys.version_info[0] >= 3:
            self.log_fd.write(log_data)
        else:
            self.log_fd.write(unicode(log_data, 'UTF-8', 'replace'))

    def filename_from_id(self, filename):
        '''Generate a filename prefixed with our test ID.'''
        return self.id() + '.' + filename

    def kill_processes(self):
        '''Kill any processes we've opened so far'''
        for proc in self.processes:
            try:
                proc.kill()
            except:
                pass

    def run(self, result=None):
        # Subclass run() so that we can do the following:
        # - Open our log file and add it to the cleanup list.
        # - Check our result before and after the run so that we can tell
        #   if the current test was successful.

        # Probably not needed, but shouldn't hurt.
        self.kill_processes()
        self.processes = []
        self.log_fname = self.filename_from_id('log')
        # Our command line utilities generate UTF-8. The log file endcoding
        # needs to match that.
        self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8')
        self.cleanup_files.append(self.log_fname)
        pre_run_problem_count = 0
        if result:
            pre_run_problem_count = len(result.failures) + len(result.errors)
        try:
            super(SubprocessTestCase, self).run(result=result)
        except KeyboardInterrupt:
            # XXX This doesn't seem to work on Windows, which is where we need it the most.
            self.kill_processes()

        # Tear down our test. We don't do this in tearDown() because Python 3
        # updates "result" after calling tearDown().
        self.kill_processes()
        self.log_fd.close()
        if result:
            post_run_problem_count = len(result.failures) + len(result.errors)
            if pre_run_problem_count != post_run_problem_count:
                self.dump_files.append(self.log_fname)
                # Leave some evidence behind.
                self.cleanup_files = []
                print('\nProcess output for {}:'.format(self.id()))
                with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd:
                    for line in log_fd:
                        sys.stdout.write(line)
        for filename in self.cleanup_files:
            try:
                os.unlink(filename)
            except OSError:
                pass
        self.cleanup_files = []

    def countOutput(self, search_pat, proc=None):
        '''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
        match_count = 0
        if proc is None:
            proc = self.processes[-1]
        # We might want to let the caller decide what we're searching.
        out_data = proc.stdout_str + proc.stderr_str
        search_re = re.compile(search_pat)
        for line in out_data.splitlines():
            if search_re.search(line):
                match_count += 1
        return match_count

    def grepOutput(self, search_pat, proc=None):
        return self.countOutput(search_pat, proc) > 0

    def startProcess(self, proc_args, env=None, shell=False):
        '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().'''
        proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
        self.processes.append(proc)
        return proc

    def waitProcess(self, process):
        '''Wait for a process to finish.'''
        process.wait_and_log()
        # XXX The shell version ran processes using a script called run_and_catch_crashes
        # which looked for core dumps and printed stack traces if found. We might want
        # to do something similar here. This may not be easy on modern Ubuntu systems,
        # which default to using Apport: https://wiki.ubuntu.com/Apport

    def assertWaitProcess(self, process, expected_return=0):
        '''Wait for a process to finish and check its exit code.'''
        process.wait_and_log()
        self.assertEqual(process.returncode, expected_return)

    def runProcess(self, args, env=None, shell=False):
        '''Start a process and wait for it to finish.'''
        process = self.startProcess(args, env=env, shell=shell)
        process.wait_and_log()
        return process

    def assertRun(self, args, env=None, shell=False, expected_return=0):
        '''Start a process and wait for it to finish. Check its return code.'''
        process = self.runProcess(args, env=env, shell=shell)
        self.assertEqual(process.returncode, expected_return)
        return process