aboutsummaryrefslogtreecommitdiffstats
path: root/test/suite_capture.py
blob: cdb78b4a2fdb4bc340cccdee60b48c6f4dd06bc4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#
# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Capture tests'''

import config
import os
import re
import subprocess
import subprocesstest
import sys
import time
import unittest

capture_duration = 5

testout_pcap = 'testout.pcap'
snapshot_len = 96

def start_pinging(self):
    ping_procs = []
    if sys.platform.startswith('win32'):
        # Fake '-i' with a subsecond interval.
        for st in (0.1, 0.1, 0):
            ping_procs.append(self.startProcess(config.args_ping))
            time.sleep(st)
    else:
        ping_procs.append(self.startProcess(config.args_ping))
    return ping_procs

def stop_pinging(ping_procs):
    for proc in ping_procs:
        proc.kill()

def check_capture_10_packets(self, cmd=None, to_stdout=False):
    # Similar to suite_io.check_io_4_packets.
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    testout_file = self.filename_from_id(testout_pcap)
    ping_procs = start_pinging(self)
    if to_stdout:
        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
            '-i', '"{}"'.format(config.capture_interface),
            '-p',
            '-w', '-',
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', '"icmp || icmp6"',
            '>', testout_file,
            shell=True
        ),
        shell=True
        )
    else:
        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
            '-i', config.capture_interface,
            '-p',
            '-w', testout_file,
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', 'icmp || icmp6',
        ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    if capture_returncode != 0:
        self.log_fd.write('{} -D output:\n'.format(cmd))
        self.runProcess((cmd, '-D'))
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(10)

def check_capture_fifo(self, cmd=None):
    if not config.canMkfifo():
        self.skipTest('Test requires OS fifo support.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    testout_file = self.filename_from_id(testout_pcap)
    fifo_file = self.filename_from_id('testout.fifo')
    try:
        # If a previous test left its fifo laying around, e.g. from a failure, remove it.
        os.unlink(fifo_file)
    except:
        pass
    os.mkfifo(fifo_file)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    fifo_proc = self.startProcess(
        ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
        shell=True)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', fifo_file,
        '-p',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
    ))
    fifo_proc.kill()
    self.assertTrue(os.path.isfile(testout_file))
    capture_returncode = capture_proc.returncode
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(8)

def check_capture_stdin(self, cmd=None):
    # Similar to suite_io.check_io_4_packets.
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    testout_file = self.filename_from_id(testout_pcap)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
        shell=True
    )
    if cmd == config.cmd_wireshark:
        capture_cmd += ' -o console.log.level:127'
    pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    if cmd == config.cmd_wireshark:
        self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
        self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
        self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
    self.assertTrue(os.path.isfile(testout_file))
    if (pipe_returncode == 0):
        self.checkPacketCount(8)

def check_capture_2multi_10packets(self, cmd=None):
    # This was present in the Bash version but was incorrect and not part of any suite.
    # It's apparently intended to test file rotation.
    self.skipTest('Not yet implemented')

def check_capture_read_filter(self, cmd=None):
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    ping_procs = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-2',
        '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
        '-c', '10',
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    self.assertEqual(capture_returncode, 0)

    if (capture_returncode == 0):
        self.checkPacketCount(0)

def check_capture_snapshot_len(self, cmd=None):
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    ping_procs = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-s', str(snapshot_len),
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    self.assertEqual(capture_returncode, 0)
    self.assertTrue(os.path.isfile(testout_file))

    # Use tshark to filter out all packets larger than 68 bytes.
    testout2_file = self.filename_from_id('testout2.pcap')

    filter_proc = self.runProcess((config.cmd_tshark,
        '-r', testout_file,
        '-w', testout2_file,
        '-Y', 'frame.cap_len>{}'.format(snapshot_len),
    ))
    filter_returncode = filter_proc.returncode
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(0, cap_file=testout2_file)

class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    def test_wireshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=config.cmd_wireshark)

    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)

    def test_wireshark_capture_from_fifo(self):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_from_stdin(self):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_snapshot_len(self):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=config.cmd_wireshark)

class case_tshark_capture(subprocesstest.SubprocessTestCase):
    def test_tshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=config.cmd_tshark)

    def test_tshark_capture_from_stdin(self):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=config.cmd_tshark)

    def test_tshark_capture_snapshot_len(self):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=config.cmd_tshark)

class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    def test_dumpcap_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)