diff options
-rw-r--r-- | docbook/wsdg_src/WSDG_chapter_tests.asciidoc | 9 | ||||
-rw-r--r-- | test/baseline/ff-ts-usec-pcap-direct.txt | 4 | ||||
-rw-r--r-- | test/config.py | 9 | ||||
-rw-r--r-- | test/subprocesstest.py | 63 | ||||
-rw-r--r-- | test/suite_capture.py | 70 | ||||
-rw-r--r-- | test/suite_clopts.py | 20 | ||||
-rw-r--r-- | test/suite_decryption.py | 1 | ||||
-rw-r--r-- | test/suite_dissection.py | 1 | ||||
-rw-r--r-- | test/suite_fileformats.py | 112 | ||||
-rw-r--r-- | test/suite_io.py | 89 | ||||
-rwxr-xr-x | test/test.py | 1 | ||||
-rwxr-xr-x | test/util_dump_dhcp_pcap.py | 44 | ||||
-rw-r--r-- | test/util_slow_dhcp_pcap.py | 27 |
13 files changed, 358 insertions, 92 deletions
diff --git a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc index 249e5a416d..092ebe88cc 100644 --- a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc +++ b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc @@ -22,7 +22,10 @@ To run all tests from CMake do the following: * Pass `-DTEST_EXTRA_ARGS=--disable-capture` or `-DTEST_EXTRA_ARGS=--capture-interface=<interface>` as needed for your system. -* Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`. +* Build the “test” target or run ctest, e.g. `ctest --force-new-ctest-process -j 4 --verbose`. + +On Windows, “ctest” requires a build configuration parameter, e.g. +`ctest -C RelWithDebInfo --force-new-ctest-process -j 4 --verbose`. To run all tests directly, run `test.py -p /path/to/wireshark-build/run-directory <capture args>`. @@ -168,3 +171,7 @@ class case_decrypt_80211(subprocesstest.SubprocessTestCase): self.assertTrue(self.grepOutput('favicon.ico')) ---- +Tests can be run in parallel. This means that any files you create must +be unique for each test. “subprocesstest.filename_from_id” can be used +to generate a filename based on the current test name. It also ensures +that the file will be automatically removed after the test has run. diff --git a/test/baseline/ff-ts-usec-pcap-direct.txt b/test/baseline/ff-ts-usec-pcap-direct.txt new file mode 100644 index 0000000000..f4ac417f77 --- /dev/null +++ b/test/baseline/ff-ts-usec-pcap-direct.txt @@ -0,0 +1,4 @@ +1 1102274184.317453000 0.000000000 +2 1102274184.317748000 0.000295000 +3 1102274184.387484000 0.069736000 +4 1102274184.387798000 0.000314000 diff --git a/test/config.py b/test/config.py index e767a4aa10..5b34d13f40 100644 --- a/test/config.py +++ b/test/config.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # @@ -16,10 +17,11 @@ import sys import tempfile commands = ( + 'capinfos', 'dumpcap', + 'rawshark', 'tshark', 'wireshark', - 'capinfos', ) can_capture = False @@ -27,10 +29,11 @@ capture_interface = None # Our executables # Strings -cmd_tshark = None +cmd_capinfos = None cmd_dumpcap = None +cmd_rawshark = None +cmd_tshark = None cmd_wireshark = None -cmd_capinfos = None # Arrays args_ping = None diff --git a/test/subprocesstest.py b/test/subprocesstest.py index 18313db795..357104b461 100644 --- a/test/subprocesstest.py +++ b/test/subprocesstest.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # @@ -8,6 +9,8 @@ # '''Subprocess test case superclass''' +import config +import difflib import io import os import os.path @@ -25,6 +28,32 @@ import unittest if sys.version_info[0] >= 3: process_timeout = 300 # Seconds +def capture_command(cmd, *args, **kwargs): + '''Convert the supplied arguments into a command suitable for SubprocessTestCase. + + If shell is true, return a string. Otherwise, return a list of arguments.''' + shell = kwargs.pop('shell', False) + if shell: + cap_cmd = ['"' + cmd + '"'] + else: + cap_cmd = [cmd] + if cmd == config.cmd_wireshark: + cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k') + cap_cmd += args + if shell: + return ' '.join(cap_cmd) + else: + return cap_cmd + +def cat_dhcp_command(mode): + '''Create a command string for dumping dhcp.pcap to stdout''' + # XXX Do this in Python in a thread? + sd_cmd = '' + if sys.executable: + sd_cmd = sys.executable + ' ' + sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode) + return sd_cmd + class LoggingPopen(subprocess.Popen): '''Run a process using subprocess.Popen. Capture and log its output. @@ -104,7 +133,10 @@ class SubprocessTestCase(unittest.TestCase): def filename_from_id(self, filename): '''Generate a filename prefixed with our test ID.''' - return self.id() + '.' + filename + id_filename = self.id() + '.' + filename + if id_filename not in self.cleanup_files: + self.cleanup_files.append(id_filename) + return id_filename def kill_processes(self): '''Kill any processes we've opened so far''' @@ -158,6 +190,18 @@ class SubprocessTestCase(unittest.TestCase): pass self.cleanup_files = [] + def checkPacketCount(self, num_packets, cap_file=None): + got_num_packets = False + if not cap_file: + cap_file = self.filename_from_id('testout.pcap') + self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file)) + capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file))) + self.log_fd_write_bytes(capinfos_testout) + count_pat = 'Number of packets:\s+{}'.format(num_packets) + if re.search(count_pat, capinfos_testout): + got_num_packets = True + self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets)) + def countOutput(self, search_pat, proc=None): '''Returns the number of output lines (search_pat=None), otherwise returns a match count.''' match_count = 0 @@ -174,6 +218,23 @@ class SubprocessTestCase(unittest.TestCase): def grepOutput(self, search_pat, proc=None): return self.countOutput(search_pat, proc) > 0 + def diffOutput(self, blob_a, blob_b, *args, **kwargs): + '''Check for differences between blob_a and blob_b. Return False and log a unified diff if they differ. + + blob_a and blob_b must be UTF-8 strings.''' + lines_a = blob_a.splitlines() + lines_b = blob_b.splitlines() + diff = '\n'.join(list(difflib.unified_diff(lines_a, lines_b, *args, **kwargs))) + if len(diff) > 0: + if sys.version_info[0] < 3 and not isinstance(diff, unicode): + diff = unicode(diff, 'UTF-8', 'replace') + self.log_fd.flush() + self.log_fd.write(u'-- Begin diff output --\n') + self.log_fd.writelines(diff) + self.log_fd.write(u'-- End diff output --\n') + return False + return True + def startProcess(self, proc_args, env=None, shell=False): '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().''' proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd) diff --git a/test/suite_capture.py b/test/suite_capture.py index ff6804f5df..d76e2e24e0 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # @@ -24,28 +25,6 @@ snapshot_len = 96 capture_env = os.environ.copy() capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True' -def capture_command(cmd, *args, **kwargs): - shell = kwargs.pop('shell', False) - if shell: - cap_cmd = ['"' + cmd + '"'] - else: - cap_cmd = [cmd] - if cmd == config.cmd_wireshark: - cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k') - cap_cmd += args - if shell: - return ' '.join(cap_cmd) - else: - return cap_cmd - -def slow_dhcp_command(): - # XXX Do this in Python in a thread? - sd_cmd = '' - if sys.executable: - sd_cmd = '"{}" '.format(sys.executable) - sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py') - return sd_cmd - def start_pinging(self): ping_procs = [] if sys.platform.startswith('win32'): @@ -61,19 +40,8 @@ def stop_pinging(ping_procs): for proc in ping_procs: proc.kill() -def check_testout_num_packets(self, num_packets, cap_file=None): - got_num_packets = False - if not cap_file: - cap_file = self.filename_from_id(testout_pcap) - self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file)) - capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file))) - self.log_fd_write_bytes(capinfos_testout) - count_pat = 'Number of packets:\s+{}'.format(num_packets) - if re.search(count_pat, capinfos_testout): - got_num_packets = True - self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets)) - def check_capture_10_packets(self, cmd=None, to_stdout=False): + # Similar to suite_io.check_io_4_packets. if not config.canCapture(): self.skipTest('Test requires capture privileges and an interface.') if cmd == config.cmd_wireshark and not config.canDisplay(): @@ -84,7 +52,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): testout_file = self.filename_from_id(testout_pcap) ping_procs = start_pinging(self) if to_stdout: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', '"{}"'.format(config.capture_interface), '-p', '-w', '-', @@ -98,7 +66,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): shell=True ) else: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -110,13 +78,12 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) if capture_returncode != 0: self.log_fd.write('{} -D output:\n'.format(cmd)) self.runProcess((cmd, '-D')) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 10) + self.checkPacketCount(10) def check_capture_fifo(self, cmd=None): if not config.canMkfifo(): @@ -127,18 +94,17 @@ def check_capture_fifo(self, cmd=None): capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') testout_file = self.filename_from_id(testout_pcap) fifo_file = self.filename_from_id('testout.fifo') - self.cleanup_files.append(fifo_file) try: # If a previous test left its fifo laying around, e.g. from a failure, remove it. os.unlink(fifo_file) except: pass os.mkfifo(fifo_file) - slow_dhcp_cmd = slow_dhcp_command() + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') fifo_proc = self.startProcess( ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), shell=True) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', fifo_file, '-p', '-w', testout_file, @@ -146,22 +112,22 @@ def check_capture_fifo(self, cmd=None): ), env=capture_env ) - self.cleanup_files.append(testout_file) fifo_proc.kill() self.assertTrue(os.path.isfile(testout_file)) capture_returncode = capture_proc.returncode self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 8) + self.checkPacketCount(8) def check_capture_stdin(self, cmd=None): + # Similar to suite_io.check_io_4_packets. if cmd == config.cmd_wireshark and not config.canDisplay(): self.skipTest('Test requires a display.') self.assertIsNotNone(cmd) capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') testout_file = self.filename_from_id(testout_pcap) - slow_dhcp_cmd = slow_dhcp_command() - capture_cmd = capture_command(cmd, + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + capture_cmd = subprocesstest.capture_command(cmd, '-i', '-', '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), @@ -170,7 +136,6 @@ def check_capture_stdin(self, cmd=None): if cmd == config.cmd_wireshark: capture_cmd += ' -o console.log.level:127' pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True) - self.cleanup_files.append(testout_file) pipe_returncode = pipe_proc.returncode self.assertEqual(pipe_returncode, 0) if cmd == config.cmd_wireshark: @@ -179,7 +144,7 @@ def check_capture_stdin(self, cmd=None): self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') self.assertTrue(os.path.isfile(testout_file)) if (pipe_returncode == 0): - check_testout_num_packets(self, 8) + self.checkPacketCount(8) def check_capture_2multi_10packets(self, cmd=None): # This was present in the Bash version but was incorrect and not part of any suite. @@ -196,7 +161,7 @@ def check_capture_read_filter(self, cmd=None): self.assertIsNotNone(cmd) ping_procs = start_pinging(self) testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -210,11 +175,10 @@ def check_capture_read_filter(self, cmd=None): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 0) + self.checkPacketCount(0) def check_capture_snapshot_len(self, cmd=None): if not config.canCapture(): @@ -226,7 +190,7 @@ def check_capture_snapshot_len(self, cmd=None): self.assertIsNotNone(cmd) ping_procs = start_pinging(self) testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -238,7 +202,6 @@ def check_capture_snapshot_len(self, cmd=None): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) self.assertEqual(capture_returncode, 0) self.assertTrue(os.path.isfile(testout_file)) @@ -251,10 +214,9 @@ def check_capture_snapshot_len(self, cmd=None): '-Y', 'frame.cap_len>{}'.format(snapshot_len), )) filter_returncode = filter_proc.returncode - self.cleanup_files.append(testout2_file) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 0, cap_file=testout2_file) + self.checkPacketCount(0, cap_file=testout2_file) class case_wireshark_capture(subprocesstest.SubprocessTestCase): def test_wireshark_capture_10_packets_to_file(self): diff --git a/test/suite_clopts.py b/test/suite_clopts.py index 3516d5fa36..a5312c4ad5 100644 --- a/test/suite_clopts.py +++ b/test/suite_clopts.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # @@ -17,6 +18,7 @@ import unittest #glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs') glossaries = ('decodes', 'values') +testout_pcap = 'testout.pcap' class case_dumpcap_invalid_chars(subprocesstest.SubprocessTestCase): # XXX Should we generate individual test functions instead of looping? @@ -51,7 +53,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_filter = '__invalid_protocol' # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', 'testout.pcap' )) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', testout_file )) self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) def test_dumpcap_invalid_interface_name(self): @@ -60,7 +63,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_interface = '__invalid_interface' # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', 'testout.pcap')) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', testout_file)) self.assertTrue(self.grepOutput('The capture session could not be initiated')) def test_dumpcap_invalid_interface_index(self): @@ -69,7 +73,8 @@ class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_index = '0' # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', 'testout.pcap')) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', testout_file)) self.assertTrue(self.grepOutput('There is no interface with that adapter index')) @@ -119,7 +124,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_filter = '__invalid_protocol' # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', 'testout.pcap' )) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', testout_file )) self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) def test_tshark_invalid_interface_name(self): @@ -128,7 +134,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_interface = '__invalid_interface' # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', 'testout.pcap')) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', testout_file)) self.assertTrue(self.grepOutput('The capture session could not be initiated')) def test_tshark_invalid_interface_index(self): @@ -137,7 +144,8 @@ class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): self.skipTest('Test requires capture privileges and an interface.') invalid_index = '0' # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1 - self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', 'testout.pcap')) + testout_file = self.filename_from_id(testout_pcap) + self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', testout_file)) self.assertTrue(self.grepOutput('There is no interface with that adapter index')) diff --git a/test/suite_decryption.py b/test/suite_decryption.py index 5b0bc77b83..e5092a19c5 100644 --- a/test/suite_decryption.py +++ b/test/suite_decryption.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # diff --git a/test/suite_dissection.py b/test/suite_dissection.py index cfdab9253b..b81d048b53 100644 --- a/test/suite_dissection.py +++ b/test/suite_dissection.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # diff --git a/test/suite_fileformats.py b/test/suite_fileformats.py new file mode 100644 index 0000000000..6ebf88d2fb --- /dev/null +++ b/test/suite_fileformats.py @@ -0,0 +1,112 @@ +# +# -*- coding: utf-8 -*- +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''File format conversion tests''' + +import config +import io +import os.path +import subprocesstest +import sys +import unittest + +# XXX Currently unused. It would be nice to be able to use this below. +time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta') + +# Microsecond pcap, direct read was used to generate the baseline: +# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \ +# -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt +baseline_file = 'ff-ts-usec-pcap-direct.txt' +baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace') +baseline_str = baseline_fd.read() +baseline_fd.close() + +class case_fileformat_pcap(subprocesstest.SubprocessTestCase): + def test_pcap_usec_stdin(self): + '''Microsecond pcap direct vs microsecond pcap stdin''' + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', '-', + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', + '<', capture_file + , shell=True), + shell=True) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + + def test_pcap_nsec_stdin(self): + '''Microsecond pcap direct vs nanosecond pcap stdin''' + capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', '-', + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', + '<', capture_file + , shell=True), + shell=True) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + + def test_pcap_nsec_direct(self): + '''Microsecond pcap direct vs nanosecond pcap direct''' + capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcap') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', + ), + ) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + +class case_fileformat_pcapng(subprocesstest.SubprocessTestCase): + def test_pcapng_usec_stdin(self): + '''Microsecond pcap direct vs microsecond pcapng stdin''' + capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', '-', + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta' + '<', capture_file + , shell=True), + shell=True) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + + def test_pcapng_usec_direct(self): + '''Microsecond pcap direct vs microsecond pcapng direct''' + capture_file = os.path.join(config.capture_dir, 'dhcp.pcapng') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', + ), + ) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + + def test_pcapng_nsec_stdin(self): + '''Microsecond pcap direct vs nanosecond pcapng stdin''' + capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', '-', + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta' + '<', capture_file + , shell=True), + shell=True) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + + def test_pcapng_nsec_direct(self): + '''Microsecond pcap direct vs nanosecond pcapng direct''' + capture_file = os.path.join(config.capture_dir, 'dhcp-nanosecond.pcapng') + capture_proc = self.runProcess(subprocesstest.capture_command(config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta', + ), + ) + self.assertTrue(self.diffOutput(capture_proc.stdout_str, baseline_str, 'tshark', baseline_file)) + diff --git a/test/suite_io.py b/test/suite_io.py new file mode 100644 index 0000000000..c8591ce69c --- /dev/null +++ b/test/suite_io.py @@ -0,0 +1,89 @@ +# +# -*- coding: utf-8 -*- +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''File I/O tests''' + +import config +import io +import os.path +import subprocesstest +import sys +import unittest + +testout_pcap = 'testout.pcap' +baseline_file = 'io-rawshark-dhcp-pcap.txt' +baseline_fd = io.open(os.path.join(config.baseline_dir, baseline_file), 'r', encoding='UTF-8', errors='replace') +baseline_str = baseline_fd.read() +baseline_fd.close() + +def check_io_4_packets(self, cmd=None, from_stdin=False, to_stdout=False): + # Test direct->direct, stdin->direct, and direct->stdout file I/O. + # Similar to suite_capture.check_capture_10_packets and + # suite_capture.check_capture_stdin. + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + self.assertIsNotNone(cmd) + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + testout_file = self.filename_from_id(testout_pcap) + if from_stdin and to_stdout: + # XXX If we support this, should we bother with separate stdin->direct + # and direct->stdout tests? + self.fail('Stdin and stdout not supported in the same test.') + elif from_stdin: + # cat -B "${CAPTURE_DIR}dhcp.pcap" | $DUT -r - -w ./testout.pcap 2>./testout.txt + cat_dhcp_cmd = subprocesstest.cat_dhcp_command('cat') + stdin_cmd = '{0} | "{1}" -r - -w "{2}"'.format(cat_dhcp_cmd, cmd, testout_file) + io_proc = self.runProcess(stdin_cmd, shell=True) + elif to_stdout: + # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w - > ./testout.pcap 2>./testout.txt + stdout_cmd = '"{0}" -r "{1}" -w - > "{2}"'.format(cmd, capture_file, testout_file) + io_proc = self.runProcess(stdout_cmd, shell=True) + else: # direct->direct + # $DUT -r "${CAPTURE_DIR}dhcp.pcap" -w ./testout.pcap > ./testout.txt 2>&1 + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + io_proc = self.runProcess(subprocesstest.capture_command(cmd, + '-r', capture_file, + '-w', testout_file, + )) + io_returncode = io_proc.returncode + self.assertEqual(io_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + if (io_returncode == 0): + self.checkPacketCount(4) + +class case_tshark_io(subprocesstest.SubprocessTestCase): + def test_tshark_io_stdin_direct(self): + '''Read from stdin and write direct using TShark''' + check_io_4_packets(self, cmd=config.cmd_tshark, from_stdin=True) + + def test_tshark_io_direct_stdout(self): + '''Read direct and write to stdout using TShark''' + check_io_4_packets(self, cmd=config.cmd_tshark, to_stdout=True) + + def test_tshark_io_direct_direct(self): + '''Read direct and write direct using TShark''' + check_io_4_packets(self, cmd=config.cmd_tshark) + +# The Bash version didn't test Wireshark or dumpcap + +class case_rawshark_io(subprocesstest.SubprocessTestCase): + @unittest.skipUnless(sys.byteorder == 'little', 'Requires a little endian system') + def test_rawshark_io_stdin(self): + '''Read from stdin using Rawshark''' + # tail -c +25 "${CAPTURE_DIR}dhcp.pcap" | $RAWSHARK -dencap:1 -R "udp.port==68" -nr - > $IO_RAWSHARK_DHCP_PCAP_TESTOUT 2> /dev/null + # diff -u --strip-trailing-cr $IO_RAWSHARK_DHCP_PCAP_BASELINE $IO_RAWSHARK_DHCP_PCAP_TESTOUT > $DIFF_OUT 2>&1 + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + testout_file = self.filename_from_id(testout_pcap) + raw_dhcp_cmd = subprocesstest.cat_dhcp_command('raw') + rawshark_cmd = '{0} | "{1}" -r - -n -dencap:1 -R "udp.port==68"'.format(raw_dhcp_cmd, config.cmd_rawshark) + rawshark_proc = self.runProcess(rawshark_cmd, shell=True) + rawshark_returncode = rawshark_proc.returncode + self.assertEqual(rawshark_returncode, 0) + if (rawshark_returncode == 0): + self.assertTrue(self.diffOutput(rawshark_proc.stdout_str, baseline_str, 'rawshark', baseline_file)) diff --git a/test/test.py b/test/test.py index f53d90abaa..26e1a75239 100755 --- a/test/test.py +++ b/test/test.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- # # Wireshark tests # By Gerald Combs <gerald@wireshark.org> diff --git a/test/util_dump_dhcp_pcap.py b/test/util_dump_dhcp_pcap.py new file mode 100755 index 0000000000..2fbd7fa138 --- /dev/null +++ b/test/util_dump_dhcp_pcap.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Write captures/dhcp.pcap to stdout, optionally writing only packet records or writing them slowly.''' + +import argparse +import os +import os.path +import time +import sys + +def main(): + parser = argparse.ArgumentParser(description='Dump dhcp.pcap') + parser.add_argument('dump_type', choices=['cat', 'slow', 'raw'], + help='cat: Just dump the file. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.') + args = parser.parse_args() + + if sys.version_info[0] < 3 and sys.platform == "win32": + import msvcrt + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) + + dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap') + + dhcp_fd = open(dhcp_pcap, 'rb') + contents = dhcp_fd.read() + if args.dump_type != 'raw': + os.write(1, contents) + if args.dump_type == 'cat': + sys.exit(0) + if args.dump_type == 'slow': + time.sleep(1.5) + # slow, raw + os.write(1, contents[24:]) + sys.exit(0) + +if __name__ == '__main__': + main() + diff --git a/test/util_slow_dhcp_pcap.py b/test/util_slow_dhcp_pcap.py deleted file mode 100644 index c0a34e6d54..0000000000 --- a/test/util_slow_dhcp_pcap.py +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python -# -# Wireshark tests -# By Gerald Combs <gerald@wireshark.org> -# -# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping -# -# SPDX-License-Identifier: GPL-2.0-or-later -# -'''Write captures/dhcp.pcap to stdout, pause 1.5 seconds, and write it again.''' - -import os -import os.path -import time -import sys - -if sys.version_info[0] < 3 and sys.platform == "win32": - import msvcrt - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) - -dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap') - -dhcp_fd = open(dhcp_pcap, 'rb') -contents = dhcp_fd.read() -os.write(1, contents) -time.sleep(1.5) -os.write(1, contents[24:]) |