aboutsummaryrefslogtreecommitdiffstats
path: root/test/suite_capture.py
diff options
context:
space:
mode:
authorGerald Combs <gerald@wireshark.org>2018-04-27 10:35:17 -0700
committerGerald Combs <gerald@wireshark.org>2018-04-27 19:52:04 +0000
commite6d129bf2382b9f0261c813c5b501db8911b254b (patch)
tree46d23bb44ca6621efbabb2ada0c65854b2d68b6a /test/suite_capture.py
parent9b3be1711f2a92ed4c59e32961d4d19b8f593d94 (diff)
Test: Add fileformats and I/O.
Add the fileformats and I/O suites. Move some more common code to subprocesstest.py and add a diffOutput method. Change-Id: I2ec34e46539022bdce78520645fdca6dfc1a8c1a Reviewed-on: https://code.wireshark.org/review/27183 Petri-Dish: Gerald Combs <gerald@wireshark.org> Tested-by: Petri Dish Buildbot Reviewed-by: Gerald Combs <gerald@wireshark.org>
Diffstat (limited to 'test/suite_capture.py')
-rw-r--r--test/suite_capture.py70
1 files changed, 16 insertions, 54 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py
index ff6804f5df..d76e2e24e0 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -1,4 +1,5 @@
#
+# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
@@ -24,28 +25,6 @@ snapshot_len = 96
capture_env = os.environ.copy()
capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
-def capture_command(cmd, *args, **kwargs):
- shell = kwargs.pop('shell', False)
- if shell:
- cap_cmd = ['"' + cmd + '"']
- else:
- cap_cmd = [cmd]
- if cmd == config.cmd_wireshark:
- cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k')
- cap_cmd += args
- if shell:
- return ' '.join(cap_cmd)
- else:
- return cap_cmd
-
-def slow_dhcp_command():
- # XXX Do this in Python in a thread?
- sd_cmd = ''
- if sys.executable:
- sd_cmd = '"{}" '.format(sys.executable)
- sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py')
- return sd_cmd
-
def start_pinging(self):
ping_procs = []
if sys.platform.startswith('win32'):
@@ -61,19 +40,8 @@ def stop_pinging(ping_procs):
for proc in ping_procs:
proc.kill()
-def check_testout_num_packets(self, num_packets, cap_file=None):
- got_num_packets = False
- if not cap_file:
- cap_file = self.filename_from_id(testout_pcap)
- self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file))
- capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file)))
- self.log_fd_write_bytes(capinfos_testout)
- count_pat = 'Number of packets:\s+{}'.format(num_packets)
- if re.search(count_pat, capinfos_testout):
- got_num_packets = True
- self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets))
-
def check_capture_10_packets(self, cmd=None, to_stdout=False):
+ # Similar to suite_io.check_io_4_packets.
if not config.canCapture():
self.skipTest('Test requires capture privileges and an interface.')
if cmd == config.cmd_wireshark and not config.canDisplay():
@@ -84,7 +52,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
testout_file = self.filename_from_id(testout_pcap)
ping_procs = start_pinging(self)
if to_stdout:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', '"{}"'.format(config.capture_interface),
'-p',
'-w', '-',
@@ -98,7 +66,7 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
shell=True
)
else:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -110,13 +78,12 @@ def check_capture_10_packets(self, cmd=None, to_stdout=False):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
if capture_returncode != 0:
self.log_fd.write('{} -D output:\n'.format(cmd))
self.runProcess((cmd, '-D'))
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 10)
+ self.checkPacketCount(10)
def check_capture_fifo(self, cmd=None):
if not config.canMkfifo():
@@ -127,18 +94,17 @@ def check_capture_fifo(self, cmd=None):
capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
testout_file = self.filename_from_id(testout_pcap)
fifo_file = self.filename_from_id('testout.fifo')
- self.cleanup_files.append(fifo_file)
try:
# If a previous test left its fifo laying around, e.g. from a failure, remove it.
os.unlink(fifo_file)
except:
pass
os.mkfifo(fifo_file)
- slow_dhcp_cmd = slow_dhcp_command()
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
fifo_proc = self.startProcess(
('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
shell=True)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', fifo_file,
'-p',
'-w', testout_file,
@@ -146,22 +112,22 @@ def check_capture_fifo(self, cmd=None):
),
env=capture_env
)
- self.cleanup_files.append(testout_file)
fifo_proc.kill()
self.assertTrue(os.path.isfile(testout_file))
capture_returncode = capture_proc.returncode
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 8)
+ self.checkPacketCount(8)
def check_capture_stdin(self, cmd=None):
+ # Similar to suite_io.check_io_4_packets.
if cmd == config.cmd_wireshark and not config.canDisplay():
self.skipTest('Test requires a display.')
self.assertIsNotNone(cmd)
capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
testout_file = self.filename_from_id(testout_pcap)
- slow_dhcp_cmd = slow_dhcp_command()
- capture_cmd = capture_command(cmd,
+ slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
+ capture_cmd = subprocesstest.capture_command(cmd,
'-i', '-',
'-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
@@ -170,7 +136,6 @@ def check_capture_stdin(self, cmd=None):
if cmd == config.cmd_wireshark:
capture_cmd += ' -o console.log.level:127'
pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True)
- self.cleanup_files.append(testout_file)
pipe_returncode = pipe_proc.returncode
self.assertEqual(pipe_returncode, 0)
if cmd == config.cmd_wireshark:
@@ -179,7 +144,7 @@ def check_capture_stdin(self, cmd=None):
self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
self.assertTrue(os.path.isfile(testout_file))
if (pipe_returncode == 0):
- check_testout_num_packets(self, 8)
+ self.checkPacketCount(8)
def check_capture_2multi_10packets(self, cmd=None):
# This was present in the Bash version but was incorrect and not part of any suite.
@@ -196,7 +161,7 @@ def check_capture_read_filter(self, cmd=None):
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -210,11 +175,10 @@ def check_capture_read_filter(self, cmd=None):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 0)
+ self.checkPacketCount(0)
def check_capture_snapshot_len(self, cmd=None):
if not config.canCapture():
@@ -226,7 +190,7 @@ def check_capture_snapshot_len(self, cmd=None):
self.assertIsNotNone(cmd)
ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
'-i', config.capture_interface,
'-p',
'-w', testout_file,
@@ -238,7 +202,6 @@ def check_capture_snapshot_len(self, cmd=None):
)
capture_returncode = capture_proc.returncode
stop_pinging(ping_procs)
- self.cleanup_files.append(testout_file)
self.assertEqual(capture_returncode, 0)
self.assertTrue(os.path.isfile(testout_file))
@@ -251,10 +214,9 @@ def check_capture_snapshot_len(self, cmd=None):
'-Y', 'frame.cap_len>{}'.format(snapshot_len),
))
filter_returncode = filter_proc.returncode
- self.cleanup_files.append(testout2_file)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
- check_testout_num_packets(self, 0, cap_file=testout2_file)
+ self.checkPacketCount(0, cap_file=testout2_file)
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
def test_wireshark_capture_10_packets_to_file(self):