diff options
author | Peter Wu <peter@lekensteyn.nl> | 2018-11-16 02:28:32 +0100 |
---|---|---|
committer | Peter Wu <peter@lekensteyn.nl> | 2018-11-16 13:55:40 +0000 |
commit | 88ce087dcfc3aaae3d290276333091d324a6fcb9 (patch) | |
tree | 41b474b1c641029ba36d4099a8877625e64da64a | |
parent | 3ab521118a0d068e0d5c795a5a57b13cd7790a75 (diff) |
test: finalize suite_capture conversion to fixtures, drop config.py
Convert the old start_pinging routine to use pytest fixtures, rewriting
it to enable a different generator that uses (for example) UDP.
Remove the config module since it is no longer neded.
Change-Id: Ic4727157faab084b41144e8f16ea44f59c9037d8
Reviewed-on: https://code.wireshark.org/review/30659
Petri-Dish: Peter Wu <peter@lekensteyn.nl>
Tested-by: Petri Dish Buildbot
Reviewed-by: Peter Wu <peter@lekensteyn.nl>
-rw-r--r-- | test/config.py | 257 | ||||
-rw-r--r-- | test/fixtures_ws.py | 43 | ||||
-rw-r--r-- | test/subprocesstest.py | 8 | ||||
-rw-r--r-- | test/suite_capture.py | 89 |
4 files changed, 89 insertions, 308 deletions
diff --git a/test/config.py b/test/config.py deleted file mode 100644 index 87aa1270ff..0000000000 --- a/test/config.py +++ /dev/null @@ -1,257 +0,0 @@ -# -# -*- coding: utf-8 -*- -# Wireshark tests -# By Gerald Combs <gerald@wireshark.org> -# -# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping -# -# SPDX-License-Identifier: GPL-2.0-or-later -# -'''Configuration''' - -import logging -import os -import os.path -import re -import shutil -import subprocess -import sys -import tempfile - -commands = ( - 'capinfos', - 'dumpcap', - 'mergecap', - 'rawshark', - 'sharkd', - 'text2pcap', - 'tshark', - 'wireshark', -) - -can_capture = False -capture_interface = None - -# Our executables -program_path = None -# Strings -cmd_capinfos = None -cmd_dumpcap = None -cmd_mergecap = None -cmd_rawshark = None -cmd_tshark = None -cmd_text2pcap = None -cmd_wireshark = None -# Arrays -args_ping = None - -have_lua = False -have_nghttp2 = False -have_kerberos = False -have_libgcrypt16 = False -have_libgcrypt17 = False - -test_env = None -program_path = None -home_path = None -conf_path = None -custom_profile_path = None -custom_profile_name = 'Custom Profile' - -this_dir = os.path.dirname(__file__) -baseline_dir = os.path.join(this_dir, 'baseline') -capture_dir = os.path.join(this_dir, 'captures') -config_dir = os.path.join(this_dir, 'config') -key_dir = os.path.join(this_dir, 'keys') -lua_dir = os.path.join(this_dir, 'lua') -tools_dir = os.path.join(this_dir, '..', 'tools') - -all_groups = [] - -def canCapture(): - # XXX This appears to be evaluated at the wrong time when called - # from a unittest.skipXXX decorator. - return can_capture and capture_interface is not None - -def setCanCapture(new_cc): - can_capture = new_cc - -def setCaptureInterface(iface): - global capture_interface - setCanCapture(True) - capture_interface = iface - -def canMkfifo(): - return not sys.platform.startswith('win32') - -def canDisplay(): - if sys.platform.startswith('win32') or sys.platform.startswith('darwin'): - return True - # Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide. - return False - -def getTsharkInfo(): - global have_lua - global have_nghttp2 - global have_kerberos - global have_libgcrypt16 - global have_libgcrypt17 - if not cmd_tshark: - logging.warning("tshark binary is not yet set") - return - try: - tshark_v = subprocess.check_output( - (cmd_tshark, '--version'), - stderr=subprocess.PIPE, - universal_newlines=True, - env=baseEnv() - ).replace('\n', ' ') - except subprocess.CalledProcessError as e: - logging.warning("Failed to detect tshark features: %s", e) - tshark_v = '' - have_lua = bool(re.search('with +Lua', tshark_v)) - have_nghttp2 = bool(re.search('with +nghttp2', tshark_v)) - have_kerberos = bool(re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v)) - gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v) - have_libgcrypt16 = gcry_m and float(gcry_m.group(1)) >= 1.6 - have_libgcrypt17 = gcry_m and float(gcry_m.group(1)) >= 1.7 - -def getDefaultCaptureInterface(): - '''Choose a default capture interface for our platform. Currently Windows only.''' - global capture_interface - if capture_interface: - return - if cmd_dumpcap is None: - return - if not sys.platform.startswith('win32'): - return - try: - dumpcap_d_data = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE) - dumpcap_d_stdout = dumpcap_d_data.decode('UTF-8', 'replace') - for d_line in dumpcap_d_stdout.splitlines(): - iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line) - if iface_m: - capture_interface = iface_m.group(1) - break - except: - pass - -def getPingCommand(): - '''Return an argument list required to ping www.wireshark.org for 60 seconds.''' - global args_ping - # XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds. - if sys.platform.startswith('win32'): - # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping - args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org') - elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'): - args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org') - elif sys.platform.startswith('darwin'): - args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org') - # XXX Other BSDs, Solaris, etc - -def setProgramPath(path): - global program_path - program_path = path - retval = True - dotexe = '' - if sys.platform.startswith('win32'): - dotexe = '.exe' - for cmd in commands: - cmd_var = 'cmd_' + cmd - cmd_path = os.path.normpath(os.path.join(path, cmd + dotexe)) - if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK): - cmd_path = None - program_path = None - retval = False - globals()[cmd_var] = cmd_path - getTsharkInfo() - getDefaultCaptureInterface() - setUpHostFiles() - return retval - -def baseEnv(home=None): - """A modified environment to ensure reproducible tests.""" - env = os.environ.copy() - env['TZ'] = 'UTC' - home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME' - if home: - env[home_env] = home - else: - # This directory is supposed not to be written and is used by - # "readonly" tests that do not read any other preferences. - env[home_env] = "/wireshark-tests-unused" - return env - -def setUpTestEnvironment(): - global home_path - global conf_path - global custom_profile_path - global test_env - - # Create our directories - test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.') - home_path = os.path.join(test_confdir, 'home') - if sys.platform.startswith('win32'): - conf_path = os.path.join(home_path, 'Wireshark') - else: - conf_path = os.path.join(home_path, '.config', 'wireshark') - os.makedirs(conf_path) - # Test spaces while we're here. - custom_profile_path = os.path.join(conf_path, 'profiles', custom_profile_name) - os.makedirs(custom_profile_path) - - # Populate our UAT files - uat_files = [ - '80211_keys', - 'dtlsdecrypttablefile', - 'esp_sa', - 'ssl_keys', - 'c1222_decryption_table', - 'ikev1_decryption_table', - 'ikev2_decryption_table', - ] - for uat in uat_files: - setUpUatFile(conf_path, uat) - - # Set up our environment - test_env = baseEnv(home=home_path) - test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = 'True' - test_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True' - -def setUpUatFile(conf_path, conf_file): - template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl' - with open(template, 'r') as tplt_fd: - tplt_contents = tplt_fd.read() - tplt_fd.close() - key_dir_path = os.path.join(key_dir, '') - # uat.c replaces backslashes... - key_dir_path = key_dir_path.replace('\\', '\\x5c') - cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path) - out_file = os.path.join(conf_path, conf_file) - with open(out_file, 'w') as cf_fd: - cf_fd.write(cf_contents) - cf_fd.close() - -def setUpHostFiles(): - global program_path - global conf_path - global custom_profile_path - if program_path is None: - return - if conf_path is None or custom_profile_path is None: - setUpTestEnvironment() - bundle_path = os.path.join(program_path, 'Wireshark.app', 'Contents', 'MacOS') - if os.path.isdir(bundle_path): - global_path = bundle_path - else: - global_path = program_path - hosts_path_pfx = os.path.join(this_dir, 'hosts.') - shutil.copyfile(hosts_path_pfx + 'global', os.path.join(global_path, 'hosts')) - shutil.copyfile(hosts_path_pfx + 'personal', os.path.join(conf_path, 'hosts')) - shutil.copyfile(hosts_path_pfx + 'custom', os.path.join(custom_profile_path, 'hosts')) - -if sys.platform.startswith('win32') or sys.platform.startswith('darwin'): - can_capture = True - -# Initialize ourself. -getPingCommand() diff --git a/test/fixtures_ws.py b/test/fixtures_ws.py index 55e3e2151c..053d89e0ce 100644 --- a/test/fixtures_ws.py +++ b/test/fixtures_ws.py @@ -17,7 +17,6 @@ import tempfile import types import fixtures -import config import subprocesstest @@ -126,15 +125,14 @@ def wireshark_command(cmd_wireshark): @fixtures.fixture(scope='session') -def features(cmd_tshark): +def features(cmd_tshark, make_env): '''Returns an object describing available features in tshark.''' try: - # XXX stop using config tshark_v = subprocess.check_output( (cmd_tshark, '--version'), stderr=subprocess.PIPE, universal_newlines=True, - env=config.baseEnv() + env=make_env() ) tshark_v = re.sub(r'\s+', ' ', tshark_v) except subprocess.CalledProcessError as ex: @@ -190,14 +188,28 @@ def conf_path(home_path): return conf_path +@fixtures.fixture(scope='session') +def make_env(): + """A factory for a modified environment to ensure reproducible tests.""" + def make_env_real(home=None): + env = os.environ.copy() + env['TZ'] = 'UTC' + home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME' + if home: + env[home_env] = home + else: + # This directory is supposed not to be written and is used by + # "readonly" tests that do not read any other preferences. + env[home_env] = "/wireshark-tests-unused" + return env + return make_env_real + + @fixtures.fixture -def base_env(home_path, request): +def base_env(home_path, make_env, request): """A modified environment to ensure reproducible tests. Tests can modify this environment as they see fit.""" - env = os.environ.copy() - env['TZ'] = 'UTC' - home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME' - env[home_env] = home_path + env = make_env(home=home_path) # Remove this if test instances no longer inherit from SubprocessTestCase? if isinstance(request.instance, subprocesstest.SubprocessTestCase): @@ -207,7 +219,7 @@ def base_env(home_path, request): @fixtures.fixture -def test_env(base_env, conf_path, request): +def test_env(base_env, conf_path, request, dirs): '''A process environment with a populated configuration directory.''' # Populate our UAT files uat_files = [ @@ -219,9 +231,16 @@ def test_env(base_env, conf_path, request): 'ikev1_decryption_table', 'ikev2_decryption_table', ] + # uat.c replaces backslashes... + key_dir_path = os.path.join(dirs.key_dir, '').replace('\\', '\\x5c') for uat in uat_files: - # XXX stop using config - config.setUpUatFile(conf_path, uat) + template_file = os.path.join(dirs.config_dir, uat + '.tmpl') + out_file = os.path.join(conf_path, uat) + with open(template_file, 'r') as f: + template_contents = f.read() + cf_contents = template_contents.replace('TEST_KEYS_DIR', key_dir_path) + with open(out_file, 'w') as f: + f.write(cf_contents) env = base_env env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1' diff --git a/test/subprocesstest.py b/test/subprocesstest.py index 060ebeb3a4..9f6b2f001f 100644 --- a/test/subprocesstest.py +++ b/test/subprocesstest.py @@ -23,8 +23,6 @@ import unittest # - Add a subprocesstest.SkipUnlessCapture decorator? # - Try to catch crashes? See the comments below in waitProcess. -# XXX This should probably be in config.py and settable from -# the command line. process_timeout = 300 # Seconds def cat_dhcp_command(mode): @@ -33,7 +31,8 @@ def cat_dhcp_command(mode): sd_cmd = '' if sys.executable: sd_cmd = '"{}" '.format(sys.executable) - sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode) + this_dir = os.path.dirname(__file__) + sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode) return sd_cmd class LoggingPopen(subprocess.Popen): @@ -256,9 +255,6 @@ class SubprocessTestCase(unittest.TestCase): # fixture (via a test method parameter or class decorator). assert not (env is None and hasattr(self, '_fixture_request')), \ "Decorate class with @fixtures.mark_usefixtures('test_env')" - if env is None: - # Avoid using the test user's real environment by default. - env = config.test_env proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd) self.processes.append(proc) return proc diff --git a/test/suite_capture.py b/test/suite_capture.py index a9ac1aa9c0..1eaf4ad621 100644 --- a/test/suite_capture.py +++ b/test/suite_capture.py @@ -9,7 +9,6 @@ # '''Capture tests''' -import config import glob import os import subprocess @@ -24,20 +23,48 @@ capture_duration = 5 testout_pcap = 'testout.pcap' snapshot_len = 96 -def start_pinging(self): - ping_procs = [] +@fixtures.fixture +def traffic_generator(): + ''' + Traffic generator factory. Invoking it returns a tuple (start_func, cfilter) + where cfilter is a capture filter to match the generated traffic. + start_func can be invoked to start generating traffic and returns a function + which can be used to stop traffic generation early. + Currently calls ping www.wireshark.org for 60 seconds. + ''' + # XXX replace this by something that generates UDP traffic to localhost? + # That would avoid external access which is forbidden by the Debian policy. + nprocs = 1 if sys.platform.startswith('win32'): - # Fake '-i' with a subsecond interval. - for st in (0.1, 0.1, 0): - ping_procs.append(self.startProcess(config.args_ping)) - time.sleep(st) + # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping + args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org') + nprocs = 3 + elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'): + args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org') + elif sys.platform.startswith('darwin'): + args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org') else: - ping_procs.append(self.startProcess(config.args_ping)) - return ping_procs - -def stop_pinging(ping_procs): - for proc in ping_procs: - proc.kill() + # XXX Other BSDs, Solaris, etc + fixtures.skip('ping utility is unavailable - cannot generate traffic') + procs = [] + def kill_processes(): + for proc in procs: + proc.kill() + for proc in procs: + proc.wait() + procs.clear() + def start_processes(): + for i in range(nprocs): + if i > 0: + # Fake subsecond interval if the ping utility lacks support. + time.sleep(0.1) + proc = subprocess.Popen(args_ping, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + procs.append(proc) + return kill_processes + try: + yield start_processes, 'icmp || icmp6' + finally: + kill_processes() @fixtures.fixture(scope='session') @@ -54,14 +81,12 @@ def capture_command(*cmd_args, shell=False): @fixtures.fixture -def check_capture_10_packets(capture_interface, cmd_dumpcap): +def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator): + start_traffic, cfilter = traffic_generator def check_capture_10_packets_real(self, cmd=None, to_stdout=False): - # Similar to suite_io.check_io_4_packets. - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) self.assertIsNotNone(cmd) testout_file = self.filename_from_id(testout_pcap) - ping_procs = start_pinging(self) + stop_traffic = start_traffic() if to_stdout: capture_proc = self.runProcess(capture_command(cmd, '-i', '"{}"'.format(capture_interface), @@ -69,7 +94,7 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap): '-w', '-', '-c', '10', '-a', 'duration:{}'.format(capture_duration), - '-f', '"icmp || icmp6"', + '-f', '"{}"'.format(cfilter), '>', testout_file, shell=True ), @@ -82,10 +107,10 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap): '-w', testout_file, '-c', '10', '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', + '-f', cfilter, )) + stop_traffic() capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) if capture_returncode != 0: self.log_fd.write('{} -D output:\n'.format(cmd)) self.runProcess((cmd, '-D')) @@ -160,13 +185,12 @@ def check_capture_stdin(cmd_dumpcap): @fixtures.fixture -def check_capture_read_filter(capture_interface): +def check_capture_read_filter(capture_interface, traffic_generator): + start_traffic, cfilter = traffic_generator def check_capture_read_filter_real(self, cmd=None): - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) testout_file = self.filename_from_id(testout_pcap) + stop_traffic = start_traffic() capture_proc = self.runProcess(capture_command(cmd, '-i', capture_interface, '-p', @@ -175,10 +199,10 @@ def check_capture_read_filter(capture_interface): '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. '-c', '10', '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', + '-f', cfilter, )) + stop_traffic() capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) self.assertEqual(capture_returncode, 0) if (capture_returncode == 0): @@ -186,12 +210,11 @@ def check_capture_read_filter(capture_interface): return check_capture_read_filter_real @fixtures.fixture -def check_capture_snapshot_len(capture_interface, cmd_tshark): +def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator): + start_traffic, cfilter = traffic_generator def check_capture_snapshot_len_real(self, cmd=None): - if not config.args_ping: - self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) self.assertIsNotNone(cmd) - ping_procs = start_pinging(self) + stop_traffic = start_traffic() testout_file = self.filename_from_id(testout_pcap) capture_proc = self.runProcess(capture_command(cmd, '-i', capture_interface, @@ -199,10 +222,10 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark): '-w', testout_file, '-s', str(snapshot_len), '-a', 'duration:{}'.format(capture_duration), - '-f', 'icmp || icmp6', + '-f', cfilter, )) + stop_traffic() capture_returncode = capture_proc.returncode - stop_pinging(ping_procs) self.assertEqual(capture_returncode, 0) self.assertTrue(os.path.isfile(testout_file)) |