aboutsummaryrefslogtreecommitdiffstats
path: root/test/suite_capture.py
blob: eb1a4ffcab62eb2b0702387ad91386cf21e7ced2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
#
# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''Capture tests'''

import config
import glob
import os
import re
import subprocess
import subprocesstest
import sys
import time
import unittest
import uuid

capture_duration = 5

testout_pcap = 'testout.pcap'
snapshot_len = 96

def start_pinging(self):
    ping_procs = []
    if sys.platform.startswith('win32'):
        # Fake '-i' with a subsecond interval.
        for st in (0.1, 0.1, 0):
            ping_procs.append(self.startProcess(config.args_ping))
            time.sleep(st)
    else:
        ping_procs.append(self.startProcess(config.args_ping))
    return ping_procs

def stop_pinging(ping_procs):
    for proc in ping_procs:
        proc.kill()

def check_capture_10_packets(self, cmd=None, to_stdout=False):
    # Similar to suite_io.check_io_4_packets.
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    testout_file = self.filename_from_id(testout_pcap)
    ping_procs = start_pinging(self)
    if to_stdout:
        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
            '-i', '"{}"'.format(config.capture_interface),
            '-p',
            '-w', '-',
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', '"icmp || icmp6"',
            '>', testout_file,
            shell=True
        ),
        shell=True
        )
    else:
        capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
            '-i', config.capture_interface,
            '-p',
            '-w', testout_file,
            '-c', '10',
            '-a', 'duration:{}'.format(capture_duration),
            '-f', 'icmp || icmp6',
        ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    if capture_returncode != 0:
        self.log_fd.write('{} -D output:\n'.format(cmd))
        self.runProcess((cmd, '-D'))
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(10)

def check_capture_fifo(self, cmd=None):
    if not config.canMkfifo():
        self.skipTest('Test requires OS fifo support.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    testout_file = self.filename_from_id(testout_pcap)
    fifo_file = self.filename_from_id('testout.fifo')
    try:
        # If a previous test left its fifo laying around, e.g. from a failure, remove it.
        os.unlink(fifo_file)
    except:
        pass
    os.mkfifo(fifo_file)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    fifo_proc = self.startProcess(
        ('{0} > {1}'.format(slow_dhcp_cmd, fifo_file)),
        shell=True)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', fifo_file,
        '-p',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
    ))
    fifo_proc.kill()
    self.assertTrue(os.path.isfile(testout_file))
    capture_returncode = capture_proc.returncode
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(8)

def check_capture_stdin(self, cmd=None):
    # Similar to suite_io.check_io_4_packets.
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    self.assertIsNotNone(cmd)
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    testout_file = self.filename_from_id(testout_pcap)
    slow_dhcp_cmd = subprocesstest.cat_dhcp_command('slow')
    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', 'duration:{}'.format(capture_duration),
        shell=True
    )
    if cmd == config.cmd_wireshark:
        capture_cmd += ' -o console.log.level:127'
    pipe_proc = self.runProcess(slow_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    if cmd == config.cmd_wireshark:
        self.assertTrue(self.grepOutput('Wireshark is up and ready to go'), 'No startup message.')
        self.assertTrue(self.grepOutput('Capture started'), 'No capture start message.')
        self.assertTrue(self.grepOutput('Capture stopped'), 'No capture stop message.')
    self.assertTrue(os.path.isfile(testout_file))
    if (pipe_returncode == 0):
        self.checkPacketCount(8)

def check_capture_read_filter(self, cmd=None):
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    ping_procs = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-2',
        '-R', 'dcerpc.cn_call_id==123456', # Something unlikely.
        '-c', '10',
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    self.assertEqual(capture_returncode, 0)

    if (capture_returncode == 0):
        self.checkPacketCount(0)

def check_capture_snapshot_len(self, cmd=None):
    if not config.canCapture():
        self.skipTest('Test requires capture privileges and an interface.')
    if cmd == config.cmd_wireshark and not config.canDisplay():
        self.skipTest('Test requires a display.')
    if not config.args_ping:
        self.skipTest('Your platform ({}) does not have a defined ping command.'.format(sys.platform))
    self.assertIsNotNone(cmd)
    ping_procs = start_pinging(self)
    testout_file = self.filename_from_id(testout_pcap)
    capture_proc = self.runProcess(subprocesstest.capture_command(cmd,
        '-i', config.capture_interface,
        '-p',
        '-w', testout_file,
        '-s', str(snapshot_len),
        '-a', 'duration:{}'.format(capture_duration),
        '-f', 'icmp || icmp6',
    ))
    capture_returncode = capture_proc.returncode
    stop_pinging(ping_procs)
    self.assertEqual(capture_returncode, 0)
    self.assertTrue(os.path.isfile(testout_file))

    # Use tshark to filter out all packets larger than 68 bytes.
    testout2_file = self.filename_from_id('testout2.pcap')

    filter_proc = self.runProcess((config.cmd_tshark,
        '-r', testout_file,
        '-w', testout2_file,
        '-Y', 'frame.cap_len>{}'.format(snapshot_len),
    ))
    filter_returncode = filter_proc.returncode
    self.assertEqual(capture_returncode, 0)
    if (capture_returncode == 0):
        self.checkPacketCount(0, cap_file=testout2_file)

def check_dumpcap_autostop_stdin(self, packets=None, filesize=None):
    # Similar to check_capture_stdin.
    cmd = config.cmd_dumpcap
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    testout_file = self.filename_from_id(testout_pcap)
    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
    condition='oops:invalid'

    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
    self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')

    if packets is not None:
        condition = 'packets:{}'.format(packets)
    elif filesize is not None:
        condition = 'filesize:{}'.format(filesize)

    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', condition,
        shell=True
    )
    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    self.assertTrue(os.path.isfile(testout_file))
    if (pipe_returncode != 0):
        return

    if packets is not None:
        self.checkPacketCount(packets)
    elif filesize is not None:
        capturekb = os.path.getsize(testout_file) / 1000
        self.assertGreaterEqual(capturekb, filesize)

def check_dumpcap_ringbuffer_stdin(self, packets=None, filesize=None):
    # Similar to check_capture_stdin.
    cmd = config.cmd_dumpcap
    capture_file = os.path.join(config.capture_dir, 'dhcp.pcap')
    rb_unique = 'dhcp_rb_' + uuid.uuid4().hex[:6] # Random ID
    testout_file = '{}.{}.pcapng'.format(self.id(), rb_unique)
    testout_glob = '{}.{}_*.pcapng'.format(self.id(), rb_unique)
    cat100_dhcp_cmd = subprocesstest.cat_dhcp_command('cat100')
    condition='oops:invalid'

    self.assertTrue(packets is not None or filesize is not None, 'Need one of packets or filesize')
    self.assertFalse(packets is not None and filesize is not None, 'Need one of packets or filesize')

    if packets is not None:
        condition = 'packets:{}'.format(packets)
    elif filesize is not None:
        condition = 'filesize:{}'.format(filesize)

    capture_cmd = subprocesstest.capture_command(cmd,
        '-i', '-',
        '-w', testout_file,
        '-a', 'files:2',
        '-b', condition,
        shell=True
    )
    pipe_proc = self.runProcess(cat100_dhcp_cmd + ' | ' + capture_cmd, shell=True)
    pipe_returncode = pipe_proc.returncode
    self.assertEqual(pipe_returncode, 0)
    if (pipe_returncode != 0):
        return

    rb_files = glob.glob(testout_glob)
    for rbf in rb_files:
        self.cleanup_files.append(rbf)

    self.assertEqual(len(rb_files), 2)

    for rbf in rb_files:
        self.assertTrue(os.path.isfile(rbf))
        if packets is not None:
            self.checkPacketCount(packets, cap_file=rbf)
        elif filesize is not None:
            capturekb = os.path.getsize(rbf) / 1000
            self.assertGreaterEqual(capturekb, filesize)

class case_wireshark_capture(subprocesstest.SubprocessTestCase):
    def test_wireshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Wireshark'''
        check_capture_10_packets(self, cmd=config.cmd_wireshark)

    # Wireshark doesn't currently support writing to stdout while capturing.
    # def test_wireshark_capture_10_packets_to_stdout(self):
    #     '''Capture 10 packets from the network to stdout using Wireshark'''
    #     check_capture_10_packets(self, cmd=config.cmd_wireshark, to_stdout=True)

    def test_wireshark_capture_from_fifo(self):
        '''Capture from a fifo using Wireshark'''
        check_capture_fifo(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_from_stdin(self):
        '''Capture from stdin using Wireshark'''
        check_capture_stdin(self, cmd=config.cmd_wireshark)

    def test_wireshark_capture_snapshot_len(self):
        '''Capture truncated packets using Wireshark'''
        check_capture_snapshot_len(self, cmd=config.cmd_wireshark)

class case_tshark_capture(subprocesstest.SubprocessTestCase):
    def test_tshark_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark)

    def test_tshark_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using TShark'''
        check_capture_10_packets(self, cmd=config.cmd_tshark, to_stdout=True)

    def test_tshark_capture_from_fifo(self):
        '''Capture from a fifo using TShark'''
        check_capture_fifo(self, cmd=config.cmd_tshark)

    def test_tshark_capture_from_stdin(self):
        '''Capture from stdin using TShark'''
        check_capture_stdin(self, cmd=config.cmd_tshark)

    def test_tshark_capture_snapshot_len(self):
        '''Capture truncated packets using TShark'''
        check_capture_snapshot_len(self, cmd=config.cmd_tshark)

class case_dumpcap_capture(subprocesstest.SubprocessTestCase):
    def test_dumpcap_capture_10_packets_to_file(self):
        '''Capture 10 packets from the network to a file using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_10_packets_to_stdout(self):
        '''Capture 10 packets from the network to stdout using Dumpcap'''
        check_capture_10_packets(self, cmd=config.cmd_dumpcap, to_stdout=True)

    def test_dumpcap_capture_from_fifo(self):
        '''Capture from a fifo using Dumpcap'''
        check_capture_fifo(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_from_stdin(self):
        '''Capture from stdin using Dumpcap'''
        check_capture_stdin(self, cmd=config.cmd_dumpcap)

    def test_dumpcap_capture_snapshot_len(self):
        '''Capture truncated packets using Dumpcap'''
        check_capture_snapshot_len(self, cmd=config.cmd_dumpcap)

class case_dumpcap_autostop(subprocesstest.SubprocessTestCase):
    # duration, filesize, packets, files
    def test_dumpcap_autostop_filesize(self):
        '''Capture from stdin using Dumpcap until we reach a file size limit'''
        check_dumpcap_autostop_stdin(self, filesize=15)

    def test_dumpcap_autostop_packets(self):
        '''Capture from stdin using Dumpcap until we reach a packet limit'''
        check_dumpcap_autostop_stdin(self, packets=97) # Last prime before 100. Arbitrary.

class case_dumpcap_ringbuffer(subprocesstest.SubprocessTestCase):
    # duration, interval, filesize, packets, files
    # Need a function that finds ringbuffer file names.
    def test_dumpcap_ringbuffer_filesize(self):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a file size limit'''
        check_dumpcap_ringbuffer_stdin(self, filesize=15)

    def test_dumpcap_ringbuffer_packets(self):
        '''Capture from stdin using Dumpcap and write multiple files until we reach a packet limit'''
        check_dumpcap_ringbuffer_stdin(self, packets=47) # Last prime before 50. Arbitrary.