diff options
author | Gerald Combs <gerald@wireshark.org> | 2018-10-31 10:03:04 +0100 |
---|---|---|
committer | Anders Broman <a.broman58@gmail.com> | 2018-11-09 05:55:11 +0000 |
commit | 11a9a501fb004bd3259f457714670ffb6d3d21e9 (patch) | |
tree | cf1a39471dff0d4e9b8a8da493d243eb53588954 /test | |
parent | 9b6b36beaeb2f58a209943d95c05486b72c6907f (diff) |
Dumpcap+Qt: Add support for `-a packets:NUM` and `-b packets:NUM`.
Add the ability to rotate files after a specified number of packets (`-b
packets:NUM`). Move some condition checks to capture_loop_write_packet_cb.
Add `-a packets:NUM` in order to be consistent. It is functionally
equivalent to the `-c` flag.
Add a corresponding "packets" option to the Capture Interfaces dialog
Output tab.
Add initial tests for autostop and ringbuffer conditions.
Change-Id: I66eb968927ed287deb8edb96db96d7c73526c257
Reviewed-on: https://code.wireshark.org/review/30534
Reviewed-by: Gerald Combs <gerald@wireshark.org>
Petri-Dish: Gerald Combs <gerald@wireshark.org>
Tested-by: Petri Dish Buildbot
Reviewed-by: Anders Broman <a.broman58@gmail.com>
Diffstat (limited to 'test')
-rw-r--r-- | test/suite_capture.py | 108 | ||||
-rwxr-xr-x | test/util_dump_dhcp_pcap.py | 11 |
2 files changed, 111 insertions, 8 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py index cdb78b4a2f..eb1a4ffcab 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -10,6 +10,7 @@ '''Capture tests''' import config +import glob import os import re import subprocess @@ -17,6 +18,7 @@ import subprocesstest import sys import time import unittest +import uuid capture_duration = 5 @@ -139,11 +141,6 @@ def check_capture_stdin(self, cmd=None): if (pipe_returncode == 0): self.checkPacketCount(8) -def check_capture_2multi_10packets(self, cmd=None): - # This was present in the Bash version but was incorrect and not part of any suite. - # It's apparently intended to test file rotation. - self.skipTest('Not yet implemented') - def check_capture_read_filter(self, cmd=None): if not config.canCapture(): self.skipTest('Test requires capture privileges and an interface.') @@ -207,6 +204,86 @@ def check_capture_snapshot_len(self, cmd=None): if (capture_returncode == 0): self.checkPacketCount(0, cap_file=testout2_file) +def check_dumpcap_autostop_stdin(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + cmd = config.cmd_dumpcap + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + testout_file = self.filename_from_id(testout_pcap) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = subprocesstest.capture_command(cmd, + '-i', '-', + '-w', testout_file, + '-a', condition, + shell=True + ) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode != 0): + return + + if packets is not None: + self.checkPacketCount(packets) + elif filesize is not None: + capturekb = os.path.getsize(testout_file) / 1000 + self.assertGreaterEqual(capturekb, filesize) + +def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + cmd = config.cmd_dumpcap + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID + testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique) + testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = subprocesstest.capture_command(cmd, + '-i', '-', + '-w', testout_file, + '-a', 'files:2', + '-b', condition, + shell=True + ) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if (pipe_returncode != 0): + return + + rb_files = glob.glob(testout_glob) + for rbf in rb_files: + self.cleanup_files.append(rbf) + + self.assertEqual(len(rb_files), 2) + + for rbf in rb_files: + self.assertTrue(os.path.isfile(rbf)) + if packets is not None: + self.checkPacketCount(packets, cap_file=rbf) + elif filesize is not None: + capturekb = os.path.getsize(rbf) / 1000 + self.assertGreaterEqual(capturekb, filesize) + class case_wireshark_capture(subprocesstest.SubprocessTestCase): def test_wireshark_capture_10_packets_to_file(self): '''Capture 10 packets from the network to a file using Wireshark''' @@ -270,3 +347,24 @@ class case_dumpcap_capture(subprocesstest.SubprocessTestCase): def test_dumpcap_capture_snapshot_len(self): '''Capture truncated packets using Dumpcap''' check_capture_snapshot_len(self, cmd=config.cmd_dumpcap) + +class case_dumpcap_autostop(subprocesstest.SubprocessTestCase): + # duration, filesize, packets, files + def test_dumpcap_autostop_filesize(self): + '''Capture from stdin using Dumpcap until we reach a file size limit''' + check_dumpcap_autostop_stdin(self, filesize=15) + + def test_dumpcap_autostop_packets(self): + '''Capture from stdin using Dumpcap until we reach a packet limit''' + check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary. + +class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase): + # duration, interval, filesize, packets, files + # Need a function that finds ringbuffer file names. + def test_dumpcap_ringbuffer_filesize(self): + '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit''' + check_dumpcap_ringbuffer_stdin(self, filesize=15) + + def test_dumpcap_ringbuffer_packets(self): + '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit''' + check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary. diff --git a/test/util_dump_dhcp_pcap.py b/test/util_dump_dhcp_pcap.py index 2e0b242818..28b3823767 100755 --- a/test/util_dump_dhcp_pcap.py +++ b/test/util_dump_dhcp_pcap.py @@ -17,8 +17,8 @@ import sys def main(): parser = argparse.ArgumentParser(description='Dump dhcp.pcap') - parser.add_argument('dump_type', choices=['cat', 'slow', 'raw'], - help='cat: Just dump the file. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.') + parser.add_argument('dump_type', choices=['cat', 'cat100', 'slow', 'raw'], + help='cat: Just dump the file. cat100: Dump 100 packet records. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.') args = parser.parse_args() dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap') @@ -27,12 +27,17 @@ def main(): contents = dhcp_fd.read() if args.dump_type != 'raw': os.write(1, contents) - if args.dump_type == 'cat': + if args.dump_type == 'cat100': + # The capture contains 4 packets. Write 96 more. + for _ in range(24): + os.write(1, contents[24:]) + if args.dump_type.startswith('cat'): sys.exit(0) if args.dump_type == 'slow': time.sleep(1.5) # slow, raw os.write(1, contents[24:]) + sys.exit(0) if __name__ == '__main__': |