aboutsummaryrefslogtreecommitdiffstats
path: root/src/osmo_gsm_tester/obj/osmo_vty.py
blob: 6fee5dc90ff058e4386e583a9aad3796194eb9fb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# osmo_gsm_tester: VTY connection
#
# Copyright (C) 2020 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import socket
import struct
import re
import time
import sys

from ..core import log
from ..core.event_loop import MainLoop

class VtyInterfaceExn(Exception):
    pass

class OsmoVty(log.Origin):
    '''Suggested usage:
         with OsmoVty(...) as vty:
             vty.cmds('enable', 'configure network', 'net')
             response = vty.cmd('foo 1 2 3')
             print('\n'.join(response))

       Using 'with' ensures that the connection is closed again.
       There should not be nested 'with' statements on this object.

       Note that test env objects (like tenv.bsc()) may keep a VTY connected until the test exits. A 'with' should not
       be used on those.
    '''

##############
# PROTECTED
##############

    def __init__(self, host, port, prompt=None):
        super().__init__(log.C_BUS, 'Vty', host=host, port=port)
        self.host = host
        self.port = port
        self.sck = None
        self.prompt = prompt
        self.re_prompt = None
        self.this_node = None
        self.this_prompt_char = None
        self.last_node = None
        self.last_prompt_char = None

    def try_connect(self):
        '''Do a connection attempt, return True when successful, False otherwise.
           Does not raise exceptions, but logs them to the debug log.'''
        assert self.sck is None
        try:
            self.dbg('Connecting')
            sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sck.connect((self.host, self.port))
            except:
                sck.close()
                raise
            # set self.sck only after the connect was successful
            self.sck = sck
            return True
        except:
            self.dbg('Failed to connect', sys.exc_info()[0])
            return False

    def _command(self, command_str, timeout=10, strict=True):
        '''Send a command and return the response.'''
        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
        self.dbg('Sending', command_str=command_str)
        self.sck.send(command_str.encode())

        waited_since = time.time()
        received_lines = []
        last_line = ''

        # (not using MainLoop.wait() to accumulate received responses across
        # iterations)
        while True:
            new_data = self.sck.recv(4096).decode('utf-8')

            last_line = "%s%s" % (last_line, new_data)

            if last_line:
                # Separate the received response into lines.
                # But note: the VTY logging currently separates with '\n\r', not '\r\n',
                # see _vty_output() in libosmocore logging_vty.c.
                # So we need to jump through hoops to not separate 'abc\n\rdef' as
                # [ 'abc', '', 'def' ]; but also not to convert '\r\n\r\n' to '\r\n\n' ('\r{\r\n}\n')
                # Simplest is to just drop all the '\r' and only care about the '\n'.
                last_line = last_line.replace('\r', '')
                lines = last_line.splitlines()
                if last_line.endswith('\n'):
                    received_lines.extend(lines)
                    last_line = ""
                else:
                    # if pkt buffer ends in the middle of a line, we need to keep
                    # last non-finished line:
                    received_lines.extend(lines[:-1])
                    last_line = lines[-1]

            match = self.re_prompt.match(last_line)
            if not match:
                if time.time() - waited_since > timeout:
                    raise IOError("Failed to read data (did the app crash?)")
                MainLoop.sleep(.1)
                continue

            self.last_node = self.this_node
            self.last_prompt_char = self.this_prompt_char
            self.this_node = match.group(1) or None
            self.this_prompt_char = match.group(2)
            break

        # expecting to have received the command we sent as echo, remove it
        clean_command_str = command_str.strip()
        if clean_command_str.endswith('?'):
            clean_command_str = clean_command_str[:-1]
        if received_lines and received_lines[0] == clean_command_str:
            received_lines = received_lines[1:]
        if len(received_lines) > 1:
            self.dbg('Received\n|', '\n| '.join(received_lines), '\n')
        elif len(received_lines) == 1:
            self.dbg('Received', repr(received_lines[0]))

        if received_lines == ['% Unknown command.']:
            errmsg = 'VTY reports unknown command: %r' % command_str
            if strict:
                raise VtyInterfaceExn(errmsg)
            else:
                self.log('ignoring error:', errmsg)

        return received_lines

########################
# PUBLIC - INTERNAL API
########################

    def connect(self, timeout=30):
        '''Connect to the VTY self.host and self.port, retry for 'timeout' seconds.
           connect() and disconnect() are called implicitly when using the 'with' statement.
           See class OsmoVty's doc.
           '''
        MainLoop.wait(self.try_connect, timestep=3, timeout=timeout)
        self.sck.setblocking(1)

        # read first prompt
        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
        self.this_node = None
        self.this_prompt_char = '>' # slight cheat for initial prompt char
        self.last_node = None
        self.last_prompt_char = None

        data = self.sck.recv(4096)
        if not self.prompt:
            b = data
            b = b[b.rfind(b'\n') + 1:]
            while b and (b[0] < ord('A') or b[0] > ord('z')):
                b = b[1:]
            prompt_str = b.decode('utf-8')
            if '>' in prompt_str:
                self.prompt = prompt_str[:prompt_str.find('>')]
            self.dbg(prompt=self.prompt)
        if not self.prompt:
            raise VtyInterfaceExn('Could not find application name; needed to decode prompts.'
                    ' Initial data was: %r' % data)
        self.re_prompt = re.compile('^%s(?:\(([\w-]*)\))?([#>]) (.*)$' % self.prompt)

    def disconnect(self):
        '''Disconnect.
           connect() and disconnect() are called implicitly when using the 'with' statement.
           See class OsmoVty's doc.
           '''
        if self.sck is None:
            return
        self.dbg('Disconnecting')
        self.sck.close()
        self.sck = None

###################
# PUBLIC (test API included)
###################

    def cmd(self, command_str, timeout=10, strict=True):
        '''Send one VTY command and return its response.
           Return a list of strings, one string per line, without line break characters:
             [ 'first line', 'second line', 'third line' ]
           When strict==False, do not raise exceptions on '% Unknown command'.
           If the connection is not yet open, briefly connect for only this command and disconnect again. If it is open,
           use the open connection and leave it open.
        '''
        # allow calling for both already connected VTY as well as establishing
        # a connection just for this command.
        if self.sck is None:
            with self:
                return self.cmd(command_str, timeout, strict)

        # (copied from https://git.osmocom.org/python/osmo-python-tests/tree/osmopy/osmo_interact/vty.py)
        command_str = command_str or '\r'
        if command_str[-1] not in '?\r\t':
            command_str = command_str + '\r'

        received_lines = self._command(command_str, timeout, strict)

        # send escape to cancel the '?' command line
        if command_str[-1] == '?':
            self._command('\x03', timeout)

        return received_lines

    def cmds(self, *cmds, timeout=10, strict=True):
        '''Send a series of commands and return each command's response:
             cmds('foo', 'bar', 'baz') --> [ ['foo line 1','foo line 2'], ['bar line 1'], ['baz line 1']]
           When strict==False, do not raise exceptions on '% Unknown command'.
           If the connection is not yet open, briefly connect for only these commands and disconnect again. If it is
           open, use the open connection and leave it open.
        '''
        # allow calling for both already connected VTY as well as establishing
        # a connection just for this command.
        if self.sck is None:
            with self:
                return self.cmds(*cmds, timeout=timeout, strict=strict)

        responses = []
        for cmd in cmds:
            responses.append(self.cmd(cmd, timeout, strict))
        return responses

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, *exc_info):
        self.disconnect()

# vim: expandtab tabstop=4 shiftwidth=4