aboutsummaryrefslogtreecommitdiffstats
path: root/test/suite_dissectors/dissectorstest.py
blob: d2227ed9e199bdfaf7e7ac6b25555a0ff10ee067 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Wireshark dissector tests
# By Atli Guðmundsson <atli@tern.is>
#
# SPDX-License-Identifier: GPL-2.0-or-later

# Standard modules
import inspect
import json

# Wireshark modules
import fixtures
import subprocesstest


class _dissection_validator_real:
    '''
    Collects a set of byte bundles, matching json objects and a protocol
    name and verifies that a byte bundle converts into the matching json
    object using the following execution chain:

        byte bundle -> text2pcap -> tshark <protocol> -> json

    Note: The idea for this approach came about when it was realized that
    calling text2pcap and tshark for each byte bundle resulted in
    unacceptable overhead during execution of the unittests.
    '''

    def __init__(self, protocol, request, cmd_tshark, cmd_text2pcap):
        self.dissection_list = []
        self.protocol = protocol
        self.cmd_tshark = cmd_tshark
        self.cmd_text2pcap = cmd_text2pcap
        self.test_case = request.instance

    def add_dissection(self, byte_list, expected_result, line_no=None):
        '''Adds a byte bundle and an expected result to the set of byte
        bundles to verify.

        byte bundles must be iterable.'''

        hex_string = ' '.join('{:02x}'.format(ele) for ele in bytes(byte_list))

        if line_no is None:
            caller = inspect.getframeinfo(inspect.stack()[1][0])
            line_no = caller.lineno

        self.dissection_list.append((line_no, hex_string, expected_result))

# Uncomment the following lines to record in a text file all the dissector byte
# bundles, in the order they are presented:
#
#         with open("full.txt", 'a') as f:
#             f.write("0 {}\n".format(hex_string))

# Then use the following command to convert full.txt into a pcap file,
# replacing <port> with the default port of your protocol:
#       # text2pcap -u <port>,<port> full.txt out.pcap

    def check_dissections(self):
        '''Processes and verifies all added byte bundles and their expected
        results. At the end of processing the current set is emptied.'''

        text_file = self.test_case.filename_from_id('txt')
        pcap_file = self.test_case.filename_from_id('pcap')

        # create our text file of hex encoded messages
        with open(text_file, 'w') as f:
            for line_no, hex_string, expected_result in self.dissection_list:
                f.write("0 {}\n".format(hex_string))

        # generate our pcap file by feeding the messages to text2pcap
        self.test_case.assertRun((
            self.cmd_text2pcap,
            '-u', '1234,1234',
            text_file, pcap_file
        ))

        # generate our dissection from our pcap file
        tshark_proc = self.test_case.assertRun((
            self.cmd_tshark,
            '-r', pcap_file,
            '-T', 'json',
            '-d', 'udp.port==1234,{}'.format(self.protocol),
            '-J', self.protocol
        ))

        dissections = json.loads(tshark_proc.stdout_str)
        for (line_no, hex_string, expected_result), dissection in zip(self.dissection_list, dissections):

            # strip away everything except the protocol
            result = dissection['_source']['layers']
            self.test_case.assertIn(self.protocol, result)
            result = result[self.protocol]

            # verify that the dissection is as expected
            self.test_case.assertEqual(
                expected_result,
                result,
                "expected != result, while dissecting [{}] from line {}.".format(hex_string, line_no))

        # cleanup for next test
        self.dissection_list = []


@fixtures.fixture
def dissection_validator(request, cmd_tshark, cmd_text2pcap):

    def generate_validator(protocol):
        retval = _dissection_validator_real(
            protocol,
            request,
            cmd_tshark,
            cmd_text2pcap)
        return retval

    return generate_validator