diff options
author | João Valverde <j@v6e.pt> | 2023-06-03 17:59:38 +0100 |
---|---|---|
committer | João Valverde <j@v6e.pt> | 2023-06-05 10:49:34 +0000 |
commit | d871561ca4a8cb95a17b04580c84b162e89022e3 (patch) | |
tree | bce436e9ef2c35293911ecc9a2164690b5a08d0a | |
parent | a7422a55bcf480f75447c6a014496728a66c9b30 (diff) |
Tests: Replace self.runProcess() (capture)
Remove and indirect dependency on the unittest module as
part of the move to a pytest-only implementation.
Use a bare subprocess.run call. We may want to wrap this in
another function or class.
-rw-r--r-- | test/suite_capture.py | 44 |
1 files changed, 26 insertions, 18 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py index bdac36ae97..99b18015f0 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -90,7 +90,7 @@ def check_capture_10_packets(capture_interface, check_packet_count, traffic_gene testout_file = result_file(testout_pcap) stop_traffic = start_traffic() if to_stdout: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = subprocess.run(capture_command(cmd, '-i', '"{}"'.format(capture_interface), '-p', '-w', '-', @@ -103,7 +103,7 @@ def check_capture_10_packets(capture_interface, check_packet_count, traffic_gene shell=True ) else: - capture_proc = self.runProcess(capture_command(cmd, + capture_proc = subprocess.run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -137,12 +137,13 @@ def check_capture_fifo(check_packet_count, result_file): fifo_proc = self.startProcess( ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), shell=True) - capture_proc = self.assertRun(capture_command(cmd, + capture_proc = subprocess.run(capture_command(cmd, '-i', fifo_file, '-p', '-w', testout_file, '-a', 'duration:{}'.format(capture_duration), )) + assert capture_proc.returncode == 0 fifo_proc.kill() assert os.path.isfile(testout_file) check_packet_count(8, testout_file) @@ -168,7 +169,8 @@ def check_capture_stdin(check_packet_count, result_file): capture_cmd += ' --log-level=info' if sysconfig.get_platform().startswith('mingw'): fixtures.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = self.assertRun(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_proc = subprocess.run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True) + assert pipe_proc.returncode == 0 if is_gui: assert grep_output('Wireshark is up and ready to go'), 'No startup message.' assert grep_output('Capture started'), 'No capture start message.' @@ -185,7 +187,7 @@ def check_capture_read_filter(capture_interface, traffic_generator, check_packet assert cmd is not None testout_file = result_file(testout_pcap) stop_traffic = start_traffic() - capture_proc = self.assertRun(capture_command(cmd, + capture_proc = subprocess.run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -195,6 +197,7 @@ def check_capture_read_filter(capture_interface, traffic_generator, check_packet '-a', 'duration:{}'.format(capture_duration), '-f', cfilter, )) + assert capture_proc.returncode == 0 stop_traffic() check_packet_count(0, testout_file) return check_capture_read_filter_real @@ -206,7 +209,7 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, assert cmd is not None stop_traffic = start_traffic() testout_file = result_file(testout_pcap) - capture_proc = self.assertRun(capture_command(cmd, + capture_proc = subprocess.run(capture_command(cmd, '-i', capture_interface, '-p', '-w', testout_file, @@ -214,17 +217,19 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator, '-a', 'duration:{}'.format(capture_duration), '-f', cfilter, )) + assert capture_proc.returncode == 0 stop_traffic() assert os.path.isfile(testout_file) # Use tshark to filter out all packets larger than 68 bytes. testout2_file = result_file('testout2.pcap') - filter_proc = self.assertRun((cmd_tshark, + filter_proc = subprocess.run((cmd_tshark, '-r', testout_file, '-w', testout2_file, '-Y', 'frame.cap_len>{}'.format(snapshot_len), )) + assert filter_proc.returncode == 0 check_packet_count(0, testout2_file) return check_capture_snapshot_len_real @@ -252,7 +257,8 @@ def check_dumpcap_autostop_stdin(cmd_dumpcap, check_packet_count, result_file): )) if sysconfig.get_platform().startswith('mingw'): fixtures.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_proc.returncode == 0 assert os.path.isfile(testout_file) if packets is not None: @@ -289,7 +295,8 @@ def check_dumpcap_ringbuffer_stdin(cmd_dumpcap, check_packet_count, result_file) )) if sysconfig.get_platform().startswith('mingw'): fixtures.skip('FIXME Pipes are broken with the MSYS2 shell') - pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True) + pipe_proc.returncode == 0 rb_files = glob.glob(testout_glob) assert len(rb_files) == 2 @@ -405,7 +412,8 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, check_packet_count, c capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args) - capture_proc = self.assertRun(capture_cmd) + capture_proc = subprocess.run(capture_cmd) + capture_proc.returncode == 0 for fifo_proc in fifo_procs: fifo_proc.kill() rb_files = [] @@ -445,26 +453,26 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, check_packet_count, c for check_val in check_vals: check_packet_count(check_val['packet_count'], check_val['filename']) - tshark_proc = self.assertRun(capture_command(cmd_tshark, + tshark_proc = subprocess.run(capture_command(cmd_tshark, '-r', check_val['filename'], '-V', '-X', 'read_format:MIME Files Format' - )) + ), capture_output=True, encoding='utf-8') # XXX Are there any other sanity checks we should run? if idb_compare_eq: - assert count_output(tshark_proc.stdout_str, r'Block: Interface Description Block') \ + assert count_output(tshark_proc.stdout, r'Block: Interface Description Block') \ == check_val['idb_count'] else: - assert count_output(tshark_proc.stdout_str, r'Block: Interface Description Block') \ + assert count_output(tshark_proc.stdout, r'Block: Interface Description Block') \ >= check_val['idb_count'] idb_compare_eq = True - assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #1') \ + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #1') \ == check_val['ua_pt1_count'] - assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #2') \ + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #2') \ == check_val['ua_pt2_count'] - assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #3') \ + assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #3') \ == check_val['ua_pt3_count'] - assert count_output(tshark_proc.stdout_str, r'Option: User Application = Dumpcap \(Wireshark\)') \ + assert count_output(tshark_proc.stdout, r'Option: User Application = Dumpcap \(Wireshark\)') \ == check_val['ua_dc_count'] return check_dumpcap_pcapng_sections_real |