aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docbook/wsdg_src/WSDG_chapter_tests.asciidoc9
-rw-r--r--test/baseline/ff-ts-usec-pcap-direct.txt4
-rw-r--r--test/config.py9
-rw-r--r--test/subprocesstest.py63
-rw-r--r--test/suite_capture.py70
-rw-r--r--test/suite_clopts.py20
-rw-r--r--test/suite_decryption.py1
-rw-r--r--test/suite_dissection.py1
-rw-r--r--test/suite_fileformats.py112
-rw-r--r--test/suite_io.py89
-rwxr-xr-xtest/test.py1
-rwxr-xr-xtest/util_dump_dhcp_pcap.py44
-rw-r--r--test/util_slow_dhcp_pcap.py27
13 files changed, 358 insertions, 92 deletions
diff --git a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc
index 249e5a416d..092ebe88cc 100644
--- a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc
+++ b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc
@@ -22,7 +22,10 @@ To run all tests from CMake do the following:
* Pass `-DTEST_EXTRA_ARGS=--disable-capture` or
`-DTEST_EXTRA_ARGS=--capture-interface=<interface>`
as needed for your system.
-* Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`.
+* Build the “test” target or run ctest, e.g. `ctest --force-new-ctest-process -j 4 --verbose`.
+
+On Windows, “ctest” requires a build configuration parameter, e.g.
+`ctest -C RelWithDebInfo --force-new-ctest-process -j 4 --verbose`.
To run all tests directly, run `test.py -p
/path/to/wireshark-build/run-directory <capture args>`.
@@ -168,3 +171,7 @@ class case_decrypt_80211(subprocesstest.SubprocessTestCase):
self.assertTrue(self.grepOutput('favicon.ico'))
----
+Tests can be run in parallel. This means that any files you create must
+be unique for each test. “subprocesstest.filename_from_id” can be used
+to generate a filename based on the current test name. It also ensures
+that the file will be automatically removed after the test has run.
diff --git a/test/baseline/ff-ts-usec-pcap-direct.txt b/test/baseline/ff-ts-usec-pcap-direct.txt
new file mode 100644
index 0000000000..f4ac417f77
--- /dev/null
+++ b/test/baseline/ff-ts-usec-pcap-direct.txt
@@ -0,0 +1,4 @@
+1 1102274184.317453000 0.000000000
+2 1102274184.317748000 0.000295000
+3 1102274184.387484000 0.069736000
+4 1102274184.387798000 0.000314000
diff --git a/test/config.py b/test/config.py
index e767a4aa10..5b34d13f40 100644
--- a/test/config.py
+++ b/test/config.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
@@ -16,10 +17,11 @@ import sys
import tempfile
commands = (
+ 'capinfos',
'dumpcap',
+ 'rawshark',
'tshark',
'wireshark',
- 'capinfos',
)
can_capture = False
@@ -27,10 +29,11 @@ capture_interface = None
# Our executables
# Strings
-cmd_tshark = None
+cmd_capinfos = None
cmd_dumpcap = None
+cmd_rawshark = None
+cmd_tshark = None
cmd_wireshark = None
-cmd_capinfos = None
# Arrays
args_ping = None
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
index 18313db795..357104b461 100644
--- a/test/subprocesstest.py
+++ b/test/subprocesstest.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
@@ -8,6 +9,8 @@
#
'''Subprocess test case superclass'''
+import config
+import difflib
import io
import os
import os.path
@@ -25,6 +28,32 @@ import unittest
if sys.version_info[0] >= 3:
process_timeout = 300 # Seconds
+def capture_command(cmd, *args, **kwargs):
+ '''Convert the supplied arguments into a command suitable for SubprocessTestCase.
+
+ If shell is true, return a string. Otherwise, return a list of arguments.'''
+ shell = kwargs.pop('shell', False)
+ if shell:
+ cap_cmd = ['"' + cmd + '"']
+ else:
+ cap_cmd = [cmd]
+ if cmd == config.cmd_wireshark:
+ cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
+ cap_cmd += args
+ if shell:
+ return ' '.join(cap_cmd)
+ else:
+ return cap_cmd
+
+def cat_dhcp_command(mode):
+ '''Create a command string for dumping dhcp.pcap to stdout'''
+ # XXX Do this in Python in a thread?
+ sd_cmd = ''
+ if sys.executable:
+ sd_cmd = sys.executable + ' '
+ sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
+ return sd_cmd
+
class LoggingPopen(subprocess.Popen):
'''Run a process using subprocess.Popen. Capture and log its output.
@@ -104,7 +133,10 @@ class SubprocessTestCase(unittest.TestCase):
def filename_from_id(self, filename):
'''Generate a filename prefixed with our test ID.'''
- return self.id() + '.' + filename
+ id_filename = self.id() + '.' + filename
+ if id_filename not in self.cleanup_files:
+ self.cleanup_files.append(id_filename)
+ return id_filename
def kill_processes(self):
'''Kill any processes we've opened so far'''
@@ -158,6 +190,18 @@ class SubprocessTestCase(unittest.TestCase):
pass
self.cleanup_files = []
+ def checkPacketCount(self, num_packets, cap_file=None):
+ got_num_packets = False
+ if not cap_file:
+ cap_file = self.filename_from_id('testout.pcap')
+ self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
+ capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
+ self.log_fd_write_bytes(capinfos_testout)
+ count_pat = 'Number of packets:\s+{}'.format(num_packets)
+ if re.search(count_pat, capinfos_testout):
+ got_num_packets = True
+ self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
+
def countOutput(self, search_pat, proc=None):
'''Returns the number of output lines (search_pat=None), otherwise returns a match count.'''
match_count = 0
@@ -174,6 +218,23 @@ class SubprocessTestCase(unittest.TestCase):
def grepOutput(self, search_pat, proc=None):
return self.countOutput(search_pat, proc) > 0
+ def diffOutput(self, blob_a, blob_b, *args, **kwargs):
+ '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ.
+
+ blob_a and blob_b must be UTF-8 strings.'''
+ lines_a = blob_a.splitlines()
+ lines_b = blob_b.splitlines()
+ diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs)))
+ if len(diff) > 0:
+ if sys.version_info[0] < 3 and not isinstance(diff, unicode):
+ diff = unicode(diff, 'UTF-8', 'replace')
+ self.log_fd.flush()
+ self.log_fd.write(u'-- Begin diff output --\n')
+ self.log_fd.writelines(diff)
+ self.log_fd.write(u'-- End diff output --\n')
+ return False
+ return True
+
def startProcess(self, proc_args, env=None, shell=False):
'''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().'''
proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd)
diff --git a/test/suite_capture.py b/test/suite_capture.py
index ff6804f5df..d76e2e24e0 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
@@ -24,28 +25,6 @@ snapshot_len = 96
capture_env = os.environ.copy()
capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
-def capture_command(cmd, *args, **kwargs):
- shell = kwargs.pop('shell', False)
- if shell:
- cap_cmd = ['"' + cmd + '"']
- else:
- cap_cmd = [cmd]
- if cmd == config.cmd_wireshark:
- cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
- cap_cmd += args
- if shell:
- return ' '.join(cap_cmd)
- else:
- return cap_cmd
-
-def slow_dhcp_command():
- # XXX Do this in Python in a thread?
- sd_cmd = ''
- if sys.executable:
- sd_cmd = '"{}" '.format(sys.executable)
- sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py')
- return sd_cmd
-
def start_pinging(self):
ping_procs = []
if sys.platform.startswith('win32'):
@@ -61,19 +40,8 @@ def stop_pinging(ping_procs):
for proc in ping_procs:
proc.kill()
-def check_testout_num_packets(self, num_packets, cap_file=None):
- got_num_packets = False
- if not cap_file:
- cap_file = self.filename_from_id(testout_pcap)
- self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
- capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
- self.log_fd_write_bytes(capinfos_testout)
- count_pat = 'Number of packets:\s+{}'.format(num_packets)
- if re.search(count_pat, capinfos_testout):
- got_num_packets = True
- self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
-
def check_capture_10_packets(self, cmd=None, to_stdout=False):
+ # Similar to suite_io.check_io_4_packets.
if not config.canCapture():
self.skipTest('Test requires capture privileges and an interface.')
if cmd == config.cmd_wireshark and not config.canDisplay():
@@ -84,7 +52,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
testout_file = self.filename_from_id(testout_pcap)
ping_procs = start_pinging(self)
if to_stdout:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', '"{}"'.format(config.capture_interface),
'-p',
'-w', '-',
@@ -98,7 +66,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
shell=True
)
else:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -110,13 +78,12 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
if capture_returncode != 0:
self.log_fd.write('{} -D output:\n'.format(cmd))
self.runProcess((cmd, '-D'))
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 10)
+ self.checkPacketCount(10)
def check_capture_fifo(self, cmd=None):
if not config.canMkfifo():
@@ -127,18 +94,17 @@ def check_capture_fifo(self, cmd=None):
capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
testout_file = self.filename_from_id(testout_pcap)
fifo_file = self.filename_from_id('testout.fifo')
- self.cleanup_files.append(fifo_file)
try:
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
os.unlink(fifo_file)
except:
pass
os.mkfifo(fifo_file)
- slow_dhcp_cmd = slow_dhcp_command()
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
fifo_proc = self.startProcess(
('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
shell=True)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', fifo_file,
'-p',
'-w', testout_file,
@@ -146,22 +112,22 @@ def check_capture_fifo(self, cmd=None):
),
env=capture_env
)
- self.cleanup_files.append(testout_file)
fifo_proc.kill()
self.assertTrue(os.path.isfile(testout_file))
capture_returncode = capture_proc.returncode
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 8)
+ self.checkPacketCount(8)
def check_capture_stdin(self, cmd=None):
+ # Similar to suite_io.check_io_4_packets.
if cmd == config.cmd_wireshark and not config.canDisplay():
self.skipTest('Test requires a display.')
self.assertIsNotNone(cmd)
capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
testout_file = self.filename_from_id(testout_pcap)
- slow_dhcp_cmd = slow_dhcp_command()
- capture_cmd = capture_command(cmd,
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ capture_cmd = subprocesstest.capture_command(cmd,
'-i', '-',
'-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
@@ -170,7 +136,6 @@ def check_capture_stdin(self, cmd=None):
if cmd == config.cmd_wireshark:
capture_cmd += ' -o console.log.level:127'
pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
- self.cleanup_files.append(testout_file)
pipe_returncode = pipe_proc.returncode
self.assertEqual(pipe_returncode, 0)
if cmd == config.cmd_wireshark:
@@ -179,7 +144,7 @@ def check_capture_stdin(self, cmd=None):
self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
self.assertTrue(os.path.isfile(testout_file))
if (pipe_returncode == 0):
- check_testout_num_packets(self, 8)
+ self.checkPacketCount(8)
def check_capture_2multi_10packets(self, cmd=None):
# This was present in the Bash version but was incorrect and not part of any suite.
@@ -196,7 +161,7 @@ def check_capture_read_filter(self, cmd=None):
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -210,11 +175,10 @@ def check_capture_read_filter(self, cmd=None):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 0)
+ self.checkPacketCount(0)
def check_capture_snapshot_len(self, cmd=None):
if not config.canCapture():
@@ -226,7 +190,7 @@ def check_capture_snapshot_len(self, cmd=None):
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -238,7 +202,6 @@ def check_capture_snapshot_len(self, cmd=None):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
self.assertEqual(capture_returncode, 0)
self.assertTrue(os.path.isfile(testout_file))
@@ -251,10 +214,9 @@ def check_capture_snapshot_len(self, cmd=None):
'-Y', 'frame.cap_len>{}'.format(snapshot_len),
))
filter_returncode = filter_proc.returncode
- self.cleanup_files.append(testout2_file)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 0, cap_file=testout2_file)
+ self.checkPacketCount(0, cap_file=testout2_file)
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
def test_wireshark_capture_10_packets_to_file(self):
diff --git a/test/suite_clopts.py b/test/suite_clopts.py
index 3516d5fa36..a5312c4ad5 100644
--- a/test/suite_clopts.py
+++ b/test/suite_clopts.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
@@ -17,6 +18,7 @@ import unittest
#glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs')
glossaries = ('decodes', 'values')
+testout_pcap = 'testout.pcap'
class case_dumpcap_invalid_chars(subprocesstest.SubprocessTestCase):
# XXX Should we generate individual test functions instead of looping?
@@ -51,7 +53,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', 'testout.pcap' ))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', testout_file ))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
def test_dumpcap_invalid_interface_name(self):
@@ -60,7 +63,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', 'testout.pcap'))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
def test_dumpcap_invalid_interface_index(self):
@@ -69,7 +73,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', 'testout.pcap'))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
@@ -119,7 +124,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_filter = '__invalid_protocol'
# $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', 'testout.pcap' ))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', testout_file ))
self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface'))
def test_tshark_invalid_interface_name(self):
@@ -128,7 +134,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_interface = '__invalid_interface'
# $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', 'testout.pcap'))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', testout_file))
self.assertTrue(self.grepOutput('The capture session could not be initiated'))
def test_tshark_invalid_interface_index(self):
@@ -137,7 +144,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase):
self.skipTest('Test requires capture privileges and an interface.')
invalid_index = '0'
# $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1
- self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', 'testout.pcap'))
+ testout_file = self.filename_from_id(testout_pcap)
+ self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', testout_file))
self.assertTrue(self.grepOutput('There is no interface with that adapter index'))
diff --git a/test/suite_decryption.py b/test/suite_decryption.py
index 5b0bc77b83..e5092a19c5 100644
--- a/test/suite_decryption.py
+++ b/test/suite_decryption.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
diff --git a/test/suite_dissection.py b/test/suite_dissection.py
index cfdab9253b..b81d048b53 100644
--- a/test/suite_dissection.py
+++ b/test/suite_dissection.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py
new file mode 100644
index 0000000000..6ebf88d2fb
--- /dev/null
+++ b/test/suite_fileformats.py
@@ -0,0 +1,112 @@
+#
+# -*- coding: utf-8 -*-
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''File format conversion tests'''
+
+import config
+import io
+import os.path
+import subprocesstest
+import sys
+import unittest
+
+# XXX Currently unused. It would be nice to be able to use this below.
+time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta')
+
+# Microsecond pcap, direct read was used to generate the baseline:
+# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \
+# -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt
+baseline_file = 'ff-ts-usec-pcap-direct.txt'
+baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace')
+baseline_str = baseline_fd.read()
+baseline_fd.close()
+
+class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
+ def test_pcap_usec_stdin(self):
+ '''Microsecond pcap direct vs microsecond pcap stdin'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', '-',
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+ '<', capture_file
+ , shell=True),
+ shell=True)
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+ def test_pcap_nsec_stdin(self):
+ '''Microsecond pcap direct vs nanosecond pcap stdin'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', '-',
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+ '<', capture_file
+ , shell=True),
+ shell=True)
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+ def test_pcap_nsec_direct(self):
+ '''Microsecond pcap direct vs nanosecond pcap direct'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', capture_file,
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+ ),
+ )
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
+ def test_pcapng_usec_stdin(self):
+ '''Microsecond pcap direct vs microsecond pcapng stdin'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', '-',
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
+ '<', capture_file
+ , shell=True),
+ shell=True)
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+ def test_pcapng_usec_direct(self):
+ '''Microsecond pcap direct vs microsecond pcapng direct'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', capture_file,
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+ ),
+ )
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+ def test_pcapng_nsec_stdin(self):
+ '''Microsecond pcap direct vs nanosecond pcapng stdin'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', '-',
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta'
+ '<', capture_file
+ , shell=True),
+ shell=True)
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
+ def test_pcapng_nsec_direct(self):
+ '''Microsecond pcap direct vs nanosecond pcapng direct'''
+ capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng')
+ capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark,
+ '-r', capture_file,
+ '-Tfields',
+ '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta',
+ ),
+ )
+ self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file))
+
diff --git a/test/suite_io.py b/test/suite_io.py
new file mode 100644
index 0000000000..c8591ce69c
--- /dev/null
+++ b/test/suite_io.py
@@ -0,0 +1,89 @@
+#
+# -*- coding: utf-8 -*-
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''File I/O tests'''
+
+import config
+import io
+import os.path
+import subprocesstest
+import sys
+import unittest
+
+testout_pcap = 'testout.pcap'
+baseline_file = 'io-rawshark-dhcp-pcap.txt'
+baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace')
+baseline_str = baseline_fd.read()
+baseline_fd.close()
+
+def check_io_4_packets(self, cmd=None, from_stdin=False, to_stdout=False):
+ # Test direct->direct, stdin->direct, and direct->stdout file I/O.
+ # Similar to suite_capture.check_capture_10_packets and
+ # suite_capture.check_capture_stdin.
+ if cmd == config.cmd_wireshark and not config.canDisplay():
+ self.skipTest('Test requires a display.')
+ self.assertIsNotNone(cmd)
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ testout_file = self.filename_from_id(testout_pcap)
+ if from_stdin and to_stdout:
+ # XXX If we support this, should we bother with separate stdin->direct
+ # and direct->stdout tests?
+ self.fail('Stdin and stdout not supported in the same test.')
+ elif from_stdin:
+ # cat -B "${CAPTURE_DIR}dhcp.pcap" | $DUT -r - -w ./testout.pcap 2>./testout.txt
+ cat_dhcp_cmd = subprocesstest.cat_dhcp_command('cat')
+ stdin_cmd = '{0} | "{1}" -r - -w "{2}"'.format(cat_dhcp_cmd, cmd, testout_file)
+ io_proc = self.runProcess(stdin_cmd, shell=True)
+ elif to_stdout:
+ # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w - > ./testout.pcap 2>./testout.txt
+ stdout_cmd = '"{0}" -r "{1}" -w - > "{2}"'.format(cmd, capture_file, testout_file)
+ io_proc = self.runProcess(stdout_cmd, shell=True)
+ else: # direct->direct
+ # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ io_proc = self.runProcess(subprocesstest.capture_command(cmd,
+ '-r', capture_file,
+ '-w', testout_file,
+ ))
+ io_returncode = io_proc.returncode
+ self.assertEqual(io_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+ if (io_returncode == 0):
+ self.checkPacketCount(4)
+
+class case_tshark_io(subprocesstest.SubprocessTestCase):
+ def test_tshark_io_stdin_direct(self):
+ '''Read from stdin and write direct using TShark'''
+ check_io_4_packets(self, cmd=config.cmd_tshark, from_stdin=True)
+
+ def test_tshark_io_direct_stdout(self):
+ '''Read direct and write to stdout using TShark'''
+ check_io_4_packets(self, cmd=config.cmd_tshark, to_stdout=True)
+
+ def test_tshark_io_direct_direct(self):
+ '''Read direct and write direct using TShark'''
+ check_io_4_packets(self, cmd=config.cmd_tshark)
+
+# The Bash version didn't test Wireshark or dumpcap
+
+class case_rawshark_io(subprocesstest.SubprocessTestCase):
+ @unittest.skipUnless(sys.byteorder == 'little', 'Requires a little endian system')
+ def test_rawshark_io_stdin(self):
+ '''Read from stdin using Rawshark'''
+ # tail -c +25 "${CAPTURE_DIR}dhcp.pcap" | $RAWSHARK -dencap:1 -R "udp.port==68" -nr - > $IO_RAWSHARK_DHCP_PCAP_TESTOUT 2> /dev/null
+ # diff -u --strip-trailing-cr $IO_RAWSHARK_DHCP_PCAP_BASELINE $IO_RAWSHARK_DHCP_PCAP_TESTOUT > $DIFF_OUT 2>&1
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ testout_file = self.filename_from_id(testout_pcap)
+ raw_dhcp_cmd = subprocesstest.cat_dhcp_command('raw')
+ rawshark_cmd = '{0} | "{1}" -r - -n -dencap:1 -R "udp.port==68"'.format(raw_dhcp_cmd, config.cmd_rawshark)
+ rawshark_proc = self.runProcess(rawshark_cmd, shell=True)
+ rawshark_returncode = rawshark_proc.returncode
+ self.assertEqual(rawshark_returncode, 0)
+ if (rawshark_returncode == 0):
+ self.assertTrue(self.diffOutput(rawshark_proc.stdout_str, baseline_str, 'rawshark', baseline_file))
diff --git a/test/test.py b/test/test.py
index f53d90abaa..26e1a75239 100755
--- a/test/test.py
+++ b/test/test.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python
+# -*- coding: utf-8 -*-
#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
diff --git a/test/util_dump_dhcp_pcap.py b/test/util_dump_dhcp_pcap.py
new file mode 100755
index 0000000000..2fbd7fa138
--- /dev/null
+++ b/test/util_dump_dhcp_pcap.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+#
+# Wireshark tests
+# By Gerald Combs <gerald@wireshark.org>
+#
+# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+'''Write captures/dhcp.pcap to stdout, optionally writing only packet records or writing them slowly.'''
+
+import argparse
+import os
+import os.path
+import time
+import sys
+
+def main():
+ parser = argparse.ArgumentParser(description='Dump dhcp.pcap')
+ parser.add_argument('dump_type', choices=['cat', 'slow', 'raw'],
+ help='cat: Just dump the file. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.')
+ args = parser.parse_args()
+
+ if sys.version_info[0] < 3 and sys.platform == "win32":
+ import msvcrt
+ msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
+
+ dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
+
+ dhcp_fd = open(dhcp_pcap, 'rb')
+ contents = dhcp_fd.read()
+ if args.dump_type != 'raw':
+ os.write(1, contents)
+ if args.dump_type == 'cat':
+ sys.exit(0)
+ if args.dump_type == 'slow':
+ time.sleep(1.5)
+ # slow, raw
+ os.write(1, contents[24:])
+ sys.exit(0)
+
+if __name__ == '__main__':
+ main()
+
diff --git a/test/util_slow_dhcp_pcap.py b/test/util_slow_dhcp_pcap.py
deleted file mode 100644
index c0a34e6d54..0000000000
--- a/test/util_slow_dhcp_pcap.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-#
-# Wireshark tests
-# By Gerald Combs <gerald@wireshark.org>
-#
-# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
-#
-# SPDX-License-Identifier: GPL-2.0-or-later
-#
-'''Write captures/dhcp.pcap to stdout, pause 1.5 seconds, and write it again.'''
-
-import os
-import os.path
-import time
-import sys
-
-if sys.version_info[0] < 3 and sys.platform == "win32":
- import msvcrt
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
-
-dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
-
-dhcp_fd = open(dhcp_pcap, 'rb')
-contents = dhcp_fd.read()
-os.write(1, contents)
-time.sleep(1.5)
-os.write(1, contents[24:])