aboutsummaryrefslogtreecommitdiffstats
path: root/doc
diff options
context:
space:
mode:
authorGerald Combs <gerald@wireshark.org>2018-04-04 11:56:06 -0700
committerGerald Combs <gerald@wireshark.org>2018-04-04 23:22:52 +0000
commita63542fe33541d1aa5eedd1eea4e986ffb0d4101 (patch)
treeae767086f9b825f973131872111d6f1224f79358 /doc
parent57fed5d17fa22340fde76cabe2b5340ad48a1989 (diff)
Fixup extcap_example.py
Run 2to3. Convert our payload to bytes. Switch from tab to 4-space indentation as described in PEP8, which fixes a "TabError" message. Change-Id: Ic20b6bc1fc1a945758c8be1bb54435f3326fb605 Reviewed-on: https://code.wireshark.org/review/26744 Reviewed-by: Gerald Combs <gerald@wireshark.org>
Diffstat (limited to 'doc')
-rwxr-xr-xdoc/extcap_example.py649
1 files changed, 326 insertions, 323 deletions
diff --git a/doc/extcap_example.py b/doc/extcap_example.py
index 6d1d1d32bc..6aeeb16159 100755
--- a/doc/extcap_example.py
+++ b/doc/extcap_example.py
@@ -29,7 +29,7 @@ other script-based formates beside VBScript
"""
-from __future__ import print_function
+
import os
import sys
import signal
@@ -77,28 +77,28 @@ button_disabled = False
This code has been taken from http://stackoverflow.com/questions/5943249/python-argparse-and-controlling-overriding-the-exit-status-code - originally developed by Rob Cowie http://stackoverflow.com/users/46690/rob-cowie
"""
class ArgumentParser(argparse.ArgumentParser):
- def _get_action_from_name(self, name):
- """Given a name, get the Action instance registered with this parser.
- If only it were made available in the ArgumentError object. It is
- passed as it's first arg...
- """
- container = self._actions
- if name is None:
- return None
- for action in container:
- if '/'.join(action.option_strings) == name:
- return action
- elif action.metavar == name:
- return action
- elif action.dest == name:
- return action
-
- def error(self, message):
- exc = sys.exc_info()[1]
- if exc:
- exc.argument = self._get_action_from_name(exc.argument_name)
- raise exc
- super(ArgumentParser, self).error(message)
+ def _get_action_from_name(self, name):
+ """Given a name, get the Action instance registered with this parser.
+ If only it were made available in the ArgumentError object. It is
+ passed as it's first arg...
+ """
+ container = self._actions
+ if name is None:
+ return None
+ for action in container:
+ if '/'.join(action.option_strings) == name:
+ return action
+ elif action.metavar == name:
+ return action
+ elif action.dest == name:
+ return action
+
+ def error(self, message):
+ exc = sys.exc_info()[1]
+ if exc:
+ exc.argument = self._get_action_from_name(exc.argument_name)
+ raise exc
+ super(ArgumentParser, self).error(message)
#### EXTCAP FUNCTIONALITY
@@ -108,67 +108,67 @@ interface in Wireshark to present a interface specific configuration for
this extcap plugin
"""
def extcap_config(interface):
- args = []
- values = []
+ args = []
+ values = []
- args.append ( (0, '--delay', 'Time delay', 'Time delay between packages', 'integer', '{range=1,15}{default=5}') )
- args.append ( (1, '--message', 'Message', 'Package message content', 'string', '{required=true}{placeholder=Please enter a message here ...}') )
- args.append ( (2, '--verify', 'Verify', 'Verify package content', 'boolflag', '{default=yes}') )
- args.append ( (3, '--remote', 'Remote Channel', 'Remote Channel Selector', 'selector', ''))
- args.append ( (4, '--fake_ip', 'Fake IP Address', 'Use this ip address as sender', 'string', '{save=false}{validation=\\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b}'))
- args.append ( (5, '--ltest', 'Long Test', 'Long Test Value', 'long', '{default=123123123123123123}'))
- args.append ( (6, '--d1test', 'Double 1 Test', 'Long Test Value', 'double', '{default=123.456}'))
- args.append ( (7, '--d2test', 'Double 2 Test', 'Long Test Value', 'double', '{default= 123,456}'))
- args.append ( (8, '--password', 'Password', 'Package message password', 'password', '') )
- args.append ( (9, '--ts', 'Start Time', 'Capture start time', 'timestamp', '') )
- args.append ( (10, '--logfile', 'Log File Test', 'The Log File Test', 'fileselect', '') )
- args.append ( (11, '--radio', 'Radio Test', 'Radio Test Value', 'radio', '') )
- args.append ( (12, '--multi', 'MultiCheck Test', 'MultiCheck Test Value', 'multicheck', '') )
+ args.append ( (0, '--delay', 'Time delay', 'Time delay between packages', 'integer', '{range=1,15}{default=5}') )
+ args.append ( (1, '--message', 'Message', 'Package message content', 'string', '{required=true}{placeholder=Please enter a message here ...}') )
+ args.append ( (2, '--verify', 'Verify', 'Verify package content', 'boolflag', '{default=yes}') )
+ args.append ( (3, '--remote', 'Remote Channel', 'Remote Channel Selector', 'selector', ''))
+ args.append ( (4, '--fake_ip', 'Fake IP Address', 'Use this ip address as sender', 'string', '{save=false}{validation=\\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b}'))
+ args.append ( (5, '--ltest', 'Long Test', 'Long Test Value', 'long', '{default=123123123123123123}'))
+ args.append ( (6, '--d1test', 'Double 1 Test', 'Long Test Value', 'double', '{default=123.456}'))
+ args.append ( (7, '--d2test', 'Double 2 Test', 'Long Test Value', 'double', '{default= 123,456}'))
+ args.append ( (8, '--password', 'Password', 'Package message password', 'password', '') )
+ args.append ( (9, '--ts', 'Start Time', 'Capture start time', 'timestamp', '') )
+ args.append ( (10, '--logfile', 'Log File Test', 'The Log File Test', 'fileselect', '') )
+ args.append ( (11, '--radio', 'Radio Test', 'Radio Test Value', 'radio', '') )
+ args.append ( (12, '--multi', 'MultiCheck Test', 'MultiCheck Test Value', 'multicheck', '') )
- values.append ( (3, "if1", "Remote1", "true" ) )
- values.append ( (3, "if2", "Remote2", "false" ) )
+ values.append ( (3, "if1", "Remote1", "true" ) )
+ values.append ( (3, "if2", "Remote2", "false" ) )
- values.append ( (11, "r1", "Radio1", "false" ) )
- values.append ( (11, "r2", "Radio2", "true" ) )
+ values.append ( (11, "r1", "Radio1", "false" ) )
+ values.append ( (11, "r2", "Radio2", "true" ) )
- values.append ( (12, "m1", "MultiCheck1", "false" ) )
- values.append ( (12, "m2", "MultiCheck2", "false" ) )
+ values.append ( (12, "m1", "MultiCheck1", "false" ) )
+ values.append ( (12, "m2", "MultiCheck2", "false" ) )
- for arg in args:
- print ("arg {number=%d}{call=%s}{display=%s}{tooltip=%s}{type=%s}%s" % arg)
+ for arg in args:
+ print ("arg {number=%d}{call=%s}{display=%s}{tooltip=%s}{type=%s}%s" % arg)
- for value in values:
- print ("value {arg=%d}{value=%s}{display=%s}{default=%s}" % value)
+ for value in values:
+ print ("value {arg=%d}{value=%s}{display=%s}{default=%s}" % value)
def extcap_interfaces():
- print ("extcap {version=1.0}{help=http://www.wireshark.org}{display=Example extcap interface}")
- print ("interface {value=example1}{display=Example interface 1 for extcap}")
- print ("interface {value=example2}{display=Example interface 2 for extcap}")
- print ("control {number=%d}{type=string}{display=Message}{tooltip=Package message content. Must start with a capital letter.}{placeholder=Enter package message content here ...}{validation=^[A-Z]+}" % CTRL_ARG_MESSAGE)
- print ("control {number=%d}{type=selector}{display=Time delay}{tooltip=Time delay between packages}" % CTRL_ARG_DELAY)
- print ("control {number=%d}{type=boolean}{display=Verify}{default=true}{tooltip=Verify package content}" % CTRL_ARG_VERIFY)
- print ("control {number=%d}{type=button}{display=Turn on}{tooltip=Turn on or off}" % CTRL_ARG_BUTTON)
- print ("control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Show help}" % CTRL_ARG_HELP)
- print ("control {number=%d}{type=button}{role=restore}{display=Restore}{tooltip=Restore default values}" % CTRL_ARG_RESTORE)
- print ("control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Show capture log}" % CTRL_ARG_LOGGER)
- print ("value {control=%d}{value=1}{display=1}" % CTRL_ARG_DELAY)
- print ("value {control=%d}{value=2}{display=2}" % CTRL_ARG_DELAY)
- print ("value {control=%d}{value=3}{display=3}" % CTRL_ARG_DELAY)
- print ("value {control=%d}{value=4}{display=4}" % CTRL_ARG_DELAY)
- print ("value {control=%d}{value=5}{display=5}{default=true}" % CTRL_ARG_DELAY)
- print ("value {control=%d}{value=60}{display=60}" % CTRL_ARG_DELAY)
+ print ("extcap {version=1.0}{help=http://www.wireshark.org}{display=Example extcap interface}")
+ print ("interface {value=example1}{display=Example interface 1 for extcap}")
+ print ("interface {value=example2}{display=Example interface 2 for extcap}")
+ print ("control {number=%d}{type=string}{display=Message}{tooltip=Package message content. Must start with a capital letter.}{placeholder=Enter package message content here ...}{validation=^[A-Z]+}" % CTRL_ARG_MESSAGE)
+ print ("control {number=%d}{type=selector}{display=Time delay}{tooltip=Time delay between packages}" % CTRL_ARG_DELAY)
+ print ("control {number=%d}{type=boolean}{display=Verify}{default=true}{tooltip=Verify package content}" % CTRL_ARG_VERIFY)
+ print ("control {number=%d}{type=button}{display=Turn on}{tooltip=Turn on or off}" % CTRL_ARG_BUTTON)
+ print ("control {number=%d}{type=button}{role=help}{display=Help}{tooltip=Show help}" % CTRL_ARG_HELP)
+ print ("control {number=%d}{type=button}{role=restore}{display=Restore}{tooltip=Restore default values}" % CTRL_ARG_RESTORE)
+ print ("control {number=%d}{type=button}{role=logger}{display=Log}{tooltip=Show capture log}" % CTRL_ARG_LOGGER)
+ print ("value {control=%d}{value=1}{display=1}" % CTRL_ARG_DELAY)
+ print ("value {control=%d}{value=2}{display=2}" % CTRL_ARG_DELAY)
+ print ("value {control=%d}{value=3}{display=3}" % CTRL_ARG_DELAY)
+ print ("value {control=%d}{value=4}{display=4}" % CTRL_ARG_DELAY)
+ print ("value {control=%d}{value=5}{display=5}{default=true}" % CTRL_ARG_DELAY)
+ print ("value {control=%d}{value=60}{display=60}" % CTRL_ARG_DELAY)
def extcap_dlts(interface):
- if ( interface == '1' ):
- print ("dlt {number=147}{name=USER0}{display=Demo Implementation for Extcap}")
- elif ( interface == '2' ):
- print ("dlt {number=148}{name=USER1}{display=Demo Implementation for Extcap}")
+ if ( interface == '1' ):
+ print ("dlt {number=147}{name=USER0}{display=Demo Implementation for Extcap}")
+ elif ( interface == '2' ):
+ print ("dlt {number=148}{name=USER1}{display=Demo Implementation for Extcap}")
def validate_capture_filter(capture_filter):
- if capture_filter != "filter" and capture_filter != "valid":
- print("Illegal capture filter")
+ if capture_filter != "filter" and capture_filter != "valid":
+ print("Illegal capture filter")
"""
@@ -185,293 +185,296 @@ Extcap capture routine
"""
def unsigned(n):
- return int(n) & 0xFFFFFFFF
+ return int(n) & 0xFFFFFFFF
def pcap_fake_header():
- header = bytearray()
- header += struct.pack('<L', int ('a1b2c3d4', 16 ))
- header += struct.pack('<H', unsigned(2) ) # Pcap Major Version
- header += struct.pack('<H', unsigned(4) ) # Pcap Minor Version
- header += struct.pack('<I', int(0)) # Timezone
- header += struct.pack('<I', int(0)) # Accurancy of timestamps
- header += struct.pack('<L', int ('0000ffff', 16 )) # Max Length of capture frame
- header += struct.pack('<L', unsigned(1)) # Ethernet
- return header
+ header = bytearray()
+ header += struct.pack('<L', int ('a1b2c3d4', 16 ))
+ header += struct.pack('<H', unsigned(2) ) # Pcap Major Version
+ header += struct.pack('<H', unsigned(4) ) # Pcap Minor Version
+ header += struct.pack('<I', int(0)) # Timezone
+ header += struct.pack('<I', int(0)) # Accurancy of timestamps
+ header += struct.pack('<L', int ('0000ffff', 16 )) # Max Length of capture frame
+ header += struct.pack('<L', unsigned(1)) # Ethernet
+ return header
# Calculates and returns the IP checksum based on the given IP Header
def ip_checksum(iph):
- #split into bytes
- words = splitN(''.join(iph.split()),4)
- csum = 0;
- for word in words:
- csum += int(word, base=16)
- csum += (csum >> 16)
- csum = csum & 0xFFFF ^ 0xFFFF
- return csum
+ #split into bytes
+ words = splitN(''.join(iph.split()),4)
+ csum = 0;
+ for word in words:
+ csum += int(word, base=16)
+ csum += (csum >> 16)
+ csum = csum & 0xFFFF ^ 0xFFFF
+ return csum
def pcap_fake_package ( message, fake_ip ):
- pcap = bytearray()
- #length = 14 bytes [ eth ] + 20 bytes [ ip ] + messagelength
+ pcap = bytearray()
+ #length = 14 bytes [ eth ] + 20 bytes [ ip ] + messagelength
- caplength = len(message) + 14 + 20
- timestamp = int(time.time())
+ caplength = len(message) + 14 + 20
+ timestamp = int(time.time())
- pcap += struct.pack('<L', unsigned(timestamp ) ) # timestamp seconds
- pcap += struct.pack('<L', 0x00 ) # timestamp nanoseconds
- pcap += struct.pack('<L', unsigned(caplength ) ) # length captured
- pcap += struct.pack('<L', unsigned(caplength ) ) # length in frame
+ pcap += struct.pack('<L', unsigned(timestamp ) ) # timestamp seconds
+ pcap += struct.pack('<L', 0x00 ) # timestamp nanoseconds
+ pcap += struct.pack('<L', unsigned(caplength ) ) # length captured
+ pcap += struct.pack('<L', unsigned(caplength ) ) # length in frame
# ETH
- pcap += struct.pack('h', 0 ) # source mac
- pcap += struct.pack('h', 0 ) # source mac
- pcap += struct.pack('h', 0 ) # source mac
- pcap += struct.pack('h', 0 ) # dest mac
- pcap += struct.pack('h', 0 ) # dest mac
- pcap += struct.pack('h', 0 ) # dest mac
- pcap += struct.pack('<h', unsigned(8 )) # protocol (ip)
+ pcap += struct.pack('h', 0 ) # source mac
+ pcap += struct.pack('h', 0 ) # source mac
+ pcap += struct.pack('h', 0 ) # source mac
+ pcap += struct.pack('h', 0 ) # dest mac
+ pcap += struct.pack('h', 0 ) # dest mac
+ pcap += struct.pack('h', 0 ) # dest mac
+ pcap += struct.pack('<h', unsigned(8 )) # protocol (ip)
# IP
- pcap += struct.pack('b', int ( '45', 16 )) # IP version
- pcap += struct.pack('b', int ( '0', 16 )) #
- pcap += struct.pack('>H', unsigned(len(message)+20) ) # length of data + payload
- pcap += struct.pack('<H', int ( '0', 16 )) # Identification
- pcap += struct.pack('b', int ( '40', 16 )) # Don't fragment
- pcap += struct.pack('b', int ( '0', 16 )) # Fragment Offset
- pcap += struct.pack('b', int ( '40', 16 ))
- pcap += struct.pack('B', 0xFE ) # Protocol (2 = unspecified)
- pcap += struct.pack('<H', int ( '0000', 16 )) # Checksum
-
- parts = fake_ip.split('.')
- ipadr = (int(parts[0]) << 24) + (int(parts[1]) << 16) + (int(parts[2]) << 8) + int(parts[3])
- pcap += struct.pack('>L', ipadr ) # Source IP
- pcap += struct.pack('>L', int ( '7F000001', 16 )) # Dest IP
-
- pcap += message
- return pcap
+ pcap += struct.pack('b', int ( '45', 16 )) # IP version
+ pcap += struct.pack('b', int ( '0', 16 )) #
+ pcap += struct.pack('>H', unsigned(len(message)+20) ) # length of data + payload
+ pcap += struct.pack('<H', int ( '0', 16 )) # Identification
+ pcap += struct.pack('b', int ( '40', 16 )) # Don't fragment
+ pcap += struct.pack('b', int ( '0', 16 )) # Fragment Offset
+ pcap += struct.pack('b', int ( '40', 16 ))
+ pcap += struct.pack('B', 0xFE ) # Protocol (2 = unspecified)
+ pcap += struct.pack('<H', int ( '0000', 16 )) # Checksum
+
+ parts = fake_ip.split('.')
+ ipadr = (int(parts[0]) << 24) + (int(parts[1]) << 16) + (int(parts[2]) << 8) + int(parts[3])
+ pcap += struct.pack('>L', ipadr ) # Source IP
+ pcap += struct.pack('>L', int ( '7F000001', 16 )) # Dest IP
+
+ pcap += message
+ return pcap
def control_read(fn):
- try:
- header = fn.read(6)
- sp, _, length, arg, typ = struct.unpack('>sBHBB', header)
- if length > 2:
- payload = fn.read(length - 2)
- else:
- payload = ''
- return arg, typ, payload
- except:
- return None, None, None
+ try:
+ header = fn.read(6)
+ sp, _, length, arg, typ = struct.unpack('>sBHBB', header)
+ if length > 2:
+ payload = fn.read(length - 2)
+ else:
+ payload = ''
+ return arg, typ, payload
+ except:
+ return None, None, None
def control_read_thread(control_in, fn_out):
- global initialized, message, delay, verify, button, button_disabled
- with open(control_in, 'rb', 0 ) as fn:
- arg = 0
- while arg != None:
- arg, typ, payload = control_read(fn)
- log = ''
- if typ == CTRL_CMD_INITIALIZED:
- initialized = True
- elif arg == CTRL_ARG_MESSAGE:
- message = payload
- log = "Message = " + payload
- elif arg == CTRL_ARG_DELAY:
- delay = float(payload)
- log = "Time delay = " + payload
- elif arg == CTRL_ARG_VERIFY:
- # Only read this after initialized
- if initialized:
- verify = (payload[0] != '\0')
- log = "Verify = " + str(verify)
- control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_STATUSBAR, "Verify changed")
- elif arg == CTRL_ARG_BUTTON:
- control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_DISABLE, "")
- button_disabled = True
- if button == True:
- control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn on")
- button = False
- log = "Button turned off"
- else:
- control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn off")
- button = True
- log = "Button turned on"
-
- if len(log) > 0:
- control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log + "\n")
+ global initialized, message, delay, verify, button, button_disabled
+ with open(control_in, 'rb', 0 ) as fn:
+ arg = 0
+ while arg != None:
+ arg, typ, payload = control_read(fn)
+ log = ''
+ if typ == CTRL_CMD_INITIALIZED:
+ initialized = True
+ elif arg == CTRL_ARG_MESSAGE:
+ message = payload
+ log = "Message = " + payload
+ elif arg == CTRL_ARG_DELAY:
+ delay = float(payload)
+ log = "Time delay = " + payload
+ elif arg == CTRL_ARG_VERIFY:
+ # Only read this after initialized
+ if initialized:
+ verify = (payload[0] != '\0')
+ log = "Verify = " + str(verify)
+ control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_STATUSBAR, "Verify changed")
+ elif arg == CTRL_ARG_BUTTON:
+ control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_DISABLE, "")
+ button_disabled = True
+ if button == True:
+ control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn on")
+ button = False
+ log = "Button turned off"
+ else:
+ control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_SET, "Turn off")
+ button = True
+ log = "Button turned on"
+
+ if len(log) > 0:
+ control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log + "\n")
def control_write(fn, arg, typ, payload):
- packet = bytearray()
- packet += struct.pack('>sBHBB', 'T', 0, len(payload) + 2, arg, typ)
- packet += payload
- fn.write(packet)
+ packet = bytearray()
+ packet += struct.pack('>sBHBB', b'T', 0, len(payload) + 2, arg, typ)
+ if sys.version_info[0] >= 3 and isinstance(payload, str):
+ packet += payload.encode('utf-8')
+ else:
+ packet += payload
+ fn.write(packet)
def control_write_defaults(fn_out):
- global initialized, message, delay, verify
+ global initialized, message, delay, verify
- while not initialized:
- time.sleep(.1) # Wait for initial control values
+ while not initialized:
+ time.sleep(.1) # Wait for initial control values
- # Write startup configuration to Toolbar controls
- control_write(fn_out, CTRL_ARG_MESSAGE, CTRL_CMD_SET, message)
- control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_SET, str(int(delay)))
- control_write(fn_out, CTRL_ARG_VERIFY, CTRL_CMD_SET, struct.pack('B', verify))
+ # Write startup configuration to Toolbar controls
+ control_write(fn_out, CTRL_ARG_MESSAGE, CTRL_CMD_SET, message)
+ control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_SET, str(int(delay)))
+ control_write(fn_out, CTRL_ARG_VERIFY, CTRL_CMD_SET, struct.pack('B', verify))
- for i in range(1,16):
- item = bytearray()
- item += str(i) + struct.pack('B', 0) + str(i) + " sec"
- control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_ADD, item)
+ for i in range(1,16):
+ item = bytearray()
+ item += str(i) + struct.pack('B', 0) + str(i) + " sec"
+ control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_ADD, item)
- control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_REMOVE, str(60))
+ control_write(fn_out, CTRL_ARG_DELAY, CTRL_CMD_REMOVE, str(60))
def extcap_capture(interface, fifo, control_in, control_out, in_delay, in_verify, in_message, remote, fake_ip):
- global message, delay, verify, button_disabled
- delay = in_delay if in_delay != 0 else 5
- message = in_message
- verify = in_verify
- counter = 1
- fn_out = None
-
- with open(fifo, 'wb', 0 ) as fh:
- fh.write (pcap_fake_header())
-
- if control_out != None:
- fn_out = open(control_out, 'wb', 0)
- control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_SET, "Log started at " + time.strftime("%c") + "\n")
-
- if control_in != None:
- # Start reading thread
- thread = Thread(target = control_read_thread, args = (control_in, fn_out))
- thread.start()
-
- if fn_out != None:
- control_write_defaults(fn_out)
-
- while True:
- if fn_out != None:
- log = "Received packet #" + str(counter) + "\n"
- control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log)
- counter = counter + 1
-
- if button_disabled == True:
- control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_ENABLE, "")
- control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_INFORMATION, "Turn action finished.")
- button_disabled = False
-
- out = ("%s|%04X%s|%s" % ( remote.strip(), len(message), message, verify )).encode("utf8")
- fh.write (pcap_fake_package(out, fake_ip))
- time.sleep(delay)
-
- thread.join()
- if fn_out != None:
- fn_out.close()
+ global message, delay, verify, button_disabled
+ delay = in_delay if in_delay != 0 else 5
+ message = in_message
+ verify = in_verify
+ counter = 1
+ fn_out = None
+
+ with open(fifo, 'wb', 0 ) as fh:
+ fh.write (pcap_fake_header())
+
+ if control_out != None:
+ fn_out = open(control_out, 'wb', 0)
+ control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_SET, "Log started at " + time.strftime("%c") + "\n")
+
+ if control_in != None:
+ # Start reading thread
+ thread = Thread(target = control_read_thread, args = (control_in, fn_out))
+ thread.start()
+
+ if fn_out != None:
+ control_write_defaults(fn_out)
+
+ while True:
+ if fn_out != None:
+ log = "Received packet #" + str(counter) + "\n"
+ control_write(fn_out, CTRL_ARG_LOGGER, CTRL_CMD_ADD, log)
+ counter = counter + 1
+
+ if button_disabled == True:
+ control_write(fn_out, CTRL_ARG_BUTTON, CTRL_CMD_ENABLE, "")
+ control_write(fn_out, CTRL_ARG_NONE, CTRL_CMD_INFORMATION, "Turn action finished.")
+ button_disabled = False
+
+ out = ("%s|%04X%s|%s" % ( remote.strip(), len(message), message, verify )).encode("utf8")
+ fh.write (pcap_fake_package(out, fake_ip))
+ time.sleep(delay)
+
+ thread.join()
+ if fn_out != None:
+ fn_out.close()
def extcap_close_fifo(fifo):
- # This is apparently needed to workaround an issue on Windows/macOS
- # where the message cannot be read. (really?)
- fh = open(fifo, 'wb', 0 )
- fh.close()
+ # This is apparently needed to workaround an issue on Windows/macOS
+ # where the message cannot be read. (really?)
+ fh = open(fifo, 'wb', 0 )
+ fh.close()
####
def usage():
- print ( "Usage: %s <--extcap-interfaces | --extcap-dlts | --extcap-interface | --extcap-config | --capture | --extcap-capture-filter | --fifo>" % sys.argv[0] )
+ print ( "Usage: %s <--extcap-interfaces | --extcap-dlts | --extcap-interface | --extcap-config | --capture | --extcap-capture-filter | --fifo>" % sys.argv[0] )
if __name__ == '__main__':
- interface = ""
-
- # Capture options
- delay = 0
- message = ""
- fake_ip = ""
- ts = 0
-
- parser = ArgumentParser(
- prog="Extcap Example",
- description="Extcap example program for python"
- )
-
- # Extcap Arguments
- parser.add_argument("--capture", help="Start the capture routine", action="store_true" )
- parser.add_argument("--extcap-interfaces", help="Provide a list of interfaces to capture from", action="store_true")
- parser.add_argument("--extcap-interface", help="Provide the interface to capture from")
- parser.add_argument("--extcap-dlts", help="Provide a list of dlts for the given interface", action="store_true")
- parser.add_argument("--extcap-config", help="Provide a list of configurations for the given interface", action="store_true")
- parser.add_argument("--extcap-capture-filter", help="Used together with capture to provide a capture filter")
- parser.add_argument("--fifo", help="Use together with capture to provide the fifo to dump data to")
- parser.add_argument("--extcap-control-in", help="Used to get control messages from toolbar")
- parser.add_argument("--extcap-control-out", help="Used to send control messages to toolbar")
-
- # Interface Arguments
- parser.add_argument("--verify", help="Demonstrates a verification bool flag", action="store_true" )
- parser.add_argument("--delay", help="Demonstrates an integer variable", type=int, default=0, choices=[0, 1, 2, 3, 4, 5, 6] )
- parser.add_argument("--remote", help="Demonstrates a selector choice", default="if1", choices=["if1", "if2"] )
- parser.add_argument("--message", help="Demonstrates string variable", nargs='?', default="" )
- parser.add_argument("--fake_ip", help="Add a fake sender IP adress", nargs='?', default="127.0.0.1" )
- parser.add_argument("--ts", help="Capture start time", action="store_true" )
-
- try:
- args, unknown = parser.parse_known_args()
- except argparse.ArgumentError as exc:
- print( "%s: %s" % ( exc.argument.dest, exc.message ), file=sys.stderr)
- fifo_found = 0
- fifo = ""
- for arg in sys.argv:
- if (arg == "--fifo" or arg == "--extcap-fifo") :
- fifo_found = 1
- elif ( fifo_found == 1 ):
- fifo = arg
- break
- extcap_close_fifo(fifo)
- sys.exit(ERROR_ARG)
-
- if ( len(sys.argv) <= 1 ):
- parser.exit("No arguments given!")
-
- if ( args.extcap_interfaces == False and args.extcap_interface == None ):
- parser.exit("An interface must be provided or the selection must be displayed")
- if ( args.extcap_capture_filter and not args.capture ):
- validate_capture_filter(args.extcap_capture_filter)
- sys.exit(0)
-
- if ( args.extcap_interfaces == True or args.extcap_interface == None ):
- extcap_interfaces()
- sys.exit(0)
-
- if ( len(unknown) > 1 ):
- print("Extcap Example %d unknown arguments given" % len(unknown) )
-
- m = re.match ( 'example(\d+)', args.extcap_interface )
- if not m:
- sys.exit(ERROR_INTERFACE)
- interface = m.group(1)
-
- message = args.message
- if ( args.message == None or len(args.message) == 0 ):
- message = "Extcap Test"
-
- fake_ip = args.fake_ip
- if ( args.fake_ip == None or len(args.fake_ip) < 7 or len(args.fake_ip.split('.')) != 4 ):
- fake_ip = "127.0.0.1"
-
- ts = args.ts
-
- if args.extcap_config:
- extcap_config(interface)
- elif args.extcap_dlts:
- extcap_dlts(interface)
- elif args.capture:
- if args.fifo is None:
- sys.exit(ERROR_FIFO)
- # The following code demonstrates error management with extcap
- if args.delay > 5:
- print("Value for delay [%d] too high" % args.delay, file=sys.stderr)
- extcap_close_fifo(args.fifo)
- sys.exit(ERROR_DELAY)
-
- try:
- extcap_capture(interface, args.fifo, args.extcap_control_in, args.extcap_control_out, args.delay, args.verify, message, args.remote, fake_ip)
- except KeyboardInterrupt:
- pass
- else:
- usage()
- sys.exit(ERROR_USAGE)
+ interface = ""
+
+ # Capture options
+ delay = 0
+ message = ""
+ fake_ip = ""
+ ts = 0
+
+ parser = ArgumentParser(
+ prog="Extcap Example",
+ description="Extcap example program for python"
+ )
+
+ # Extcap Arguments
+ parser.add_argument("--capture", help="Start the capture routine", action="store_true" )
+ parser.add_argument("--extcap-interfaces", help="Provide a list of interfaces to capture from", action="store_true")
+ parser.add_argument("--extcap-interface", help="Provide the interface to capture from")
+ parser.add_argument("--extcap-dlts", help="Provide a list of dlts for the given interface", action="store_true")
+ parser.add_argument("--extcap-config", help="Provide a list of configurations for the given interface", action="store_true")
+ parser.add_argument("--extcap-capture-filter", help="Used together with capture to provide a capture filter")
+ parser.add_argument("--fifo", help="Use together with capture to provide the fifo to dump data to")
+ parser.add_argument("--extcap-control-in", help="Used to get control messages from toolbar")
+ parser.add_argument("--extcap-control-out", help="Used to send control messages to toolbar")
+
+ # Interface Arguments
+ parser.add_argument("--verify", help="Demonstrates a verification bool flag", action="store_true" )
+ parser.add_argument("--delay", help="Demonstrates an integer variable", type=int, default=0, choices=[0, 1, 2, 3, 4, 5, 6] )
+ parser.add_argument("--remote", help="Demonstrates a selector choice", default="if1", choices=["if1", "if2"] )
+ parser.add_argument("--message", help="Demonstrates string variable", nargs='?', default="" )
+ parser.add_argument("--fake_ip", help="Add a fake sender IP adress", nargs='?', default="127.0.0.1" )
+ parser.add_argument("--ts", help="Capture start time", action="store_true" )
+
+ try:
+ args, unknown = parser.parse_known_args()
+ except argparse.ArgumentError as exc:
+ print( "%s: %s" % ( exc.argument.dest, exc.message ), file=sys.stderr)
+ fifo_found = 0
+ fifo = ""
+ for arg in sys.argv:
+ if (arg == "--fifo" or arg == "--extcap-fifo") :
+ fifo_found = 1
+ elif ( fifo_found == 1 ):
+ fifo = arg
+ break
+ extcap_close_fifo(fifo)
+ sys.exit(ERROR_ARG)
+
+ if ( len(sys.argv) <= 1 ):
+ parser.exit("No arguments given!")
+
+ if ( args.extcap_interfaces == False and args.extcap_interface == None ):
+ parser.exit("An interface must be provided or the selection must be displayed")
+ if ( args.extcap_capture_filter and not args.capture ):
+ validate_capture_filter(args.extcap_capture_filter)
+ sys.exit(0)
+
+ if ( args.extcap_interfaces == True or args.extcap_interface == None ):
+ extcap_interfaces()
+ sys.exit(0)
+
+ if ( len(unknown) > 1 ):
+ print("Extcap Example %d unknown arguments given" % len(unknown) )
+
+ m = re.match ( 'example(\d+)', args.extcap_interface )
+ if not m:
+ sys.exit(ERROR_INTERFACE)
+ interface = m.group(1)
+
+ message = args.message
+ if ( args.message == None or len(args.message) == 0 ):
+ message = "Extcap Test"
+
+ fake_ip = args.fake_ip
+ if ( args.fake_ip == None or len(args.fake_ip) < 7 or len(args.fake_ip.split('.')) != 4 ):
+ fake_ip = "127.0.0.1"
+
+ ts = args.ts
+
+ if args.extcap_config:
+ extcap_config(interface)
+ elif args.extcap_dlts:
+ extcap_dlts(interface)
+ elif args.capture:
+ if args.fifo is None:
+ sys.exit(ERROR_FIFO)
+ # The following code demonstrates error management with extcap
+ if args.delay > 5:
+ print("Value for delay [%d] too high" % args.delay, file=sys.stderr)
+ extcap_close_fifo(args.fifo)
+ sys.exit(ERROR_DELAY)
+
+ try:
+ extcap_capture(interface, args.fifo, args.extcap_control_in, args.extcap_control_out, args.delay, args.verify, message, args.remote, fake_ip)
+ except KeyboardInterrupt:
+ pass
+ else:
+ usage()
+ sys.exit(ERROR_USAGE)