diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/conftest.py | 29 | ||||
-rw-r--r-- | test/fixtures.py | 43 | ||||
-rw-r--r-- | test/fixtures_ws.py | 58 | ||||
-rw-r--r-- | test/subprocesstest.py | 23 | ||||
-rw-r--r-- | test/suite_capture.py | 565 | ||||
-rw-r--r-- | test/suite_clopts.py | 40 | ||||
-rw-r--r-- | test/suite_fileformats.py | 22 | ||||
-rw-r--r-- | test/suite_io.py | 2 | ||||
-rw-r--r-- | test/suite_unittests.py | 7 | ||||
-rwxr-xr-x | test/test.py | 43 |
10 files changed, 454 insertions, 378 deletions
diff --git a/test/conftest.py b/test/conftest.py index a09e86c485..c2646bf7a8 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -8,31 +8,34 @@ # '''py.test configuration''' -import os -import sys import fixtures -import config -# XXX remove globals in config and create py.test-specific fixtures -try: - _program_path = os.environ['WS_BIN_PATH'] -except KeyError: - print('Please set env var WS_BIN_PATH to the run directory with binaries') - sys.exit(1) -if not config.setProgramPath(_program_path): - print('One or more required executables not found at {}\n'.format(_program_path)) - sys.exit(1) +def pytest_addoption(parser): + parser.addoption('--disable-capture', action='store_true', + help='Disable capture tests' + ) + parser.addoption('--capture-interface', + help='Capture interface index or name.' + ) + parser.addoption('--program-path', help='Path to Wireshark executables.') + +_all_test_groups = None # this is set only to please case_unittests.test_unit_ctest_coverage def pytest_collection_modifyitems(items): '''Find all test groups.''' + global _all_test_groups suites = [] for item in items: name = item.nodeid.split("::")[0].replace(".py", "").replace("/", ".") if name not in suites: suites.append(name) - config.all_groups = list(sorted(suites)) + _all_test_groups = sorted(suites) # Must enable pytest before importing fixtures_ws. fixtures.enable_pytest() from fixtures_ws import * + +@fixtures.fixture(scope='session') +def all_test_groups(): + return _all_test_groups diff --git a/test/fixtures.py b/test/fixtures.py index 26c0ed371f..1dbfe40977 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -7,6 +7,7 @@ # SPDX-License-Identifier: (GPL-2.0-or-later OR MIT) # +import argparse import functools import inspect import sys @@ -197,10 +198,13 @@ class _ExecutionScope(object): def cached_result(self, spec): '''Obtain the cached result for a previously executed fixture.''' - value, exc = self._find_scope(spec.scope).cache[spec.name] + entry = self._find_scope(spec.scope).cache.get(spec.name) + if not entry: + return None, False + value, exc = entry if exc: raise exc - return value + return value, True def _execute_one(self, spec, test_fn): # A fixture can only execute in the same or earlier scopes @@ -265,7 +269,15 @@ class _FixtureRequest(object): if not spec: assert argname == 'request' return self - return self._context.cached_result(spec) + value, ok = self._context.cached_result(spec) + if not ok: + # If getfixturevalue is called directly from a setUp function, the + # fixture value might not have computed before, so evaluate it now. + # As the test function is not available, use None. + self._context.execute(spec, test_fn=None) + value, ok = self._context.cached_result(spec) + assert ok, 'Failed to execute fixture %s' % (spec,) + return value def destroy(self): self._context.destroy() @@ -277,6 +289,11 @@ class _FixtureRequest(object): def instance(self): return self.function.__self__ + @property + def config(self): + '''The pytest config object associated with this request.''' + return _config + def _patch_unittest_testcase_class(cls): ''' @@ -304,8 +321,20 @@ def _patch_unittest_testcase_class(cls): cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown +class _Config(object): + def __init__(self, args): + assert isinstance(args, argparse.Namespace) + self.args = args + + def getoption(self, name, default): + '''Partial emulation for pytest Config.getoption.''' + name = name.lstrip('-').replace('-', '_') + return getattr(self.args, name, default) + + _fallback = None _session_context = None +_config = None def init_fallback_fixtures_once(): @@ -317,10 +346,14 @@ def init_fallback_fixtures_once(): # Register standard fixtures here as needed -def create_session(): - global _session_context +def create_session(args=None): + '''Start a test session where args is from argparse.''' + global _session_context, _config assert not _use_native_pytest _session_context = _ExecutionScope('session', None) + if args is None: + args = argparse.Namespace() + _config = _Config(args) def destroy_session(): diff --git a/test/fixtures_ws.py b/test/fixtures_ws.py index 18be13631f..55e3e2151c 100644 --- a/test/fixtures_ws.py +++ b/test/fixtures_ws.py @@ -22,9 +22,51 @@ import subprocesstest @fixtures.fixture(scope='session') -def program_path(): - # XXX stop using config - return config.program_path +def capture_interface(request, cmd_dumpcap): + ''' + Name of capture interface. Tests will be skipped if dumpcap is not + available or if the capture interface is unknown. + ''' + iface = request.config.getoption('--capture-interface', default=None) + disabled = request.config.getoption('--disable-capture', default=False) + if disabled: + fixtures.skip('Capture tests are disabled via --disable-capture') + if iface: + # If a non-empty interface is given, assume capturing is possible. + return iface + else: + if sys.platform == 'win32': + patterns = '.*(Ethernet|Network Connection|VMware|Intel)' + else: + patterns = None + if patterns: + try: + output = subprocess.check_output((cmd_dumpcap, '-D'), + stderr=subprocess.DEVNULL, + universal_newlines=True) + m = re.search(r'^(\d+)\. %s' % patterns, output, re.MULTILINE) + if m: + return m.group(1) + except subprocess.CalledProcessError: + pass + fixtures.skip('Test requires capture privileges and an interface.') + + +@fixtures.fixture(scope='session') +def program_path(request): + ''' + Path to the Wireshark binaries as set by the --program-path option, the + WS_BIN_PATH environment variable or (curdir)/run. + ''' + paths = ( + request.config.getoption('--program-path', default=None), + os.environ.get('WS_BIN_PATH'), + os.path.join(os.curdir, 'run'), + ) + for path in paths: + if type(path) == str and os.path.isdir(path): + return path + raise AssertionError('Missing directory with Wireshark binaries') @fixtures.fixture(scope='session') @@ -76,6 +118,14 @@ def cmd_wireshark(program): @fixtures.fixture(scope='session') +def wireshark_command(cmd_wireshark): + if sys.platform not in ('win32', 'darwin'): + # TODO check DISPLAY for X11 on Linux or BSD? + fixtures.skip('Wireshark GUI tests requires DISPLAY') + return (cmd_wireshark, '-ogui.update.enabled:FALSE') + + +@fixtures.fixture(scope='session') def features(cmd_tshark): '''Returns an object describing available features in tshark.''' try: @@ -182,5 +232,3 @@ def test_env(base_env, conf_path, request): # Inject the test environment as default if it was not overridden. request.instance.injected_test_env = env return env - -# XXX capture: capture_interface diff --git a/test/subprocesstest.py b/test/subprocesstest.py index 911868624b..060ebeb3a4 100644 --- a/test/subprocesstest.py +++ b/test/subprocesstest.py @@ -27,23 +27,6 @@ import unittest # the command line. process_timeout = 300 # Seconds -def capture_command(cmd, *args, **kwargs): - '''Convert the supplied arguments into a command suitable for SubprocessTestCase. - - If shell is true, return a string. Otherwise, return a list of arguments.''' - shell = kwargs.pop('shell', False) - if shell: - cap_cmd = ['"' + cmd + '"'] - else: - cap_cmd = [cmd] - if cmd == config.cmd_wireshark: - cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k') - cap_cmd += args - if shell: - return ' '.join(cap_cmd) - else: - return cap_cmd - def cat_dhcp_command(mode): '''Create a command string for dumping dhcp.pcap to stdout''' # XXX Do this in Python in a thread? @@ -196,10 +179,12 @@ class SubprocessTestCase(unittest.TestCase): capinfos_args must be a sequence. Default cap_file is <test id>.testout.pcap.''' + # XXX convert users to use a new fixture instead of this function. + cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos') if not cap_file: cap_file = self.filename_from_id('testout.pcap') - self.log_fd.write('\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file)) - capinfos_cmd = [config.cmd_capinfos] + self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file)) + capinfos_cmd = [cmd_capinfos] if capinfos_args is not None: capinfos_cmd += capinfos_args capinfos_cmd.append(cap_file) diff --git a/test/suite_capture.py b/test/suite_capture.py index eb1a4ffcab..a9ac1aa9c0 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -12,13 +12,12 @@ import config import glob import os -import re import subprocess import subprocesstest import sys import time -import unittest import uuid +import fixtures capture_duration = 5 @@ -40,331 +39,365 @@ def stop_pinging(ping_procs): for proc in ping_procs: proc.kill() -def check_capture_10_packets(self, cmd=None, to_stdout=False): - # Similar to suite_io.check_io_4_packets. - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - testout_file = self.filename_from_id(testout_pcap) - ping_procs = start_pinging(self) - if to_stdout: - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', '"{}"'.format(config.capture_interface), + +@fixtures.fixture(scope='session') +def wireshark_k(wireshark_command): + return tuple(list(wireshark_command) + ['-k']) + +def capture_command(*cmd_args, shell=False): + if type(cmd_args[0]) != str: + # Assume something like ['wireshark', '-k'] + cmd_args = list(cmd_args[0]) + list(cmd_args)[1:] + if shell: + cmd_args = ' '.join(cmd_args) + return cmd_args + + +@fixtures.fixture +def check_capture_10_packets(capture_interface, cmd_dumpcap): + def check_capture_10_packets_real(self, cmd=None, to_stdout=False): + # Similar to suite_io.check_io_4_packets. + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + ping_procs = start_pinging(self) + if to_stdout: + capture_proc = self.runProcess(capture_command(cmd, + '-i', '"{}"'.format(capture_interface), + '-p', + '-w', '-', + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', '"icmp || icmp6"', + '>', testout_file, + shell=True + ), + shell=True + ) + else: + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + )) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + if capture_returncode != 0: + self.log_fd.write('{} -D output:\n'.format(cmd)) + self.runProcess((cmd, '-D')) + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(10) + return check_capture_10_packets_real + + +@fixtures.fixture +def check_capture_fifo(cmd_dumpcap): + if sys.platform == 'win32': + fixtures.skip('Test requires OS fifo support.') + + def check_capture_fifo_real(self, cmd=None): + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + fifo_file = self.filename_from_id('testout.fifo') + try: + # If a previous test left its fifo laying around, e.g. from a failure, remove it. + os.unlink(fifo_file) + except: + pass + os.mkfifo(fifo_file) + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + fifo_proc = self.startProcess( + ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), + shell=True) + capture_proc = self.runProcess(capture_command(cmd, + '-i', fifo_file, '-p', - '-w', '-', - '-c', '10', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + )) + fifo_proc.kill() + self.assertTrue(os.path.isfile(testout_file)) + capture_returncode = capture_proc.returncode + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(8) + return check_capture_fifo_real + + +@fixtures.fixture +def check_capture_stdin(cmd_dumpcap): + # Capturing always requires dumpcap, hence the dependency on it. + def check_capture_stdin_real(self, cmd=None): + # Similar to suite_io.check_io_4_packets. + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + capture_cmd = capture_command(cmd, + '-i', '-', + '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), - '-f', '"icmp || icmp6"', - '>', testout_file, shell=True - ), - shell=True ) - else: - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, + is_gui = type(cmd) != str and '-k' in cmd[0] + if is_gui: + capture_cmd += ' -o console.log.level:127' + pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if is_gui: + self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.') + self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.') + self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode == 0): + self.checkPacketCount(8) + return check_capture_stdin_real + + +@fixtures.fixture +def check_capture_read_filter(capture_interface): + def check_capture_read_filter_real(self, cmd=None): + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, '-p', '-w', testout_file, + '-2', + '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. '-c', '10', '-a', 'duration:{}'.format(capture_duration), '-f', 'icmp || icmp6', )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - if capture_returncode != 0: - self.log_fd.write('{} -D output:\n'.format(cmd)) - self.runProcess((cmd, '-D')) - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(10) - -def check_capture_fifo(self, cmd=None): - if not config.canMkfifo(): - self.skipTest('Test requires OS fifo support.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - self.assertIsNotNone(cmd) - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - fifo_file = self.filename_from_id('testout.fifo') - try: - # If a previous test left its fifo laying around, e.g. from a failure, remove it. - os.unlink(fifo_file) - except: - pass - os.mkfifo(fifo_file) - slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') - fifo_proc = self.startProcess( - ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), - shell=True) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', fifo_file, - '-p', - '-w', testout_file, - '-a', 'duration:{}'.format(capture_duration), - )) - fifo_proc.kill() - self.assertTrue(os.path.isfile(testout_file)) - capture_returncode = capture_proc.returncode - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(8) - -def check_capture_stdin(self, cmd=None): - # Similar to suite_io.check_io_4_packets. - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - self.assertIsNotNone(cmd) - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', 'duration:{}'.format(capture_duration), - shell=True - ) - if cmd == config.cmd_wireshark: - capture_cmd += ' -o console.log.level:127' - pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - if cmd == config.cmd_wireshark: - self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.') - self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.') - self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') - self.assertTrue(os.path.isfile(testout_file)) - if (pipe_returncode == 0): - self.checkPacketCount(8) - -def check_capture_read_filter(self, cmd=None): - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) - testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, - '-p', - '-w', testout_file, - '-2', - '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. - '-c', '10', - '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', - )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - self.assertEqual(capture_returncode, 0) - - if (capture_returncode == 0): - self.checkPacketCount(0) - -def check_capture_snapshot_len(self, cmd=None): - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) - testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, - '-p', - '-w', testout_file, - '-s', str(snapshot_len), - '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', - )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - self.assertEqual(capture_returncode, 0) - self.assertTrue(os.path.isfile(testout_file)) - - # Use tshark to filter out all packets larger than 68 bytes. - testout2_file = self.filename_from_id('testout2.pcap') - - filter_proc = self.runProcess((config.cmd_tshark, - '-r', testout_file, - '-w', testout2_file, - '-Y', 'frame.cap_len>{}'.format(snapshot_len), - )) - filter_returncode = filter_proc.returncode - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(0, cap_file=testout2_file) - -def check_dumpcap_autostop_stdin(self, packets=None, filesize=None): - # Similar to check_capture_stdin. - cmd = config.cmd_dumpcap - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') - condition='oops:invalid' - - self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') - self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') - - if packets is not None: - condition = 'packets:{}'.format(packets) - elif filesize is not None: - condition = 'filesize:{}'.format(filesize) - - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', condition, - shell=True - ) - pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - self.assertTrue(os.path.isfile(testout_file)) - if (pipe_returncode != 0): - return - - if packets is not None: - self.checkPacketCount(packets) - elif filesize is not None: - capturekb = os.path.getsize(testout_file) / 1000 - self.assertGreaterEqual(capturekb, filesize) - -def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None): - # Similar to check_capture_stdin. - cmd = config.cmd_dumpcap - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID - testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique) - testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique) - cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') - condition='oops:invalid' - - self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') - self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') - - if packets is not None: - condition = 'packets:{}'.format(packets) - elif filesize is not None: - condition = 'filesize:{}'.format(filesize) - - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', 'files:2', - '-b', condition, - shell=True - ) - pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - if (pipe_returncode != 0): - return - - rb_files = glob.glob(testout_glob) - for rbf in rb_files: - self.cleanup_files.append(rbf) - - self.assertEqual(len(rb_files), 2) - - for rbf in rb_files: - self.assertTrue(os.path.isfile(rbf)) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.assertEqual(capture_returncode, 0) + + if (capture_returncode == 0): + self.checkPacketCount(0) + return check_capture_read_filter_real + +@fixtures.fixture +def check_capture_snapshot_len(capture_interface, cmd_tshark): + def check_capture_snapshot_len_real(self, cmd=None): + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-s', str(snapshot_len), + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + )) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.assertEqual(capture_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + + # Use tshark to filter out all packets larger than 68 bytes. + testout2_file = self.filename_from_id('testout2.pcap') + + filter_proc = self.runProcess((cmd_tshark, + '-r', testout_file, + '-w', testout2_file, + '-Y', 'frame.cap_len>{}'.format(snapshot_len), + )) + filter_returncode = filter_proc.returncode + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(0, cap_file=testout2_file) + return check_capture_snapshot_len_real + + +@fixtures.fixture +def check_dumpcap_autostop_stdin(cmd_dumpcap): + def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + testout_file = self.filename_from_id(testout_pcap) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = ' '.join((cmd_dumpcap, + '-i', '-', + '-w', testout_file, + '-a', condition, + )) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode != 0): + return + if packets is not None: - self.checkPacketCount(packets, cap_file=rbf) + self.checkPacketCount(packets) elif filesize is not None: - capturekb = os.path.getsize(rbf) / 1000 + capturekb = os.path.getsize(testout_file) / 1000 self.assertGreaterEqual(capturekb, filesize) + return check_dumpcap_autostop_stdin_real + + +@fixtures.fixture +def check_dumpcap_ringbuffer_stdin(cmd_dumpcap): + def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID + testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique) + testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = ' '.join((cmd_dumpcap, + '-i', '-', + '-w', testout_file, + '-a', 'files:2', + '-b', condition, + )) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if (pipe_returncode != 0): + return + + rb_files = glob.glob(testout_glob) + for rbf in rb_files: + self.cleanup_files.append(rbf) + + self.assertEqual(len(rb_files), 2) + + for rbf in rb_files: + self.assertTrue(os.path.isfile(rbf)) + if packets is not None: + self.checkPacketCount(packets, cap_file=rbf) + elif filesize is not None: + capturekb = os.path.getsize(rbf) / 1000 + self.assertGreaterEqual(capturekb, filesize) + return check_dumpcap_ringbuffer_stdin_real + + +@fixtures.mark_usefixtures('test_env') +@fixtures.uses_fixtures class case_wireshark_capture(subprocesstest.SubprocessTestCase): - def test_wireshark_capture_10_packets_to_file(self): + def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets): '''Capture 10 packets from the network to a file using Wireshark''' - check_capture_10_packets(self, cmd=config.cmd_wireshark) + check_capture_10_packets(self, cmd=wireshark_k) # Wireshark doesn't currently support writing to stdout while capturing. - # def test_wireshark_capture_10_packets_to_stdout(self): + # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets): # '''Capture 10 packets from the network to stdout using Wireshark''' - # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True) + # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True) - def test_wireshark_capture_from_fifo(self): + def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo): '''Capture from a fifo using Wireshark''' - check_capture_fifo(self, cmd=config.cmd_wireshark) + check_capture_fifo(self, cmd=wireshark_k) - def test_wireshark_capture_from_stdin(self): + def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin): '''Capture from stdin using Wireshark''' - check_capture_stdin(self, cmd=config.cmd_wireshark) + check_capture_stdin(self, cmd=wireshark_k) - def test_wireshark_capture_snapshot_len(self): + def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len): '''Capture truncated packets using Wireshark''' - check_capture_snapshot_len(self, cmd=config.cmd_wireshark) + check_capture_snapshot_len(self, cmd=wireshark_k) + +@fixtures.mark_usefixtures('test_env') +@fixtures.uses_fixtures class case_tshark_capture(subprocesstest.SubprocessTestCase): - def test_tshark_capture_10_packets_to_file(self): + def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets): '''Capture 10 packets from the network to a file using TShark''' - check_capture_10_packets(self, cmd=config.cmd_tshark) + check_capture_10_packets(self, cmd=cmd_tshark) - def test_tshark_capture_10_packets_to_stdout(self): + def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets): '''Capture 10 packets from the network to stdout using TShark''' - check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True) + check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True) - def test_tshark_capture_from_fifo(self): + def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo): '''Capture from a fifo using TShark''' - check_capture_fifo(self, cmd=config.cmd_tshark) + check_capture_fifo(self, cmd=cmd_tshark) - def test_tshark_capture_from_stdin(self): + def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin): '''Capture from stdin using TShark''' - check_capture_stdin(self, cmd=config.cmd_tshark) + check_capture_stdin(self, cmd=cmd_tshark) - def test_tshark_capture_snapshot_len(self): + def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len): '''Capture truncated packets using TShark''' - check_capture_snapshot_len(self, cmd=config.cmd_tshark) + check_capture_snapshot_len(self, cmd=cmd_tshark) + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_capture(subprocesstest.SubprocessTestCase): - def test_dumpcap_capture_10_packets_to_file(self): + def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets): '''Capture 10 packets from the network to a file using Dumpcap''' - check_capture_10_packets(self, cmd=config.cmd_dumpcap) + check_capture_10_packets(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_10_packets_to_stdout(self): + def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets): '''Capture 10 packets from the network to stdout using Dumpcap''' - check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True) + check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True) - def test_dumpcap_capture_from_fifo(self): + def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo): '''Capture from a fifo using Dumpcap''' - check_capture_fifo(self, cmd=config.cmd_dumpcap) + check_capture_fifo(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_from_stdin(self): + def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin): '''Capture from stdin using Dumpcap''' - check_capture_stdin(self, cmd=config.cmd_dumpcap) + check_capture_stdin(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_snapshot_len(self): + def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap): '''Capture truncated packets using Dumpcap''' - check_capture_snapshot_len(self, cmd=config.cmd_dumpcap) + check_capture_snapshot_len(self, cmd=cmd_dumpcap) + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_autostop(subprocesstest.SubprocessTestCase): # duration, filesize, packets, files - def test_dumpcap_autostop_filesize(self): + def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin): '''Capture from stdin using Dumpcap until we reach a file size limit''' check_dumpcap_autostop_stdin(self, filesize=15) - def test_dumpcap_autostop_packets(self): + def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin): '''Capture from stdin using Dumpcap until we reach a packet limit''' check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary. + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase): # duration, interval, filesize, packets, files # Need a function that finds ringbuffer file names. - def test_dumpcap_ringbuffer_filesize(self): + def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin): '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit''' check_dumpcap_ringbuffer_stdin(self, filesize=15) - def test_dumpcap_ringbuffer_packets(self): + def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin): '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit''' check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary. diff --git a/test/suite_clopts.py b/test/suite_clopts.py index 9be3e13079..722769b08d 100644 --- a/test/suite_clopts.py +++ b/test/suite_clopts.py @@ -9,10 +9,8 @@ # '''Command line option tests''' -import config import subprocess import subprocesstest -import unittest import fixtures #glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs') @@ -44,36 +42,31 @@ class case_dumpcap_options(subprocesstest.SubprocessTestCase): self.assertIn(process.returncode, valid_returns) +@fixtures.mark_usefixtures('base_env') @fixtures.uses_fixtures class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase): - def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, base_env): + def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface): '''Invalid capture filter''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_filter = '__invalid_protocol' # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) - self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), env=base_env) + self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file)) self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) - def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, base_env): + def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface): '''Invalid capture interface name''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_interface = '__invalid_interface' # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) - self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), env=base_env) + self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file)) self.assertTrue(self.grepOutput('The capture session could not be initiated')) - def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, base_env): + def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface): '''Invalid capture interface index''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_index = '0' # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) - self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), env=base_env) + self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file)) self.assertTrue(self.grepOutput('There is no interface with that adapter index')) @@ -106,8 +99,9 @@ class case_tshark_options(subprocesstest.SubprocessTestCase): self.assertRun((cmd_tshark, '-' + char_arg)) # XXX Should we generate individual test functions instead of looping? - def test_tshark_interface_chars(self, cmd_tshark): + def test_tshark_interface_chars(self, cmd_tshark, cmd_dumpcap): '''Valid tshark parameters requiring capture permissions''' + # These options require dumpcap valid_returns = [self.exit_ok, self.exit_error] for char_arg in 'DL': process = self.runProcess((cmd_tshark, '-' + char_arg)) @@ -117,30 +111,24 @@ class case_tshark_options(subprocesstest.SubprocessTestCase): @fixtures.mark_usefixtures('test_env') @fixtures.uses_fixtures class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): - def test_tshark_invalid_capfilter(self, cmd_tshark): + def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface): '''Invalid capture filter''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_filter = '__invalid_protocol' # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) self.runProcess((cmd_tshark, '-f', invalid_filter, '-w', testout_file )) self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) - def test_tshark_invalid_interface_name(self, cmd_tshark): + def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface): '''Invalid capture interface name''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_interface = '__invalid_interface' # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) self.runProcess((cmd_tshark, '-i', invalid_interface, '-w', testout_file)) self.assertTrue(self.grepOutput('The capture session could not be initiated')) - def test_tshark_invalid_interface_index(self, cmd_tshark): + def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface): '''Invalid capture interface index''' - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') invalid_index = '0' # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1 testout_file = self.filename_from_id(testout_pcap) @@ -151,9 +139,7 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): @fixtures.mark_usefixtures('test_env') @fixtures.uses_fixtures class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase): - def test_tshark_valid_name_resolution(self, cmd_tshark): - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') + def test_tshark_valid_name_resolution(self, cmd_tshark, capture_interface): # $TSHARK -N mnNtdv -a duration:1 > ./testout.txt 2>&1 self.assertRun((cmd_tshark, '-N', 'mnNtdv', '-a', 'duration: 1')) diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py index 86be14d378..1d482dff43 100644 --- a/test/suite_fileformats.py +++ b/test/suite_fileformats.py @@ -34,29 +34,29 @@ def fileformats_baseline_str(dirs): class case_fileformat_pcap(subprocesstest.SubprocessTestCase): def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs microsecond pcap stdin''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess(' '.join((cmd_tshark, '-r', '-', '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', '<', capture_file('dhcp.pcap') - , shell=True), + )), shell=True) self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file)) def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs nanosecond pcap stdin''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess(' '.join((cmd_tshark, '-r', '-', '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', '<', capture_file('dhcp-nanosecond.pcap') - , shell=True), + )), shell=True) self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file)) def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs nanosecond pcap direct''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess((cmd_tshark, '-r', capture_file('dhcp-nanosecond.pcap'), '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', @@ -70,18 +70,18 @@ class case_fileformat_pcap(subprocesstest.SubprocessTestCase): class case_fileformat_pcapng(subprocesstest.SubprocessTestCase): def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs microsecond pcapng stdin''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess(' '.join((cmd_tshark, '-r', '-', '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta' '<', capture_file('dhcp.pcapng') - , shell=True), + )), shell=True) self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file)) def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs microsecond pcapng direct''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess((cmd_tshark, '-r', capture_file('dhcp.pcapng'), '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', @@ -91,18 +91,18 @@ class case_fileformat_pcapng(subprocesstest.SubprocessTestCase): def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs nanosecond pcapng stdin''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess(' '.join((cmd_tshark, '-r', '-', '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta' '<', capture_file('dhcp-nanosecond.pcapng') - , shell=True), + )), shell=True) self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file)) def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str): '''Microsecond pcap direct vs nanosecond pcapng direct''' - capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark, + capture_proc = self.runProcess((cmd_tshark, '-r', capture_file('dhcp-nanosecond.pcapng'), '-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', diff --git a/test/suite_io.py b/test/suite_io.py index 9561b6418e..9b0fa9cf42 100644 --- a/test/suite_io.py +++ b/test/suite_io.py @@ -47,7 +47,7 @@ def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout io_proc = self.runProcess(stdout_cmd, shell=True) else: # direct->direct # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1 - io_proc = self.runProcess(subprocesstest.capture_command(cmd, + io_proc = self.runProcess((cmd, '-r', capture_file('dhcp.pcap'), '-w', testout_file, )) diff --git a/test/suite_unittests.py b/test/suite_unittests.py index 94621ed7fa..6200dcfcba 100644 --- a/test/suite_unittests.py +++ b/test/suite_unittests.py @@ -11,7 +11,6 @@ # '''EPAN unit tests''' -import config import difflib import os.path import re @@ -53,7 +52,7 @@ class case_unittests(subprocesstest.SubprocessTestCase): '''fieldcount''' self.assertRun((cmd_tshark, '-G', 'fieldcount'), env=test_env) - def test_unit_ctest_coverage(self): + def test_unit_ctest_coverage(self, all_test_groups): '''Make sure CTest runs all of our tests.''' with open(os.path.join(os.path.dirname(__file__), '..', 'CMakeLists.txt')) as cml_fd: group_re = re.compile(r'set *\( *_test_group_list') @@ -68,8 +67,8 @@ class case_unittests(subprocesstest.SubprocessTestCase): break cml_groups.append(cml_line.strip()) cml_groups.sort() - if not config.all_groups == cml_groups: - diff = '\n'.join(list(difflib.unified_diff(config.all_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups'))) + if not all_test_groups == cml_groups: + diff = '\n'.join(list(difflib.unified_diff(all_test_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups'))) self.fail("CMakeLists.txt doesn't test all available groups:\n" + diff) diff --git a/test/test.py b/test/test.py index c81f14c367..f775bae85c 100755 --- a/test/test.py +++ b/test/test.py @@ -13,17 +13,22 @@ # To do: # - Avoid printing Python tracebacks when we assert? It looks like we'd need # to override unittest.TextTestResult.addFailure(). -# - Remove BIN_PATH/hosts via config.tearDownHostFiles + case_name_resolution.tearDownClass? import argparse -import config import os.path import sys import unittest +import fixtures # Required to make fixtures available to tests! import fixtures_ws +_all_test_groups = None + +@fixtures.fixture(scope='session') +def all_test_groups(): + return _all_test_groups + def find_test_ids(suite, all_ids): if hasattr(suite, '__iter__'): for s in suite: @@ -47,10 +52,9 @@ def main(): parser = argparse.ArgumentParser(description='Wireshark unit tests') cap_group = parser.add_mutually_exclusive_group() - cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests') cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests') - cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name') - parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.') + cap_group.add_argument('-i', '--capture-interface', help='Capture interface index or name') + parser.add_argument('-p', '--program-path', default=os.path.curdir, help='Path to Wireshark executables.') list_group = parser.add_mutually_exclusive_group() list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.') list_group.add_argument('--list-suites', action='store_true', help='List all suites.') @@ -60,14 +64,6 @@ def main(): parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".') args = parser.parse_args() - if args.enable_capture: - config.setCanCapture(True) - elif args.disable_capture: - config.setCanCapture(False) - - if args.capture_interface: - config.setCaptureInterface(args.capture_interface[0]) - all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*') all_ids = [] @@ -96,8 +92,7 @@ def main(): for aid in all_ids: aparts = aid.split('.') all_suites |= {aparts[0]} - config.all_suites = list(all_suites) - config.all_suites.sort() + all_suites = sorted(all_suites) all_groups = set() for aid in all_ids: @@ -106,15 +101,16 @@ def main(): all_groups |= {'.'.join(aparts[:2])} else: all_groups |= {aparts[0]} - config.all_groups = list(all_groups) - config.all_groups.sort() + all_groups = sorted(all_groups) + global _all_test_groups + _all_test_groups = all_groups if args.list_suites: - print('\n'.join(config.all_suites)) + print('\n'.join(all_suites)) sys.exit(0) if args.list_groups: - print('\n'.join(config.all_groups)) + print('\n'.join(all_groups)) sys.exit(0) if args.list_cases: @@ -125,13 +121,6 @@ def main(): print('\n'.join(list(cases))) sys.exit(0) - program_path = args.program_path[0] - if not config.setProgramPath(program_path): - print('One or more required executables not found at {}\n'.format(program_path)) - parser.print_usage() - sys.exit(1) - - # if sys.stdout.encoding != 'UTF-8': import codecs import locale @@ -142,7 +131,7 @@ def main(): run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids) runner = unittest.TextTestRunner(verbosity=args.verbose) # for unittest compatibility (not needed with pytest) - fixtures_ws.fixtures.create_session() + fixtures_ws.fixtures.create_session(args) try: test_result = runner.run(run_suite) finally: |