aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoão Valverde <j@v6e.pt>2023-06-03 17:59:38 +0100
committerJoão Valverde <j@v6e.pt>2023-06-05 10:49:34 +0000
commitd871561ca4a8cb95a17b04580c84b162e89022e3 (patch)
treebce436e9ef2c35293911ecc9a2164690b5a08d0a
parenta7422a55bcf480f75447c6a014496728a66c9b30 (diff)
Tests: Replace self.runProcess() (capture)
Remove and indirect dependency on the unittest module as part of the move to a pytest-only implementation. Use a bare subprocess.run call. We may want to wrap this in another function or class.
-rw-r--r--test/suite_capture.py44
1 files changed, 26 insertions, 18 deletions
diff --git a/test/suite_capture.py b/test/suite_capture.py
index bdac36ae97..99b18015f0 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -90,7 +90,7 @@ def check_capture_10_packets(capture_interface, check_packet_count, traffic_gene
testout_file = result_file(testout_pcap)
stop_traffic = start_traffic()
if to_stdout:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = subprocess.run(capture_command(cmd,
'-i', '"{}"'.format(capture_interface),
'-p',
'-w', '-',
@@ -103,7 +103,7 @@ def check_capture_10_packets(capture_interface, check_packet_count, traffic_gene
shell=True
)
else:
- capture_proc = self.runProcess(capture_command(cmd,
+ capture_proc = subprocess.run(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
@@ -137,12 +137,13 @@ def check_capture_fifo(check_packet_count, result_file):
fifo_proc = self.startProcess(
('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
shell=True)
- capture_proc = self.assertRun(capture_command(cmd,
+ capture_proc = subprocess.run(capture_command(cmd,
'-i', fifo_file,
'-p',
'-w', testout_file,
'-a', 'duration:{}'.format(capture_duration),
))
+ assert capture_proc.returncode == 0
fifo_proc.kill()
assert os.path.isfile(testout_file)
check_packet_count(8, testout_file)
@@ -168,7 +169,8 @@ def check_capture_stdin(check_packet_count, result_file):
capture_cmd += ' --log-level=info'
if sysconfig.get_platform().startswith('mingw'):
fixtures.skip('FIXME Pipes are broken with the MSYS2 shell')
- pipe_proc = self.assertRun(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_proc = subprocess.run(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ assert pipe_proc.returncode == 0
if is_gui:
assert grep_output('Wireshark is up and ready to go'), 'No startup message.'
assert grep_output('Capture started'), 'No capture start message.'
@@ -185,7 +187,7 @@ def check_capture_read_filter(capture_interface, traffic_generator, check_packet
assert cmd is not None
testout_file = result_file(testout_pcap)
stop_traffic = start_traffic()
- capture_proc = self.assertRun(capture_command(cmd,
+ capture_proc = subprocess.run(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
@@ -195,6 +197,7 @@ def check_capture_read_filter(capture_interface, traffic_generator, check_packet
'-a', 'duration:{}'.format(capture_duration),
'-f', cfilter,
))
+ assert capture_proc.returncode == 0
stop_traffic()
check_packet_count(0, testout_file)
return check_capture_read_filter_real
@@ -206,7 +209,7 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator,
assert cmd is not None
stop_traffic = start_traffic()
testout_file = result_file(testout_pcap)
- capture_proc = self.assertRun(capture_command(cmd,
+ capture_proc = subprocess.run(capture_command(cmd,
'-i', capture_interface,
'-p',
'-w', testout_file,
@@ -214,17 +217,19 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator,
'-a', 'duration:{}'.format(capture_duration),
'-f', cfilter,
))
+ assert capture_proc.returncode == 0
stop_traffic()
assert os.path.isfile(testout_file)
# Use tshark to filter out all packets larger than 68 bytes.
testout2_file = result_file('testout2.pcap')
- filter_proc = self.assertRun((cmd_tshark,
+ filter_proc = subprocess.run((cmd_tshark,
'-r', testout_file,
'-w', testout2_file,
'-Y', 'frame.cap_len>{}'.format(snapshot_len),
))
+ assert filter_proc.returncode == 0
check_packet_count(0, testout2_file)
return check_capture_snapshot_len_real
@@ -252,7 +257,8 @@ def check_dumpcap_autostop_stdin(cmd_dumpcap, check_packet_count, result_file):
))
if sysconfig.get_platform().startswith('mingw'):
fixtures.skip('FIXME Pipes are broken with the MSYS2 shell')
- pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_proc.returncode == 0
assert os.path.isfile(testout_file)
if packets is not None:
@@ -289,7 +295,8 @@ def check_dumpcap_ringbuffer_stdin(cmd_dumpcap, check_packet_count, result_file)
))
if sysconfig.get_platform().startswith('mingw'):
fixtures.skip('FIXME Pipes are broken with the MSYS2 shell')
- pipe_proc = self.assertRun(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_proc = subprocess.run(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
+ pipe_proc.returncode == 0
rb_files = glob.glob(testout_glob)
assert len(rb_files) == 2
@@ -405,7 +412,8 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, check_packet_count, c
capture_cmd = capture_command(cmd_dumpcap, *capture_cmd_args)
- capture_proc = self.assertRun(capture_cmd)
+ capture_proc = subprocess.run(capture_cmd)
+ capture_proc.returncode == 0
for fifo_proc in fifo_procs: fifo_proc.kill()
rb_files = []
@@ -445,26 +453,26 @@ def check_dumpcap_pcapng_sections(cmd_dumpcap, cmd_tshark, check_packet_count, c
for check_val in check_vals:
check_packet_count(check_val['packet_count'], check_val['filename'])
- tshark_proc = self.assertRun(capture_command(cmd_tshark,
+ tshark_proc = subprocess.run(capture_command(cmd_tshark,
'-r', check_val['filename'],
'-V',
'-X', 'read_format:MIME Files Format'
- ))
+ ), capture_output=True, encoding='utf-8')
# XXX Are there any other sanity checks we should run?
if idb_compare_eq:
- assert count_output(tshark_proc.stdout_str, r'Block: Interface Description Block') \
+ assert count_output(tshark_proc.stdout, r'Block: Interface Description Block') \
== check_val['idb_count']
else:
- assert count_output(tshark_proc.stdout_str, r'Block: Interface Description Block') \
+ assert count_output(tshark_proc.stdout, r'Block: Interface Description Block') \
>= check_val['idb_count']
idb_compare_eq = True
- assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #1') \
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #1') \
== check_val['ua_pt1_count']
- assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #2') \
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #2') \
== check_val['ua_pt2_count']
- assert count_output(tshark_proc.stdout_str, r'Option: User Application = Passthrough test #3') \
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Passthrough test #3') \
== check_val['ua_pt3_count']
- assert count_output(tshark_proc.stdout_str, r'Option: User Application = Dumpcap \(Wireshark\)') \
+ assert count_output(tshark_proc.stdout, r'Option: User Application = Dumpcap \(Wireshark\)') \
== check_val['ua_dc_count']
return check_dumpcap_pcapng_sections_real