aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorGerald Combs <gerald@wireshark.org>2018-10-31 10:03:04 +0100
committerAnders Broman <a.broman58@gmail.com>2018-11-09 05:55:11 +0000
commit11a9a501fb004bd3259f457714670ffb6d3d21e9 (patch)
treecf1a39471dff0d4e9b8a8da493d243eb53588954 /test
parent9b6b36beaeb2f58a209943d95c05486b72c6907f (diff)
Dumpcap+Qt: Add support for `-a packets:NUM` and `-b packets:NUM`.
Add the ability to rotate files after a specified number of packets (`-b packets:NUM`). Move some condition checks to capture_loop_write_packet_cb. Add `-a packets:NUM` in order to be consistent. It is functionally equivalent to the `-c` flag. Add a corresponding "packets" option to the Capture Interfaces dialog Output tab. Add initial tests for autostop and ringbuffer conditions. Change-Id: I66eb968927ed287deb8edb96db96d7c73526c257 Reviewed-on: https://code.wireshark.org/review/30534 Reviewed-by: Gerald Combs <gerald@wireshark.org> Petri-Dish: Gerald Combs <gerald@wireshark.org> Tested-by: Petri Dish Buildbot Reviewed-by: Anders Broman <a.broman58@gmail.com>
Diffstat (limited to 'test')
-rw-r--r--test/suite_capture.py108
-rwxr-xr-xtest/util_dump_dhcp_pcap.py11
2 files changed, 111 insertions, 8 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py
index cdb78b4a2f..eb1a4ffcab 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -10,6 +10,7 @@
'''Capture tests'''
import config
+import glob
import os
import re
import subprocess
@@ -17,6 +18,7 @@ import subprocesstest
import sys
import time
import unittest
+import uuid
capture_duration = 5
@@ -139,11 +141,6 @@ def check_capture_stdin(self, cmd=None):
if (pipe_returncode == 0):
self.checkPacketCount(8)
-def check_capture_2multi_10packets(self, cmd=None):
- # This was present in the Bash version but was incorrect and not part of any suite.
- # It's apparently intended to test file rotation.
- self.skipTest('Not yet implemented')
-
def check_capture_read_filter(self, cmd=None):
if not config.canCapture():
self.skipTest('Test requires capture privileges and an interface.')
@@ -207,6 +204,86 @@ def check_capture_snapshot_len(self, cmd=None):
if (capture_returncode == 0):
self.checkPacketCount(0, cap_file=testout2_file)
+def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ cmd = config.cmd_dumpcap
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ testout_file = self.filename_from_id(testout_pcap)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = subprocesstest.capture_command(cmd,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', condition,
+ shell=True
+ )
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ self.assertTrue(os.path.isfile(testout_file))
+ if (pipe_returncode != 0):
+ return
+
+ if packets is not None:
+ self.checkPacketCount(packets)
+ elif filesize is not None:
+ capturekb = os.path.getsize(testout_file) / 1000
+ self.assertGreaterEqual(capturekb, filesize)
+
+def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
+ # Similar to check_capture_stdin.
+ cmd = config.cmd_dumpcap
+ capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
+ rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
+ testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
+ testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
+ cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
+ condition='oops:invalid'
+
+ self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
+ self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')
+
+ if packets is not None:
+ condition = 'packets:{}'.format(packets)
+ elif filesize is not None:
+ condition = 'filesize:{}'.format(filesize)
+
+ capture_cmd = subprocesstest.capture_command(cmd,
+ '-i', '-',
+ '-w', testout_file,
+ '-a', 'files:2',
+ '-b', condition,
+ shell=True
+ )
+ pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_returncode = pipe_proc.returncode
+ self.assertEqual(pipe_returncode, 0)
+ if (pipe_returncode != 0):
+ return
+
+ rb_files = glob.glob(testout_glob)
+ for rbf in rb_files:
+ self.cleanup_files.append(rbf)
+
+ self.assertEqual(len(rb_files), 2)
+
+ for rbf in rb_files:
+ self.assertTrue(os.path.isfile(rbf))
+ if packets is not None:
+ self.checkPacketCount(packets, cap_file=rbf)
+ elif filesize is not None:
+ capturekb = os.path.getsize(rbf) / 1000
+ self.assertGreaterEqual(capturekb, filesize)
+
class case_wireshark_capture(subprocesstest.SubprocessTestCase):
def test_wireshark_capture_10_packets_to_file(self):
'''Capture 10 packets from the network to a file using Wireshark'''
@@ -270,3 +347,24 @@ class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
def test_dumpcap_capture_snapshot_len(self):
'''Capture truncated packets using Dumpcap'''
check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)
+
+class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
+ # duration, filesize, packets, files
+ def test_dumpcap_autostop_filesize(self):
+ '''Capture from stdin using Dumpcap until we reach a file size limit'''
+ check_dumpcap_autostop_stdin(self, filesize=15)
+
+ def test_dumpcap_autostop_packets(self):
+ '''Capture from stdin using Dumpcap until we reach a packet limit'''
+ check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.
+
+class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
+ # duration, interval, filesize, packets, files
+ # Need a function that finds ringbuffer file names.
+ def test_dumpcap_ringbuffer_filesize(self):
+ '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
+ check_dumpcap_ringbuffer_stdin(self, filesize=15)
+
+ def test_dumpcap_ringbuffer_packets(self):
+ '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
+ check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.
diff --git a/test/util_dump_dhcp_pcap.py b/test/util_dump_dhcp_pcap.py
index 2e0b242818..28b3823767 100755
--- a/test/util_dump_dhcp_pcap.py
+++ b/test/util_dump_dhcp_pcap.py
@@ -17,8 +17,8 @@ import sys
def main():
parser = argparse.ArgumentParser(description='Dump dhcp.pcap')
- parser.add_argument('dump_type', choices=['cat', 'slow', 'raw'],
- help='cat: Just dump the file. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.')
+ parser.add_argument('dump_type', choices=['cat', 'cat100', 'slow', 'raw'],
+ help='cat: Just dump the file. cat100: Dump 100 packet records. slow: Dump the file, pause, and dump its packet records. raw: Dump only the packet records.')
args = parser.parse_args()
dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap')
@@ -27,12 +27,17 @@ def main():
contents = dhcp_fd.read()
if args.dump_type != 'raw':
os.write(1, contents)
- if args.dump_type == 'cat':
+ if args.dump_type == 'cat100':
+ # The capture contains 4 packets. Write 96 more.
+ for _ in range(24):
+ os.write(1, contents[24:])
+ if args.dump_type.startswith('cat'):
sys.exit(0)
if args.dump_type == 'slow':
time.sleep(1.5)
# slow, raw
os.write(1, contents[24:])
+
sys.exit(0)
if __name__ == '__main__':