diff options
author | Gerald Combs <gerald@wireshark.org> | 2018-04-27 10:35:17 -0700 |
---|---|---|
committer | Gerald Combs <gerald@wireshark.org> | 2018-04-27 19:52:04 +0000 |
commit | e6d129bf2382b9f0261c813c5b501db8911b254b (patch) | |
tree | 46d23bb44ca6621efbabb2ada0c65854b2d68b6a /test/suite_capture.py | |
parent | 9b3be1711f2a92ed4c59e32961d4d19b8f593d94 (diff) |
Test: Add fileformats and I/O.
Add the fileformats and I/O suites. Move some more common code to
subprocesstest.py and add a diffOutput method.
Change-Id: I2ec34e46539022bdce78520645fdca6dfc1a8c1a
Reviewed-on: https://code.wireshark.org/review/27183
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Gerald Combs <gerald@wireshark.org>
Diffstat (limited to 'test/suite_capture.py')
-rw-r--r-- | test/suite_capture.py | 70 |
1 files changed, 16 insertions, 54 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py index ff6804f5df..d76e2e24e0 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -1,4 +1,5 @@ # +# -*- coding: utf-8 -*- # Wireshark tests # By Gerald Combs <gerald@wireshark.org> # @@ -24,28 +25,6 @@ snapshot_len = 96 capture_env = os.environ.copy() capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True' -def capture_command(cmd, *args, **kwargs): - shell = kwargs.pop('shell', False) - if shell: - cap_cmd = ['"' + cmd + '"'] - else: - cap_cmd = [cmd] - if cmd == config.cmd_wireshark: - cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k') - cap_cmd += args - if shell: - return ' '.join(cap_cmd) - else: - return cap_cmd - -def slow_dhcp_command(): - # XXX Do this in Python in a thread? - sd_cmd = '' - if sys.executable: - sd_cmd = '"{}" '.format(sys.executable) - sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py') - return sd_cmd - def start_pinging(self): ping_procs = [] if sys.platform.startswith('win32'): @@ -61,19 +40,8 @@ def stop_pinging(ping_procs): for proc in ping_procs: proc.kill() -def check_testout_num_packets(self, num_packets, cap_file=None): - got_num_packets = False - if not cap_file: - cap_file = self.filename_from_id(testout_pcap) - self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file)) - capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file))) - self.log_fd_write_bytes(capinfos_testout) - count_pat = 'Number of packets:\s+{}'.format(num_packets) - if re.search(count_pat, capinfos_testout): - got_num_packets = True - self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets)) - def check_capture_10_packets(self, cmd=None, to_stdout=False): + # Similar to suite_io.check_io_4_packets. if not config.canCapture(): self.skipTest('Test requires capture privileges and an interface.') if cmd == config.cmd_wireshark and not config.canDisplay(): @@ -84,7 +52,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): testout_file = self.filename_from_id(testout_pcap) ping_procs = start_pinging(self) if to_stdout: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', '"{}"'.format(config.capture_interface), '-p', '-w', '-', @@ -98,7 +66,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): shell=True ) else: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -110,13 +78,12 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) if capture_returncode != 0: self.log_fd.write('{} -D output:\n'.format(cmd)) self.runProcess((cmd, '-D')) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 10) + self.checkPacketCount(10) def check_capture_fifo(self, cmd=None): if not config.canMkfifo(): @@ -127,18 +94,17 @@ def check_capture_fifo(self, cmd=None): capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') testout_file = self.filename_from_id(testout_pcap) fifo_file = self.filename_from_id('testout.fifo') - self.cleanup_files.append(fifo_file) try: # If a previous test left its fifo laying around, e.g. from a failure, remove it. os.unlink(fifo_file) except: pass os.mkfifo(fifo_file) - slow_dhcp_cmd = slow_dhcp_command() + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') fifo_proc = self.startProcess( ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), shell=True) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', fifo_file, '-p', '-w', testout_file, @@ -146,22 +112,22 @@ def check_capture_fifo(self, cmd=None): ), env=capture_env ) - self.cleanup_files.append(testout_file) fifo_proc.kill() self.assertTrue(os.path.isfile(testout_file)) capture_returncode = capture_proc.returncode self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 8) + self.checkPacketCount(8) def check_capture_stdin(self, cmd=None): + # Similar to suite_io.check_io_4_packets. if cmd == config.cmd_wireshark and not config.canDisplay(): self.skipTest('Test requires a display.') self.assertIsNotNone(cmd) capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') testout_file = self.filename_from_id(testout_pcap) - slow_dhcp_cmd = slow_dhcp_command() - capture_cmd = capture_command(cmd, + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + capture_cmd = subprocesstest.capture_command(cmd, '-i', '-', '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), @@ -170,7 +136,6 @@ def check_capture_stdin(self, cmd=None): if cmd == config.cmd_wireshark: capture_cmd += ' -o console.log.level:127' pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True) - self.cleanup_files.append(testout_file) pipe_returncode = pipe_proc.returncode self.assertEqual(pipe_returncode, 0) if cmd == config.cmd_wireshark: @@ -179,7 +144,7 @@ def check_capture_stdin(self, cmd=None): self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') self.assertTrue(os.path.isfile(testout_file)) if (pipe_returncode == 0): - check_testout_num_packets(self, 8) + self.checkPacketCount(8) def check_capture_2multi_10packets(self, cmd=None): # This was present in the Bash version but was incorrect and not part of any suite. @@ -196,7 +161,7 @@ def check_capture_read_filter(self, cmd=None): self.assertIsNotNone(cmd) ping_procs = start_pinging(self) testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -210,11 +175,10 @@ def check_capture_read_filter(self, cmd=None): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 0) + self.checkPacketCount(0) def check_capture_snapshot_len(self, cmd=None): if not config.canCapture(): @@ -226,7 +190,7 @@ def check_capture_snapshot_len(self, cmd=None): self.assertIsNotNone(cmd) ping_procs = start_pinging(self) testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = self.runProcess(subprocesstest.capture_command(cmd, '-i', config.capture_interface, '-p', '-w', testout_file, @@ -238,7 +202,6 @@ def check_capture_snapshot_len(self, cmd=None): ) capture_returncode = capture_proc.returncode stop_pinging(ping_procs) - self.cleanup_files.append(testout_file) self.assertEqual(capture_returncode, 0) self.assertTrue(os.path.isfile(testout_file)) @@ -251,10 +214,9 @@ def check_capture_snapshot_len(self, cmd=None): '-Y', 'frame.cap_len>{}'.format(snapshot_len), )) filter_returncode = filter_proc.returncode - self.cleanup_files.append(testout2_file) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): - check_testout_num_packets(self, 0, cap_file=testout2_file) + self.checkPacketCount(0, cap_file=testout2_file) class case_wireshark_capture(subprocesstest.SubprocessTestCase): def test_wireshark_capture_10_packets_to_file(self): |