aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPeter Wu <peter@lekensteyn.nl>2018-11-16 02:28:32 +0100
committerPeter Wu <peter@lekensteyn.nl>2018-11-16 13:55:40 +0000
commit88ce087dcfc3aaae3d290276333091d324a6fcb9 (patch)
tree41b474b1c641029ba36d4099a8877625e64da64a
parent3ab521118a0d068e0d5c795a5a57b13cd7790a75 (diff)
test: finalize suite_capture conversion to fixtures, drop config.py
Convert the old start_pinging routine to use pytest fixtures, rewriting it to enable a different generator that uses (for example) UDP. Remove the config module since it is no longer neded. Change-Id: Ic4727157faab084b41144e8f16ea44f59c9037d8 Reviewed-on: https://code.wireshark.org/review/30659 Petri-Dish: Peter Wu <peter@lekensteyn.nl> Tested-by: Petri Dish Buildbot Reviewed-by: Peter Wu <peter@lekensteyn.nl>
-rw-r--r--test/config.py257
-rw-r--r--test/fixtures_ws.py43
-rw-r--r--test/subprocesstest.py8
-rw-r--r--test/suite_capture.py89
4 files changed, 89 insertions, 308 deletions
diff --git a/test/config.py b/test/config.py
deleted file mode 100644
index 87aa1270ff..0000000000
--- a/test/config.py
+++ /dev/null
@@ -1,257 +0,0 @@
-#
-# -*- coding: utf-8 -*-
-# Wireshark tests
-# By Gerald Combs <gerald@wireshark.org>
-#
-# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
-#
-# SPDX-License-Identifier: GPL-2.0-or-later
-#
-'''Configuration'''
-
-import logging
-import os
-import os.path
-import re
-import shutil
-import subprocess
-import sys
-import tempfile
-
-commands = (
- 'capinfos',
- 'dumpcap',
- 'mergecap',
- 'rawshark',
- 'sharkd',
- 'text2pcap',
- 'tshark',
- 'wireshark',
-)
-
-can_capture = False
-capture_interface = None
-
-# Our executables
-program_path = None
-# Strings
-cmd_capinfos = None
-cmd_dumpcap = None
-cmd_mergecap = None
-cmd_rawshark = None
-cmd_tshark = None
-cmd_text2pcap = None
-cmd_wireshark = None
-# Arrays
-args_ping = None
-
-have_lua = False
-have_nghttp2 = False
-have_kerberos = False
-have_libgcrypt16 = False
-have_libgcrypt17 = False
-
-test_env = None
-program_path = None
-home_path = None
-conf_path = None
-custom_profile_path = None
-custom_profile_name = 'Custom Profile'
-
-this_dir = os.path.dirname(__file__)
-baseline_dir = os.path.join(this_dir, 'baseline')
-capture_dir = os.path.join(this_dir, 'captures')
-config_dir = os.path.join(this_dir, 'config')
-key_dir = os.path.join(this_dir, 'keys')
-lua_dir = os.path.join(this_dir, 'lua')
-tools_dir = os.path.join(this_dir, '..', 'tools')
-
-all_groups = []
-
-def canCapture():
- # XXX This appears to be evaluated at the wrong time when called
- # from a unittest.skipXXX decorator.
- return can_capture and capture_interface is not None
-
-def setCanCapture(new_cc):
- can_capture = new_cc
-
-def setCaptureInterface(iface):
- global capture_interface
- setCanCapture(True)
- capture_interface = iface
-
-def canMkfifo():
- return not sys.platform.startswith('win32')
-
-def canDisplay():
- if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
- return True
- # Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide.
- return False
-
-def getTsharkInfo():
- global have_lua
- global have_nghttp2
- global have_kerberos
- global have_libgcrypt16
- global have_libgcrypt17
- if not cmd_tshark:
- logging.warning("tshark binary is not yet set")
- return
- try:
- tshark_v = subprocess.check_output(
- (cmd_tshark, '--version'),
- stderr=subprocess.PIPE,
- universal_newlines=True,
- env=baseEnv()
- ).replace('\n', ' ')
- except subprocess.CalledProcessError as e:
- logging.warning("Failed to detect tshark features: %s", e)
- tshark_v = ''
- have_lua = bool(re.search('with +Lua', tshark_v))
- have_nghttp2 = bool(re.search('with +nghttp2', tshark_v))
- have_kerberos = bool(re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v))
- gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v)
- have_libgcrypt16 = gcry_m and float(gcry_m.group(1)) >= 1.6
- have_libgcrypt17 = gcry_m and float(gcry_m.group(1)) >= 1.7
-
-def getDefaultCaptureInterface():
- '''Choose a default capture interface for our platform. Currently Windows only.'''
- global capture_interface
- if capture_interface:
- return
- if cmd_dumpcap is None:
- return
- if not sys.platform.startswith('win32'):
- return
- try:
- dumpcap_d_data = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE)
- dumpcap_d_stdout = dumpcap_d_data.decode('UTF-8', 'replace')
- for d_line in dumpcap_d_stdout.splitlines():
- iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line)
- if iface_m:
- capture_interface = iface_m.group(1)
- break
- except:
- pass
-
-def getPingCommand():
- '''Return an argument list required to ping www.wireshark.org for 60 seconds.'''
- global args_ping
- # XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds.
- if sys.platform.startswith('win32'):
- # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
- args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
- elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
- args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
- elif sys.platform.startswith('darwin'):
- args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
- # XXX Other BSDs, Solaris, etc
-
-def setProgramPath(path):
- global program_path
- program_path = path
- retval = True
- dotexe = ''
- if sys.platform.startswith('win32'):
- dotexe = '.exe'
- for cmd in commands:
- cmd_var = 'cmd_' + cmd
- cmd_path = os.path.normpath(os.path.join(path, cmd + dotexe))
- if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK):
- cmd_path = None
- program_path = None
- retval = False
- globals()[cmd_var] = cmd_path
- getTsharkInfo()
- getDefaultCaptureInterface()
- setUpHostFiles()
- return retval
-
-def baseEnv(home=None):
- """A modified environment to ensure reproducible tests."""
- env = os.environ.copy()
- env['TZ'] = 'UTC'
- home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
- if home:
- env[home_env] = home
- else:
- # This directory is supposed not to be written and is used by
- # "readonly" tests that do not read any other preferences.
- env[home_env] = "/wireshark-tests-unused"
- return env
-
-def setUpTestEnvironment():
- global home_path
- global conf_path
- global custom_profile_path
- global test_env
-
- # Create our directories
- test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.')
- home_path = os.path.join(test_confdir, 'home')
- if sys.platform.startswith('win32'):
- conf_path = os.path.join(home_path, 'Wireshark')
- else:
- conf_path = os.path.join(home_path, '.config', 'wireshark')
- os.makedirs(conf_path)
- # Test spaces while we're here.
- custom_profile_path = os.path.join(conf_path, 'profiles', custom_profile_name)
- os.makedirs(custom_profile_path)
-
- # Populate our UAT files
- uat_files = [
- '80211_keys',
- 'dtlsdecrypttablefile',
- 'esp_sa',
- 'ssl_keys',
- 'c1222_decryption_table',
- 'ikev1_decryption_table',
- 'ikev2_decryption_table',
- ]
- for uat in uat_files:
- setUpUatFile(conf_path, uat)
-
- # Set up our environment
- test_env = baseEnv(home=home_path)
- test_env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = 'True'
- test_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True'
-
-def setUpUatFile(conf_path, conf_file):
- template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl'
- with open(template, 'r') as tplt_fd:
- tplt_contents = tplt_fd.read()
- tplt_fd.close()
- key_dir_path = os.path.join(key_dir, '')
- # uat.c replaces backslashes...
- key_dir_path = key_dir_path.replace('\\', '\\x5c')
- cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path)
- out_file = os.path.join(conf_path, conf_file)
- with open(out_file, 'w') as cf_fd:
- cf_fd.write(cf_contents)
- cf_fd.close()
-
-def setUpHostFiles():
- global program_path
- global conf_path
- global custom_profile_path
- if program_path is None:
- return
- if conf_path is None or custom_profile_path is None:
- setUpTestEnvironment()
- bundle_path = os.path.join(program_path, 'Wireshark.app', 'Contents', 'MacOS')
- if os.path.isdir(bundle_path):
- global_path = bundle_path
- else:
- global_path = program_path
- hosts_path_pfx = os.path.join(this_dir, 'hosts.')
- shutil.copyfile(hosts_path_pfx + 'global', os.path.join(global_path, 'hosts'))
- shutil.copyfile(hosts_path_pfx + 'personal', os.path.join(conf_path, 'hosts'))
- shutil.copyfile(hosts_path_pfx + 'custom', os.path.join(custom_profile_path, 'hosts'))
-
-if sys.platform.startswith('win32') or sys.platform.startswith('darwin'):
- can_capture = True
-
-# Initialize ourself.
-getPingCommand()
diff --git a/test/fixtures_ws.py b/test/fixtures_ws.py
index 55e3e2151c..053d89e0ce 100644
--- a/test/fixtures_ws.py
+++ b/test/fixtures_ws.py
@@ -17,7 +17,6 @@ import tempfile
import types
import fixtures
-import config
import subprocesstest
@@ -126,15 +125,14 @@ def wireshark_command(cmd_wireshark):
@fixtures.fixture(scope='session')
-def features(cmd_tshark):
+def features(cmd_tshark, make_env):
'''Returns an object describing available features in tshark.'''
try:
- # XXX stop using config
tshark_v = subprocess.check_output(
(cmd_tshark, '--version'),
stderr=subprocess.PIPE,
universal_newlines=True,
- env=config.baseEnv()
+ env=make_env()
)
tshark_v = re.sub(r'\s+', ' ', tshark_v)
except subprocess.CalledProcessError as ex:
@@ -190,14 +188,28 @@ def conf_path(home_path):
return conf_path
+@fixtures.fixture(scope='session')
+def make_env():
+ """A factory for a modified environment to ensure reproducible tests."""
+ def make_env_real(home=None):
+ env = os.environ.copy()
+ env['TZ'] = 'UTC'
+ home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
+ if home:
+ env[home_env] = home
+ else:
+ # This directory is supposed not to be written and is used by
+ # "readonly" tests that do not read any other preferences.
+ env[home_env] = "/wireshark-tests-unused"
+ return env
+ return make_env_real
+
+
@fixtures.fixture
-def base_env(home_path, request):
+def base_env(home_path, make_env, request):
"""A modified environment to ensure reproducible tests. Tests can modify
this environment as they see fit."""
- env = os.environ.copy()
- env['TZ'] = 'UTC'
- home_env = 'APPDATA' if sys.platform.startswith('win32') else 'HOME'
- env[home_env] = home_path
+ env = make_env(home=home_path)
# Remove this if test instances no longer inherit from SubprocessTestCase?
if isinstance(request.instance, subprocesstest.SubprocessTestCase):
@@ -207,7 +219,7 @@ def base_env(home_path, request):
@fixtures.fixture
-def test_env(base_env, conf_path, request):
+def test_env(base_env, conf_path, request, dirs):
'''A process environment with a populated configuration directory.'''
# Populate our UAT files
uat_files = [
@@ -219,9 +231,16 @@ def test_env(base_env, conf_path, request):
'ikev1_decryption_table',
'ikev2_decryption_table',
]
+ # uat.c replaces backslashes...
+ key_dir_path = os.path.join(dirs.key_dir, '').replace('\\', '\\x5c')
for uat in uat_files:
- # XXX stop using config
- config.setUpUatFile(conf_path, uat)
+ template_file = os.path.join(dirs.config_dir, uat + '.tmpl')
+ out_file = os.path.join(conf_path, uat)
+ with open(template_file, 'r') as f:
+ template_contents = f.read()
+ cf_contents = template_contents.replace('TEST_KEYS_DIR', key_dir_path)
+ with open(out_file, 'w') as f:
+ f.write(cf_contents)
env = base_env
env['WIRESHARK_RUN_FROM_BUILD_DIRECTORY'] = '1'
diff --git a/test/subprocesstest.py b/test/subprocesstest.py
index 060ebeb3a4..9f6b2f001f 100644
--- a/test/subprocesstest.py
+++ b/test/subprocesstest.py
@@ -23,8 +23,6 @@ import unittest
# - Add a subprocesstest.SkipUnlessCapture decorator?
# - Try to catch crashes? See the comments below in waitProcess.
-# XXX This should probably be in config.py and settable from
-# the command line.
process_timeout = 300 # Seconds
def cat_dhcp_command(mode):
@@ -33,7 +31,8 @@ def cat_dhcp_command(mode):
sd_cmd = ''
if sys.executable:
sd_cmd = '"{}" '.format(sys.executable)
- sd_cmd += os.path.join(config.this_dir, 'util_dump_dhcp_pcap.py ' + mode)
+ this_dir = os.path.dirname(__file__)
+ sd_cmd += os.path.join(this_dir, 'util_dump_dhcp_pcap.py ' + mode)
return sd_cmd
class LoggingPopen(subprocess.Popen):
@@ -256,9 +255,6 @@ class SubprocessTestCase(unittest.TestCase):
# fixture (via a test method parameter or class decorator).
assert not (env is None and hasattr(self, '_fixture_request')), \
"Decorate class with @fixtures.mark_usefixtures('test_env')"
- if env is None:
- # Avoid using the test user's real environment by default.
- env = config.test_env
proc = LoggingPopen(proc_args, stdin=stdin, env=env, shell=shell, log_fd=self.log_fd)
self.processes.append(proc)
return proc
diff --git a/test/suite_capture.py b/test/suite_capture.py
index a9ac1aa9c0..1eaf4ad621 100644
--- a/test/suite_capture.py
+++ b/test/suite_capture.py
@@ -9,7 +9,6 @@
#
'''Capture tests'''
-import config
import glob
import os
import subprocess
@@ -24,20 +23,48 @@ capture_duration = 5
testout_pcap = 'testout.pcap'
snapshot_len = 96
-def start_pinging(self):
- ping_procs = []
+@fixtures.fixture
+def traffic_generator():
+ '''
+ Traffic generator factory. Invoking it returns a tuple (start_func, cfilter)
+ where cfilter is a capture filter to match the generated traffic.
+ start_func can be invoked to start generating traffic and returns a function
+ which can be used to stop traffic generation early.
+ Currently calls ping www.wireshark.org for 60 seconds.
+ '''
+ # XXX replace this by something that generates UDP traffic to localhost?
+ # That would avoid external access which is forbidden by the Debian policy.
+ nprocs = 1
if sys.platform.startswith('win32'):
- # Fake '-i' with a subsecond interval.
- for st in (0.1, 0.1, 0):
- ping_procs.append(self.startProcess(config.args_ping))
- time.sleep(st)
+ # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping
+ args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org')
+ nprocs = 3
+ elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'):
+ args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org')
+ elif sys.platform.startswith('darwin'):
+ args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org')
else:
- ping_procs.append(self.startProcess(config.args_ping))
- return ping_procs
-
-def stop_pinging(ping_procs):
- for proc in ping_procs:
- proc.kill()
+ # XXX Other BSDs, Solaris, etc
+ fixtures.skip('ping utility is unavailable - cannot generate traffic')
+ procs = []
+ def kill_processes():
+ for proc in procs:
+ proc.kill()
+ for proc in procs:
+ proc.wait()
+ procs.clear()
+ def start_processes():
+ for i in range(nprocs):
+ if i > 0:
+ # Fake subsecond interval if the ping utility lacks support.
+ time.sleep(0.1)
+ proc = subprocess.Popen(args_ping, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
+ procs.append(proc)
+ return kill_processes
+ try:
+ yield start_processes, 'icmp || icmp6'
+ finally:
+ kill_processes()
@fixtures.fixture(scope='session')
@@ -54,14 +81,12 @@ def capture_command(*cmd_args, shell=False):
@fixtures.fixture
-def check_capture_10_packets(capture_interface, cmd_dumpcap):
+def check_capture_10_packets(capture_interface, cmd_dumpcap, traffic_generator):
+ start_traffic, cfilter = traffic_generator
def check_capture_10_packets_real(self, cmd=None, to_stdout=False):
- # Similar to suite_io.check_io_4_packets.
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
testout_file = self.filename_from_id(testout_pcap)
- ping_procs = start_pinging(self)
+ stop_traffic = start_traffic()
if to_stdout:
capture_proc = self.runProcess(capture_command(cmd,
'-i', '"{}"'.format(capture_interface),
@@ -69,7 +94,7 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
'-w', '-',
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
- '-f', '"icmp || icmp6"',
+ '-f', '"{}"'.format(cfilter),
'>', testout_file,
shell=True
),
@@ -82,10 +107,10 @@ def check_capture_10_packets(capture_interface, cmd_dumpcap):
'-w', testout_file,
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
+ '-f', cfilter,
))
+ stop_traffic()
capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
if capture_returncode != 0:
self.log_fd.write('{} -D output:\n'.format(cmd))
self.runProcess((cmd, '-D'))
@@ -160,13 +185,12 @@ def check_capture_stdin(cmd_dumpcap):
@fixtures.fixture
-def check_capture_read_filter(capture_interface):
+def check_capture_read_filter(capture_interface, traffic_generator):
+ start_traffic, cfilter = traffic_generator
def check_capture_read_filter_real(self, cmd=None):
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
testout_file = self.filename_from_id(testout_pcap)
+ stop_traffic = start_traffic()
capture_proc = self.runProcess(capture_command(cmd,
'-i', capture_interface,
'-p',
@@ -175,10 +199,10 @@ def check_capture_read_filter(capture_interface):
'-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
'-c', '10',
'-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
+ '-f', cfilter,
))
+ stop_traffic()
capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
self.assertEqual(capture_returncode, 0)
if (capture_returncode == 0):
@@ -186,12 +210,11 @@ def check_capture_read_filter(capture_interface):
return check_capture_read_filter_real
@fixtures.fixture
-def check_capture_snapshot_len(capture_interface, cmd_tshark):
+def check_capture_snapshot_len(capture_interface, cmd_tshark, traffic_generator):
+ start_traffic, cfilter = traffic_generator
def check_capture_snapshot_len_real(self, cmd=None):
- if not config.args_ping:
- self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
self.assertIsNotNone(cmd)
- ping_procs = start_pinging(self)
+ stop_traffic = start_traffic()
testout_file = self.filename_from_id(testout_pcap)
capture_proc = self.runProcess(capture_command(cmd,
'-i', capture_interface,
@@ -199,10 +222,10 @@ def check_capture_snapshot_len(capture_interface, cmd_tshark):
'-w', testout_file,
'-s', str(snapshot_len),
'-a', 'duration:{}'.format(capture_duration),
- '-f', 'icmp || icmp6',
+ '-f', cfilter,
))
+ stop_traffic()
capture_returncode = capture_proc.returncode
- stop_pinging(ping_procs)
self.assertEqual(capture_returncode, 0)
self.assertTrue(os.path.isfile(testout_file))