aboutsummaryrefslogtreecommitdiffstats
path: root/test/suite_sharkd.py
blob: 27eb9d58ce1854942519e5d112adaa5aa259bddd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#
# -*- coding: utf-8 -*-
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''sharkd tests'''

import config
import json
import os.path
import subprocess
import subprocesstest
import sys
import unittest

# Path to the 'dhcp.pcap' capture inside the configured capture directory;
# test_sharkd_hello_dhcp_pcap asks sharkd to load this file.
dhcp_pcap = os.path.join(config.capture_dir, 'dhcp.pcap')

class case_sharkd(subprocesstest.SubprocessTestCase):
    '''Tests that feed JSON requests to sharkd over stdin and check its replies.'''

    def _run_sharkd(self, commands):
        '''Start sharkd with a piped stdin, send `commands`, and wait for it.

        `commands` is a text string of newline-terminated JSON requests.
        Returns the process object produced by startProcess() after
        waitProcess() has been called on it.
        '''
        sharkd_proc = self.startProcess((config.cmd_sharkd, '-'),
            stdin=subprocess.PIPE
        )
        # The requests are written as UTF-8 bytes on Python 3; on Python 2
        # the str is sent unchanged.
        if sys.version_info[0] >= 3:
            commands = commands.encode('UTF-8')
        sharkd_proc.stdin.write(commands)
        self.waitProcess(sharkd_proc)
        return sharkd_proc

    def test_sharkd_hello_no_pcap(self):
        '''sharkd hello message, no capture file'''
        sharkd_proc = self._run_sharkd('{"req":"status"}\n')

        self.assertEqual(self.countOutput('Hello in child.', count_stdout=False, count_stderr=True), 1, 'No hello message.')

        # Only the parse is guarded, so that a wrong or missing duration is
        # reported as such instead of being mislabelled as invalid JSON.
        try:
            jdata = json.loads(sharkd_proc.stdout_str)
        except ValueError:
            self.fail('Invalid JSON: "{}"'.format(sharkd_proc.stdout_str))

        self.assertIn('duration', jdata, 'Missing duration.')
        self.assertEqual(jdata['duration'], 0.0, 'Unexpected non-zero duration.')

    def test_sharkd_hello_dhcp_pcap(self):
        '''sharkd hello message, simple capture file'''
        # json.dumps quotes and escapes the path so it is a valid JSON string.
        sharkd_commands = '{"req":"load","file":' + json.dumps(dhcp_pcap) + '}\n'
        sharkd_commands += '{"req":"status"}\n'
        sharkd_commands += '{"req":"frames"}\n'
        sharkd_proc = self._run_sharkd(sharkd_commands)

        has_dhcp = False
        for line in sharkd_proc.stdout_str.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                jdata = json.loads(line)
            except ValueError:
                self.fail('Invalid JSON for "{}"'.format(line))

            # Every output line is probed as if it were a list whose first
            # entry has a 'c' member; replies with a different shape are
            # skipped on purpose.
            try:
                if 'DHCP' in jdata[0]['c']:
                    has_dhcp = True
            except (KeyError, IndexError, TypeError):
                pass

        self.assertTrue(has_dhcp, 'Failed to find DHCP in JSON output')