aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py29
-rw-r--r--test/fixtures.py43
-rw-r--r--test/fixtures_ws.py58
-rw-r--r--test/subprocesstest.py23
-rw-r--r--test/suite_capture.py565
-rw-r--r--test/suite_clopts.py40
-rw-r--r--test/suite_fileformats.py22
-rw-r--r--test/suite_io.py2
-rw-r--r--test/suite_unittests.py7
-rwxr-xr-xtest/test.py43
10 files changed, 454 insertions, 378 deletions
diff --git a/test/conftest.py b/test/conftest.py
index a09e86c485..c2646bf7a8 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -8,31 +8,34 @@
#
'''py.test configuration'''
-import os
-import sys
import fixtures
-import config
-# XXX remove globals in config and create py.test-specific fixtures
-try:
- _program_path = os.environ['WS_BIN_PATH']
-except KeyError:
- print('Please set env var WS_BIN_PATH to the run directory with binaries')
- sys.exit(1)
-if not config.setProgramPath(_program_path):
- print('One or more required executables not found at {}\n'.format(_program_path))
- sys.exit(1)
+def pytest_addoption(parser):
+ parser.addoption('--disable-capture', action='store_true',
+ help='Disable capture tests'
+ )
+ parser.addoption('--capture-interface',
+ help='Capture interface index or name.'
+ )
+ parser.addoption('--program-path', help='Path to Wireshark executables.')
+
+_all_test_groups = None
# this is set only to please case_unittests.test_unit_ctest_coverage
def pytest_collection_modifyitems(items):
'''Find all test groups.'''
+ global _all_test_groups
suites = []
for item in items:
name = item.nodeid.split("::")[0].replace(".py", "").replace("/", ".")
if name not in suites:
suites.append(name)
- config.all_groups = list(sorted(suites))
+ _all_test_groups = sorted(suites)
# Must enable pytest before importing fixtures_ws.
fixtures.enable_pytest()
from fixtures_ws import *
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+ return _all_test_groups
diff --git a/test/fixtures.py b/test/fixtures.py
index 26c0ed371f..1dbfe40977 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -7,6 +7,7 @@
# SPDX-License-Identifier: (GPL-2.0-or-later OR MIT)
#
+import argparse
import functools
import inspect
import sys
@@ -197,10 +198,13 @@ class _ExecutionScope(object):
def cached_result(self, spec):
'''Obtain the cached result for a previously executed fixture.'''
- value, exc = self._find_scope(spec.scope).cache[spec.name]
+ entry = self._find_scope(spec.scope).cache.get(spec.name)
+ if not entry:
+ return None, False
+ value, exc = entry
if exc:
raise exc
- return value
+ return value, True
def _execute_one(self, spec, test_fn):
# A fixture can only execute in the same or earlier scopes
@@ -265,7 +269,15 @@ class _FixtureRequest(object):
if not spec:
assert argname == 'request'
return self
- return self._context.cached_result(spec)
+ value, ok = self._context.cached_result(spec)
+ if not ok:
+ # If getfixturevalue is called directly from a setUp function, the
+ # fixture value might not have computed before, so evaluate it now.
+ # As the test function is not available, use None.
+ self._context.execute(spec, test_fn=None)
+ value, ok = self._context.cached_result(spec)
+ assert ok, 'Failed to execute fixture %s' % (spec,)
+ return value
def destroy(self):
self._context.destroy()
@@ -277,6 +289,11 @@ class _FixtureRequest(object):
def instance(self):
return self.function.__self__
+ @property
+ def config(self):
+ '''The pytest config object associated with this request.'''
+ return _config
+
def _patch_unittest_testcase_class(cls):
'''
@@ -304,8 +321,20 @@ def _patch_unittest_testcase_class(cls):
cls._orig_tearDown, cls.tearDown = cls.tearDown, tearDown
+class _Config(object):
+ def __init__(self, args):
+ assert isinstance(args, argparse.Namespace)
+ self.args = args
+
+ def getoption(self, name, default):
+ '''Partial emulation for pytest Config.getoption.'''
+ name = name.lstrip('-').replace('-', '_')
+ return getattr(self.args, name, default)
+
+
_fallback = None
_session_context = None
+_config = None
def init_fallback_fixtures_once():
@@ -317,10 +346,14 @@ def init_fallback_fixtures_once():
# Register standard fixtures here as needed
-def create_session():
- global _session_context
+def create_session(args=None):
+ '''Start a test session where args is from argparse.'''
+ global _session_context, _config
assert not _use_native_pytest
_session_context = _ExecutionScope('session', None)
+ if args is None:
+ args = argparse.Namespace()
+ _config = _Config(args)
def destroy_session():
diff --git a/test/fixtures_ws.py b/test/fixtures_ws.py
index 18be13631f..55e3e2151c 100644
--- a/test/fixtures_ws.py
+++ b/test/fixtures_ws.py
@@ -22,9 +22,51 @@ import subprocesstest
@fixtures.fixture(scope='session')
-def program_path():
- # XXX stop using config
- return config.program_path
+def capture_interface(request, cmd_dumpcap):
+ '''
+ Name of capture interface. Tests will be skipped if dumpcap is not
+ available or if the capture interface is unknown.
+ '''
+ iface = request.config.getoption('--capture-interface', default=None)
+ disabled = request.config.getoption('--disable-capture', default=False)
+ if disabled:
+ fixtures.skip('Capture tests are disabled via --disable-capture')
+ if iface:
+ # If a non-empty interface is given, assume capturing is possible.
+ return iface
+ else:
+ if sys.platform == 'win32':
+ patterns = '.*(Ethernet|Network Connection|VMware|Intel)'
+ else:
+ patterns = None
+ if patterns:
+ try:
+ output = subprocess.check_output((cmd_dumpcap, '-D'),
+ stderr=subprocess.DEVNULL,
+ universal_newlines=True)
+ m = re.search(r'^(\d+)\. %s' % patterns, output, re.MULTILINE)
+ if m:
+ return m.group(1)
+ except subprocess.CalledProcessError:
+ pass
+ fixtures.skip('Test requires capture privileges and an interface.')
+
+
+@fixtures.fixture(scope='session')
+def program_path(request):
+ '''
+ Path to the Wireshark binaries as set by the --program-path option, the
+ WS_BIN_PATH environment variable or (curdir)/run.
+ '''
+ paths = (
+ request.config.getoption('--program-path', default=None),
+ os.environ.get('WS_BIN_PATH'),
+ os.path.join(os.curdir, 'run'),
+ )
+ for path in paths:
+ if type(path) == str and os.path.isdir(path):
+ return path
+ raise AssertionError('Missing directory with Wireshark binaries')
@fixtures.fixture(scope='session')
@@ -76,6 +118,14 @@ def cmd_wireshark(program):
@fixtures.fixture(scope='session')
+def wireshark_command(cmd_wireshark):
+ if sys.platform not in ('win32', 'darwin'):
+ # TODO check DISPLAY for X11 on Linux or BSD?
+ fixtures.skip('Wireshark GUI tests requires DISPLAY')
+ return (cmd_wireshark, '-ogui.update.enabled:FALSE')
+
+
+@fixtures.fixture(scope='session')
def features(cmd_tshark):
'''Returns an object describing available features in tshark.'''
try:
@@ -182,5 +232,3 @@ def test_env(base_env, conf_path, request):
# Inject the test environment as default if it was not overridden.
request.instance.injected_test_env = env
return env
-
-# XXX capture: capture_interface
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
index 911868624b..060ebeb3a4 100644
--- a/test/subprocesstest.py
+++ b/test/subprocesstest.py
@@ -27,23 +27,6 @@ import unittest
# the command line.
process_timeout = 300 # Seconds
-def capture_command(cmd, *args, **kwargs):
- '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
-
- If shell is true, return a string. Otherwise, return a list of arguments.'''
- shell = kwargs.pop('shell', False)
- if shell:
- cap_cmd = ['"' + cmd + '"']
- else:
- cap_cmd = [cmd]
- if cmd == config.cmd_wireshark:
- cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
- cap_cmd += args
- if shell:
- return ' '.join(cap_cmd)
- else:
- return cap_cmd
-
def cat_dhcp_command(mode):
'''Create a command string for dumping dhcp.pcap to stdout'''
# XXX Do this in Python in a thread?
@@ -196,10 +179,12 @@ class SubprocessTestCase(unittest.TestCase):
capinfos_args must be a sequence.
Default cap_file is <test id>.testout.pcap.'''
+ # XXX convert users to use a new fixture instead of this function.
+ cmd_capinfos = self._fixture_request.getfixturevalue('cmd_capinfos')
if not cap_file:
cap_file = self.filename_from_id('testout.pcap')
- self.log_fd.write('\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
- capinfos_cmd = [config.cmd_capinfos]
+ self.log_fd.write('\nOutput of {0} {1}:\n'.format(cmd_capinfos, cap_file))
+ capinfos_cmd = [cmd_capinfos]
if capinfos_args is not None:
capinfos_cmd += capinfos_args
capinfos_cmd.append(cap_file)
diff --git a/test/suite_capture.py b/test/suite_capture.py
index eb1a4ffcab..a9ac1aa9c0 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -12,13 +12,12 @@
import config
import glob
import os
-import re
import subprocess
import subprocesstest
import sys
import time
-import unittest
import uuid
+import fixtures
capture_duration = 5
@@ -40,331 +39,365 @@ def stop_pinging(ping_procs):
for proc in ping_procs:
proc.kill()
-def check_capture_10_packets(self, cmd=None, to_stdout=False):
- # Similar to suite_io.check_io_4_packets.
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- testout_file = self.filename_from_id(testout_pcap)
- ping_procs = start_pinging(self)
- if to_stdout:
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', '"{}"'.format(config.capture_interface),
+
+@fixtures.fixture(scope='session')
+def wireshark_k(wireshark_command):
+ return tuple(list(wireshark_command) + ['-k'])
+
+def capture_command(*cmd_args, shell=False):
+ if type(cmd_args[0]) != str:
+ # Assume something like ['wireshark', '-k']
+ cmd_args = list(cmd_args[0]) + list(cmd_args)[1:]
+ if shell:
+ cmd_args = ' '.join(cmd_args)
+ return cmd_args
+
+
+@fixtures.fixture
+def check_capture_10_packets(capture_interface, cmd_dumpcap):
+ def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
+ # Similar to suite_io.check_io_4_packets.
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ ping_procs = start_pinging(self)
+ if to_stdout:
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', '"{}"'.format(capture_interface),
+ '-p',
+ '-w', '-',
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', '"icmp || icmp6"',
+ '>', testout_file,
+ shell=True
+ ),
+ shell=True
+ )
+ else:
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-c', '10',
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', 'icmp || icmp6',
+ ))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ if capture_returncode != 0:
+ self.log_fd.write('{} -D output:\n'.format(cmd))
+ self.runProcess((cmd, '-D'))
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(10)
+ return check_capture_10_packets_real
+
+
+@fixtures.fixture
+def check_capture_fifo(cmd_dumpcap):
+ if sys.platform == 'win32':
+ fixtures.skip('Test requires OS fifo support.')
+
+ def check_capture_fifo_real(self, cmd=None):
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ fifo_file = self.filename_from_id('testout.fifo')
+ try:
+ # If a previous test left its fifo laying around, e.g. from a failure, remove it.
+ os.unlink(fifo_file)
+ except:
+ pass
+ os.mkfifo(fifo_file)
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ fifo_proc = self.startProcess(
+ ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
+ shell=True)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', fifo_file,
'-p',
- '-w', '-',
- '-c', '10',
+ '-w', testout_file,
+ '-a', 'duration:{}'.format(capture_duration),
+ ))
+ fifo_proc.kill()
+ self.assertTrue(os.path.isfile(testout_file))
+ capture_returncode = capture_proc.returncode
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(8)
+ return check_capture_fifo_real
+
+
+@fixtures.fixture
+def check_capture_stdin(cmd_dumpcap):
+ # Capturing always requires dumpcap, hence the dependency on it.
+ def check_capture_stdin_real(self, cmd=None):
+ # Similar to suite_io.check_io_4_packets.
+ self.assertIsNotNone(cmd)
+ testout_file = self.filename_from_id(testout_pcap)
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ capture_cmd = capture_command(cmd,
+ '-i', '-',
+ '-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
- '-f', '"icmp || icmp6"',
- '>', testout_file,
shell=True
- ),
- shell=True
)
- else:
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
+ is_gui = type(cmd) != str and '-k' in cmd[0]
+ if is_gui:
+ capture_cmd += ' -o console.log.level:127'
+ pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ if is_gui:
+ self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
+ self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
+ self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
+ self.assertTrue(os.path.isfile(testout_file))
+ if (pipe_returncode == 0):
+ self.checkPacketCount(8)
+ return check_capture_stdin_real
+
+
+@fixtures.fixture
+def check_capture_read_filter(capture_interface):
+ def check_capture_read_filter_real(self, cmd=None):
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ ping_procs = start_pinging(self)
+ testout_file = self.filename_from_id(testout_pcap)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
'-p',
'-w', testout_file,
+ '-2',
+ '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
'-f', 'icmp || icmp6',
))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- if capture_returncode != 0:
- self.log_fd.write('{} -D output:\n'.format(cmd))
- self.runProcess((cmd, '-D'))
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(10)
-
-def check_capture_fifo(self, cmd=None):
- if not config.canMkfifo():
- self.skipTest('Test requires OS fifo support.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- self.assertIsNotNone(cmd)
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- fifo_file = self.filename_from_id('testout.fifo')
- try:
- # If a previous test left its fifo laying around, e.g. from a failure, remove it.
- os.unlink(fifo_file)
- except:
- pass
- os.mkfifo(fifo_file)
- slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
- fifo_proc = self.startProcess(
- ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
- shell=True)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', fifo_file,
- '-p',
- '-w', testout_file,
- '-a', 'duration:{}'.format(capture_duration),
- ))
- fifo_proc.kill()
- self.assertTrue(os.path.isfile(testout_file))
- capture_returncode = capture_proc.returncode
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(8)
-
-def check_capture_stdin(self, cmd=None):
- # Similar to suite_io.check_io_4_packets.
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- self.assertIsNotNone(cmd)
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', 'duration:{}'.format(capture_duration),
- shell=True
- )
- if cmd == config.cmd_wireshark:
- capture_cmd += ' -o console.log.level:127'
- pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- if cmd == config.cmd_wireshark:
- self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
- self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
- self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
- self.assertTrue(os.path.isfile(testout_file))
- if (pipe_returncode == 0):
- self.checkPacketCount(8)
-
-def check_capture_read_filter(self, cmd=None):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
- testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
- '-p',
- '-w', testout_file,
- '-2',
- '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
- '-c', '10',
- '-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
- ))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- self.assertEqual(capture_returncode, 0)
-
- if (capture_returncode == 0):
- self.checkPacketCount(0)
-
-def check_capture_snapshot_len(self, cmd=None):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
- if cmd == config.cmd_wireshark and not config.canDisplay():
- self.skipTest('Test requires a display.')
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
- self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
- testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
- '-i', config.capture_interface,
- '-p',
- '-w', testout_file,
- '-s', str(snapshot_len),
- '-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
- ))
- capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
- self.assertEqual(capture_returncode, 0)
- self.assertTrue(os.path.isfile(testout_file))
-
- # Use tshark to filter out all packets larger than 68 bytes.
- testout2_file = self.filename_from_id('testout2.pcap')
-
- filter_proc = self.runProcess((config.cmd_tshark,
- '-r', testout_file,
- '-w', testout2_file,
- '-Y', 'frame.cap_len>{}'.format(snapshot_len),
- ))
- filter_returncode = filter_proc.returncode
- self.assertEqual(capture_returncode, 0)
- if (capture_returncode == 0):
- self.checkPacketCount(0, cap_file=testout2_file)
-
-def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
- # Similar to check_capture_stdin.
- cmd = config.cmd_dumpcap
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- testout_file = self.filename_from_id(testout_pcap)
- cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
- condition='oops:invalid'
-
- self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
- self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
- if packets is not None:
- condition = 'packets:{}'.format(packets)
- elif filesize is not None:
- condition = 'filesize:{}'.format(filesize)
-
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', condition,
- shell=True
- )
- pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- self.assertTrue(os.path.isfile(testout_file))
- if (pipe_returncode != 0):
- return
-
- if packets is not None:
- self.checkPacketCount(packets)
- elif filesize is not None:
- capturekb = os.path.getsize(testout_file) / 1000
- self.assertGreaterEqual(capturekb, filesize)
-
-def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
- # Similar to check_capture_stdin.
- cmd = config.cmd_dumpcap
- capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
- rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
- testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
- testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
- cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
- condition='oops:invalid'
-
- self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
- self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
-
- if packets is not None:
- condition = 'packets:{}'.format(packets)
- elif filesize is not None:
- condition = 'filesize:{}'.format(filesize)
-
- capture_cmd = subprocesstest.capture_command(cmd,
- '-i', '-',
- '-w', testout_file,
- '-a', 'files:2',
- '-b', condition,
- shell=True
- )
- pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
- pipe_returncode = pipe_proc.returncode
- self.assertEqual(pipe_returncode, 0)
- if (pipe_returncode != 0):
- return
-
- rb_files = glob.glob(testout_glob)
- for rbf in rb_files:
- self.cleanup_files.append(rbf)
-
- self.assertEqual(len(rb_files), 2)
-
- for rbf in rb_files:
- self.assertTrue(os.path.isfile(rbf))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ self.assertEqual(capture_returncode, 0)
+
+ if (capture_returncode == 0):
+ self.checkPacketCount(0)
+ return check_capture_read_filter_real
+
+@fixtures.fixture
+def check_capture_snapshot_len(capture_interface, cmd_tshark):
+ def check_capture_snapshot_len_real(self, cmd=None):
+ if not config.args_ping:
+ self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
+ self.assertIsNotNone(cmd)
+ ping_procs = start_pinging(self)
+ testout_file = self.filename_from_id(testout_pcap)
+ capture_proc = self.runProcess(capture_command(cmd,
+ '-i', capture_interface,
+ '-p',
+ '-w', testout_file,
+ '-s', str(snapshot_len),
+ '-a', 'duration:{}'.format(capture_duration),
+ '-f', 'icmp || icmp6',
+ ))
+ capture_returncode = capture_proc.returncode
+ stop_pinging(ping_procs)
+ self.assertEqual(capture_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+
+ # Use tshark to filter out all packets larger than 68 bytes.
+ testout2_file = self.filename_from_id('testout2.pcap')
+
+ filter_proc = self.runProcess((cmd_tshark,
+ '-r', testout_file,
+ '-w', testout2_file,
+ '-Y', 'frame.cap_len>{}'.format(snapshot_len),
+ ))
+ filter_returncode = filter_proc.returncode
+ self.assertEqual(capture_returncode, 0)
+ if (capture_returncode == 0):
+ self.checkPacketCount(0, cap_file=testout2_file)
+ return check_capture_snapshot_len_real
+
+
+@fixtures.fixture
+def check_dumpcap_autostop_stdin(cmd_dumpcap):
+ def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ testout_file = self.filename_from_id(testout_pcap)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = ' '.join((cmd_dumpcap,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', condition,
+ ))
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+ if (pipe_returncode != 0):
+ return
+
if packets is not None:
- self.checkPacketCount(packets, cap_file=rbf)
+ self.checkPacketCount(packets)
elif filesize is not None:
- capturekb = os.path.getsize(rbf) / 1000
+ capturekb = os.path.getsize(testout_file) / 1000
self.assertGreaterEqual(capturekb, filesize)
+ return check_dumpcap_autostop_stdin_real
+
+
+@fixtures.fixture
+def check_dumpcap_ringbuffer_stdin(cmd_dumpcap):
+ def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
+ testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
+ testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = ' '.join((cmd_dumpcap,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', condition,
+ ))
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ if (pipe_returncode != 0):
+ return
+
+ rb_files = glob.glob(testout_glob)
+ for rbf in rb_files:
+ self.cleanup_files.append(rbf)
+
+ self.assertEqual(len(rb_files), 2)
+
+ for rbf in rb_files:
+ self.assertTrue(os.path.isfile(rbf))
+ if packets is not None:
+ self.checkPacketCount(packets, cap_file=rbf)
+ elif filesize is not None:
+ capturekb = os.path.getsize(rbf) / 1000
+ self.assertGreaterEqual(capturekb, filesize)
+ return check_dumpcap_ringbuffer_stdin_real
+
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
- def test_wireshark_capture_10_packets_to_file(self):
+ def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets):
'''Capture 10 packets from the network to a file using Wireshark'''
- check_capture_10_packets(self, cmd=config.cmd_wireshark)
+ check_capture_10_packets(self, cmd=wireshark_k)
# Wireshark doesn't currently support writing to stdout while capturing.
- # def test_wireshark_capture_10_packets_to_stdout(self):
+ # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets):
# '''Capture 10 packets from the network to stdout using Wireshark'''
- # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)
+ # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True)
- def test_wireshark_capture_from_fifo(self):
+ def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo):
'''Capture from a fifo using Wireshark'''
- check_capture_fifo(self, cmd=config.cmd_wireshark)
+ check_capture_fifo(self, cmd=wireshark_k)
- def test_wireshark_capture_from_stdin(self):
+ def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin):
'''Capture from stdin using Wireshark'''
- check_capture_stdin(self, cmd=config.cmd_wireshark)
+ check_capture_stdin(self, cmd=wireshark_k)
- def test_wireshark_capture_snapshot_len(self):
+ def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len):
'''Capture truncated packets using Wireshark'''
- check_capture_snapshot_len(self, cmd=config.cmd_wireshark)
+ check_capture_snapshot_len(self, cmd=wireshark_k)
+
+@fixtures.mark_usefixtures('test_env')
+@fixtures.uses_fixtures
class case_tshark_capture(subprocesstest.SubprocessTestCase):
- def test_tshark_capture_10_packets_to_file(self):
+ def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to a file using TShark'''
- check_capture_10_packets(self, cmd=config.cmd_tshark)
+ check_capture_10_packets(self, cmd=cmd_tshark)
- def test_tshark_capture_10_packets_to_stdout(self):
+ def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using TShark'''
- check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+ check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True)
- def test_tshark_capture_from_fifo(self):
+ def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo):
'''Capture from a fifo using TShark'''
- check_capture_fifo(self, cmd=config.cmd_tshark)
+ check_capture_fifo(self, cmd=cmd_tshark)
- def test_tshark_capture_from_stdin(self):
+ def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin):
'''Capture from stdin using TShark'''
- check_capture_stdin(self, cmd=config.cmd_tshark)
+ check_capture_stdin(self, cmd=cmd_tshark)
- def test_tshark_capture_snapshot_len(self):
+ def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len):
'''Capture truncated packets using TShark'''
- check_capture_snapshot_len(self, cmd=config.cmd_tshark)
+ check_capture_snapshot_len(self, cmd=cmd_tshark)
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
- def test_dumpcap_capture_10_packets_to_file(self):
+ def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to a file using Dumpcap'''
- check_capture_10_packets(self, cmd=config.cmd_dumpcap)
+ check_capture_10_packets(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_10_packets_to_stdout(self):
+ def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets):
'''Capture 10 packets from the network to stdout using Dumpcap'''
- check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)
+ check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True)
- def test_dumpcap_capture_from_fifo(self):
+ def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo):
'''Capture from a fifo using Dumpcap'''
- check_capture_fifo(self, cmd=config.cmd_dumpcap)
+ check_capture_fifo(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_from_stdin(self):
+ def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin):
'''Capture from stdin using Dumpcap'''
- check_capture_stdin(self, cmd=config.cmd_dumpcap)
+ check_capture_stdin(self, cmd=cmd_dumpcap)
- def test_dumpcap_capture_snapshot_len(self):
+ def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap):
'''Capture truncated packets using Dumpcap'''
- check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
+ check_capture_snapshot_len(self, cmd=cmd_dumpcap)
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
# duration, filesize, packets, files
- def test_dumpcap_autostop_filesize(self):
+ def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a file size limit'''
check_dumpcap_autostop_stdin(self, filesize=15)
- def test_dumpcap_autostop_packets(self):
+ def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin):
'''Capture from stdin using Dumpcap until we reach a packet limit'''
check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
+
+@fixtures.mark_usefixtures('base_env')
+@fixtures.uses_fixtures
class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
# duration, interval, filesize, packets, files
# Need a function that finds ringbuffer file names.
- def test_dumpcap_ringbuffer_filesize(self):
+ def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
check_dumpcap_ringbuffer_stdin(self, filesize=15)
- def test_dumpcap_ringbuffer_packets(self):
+ def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin):
'''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
diff --git a/test/suite_clopts.py b/test/suite_clopts.py
index 9be3e13079..722769b08d 100644
--- a/test/suite_clopts.py
+++ b/test/suite_clopts.py
@@ -9,10 +9,8 @@
#
'''Command line option tests'''
-import config
import subprocess
import subprocesstest
-import unittest
import fixtures
#glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
@@ -44,36 +42,31 @@ class case_dumpcap_options(subprocesstest.SubprocessTestCase):
self.assertIn(process.returncode, valid_returns)
+@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
- def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_capfilter(self, cmd_dumpcap, capture_interface):
'''Invalid capture filter'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-f', invalid_filter, '-w', testout_file))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
- def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_interface_name(self, cmd_dumpcap, capture_interface):
'''Invalid capture interface name'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
- def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, base_env):
+ def test_dumpcap_invalid_interface_index(self, cmd_dumpcap, capture_interface):
'''Invalid capture interface index'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
- self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file), env=base_env)
+ self.runProcess((cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
@@ -106,8 +99,9 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
self.assertRun((cmd_tshark, '-' + char_arg))
# XXX Should we generate individual test functions instead of looping?
- def test_tshark_interface_chars(self, cmd_tshark):
+ def test_tshark_interface_chars(self, cmd_tshark, cmd_dumpcap):
'''Valid tshark parameters requiring capture permissions'''
+ # These options require dumpcap
valid_returns = [self.exit_ok, self.exit_error]
for char_arg in 'DL':
process = self.runProcess((cmd_tshark, '-' + char_arg))
@@ -117,30 +111,24 @@ class case_tshark_options(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
- def test_tshark_invalid_capfilter(self, cmd_tshark):
+ def test_tshark_invalid_capfilter(self, cmd_tshark, capture_interface):
'''Invalid capture filter'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
self.runProcess((cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
- def test_tshark_invalid_interface_name(self, cmd_tshark):
+ def test_tshark_invalid_interface_name(self, cmd_tshark, capture_interface):
'''Invalid capture interface name'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
self.runProcess((cmd_tshark, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
- def test_tshark_invalid_interface_index(self, cmd_tshark):
+ def test_tshark_invalid_interface_index(self, cmd_tshark, capture_interface):
'''Invalid capture interface index'''
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
testout_file = self.filename_from_id(testout_pcap)
@@ -151,9 +139,7 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase):
- def test_tshark_valid_name_resolution(self, cmd_tshark):
- if not config.canCapture():
- self.skipTest('Test requires capture privileges and an interface.')
+ def test_tshark_valid_name_resolution(self, cmd_tshark, capture_interface):
# $TSHARK -N mnNtdv -a duration:1 > ./testout.txt 2>&1
self.assertRun((cmd_tshark, '-N', 'mnNtdv', '-a', 'duration: 1'))
diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py
index 86be14d378..1d482dff43 100644
--- a/test/suite_fileformats.py
+++ b/test/suite_fileformats.py
@@ -34,29 +34,29 @@ def fileformats_baseline_str(dirs):
class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcap stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
'<', capture_file('dhcp.pcap')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcap stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
'<', capture_file('dhcp-nanosecond.pcap')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcap direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp-nanosecond.pcap'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -70,18 +70,18 @@ class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcapng stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
'<', capture_file('dhcp.pcapng')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs microsecond pcapng direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp.pcapng'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
@@ -91,18 +91,18 @@ class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcapng stdin'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess(' '.join((cmd_tshark,
'-r', '-',
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
'<', capture_file('dhcp-nanosecond.pcapng')
- , shell=True),
+ )),
shell=True)
self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))
def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
'''Microsecond pcap direct vs nanosecond pcapng direct'''
- capture_proc = self.runProcess(subprocesstest.capture_command(cmd_tshark,
+ capture_proc = self.runProcess((cmd_tshark,
'-r', capture_file('dhcp-nanosecond.pcapng'),
'-Tfields',
'-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
diff --git a/test/suite_io.py b/test/suite_io.py
index 9561b6418e..9b0fa9cf42 100644
--- a/test/suite_io.py
+++ b/test/suite_io.py
@@ -47,7 +47,7 @@ def check_io_4_packets(self, capture_file, cmd=None, from_stdin=False, to_stdout
io_proc = self.runProcess(stdout_cmd, shell=True)
else: # direct->direct
# $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1
- io_proc = self.runProcess(subprocesstest.capture_command(cmd,
+ io_proc = self.runProcess((cmd,
'-r', capture_file('dhcp.pcap'),
'-w', testout_file,
))
diff --git a/test/suite_unittests.py b/test/suite_unittests.py
index 94621ed7fa..6200dcfcba 100644
--- a/test/suite_unittests.py
+++ b/test/suite_unittests.py
@@ -11,7 +11,6 @@
#
'''EPAN unit tests'''
-import config
import difflib
import os.path
import re
@@ -53,7 +52,7 @@ class case_unittests(subprocesstest.SubprocessTestCase):
'''fieldcount'''
self.assertRun((cmd_tshark, '-G', 'fieldcount'), env=test_env)
- def test_unit_ctest_coverage(self):
+ def test_unit_ctest_coverage(self, all_test_groups):
'''Make sure CTest runs all of our tests.'''
with open(os.path.join(os.path.dirname(__file__), '..', 'CMakeLists.txt')) as cml_fd:
group_re = re.compile(r'set *\( *_test_group_list')
@@ -68,8 +67,8 @@ class case_unittests(subprocesstest.SubprocessTestCase):
break
cml_groups.append(cml_line.strip())
cml_groups.sort()
- if not config.all_groups == cml_groups:
- diff = '\n'.join(list(difflib.unified_diff(config.all_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
+ if not all_test_groups == cml_groups:
+ diff = '\n'.join(list(difflib.unified_diff(all_test_groups, cml_groups, 'all test groups', 'CMakeLists.txt test groups')))
self.fail("CMakeLists.txt doesn't test all available groups:\n" + diff)
diff --git a/test/test.py b/test/test.py
index c81f14c367..f775bae85c 100755
--- a/test/test.py
+++ b/test/test.py
@@ -13,17 +13,22 @@
# To do:
# - Avoid printing Python tracebacks when we assert? It looks like we'd need
# to override unittest.TextTestResult.addFailure().
-# - Remove BIN_PATH/hosts via config.tearDownHostFiles + case_name_resolution.tearDownClass?
import argparse
-import config
import os.path
import sys
import unittest
+import fixtures
# Required to make fixtures available to tests!
import fixtures_ws
+_all_test_groups = None
+
+@fixtures.fixture(scope='session')
+def all_test_groups():
+ return _all_test_groups
+
def find_test_ids(suite, all_ids):
if hasattr(suite, '__iter__'):
for s in suite:
@@ -47,10 +52,9 @@ def main():
parser = argparse.ArgumentParser(description='Wireshark unit tests')
cap_group = parser.add_mutually_exclusive_group()
- cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests')
cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests')
- cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name')
- parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.')
+ cap_group.add_argument('-i', '--capture-interface', help='Capture interface index or name')
+ parser.add_argument('-p', '--program-path', default=os.path.curdir, help='Path to Wireshark executables.')
list_group = parser.add_mutually_exclusive_group()
list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.')
list_group.add_argument('--list-suites', action='store_true', help='List all suites.')
@@ -60,14 +64,6 @@ def main():
parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".')
args = parser.parse_args()
- if args.enable_capture:
- config.setCanCapture(True)
- elif args.disable_capture:
- config.setCanCapture(False)
-
- if args.capture_interface:
- config.setCaptureInterface(args.capture_interface[0])
-
all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*')
all_ids = []
@@ -96,8 +92,7 @@ def main():
for aid in all_ids:
aparts = aid.split('.')
all_suites |= {aparts[0]}
- config.all_suites = list(all_suites)
- config.all_suites.sort()
+ all_suites = sorted(all_suites)
all_groups = set()
for aid in all_ids:
@@ -106,15 +101,16 @@ def main():
all_groups |= {'.'.join(aparts[:2])}
else:
all_groups |= {aparts[0]}
- config.all_groups = list(all_groups)
- config.all_groups.sort()
+ all_groups = sorted(all_groups)
+ global _all_test_groups
+ _all_test_groups = all_groups
if args.list_suites:
- print('\n'.join(config.all_suites))
+ print('\n'.join(all_suites))
sys.exit(0)
if args.list_groups:
- print('\n'.join(config.all_groups))
+ print('\n'.join(all_groups))
sys.exit(0)
if args.list_cases:
@@ -125,13 +121,6 @@ def main():
print('\n'.join(list(cases)))
sys.exit(0)
- program_path = args.program_path[0]
- if not config.setProgramPath(program_path):
- print('One or more required executables not found at {}\n'.format(program_path))
- parser.print_usage()
- sys.exit(1)
-
- #
if sys.stdout.encoding != 'UTF-8':
import codecs
import locale
@@ -142,7 +131,7 @@ def main():
run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids)
runner = unittest.TextTestRunner(verbosity=args.verbose)
# for unittest compatibility (not needed with pytest)
- fixtures_ws.fixtures.create_session()
+ fixtures_ws.fixtures.create_session(args)
try:
test_result = runner.run(run_suite)
finally: