diff options
Diffstat (limited to 'test/suite_capture.py')
-rw-r--r-- | test/suite_capture.py | 565 |
1 files changed, 299 insertions, 266 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py index eb1a4ffcab..a9ac1aa9c0 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -12,13 +12,12 @@ import config import glob import os -import re import subprocess import subprocesstest import sys import time -import unittest import uuid +import fixtures capture_duration = 5 @@ -40,331 +39,365 @@ def stop_pinging(ping_procs): for proc in ping_procs: proc.kill() -def check_capture_10_packets(self, cmd=None, to_stdout=False): - # Similar to suite_io.check_io_4_packets. - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - testout_file = self.filename_from_id(testout_pcap) - ping_procs = start_pinging(self) - if to_stdout: - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', '"{}"'.format(config.capture_interface), + +@fixtures.fixture(scope='session') +def wireshark_k(wireshark_command): + return tuple(list(wireshark_command) + ['-k']) + +def capture_command(*cmd_args, shell=False): + if type(cmd_args[0]) != str: + # Assume something like ['wireshark', '-k'] + cmd_args = list(cmd_args[0]) + list(cmd_args)[1:] + if shell: + cmd_args = ' '.join(cmd_args) + return cmd_args + + +@fixtures.fixture +def check_capture_10_packets(capture_interface, cmd_dumpcap): + def check_capture_10_packets_real(self, cmd=None, to_stdout=False): + # Similar to suite_io.check_io_4_packets. + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + ping_procs = start_pinging(self) + if to_stdout: + capture_proc = self.runProcess(capture_command(cmd, + '-i', '"{}"'.format(capture_interface), + '-p', + '-w', '-', + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', '"icmp || icmp6"', + '>', testout_file, + shell=True + ), + shell=True + ) + else: + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + )) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + if capture_returncode != 0: + self.log_fd.write('{} -D output:\n'.format(cmd)) + self.runProcess((cmd, '-D')) + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(10) + return check_capture_10_packets_real + + +@fixtures.fixture +def check_capture_fifo(cmd_dumpcap): + if sys.platform == 'win32': + fixtures.skip('Test requires OS fifo support.') + + def check_capture_fifo_real(self, cmd=None): + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + fifo_file = self.filename_from_id('testout.fifo') + try: + # If a previous test left its fifo laying around, e.g. from a failure, remove it. + os.unlink(fifo_file) + except: + pass + os.mkfifo(fifo_file) + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + fifo_proc = self.startProcess( + ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), + shell=True) + capture_proc = self.runProcess(capture_command(cmd, + '-i', fifo_file, '-p', - '-w', '-', - '-c', '10', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + )) + fifo_proc.kill() + self.assertTrue(os.path.isfile(testout_file)) + capture_returncode = capture_proc.returncode + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(8) + return check_capture_fifo_real + + +@fixtures.fixture +def check_capture_stdin(cmd_dumpcap): + # Capturing always requires dumpcap, hence the dependency on it. + def check_capture_stdin_real(self, cmd=None): + # Similar to suite_io.check_io_4_packets. + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') + capture_cmd = capture_command(cmd, + '-i', '-', + '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), - '-f', '"icmp || icmp6"', - '>', testout_file, shell=True - ), - shell=True ) - else: - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, + is_gui = type(cmd) != str and '-k' in cmd[0] + if is_gui: + capture_cmd += ' -o console.log.level:127' + pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if is_gui: + self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.') + self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.') + self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode == 0): + self.checkPacketCount(8) + return check_capture_stdin_real + + +@fixtures.fixture +def check_capture_read_filter(capture_interface): + def check_capture_read_filter_real(self, cmd=None): + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, '-p', '-w', testout_file, + '-2', + '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. '-c', '10', '-a', 'duration:{}'.format(capture_duration), '-f', 'icmp || icmp6', )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - if capture_returncode != 0: - self.log_fd.write('{} -D output:\n'.format(cmd)) - self.runProcess((cmd, '-D')) - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(10) - -def check_capture_fifo(self, cmd=None): - if not config.canMkfifo(): - self.skipTest('Test requires OS fifo support.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - self.assertIsNotNone(cmd) - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - fifo_file = self.filename_from_id('testout.fifo') - try: - # If a previous test left its fifo laying around, e.g. from a failure, remove it. - os.unlink(fifo_file) - except: - pass - os.mkfifo(fifo_file) - slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') - fifo_proc = self.startProcess( - ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), - shell=True) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', fifo_file, - '-p', - '-w', testout_file, - '-a', 'duration:{}'.format(capture_duration), - )) - fifo_proc.kill() - self.assertTrue(os.path.isfile(testout_file)) - capture_returncode = capture_proc.returncode - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(8) - -def check_capture_stdin(self, cmd=None): - # Similar to suite_io.check_io_4_packets. - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - self.assertIsNotNone(cmd) - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow') - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', 'duration:{}'.format(capture_duration), - shell=True - ) - if cmd == config.cmd_wireshark: - capture_cmd += ' -o console.log.level:127' - pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - if cmd == config.cmd_wireshark: - self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.') - self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.') - self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') - self.assertTrue(os.path.isfile(testout_file)) - if (pipe_returncode == 0): - self.checkPacketCount(8) - -def check_capture_read_filter(self, cmd=None): - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) - testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, - '-p', - '-w', testout_file, - '-2', - '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. - '-c', '10', - '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', - )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - self.assertEqual(capture_returncode, 0) - - if (capture_returncode == 0): - self.checkPacketCount(0) - -def check_capture_snapshot_len(self, cmd=None): - if not config.canCapture(): - self.skipTest('Test requires capture privileges and an interface.') - if cmd == config.cmd_wireshark and not config.canDisplay(): - self.skipTest('Test requires a display.') - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) - self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) - testout_file = self.filename_from_id(testout_pcap) - capture_proc = self.runProcess(subprocesstest.capture_command(cmd, - '-i', config.capture_interface, - '-p', - '-w', testout_file, - '-s', str(snapshot_len), - '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', - )) - capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) - self.assertEqual(capture_returncode, 0) - self.assertTrue(os.path.isfile(testout_file)) - - # Use tshark to filter out all packets larger than 68 bytes. - testout2_file = self.filename_from_id('testout2.pcap') - - filter_proc = self.runProcess((config.cmd_tshark, - '-r', testout_file, - '-w', testout2_file, - '-Y', 'frame.cap_len>{}'.format(snapshot_len), - )) - filter_returncode = filter_proc.returncode - self.assertEqual(capture_returncode, 0) - if (capture_returncode == 0): - self.checkPacketCount(0, cap_file=testout2_file) - -def check_dumpcap_autostop_stdin(self, packets=None, filesize=None): - # Similar to check_capture_stdin. - cmd = config.cmd_dumpcap - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - testout_file = self.filename_from_id(testout_pcap) - cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') - condition='oops:invalid' - - self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') - self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') - - if packets is not None: - condition = 'packets:{}'.format(packets) - elif filesize is not None: - condition = 'filesize:{}'.format(filesize) - - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', condition, - shell=True - ) - pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - self.assertTrue(os.path.isfile(testout_file)) - if (pipe_returncode != 0): - return - - if packets is not None: - self.checkPacketCount(packets) - elif filesize is not None: - capturekb = os.path.getsize(testout_file) / 1000 - self.assertGreaterEqual(capturekb, filesize) - -def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None): - # Similar to check_capture_stdin. - cmd = config.cmd_dumpcap - capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') - rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID - testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique) - testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique) - cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') - condition='oops:invalid' - - self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') - self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') - - if packets is not None: - condition = 'packets:{}'.format(packets) - elif filesize is not None: - condition = 'filesize:{}'.format(filesize) - - capture_cmd = subprocesstest.capture_command(cmd, - '-i', '-', - '-w', testout_file, - '-a', 'files:2', - '-b', condition, - shell=True - ) - pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) - pipe_returncode = pipe_proc.returncode - self.assertEqual(pipe_returncode, 0) - if (pipe_returncode != 0): - return - - rb_files = glob.glob(testout_glob) - for rbf in rb_files: - self.cleanup_files.append(rbf) - - self.assertEqual(len(rb_files), 2) - - for rbf in rb_files: - self.assertTrue(os.path.isfile(rbf)) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.assertEqual(capture_returncode, 0) + + if (capture_returncode == 0): + self.checkPacketCount(0) + return check_capture_read_filter_real + +@fixtures.fixture +def check_capture_snapshot_len(capture_interface, cmd_tshark): + def check_capture_snapshot_len_real(self, cmd=None): + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', capture_interface, + '-p', + '-w', testout_file, + '-s', str(snapshot_len), + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + )) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.assertEqual(capture_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + + # Use tshark to filter out all packets larger than 68 bytes. + testout2_file = self.filename_from_id('testout2.pcap') + + filter_proc = self.runProcess((cmd_tshark, + '-r', testout_file, + '-w', testout2_file, + '-Y', 'frame.cap_len>{}'.format(snapshot_len), + )) + filter_returncode = filter_proc.returncode + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + self.checkPacketCount(0, cap_file=testout2_file) + return check_capture_snapshot_len_real + + +@fixtures.fixture +def check_dumpcap_autostop_stdin(cmd_dumpcap): + def check_dumpcap_autostop_stdin_real(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + testout_file = self.filename_from_id(testout_pcap) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = ' '.join((cmd_dumpcap, + '-i', '-', + '-w', testout_file, + '-a', condition, + )) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode != 0): + return + if packets is not None: - self.checkPacketCount(packets, cap_file=rbf) + self.checkPacketCount(packets) elif filesize is not None: - capturekb = os.path.getsize(rbf) / 1000 + capturekb = os.path.getsize(testout_file) / 1000 self.assertGreaterEqual(capturekb, filesize) + return check_dumpcap_autostop_stdin_real + + +@fixtures.fixture +def check_dumpcap_ringbuffer_stdin(cmd_dumpcap): + def check_dumpcap_ringbuffer_stdin_real(self, packets=None, filesize=None): + # Similar to check_capture_stdin. + rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID + testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique) + testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique) + cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100') + condition='oops:invalid' + + self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize') + self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize') + if packets is not None: + condition = 'packets:{}'.format(packets) + elif filesize is not None: + condition = 'filesize:{}'.format(filesize) + + capture_cmd = ' '.join((cmd_dumpcap, + '-i', '-', + '-w', testout_file, + '-a', 'files:2', + '-b', condition, + )) + pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if (pipe_returncode != 0): + return + + rb_files = glob.glob(testout_glob) + for rbf in rb_files: + self.cleanup_files.append(rbf) + + self.assertEqual(len(rb_files), 2) + + for rbf in rb_files: + self.assertTrue(os.path.isfile(rbf)) + if packets is not None: + self.checkPacketCount(packets, cap_file=rbf) + elif filesize is not None: + capturekb = os.path.getsize(rbf) / 1000 + self.assertGreaterEqual(capturekb, filesize) + return check_dumpcap_ringbuffer_stdin_real + + +@fixtures.mark_usefixtures('test_env') +@fixtures.uses_fixtures class case_wireshark_capture(subprocesstest.SubprocessTestCase): - def test_wireshark_capture_10_packets_to_file(self): + def test_wireshark_capture_10_packets_to_file(self, wireshark_k, check_capture_10_packets): '''Capture 10 packets from the network to a file using Wireshark''' - check_capture_10_packets(self, cmd=config.cmd_wireshark) + check_capture_10_packets(self, cmd=wireshark_k) # Wireshark doesn't currently support writing to stdout while capturing. - # def test_wireshark_capture_10_packets_to_stdout(self): + # def test_wireshark_capture_10_packets_to_stdout(self, wireshark_k, check_capture_10_packets): # '''Capture 10 packets from the network to stdout using Wireshark''' - # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True) + # check_capture_10_packets(self, cmd=wireshark_k, to_stdout=True) - def test_wireshark_capture_from_fifo(self): + def test_wireshark_capture_from_fifo(self, wireshark_k, check_capture_fifo): '''Capture from a fifo using Wireshark''' - check_capture_fifo(self, cmd=config.cmd_wireshark) + check_capture_fifo(self, cmd=wireshark_k) - def test_wireshark_capture_from_stdin(self): + def test_wireshark_capture_from_stdin(self, wireshark_k, check_capture_stdin): '''Capture from stdin using Wireshark''' - check_capture_stdin(self, cmd=config.cmd_wireshark) + check_capture_stdin(self, cmd=wireshark_k) - def test_wireshark_capture_snapshot_len(self): + def test_wireshark_capture_snapshot_len(self, wireshark_k, check_capture_snapshot_len): '''Capture truncated packets using Wireshark''' - check_capture_snapshot_len(self, cmd=config.cmd_wireshark) + check_capture_snapshot_len(self, cmd=wireshark_k) + +@fixtures.mark_usefixtures('test_env') +@fixtures.uses_fixtures class case_tshark_capture(subprocesstest.SubprocessTestCase): - def test_tshark_capture_10_packets_to_file(self): + def test_tshark_capture_10_packets_to_file(self, cmd_tshark, check_capture_10_packets): '''Capture 10 packets from the network to a file using TShark''' - check_capture_10_packets(self, cmd=config.cmd_tshark) + check_capture_10_packets(self, cmd=cmd_tshark) - def test_tshark_capture_10_packets_to_stdout(self): + def test_tshark_capture_10_packets_to_stdout(self, cmd_tshark, check_capture_10_packets): '''Capture 10 packets from the network to stdout using TShark''' - check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True) + check_capture_10_packets(self, cmd=cmd_tshark, to_stdout=True) - def test_tshark_capture_from_fifo(self): + def test_tshark_capture_from_fifo(self, cmd_tshark, check_capture_fifo): '''Capture from a fifo using TShark''' - check_capture_fifo(self, cmd=config.cmd_tshark) + check_capture_fifo(self, cmd=cmd_tshark) - def test_tshark_capture_from_stdin(self): + def test_tshark_capture_from_stdin(self, cmd_tshark, check_capture_stdin): '''Capture from stdin using TShark''' - check_capture_stdin(self, cmd=config.cmd_tshark) + check_capture_stdin(self, cmd=cmd_tshark) - def test_tshark_capture_snapshot_len(self): + def test_tshark_capture_snapshot_len(self, cmd_tshark, check_capture_snapshot_len): '''Capture truncated packets using TShark''' - check_capture_snapshot_len(self, cmd=config.cmd_tshark) + check_capture_snapshot_len(self, cmd=cmd_tshark) + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_capture(subprocesstest.SubprocessTestCase): - def test_dumpcap_capture_10_packets_to_file(self): + def test_dumpcap_capture_10_packets_to_file(self, cmd_dumpcap, check_capture_10_packets): '''Capture 10 packets from the network to a file using Dumpcap''' - check_capture_10_packets(self, cmd=config.cmd_dumpcap) + check_capture_10_packets(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_10_packets_to_stdout(self): + def test_dumpcap_capture_10_packets_to_stdout(self, cmd_dumpcap, check_capture_10_packets): '''Capture 10 packets from the network to stdout using Dumpcap''' - check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True) + check_capture_10_packets(self, cmd=cmd_dumpcap, to_stdout=True) - def test_dumpcap_capture_from_fifo(self): + def test_dumpcap_capture_from_fifo(self, cmd_dumpcap, check_capture_fifo): '''Capture from a fifo using Dumpcap''' - check_capture_fifo(self, cmd=config.cmd_dumpcap) + check_capture_fifo(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_from_stdin(self): + def test_dumpcap_capture_from_stdin(self, cmd_dumpcap, check_capture_stdin): '''Capture from stdin using Dumpcap''' - check_capture_stdin(self, cmd=config.cmd_dumpcap) + check_capture_stdin(self, cmd=cmd_dumpcap) - def test_dumpcap_capture_snapshot_len(self): + def test_dumpcap_capture_snapshot_len(self, check_capture_snapshot_len, cmd_dumpcap): '''Capture truncated packets using Dumpcap''' - check_capture_snapshot_len(self, cmd=config.cmd_dumpcap) + check_capture_snapshot_len(self, cmd=cmd_dumpcap) + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_autostop(subprocesstest.SubprocessTestCase): # duration, filesize, packets, files - def test_dumpcap_autostop_filesize(self): + def test_dumpcap_autostop_filesize(self, check_dumpcap_autostop_stdin): '''Capture from stdin using Dumpcap until we reach a file size limit''' check_dumpcap_autostop_stdin(self, filesize=15) - def test_dumpcap_autostop_packets(self): + def test_dumpcap_autostop_packets(self, check_dumpcap_autostop_stdin): '''Capture from stdin using Dumpcap until we reach a packet limit''' check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary. + +@fixtures.mark_usefixtures('base_env') +@fixtures.uses_fixtures class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase): # duration, interval, filesize, packets, files # Need a function that finds ringbuffer file names. - def test_dumpcap_ringbuffer_filesize(self): + def test_dumpcap_ringbuffer_filesize(self, check_dumpcap_ringbuffer_stdin): '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit''' check_dumpcap_ringbuffer_stdin(self, filesize=15) - def test_dumpcap_ringbuffer_packets(self): + def test_dumpcap_ringbuffer_packets(self, check_dumpcap_ringbuffer_stdin): '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit''' check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary. |