diff options
author | Gerald Combs <gerald@wireshark.org> | 2018-04-04 11:56:06 -0700 |
---|---|---|
committer | Gerald Combs <gerald@wireshark.org> | 2018-04-04 23:22:52 +0000 |
commit | a63542fe33541d1aa5eedd1eea4e986ffb0d4101 (patch) | |
tree | ae767086f9b825f973131872111d6f1224f79358 /doc | |
parent | 57fed5d17fa22340fde76cabe2b5340ad48a1989 (diff) |
Fixup extcap_example.py
Run 2to3. Convert our payload to bytes. Switch from tab to 4-space
indentation as described in PEP8, which fixes a "TabError" message.
Change-Id: Ic20b6bc1fc1a945758c8be1bb54435f3326fb605
Reviewed-on: https://code.wireshark.org/review/26744
Reviewed-by: Gerald Combs <gerald@wireshark.org>
Diffstat (limited to 'doc')
-rwxr-xr-x | doc/extcap_example.py | 649 |
1 files changed, 326 insertions, 323 deletions
diff --git a/doc/extcap_example.py b/doc/extcap_example.py index 6d1d1d32bc..6aeeb16159 100755 --- a/doc/extcap_example.py +++ b/doc/extcap_example.py @@ -29,7 +29,7 @@ other script-based formates beside VBScript """ -from __future__ import print_function + import os import sys import signal @@ -77,28 +77,28 @@ button_disabled = False This code has been taken from http://stackoverflow.com/questions/5943249/python-argparse-and-controlling-overriding-the-exit-status-code - originally developed by Rob Cowie http://stackoverflow.com/users/46690/rob-cowie """ class ArgumentParser(argparse.ArgumentParser): - def _get_action_from_name(self, name): - """Given a name, get the Action instance registered with this parser. - If only it were made available in the ArgumentError object. It is - passed as it's first arg... - """ - container = self._actions - if name is None: - return None - for action in container: - if '/'.join(action.option_strings) == name: - return action - elif action.metavar == name: - return action - elif action.dest == name: - return action - - def error(self, message): - exc = sys.exc_info()[1] - if exc: - exc.argument = self._get_action_from_name(exc.argument_name) - raise exc - super(ArgumentParser, self).error(message) + def _get_action_from_name(self, name): + """Given a name, get the Action instance registered with this parser. + If only it were made available in the ArgumentError object. It is + passed as it's first arg... + """ + container = self._actions + if name is None: + return None + for action in container: + if '/'.join(action.option_strings) == name: + return action + elif action.metavar == name: + return action + elif action.dest == name: + return action + + def error(self, message): + exc = sys.exc_info()[1] + if exc: + exc.argument = self._get_action_from_name(exc.argument_name) + raise exc + super(ArgumentParser, self).error(message) #### EXTCAP FUNCTIONALITY @@ -108,67 +108,67 @@ interface in Wireshark to present a interface specific configuration for this extcap plugin """ def extcap_config(interface): - args = [] - values = [] + args = [] + values = [] - args.append ( (0, '--delay', 'Time delay', 'Time delay between packages', 'integer', '{range=1,15}{default=5}') ) - args.append ( (1, '--message', 'Message', 'Package message content', 'string', '{required=true}{placeholder=Please enter a message here ...}') ) - args.append ( (2, '--verify', 'Verify', 'Verify package content', 'boolflag', '{default=yes}') ) - args.append ( (3, '--remote', 'Remote Channel', 'Remote Channel Selector', 'selector', '')) - args.append ( (4, '--fake_ip', 'Fake IP Address', 'Use this ip address as sender', 'string', '{save=false}{validation=\\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b}')) - args.append ( (5, '--ltest', 'Long Test', 'Long Test Value', 'long', '{default=123123123123123123}')) - args.append ( (6, '--d1test', 'Double 1 Test', 'Long Test Value', 'double', '{default=123.456}')) - args.append ( (7, '--d2test', 'Double 2 Test', 'Long Test Value', 'double', '{default= 123,456}')) - args.append ( (8, '--password', 'Password', 'Package message password', 'password', '') ) - args.append ( (9, '--ts', 'Start Time', 'Capture start time', 'timestamp', '') ) - args.append ( (10, '--logfile', 'Log File Test', 'The Log File Test', 'fileselect', '') ) - args.append ( (11, '--radio', 'Radio Test', 'Radio Test Value', 'radio', '') ) - args.append ( (12, '--multi', 'MultiCheck Test', 'MultiCheck Test Value', 'multicheck', '') ) + args.append ( (0, '--delay', 'Time delay', 'Time delay between packages', 'integer', '{range=1,15}{default=5}') ) + args.append ( (1, '--message', 'Message', 'Package message content', 'string', '{required=true}{placeholder=Please enter a message here ...}') ) + args.append ( (2, '--verify', 'Verify', 'Verify package content', 'boolflag', '{default=yes}') ) + args.append ( (3, '--remote', 'Remote Channel', 'Remote Channel Selector', 'selector', '')) + args.append ( (4, '--fake_ip', 'Fake IP Address', 'Use this ip address as sender', 'string', '{save=false}{validation=\\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b}')) + args.append ( (5, '--ltest', 'Long Test', 'Long Test Value', 'long', '{default=123123123123123123}')) + args.append ( (6, '--d1test', 'Double 1 Test', 'Long Test Value', 'double', '{default=123.456}')) + args.append ( (7, '--d2test', 'Double 2 Test', 'Long Test Value', 'double', '{default= 123,456}')) + args.append ( (8, '--password', 'Password', 'Package message password', 'password', '') ) + args.append ( (9, '--ts', 'Start Time', 'Capture start time', 'timestamp', '') ) + args.append ( (10, '--logfile', 'Log File Test', 'The Log File Test', 'fileselect', '') ) + args.append ( (11, '--radio', 'Radio Test', 'Radio Test Value', 'radio', '') ) + args.append ( (12, '--multi', 'MultiCheck Test', 'MultiCheck Test Value', 'multicheck', '') ) - values.append ( (3, "if1", "Remote1", "true" ) ) - values.append ( (3, "if2", "Remote2", "false" ) ) + values.append ( (3, "if1", "Remote1", "true" ) ) + values.append ( (3, "if2", "Remote2", "false" ) ) - values.append ( (11, "r1", "Radio1", "false" ) ) - values.append ( (11, "r2", "Radio2", "true" ) ) + values.append ( (11, "r1", "Radio1", "false" ) ) + values.append ( (11, "r2", "Radio2", "true" ) ) - values.append ( (12, "m1", "MultiCheck1", "false" ) ) - values.append ( (12, "m2", "MultiCheck2", "false" ) ) + values.append ( (12, "m1", "MultiCheck1", "false" ) ) + values.append ( (12, "m2", "MultiCheck2", "false" ) ) - for arg in args: - print ("arg {number=%d}{call=%s}{display=%s}{tooltip=%s}{type=%s}%s" % arg) + for arg in args: + print ("arg {number=%d}{call=%s}{display=%s}{tooltip=%s}{type=%s}%s" % arg) - for value in values: - print ("value {arg=%d}{value=%s}{display=%s}{default=%s}" % value) + for value in values: + print ("value {arg=%d}{value=%s}{display=%s}{default=%s}" % value) def extcap_interfaces(): - print ("extcap {version=1.0}{help=http://www.wireshark.org}{display=Example extcap interface}") - print ("interface {value=example1}{display=Example interface 1 for extcap}") - print ("interface {value=example2}{display=Example interface 2 for extcap}") - print ("control {number=%d}{type=string}{display=Message}{tooltip=Package message content. Must start with a capital letter.}{placeholder=Enter package message content here ...}{validation=^[A-Z]+}" % CTRL_ARG_MESSAGE) - print ("control {number=%d}{type=selector}{display=Time delay}{tooltip=Time delay between packages}" % CTRL_ARG_DELAY) - print ("control {number=%d}{type=boolean}{display=Verify}{default=true}{tooltip=Verify package content}" % CTRL_ARG_VERIFY) - print ("control {number=%d}{type=button}{display=Turn on}{tooltip=Turn on or off}" % CTRL_ARG_BUTTON) - print ("control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Show help}" % CTRL_ARG_HELP) - print ("control {number=%d}{type=button}{role=restore}{display=Restore}{tooltip=Restore default values}" % CTRL_ARG_RESTORE) - print ("control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Show capture log}" % CTRL_ARG_LOGGER) - print ("value {control=%d}{value=1}{display=1}" % CTRL_ARG_DELAY) - print ("value {control=%d}{value=2}{display=2}" % CTRL_ARG_DELAY) - print ("value {control=%d}{value=3}{display=3}" % CTRL_ARG_DELAY) - print ("value {control=%d}{value=4}{display=4}" % CTRL_ARG_DELAY) - print ("value {control=%d}{value=5}{display=5}{default=true}" % CTRL_ARG_DELAY) - print ("value {control=%d}{value=60}{display=60}" % CTRL_ARG_DELAY) + print ("extcap {version=1.0}{help=http://www.wireshark.org}{display=Example extcap interface}") + print ("interface {value=example1}{display=Example interface 1 for extcap}") + print ("interface {value=example2}{display=Example interface 2 for extcap}") + print ("control {number=%d}{type=string}{display=Message}{tooltip=Package message content. Must start with a capital letter.}{placeholder=Enter package message content here ...}{validation=^[A-Z]+}" % CTRL_ARG_MESSAGE) + print ("control {number=%d}{type=selector}{display=Time delay}{tooltip=Time delay between packages}" % CTRL_ARG_DELAY) + print ("control {number=%d}{type=boolean}{display=Verify}{default=true}{tooltip=Verify package content}" % CTRL_ARG_VERIFY) + print ("control {number=%d}{type=button}{display=Turn on}{tooltip=Turn on or off}" % CTRL_ARG_BUTTON) + print ("control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Show help}" % CTRL_ARG_HELP) + print ("control {number=%d}{type=button}{role=restore}{display=Restore}{tooltip=Restore default values}" % CTRL_ARG_RESTORE) + print ("control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Show capture log}" % CTRL_ARG_LOGGER) + print ("value {control=%d}{value=1}{display=1}" % CTRL_ARG_DELAY) + print ("value {control=%d}{value=2}{display=2}" % CTRL_ARG_DELAY) + print ("value {control=%d}{value=3}{display=3}" % CTRL_ARG_DELAY) + print ("value {control=%d}{value=4}{display=4}" % CTRL_ARG_DELAY) + print ("value {control=%d}{value=5}{display=5}{default=true}" % CTRL_ARG_DELAY) + print ("value {control=%d}{value=60}{display=60}" % CTRL_ARG_DELAY) def extcap_dlts(interface): - if ( interface == '1' ): - print ("dlt {number=147}{name=USER0}{display=Demo Implementation for Extcap}") - elif ( interface == '2' ): - print ("dlt {number=148}{name=USER1}{display=Demo Implementation for Extcap}") + if ( interface == '1' ): + print ("dlt {number=147}{name=USER0}{display=Demo Implementation for Extcap}") + elif ( interface == '2' ): + print ("dlt {number=148}{name=USER1}{display=Demo Implementation for Extcap}") def validate_capture_filter(capture_filter): - if capture_filter != "filter" and capture_filter != "valid": - print("Illegal capture filter") + if capture_filter != "filter" and capture_filter != "valid": + print("Illegal capture filter") """ @@ -185,293 +185,296 @@ Extcap capture routine """ def unsigned(n): - return int(n) & 0xFFFFFFFF + return int(n) & 0xFFFFFFFF def pcap_fake_header(): - header = bytearray() - header += struct.pack('<L', int ('a1b2c3d4', 16 )) - header += struct.pack('<H', unsigned(2) ) # Pcap Major Version - header += struct.pack('<H', unsigned(4) ) # Pcap Minor Version - header += struct.pack('<I', int(0)) # Timezone - header += struct.pack('<I', int(0)) # Accurancy of timestamps - header += struct.pack('<L', int ('0000ffff', 16 )) # Max Length of capture frame - header += struct.pack('<L', unsigned(1)) # Ethernet - return header + header = bytearray() + header += struct.pack('<L', int ('a1b2c3d4', 16 )) + header += struct.pack('<H', unsigned(2) ) # Pcap Major Version + header += struct.pack('<H', unsigned(4) ) # Pcap Minor Version + header += struct.pack('<I', int(0)) # Timezone + header += struct.pack('<I', int(0)) # Accurancy of timestamps + header += struct.pack('<L', int ('0000ffff', 16 )) # Max Length of capture frame + header += struct.pack('<L', unsigned(1)) # Ethernet + return header # Calculates and returns the IP checksum based on the given IP Header def ip_checksum(iph): - #split into bytes - words = splitN(''.join(iph.split()),4) - csum = 0; - for word in words: - csum += int(word, base=16) - csum += (csum >> 16) - csum = csum & 0xFFFF ^ 0xFFFF - return csum + #split into bytes + words = splitN(''.join(iph.split()),4) + csum = 0; + for word in words: + csum += int(word, base=16) + csum += (csum >> 16) + csum = csum & 0xFFFF ^ 0xFFFF + return csum def pcap_fake_package ( message, fake_ip ): - pcap = bytearray() - #length = 14 bytes [ eth ] + 20 bytes [ ip ] + messagelength + pcap = bytearray() + #length = 14 bytes [ eth ] + 20 bytes [ ip ] + messagelength - caplength = len(message) + 14 + 20 - timestamp = int(time.time()) + caplength = len(message) + 14 + 20 + timestamp = int(time.time()) - pcap += struct.pack('<L', unsigned(timestamp ) ) # timestamp seconds - pcap += struct.pack('<L', 0x00 ) # timestamp nanoseconds - pcap += struct.pack('<L', unsigned(caplength ) ) # length captured - pcap += struct.pack('<L', unsigned(caplength ) ) # length in frame + pcap += struct.pack('<L', unsigned(timestamp ) ) # timestamp seconds + pcap += struct.pack('<L', 0x00 ) # timestamp nanoseconds + pcap += struct.pack('<L', unsigned(caplength ) ) # length captured + pcap += struct.pack('<L', unsigned(caplength ) ) # length in frame # ETH - pcap += struct.pack('h', 0 ) # source mac - pcap += struct.pack('h', 0 ) # source mac - pcap += struct.pack('h', 0 ) # source mac - pcap += struct.pack('h', 0 ) # dest mac - pcap += struct.pack('h', 0 ) # dest mac - pcap += struct.pack('h', 0 ) # dest mac - pcap += struct.pack('<h', unsigned(8 )) # protocol (ip) + pcap += struct.pack('h', 0 ) # source mac + pcap += struct.pack('h', 0 ) # source mac + pcap += struct.pack('h', 0 ) # source mac + pcap += struct.pack('h', 0 ) # dest mac + pcap += struct.pack('h', 0 ) # dest mac + pcap += struct.pack('h', 0 ) # dest mac + pcap += struct.pack('<h', unsigned(8 )) # protocol (ip) # IP - pcap += struct.pack('b', int ( '45', 16 )) # IP version - pcap += struct.pack('b', int ( '0', 16 )) # - pcap += struct.pack('>H', unsigned(len(message)+20) ) # length of data + payload - pcap += struct.pack('<H', int ( '0', 16 )) # Identification - pcap += struct.pack('b', int ( '40', 16 )) # Don't fragment - pcap += struct.pack('b', int ( '0', 16 )) # Fragment Offset - pcap += struct.pack('b', int ( '40', 16 )) - pcap += struct.pack('B', 0xFE ) # Protocol (2 = unspecified) - pcap += struct.pack('<H', int ( '0000', 16 )) # Checksum - - parts = fake_ip.split('.') - ipadr = (int(parts[0]) << 24) + (int(parts[1]) << 16) + (int(parts[2]) << 8) + int(parts[3]) - pcap += struct.pack('>L', ipadr ) # Source IP - pcap += struct.pack('>L', int ( '7F000001', 16 )) # Dest IP - - pcap += message - return pcap + pcap += struct.pack('b', int ( '45', 16 )) # IP version + pcap += struct.pack('b', int ( '0', 16 )) # + pcap += struct.pack('>H', unsigned(len(message)+20) ) # length of data + payload + pcap += struct.pack('<H', int ( '0', 16 )) # Identification + pcap += struct.pack('b', int ( '40', 16 )) # Don't fragment + pcap += struct.pack('b', int ( '0', 16 )) # Fragment Offset + pcap += struct.pack('b', int ( '40', 16 )) + pcap += struct.pack('B', 0xFE ) # Protocol (2 = unspecified) + pcap += struct.pack('<H', int ( '0000', 16 )) # Checksum + + parts = fake_ip.split('.') + ipadr = (int(parts[0]) << 24) + (int(parts[1]) << 16) + (int(parts[2]) << 8) + int(parts[3]) + pcap += struct.pack('>L', ipadr ) # Source IP + pcap += struct.pack('>L', int ( '7F000001', 16 )) # Dest IP + + pcap += message + return pcap def control_read(fn): - try: - header = fn.read(6) - sp, _, length, arg, typ = struct.unpack('>sBHBB', header) - if length > 2: - payload = fn.read(length - 2) - else: - payload = '' - return arg, typ, payload - except: - return None, None, None + try: + header = fn.read(6) + sp, _, length, arg, typ = struct.unpack('>sBHBB', header) + if length > 2: + payload = fn.read(length - 2) + else: + payload = '' + return arg, typ, payload + except: + return None, None, None def control_read_thread(control_in, fn_out): - global initialized, message, delay, verify, button, button_disabled - with open(control_in, 'rb', 0 ) as fn: - arg = 0 - while arg != None: - arg, typ, payload = control_read(fn) - log = '' - if typ == CTRL_CMD_INITIALIZED: - initialized = True - elif arg == CTRL_ARG_MESSAGE: - message = payload - log = "Message = " + payload - elif arg == CTRL_ARG_DELAY: - delay = float(payload) - log = "Time delay = " + payload - elif arg == CTRL_ARG_VERIFY: - # Only read this after initialized - if initialized: - verify = (payload[0] != '\0') - log = "Verify = " + str(verify) - control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_STATUSBAR, "Verify changed") - elif arg == CTRL_ARG_BUTTON: - control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_DISABLE, "") - button_disabled = True - if button == True: - control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn on") - button = False - log = "Button turned off" - else: - control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn off") - button = True - log = "Button turned on" - - if len(log) > 0: - control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log + "\n") + global initialized, message, delay, verify, button, button_disabled + with open(control_in, 'rb', 0 ) as fn: + arg = 0 + while arg != None: + arg, typ, payload = control_read(fn) + log = '' + if typ == CTRL_CMD_INITIALIZED: + initialized = True + elif arg == CTRL_ARG_MESSAGE: + message = payload + log = "Message = " + payload + elif arg == CTRL_ARG_DELAY: + delay = float(payload) + log = "Time delay = " + payload + elif arg == CTRL_ARG_VERIFY: + # Only read this after initialized + if initialized: + verify = (payload[0] != '\0') + log = "Verify = " + str(verify) + control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_STATUSBAR, "Verify changed") + elif arg == CTRL_ARG_BUTTON: + control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_DISABLE, "") + button_disabled = True + if button == True: + control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn on") + button = False + log = "Button turned off" + else: + control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn off") + button = True + log = "Button turned on" + + if len(log) > 0: + control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log + "\n") def control_write(fn, arg, typ, payload): - packet = bytearray() - packet += struct.pack('>sBHBB', 'T', 0, len(payload) + 2, arg, typ) - packet += payload - fn.write(packet) + packet = bytearray() + packet += struct.pack('>sBHBB', b'T', 0, len(payload) + 2, arg, typ) + if sys.version_info[0] >= 3 and isinstance(payload, str): + packet += payload.encode('utf-8') + else: + packet += payload + fn.write(packet) def control_write_defaults(fn_out): - global initialized, message, delay, verify + global initialized, message, delay, verify - while not initialized: - time.sleep(.1) # Wait for initial control values + while not initialized: + time.sleep(.1) # Wait for initial control values - # Write startup configuration to Toolbar controls - control_write(fn_out, CTRL_ARG_MESSAGE, CTRL_CMD_SET, message) - control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_SET, str(int(delay))) - control_write(fn_out, CTRL_ARG_VERIFY, CTRL_CMD_SET, struct.pack('B', verify)) + # Write startup configuration to Toolbar controls + control_write(fn_out, CTRL_ARG_MESSAGE, CTRL_CMD_SET, message) + control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_SET, str(int(delay))) + control_write(fn_out, CTRL_ARG_VERIFY, CTRL_CMD_SET, struct.pack('B', verify)) - for i in range(1,16): - item = bytearray() - item += str(i) + struct.pack('B', 0) + str(i) + " sec" - control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_ADD, item) + for i in range(1,16): + item = bytearray() + item += str(i) + struct.pack('B', 0) + str(i) + " sec" + control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_ADD, item) - control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_REMOVE, str(60)) + control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_REMOVE, str(60)) def extcap_capture(interface, fifo, control_in, control_out, in_delay, in_verify, in_message, remote, fake_ip): - global message, delay, verify, button_disabled - delay = in_delay if in_delay != 0 else 5 - message = in_message - verify = in_verify - counter = 1 - fn_out = None - - with open(fifo, 'wb', 0 ) as fh: - fh.write (pcap_fake_header()) - - if control_out != None: - fn_out = open(control_out, 'wb', 0) - control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_SET, "Log started at " + time.strftime("%c") + "\n") - - if control_in != None: - # Start reading thread - thread = Thread(target = control_read_thread, args = (control_in, fn_out)) - thread.start() - - if fn_out != None: - control_write_defaults(fn_out) - - while True: - if fn_out != None: - log = "Received packet #" + str(counter) + "\n" - control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log) - counter = counter + 1 - - if button_disabled == True: - control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_ENABLE, "") - control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_INFORMATION, "Turn action finished.") - button_disabled = False - - out = ("%s|%04X%s|%s" % ( remote.strip(), len(message), message, verify )).encode("utf8") - fh.write (pcap_fake_package(out, fake_ip)) - time.sleep(delay) - - thread.join() - if fn_out != None: - fn_out.close() + global message, delay, verify, button_disabled + delay = in_delay if in_delay != 0 else 5 + message = in_message + verify = in_verify + counter = 1 + fn_out = None + + with open(fifo, 'wb', 0 ) as fh: + fh.write (pcap_fake_header()) + + if control_out != None: + fn_out = open(control_out, 'wb', 0) + control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_SET, "Log started at " + time.strftime("%c") + "\n") + + if control_in != None: + # Start reading thread + thread = Thread(target = control_read_thread, args = (control_in, fn_out)) + thread.start() + + if fn_out != None: + control_write_defaults(fn_out) + + while True: + if fn_out != None: + log = "Received packet #" + str(counter) + "\n" + control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log) + counter = counter + 1 + + if button_disabled == True: + control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_ENABLE, "") + control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_INFORMATION, "Turn action finished.") + button_disabled = False + + out = ("%s|%04X%s|%s" % ( remote.strip(), len(message), message, verify )).encode("utf8") + fh.write (pcap_fake_package(out, fake_ip)) + time.sleep(delay) + + thread.join() + if fn_out != None: + fn_out.close() def extcap_close_fifo(fifo): - # This is apparently needed to workaround an issue on Windows/macOS - # where the message cannot be read. (really?) - fh = open(fifo, 'wb', 0 ) - fh.close() + # This is apparently needed to workaround an issue on Windows/macOS + # where the message cannot be read. (really?) + fh = open(fifo, 'wb', 0 ) + fh.close() #### def usage(): - print ( "Usage: %s <--extcap-interfaces | --extcap-dlts | --extcap-interface | --extcap-config | --capture | --extcap-capture-filter | --fifo>" % sys.argv[0] ) + print ( "Usage: %s <--extcap-interfaces | --extcap-dlts | --extcap-interface | --extcap-config | --capture | --extcap-capture-filter | --fifo>" % sys.argv[0] ) if __name__ == '__main__': - interface = "" - - # Capture options - delay = 0 - message = "" - fake_ip = "" - ts = 0 - - parser = ArgumentParser( - prog="Extcap Example", - description="Extcap example program for python" - ) - - # Extcap Arguments - parser.add_argument("--capture", help="Start the capture routine", action="store_true" ) - parser.add_argument("--extcap-interfaces", help="Provide a list of interfaces to capture from", action="store_true") - parser.add_argument("--extcap-interface", help="Provide the interface to capture from") - parser.add_argument("--extcap-dlts", help="Provide a list of dlts for the given interface", action="store_true") - parser.add_argument("--extcap-config", help="Provide a list of configurations for the given interface", action="store_true") - parser.add_argument("--extcap-capture-filter", help="Used together with capture to provide a capture filter") - parser.add_argument("--fifo", help="Use together with capture to provide the fifo to dump data to") - parser.add_argument("--extcap-control-in", help="Used to get control messages from toolbar") - parser.add_argument("--extcap-control-out", help="Used to send control messages to toolbar") - - # Interface Arguments - parser.add_argument("--verify", help="Demonstrates a verification bool flag", action="store_true" ) - parser.add_argument("--delay", help="Demonstrates an integer variable", type=int, default=0, choices=[0, 1, 2, 3, 4, 5, 6] ) - parser.add_argument("--remote", help="Demonstrates a selector choice", default="if1", choices=["if1", "if2"] ) - parser.add_argument("--message", help="Demonstrates string variable", nargs='?', default="" ) - parser.add_argument("--fake_ip", help="Add a fake sender IP adress", nargs='?', default="127.0.0.1" ) - parser.add_argument("--ts", help="Capture start time", action="store_true" ) - - try: - args, unknown = parser.parse_known_args() - except argparse.ArgumentError as exc: - print( "%s: %s" % ( exc.argument.dest, exc.message ), file=sys.stderr) - fifo_found = 0 - fifo = "" - for arg in sys.argv: - if (arg == "--fifo" or arg == "--extcap-fifo") : - fifo_found = 1 - elif ( fifo_found == 1 ): - fifo = arg - break - extcap_close_fifo(fifo) - sys.exit(ERROR_ARG) - - if ( len(sys.argv) <= 1 ): - parser.exit("No arguments given!") - - if ( args.extcap_interfaces == False and args.extcap_interface == None ): - parser.exit("An interface must be provided or the selection must be displayed") - if ( args.extcap_capture_filter and not args.capture ): - validate_capture_filter(args.extcap_capture_filter) - sys.exit(0) - - if ( args.extcap_interfaces == True or args.extcap_interface == None ): - extcap_interfaces() - sys.exit(0) - - if ( len(unknown) > 1 ): - print("Extcap Example %d unknown arguments given" % len(unknown) ) - - m = re.match ( 'example(\d+)', args.extcap_interface ) - if not m: - sys.exit(ERROR_INTERFACE) - interface = m.group(1) - - message = args.message - if ( args.message == None or len(args.message) == 0 ): - message = "Extcap Test" - - fake_ip = args.fake_ip - if ( args.fake_ip == None or len(args.fake_ip) < 7 or len(args.fake_ip.split('.')) != 4 ): - fake_ip = "127.0.0.1" - - ts = args.ts - - if args.extcap_config: - extcap_config(interface) - elif args.extcap_dlts: - extcap_dlts(interface) - elif args.capture: - if args.fifo is None: - sys.exit(ERROR_FIFO) - # The following code demonstrates error management with extcap - if args.delay > 5: - print("Value for delay [%d] too high" % args.delay, file=sys.stderr) - extcap_close_fifo(args.fifo) - sys.exit(ERROR_DELAY) - - try: - extcap_capture(interface, args.fifo, args.extcap_control_in, args.extcap_control_out, args.delay, args.verify, message, args.remote, fake_ip) - except KeyboardInterrupt: - pass - else: - usage() - sys.exit(ERROR_USAGE) + interface = "" + + # Capture options + delay = 0 + message = "" + fake_ip = "" + ts = 0 + + parser = ArgumentParser( + prog="Extcap Example", + description="Extcap example program for python" + ) + + # Extcap Arguments + parser.add_argument("--capture", help="Start the capture routine", action="store_true" ) + parser.add_argument("--extcap-interfaces", help="Provide a list of interfaces to capture from", action="store_true") + parser.add_argument("--extcap-interface", help="Provide the interface to capture from") + parser.add_argument("--extcap-dlts", help="Provide a list of dlts for the given interface", action="store_true") + parser.add_argument("--extcap-config", help="Provide a list of configurations for the given interface", action="store_true") + parser.add_argument("--extcap-capture-filter", help="Used together with capture to provide a capture filter") + parser.add_argument("--fifo", help="Use together with capture to provide the fifo to dump data to") + parser.add_argument("--extcap-control-in", help="Used to get control messages from toolbar") + parser.add_argument("--extcap-control-out", help="Used to send control messages to toolbar") + + # Interface Arguments + parser.add_argument("--verify", help="Demonstrates a verification bool flag", action="store_true" ) + parser.add_argument("--delay", help="Demonstrates an integer variable", type=int, default=0, choices=[0, 1, 2, 3, 4, 5, 6] ) + parser.add_argument("--remote", help="Demonstrates a selector choice", default="if1", choices=["if1", "if2"] ) + parser.add_argument("--message", help="Demonstrates string variable", nargs='?', default="" ) + parser.add_argument("--fake_ip", help="Add a fake sender IP adress", nargs='?', default="127.0.0.1" ) + parser.add_argument("--ts", help="Capture start time", action="store_true" ) + + try: + args, unknown = parser.parse_known_args() + except argparse.ArgumentError as exc: + print( "%s: %s" % ( exc.argument.dest, exc.message ), file=sys.stderr) + fifo_found = 0 + fifo = "" + for arg in sys.argv: + if (arg == "--fifo" or arg == "--extcap-fifo") : + fifo_found = 1 + elif ( fifo_found == 1 ): + fifo = arg + break + extcap_close_fifo(fifo) + sys.exit(ERROR_ARG) + + if ( len(sys.argv) <= 1 ): + parser.exit("No arguments given!") + + if ( args.extcap_interfaces == False and args.extcap_interface == None ): + parser.exit("An interface must be provided or the selection must be displayed") + if ( args.extcap_capture_filter and not args.capture ): + validate_capture_filter(args.extcap_capture_filter) + sys.exit(0) + + if ( args.extcap_interfaces == True or args.extcap_interface == None ): + extcap_interfaces() + sys.exit(0) + + if ( len(unknown) > 1 ): + print("Extcap Example %d unknown arguments given" % len(unknown) ) + + m = re.match ( 'example(\d+)', args.extcap_interface ) + if not m: + sys.exit(ERROR_INTERFACE) + interface = m.group(1) + + message = args.message + if ( args.message == None or len(args.message) == 0 ): + message = "Extcap Test" + + fake_ip = args.fake_ip + if ( args.fake_ip == None or len(args.fake_ip) < 7 or len(args.fake_ip.split('.')) != 4 ): + fake_ip = "127.0.0.1" + + ts = args.ts + + if args.extcap_config: + extcap_config(interface) + elif args.extcap_dlts: + extcap_dlts(interface) + elif args.capture: + if args.fifo is None: + sys.exit(ERROR_FIFO) + # The following code demonstrates error management with extcap + if args.delay > 5: + print("Value for delay [%d] too high" % args.delay, file=sys.stderr) + extcap_close_fifo(args.fifo) + sys.exit(ERROR_DELAY) + + try: + extcap_capture(interface, args.fifo, args.extcap_control_in, args.extcap_control_out, args.delay, args.verify, message, args.remote, fake_ip) + except KeyboardInterrupt: + pass + else: + usage() + sys.exit(ERROR_USAGE) |