diff options
-rw-r--r-- | CMakeLists.txt | 34 | ||||
-rwxr-xr-x | debian/rules | 8 | ||||
-rw-r--r-- | docbook/developer-guide.asciidoc | 2 | ||||
-rw-r--r-- | docbook/wsdg_src/WSDG_chapter_tests.asciidoc | 170 | ||||
-rw-r--r-- | test/README.test | 88 | ||||
-rw-r--r-- | test/config.py | 186 | ||||
-rw-r--r-- | test/subprocesstest.py | 206 | ||||
-rwxr-xr-x | test/suite-decryption.sh | 32 | ||||
-rw-r--r-- | test/suite_capture.py | 321 | ||||
-rw-r--r-- | test/suite_clopts.py | 176 | ||||
-rw-r--r-- | test/suite_decryption.py | 496 | ||||
-rw-r--r-- | test/suite_dissection.py | 30 | ||||
-rwxr-xr-x | test/test.py | 120 | ||||
-rw-r--r-- | test/util_slow_dhcp_pcap.py | 23 |
14 files changed, 1802 insertions, 90 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f57c882c7..1da14aae24 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3134,7 +3134,7 @@ if (DOXYGEN_EXECUTABLE) ) endif(DOXYGEN_EXECUTABLE) -# Test suite wrapper +# Old test suite wrapper if(ENABLE_APPLICATION_BUNDLE) set(TEST_SH_BIN_DIR ${CMAKE_BINARY_DIR}/run) else() @@ -3168,6 +3168,38 @@ set_target_properties(test-programs PROPERTIES EXCLUDE_FROM_DEFAULT_BUILD True ) +# Test suites +enable_testing() +file(GLOB _test_suite_py_list test/suite_*.py) +if(WIN32) + set(_test_suite_program_path ./run/$<CONFIG>) +else() + set(_test_suite_program_path ./run) +endif() + +# We currently don't handle spaces in arguments. On Windows this +# means that you will probably have to pass in an interface index +# instead of a name. +set(TEST_EXTRA_ARGS "" CACHE STRING "Extra arguments to pass to test/test.py") +separate_arguments(TEST_EXTRA_ARGS) + +# We can enumerate suites two ways: by probing the filesystem and by +# running `test.py --list-suites`. Probe the filesystem for now, which +# should hopefully give us enough parallelization. If we want to split +# our tests by cases or individual tests we'll have to run and parse +# `test.py --list-cases` or `test.py --list` respectively. +foreach(_suite_py ${_test_suite_py_list}) + get_filename_component(_suite_name ${_suite_py} NAME_WE) + add_test( + NAME ${_suite_name} + COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/test/test.py + --program-path ${_test_suite_program_path} + ${TEST_EXTRA_ARGS} + ${_suite_name} + ) + set_tests_properties(${_suite_name} PROPERTIES TIMEOUT 600) +endforeach() + if (GIT_EXECUTABLE) # Update AUTHORS file with entries from git shortlog add_custom_target( diff --git a/debian/rules b/debian/rules index 3d1c3eaed2..ced9cb339d 100755 --- a/debian/rules +++ b/debian/rules @@ -61,3 +61,11 @@ override_dh_fixperms: debian/wireshark-dev/usr/share/pyshared/wireshark_be.py \ debian/wireshark-dev/usr/share/pyshared/wireshark_gen.py +# Adapted from https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=861988 +override_dh_auto_test: +ifeq ($(filter $(DEB_BUILD_OPTIONS),nocheck),) + # XXX Add -- --verbose? + dh_auto_test +else + @echo '"DEB_BUILD_OPTIONS" has "nocheck". Skipping tests' +endif diff --git a/docbook/developer-guide.asciidoc b/docbook/developer-guide.asciidoc index 468e42874c..c89bd0c606 100644 --- a/docbook/developer-guide.asciidoc +++ b/docbook/developer-guide.asciidoc @@ -56,4 +56,6 @@ include::wsluarm.asciidoc[] include::wsdg_src/WSDG_chapter_userinterface.asciidoc[] +include::wsdg_src/WSDG_chapter_tests.asciidoc[] + include::common_src/GPL_appendix.asciidoc[] diff --git a/docbook/wsdg_src/WSDG_chapter_tests.asciidoc b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc new file mode 100644 index 0000000000..249e5a416d --- /dev/null +++ b/docbook/wsdg_src/WSDG_chapter_tests.asciidoc @@ -0,0 +1,170 @@ +// WSDG Chapter Setup + +[[ChapterTests]] +== Wireshark Tests + +The Wireshark sources include a collection of Python scripts that test +the features of Wireshark, TShark, Dumpcap, and other programs that +accompany Wireshark. + +The command line options of Wireshark and its companion command line +tools are numerous. These tests help to ensure that we don't introduce +bugs as Wireshark grows and evolves. + +=== Quick Start + +The main testing script is `test.py`. It will attempt to test as much as +possible by default, including packet capture. This means that you will +probably either have to supply a capture interface (`--capture-interface +<interface>`) or disable capture tests (`--disable-capture`). + +To run all tests from CMake do the following: +* Pass `-DTEST_EXTRA_ARGS=--disable-capture` or + `-DTEST_EXTRA_ARGS=--capture-interface=<interface>` + as needed for your system. +* Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`. + +To run all tests directly, run `test.py -p +/path/to/wireshark-build/run-directory <capture args>`. + +To see a list of all options, run `test.py -h` or `test.py --help`. + +To see a list of all tests, run `test.py -l`. + +=== Test Coverage And Availability + +The testing framework can run programs and check their stdout, stderr, +and exit codes. It cannot interact with the Wireshark UI. Tests cover +capture, command line options, decryption, file format support and +conversion, Lua scripting, and other functionality. + +Available tests depend on the libraries with which Wireshark was built. +For example, some decryption tests depend on a minimum version of +Libgcrypt and Lua tests depend on Lua. + +Capture tests depend on the permissions of the user running the test +script. We assume that the test user has capture permissions on Windows +and macOS and capture tests are enabled by default on those platforms. + +=== Suites, Cases, and Tests + +The `test.py` script uses Python's “unittest” module. Our tests are +patterned after it, and individual tests are organized according to +suites, cases, and individual tests. Suites correspond to python modules +that match the pattern “suite_*.py”. Cases correspond to one or more +classes in each module, and case class methods matching the pattern +”test_*” correspond to individual tests. For example, the invalid +capture filter test in the TShark capture command line options test case +in the command line options suite has the ID +“suite_clopts.case_tshark_capture_clopts.test_tshark_invalid_capfilter”. + +=== Listing And Running Tests + +Tests can be run via the `test.py` Python script. To run all tests, +either run `test.py` in the directory that contains the Wireshark +executables (`wireshark`, `tshark`, etc.), or pass the the executable +path via the `-p` flag: + +[source,sh] +---- +$ python test.py -p /path/to/wireshark-build/run +---- + +You can list tests by passing one or more complete or partial names to +`tshark.py`. The `-l` flag lists tests. By default all tests are shown. + +[source,sh] +---- +# List all tests +$ python test.py -l +$ python test.py -l all +$ python test.py --list +$ python test.py --list all + +# List only tests containing "dumpcap" +$ python test.py -l dumpcap + +# List all suites +$ python test.py --list-suites + +# List all suites and cases +$ python test.py --list-cases +---- + +If one of the listing flags is not present, tests are run. If no names or `all` is supplied, +all tests are run. Otherwise tests that match are run. + +[source,sh] +---- +# Run all tests +$ python test.py +$ python test.py all + +# Only run tests containing "dumpcap" +$ python test.py -l dumpcap + +# Run the "clopts" suite +$ python test.py suite_clopts +---- + +=== Adding Or Modifying Tests + +Tests must be in a Python module whose name matches “suite_*.py”. The +module must contain one or more subclasses of “SubprocessTestCase” or +“unittest.TestCase”. “SubprocessTestCase” is recommended since it +contains several convenience methods for running processes, checking +output, and displaying error information. Each test case method +whose name starts with “test_” constitutes an individual test. + +Success or failure conditions can be signalled using the +“unittest.assertXXX()” or “subprocesstest.assertXXX()” methods. + +The “config” module contains common configuration information which has +been derived from the current environment or specified on the command +line. + +The “subprocesstest” class contains the following methods for running +processes. Stdout and stderr is written to “<test id>.log”: + +startProcess:: Start a process without waiting for it to finish. +runProcess:: Start a process and wait for it to finish. +assertRun:: Start a process, wait for it to finish, and check its exit code. + +All of the current tests run one or more of Wireshark's suite of +executables and either checks their return code or their output. A +simple example is “suite_clopts.case_basic_clopts.test_existing_file”, +which reads a capture file using TShark and checks its exit code. + +[source,python] +---- +import config +import subprocesstest + +class case_basic_clopts(subprocesstest.SubprocessTestCase): + def test_existing_file(self): + cap_file = os.path.join(self.capture_dir, 'dhcp.pcap') + self.assertRun((config.cmd_tshark, '-r', cap_file)) +---- + +Program output can be checked using “subprocesstest.grepOutput” +or “subprocesstest.countOutput”: + +[source,python] +---- +import config +import subprocesstest + +class case_decrypt_80211(subprocesstest.SubprocessTestCase): + def test_80211_wpa_psk(self): + capture_file = os.path.join(config.capture_dir, 'wpa-Induction.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-Tfields', + '-e', 'http.request.uri', + '-r', capture_file, + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('favicon.ico')) +---- + diff --git a/test/README.test b/test/README.test index 068d7d8816..efc8f1ea1b 100644 --- a/test/README.test +++ b/test/README.test @@ -1,79 +1,21 @@ -What is it? ------------ -This is a collection of bash scripts which test the features of: +Wireshark Tests - - Wireshark - - TShark - - Dumpcap +The main testing script is `test.py`. It will attempt to test as much as +possible by default, including packet capture. This means that you will +probably either have to supply a capture interface (`--capture-interface +<interface>`) or disable capture tests (`--disable-capture`). -Motivation ----------- +To run all tests from CMake do the following: +- Pass `-DTEST_EXTRA_ARGS=--disable-capture` or + `-DTEST_EXTRA_ARGS=--capture-interface=<interface>` + as needed for your system. +- Build the “test” target or run ctest, e.g. `ctest --jobs=4 --verbose`. -The command line options of Wireshark and the companion command line tools are -numerous. This makes it hard to find newly introduced bugs doing manual testing -(try and error) with source code changes. +To run all tests directly, run `test.py -p +/path/to/wireshark-build/run-directory <capture args>`. -The current way is to do some changes, testing some scenarios by hand and -commit the code so other users will complain about new problems. This obviously -is far from being optimal. +To see a list of all options, run `test.py -h` or `test.py --help`. -Limitations ------------ +To see a list of all tests, run `test.py -l`. -The test set currently provided will only do some basic tests, but even that -is far better than nothing. This may involve in time as new tests can be added -to fix problems reported by users. This will hopefully lead to a "complete" -and reliable testset in the future. - -The tests are limited to command line tests, other things like unit tests or -GUI test are not included. - -Prerequisites -------------- - -What you'll need (to do): - - - edit the file config.sh to suit your configuration - - build the "all" target - - build the "test-programs" target - - have a bash (cygwin should do well) - - have tput (e.g. in the cygwin ncurses package) - - you'll need a network interface with some network traffic - (so you can run the capture tests) - - (for non-Windows platforms) An X server for running the capture tests with - the graphical Wireshark program. - -A Test Ride ------------ - -The default configuration might not be suitable for your set-up. Most settings -can be adjusted by setting an environment variable matching or by editing the -setting in config.sh. - -For instance, the first network interface might not be used for traffic (like an -unconnected Ethernet port). In that case, you might want to set the environment -variable TRAFFIC_CAPTURE_IFACE to pick another interface. Use `dumpcap -D` to -get a list of devices. - -On Windows, it is assumed that the user is able to perform captures. On -non-Windows platforms, the opposite is assumed. If your dumpcap executable -allows you to perform captures (for example, when it has appropriate -capabilities), then you can override the default with: - - SKIP_CAPTURE=0 - -If you do not want to test the binaries in the build directory, you can override -it with: - - WS_BIN_PATH=/usr/bin - -When your configuration is sane, you can start test.sh which should provide a -basic menu. Just press Enter to start all tests. - -It should start all the available tests. Each test will throw out a line -which should end with a green "Ok". If one of the tests fail, the script -will report it and stop at this test step. - -Please remember to have some ICMP traffic on your network interface! The test -suite will ping to www.wireshark.org while running capture tests, but this will -slow down the tests. +See the “Wireshark Tests” chapter of the Developer's Guide for details. diff --git a/test/config.py b/test/config.py new file mode 100644 index 0000000000..78c2bffd4e --- /dev/null +++ b/test/config.py @@ -0,0 +1,186 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Configuration''' + +import os +import os.path +import re +import subprocess +import sys +import tempfile + +commands = ( + 'dumpcap', + 'tshark', + 'wireshark', + 'capinfos', +) + +can_capture = False +capture_interface = None + +# Our executables +# Strings +cmd_tshark = None +cmd_dumpcap = None +cmd_wireshark = None +cmd_capinfos = None +# Arrays +args_ping = None + +have_lua = False +have_nghttp2 = False +have_kerberos = False +have_libgcrypt17 = False + +test_env = None +home_path = None +conf_path = None +this_dir = os.path.dirname(__file__) +baseline_dir = os.path.join(this_dir, 'baseline') +capture_dir = os.path.join(this_dir, 'captures') +config_dir = os.path.join(this_dir, 'config') +key_dir = os.path.join(this_dir, 'keys') +lua_dir = os.path.join(this_dir, 'lua') + +def canCapture(): + return can_capture and capture_interface is not None + +def setCanCapture(new_cc): + can_capture = new_cc + +def setCaptureInterface(iface): + global capture_interface + capture_interface = iface + +def canMkfifo(): + return not sys.platform.startswith('win32') + +def canDisplay(): + if sys.platform.startswith('win32') or sys.platform.startswith('darwin'): + return True + # Qt requires XKEYBOARD and Xrender, which Xvnc doesn't provide. + return False + +def getTsharkInfo(): + global have_lua + global have_nghttp2 + global have_kerberos + global have_libgcrypt17 + have_lua = False + have_nghttp2 = False + have_kerberos = False + have_libgcrypt17 = False + try: + tshark_v_blob = str(subprocess.check_output((cmd_tshark, '--version'), stderr=subprocess.PIPE)) + tshark_v = ' '.join(tshark_v_blob.splitlines()) + if re.search('with +Lua', tshark_v): + have_lua = True + if re.search('with +nghttp2', tshark_v): + have_nghttp2 = True + if re.search('(with +MIT +Kerberos|with +Heimdal +Kerberos)', tshark_v): + have_kerberos = True + gcry_m = re.search('with +Gcrypt +([0-9]+\.[0-9]+)', tshark_v) + have_libgcrypt = gcry_m and float(gcry_m.group(1)) >= 1.7 + except: + pass + +def getDefaultCaptureInterface(): + '''Choose a default capture interface for our platform. Currently Windows only.''' + global capture_interface + if capture_interface: + return + if cmd_dumpcap is None: + return + if not sys.platform.startswith('win32'): + return + try: + dumpcap_d = subprocess.check_output((cmd_dumpcap, '-D'), stderr=subprocess.PIPE) + for d_line in dumpcap_d.splitlines(): + iface_m = re.search('(\d+)\..*(Ethernet|Network Connection|VMware|Intel)', d_line) + if iface_m: + capture_interface = iface_m.group(1) + break + except: + pass + +def getPingCommand(): + '''Return an argument list required to ping www.wireshark.org for 60 seconds.''' + global args_ping + # XXX The shell script tests swept over packet sizes from 1 to 240 every 0.25 seconds. + if sys.platform.startswith('win32'): + # XXX Check for psping? https://docs.microsoft.com/en-us/sysinternals/downloads/psping + args_ping = ('ping', '-n', '60', '-l', '100', 'www.wireshark.org') + elif sys.platform.startswith('linux') or sys.platform.startswith('freebsd'): + args_ping = ('ping', '-c', '240', '-s', '100', '-i', '0.25', 'www.wireshark.org') + elif sys.platform.startswith('darwin'): + args_ping = ('ping', '-c', '1', '-g', '1', '-G', '240', '-i', '0.25', 'www.wireshark.org') + # XXX Other BSDs, Solaris, etc + +def setProgramPath(path): + global program_path + program_path = path + retval = True + dotexe = '' + if sys.platform.startswith('win32'): + dotexe = '.exe' + for cmd in commands: + cmd_var = 'cmd_' + cmd + cmd_path = os.path.join(path, cmd + dotexe) + if not os.path.exists(cmd_path) or not os.access(cmd_path, os.X_OK): + cmd_path = None + retval = False + globals()[cmd_var] = cmd_path + getTsharkInfo() + getDefaultCaptureInterface() + return retval + +def testEnvironment(): + return test_env + +def setUpTestEnvironment(): + global home_path + global conf_path + global test_env + test_confdir = tempfile.mkdtemp(prefix='wireshark-tests.') + home_path = os.path.join(test_confdir, 'home') + if sys.platform.startswith('win32'): + home_env = 'APPDATA' + conf_path = os.path.join(home_path, 'Wireshark') + else: + home_env = 'HOME' + conf_path = os.path.join(home_path, '.config', 'wireshark') + os.makedirs(conf_path) + test_env = os.environ.copy() + test_env[home_env] = home_path + +def setUpConfigFile(conf_file): + global home_path + global conf_path + if home_path is None or conf_path is None: + setUpTestEnvironment() + template = os.path.join(os.path.dirname(__file__), 'config', conf_file) + '.tmpl' + with open(template, 'r') as tplt_fd: + tplt_contents = tplt_fd.read() + tplt_fd.close() + key_dir_path = os.path.join(key_dir, '') + # uat.c replaces backslashes... + key_dir_path = key_dir_path.replace('\\', '\\x5c') + cf_contents = tplt_contents.replace('TEST_KEYS_DIR', key_dir_path) + out_file = os.path.join(conf_path, conf_file) + with open(out_file, 'w') as cf_fd: + cf_fd.write(cf_contents) + cf_fd.close() + +if sys.platform.startswith('win32') or sys.platform.startswith('darwin'): + can_capture = True + +# Initialize ourself. +getPingCommand() +setProgramPath(os.path.curdir) diff --git a/test/subprocesstest.py b/test/subprocesstest.py new file mode 100644 index 0000000000..18313db795 --- /dev/null +++ b/test/subprocesstest.py @@ -0,0 +1,206 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Subprocess test case superclass''' + +import io +import os +import os.path +import re +import subprocess +import sys +import unittest + +# To do: +# - Add a subprocesstest.SkipUnlessCapture decorator? +# - Try to catch crashes? See the comments below in waitProcess. + +# XXX This should probably be in config.py and settable from +# the command line. +if sys.version_info[0] >= 3: + process_timeout = 300 # Seconds + +class LoggingPopen(subprocess.Popen): + '''Run a process using subprocess.Popen. Capture and log its output. + + Stdout and stderr are captured to memory and decoded as UTF-8. The + program command and output is written to log_fd. + ''' + def __init__(self, proc_args, *args, **kwargs): + self.log_fd = kwargs.pop('log_fd', None) + kwargs['stdout'] = subprocess.PIPE + kwargs['stderr'] = subprocess.PIPE + # Make sure communicate() gives us bytes. + kwargs['universal_newlines'] = False + self.cmd_str = 'command ' + repr(proc_args) + super(LoggingPopen, self).__init__(proc_args, *args, **kwargs) + self.stdout_str = '' + self.stderr_str = '' + + def wait_and_log(self): + '''Wait for the process to finish and log its output.''' + # Wherein we navigate the Python 2 and 3 Unicode compatibility maze. + if sys.version_info[0] >= 3: + out_data, err_data = self.communicate(timeout=process_timeout) + out_log = out_data.decode('UTF-8', 'replace') + err_log = err_data.decode('UTF-8', 'replace') + else: + out_data, err_data = self.communicate() + out_log = unicode(out_data, 'UTF-8', 'replace') + err_log = unicode(err_data, 'UTF-8', 'replace') + # Throwing a UnicodeDecodeError exception here is arguably a good thing. + self.stdout_str = out_data.decode('UTF-8', 'strict') + self.stderr_str = err_data.decode('UTF-8', 'strict') + self.log_fd.flush() + self.log_fd.write(u'-- Begin stdout for {} --\n'.format(self.cmd_str)) + self.log_fd.write(out_log) + self.log_fd.write(u'-- End stdout for {} --\n'.format(self.cmd_str)) + self.log_fd.write(u'-- Begin stderr for {} --\n'.format(self.cmd_str)) + self.log_fd.write(err_log) + self.log_fd.write(u'-- End stderr for {} --\n'.format(self.cmd_str)) + self.log_fd.flush() + + def stop_process(self, kill=False): + '''Stop the process immediately.''' + if kill: + super(LoggingPopen, self).kill() + else: + super(LoggingPopen, self).terminate() + + def terminate(self): + '''Terminate the process. Do not log its output.''' + # XXX Currently unused. + self.stop_process(kill=False) + + def kill(self): + '''Kill the process. Do not log its output.''' + self.stop_process(kill=True) + +class SubprocessTestCase(unittest.TestCase): + '''Run a program and gather its stdout and stderr.''' + + def __init__(self, *args, **kwargs): + super(SubprocessTestCase, self).__init__(*args, **kwargs) + self.exit_ok = 0 + self.exit_command_line = 1 + self.exit_error = 2 + self.exit_code = None + self.log_fname = None + self.log_fd = None + self.processes = [] + self.cleanup_files = [] + self.dump_files = [] + + def log_fd_write_bytes(self, log_data): + if sys.version_info[0] >= 3: + self.log_fd.write(log_data) + else: + self.log_fd.write(unicode(log_data, 'UTF-8', 'replace')) + + def filename_from_id(self, filename): + '''Generate a filename prefixed with our test ID.''' + return self.id() + '.' + filename + + def kill_processes(self): + '''Kill any processes we've opened so far''' + for proc in self.processes: + try: + proc.kill() + except: + pass + + def run(self, result=None): + # Subclass run() so that we can do the following: + # - Open our log file and add it to the cleanup list. + # - Check our result before and after the run so that we can tell + # if the current test was successful. + + # Probably not needed, but shouldn't hurt. + self.kill_processes() + self.processes = [] + self.log_fname = self.filename_from_id('log') + # Our command line utilities generate UTF-8. The log file endcoding + # needs to match that. + self.log_fd = io.open(self.log_fname, 'w', encoding='UTF-8') + self.cleanup_files.append(self.log_fname) + pre_run_problem_count = 0 + if result: + pre_run_problem_count = len(result.failures) + len(result.errors) + try: + super(SubprocessTestCase, self).run(result=result) + except KeyboardInterrupt: + # XXX This doesn't seem to work on Windows, which is where we need it the most. + self.kill_processes() + + # Tear down our test. We don't do this in tearDown() because Python 3 + # updates "result" after calling tearDown(). + self.kill_processes() + self.log_fd.close() + if result: + post_run_problem_count = len(result.failures) + len(result.errors) + if pre_run_problem_count != post_run_problem_count: + self.dump_files.append(self.log_fname) + # Leave some evidence behind. + self.cleanup_files = [] + print('\nProcess output for {}:'.format(self.id())) + with io.open(self.log_fname, 'r', encoding='UTF-8') as log_fd: + for line in log_fd: + sys.stdout.write(line) + for filename in self.cleanup_files: + try: + os.unlink(filename) + except OSError: + pass + self.cleanup_files = [] + + def countOutput(self, search_pat, proc=None): + '''Returns the number of output lines (search_pat=None), otherwise returns a match count.''' + match_count = 0 + if proc is None: + proc = self.processes[-1] + # We might want to let the caller decide what we're searching. + out_data = proc.stdout_str + proc.stderr_str + search_re = re.compile(search_pat) + for line in out_data.splitlines(): + if search_re.search(line): + match_count += 1 + return match_count + + def grepOutput(self, search_pat, proc=None): + return self.countOutput(search_pat, proc) > 0 + + def startProcess(self, proc_args, env=None, shell=False): + '''Start a process in the background. Returns a subprocess.Popen object. You typically wait for it using waitProcess() or assertWaitProcess().''' + proc = LoggingPopen(proc_args, env=env, shell=shell, log_fd=self.log_fd) + self.processes.append(proc) + return proc + + def waitProcess(self, process): + '''Wait for a process to finish.''' + process.wait_and_log() + # XXX The shell version ran processes using a script called run_and_catch_crashes + # which looked for core dumps and printed stack traces if found. We might want + # to do something similar here. This may not be easy on modern Ubuntu systems, + # which default to using Apport: https://wiki.ubuntu.com/Apport + + def assertWaitProcess(self, process, expected_return=0): + '''Wait for a process to finish and check its exit code.''' + process.wait_and_log() + self.assertEqual(process.returncode, expected_return) + + def runProcess(self, args, env=None, shell=False): + '''Start a process and wait for it to finish.''' + process = self.startProcess(args, env=env, shell=shell) + process.wait_and_log() + return process + + def assertRun(self, args, env=None, shell=False, expected_return=0): + '''Start a process and wait for it to finish. Check its return code.''' + process = self.runProcess(args, env=env, shell=shell) + self.assertEqual(process.returncode, expected_return) + return process diff --git a/test/suite-decryption.sh b/test/suite-decryption.sh index 7f24143567..f81c2bec2c 100755 --- a/test/suite-decryption.sh +++ b/test/suite-decryption.sh @@ -183,22 +183,6 @@ decryption_step_udt_dtls() { test_step_ok } -# IPsec ESP -# https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671 -decryption_step_ipsec_esp() { - $TESTS_DIR/run_and_catch_crashes env $TS_DC_ENV $TSHARK $TS_DC_ARGS \ - -o "esp.enable_encryption_decode: TRUE" \ - -Tfields -e data.data \ - -r "$CAPTURE_DIR/esp-bug-12671.pcapng.gz" -Y data \ - | grep "08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17" > /dev/null 2>&1 - RETURNVALUE=$? - if [ ! $RETURNVALUE -eq $EXIT_OK ]; then - test_step_failed "Failed to decrypt DTLS" - return - fi - test_step_ok -} - # SSL, using the server's private key # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil2_070531.tgz decryption_step_ssl() { @@ -419,6 +403,22 @@ decryption_step_dvb_ci() { test_step_ok } +# IPsec ESP +# https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671 +decryption_step_ipsec_esp() { + $TESTS_DIR/run_and_catch_crashes env $TS_DC_ENV $TSHARK $TS_DC_ARGS \ + -o "esp.enable_encryption_decode: TRUE" \ + -Tfields -e data.data \ + -r "$CAPTURE_DIR/esp-bug-12671.pcapng.gz" -Y data \ + | grep "08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17" > /dev/null 2>&1 + RETURNVALUE=$? + if [ ! $RETURNVALUE -eq $EXIT_OK ]; then + test_step_failed "Failed to decrypt DTLS" + return + fi + test_step_ok +} + # IKEv1 (ISAKMP) with certificates # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7951 decryption_step_ikev1_certs() { diff --git a/test/suite_capture.py b/test/suite_capture.py new file mode 100644 index 0000000000..a6f7e17163 --- /dev/null +++ b/test/suite_capture.py @@ -0,0 +1,321 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Capture tests''' + +import config +import os +import re +import subprocess +import subprocesstest +import sys +import time +import unittest + +capture_duration = 5 + +testout_pcap = 'testout.pcap' +snapshot_len = 96 +capture_env = os.environ.copy() +capture_env['WIRESHARK_QUIT_AFTER_CAPTURE'] = 'True' + +def capture_command(cmd, *args, **kwargs): + shell = kwargs.pop('shell', False) + if shell: + cap_cmd = ['"' + cmd + '"'] + else: + cap_cmd = [cmd] + if cmd == config.cmd_wireshark: + cap_cmd += ('-o', 'gui.update.enabled:FALSE', '-k') + cap_cmd += args + if shell: + return ' '.join(cap_cmd) + else: + return cap_cmd + +def slow_dhcp_command(): + # XXX Do this in Python in a thread? + sd_cmd = '' + if sys.executable: + sd_cmd = sys.executable + ' ' + sd_cmd += os.path.join(config.this_dir, 'util_slow_dhcp_pcap.py') + return sd_cmd + +def start_pinging(self): + ping_procs = [] + if sys.platform.startswith('win32'): + # Fake '-i' with a subsecond interval. + for st in (0.1, 0.1, 0): + ping_procs.append(self.startProcess(config.args_ping)) + time.sleep(st) + else: + ping_procs.append(self.startProcess(config.args_ping)) + return ping_procs + +def stop_pinging(ping_procs): + for proc in ping_procs: + proc.kill() + +def check_testout_num_packets(self, num_packets, cap_file=None): + got_num_packets = False + if not cap_file: + cap_file = self.filename_from_id(testout_pcap) + self.log_fd.write(u'\nOutput of {0} {1}:\n'.format(config.cmd_capinfos, cap_file)) + capinfos_testout = str(subprocess.check_output((config.cmd_capinfos, cap_file))) + self.log_fd_write_bytes(capinfos_testout) + count_pat = 'Number of packets:\s+{}'.format(num_packets) + if re.search(count_pat, capinfos_testout): + got_num_packets = True + self.assertTrue(got_num_packets, 'Failed to capture exactly {} packets'.format(num_packets)) + +def check_capture_10_packets(self, cmd=None, to_stdout=False): + if not config.canCapture(): + self.skipTest('Test requires capture privileges and an interface.') + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + testout_file = self.filename_from_id(testout_pcap) + ping_procs = start_pinging(self) + if to_stdout: + capture_proc = self.runProcess(capture_command(cmd, + '-i', '"{}"'.format(config.capture_interface), + '-p', + '-w', '-', + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', '"icmp || icmp6"', + '>', testout_file, + shell=True + ), + env=capture_env, + shell=True + ) + else: + capture_proc = self.runProcess(capture_command(cmd, + '-i', config.capture_interface, + '-p', + '-w', testout_file, + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + ), + env=capture_env + ) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.cleanup_files.append(testout_file) + if capture_returncode != 0: + self.log_fd.write('{} -D output:\n'.format(cmd)) + self.runProcess((cmd, '-D')) + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + check_testout_num_packets(self, 10) + +def check_capture_fifo(self, cmd=None): + if not config.canMkfifo(): + self.skipTest('Test requires OS fifo support.') + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + self.assertIsNotNone(cmd) + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + testout_file = self.filename_from_id(testout_pcap) + fifo_file = self.filename_from_id('testout.fifo') + self.cleanup_files.append(fifo_file) + try: + # If a previous test left its fifo laying around, e.g. from a failure, remove it. + os.unlink(fifo_file) + except: + pass + os.mkfifo(fifo_file) + slow_dhcp_cmd = slow_dhcp_command() + fifo_proc = self.startProcess( + ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)), + shell=True) + capture_proc = self.runProcess(capture_command(cmd, + '-i', fifo_file, + '-p', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + ), + env=capture_env + ) + self.cleanup_files.append(testout_file) + fifo_proc.kill() + self.assertTrue(os.path.isfile(testout_file)) + capture_returncode = capture_proc.returncode + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + check_testout_num_packets(self, 8) + +def check_capture_stdin(self, cmd=None): + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + self.assertIsNotNone(cmd) + capture_file = os.path.join(config.capture_dir, 'dhcp.pcap') + testout_file = self.filename_from_id(testout_pcap) + slow_dhcp_cmd = slow_dhcp_command() + capture_cmd = capture_command(cmd, + '-i', '-', + '-w', testout_file, + '-a', 'duration:{}'.format(capture_duration), + shell=True + ) + if cmd == config.cmd_wireshark: + capture_cmd += ' -o console.log.level:127' + pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, env=capture_env, shell=True) + self.cleanup_files.append(testout_file) + pipe_returncode = pipe_proc.returncode + self.assertEqual(pipe_returncode, 0) + if cmd == config.cmd_wireshark: + self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.') + self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.') + self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.') + self.assertTrue(os.path.isfile(testout_file)) + if (pipe_returncode == 0): + check_testout_num_packets(self, 8) + +def check_capture_2multi_10packets(self, cmd=None): + # This was present in the Bash version but was incorrect and not part of any suite. + # It's apparently intended to test file rotation. + self.skipTest('Not yet implemented') + +def check_capture_read_filter(self, cmd=None): + if not config.canCapture(): + self.skipTest('Test requires capture privileges and an interface.') + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', config.capture_interface, + '-p', + '-w', testout_file, + '-2', + '-R', 'dcerpc.cn_call_id==123456', # Something unlikely. + '-c', '10', + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + ), + env=capture_env + ) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.cleanup_files.append(testout_file) + self.assertEqual(capture_returncode, 0) + + if (capture_returncode == 0): + check_testout_num_packets(self, 0) + +def check_capture_snapshot_len(self, cmd=None): + if not config.canCapture(): + self.skipTest('Test requires capture privileges and an interface.') + if cmd == config.cmd_wireshark and not config.canDisplay(): + self.skipTest('Test requires a display.') + if not config.args_ping: + self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform)) + self.assertIsNotNone(cmd) + ping_procs = start_pinging(self) + testout_file = self.filename_from_id(testout_pcap) + capture_proc = self.runProcess(capture_command(cmd, + '-i', config.capture_interface, + '-p', + '-w', testout_file, + '-s', str(snapshot_len), + '-a', 'duration:{}'.format(capture_duration), + '-f', 'icmp || icmp6', + ), + env=capture_env + ) + capture_returncode = capture_proc.returncode + stop_pinging(ping_procs) + self.cleanup_files.append(testout_file) + self.assertEqual(capture_returncode, 0) + self.assertTrue(os.path.isfile(testout_file)) + + # Use tshark to filter out all packets larger than 68 bytes. + testout2_file = self.filename_from_id('testout2.pcap') + + filter_proc = self.runProcess((config.cmd_tshark, + '-r', testout_file, + '-w', testout2_file, + '-Y', 'frame.cap_len>{}'.format(snapshot_len), + )) + filter_returncode = filter_proc.returncode + self.cleanup_files.append(testout2_file) + self.assertEqual(capture_returncode, 0) + if (capture_returncode == 0): + check_testout_num_packets(self, 0, cap_file=testout2_file) + +class case_wireshark_capture(subprocesstest.SubprocessTestCase): + def test_wireshark_capture_10_packets_to_file(self): + '''Capture 10 packets from the network to a file using Wireshark''' + check_capture_10_packets(self, cmd=config.cmd_wireshark) + + # Wireshark doesn't currently support writing to stdout while capturing. + # def test_wireshark_capture_10_packets_to_stdout(self): + # '''Capture 10 packets from the network to stdout using Wireshark''' + # check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True) + + def test_wireshark_capture_from_fifo(self): + '''Capture from a fifo using Wireshark''' + check_capture_fifo(self, cmd=config.cmd_wireshark) + + def test_wireshark_capture_from_stdin(self): + '''Capture from stdin using Wireshark''' + check_capture_stdin(self, cmd=config.cmd_wireshark) + + def test_wireshark_capture_snapshot_len(self): + '''Capture truncated packets using Wireshark''' + check_capture_snapshot_len(self, cmd=config.cmd_wireshark) + +class case_tshark_capture(subprocesstest.SubprocessTestCase): + def test_tshark_capture_10_packets_to_file(self): + '''Capture 10 packets from the network to a file using TShark''' + check_capture_10_packets(self, cmd=config.cmd_tshark) + + def test_tshark_capture_10_packets_to_stdout(self): + '''Capture 10 packets from the network to stdout using TShark''' + check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True) + + def test_tshark_capture_from_fifo(self): + '''Capture from a fifo using TShark''' + check_capture_fifo(self, cmd=config.cmd_tshark) + + def test_tshark_capture_from_stdin(self): + '''Capture from stdin using TShark''' + check_capture_stdin(self, cmd=config.cmd_tshark) + + def test_tshark_capture_snapshot_len(self): + '''Capture truncated packets using TShark''' + check_capture_snapshot_len(self, cmd=config.cmd_tshark) + +class case_dumpcap_capture(subprocesstest.SubprocessTestCase): + def test_dumpcap_capture_10_packets_to_file(self): + '''Capture 10 packets from the network to a file using Dumpcap''' + check_capture_10_packets(self, cmd=config.cmd_dumpcap) + + def test_dumpcap_capture_10_packets_to_stdout(self): + '''Capture 10 packets from the network to stdout using Dumpcap''' + check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True) + + def test_dumpcap_capture_from_fifo(self): + '''Capture from a fifo using Dumpcap''' + check_capture_fifo(self, cmd=config.cmd_dumpcap) + + def test_dumpcap_capture_from_stdin(self): + '''Capture from stdin using Dumpcap''' + check_capture_stdin(self, cmd=config.cmd_dumpcap) + + def test_dumpcap_capture_snapshot_len(self): + '''Capture truncated packets using Dumpcap''' + check_capture_snapshot_len(self, cmd=config.cmd_dumpcap) diff --git a/test/suite_clopts.py b/test/suite_clopts.py new file mode 100644 index 0000000000..37165852d5 --- /dev/null +++ b/test/suite_clopts.py @@ -0,0 +1,176 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Command line option tests''' + +import config +import os.path +import subprocess +import subprocesstest +import unittest + +#glossaries = ('fields', 'protocols', 'values', 'decodes', 'defaultprefs', 'currentprefs') + +glossaries = ('decodes', 'values') + +class case_dumpcap_invalid_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_dumpcap_invalid_chars(self): + '''Invalid dumpcap parameters''' + for char_arg in 'CEFGHJKNOQRTUVWXYejloxz': + self.assertRun((config.cmd_dumpcap, '-' + char_arg), + expected_return=self.exit_command_line) + + +class case_dumpcap_valid_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_dumpcap_valid_chars(self): + for char_arg in 'hv': + self.assertRun((config.cmd_dumpcap, '-' + char_arg)) + + +class case_dumpcap_invalid_interface_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_dumpcap_interface_chars(self): + '''Valid dumpcap parameters requiring capture permissions''' + valid_returns = [self.exit_ok, self.exit_error] + for char_arg in 'DL': + process = self.runProcess((config.cmd_dumpcap, '-' + char_arg)) + self.assertIn(process.returncode, valid_returns) + + +class case_dumpcap_capture_clopts(subprocesstest.SubprocessTestCase): + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_dumpcap_invalid_capfilter(self): + '''Invalid capture filter''' + invalid_filter = '__invalid_protocol' + # $DUMPCAP -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_dumpcap, '-f', invalid_filter, '-w', 'testout.pcap' )) + self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) + + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_dumpcap_invalid_interface_name(self): + '''Invalid capture interface name''' + invalid_interface = '__invalid_interface' + # $DUMPCAP -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_dumpcap, '-i', invalid_interface, '-w', 'testout.pcap')) + self.assertTrue(self.grepOutput('The capture session could not be initiated')) + + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_dumpcap_invalid_interface_index(self): + '''Invalid capture interface index''' + invalid_index = '0' + # $DUMPCAP -i 0 -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_dumpcap, '-i', invalid_index, '-w', 'testout.pcap')) + self.assertTrue(self.grepOutput('There is no interface with that adapter index')) + + +class case_basic_clopts(subprocesstest.SubprocessTestCase): + def test_existing_file(self): + # $TSHARK -r "${CAPTURE_DIR}dhcp.pcap" > ./testout.txt 2>&1 + cap_file = os.path.join(config.capture_dir, 'dhcp.pcap') + self.assertRun((config.cmd_tshark, '-r', cap_file)) + + def test_nonexistent_file(self): + # $TSHARK - r ThisFileDontExist.pcap > ./testout.txt 2 > &1 + cap_file = os.path.join(config.capture_dir, '__ceci_nest_pas_une.pcap') + self.assertRun((config.cmd_tshark, '-r', cap_file), + expected_return=self.exit_error) + + +class case_tshark_invalid_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_tshark_invalid_chars(self): + '''Invalid tshark parameters''' + for char_arg in 'ABCEFHJKMNORTUWXYZabcdefijkmorstuwyz': + self.assertRun((config.cmd_tshark, '-' + char_arg), + expected_return=self.exit_command_line) + + +class case_tshark_valid_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_tshark_valid_chars(self): + for char_arg in 'Ghv': + self.assertRun((config.cmd_tshark, '-' + char_arg)) + + +class case_tshark_invalid_interface_chars(subprocesstest.SubprocessTestCase): + # XXX Should we generate individual test functions instead of looping? + def test_tshark_interface_chars(self): + '''Valid tshark parameters requiring capture permissions''' + valid_returns = [self.exit_ok, self.exit_error] + for char_arg in 'DL': + process = self.runProcess((config.cmd_tshark, '-' + char_arg)) + self.assertIn(process.returncode, valid_returns) + + +class case_tshark_capture_clopts(subprocesstest.SubprocessTestCase): + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_tshark_invalid_capfilter(self): + '''Invalid capture filter''' + invalid_filter = '__invalid_protocol' + # $TSHARK -f 'jkghg' -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_tshark, '-f', invalid_filter, '-w', 'testout.pcap' )) + self.assertTrue(self.grepOutput('Invalid capture filter "' + invalid_filter + '" for interface')) + + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_tshark_invalid_interface_name(self): + '''Invalid capture interface name''' + invalid_interface = '__invalid_interface' + # $TSHARK -i invalid_interface -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_tshark, '-i', invalid_interface, '-w', 'testout.pcap')) + self.assertTrue(self.grepOutput('The capture session could not be initiated')) + + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_tshark_invalid_interface_index(self): + '''Invalid capture interface index''' + invalid_index = '0' + # $TSHARK -i 0 -w './testout.pcap' > ./testout.txt 2>&1 + self.runProcess((config.cmd_tshark, '-i', invalid_index, '-w', 'testout.pcap')) + self.assertTrue(self.grepOutput('There is no interface with that adapter index')) + + +class case_tshark_name_resolution_clopts(subprocesstest.SubprocessTestCase): + @unittest.skipUnless(config.canCapture(), 'Test requires capture privileges') + def test_tshark_valid_name_resolution(self): + # $TSHARK -N mntC -a duration:1 > ./testout.txt 2>&1 + self.assertRun((config.cmd_tshark, '-N', 'mntC', '-a', 'duration: 1')) + + # XXX Add invalid name resolution. + +class case_tshark_dump_glossaries(subprocesstest.SubprocessTestCase): + def test_tshark_dump_glossary(self): + for glossary in glossaries: + try: + self.log_fd.truncate() + except: + pass + self.assertRun((config.cmd_tshark, '-G', glossary)) + + def test_tshark_glossary_valid_utf8(self): + for glossary in glossaries: + env = os.environ.copy() + env['LANG'] = 'en_US.UTF-8' + g_contents = subprocess.check_output((config.cmd_tshark, '-G', glossary), env=env, stderr=subprocess.PIPE) + decoded = True + try: + g_contents.decode('UTF-8') + except UnicodeDecodeError: + decoded = False + self.assertTrue(decoded, '{} is not valid UTF-8'.format(glossary)) + + def test_tshark_glossary_plugin_count(self): + self.runProcess((config.cmd_tshark, '-G', 'plugins')) + self.assertGreaterEqual(self.countOutput('dissector'), 10, 'Fewer than 10 dissector plugins found') + + +# Purposefully fail a test. Used for testing the test framework. +# class case_fail_on_purpose(subprocesstest.SubprocessTestCase): +# def test_fail_on_purpose(self): +# self.runProcess(('echo', 'hello, world')) +# self.fail('Not implemented') diff --git a/test/suite_decryption.py b/test/suite_decryption.py new file mode 100644 index 0000000000..5b0bc77b83 --- /dev/null +++ b/test/suite_decryption.py @@ -0,0 +1,496 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Decryption tests''' + +import config +import os.path +import subprocesstest +import unittest + +uat_files = [ + '80211_keys', + 'dtlsdecrypttablefile', + 'esp_sa', + 'ssl_keys', + 'c1222_decryption_table', + 'ikev1_decryption_table', + 'ikev2_decryption_table', +] +for uat in uat_files: + config.setUpConfigFile(uat) + + +class case_decrypt_80211(subprocesstest.SubprocessTestCase): + def test_80211_wpa_psk(self): + '''IEEE 802.11 WPA PSK''' + # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=wpa-Induction.pcap + capture_file = os.path.join(config.capture_dir, 'wpa-Induction.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-Tfields', + '-e', 'http.request.uri', + '-r', capture_file, + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('favicon.ico')) + + def test_80211_wpa_eap(self): + '''IEEE 802.11 WPA EAP (EAPOL Rekey)''' + # Included in git sources test/captures/wpa-eap-tls.pcap.gz + capture_file = os.path.join(config.capture_dir, 'wpa-eap-tls.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-r', capture_file, + '-Y', 'wlan.analysis.tk==7d9987daf5876249b6c773bf454a0da7', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('Group Message')) + + def test_80211_wpa_eapol_incomplete_rekeys(self): + '''WPA decode with message1+2 only and secure bit set on message 2''' + # Included in git sources test/captures/wpa-test-decode.pcap.gz + capture_file = os.path.join(config.capture_dir, 'wpa-test-decode.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-r', capture_file, + '-Y', 'icmp.resp_to == 4263', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('Echo')) + + def test_80211_wpa_psk_mfp(self): + '''WPA decode management frames with MFP enabled (802.11w)''' + # Included in git sources test/captures/wpa-test-decode-mgmt.pcap.gz + capture_file = os.path.join(config.capture_dir, 'wpa-test-decode-mgmt.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-r', capture_file, + '-Y', 'wlan.fixed.reason_code == 2 || wlan.fixed.category_code == 3', + ), + env=config.test_env) + self.assertEqual(self.countOutput('802.11.*SN=.*FN=.*Flags='), 3) + + + def test_80211_wpa_tdls(self): + '''WPA decode traffic in a TDLS (Tunneled Direct-Link Setup) session (802.11z)''' + # Included in git sources test/captures/wpa-test-decode-tdls.pcap.gz + capture_file = os.path.join(config.capture_dir, 'wpa-test-decode-tdls.pcap.gz') + self.runProcess((config.cmd_tshark, + '-o', 'wlan.enable_decryption: TRUE', + '-r', capture_file, + '-Y', 'icmp', + ), + env=config.test_env) + self.assertEqual(self.countOutput('ICMP.*Echo .ping'), 2) + +class case_decrypt_dtls(subprocesstest.SubprocessTestCase): + def test_dtls(self): + '''DTLS''' + # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil.tgz + capture_file = os.path.join(config.capture_dir, 'snakeoil-dtls.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'data.data', + '-Y', 'data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('69:74:20:77:6f:72:6b:20:21:0a')) + + def test_dtls_psk_aes128ccm8(self): + '''DTLS 1.2 with PSK, AES-128-CCM-8''' + capture_file = os.path.join(config.capture_dir, 'dtls12-aes128ccm8.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'dtls.psk:ca19e028a8a372ad2d325f950fcaceed', + '-x' + ), + env=config.test_env) + dt_count = self.countOutput('Decrypted DTLS') + wfm_count = self.countOutput('Works for me!.') + self.assertTrue(dt_count == 7 and wfm_count == 2) + + def test_dtls_udt(self): + '''UDT over DTLS 1.2 with RSA key''' + capture_file = os.path.join(config.capture_dir, 'udt-dtls.pcapng.gz') + key_file = os.path.join(config.key_dir, 'udt-dtls.key') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'dtls.keys_list:0.0.0.0,0,data,{}'.format(key_file), + '-Y', 'dtls && udt.type==ack', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('UDT')) + +class case_decrypt_tls(subprocesstest.SubprocessTestCase): + def test_ssl(self): + '''SSL using the server's private key''' + # https://wiki.wireshark.org/SampleCaptures?action=AttachFile&do=view&target=snakeoil2_070531.tgz + capture_file = os.path.join(config.capture_dir, 'rsasnakeoil2.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'http.request.uri', + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('favicon.ico')) + + def test_ssl_rsa_pq(self): + '''SSL using the server's private key with p < q + (test whether libgcrypt is correctly called)''' + capture_file = os.path.join(config.capture_dir, 'rsa-p-lt-q.pcap') + key_file = os.path.join(config.key_dir, 'rsa-p-lt-q.key') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keys_list:0.0.0.0,443,http,{}'.format(key_file), + '-Tfields', + '-e', 'http.request.uri', + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('/')) + + def test_ssl_with_password(self): + '''SSL using the server's private key with password''' + capture_file = os.path.join(config.capture_dir, 'dmgr.pcapng') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'http.request.uri', + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('unsecureLogon.jsp')) + + def test_ssl_master_secret(self): + '''SSL using the master secret''' + capture_file = os.path.join(config.capture_dir, 'dhe1.pcapng.gz') + key_file = os.path.join(config.key_dir, 'dhe1_keylog.dat') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keylog_file: {}'.format(key_file), + '-o', 'ssl.desegment_ssl_application_data: FALSE', + '-o', 'http.ssl.port: 443', + '-Tfields', + '-e', 'http.request.uri', + '-Y', 'http', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('test')) + + def test_tls12_renegotiation(self): + '''TLS 1.2 with renegotiation''' + capture_file = os.path.join(config.capture_dir, 'tls-renegotiation.pcap') + key_file = os.path.join(config.key_dir, 'rsasnakeoil2.key') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keys_list:0.0.0.0,4433,http,{}'.format(key_file), + '-Tfields', + '-e', 'http.content_length', + '-Y', 'http', + ), + env=config.test_env) + count_0 = self.countOutput('^0$') + count_2151 = self.countOutput('^2151$') + self.assertTrue(count_0 == 1 and count_2151 == 1) + + def test_tls12_psk_aes128ccm(self): + '''TLS 1.2 with PSK, AES-128-CCM''' + capture_file = os.path.join(config.capture_dir, 'tls12-aes128ccm.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.psk:ca19e028a8a372ad2d325f950fcaceed', + '-q', + '-z', 'follow,ssl,ascii,0', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('http://www.gnu.org/software/gnutls')) + + def test_tls12_psk_aes256gcm(self): + '''TLS 1.2 with PSK, AES-256-GCM''' + capture_file = os.path.join(config.capture_dir, 'tls12-aes256gcm.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.psk:ca19e028a8a372ad2d325f950fcaceed', + '-q', + '-z', 'follow,ssl,ascii,0', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('http://www.gnu.org/software/gnutls')) + + def test_tls12_chacha20poly1305(self): + '''TLS 1.2 with ChaCha20-Poly1305''' + if not config.have_libgcrypt17: + self.skipTest('Requires GCrypt 1.7 or later.') + capture_file = os.path.join(config.capture_dir, 'tls12-chacha20poly1305.pcap') + key_file = os.path.join(config.key_dir, 'tls12-chacha20poly1305.keys') + ciphers=[ + 'ECDHE-ECDSA-CHACHA20-POLY1305', + 'ECDHE-RSA-CHACHA20-POLY1305', + 'DHE-RSA-CHACHA20-POLY1305', + 'RSA-PSK-CHACHA20-POLY1305', + 'DHE-PSK-CHACHA20-POLY1305', + 'ECDHE-PSK-CHACHA20-POLY1305', + 'PSK-CHACHA20-POLY1305', + ] + stream = 0 + for cipher in ciphers: + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keylog_file: {}'.format(key_file), + '-q', + '-z', 'follow,ssl,ascii,{}'.format(stream), + ), + env=config.test_env) + stream += 1 + self.assertTrue(self.grepOutput('Cipher is {}'.format(cipher))) + + def test_tls13_chacha20poly1305(self): + '''TLS 1.3 with ChaCha20-Poly1305''' + if not config.have_libgcrypt17: + self.skipTest('Requires GCrypt 1.7 or later.') + capture_file = os.path.join(config.capture_dir, 'tls13-20-chacha20poly1305.pcap') + key_file = os.path.join(config.key_dir, 'tls13-20-chacha20poly1305.keys') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keylog_file: {}'.format(key_file), + '-q', + '-z', 'follow,ssl,ascii,0', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('TLS13-CHACHA20-POLY1305-SHA256')) + +class case_decrypt_zigbee(subprocesstest.SubprocessTestCase): + def test_zigbee(self): + '''ZigBee''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7022 + capture_file = os.path.join(config.capture_dir, 'sample_control4_2012-03-24.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'data.data', + '-Y', 'zbee_aps', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('30:67:63:63:38:65:20:63:34:2e:64:6d:2e:74:76:20')) + +class case_decrypt_ansi_c1222(subprocesstest.SubprocessTestCase): + def test_ansi_c1222(self): + '''ANSI C12.22''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=9196 + capture_file = os.path.join(config.capture_dir, 'c1222_std_example8.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'c1222.decrypt: TRUE', + '-o', 'c1222.baseoid: 2.16.124.113620.1.22.0', + '-Tfields', + '-e', 'c1222.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('00:10:4d:41:4e:55:46:41:43:54:55:52:45:52:20:53:4e:20:92')) + +class case_decrypt_dvb_ci(subprocesstest.SubprocessTestCase): + def test_dvb_ci(self): + '''DVB-CI''' + # simplified version of the sample capture in + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=6700 + capture_file = os.path.join(config.capture_dir, 'dvb-ci_UV1_0000.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'dvb-ci.sek: 00000000000000000000000000000000', + '-o', 'dvb-ci.siv: 00000000000000000000000000000000', + '-Tfields', + '-e', 'dvb-ci.cc.sac.padding', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('80:00:00:00:00:00:00:00:00:00:00:00')) + +class case_decrypt_ipsec(subprocesstest.SubprocessTestCase): + def test_ipsec_esp(self): + '''IPsec ESP''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12671 + capture_file = os.path.join(config.capture_dir, 'esp-bug-12671.pcapng.gz') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'esp.enable_encryption_decode: TRUE', + '-Tfields', + '-e', 'data.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('08:09:0a:0b:0c:0d:0e:0f:10:11:12:13:14:15:16:17')) + +class case_decrypt_ike_isakmp(subprocesstest.SubprocessTestCase): + def test_ikev1_certs(self): + '''IKEv1 (ISAKMP) with certificates''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=7951 + capture_file = os.path.join(config.capture_dir, 'ikev1-certs.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'x509sat.printableString', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('OpenSwan')) + + def test_ikev1_simultaneous(self): + '''IKEv1 (ISAKMP) simultaneous exchanges''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12610 + capture_file = os.path.join(config.capture_dir, 'ikev1-bug-12610.pcapng.gz') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.hash', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('b5:25:21:f7:74:96:74:02:c9:f6:ce:e9:5f:d1:7e:5b')) + + def test_ikev1_unencrypted(self): + '''IKEv1 (ISAKMP) unencrypted phase 1''' + # https://bugs.wireshark.org/bugzilla/show_bug.cgi?id=12620 + capture_file = os.path.join(config.capture_dir, 'ikev1-bug-12620.pcapng.gz') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.hash', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('40:04:3b:64:0f:43:73:25:0d:5a:c3:a1:fb:63:15:3c')) + + def test_ikev2_3des_sha160(self): + '''IKEv2 decryption test (3DES-CBC/SHA1_160)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-3des-sha1_160.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('02:f7:a0:d5:f1:fd:c8:ea:81:03:98:18:c6:5b:b9:bd:09:af:9b:89:17:31:9b:88:7f:f9:ba:30:46:c3:44:c7')) + + def test_ikev2_aes128_ccm12(self): + '''IKEv2 decryption test (AES-128-CCM-12) - with CBC-MAC verification''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes128ccm12.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('c2:10:43:94:29:9e:1f:fe:79:08:ea:72:0a:d5:d1:37:17:a0:d4:54:e4:fa:0a:21:28:ea:68:94:11:f4:79:c4')) + + def test_ikev2_aes128_ccm12_2(self): + '''IKEv2 decryption test (AES-128-CCM-12 using CTR mode, without checksum)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes128ccm12-2.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('aa:a2:81:c8:7b:4a:19:04:6c:57:27:1d:55:74:88:ca:41:3b:57:22:8c:b9:51:f5:fa:96:40:99:2a:02:85:b9')) + + def test_ikev2_aes192ctr_sha512(self): + '''IKEv2 decryption test (AES-192-CTR/SHA2-512)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes192ctr.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('3e:c2:3d:cf:93:48:48:56:38:40:7c:75:45:47:ae:b3:08:52:90:08:2c:49:f5:83:fd:ba:e5:92:63:a2:0b:4a')) + + def test_ikev2_aes256cbc_sha256(self): + '''IKEv2 decryption test (AES-256-CBC/SHA2-256)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256cbc.pcapng') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('e1:a8:d5:50:06:42:01:a7:ec:02:4a:85:75:8d:06:73:c6:1c:5c:51:0a:c1:3b:cd:22:5d:63:27:f5:0d:a3:d3')) + + def test_ikev2_aes256ccm16(self): + '''IKEv2 decryption test (AES-256-CCM-16)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256ccm16.pcapng') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('fa:2e:74:bd:c0:1e:30:fb:0b:3d:dc:97:23:c9:44:90:95:96:9d:a5:1f:69:e5:60:20:9d:2c:2b:79:40:21:0a')) + + def test_ikev2_aes256gcm16(self): + '''IKEv2 decryption test (AES-256-GCM-16)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256gcm16.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('9a:b7:1f:14:ab:55:3c:ad:87:3a:1a:a7:0b:99:df:15:5d:ee:77:cd:cf:36:94:b3:b7:52:7a:cb:b9:71:2d:ed')) + + def test_ikev2_aes256gcm8(self): + '''IKEv2 decryption test (AES-256-GCM-8)''' + capture_file = os.path.join(config.capture_dir, 'ikev2-decrypt-aes256gcm8.pcap') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'isakmp.auth.data', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('4a:66:d8:22:d0:af:bc:22:ad:9a:92:a2:cf:42:87:c9:20:ad:8a:c3:b0:69:a4:a7:e7:5f:e0:a5:d4:99:f9:14')) + +class case_decrypt_http2(subprocesstest.SubprocessTestCase): + def test_http2(self): + '''HTTP2 (HPACK)''' + if not config.have_nghttp2: + self.skipTest('Requires nghttp2.') + capture_file = os.path.join(config.capture_dir, 'packet-h2-14_headers.pcapng') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-Tfields', + '-e', 'http2.header.value', + '-d', 'tcp.port==3000,http2', + ), + env=config.test_env) + test_passed = self.grepOutput('nghttp2') + if not test_passed: + self.log_fd.write(u'\n\n-- Verbose output --\n\n') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-V', + '-d', 'tcp.port==3000,http2', + ), + env=config.test_env) + self.assertTrue(test_passed) + +class case_decrypt_kerberos(subprocesstest.SubprocessTestCase): + def test_kerberos(self): + '''Kerberos''' + # Files are from krb-816.zip on the SampleCaptures page. + if not config.have_kerberos: + self.skipTest('Requires nghttp2.') + capture_file = os.path.join(config.capture_dir, 'krb-816.pcap.gz') + keytab_file = os.path.join(config.key_dir, 'krb-816.keytab') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'kerberos.decrypt: TRUE', + '-o', 'kerberos.file: {}'.format(keytab_file), + '-Tfields', + '-e', 'kerberos.keyvalue', + ), + env=config.test_env) + # keyvalue: ccda7d48219f73c3b28311c4ba7242b3 + self.assertTrue(self.grepOutput('cc:da:7d:48:21:9f:73:c3:b2:83:11:c4:ba:72:42:b3')) diff --git a/test/suite_dissection.py b/test/suite_dissection.py new file mode 100644 index 0000000000..cfdab9253b --- /dev/null +++ b/test/suite_dissection.py @@ -0,0 +1,30 @@ +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Dissection tests''' + +import config +import os.path +import subprocesstest +import unittest + +class case_dissect_http2(subprocesstest.SubprocessTestCase): + def test_http2_data_reassembly(self): + '''HTTP2 data reassembly''' + if not config.have_nghttp2: + self.skipTest('Requires nghttp2.') + capture_file = os.path.join(config.capture_dir, 'http2-data-reassembly.pcap') + key_file = os.path.join(config.key_dir, 'http2-data-reassembly.keys') + self.runProcess((config.cmd_tshark, + '-r', capture_file, + '-o', 'ssl.keylog_file: {}'.format(key_file), + '-d', 'tcp.port==8443,ssl', + '-Y', 'http2.data.data matches "PNG" && http2.data.data matches "END"', + ), + env=config.test_env) + self.assertTrue(self.grepOutput('DATA')) diff --git a/test/test.py b/test/test.py new file mode 100755 index 0000000000..f53d90abaa --- /dev/null +++ b/test/test.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Main test script''' + +# To do: +# - Avoid printing Python tracebacks when we assert? It looks like we'd need +# to override unittest.TextTestResult.addFailure(). + +import argparse +import config +import os.path +import sys +import unittest + +def find_test_ids(suite, all_ids): + if hasattr(suite, '__iter__'): + for s in suite: + find_test_ids(s, all_ids) + else: + all_ids.append(suite.id()) + +def dump_failed_output(suite): + if hasattr(suite, '__iter__'): + for s in suite: + dump_failures = getattr(s, 'dump_failures', None) + if dump_failures: + dump_failures() + else: + dump_failed_output(s) + +def main(): + parser = argparse.ArgumentParser(description='Wireshark unit tests') + cap_group = parser.add_mutually_exclusive_group() + cap_group.add_argument('-e', '--enable-capture', action='store_true', help='Enable capture tests') + cap_group.add_argument('-E', '--disable-capture', action='store_true', help='Disable capture tests') + cap_group.add_argument('-i', '--capture-interface', nargs=1, default=None, help='Capture interface index or name') + parser.add_argument('-p', '--program-path', nargs=1, default=os.path.curdir, help='Path to Wireshark executables.') + list_group = parser.add_mutually_exclusive_group() + list_group.add_argument('-l', '--list', action='store_true', help='List tests. One of "all" or a full or partial test name.') + list_group.add_argument('--list-suites', action='store_true', help='List all suites.') + list_group.add_argument('--list-cases', action='store_true', help='List all suites and cases.') + parser.add_argument('-v', '--verbose', action='store_const', const=2, default=1, help='Verbose tests.') + parser.add_argument('tests_to_run', nargs='*', metavar='test', default=['all'], help='Tests to run. One of "all" or a full or partial test name. Default is "all".') + args = parser.parse_args() + + if args.enable_capture: + config.setCanCapture(True) + elif args.disable_capture: + config.setCanCapture(False) + + if args.capture_interface: + config.setCaptureInterface(args.capture_interface[0]) + + all_tests = unittest.defaultTestLoader.discover(os.path.dirname(__file__), pattern='suite_*.py') + + all_ids = [] + find_test_ids(all_tests, all_ids) + + run_ids = [] + for tid in all_ids: + for ttr in args.tests_to_run: + ttrl = ttr.lower() + if ttrl == 'all': + run_ids = all_ids + break + if ttrl in tid.lower(): + run_ids.append(tid) + + if not run_ids: + print('No tests found. You asked for:\n ' + '\n '.join(args.tests_to_run)) + parser.print_usage() + sys.exit(1) + + if args.list: + print('\n'.join(run_ids)) + sys.exit(0) + + if args.list_suites: + suites = set() + for rid in run_ids: + rparts = rid.split('.') + suites |= {rparts[0]} + print('\n'.join(list(suites))) + sys.exit(0) + + if args.list_cases: + cases = set() + for rid in run_ids: + rparts = rid.split('.') + cases |= {'.'.join(rparts[:2])} + print('\n'.join(list(cases))) + sys.exit(0) + + program_path = args.program_path[0] + if not config.setProgramPath(program_path): + print('One or more required executables not found at {}\n'.format(program_path)) + parser.print_usage() + sys.exit(1) + + run_suite = unittest.defaultTestLoader.loadTestsFromNames(run_ids) + runner = unittest.TextTestRunner(verbosity=args.verbose) + test_result = runner.run(run_suite) + + dump_failed_output(run_suite) + + if test_result.errors: + sys.exit(2) + + if test_result.failures: + sys.exit(1) + +if __name__ == '__main__': + main() diff --git a/test/util_slow_dhcp_pcap.py b/test/util_slow_dhcp_pcap.py new file mode 100644 index 0000000000..0adff51aff --- /dev/null +++ b/test/util_slow_dhcp_pcap.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +# +# Wireshark tests +# By Gerald Combs <gerald@wireshark.org> +# +# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping +# +# SPDX-License-Identifier: GPL-2.0-or-later +# +'''Write captures/dhcp.pcap to stdout, pause 1.5 seconds, and write it again.''' + +import os +import os.path +import time +import sys + +dhcp_pcap = os.path.join(os.path.dirname(__file__), 'captures', 'dhcp.pcap') + +dhcp_fd = open(dhcp_pcap, 'rb') +contents = dhcp_fd.read() +os.write(1, contents) +time.sleep(1.5) +os.write(1, contents[24:]) |