aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2018-11-15 18:44:59 +0100
committerPeter Wu <peter@lekensteyn.nl>2018-11-16 13:55:28 +0000
commit3ab521118a0d068e0d5c795a5a57b13cd7790a75 (patch)
tree72900c817752260ab91945cc85302b78f8eaffe6 /test
parentcb9be3850da46ca1f706a00b16cdb65a95ce66a0 (diff)
test: convert capture tests to use fixtures, fix tests without dumpcap
Add a new --capture-interface option to pytest, similar to test.py. It will grab some Ethernet interface on Windows. An empty value overrides this and disables capture tests. Remove the test.py --enable-capture option since that is implied by the --capture-interface option. Port the `test.py --program-path` option to pytest and additionally make the pytest look in the current working directory if neither WS_BIN_PATH nor --program-path are specified. Drop config.setProgramPath, this allows tests to be run even if not all binaries are available. With all capture tests converted to fixtures, it is now possible to run tests when Wireshark is not built with libpcap as tests that depend on cmd_dumpcap (or capture_interface) will be skipped. Bug: 14949 Change-Id: Ie802c07904936de4cd30a4c68b6a5139e6680fbd Reviewed-on: https://code.wireshark.org/review/30656 Petri-Dish: Peter Wu <peter@lekensteyn.nl> Tested-by: Petri Dish Buildbot Reviewed-by: Peter Wu <peter@lekensteyn.nl>
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py29
-rw-r--r--test/fixtures.py43
-rw-r--r--test/fixtures_ws.py58
-rw-r--r--test/subprocesstest.py23
-rw-r--r--test/suite_capture.py565
-rw-r--r--test/suite_clopts.py40
-rw-r--r--test/suite_fileformats.py22
-rw-r--r--test/suite_io.py2
-rw-r--r--test/suite_unittests.py7
-rwxr-xr-xtest/test.py43
10 files changed, 454 insertions, 378 deletions
diff --git a/test/conftest.py b/test/conftest.py
index a09e86c485..c2646bf7a8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -8,31 +8,34 @@
#
'''py.test configuration'''
-import os
-import sys
import fixtures
-import config
-# XXX remove globals in config and create py.test-specific fixtures
-try:
- _program_path = os.environ['WS_BIN_PATH']
-except KeyError:
- print('Please set env var WS_BIN_PATH to the run directory with binaries')
- sys.exit(1)
-if not config.setProgramPath(_program_path):
- print('One or more required executables not found at {}\n'.format(_program_path))
- sys.exit(1)
+def pytest_addoption(parser):
+ parser.addoption('--disable-capture', action='store_true',
+ help='Disable capture tests'
+ )
+ parser.addoption('--capture-interface',
+ help='Capture interface index or name.'
+ )
+ parser.addoption('--program-path', help='Path to Wireshark executables.')
+
+_all_test_groups = None
# this is set only to please case_unittests.test_unit_ctest_coverage
def pytest_collection_modifyitems(items):
'''Find all test groups.'''
+ global _all_test_groups
suites = []
for item in items:
name = item.nodeid.split("::")[0].replace(".py", "").replace("/", ".")
if name not in suites:
suites.append(name)
- config.all_groups = list(sorted(suites))
+ _all_test_groups = sorted(suites)
# Must enable pytest before importing fixtures_ws.
fixtures.enable_pytest()
from fixtures_ws import *
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+ return _all_test_groups
diff --git a/test/fixtures.py b/test/fixtures.py
index 26c0ed371f..1dbfe40977 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -7,6 +7,7 @@
# SPDX-License-Identifier: (GPL-2.0-or-later OR MIT)
#
+import argparse
import functools
import inspect
import sys
@@ -197,10 +198,13 @@ class _ExecutionScope(object):
def cached_result(self, spec):
'''Obtain the cached result for a previously executed fixture.'''
- value, exc = self._find_scope(spec.scope).cache[spec.name]
+ entry = self._find_scope(spec.scope).cache.get(spec.name)
+ if not entry:
+ return None, False
+ value, exc = entry
if exc:
raise exc
- return value
+ return value, True
def _execute_one(self, spec, test_fn):
# A fixture can only execute in the same or earlier scopes
@@ -265,7 +269,15 @@ class _FixtureRequest(object):
if not spec:
assert argname == 'request'
return self
- return self._context.cached_result(spec)
+ value, ok = self._context.cached_result(spec)
+ if not ok:
+ # If getfixturevalue is called directly from a setUp function, the
+ # fixture value might not have computed before, so evaluate it now.
+ # As the test function is not available, use None.
+ self._context.execute(spec, test_fn=None)
+ value, ok = self._context.cached_result(spec)
+ assert ok, 'Failed to execute fixture %s' % (spec,)
+ return value
def destroy(self):
self._context.destroy()
@@ -277,6 +289,11 @@ class _FixtureRequest(object):
def instance(self):
return self.function.__self__
+ @property
+ def config(self):
+ '''The pytest config object associated with this request.'''
+ return _config
+
def _patch_unittest_testcase_class(cls):
'''
@@ -304,8 +321,20 @@ def _patch_unittest_testcase_class(cls):
cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown
+class _Config(object):
+ def __init__(self, args):
+ assert isinstance(args, argparse.Namespace)
+ self.args = args
+
+ def getoption(self, name, default):
+ '''Partial emulation for pytest Config.getoption.'''
+ name = name.lstrip('-').replace('-', '_')
+ return getattr(self.args, name, default)
+
+
_fallback = None
_session_context = None
+_config = None
def init_fallback_fixtures_once():
@@ -317,10 +346,14 @@ def init_fallback_fixtures_once():
# Register standard fixtures here as needed
-def create_session():
- global _session_context
+def create_session(args=None):
+ '''Start a test session where args is from argparse.'''
+ global _session_context, _config
assert not _use_native_pytest
_session_context = _ExecutionScope('session', None)
+ if args is None:
+ args = argparse.Namespace()
+ _config = _Config(args)
def destroy_session():
diff --git a/test/fixtures_ws.py b/test/fixtures_ws.py
index 18be13631f..55e3e2151c 100644
--- a/test/fixtures_ws.py
+++ b/test/fixtures_ws.py
@@ -22,9 +22,51 @@ import subprocesstest
@fixtures.fixture(scope='session')
-def program_path():
- # XXX stop using config
- return config.program_path
+def capture_interface(request, cmd_dumpcap):
+ '''
+ Name of capture interface. Tests will be skipped if dumpcap is not
+ available or if the capture interface is unknown.
+ '''
+ iface = request.config.getoption('--capture-interface', default=None)
+ disabled = request.config.getoption('--disable-capture', default=False)
+ if disabled:
+ fixtures.skip('Capture tests are disabled via --disable-capture')
+ if iface:
+ # If a non-empty interface is given, assume capturing is possible.
+ return iface
+ else:
+ if sys.platform == 'win32':
+ patterns = '.*(Ethernet|Network Connection|VMware|Intel)'
+ else:
+ patterns = None
+ if patterns:
+ try:
+ output = subprocess.check_output((cmd_dumpcap, '-D'),
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True)
+ m = re.search(r'^(\d+)\. %s' % patterns, output, re.MULTILINE)
+ if m:
+ return m.group(1)
+ except subprocess.CalledProcessError:
+ pass
+ fixtures.skip('Test requires capture privileges and an interface.')
+
+
+@fixtures.fixture(scope='session')
+def program_path(request):
+ '''
+ Path to the Wireshark binaries as set by the --program-path option, the
+ WS_BIN_PATH environment variable or (curdir)/run.
+ '''
+ paths = (
+ request.config.getoption('--program-path', default=None),
+ os.environ.get('WS_BIN_PATH'),
+ os.path.join(os.curdir, 'run'),
+ )
+ for path in paths:
+ if type(path) == str and os.path.isdir(path):
+ return path
+ raise AssertionError('Missing directory with Wireshark binaries')
@fixtures.fixture(scope='session')
@@ -76,6 +118,14 @@ def cmd_wireshark(program):
@fixtures.fixture(scope='session')
+def wireshark_command(cmd_wireshark):
+ if sys.platform not in ('win32', 'darwin'):
+ # TODO check DISPLAY for X11 on Linux or BSD?
+ fixtures.skip('Wireshark GUI tests requires DISPLAY')
+ return (cmd_wireshark, '-ogui.update.enabled:FALSE')
+
+
+@fixtures.fixture(scope='session')
def features(cmd_tshark):
'''Returns an object describing available features in tshark.'''
try:
@@ -182,5 +232,3 @@ def test_env(base_env, conf_path, request):
# Inject the test environment as default if it was not overridden.
request.instance.injected_test_env = env
return env
-
-# XXX capture: capture_interface
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
index 911868624b..060ebeb3a4 100644
--- a/test/subprocesstest.py
+++ b/test/subprocesstest.py
@@ -27,23 +27,6 @@ import unittest
# the command line.
process_timeout = 300 # Seconds
-def capture_command(cmd, *args, **kwargs):
- '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
-
- If shell is true, return a string. Otherwise, return a list of arguments.'''
- shell = kwargs.pop('shell', False)
- if shell:
- cap_cmd = ['"' + cmd + '"']
- else:
- cap_cmd = [cmd]
- if cmd == config.cmd_wireshark:
- cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
- cap_cmd += args
- if shell:
- return ' '.join(cap_cmd)
- else:
- return cap_cmd
-
def cat_dhcp_command(mode):
'''Create a command string for dumping dhcp.pcap to stdout'''
# XXX Do this in Python in a thread?
@@ -196,10 +179,12 @@ class SubprocessTestCase(unittest.TestCase):
capinfos_args must be a sequence.
Default cap_file is <test id>.testout.pcap.'''
+ # XXX convert users to use a new fixture instead of this function.
+ cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
if not cap_file:
cap_file = self.filename_from_id('testout.pcap')
- self.log_fd.write('\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
- capinfos_cmd = [config.cmd_capinfos]
+ self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
+ capinfos_cmd = [cmd_capinfos]
if capinfos_args is not None:
capinfos_cmd += capinfos_args
capinfos_cmd.append(cap_file)
diff --git a/test/suite_capture.py b/test/suite_capture.py
index eb1a4ffcab..a9ac1aa9c0 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -12,13 +12,12 @@
import config
import glob
import os
-import re
import subprocess
import subprocesstest
import sys
import time
-import unittest
import uuid
+import fixtures
capture_duration = 5
@@ -40,331 +39,365 @@ def stop_pinging(ping_procs):
for proc in ping_procs:
proc.kill()
-def check_capture_10_packets(self, cmd=None, to_stdout=False):
- # Similar to suite_io.check_io_4_packets.
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- testout_file = self.filename_from_id(testout_pcap)
- ping_procs = start_pinging(self)
- if to_stdout:
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', '"{}"'.format(config.capture_interface),
+
+@fixtures.fixture(scope='session')
+def wireshark_k(wireshark_command):
+ return tuple(list(wireshark_command) + ['-k'])
+
+def capture_command(*cmd_args, shell=False):
+ if type(cmd_args[0]) != str:
+ # Assume something like ['wireshark', '-k']
+ cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
+ if shell:
+ cmd_args = ' '.join(cmd_args)
+ return cmd_args
+
+
+@fixtures.fixture
+def check_capture_10_packets(capture_interface, cmd_dumpcap):
+ def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
+ # Similar to suite_io.check_io_4_packets.
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ ping_procs = start_pinging(self)
+ if to_stdout:
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', '"{}"'.format(capture_interface),
+ '-p',
+ '-w', '-',
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', '"icmp || icmp6"',
+ '>', testout_file,
+ shell=True
+ ),
+ shell=True
+ )
+ else:
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', 'icmp || icmp6',
+ ))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ if capture_returncode != 0:
+ self.log_fd.write('{} -D output:\n'.format(cmd))
+ self.runProcess((cmd, '-D'))
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(10)
+ return check_capture_10_packets_real
+
+
+@fixtures.fixture
+def check_capture_fifo(cmd_dumpcap):
+ if sys.platform == 'win32':
+ fixtures.skip('Test requires OS fifo support.')
+
+ def check_capture_fifo_real(self, cmd=None):
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ fifo_file = self.filename_from_id('testout.fifo')
+ try:
+ # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+ os.unlink(fifo_file)
+ except:
+ pass
+ os.mkfifo(fifo_file)
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ fifo_proc = self.startProcess(
+ ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
+ shell=True)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', fifo_file,
'-p',
- '-w', '-',
- '-c', '10',
+ '-w', testout_file,
+ '-a', 'duration:{}'.format(capture_duration),
+ ))
+ fifo_proc.kill()
+ self.assertTrue(os.path.isfile(testout_file))
+ capture_returncode = capture_proc.returncode
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(8)
+ return check_capture_fifo_real
+
+
+@fixtures.fixture
+def check_capture_stdin(cmd_dumpcap):
+ # Capturing always requires dumpcap, hence the dependency on it.
+ def check_capture_stdin_real(self, cmd=None):
+ # Similar to suite_io.check_io_4_packets.
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ capture_cmd = capture_command(cmd,
+ '-i', '-',
+ '-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
- '-f', '"icmp || icmp6"',
- '>', testout_file,
shell=True
- ),
- shell=True
)
- else:
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
+ is_gui = type(cmd) != str and '-k' in cmd[0]
+ if is_gui:
+ capture_cmd += ' -o console.log.level:127'
+ pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ if is_gui:
+ self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
+ self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
+ self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
+ self.assertTrue(os.path.isfile(testout_file))
+ if (pipe_returncode == 0):
+ self.checkPacketCount(8)
+ return check_capture_stdin_real
+
+
+@fixtures.fixture
+def check_capture_read_filter(capture_interface):
+ def check_capture_read_filter_real(self, cmd=None):
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ ping_procs = start_pinging(self)
+ testout_file = self.filename_from_id(testout_pcap)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
'-p',
'-w', testout_file,
+ '-2',
+ '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', 'icmp || icmp6',
))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- if capture_returncode != 0:
- self.log_fd.write('{} -D output:\n'.format(cmd))
- self.runProcess((cmd, '-D'))
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(10)
-
-def check_capture_fifo(self, cmd=None):
- if not config.canMkfifo():
- self.skipTest('Test requires OS fifo support.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- self.assertIsNotNone(cmd)
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- fifo_file = self.filename_from_id('testout.fifo')
- try:
- # If a previous test left its fifo laying around, e.g. from a failure, remove it.
- os.unlink(fifo_file)
- except:
- pass
- os.mkfifo(fifo_file)
- slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
- fifo_proc = self.startProcess(
- ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
- shell=True)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', fifo_file,
- '-p',
- '-w', testout_file,
- '-a', 'duration:{}'.format(capture_duration),
- ))
- fifo_proc.kill()
- self.assertTrue(os.path.isfile(testout_file))
- capture_returncode = capture_proc.returncode
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(8)
-
-def check_capture_stdin(self, cmd=None):
- # Similar to suite_io.check_io_4_packets.
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- self.assertIsNotNone(cmd)
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', 'duration:{}'.format(capture_duration),
- shell=True
- )
- if cmd == config.cmd_wireshark:
- capture_cmd += ' -o console.log.level:127'
- pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- if cmd == config.cmd_wireshark:
- self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
- self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
- self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
- self.assertTrue(os.path.isfile(testout_file))
- if (pipe_returncode == 0):
- self.checkPacketCount(8)
-
-def check_capture_read_filter(self, cmd=None):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
- testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
- '-p',
- '-w', testout_file,
- '-2',
- '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
- '-c', '10',
- '-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
- ))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- self.assertEqual(capture_returncode, 0)
-
- if (capture_returncode == 0):
- self.checkPacketCount(0)
-
-def check_capture_snapshot_len(self, cmd=None):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
- testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
- '-p',
- '-w', testout_file,
- '-s', str(snapshot_len),
- '-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
- ))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- self.assertEqual(capture_returncode, 0)
- self.assertTrue(os.path.isfile(testout_file))
-
- # Use tshark to filter out all packets larger than 68 bytes.
- testout2_file = self.filename_from_id('testout2.pcap')
-
- filter_proc = self.runProcess((config.cmd_tshark,
- '-r', testout_file,
- '-w', testout2_file,
- '-Y', 'frame.cap_len>{}'.format(snapshot_len),
- ))
- filter_returncode = filter_proc.returncode
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(0, cap_file=testout2_file)
-
-def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
- # Similar to check_capture_stdin.
- cmd = config.cmd_dumpcap
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
- condition='oops:invalid'
-
- self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
- self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
- if packets is not None:
- condition = 'packets:{}'.format(packets)
- elif filesize is not None:
- condition = 'filesize:{}'.format(filesize)
-
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', condition,
- shell=True
- )
- pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- self.assertTrue(os.path.isfile(testout_file))
- if (pipe_returncode != 0):
- return
-
- if packets is not None:
- self.checkPacketCount(packets)
- elif filesize is not None:
- capturekb = os.path.getsize(testout_file) / 1000
- self.assertGreaterEqual(capturekb, filesize)
-
-def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
- # Similar to check_capture_stdin.
- cmd = config.cmd_dumpcap
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
- testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
- testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
- cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
- condition='oops:invalid'
-
- self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
- self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
- if packets is not None:
- condition = 'packets:{}'.format(packets)
- elif filesize is not None:
- condition = 'filesize:{}'.format(filesize)
-
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', 'files:2',
- '-b', condition,
- shell=True
- )
- pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- if (pipe_returncode != 0):
- return
-
- rb_files = glob.glob(testout_glob)
- for rbf in rb_files:
- self.cleanup_files.append(rbf)
-
- self.assertEqual(len(rb_files), 2)
-
- for rbf in rb_files:
- self.assertTrue(os.path.isfile(rbf))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ self.assertEqual(capture_returncode, 0)
+
+ if (capture_returncode == 0):
+ self.checkPacketCount(0)
+ return check_capture_read_filter_real
+
+@fixtures.fixture
+def check_capture_snapshot_len(capture_interface, cmd_tshark):
+ def check_capture_snapshot_len_real(self, cmd=None):
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ ping_procs = start_pinging(self)
+ testout_file = self.filename_from_id(testout_pcap)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-s', str(snapshot_len),
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', 'icmp || icmp6',
+ ))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ self.assertEqual(capture_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+
+ # Use tshark to filter out all packets larger than 68 bytes.
+ testout2_file = self.filename_from_id('testout2.pcap')
+
+ filter_proc = self.runProcess((cmd_tshark,
+ '-r', testout_file,
+ '-w', testout2_file,
+ '-Y', 'frame.cap_len>{}'.format(snapshot_len),
+ ))
+ filter_returncode = filter_proc.returncode
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(0, cap_file=testout2_file)
+ return check_capture_snapshot_len_real
+
+
+@fixtures.fixture
+def check_dumpcap_autostop_stdin(cmd_dumpcap):
+ def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ testout_file = self.filename_from_id(testout_pcap)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = ' '.join((cmd_dumpcap,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', condition,
+ ))
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+ if (pipe_returncode != 0):
+ return
+
if packets is not None:
- self.checkPacketCount(packets, cap_file=rbf)
+ self.checkPacketCount(packets)
elif filesize is not None:
- capturekb = os.path.getsize(rbf) / 1000
+ capturekb = os.path.getsize(testout_file) / 1000
self.assertGreaterEqual(capturekb, filesize)
+ return check_dumpcap_autostop_stdin_real
+
+
+@fixtures.fixture
+def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
+ def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
+ testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
+ testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = ' '.join((cmd_dumpcap,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', condition,
+ ))
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ if (pipe_returncode != 0):
+ return
+
+ rb_files = glob.glob(testout_glob)
+ for rbf in rb_files:
+ self.cleanup_files.append(rbf)
+
+ self.assertEqual(len(rb_files), 2)
+
+ for rbf in rb_files:
+ self.assertTrue(os.path.isfile(rbf))
+ if packets is not None:
+ self.checkPacketCount(packets, cap_file=rbf)
+ elif filesize is not None:
+ capturekb = os.path.getsize(rbf) / 1000
+ self.assertGreaterEqual(capturekb, filesize)
+ return check_dumpcap_ringbuffer_stdin_real
+
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
- def test_wireshark_capture_10_packets_to_file(self):
+ def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
'''Capture 10 packets from the network to a file using Wireshark'''
- check_capture_10_packets(self, cmd=config.cmd_wireshark)
+ check_capture_10_packets(self, cmd=wireshark_k)
# Wireshark doesn't currently support writing to stdout while capturing.
- # def test_wireshark_capture_10_packets_to_stdout(self):
+ # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
# '''Capture 10 packets from the network to stdout using Wireshark'''
- # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
+ # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
- def test_wireshark_capture_from_fifo(self):
+ def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
'''Capture from a fifo using Wireshark'''
- check_capture_fifo(self, cmd=config.cmd_wireshark)
+ check_capture_fifo(self, cmd=wireshark_k)
- def test_wireshark_capture_from_stdin(self):
+ def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
'''Capture from stdin using Wireshark'''
- check_capture_stdin(self, cmd=config.cmd_wireshark)
+ check_capture_stdin(self, cmd=wireshark_k)
- def test_wireshark_capture_snapshot_len(self):
+ def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
'''Capture truncated packets using Wireshark'''
- check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
+ check_capture_snapshot_len(self, cmd=wireshark_k)
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
- def test_tshark_capture_10_packets_to_file(self):
+ def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to a file using TShark'''
- check_capture_10_packets(self, cmd=config.cmd_tshark)
+ check_capture_10_packets(self, cmd=cmd_tshark)
- def test_tshark_capture_10_packets_to_stdout(self):
+ def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using TShark'''
- check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+ check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)
- def test_tshark_capture_from_fifo(self):
+ def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
'''Capture from a fifo using TShark'''
- check_capture_fifo(self, cmd=config.cmd_tshark)
+ check_capture_fifo(self, cmd=cmd_tshark)
- def test_tshark_capture_from_stdin(self):
+ def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
'''Capture from stdin using TShark'''
- check_capture_stdin(self, cmd=config.cmd_tshark)
+ check_capture_stdin(self, cmd=cmd_tshark)
- def test_tshark_capture_snapshot_len(self):
+ def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
'''Capture truncated packets using TShark'''
- check_capture_snapshot_len(self, cmd=config.cmd_tshark)
+ check_capture_snapshot_len(self, cmd=cmd_tshark)
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
- def test_dumpcap_capture_10_packets_to_file(self):
+ def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to a file using Dumpcap'''
- check_capture_10_packets(self, cmd=config.cmd_dumpcap)
+ check_capture_10_packets(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_10_packets_to_stdout(self):
+ def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using Dumpcap'''
- check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
+ check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)
- def test_dumpcap_capture_from_fifo(self):
+ def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
'''Capture from a fifo using Dumpcap'''
- check_capture_fifo(self, cmd=config.cmd_dumpcap)
+ check_capture_fifo(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_from_stdin(self):
+ def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
'''Capture from stdin using Dumpcap'''
- check_capture_stdin(self, cmd=config.cmd_dumpcap)
+ check_capture_stdin(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_snapshot_len(self):
+ def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
'''Capture truncated packets using Dumpcap'''
- check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
+ check_capture_snapshot_len(self, cmd=cmd_dumpcap)
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
# duration, filesize, packets, files
- def test_dumpcap_autostop_filesize(self):
+ def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a file size limit'''
check_dumpcap_autostop_stdin(self, filesize=15)
- def test_dumpcap_autostop_packets(self):
+ def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a packet limit'''
check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
# duration, interval, filesize, packets, files
# Need a function that finds ringbuffer file names.
- def test_dumpcap_ringbuffer_filesize(self):
+ def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
check_dumpcap_ringbuffer_stdin(self, filesize=15)
- def test_dumpcap_ringbuffer_packets(self):
+ def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
diff --git a/test/suite_clopts.py b/test/suite_clopts.py
index 9be3e13079..722769b08d 100644
--- a/test/suite_clopts.py
+++ b/test/suite_clopts.py
@@ -9,10 +9,8 @@
#
'''Command line option tests'''
-import config
import subprocess
import subprocesstest
-import unittest
import fixtures
#glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
@@ -44,36 +42,31 @@ class case_dumpcap_options(subprocesstest.SubprocessTestCase):
self.assertIn(process.returncode, valid_returns)
+@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
- def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface):
'''Invalid capture filter'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
- def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface):
'''Invalid capture interface name'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
- def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface):
'''Invalid capture interface index'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
@@ -106,8 +99,9 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
self.assertRun((cmd_tshark, '-' + char_arg))
# XXX Should we generate individual test functions instead of looping?
- def test_tshark_interface_chars(self, cmd_tshark):
+ def test_tshark_interface_chars(self, cmd_tshark, cmd_dumpcap):
'''Valid tshark parameters requiring capture permissions'''
+ # These options require dumpcap
valid_returns = [self.exit_ok, self.exit_error]
for char_arg in 'DL':
process = self.runProcess((cmd_tshark, '-' + char_arg))
@@ -117,30 +111,24 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
- def test_tshark_invalid_capfilter(self, cmd_tshark):
+ def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface):
'''Invalid capture filter'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
self.runProcess((cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
- def test_tshark_invalid_interface_name(self, cmd_tshark):
+ def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface):
'''Invalid capture interface name'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
self.runProcess((cmd_tshark, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
- def test_tshark_invalid_interface_index(self, cmd_tshark):
+ def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface):
'''Invalid capture interface index'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
@@ -151,9 +139,7 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase):
- def test_tshark_valid_name_resolution(self, cmd_tshark):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
+ def test_tshark_valid_name_resolution(self, cmd_tshark, capture_interface):
# $TSHARK -N mnNtdv -a duration:1 > ./testout.txt 2>&1
self.assertRun((cmd_tshark, '-N', 'mnNtdv', '-a', 'duration: 1'))
diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py
index 86be14d378..1d482dff43 100644
--- a/test/suite_fileformats.py
+++ b/test/suite_fileformats.py
@@ -34,29 +34,29 @@ def fileformats_baseline_str(dirs):
class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcap stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
'<', capture_file('dhcp.pcap')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcap stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
'<', capture_file('dhcp-nanosecond.pcap')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcap direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp-nanosecond.pcap'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -70,18 +70,18 @@ class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcapng stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
'<', capture_file('dhcp.pcapng')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcapng direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp.pcapng'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -91,18 +91,18 @@ class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcapng stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
'<', capture_file('dhcp-nanosecond.pcapng')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcapng direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp-nanosecond.pcapng'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
diff --git a/test/suite_io.py b/test/suite_io.py
index 9561b6418e..9b0fa9cf42 100644
--- a/test/suite_io.py
+++ b/test/suite_io.py
@@ -47,7 +47,7 @@ def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout
io_proc = self.runProcess(stdout_cmd, shell=True)
else: # direct->direct
# $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1
- io_proc = self.runProcess(subprocesstest.capture_command(cmd,
+ io_proc = self.runProcess((cmd,
'-r', capture_file('dhcp.pcap'),
'-w', testout_file,
))
diff --git a/test/suite_unittests.py b/test/suite_unittests.py
index 94621ed7fa..6200dcfcba 100644
--- a/test/suite_unittests.py
+++ b/test/suite_unittests.py
@@ -11,7 +11,6 @@
#
'''EPAN unit tests'''
-import config
import difflib
import os.path
import re
@@ -53,7 +52,7 @@ class case_unittests(subprocesstest.SubprocessTestCase):
'''fieldcount'''
self.assertRun((cmd_tshark, '-G', 'fieldcount'), env=test_env)
- def test_unit_ctest_coverage(self):
+ def test_unit_ctest_coverage(self, all_test_groups):
'''Make sure CTest runs all of our tests.'''
with open(os.path.join(os.path.dirname(__file__), '..', 'CMakeLists.txt')) as cml_fd:
group_re = re.compile(r'set *\( *_test_group_list')
@@ -68,8 +67,8 @@ class case_unittests(subprocesstest.SubprocessTestCase):
break
cml_groups.append(cml_line.strip())
cml_groups.sort()
- if not config.all_groups == cml_groups:
- diff = '\n'.join(list(difflib.unified_diff(config.all_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
+ if not all_test_groups == cml_groups:
+ diff = '\n'.join(list(difflib.unified_diff(all_test_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
self.fail("CMakeLists.txt doesn't test all available groups:\n" + diff)
diff --git a/test/test.py b/test/test.py
index c81f14c367..f775bae85c 100755
--- a/test/test.py
+++ b/test/test.py
@@ -13,17 +13,22 @@
# To do:
# - Avoid printing Python tracebacks when we assert? It looks like we'd need
# to override unittest.TextTestResult.addFailure().
-# - Remove BIN_PATH/hosts via config.tearDownHostFiles + case_name_resolution.tearDownClass?
import argparse
-import config
import os.path
import sys
import unittest
+import fixtures
# Required to make fixtures available to tests!
import fixtures_ws
+_all_test_groups = None
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+ return _all_test_groups
+
def find_test_ids(suite, all_ids):
if hasattr(suite, '__iter__'):
for s in suite:
@@ -47,10 +52,9 @@ def main():
parser = argparse.ArgumentParser(description='Wireshark unit tests')
cap_group = parser.add_mutually_exclusive_group()
- cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests')
cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests')
- cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name')
- parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.')
+ cap_group.add_argument('-i', '--capture-interface', help='Capture interface index or name')
+ parser.add_argument('-p', '--program-path', default=os.path.curdir, help='Path to Wireshark executables.')
list_group = parser.add_mutually_exclusive_group()
list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.')
list_group.add_argument('--list-suites', action='store_true', help='List all suites.')
@@ -60,14 +64,6 @@ def main():
parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".')
args = parser.parse_args()
- if args.enable_capture:
- config.setCanCapture(True)
- elif args.disable_capture:
- config.setCanCapture(False)
-
- if args.capture_interface:
- config.setCaptureInterface(args.capture_interface[0])
-
all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*')
all_ids = []
@@ -96,8 +92,7 @@ def main():
for aid in all_ids:
aparts = aid.split('.')
all_suites |= {aparts[0]}
- config.all_suites = list(all_suites)
- config.all_suites.sort()
+ all_suites = sorted(all_suites)
all_groups = set()
for aid in all_ids:
@@ -106,15 +101,16 @@ def main():
all_groups |= {'.'.join(aparts[:2])}
else:
all_groups |= {aparts[0]}
- config.all_groups = list(all_groups)
- config.all_groups.sort()
+ all_groups = sorted(all_groups)
+ global _all_test_groups
+ _all_test_groups = all_groups
if args.list_suites:
- print('\n'.join(config.all_suites))
+ print('\n'.join(all_suites))
sys.exit(0)
if args.list_groups:
- print('\n'.join(config.all_groups))
+ print('\n'.join(all_groups))
sys.exit(0)
if args.list_cases:
@@ -125,13 +121,6 @@ def main():
print('\n'.join(list(cases)))
sys.exit(0)
- program_path = args.program_path[0]
- if not config.setProgramPath(program_path):
- print('One or more required executables not found at {}\n'.format(program_path))
- parser.print_usage()
- sys.exit(1)
-
- #
if sys.stdout.encoding != 'UTF-8':
import codecs
import locale
@@ -142,7 +131,7 @@ def main():
run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
runner = unittest.TextTestRunner(verbosity=args.verbose)
# for unittest compatibility (not needed with pytest)
- fixtures_ws.fixtures.create_session()
+ fixtures_ws.fixtures.create_session(args)
try:
test_result = runner.run(run_suite)
finally: