aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNeels Hofmeyr <neels@hofmeyr.de>2019-12-03 15:38:31 +0100
committerNeels Hofmeyr <nhofmeyr@sysmocom.de>2023-04-15 00:06:13 +0200
commitddf2d59a62e317b1f94a96e62a174e9b50b81f91 (patch)
tree69d78e323a78a5c8e5d543ac4d188c4f8a238e03
parent46d1b84c3f321fecc7c762acb19101623c7d392e (diff)
add utils/osmo-gsm-sharkneels/sock
-rw-r--r--debian/libosmocore-utils.install1
-rw-r--r--utils/Makefile.am1
-rwxr-xr-xutils/osmo-gsm-shark3594
3 files changed, 3596 insertions, 0 deletions
diff --git a/debian/libosmocore-utils.install b/debian/libosmocore-utils.install
index 0e411978..7245db9c 100644
--- a/debian/libosmocore-utils.install
+++ b/debian/libosmocore-utils.install
@@ -3,3 +3,4 @@ usr/bin/osmo-auc-gen
usr/bin/osmo-aka-verify
usr/bin/osmo-config-merge
usr/bin/osmo-gsmtap-logsend
+usr/bin/osmo-gsm-shark
diff --git a/utils/Makefile.am b/utils/Makefile.am
index 3ec71ea3..af0b5ce1 100644
--- a/utils/Makefile.am
+++ b/utils/Makefile.am
@@ -9,6 +9,7 @@ if ENABLE_UTILITIES
EXTRA_DIST = conv_gen.py conv_codes_gsm.py
bin_PROGRAMS += osmo-arfcn osmo-auc-gen osmo-config-merge osmo-aka-verify osmo-gsmtap-logsend
+bin_SCRIPTS = osmo-gsm-shark
osmo_arfcn_SOURCES = osmo-arfcn.c
diff --git a/utils/osmo-gsm-shark b/utils/osmo-gsm-shark
new file mode 100755
index 00000000..abfd4e42
--- /dev/null
+++ b/utils/osmo-gsm-shark
@@ -0,0 +1,3594 @@
+#!/usr/bin/env python3
+
+# osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber.
+# Copyright (C) 2019 by Neels Hofmeyr <neels@hofmeyr.de>
+#
+# All Rights Reserved
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+'''osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber.
+
+Copyright (C) 2019 by Neels Hofmeyr <neels@hofmeyr.de>
+SPDX-License-Identifier: GPL-2.0+
+
+This tool uses tshark (pyshark) to analyze a pcap file or a live network capture to:
+
+- Associate almost all messages with a subscriber. It is possible to filter by subscriber.
+- Separate the different network elements (BSC, MSC, hNodeB, ...).
+- Output a ladder diagram.
+- Combine repetitive messages.
+- Combine/abstract messages into short activity summary.
+
+Examples:
+
+ osmo-gsm-shark -f trace.pcapng
+ osmo-gsm-shark -l any
+
+ osmo-gsm-shark -l any --filter-subscr 901701234567123
+ osmo-gsm-shark -f trace.pcapng --filter-msg dtap
+'''
+
+import collections
+import pyshark
+import re
+import sys
+import types
+import time
+import traceback
+
+SHOW_ALL_DEBUG = False
+
+SHOW_ALL_LAYERS = False
+SCCP_COLLAPSE_STP = True
+IUH_COLLAPSE_HNBGW = True # doesnt work
+
+DTAP_COMPL_L3 = ('Location-Updating-Request', 'CM-Service-Request', 'Paging-Response', 'IMSI-Detach-Indication')
+GMM_COMPL_L3 = ('Attach-Request', 'Detach-Request')
+
+class Color:
+ codes = (
+ ('red', '\033[1;31m'),
+ ('green', '\033[1;32m'),
+ ('yellow', '\033[1;33m'),
+ ('blue', '\033[1;34m'),
+ ('purple', '\033[1;35m'),
+ ('cyan', '\033[1;36m'),
+ ('darkred', '\033[31m'),
+ ('darkgreen', '\033[32m'),
+ ('darkyellow', '\033[33m'),
+ ('darkblue', '\033[34m'),
+ ('darkpurple', '\033[35m'),
+ ('darkcyan', '\033[36m'),
+ ('darkgrey', '\033[1;30m'),
+ ('brightwhite', '\033[1;37m'),
+ )
+ codes_dict = dict(codes)
+ end = '\033[0;m'
+
+ def colored(code, text):
+ if type(code) is int:
+ code = Color.codes[code % len(Color.codes)][1]
+ else:
+ code = Color.codes_dict[code]
+ return f'{code}{text}{Color.end}'
+
+
+def set_instance_vars_from_args(*ignore, self='s'):
+ f = sys._getframe(1).f_locals
+ s = f.get(self)
+ for k,v in f.items():
+ if v is s:
+ continue
+ if k in ignore:
+ continue
+ setattr(s, k, v)
+
+def same_nonempty(a, b):
+ if isinstance(a, types.GeneratorType):
+ return list(a) == list(b)
+ return a and a == b
+
+def str_drop(a_str, drop_str):
+ if a_str and a_str.startswith(drop_str):
+ return a_str[len(drop_str):]
+ return a_str
+
+def sane_msgtype(msgtype):
+ if not msgtype:
+ return msgtype
+ return sane_showname(msgtype).replace(' ','-')
+
+re_msgtype_label = re.compile('message.type *', re.I)
+def sane_showname(showname):
+ if not showname:
+ return showname
+ if ': ' in showname:
+ showname = showname[showname.index(': ')+1:]
+ if '(' in showname:
+ showname = showname[:showname.index('(')]
+ showname = re_msgtype_label.sub('', showname)
+ return showname.strip()
+
+def dir_vals(elem):
+ strs = []
+ for name in dir(elem):
+ if name.startswith('_'):
+ continue
+ try:
+ strs.append('%r=%r' % (name, getattr(elem, name)))
+ except:
+ strs.append('%r=<Exception>' % (name))
+ return '\n' + '\n'.join(strs)
+
+def dir_p(p, name):
+ return f'=== {name}\n{dir_vals(p.get(name))}\n---{name}'
+
+def str_to_int(nr_str):
+ 'convert hex or decimal string to int'
+ if not nr_str:
+ return None
+ elif nr_str.startswith('0x'):
+ return int(nr_str, 16)
+ else:
+ return int(nr_str, 10)
+
+def out_text(*args, **kwargs):
+ print(*args, **kwargs)
+
+g_ui = None
+g_current_msg = None
+g_debug_full = False
+
+def to_text(*args, **kwargs):
+ if kwargs:
+ args = list(args) + [repr(kwargs)]
+ return ' '.join(str(arg) for arg in args)
+
+def out_text_now(*args, **kwargs):
+ if g_ui is not None:
+ g_ui.out_text_now(*args, **kwargs)
+ else:
+ print(to_text(*args, **kwargs))
+
+def LOG(*args, **kwargs):
+ if g_current_msg is not None:
+ g_current_msg.log(*args, **kwargs)
+ else:
+ out_text_now(*args, **kwargs)
+
+def DBG(*args, **kwargs):
+ if g_current_msg is not None:
+ g_current_msg.dbg(*args, **kwargs)
+ else:
+ out_text_now(*args, **kwargs)
+
+def ERR(*args, **kwargs):
+ LOG(Color.colored('red', '***** ERROR:'), *args, **kwargs)
+
+
+def trace():
+ return ''.join(traceback.format_stack())
+
+def because_str(because):
+ if not because:
+ return '-'
+ t = ['BECAUSE']
+ for b in because:
+ if isinstance(b, tuple) or isinstance(b, list):
+ t.extend(because_str(b).splitlines())
+ elif isinstance(b, str):
+ t.append(b)
+ elif isinstance(b, Message):
+ t.append(b.str(show_traits=True, show_conns=True))
+ else:
+ t.append(str(b))
+ return '\n|'.join(t)
+
+def out_error(*args, **kwargs):
+ if g_ui is not None:
+ g_ui.out_error(*args, **kwargs)
+ else:
+ out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs)
+ if g_current_msg:
+ out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True))
+ out_text_now(trace())
+
+def tmsi_standardize(tmsi):
+ try:
+ return format(int(tmsi), '08x')
+ except:
+ return None
+
+# a dict containing a list for each key; l.add(name, item) adds item to the list at key=name.
+class listdict(dict):
+ '''A dict where each entry is a list of items'''
+ def _have(ld, name):
+ l = ld.get(name)
+ if not l:
+ l = []
+ ld[name] = l
+ return l
+
+ def add(ld, name, item):
+ l = ld._have(name)
+ l.append(item)
+ return ld
+
+ def add_dict(ld, d):
+ for k,v in d.items():
+ ld.add(k, v)
+
+ def update(ld, other_ld):
+ for name, items in other_ld.items():
+ ld.extend(name, items)
+ return ld
+
+ def extend(ld, name, vals):
+ l = ld._have(name)
+ l.extend(vals)
+ return ld
+
+ def remove(ld, name, item):
+ l = ld.get(name)
+ if item in l:
+ l.remove(item)
+ def _have(ld, name):
+ l = ld.get(name)
+ if not l:
+ l = []
+ ld[name] = l
+ return l
+
+ def add(ld, name, item):
+ l = ld._have(name)
+ l.append(item)
+ return ld
+
+ def add_dict(ld, d):
+ for k,v in d.items():
+ ld.add(k, v)
+
+ def update(ld, other_ld):
+ for name, items in other_ld.items():
+ ld.extend(name, items)
+ return ld
+
+ def extend(ld, name, vals):
+ l = ld._have(name)
+ l.extend(vals)
+ return ld
+
+ def remove(ld, name, item):
+ l = ld.get(name)
+ if item in l:
+ l.remove(item)
+
+class UniqueList(list):
+ def append(s, item):
+ if item in s or item is None:
+ return False
+ super().append(item)
+ return True
+
+ def insert(s, idx, item):
+ if item in s or item is None:
+ return False
+ super().insert(idx, item)
+ return True
+
+ def extend(s, items):
+ added = 0
+ for item in items:
+ if s.append(item):
+ added += 1
+ return added
+
+class NamedIds(dict):
+ def __init__(s, start_id:int=1):
+ set_instance_vars_from_args()
+
+ def next(s, name):
+ next_id = s.get(name, s.start_id)
+ s[name] = next_id + 1
+ return next_id
+
+class Packet:
+ def __init__(s, idx, cap_p):
+ set_instance_vars_from_args()
+ # sanitize impossible attr with dot in its name,
+ # seen gsm_a.bssmap and gsm_a.dtap
+ for name in dir(s.cap_p):
+ if '.' in name:
+ new_name = name.replace('.', '_')
+ elif not name:
+ new_name = 'unnamed'
+ else:
+ continue
+ setattr(s.cap_p, new_name, getattr(s.cap_p, name))
+
+ @classmethod
+ def pget(cls, cap_p, tokens, ifnone=None):
+ if cap_p is None or len(tokens) < 1:
+ return ifnone
+ p_field = getattr(cap_p, tokens[0], None)
+ if p_field is None:
+ p_field = getattr(cap_p, '.'.join(tokens), None)
+ if p_field is None:
+ return ifnone
+ if len(tokens) > 1:
+ return Packet.pget(p_field, tokens[1:], ifnone=ifnone)
+ return p_field
+
+ def get(s, field, ifnone=None):
+ return Packet.pget(s.cap_p, field.split('.'), ifnone=ifnone)
+
+ def str(s, elem_name=None):
+ strs = []
+ if elem_name:
+ elem = s.get(elem_name)
+ else:
+ elem = s.cap_p
+ return dir_vals(elem);
+
+ def field_names(s, elem_name=None, elem=None):
+ strs = ['', f'=== {elem_name} ===']
+ if elem is None:
+ elem = s.get(elem_name)
+ if not elem:
+ strs.append('None')
+ else:
+ for f in elem._get_all_fields_with_alternates():
+ for n in dir(f):
+ if n.startswith('_'):
+ continue
+ strs.append('%r=%r' % (n, getattr(f, n)))
+ return '\n'.join(strs)
+
+ def all_str(s, elem_name=None, elem=None, depth=1000):
+ strs = []
+ if elem is None:
+ if elem_name:
+ elem = s.get(elem_name)
+ else:
+ elem = s.cap_p
+ elem_name = '/'
+ strs.append('%s:' % elem_name)
+ for name in dir(elem):
+ if name.startswith('_') or name.endswith('_value') or name in [
+ 'sort', 'reverse', 'remove', 'pop', 'insert', 'index', 'extend', 'count',
+ 'copy', 'clear', 'append', 'zfill', 'max', 'min', 'resolution',
+ ]:
+ continue
+ try:
+ full_name = '%s.%s' % (elem_name, name)
+ val = getattr(elem, name)
+ if callable(val) or name in ['base16_value']:
+ continue
+ strs.append('%r=%r' % (full_name, val))
+ if depth and type(val) not in [int, str]:
+ strs.append(s.all_str(full_name, val, depth-1))
+ except:
+ pass
+ if hasattr(elem, '_get_all_fields_with_alternates'):
+ for f in elem._get_all_fields_with_alternates():
+ for n in dir(f):
+ if n.startswith('_'):
+ continue
+ full_name = '%r[%r]' % (elem_name, n)
+ try:
+ val = getattr(f, n)
+ except:
+ continue
+ if callable(val):
+ continue
+ strs.append('%s=%r' % (full_name, val))
+ if depth:
+ strs.append(s.all_str(full_name, val, depth-1))
+ return '\n'.join(strs)
+
+class IpPort:
+ all_ports = {}
+
+ @classmethod
+ def _key(cls, ip, port):
+ return f'{ip}:{port}'
+
+ @classmethod
+ def get(cls, ip, port, entity=None, proto=None, create=True):
+ o = IpPort.all_ports.get(IpPort._key(ip, port))
+ if o is None and create:
+ return IpPort(ip, port, entity, proto)
+ if o is not None:
+ if entity is not None:
+ if o.entity is not None and o.entity is not entity:
+ ERR('Port changes:', o, 'to', entity)
+ entity.add_port(o.proto, o)
+ return o
+
+ def __init__(s, ip:str=None, port:str=None, entity=None, proto=None):
+ set_instance_vars_from_args()
+ IpPort.all_ports[s.key()] = s
+
+ def __repr__(s):
+ r = ''
+ tokens = []
+ if s.entity:
+ tokens.append(f'{s.entity}')
+ if s.proto:
+ tokens.append(f'{s.proto}')
+ tokens.append(s.key())
+ return '-'.join(tokens)
+
+ def __eq__(s, other):
+ return s is other
+
+ def __hash__(s):
+ return hash(s.key())
+
+ def key(s):
+ return IpPort._key(s.ip, s.port)
+
+ @classmethod
+ def from_sdp(p:Packet):
+ ip = p.get('sdp.connection_info_address')
+ port = p.get('sdp.media_port')
+ return IpPort.get(ip, port)
+
+ @classmethod
+ def _from_ip(cls, p:Packet, src_dst:str, port:int):
+ ip = p.get('ip.' + src_dst)
+ if ip is None:
+ ipv6 = p.get('ipv6.' + src_dst)
+ if ipv6 is not None:
+ ip = '[' + ipv6 + ']'
+ return IpPort.get(ip, port)
+
+ @classmethod
+ def from_ip_source(cls, p:Packet, port:int):
+ return cls._from_ip(p, 'src', port)
+
+ @classmethod
+ def from_ip_dest(cls, p:Packet, port:int):
+ return cls._from_ip(p, 'dst', port)
+
+ @classmethod
+ def from_udp_source(cls, p:Packet):
+ return cls.from_ip_source(p, p.get('udp.srcport'))
+
+ @classmethod
+ def from_udp_dest(cls, p:Packet):
+ return cls.from_ip_dest(p, p.get('udp.dstport'))
+
+ @classmethod
+ def from_tcp_source(cls, p:Packet):
+ return cls.from_ip_source(p, p.get('tcp.srcport'))
+
+ @classmethod
+ def from_tcp_dest(cls, p:Packet):
+ return cls.from_ip_dest(p, p.get('tcp.dstport'))
+
+ @classmethod
+ def from_sctp_source(cls, p:Packet):
+ return cls.from_ip_source(p, p.get('sctp.srcport'))
+
+ @classmethod
+ def from_sctp_dest(cls, p:Packet):
+ return cls.from_ip_dest(p, p.get('sctp.dstport'))
+
+class Message:
+ pass
+
+class Trait:
+ def __init__(s, **kwargs):
+ if len(kwargs) > 1:
+ raise Exception('only one trait allowed per Trait(): %r' % kwargs)
+ for k, v in kwargs.items():
+ s.name = k
+ s.val = v
+
+ def __repr__(s):
+ return '%r=%r' % (s.name, s.val)
+
+class Traits(collections.OrderedDict):
+ def __init__(s, *args, **kwargs):
+ for arg in args:
+ s.add(arg)
+ s.set_vals(**kwargs)
+
+ def add(s, trait:Trait):
+ s[trait.name] = trait
+
+ def set(s, name, val):
+ if val is not None:
+ s.add(Trait(**{name: val}))
+
+ def set_vals(s, **kwargs):
+ for k,v in kwargs.items():
+ if v is None:
+ continue
+ if type(v) not in (str, int, float, bool, IpPort):
+ v = str(v)
+ s.set(k, v)
+
+ def __repr__(s):
+ strs = []
+ for k,trait in s.items():
+ assert k == trait.name
+ strs.append(repr(trait))
+ return '{%s}' % ', '.join(strs)
+
+def find_recent_msg(msg:Message, messages:list, my_idx:int, condition_func, max_t=1):
+ for i in reversed(range(my_idx)):
+ prev_msg = messages[i]
+ if not prev_msg:
+ continue
+ if prev_msg.finalized:
+ return None
+ if msg.timestamp - prev_msg.timestamp > max_t:
+ return None
+ if condition_func(prev_msg):
+ yield prev_msg
+ return None
+
+def find_same_trait(msg:Message, messages:list, my_idx:int, proto:str, name:str, max_t=1, operator=any):
+ def same_traits(prev_msg):
+ return msg.same_traits(prev_msg, proto, name, operator=operator)
+ return find_recent_msg(msg, messages, my_idx, same_traits, max_t)
+
+class dddict(dict):
+ @classmethod
+ def _get(cls, d, keys, create=False):
+ if not keys:
+ return d
+ k = keys[0]
+ v = d.get(k)
+ if len(keys) > 0:
+ if v is None:
+ if create:
+ v = {}
+ d[k] = v
+ else:
+ return None
+ r = cls._get(v, keys[1:], create=create)
+ return r
+
+ def gget(s, keys, create=None):
+ v = dddict._get(s, keys, create=False)
+ if v is None:
+ if create is None:
+ return None
+ else:
+ return s.sset(keys, create)
+ return v
+
+ def sset(s, keys, item):
+ d = dddict._get(s, keys[:-1], create=True)
+ d[keys[-1]] = item
+ return item
+
+ def ppop(s, keys):
+ d = dddict._get(s, keys[:-1])
+ if d is None:
+ return None
+ r = d.pop(keys[-1])
+ if not d:
+ if len(keys) > 1:
+ s.ppop(keys[:-1])
+ return r
+
+ @classmethod
+ def _count(cls, d):
+ if isinstance(d, dict):
+ count = 0
+ for val in d.values():
+ count += cls._count(val)
+ return count
+ return 1
+
+ def count(s):
+ return dddict._count(s)
+
+ @classmethod
+ def _all(cls, d):
+ if isinstance(d, dict):
+ for val in d.values():
+ yield from cls._all(val)
+ else:
+ yield d
+
+ def all(s):
+ return dddict._all(s)
+
+
+class Conn:
+ open_conns = dddict()
+ closed_conns = dddict()
+
+ '''One end of a time-limited connection for a protocol layer'''
+ def __init__(s, proto:str, port:IpPort, conn_id:str, start_msg:Message, close_msg:Message=None, idx=-1,
+ entity=None, counterparts:list=[], subscriber_conn=None, merge_counterparts=True, add_message=True,
+ meta=False):
+ set_instance_vars_from_args('entity', 'add_message', 'merge_counterparts')
+ s.tx_messages = UniqueList()
+ s.rx_messages = UniqueList()
+ if s.subscriber_conn:
+ s.subscriber_conn.conns.append(s)
+
+ s.keys = (proto, port.key(), conn_id)
+ Conn.open_conns.sset(s.keys, s)
+ # A counterpart is the same conn seen from the other side.
+ # For example, if a BSC opens a conn, the conterpart is the MSC's perspective on the same conn.
+ s.counterparts = UniqueList()
+ for cp in counterparts:
+ if cp is None:
+ continue
+ s.counterparts.append(cp)
+ cp.counterparts.append(s)
+
+ if entity:
+ entity.add_port(proto, port, from_msg=start_msg)
+
+ if add_message:
+ s.add_message(start_msg)
+
+ if not counterparts:
+ LOG(Color.colored('green', f'New conn (now {Conn.count_open_conns()})'), s.proto, '%r' % s.conn_id)
+ if merge_counterparts:
+ for other_conn in s.counterparts:
+ s.merge_subscr_conns(other_conn)
+
+ @classmethod
+ def _find(cls, keys, find_in_closed_conns=False):
+ ret = Conn.open_conns.gget(keys)
+ if ret is not None:
+ return ret
+ if find_in_closed_conns:
+ return Conn.closed_conns.gget(keys)
+ return None
+
+ @classmethod
+ def find(cls, proto:str, port:IpPort, conn_id:str, find_in_closed_conns=False):
+ return cls._find((proto, port.key(), conn_id), find_in_closed_conns=find_in_closed_conns)
+
+ @classmethod
+ def open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs):
+ exists = cls.find(proto, port, conn_id)
+ if exists is not None:
+ ERR('Conn already open:', type(exists), repr(exists))
+ LOG(trace())
+ return Conn(proto, port, conn_id, *args, **kwargs)
+
+ @classmethod
+ def find_or_open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs):
+ exists = cls.find(proto, port, conn_id)
+ if exists is not None:
+ return exists
+ return Conn(proto, port, conn_id, *args, **kwargs)
+
+ @classmethod
+ def open_2way(cls, proto:str, conn_id:str, msg:Message, *args, **kwargs):
+ conn1 = cls.open(proto, msg.src, conn_id, msg, *args, **kwargs)
+ cls.open(proto, msg.dst, conn_id, msg, *args, counterparts=[conn1], **kwargs)
+ return conn1
+
+ @classmethod
+ def close_conn(cls, conn, msg):
+ conn._add_message(msg)
+ conn.close_msg = msg
+ Conn.open_conns.ppop(conn.keys)
+ l = Conn.closed_conns.gget(conn.keys, create=[])
+ l.append(conn)
+ LOG(Color.colored('blue', f'Close conn ({Conn.count_open_conns()} left)'), conn)
+
+ @classmethod
+ def close(cls, proto, port, conn_id, close_msg, close_counterparts=True, if_exists=False):
+ conn = Conn.find(proto, port, conn_id)
+ if conn is None or not conn.is_open():
+ if if_exists == False:
+ ERR('Cannot close, conn not open:', Conn.key_label(proto, port, conn_id))
+ return None
+ Conn.close_conn(conn, close_msg)
+ assert Conn.find(proto, port, conn_id) is None
+ if close_counterparts:
+ for cp in conn.counterparts:
+ if cp.is_open():
+ Conn.close_conn(cp, close_msg)
+ return conn
+
+ @classmethod
+ def message(cls, proto, port, conn_id, msg):
+ conn = cls.find(proto, port, conn_id)
+ if conn is None:
+ return None
+ conn.add_message(msg)
+ return conn
+
+ def _add_message(s, msg):
+ if s.port == msg.src:
+ if s.meta:
+ msg.meta_conns.append(s)
+ else:
+ msg.src_conns.append(s)
+ s.tx_messages.append(msg)
+ elif s.port == msg.dst:
+ if s.meta:
+ msg.meta_conns.append(s)
+ else:
+ msg.dst_conns.append(s)
+ s.rx_messages.append(msg)
+
+ def add_message(s, msg, add_to_counterparts=True):
+ if s.close_msg:
+ out_error('Message on a closed conn', s, msg)
+ s._add_message(msg)
+ if add_to_counterparts:
+ for cp in s.counterparts:
+ cp._add_message(msg)
+
+ @classmethod
+ def key_label(cls, proto, port, conn_id):
+ if conn_id:
+ return f'{proto}:{conn_id}@{port}'
+ else:
+ return f'{proto}@{port}'
+
+ def label(s):
+ return s.key_label(s.proto, s.port, s.conn_id)
+
+ def __repr__(s):
+ tokens = [s.label()]
+ for r in s.counterparts:
+ if r is None:
+ tokens.append('None')
+ else:
+ tokens.append(r.label())
+ return ' -> '.join(tokens)
+
+ def merge_subscr_conns(s, other_conn):
+ if other_conn is None:
+ return
+ if not isinstance(other_conn, Conn):
+ for item in other_conn:
+ s.merge_subscr_conns(item)
+ return
+ assert isinstance(other_conn, Conn)
+ if s.subscriber_conn is not None and s.subscriber_conn is other_conn.subscriber_conn:
+ return
+ s.subscriber_conn = SubscriberConn.merge(s.subscriber_conn, other_conn.subscriber_conn)
+ if s.subscriber_conn is None:
+ s.subscriber_conn = SubscriberConn()
+ s.subscriber_conn.add_conn(s)
+ other_conn.subscriber_conn = s.subscriber_conn
+ s.subscriber_conn.add_conn(other_conn)
+
+ def find_entity(s, kind, with_port=None):
+ if s.subscriber_conn:
+ return s.subscriber_conn.find_entity(kind, with_port=with_port)
+ return None, None
+
+ def is_open(s):
+ return s.close_msg is None and Conn._find(s.keys) is s
+
+ @classmethod
+ def count_open_conns(cls):
+ count = 0
+ have = set()
+ for conn in cls.open_conns.all():
+ do_count = True
+ for conn2 in conn.counterparts:
+ if conn2.label() in have:
+ do_count = False
+ have.add(conn2.label())
+ if conn.label() in have:
+ do_count = False
+ have.add(conn.label())
+ if do_count:
+ count += 1
+ return count
+
+
+class Layer:
+
+ _classes = {}
+ traits = None
+ proto = None
+ cap_p_name = None
+
+ def __init__(s, msg:Message, proto:str, msgtype:str, traits:Traits, minor=False, hidden=False, cap_p_name:str=None):
+ set_instance_vars_from_args()
+ if proto in msg.layers:
+ raise Exception(f'duplicate proto {proto} for message')
+ msg.layers[proto] = s
+ s.msgtype = sane_msgtype(s.msgtype)
+ if not s.cap_p_name:
+ s.cap_p_name = s.proto
+ s.__class__.proto = s.proto
+ s.__class__.cap_p_name = s.cap_p_name
+
+ def label(s):
+ if s.msgtype:
+ return f'{s.proto}.{s.msgtype}'
+ else:
+ return s.proto
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ '''return a list of Message.EntityIdent instances describing source and/or destination entity that message msg identifies.'''
+ return None
+
+ @classmethod
+ def identify_conns(s, messages, my_idx):
+ pass
+
+ def collapse(s, messages, my_idx):
+ '''return the message itself if it remains in messages, if another absorbed it return the other message,
+ or if if it is dropped completely return None'''
+ return messages[my_idx]
+
+ @classmethod
+ def classes(cls):
+ if not Layer._classes:
+ for cls in Layer.__subclasses__():
+ name = cls.__name__
+ if not name.startswith('Layer_'):
+ continue
+ proto_name = name[len('Layer_'):]
+ Layer._classes[proto_name] = cls
+ #cls.proto = proto_name
+ return Layer._classes
+
+ @classmethod
+ def parse(cls, msg:Message):
+
+ for proto_name,child_class in Layer.classes().items():
+ if not msg.p.get(proto_name):
+ continue
+ child_class(msg)
+
+
+class Message:
+
+ def __init__(s, p:Packet, finalized=False):
+ set_instance_vars_from_args()
+ s.layers = collections.OrderedDict()
+ s.count = 1
+ s.count_back = 0
+ s.timestamp = float(p.cap_p.sniff_timestamp)
+ s.hide = False
+ s.src = None
+ s.dst = None
+ s.src_conns = []
+ s.dst_conns = []
+ s.meta_conns = []
+ s.absorbed = UniqueList()
+ s.strong_relation = True
+ s.debug = SHOW_ALL_DEBUG
+ s._log = []
+
+ def log(s, *text, **kwtext):
+ s._log.append(to_text(*text, *kwtext))
+
+ def dbg(s, *text, **kwtext):
+ if s.debug:
+ s._log.append(to_text(*text, *kwtext))
+
+ def is_minor(s):
+ return all(l.minor for l in s.layers.values())
+
+ def get_trait(s, proto:str, name:str, ifnone=None):
+ # allow alternative lists for proto, like s.get_trait(('tcp', 'udp'), 'src')
+ if proto is None:
+ proto = s.layers.keys()
+ if type(proto) is not str:
+ for proto_ in proto:
+ val = s.get_trait(proto_, name, None)
+ if val is not None:
+ return val
+ return ifnone
+ # allow alternative lists for name, like s.get_trait('tcp', ('src', 'dst))
+ if type(name) is not str:
+ for name_ in name:
+ val = s.get_trait(proto, name_, None)
+ if val is not None:
+ return val
+ return ifnone
+
+ if name == 'timestamp':
+ return int(s.timestamp)
+ layer = s.layers.get(proto, None)
+ if not layer:
+ return ifnone
+ if name == 'msgtype':
+ return layer.msgtype
+ trait = layer.traits.get(name, None)
+ if trait is None:
+ return ifnone
+ if trait.val is None:
+ return ifnone
+ return trait.val
+
+ def get_traits(s, proto=None, names=None, proto_and_names=None):
+ pn = []
+ if proto or names:
+ if proto is None or isinstance(proto, str):
+ proto = [proto]
+ for p in proto:
+ pn.append((p, names))
+ if proto_and_names:
+ pn.extend(proto_and_names)
+ for proto_, names in pn:
+ if proto_ is None:
+ proto_ = s.layers.keys()
+ if isinstance(proto_, str):
+ proto_ = [proto_]
+ for proto in proto_:
+ if names is None:
+ l = s.layers.get(proto, None)
+ if not l:
+ continue
+ names = l.traits.keys()
+ if type(names) is str:
+ names = [names]
+ for name in names:
+ result = s.get_trait(proto, name, ifnone=None)
+ if result is not None:
+ yield (proto, name, result)
+
+ def get_all_traits(s, proto:str):
+ layer = s.layers.get(proto)
+ if not layer:
+ return {}
+ return layer.traits
+
+ def same_traits(s, other_msg, proto:str, name:str, allow_unset=False, operator=all):
+ if type(proto) is not str:
+ return operator(
+ s.same_traits(other_msg, proto_, name, allow_unset=allow_unset)
+ for proto_ in proto
+ )
+
+ if name is None:
+ my_traits = s.get_all_traits(proto)
+ other_traits = other_msg.get_all_traits(proto)
+ names = set(my_traits.keys())
+ names.update(other_traits.keys())
+ name = list(names)
+
+ if type(name) is not str:
+ return operator(
+ s.same_traits(other_msg, proto, name_, allow_unset=allow_unset)
+ for name_ in name
+ )
+
+ val = s.get_trait(proto, name)
+ other_val = other_msg.get_trait(proto, name)
+ if not allow_unset:
+ if val is None or other_val is None:
+ return False
+ return val == other_val
+
+ def set_trait(s, proto, name, val):
+ layer = s.layers.get(proto, None)
+ if layer is None:
+ layer = Layer(s, proto, None, Traits(Trait(name, val)))
+ else:
+ layer.traits.set(name, val)
+
+ def collapse(s, messages, my_idx):
+ '''iterate backwards over recent messages and see if messages can be combined'''
+ orig_msg = messages[my_idx]
+ for layer in s.layers.values():
+ msg = layer.collapse(messages, my_idx)
+ if orig_msg is not msg:
+ break
+ return msg
+
+ def absorb_msg(s, other_msg):
+ global g_current_msg
+ if g_current_msg is other_msg:
+ g_current_msg = s
+ s._log.extend(other_msg._log)
+ if other_msg and other_msg is not s:
+ s.absorbed.append(other_msg)
+ if other_msg.absorbed:
+ other_absorbed = other_msg.absorbed
+ other_msg.absorbed = []
+ for oa in other_absorbed:
+ s.absorb_msg(oa)
+
+ def identify_conns(s, messages, my_idx):
+ msg = messages[my_idx]
+ for layer_class in Layer.classes().values():
+ if layer_class.proto not in msg.layers:
+ continue
+ layer_class.identify_conns(messages, my_idx)
+
+ class EntityIdent:
+ def __init__(s, proto=None, src_port=None, src_kind=None, src_entity=None, dst_port=None, dst_kind=None, dst_entity=None, rename=False):
+ set_instance_vars_from_args()
+
+ def identify_entities(s, messages, my_idx):
+ '''From protocol and message discriminators, see if we can identify the src and dst port of the message
+ to be of a specific core network entity.'''
+ for layer in s.layers.values():
+ try:
+ identifieds = layer.identify_entities(s, messages, my_idx)
+ if identifieds is None:
+ continue
+ if isinstance(identifieds, Message.EntityIdent):
+ identifieds = [identifieds]
+
+ for ident in identifieds:
+ if ident is None:
+ continue
+ Entity.find_or_create(ident.proto, ident.src_kind,
+ ident.src_port or s.src,
+ ident.src_entity, from_msg=s,
+ rename=(ident.rename is True or ident.rename == 'src'))
+ Entity.find_or_create(ident.proto, ident.dst_kind,
+ ident.dst_port or s.dst,
+ ident.dst_entity, from_msg=s,
+ rename=(ident.rename is True or ident.rename == 'dst'))
+ except:
+ raise Exception(f'In layer {layer} {s}')
+
+ def find_entity(s, kind, with_port=None):
+ for conn in (s.src_conns + s.dst_conns):
+ ret = conn.find_entity(kind, with_port=with_port)
+ if ret and ret[0] is not None:
+ return ret
+ return None, None
+
+ def get_port(s, entity_kind:str):
+ if s.src_entity_is(entity_kind):
+ return s.src
+ elif s.dst_entity_is(entity_kind):
+ return s.dst
+ return None
+
+ def entity(s, *kinds):
+ if s.src.entity and s.src.entity.kind in kinds:
+ return s.src.entity
+ if s.dst.entity and s.dst.entity.kind in kinds:
+ return s.dst.entity
+
+ def src_entity_is(s, *kinds):
+ return s.src.entity and s.src.entity.kind in kinds
+
+ def dst_entity_is(s, *kinds):
+ return s.dst.entity and s.dst.entity.kind in kinds
+
+ def same_src_dst(s, other, forward=None, reverse=None):
+ # assume forward and reverse if neither are set.
+ # if one of them is set to True, assume the other as False.
+ if forward is None and reverse is None:
+ forward = True
+ reverse = True
+ a = (s.src.key(), s.dst.key())
+ b = (other.src.key(), other.dst.key())
+ if forward and reverse:
+ return set(a) == set(b)
+ elif forward:
+ return a == b
+ elif reverse:
+ return a == tuple(reversed(b))
+ else:
+ return False
+
+ @classmethod
+ def parse(cls, p:Packet):
+ msg = Message(p)
+ Layer.parse(msg)
+ msg.src = msg.get_trait(('tcp','udp','sctp'), 'src')
+ msg.dst = msg.get_trait(('tcp','udp','sctp'), 'dst')
+ if msg.src is None or msg.dst is None:
+ return None
+ assert isinstance(msg.src, IpPort)
+ assert isinstance(msg.dst, IpPort)
+ return msg
+
+ def label(s):
+ label = []
+ for l in s.layers.values():
+ if not SHOW_ALL_LAYERS:
+ if l.minor:
+ continue
+ if l.hidden and not all((ll.minor or ll.hidden) for ll in s.layers.values()):
+ continue
+ label.insert(0, l.label())
+ return '/'.join(label)
+
+ def related_subscribers(s):
+ subscribers = UniqueList()
+ src_sc = s.src_subscriber_conn()
+ if src_sc:
+ subscribers.append(src_sc.subscriber)
+ dst_sc = s.dst_subscriber_conn()
+ if dst_sc:
+ subscribers.append(dst_sc.subscriber)
+ for a in s.absorbed:
+ subscribers.extend(a.related_subscribers())
+ return subscribers
+
+ def is_subscriber_related(s, subscriber):
+ return subscriber in s.related_subscribers()
+
+ def __repr__(s):
+ return s.__str__()
+
+ def __str__(s):
+ return s.str()
+
+ def str(s, ladder=False, one_column_per_kind=False, show_traits=True, show_conns=True):
+ t_name = []
+ t_name.extend(subscr.label() for subscr in s.related_subscribers())
+ name = s.label()
+ if name:
+ t_name.append(name)
+
+ src = str(s.src)
+ dst = str(s.dst)
+
+ if s.src.entity is not None:
+ src_str = s.src.entity.label()
+ else:
+ src_str = src
+
+ if s.count and dst == src:
+ dst_str = '(self)'
+ elif s.dst.entity is not None:
+ dst_str = s.dst.entity.label()
+ else:
+ dst_str = dst
+
+ src_pos = 0
+ dst_pos = 0
+ if s.src.entity:
+ src_pos = s.src.entity.labelcolumn(one_column_per_kind)
+ if s.dst.entity:
+ dst_pos = s.dst.entity.labelcolumn(one_column_per_kind)
+
+ # allows injecting informational fake messages (Entity.news)
+ if not s.count and not s.count_back:
+ dst_pos = src_pos
+
+ if not s.src.entity and not s.dst.entity:
+ if src > dst:
+ src_pos = dst_pos + 1
+ else:
+ dst_pos = src_pos + 1
+
+ if not ladder:
+ if src_pos > dst_pos:
+ src_pos = 1
+ dst_pos = 0
+ else:
+ src_pos = 0
+ dst_pos = 1
+
+ if src_pos <= dst_pos:
+ left_pos = src_pos
+ right_pos = dst_pos
+ left_label = src_str
+ right_label = dst_str
+ to_left_count = s.count_back
+ to_right_count = s.count
+ else:
+ left_pos = dst_pos
+ right_pos = src_pos
+ left_label = dst_str
+ right_label = src_str
+ to_left_count = s.count
+ to_right_count = s.count_back
+
+ left_strs = []
+ left_strs.append(left_label)
+ if to_left_count:
+ left_strs.append('<')
+ if to_left_count > 1:
+ left_strs.append(f'{to_left_count}')
+
+ right_strs = []
+ if to_right_count:
+ if to_right_count > 1:
+ right_strs.append(f'{to_right_count}')
+ right_strs.append('>')
+ right_strs.append(right_label)
+
+ real_left_pos = max(0, left_pos - (len(left_label)//2))
+ real_right_pos = right_pos - (len(right_label)//2) + len(right_label) + 1 - (len(right_label)&1)
+
+ left_str = ''.join(left_strs)
+ right_str = ''.join(right_strs)
+
+ mid_gap = real_right_pos - real_left_pos - len(right_str) - len(left_str)
+ mid_gap = max(1, mid_gap)
+
+ if not ladder:
+ mid_name_margin = 6
+ else:
+ mid_name_margin = mid_gap - len(name)
+ if to_left_count or to_right_count:
+ if mid_name_margin > 50:
+ mid_gap_strs = ['-' * int(mid_name_margin // 2),
+ name,
+ '-' * int(mid_name_margin - (mid_name_margin//2))]
+ name = ''
+ else:
+ mid_gap_strs = ['-' * int(mid_gap)]
+ else:
+ mid_gap_strs = []
+
+ t = [' ' * int(real_left_pos),
+ left_str,]
+ t.extend(mid_gap_strs)
+ t.append(right_str)
+
+ if ladder:
+ t = [''.join(t)]
+ right_end = len(t[0])
+ label_pos = Entity.textcol_one_per_kind if one_column_per_kind else Entity.textcol_one_per_entity
+ diff = label_pos - right_end
+ if diff > 0:
+ t.append(' ' * int(diff))
+
+ if show_traits:
+ if isinstance(show_traits, str):
+ show_traits = [show_traits]
+ for proto,l in s.layers.items():
+ if not l.traits:
+ continue
+ if (show_traits is not True) and (proto not in show_traits):
+ continue
+ t_name.append('%s%s' % (proto, l.traits))
+
+ if show_conns:
+ conns = set()
+ for c in (s.src_conns + s.dst_conns):
+ conns.add(f'{c.proto}:{c.conn_id}')
+ #conns.add(f'{c}')
+ t_name.append(' '.join(conns))
+
+ idxs = [s.p.idx] + [a.p.idx for a in s.absorbed]
+ if len(idxs) <= 3:
+ t_name.append('+'.join(str(i) for i in sorted(idxs)))
+ else:
+ t_name.append(f'{min(idxs)}-{max(idxs)}')
+ t_name.append(f'{s.timestamp:.3f}')
+ t.append(' ')
+ t = [''.join(t)]
+ indent = '\n' + (' ' * len(t[0]) + ' | ')
+ t.append(' '.join(t_name))
+
+ for l in s._log:
+ t.append(indent)
+ t.append(l)
+ return ''.join(t)
+
+
+ def src_subscriber_conn(s):
+ for conn in s.src_conns:
+ if conn.subscriber_conn is not None:
+ return conn.subscriber_conn
+ return None
+
+ def dst_subscriber_conn(s):
+ for conn in s.dst_conns:
+ if conn.subscriber_conn is not None:
+ return conn.subscriber_conn
+ return None
+
+ def find_message(s, proto, trait, value):
+ for subscr_conn in (s.src_subscriber_conn(), s.dst_subscriber_conn()):
+ if subscr_conn is None:
+ continue
+ msg = subscr_conn.find_message(proto, trait, value)
+ if msg:
+ return msg
+ return None
+
+
+
+class Entity:
+ '''A core network program like BSC, MSC, ...'''
+ KINDS_SORTING = ('MS', 'BTS', 'PCU', 'hNodeB', 'BSC', 'MGW@BSC', 'HNBGW', 'STP', 'MSC', 'MGW@MSC', 'MGW',
+ 'SGSN', 'HLR', 'SIP', 'SIPcon', 'PBX', 'GGSN')
+ KINDS_SORTING_EXIST = ()
+ entities = listdict()
+ state_version = 1 # whether to update cached text columns
+ spacing = 5
+ label_spacing = 2
+ textcol_one_per_kind = 0
+ textcol_one_per_entity = 0
+
+ # proxy / forwarding addresses to ignore, like the STP port
+ blacklist = []
+
+ def __init__(s, kind:str):
+ set_instance_vars_from_args()
+ s.idx = None
+ s.state_version = 0
+ s.ports = listdict()
+ s.labelcol_one_per_kind = 0
+ s.labelcol_one_per_entity = 0
+ Entity.add(s)
+
+ @classmethod
+ def add(cls, entity):
+ Entity.entities.add(entity.kind, entity)
+ entity.idx = entity.idx_in_kind()
+ if entity.kind not in Entity.KINDS_SORTING_EXIST:
+ # a new kind has come up, refresh Entity.KINDS_SORTING_EXIST
+ exist = []
+ for k in Entity.KINDS_SORTING:
+ if k in Entity.entities.keys():
+ exist.append(k)
+ for k in Entity.entities.keys():
+ if k not in exist:
+ exist.append(k)
+ Entity.KINDS_SORTING_EXIST = tuple(exist)
+
+ Entity.state_version += 1
+
+ @classmethod
+ def count_entities(cls, kind):
+ l = Entity.entities.get(kind)
+ return len(l)
+
+ @classmethod
+ def add_to_blacklist(cls, port:IpPort):
+ if port in cls.blacklist:
+ return
+ cls.blacklist.append(port);
+
+ def rename(s, to_kind):
+ Entity.entities.remove(s.kind, s)
+ was_kind = s.kind
+ s.kind = to_kind
+ Entity.add(s)
+ for proto,l in s.ports.items():
+ for port in l:
+ LOG(Color.colored('yellow', 'Rename port'), 'from', was_kind, 'to', s, proto, port)
+
+ @classmethod
+ def find_or_create(cls, proto, kind, port, matched_entity=None, from_msg=None, rename=False):
+ if not port:
+ return None
+ if port in Entity.blacklist:
+ return None
+
+ found_entity = port.entity
+ if found_entity and matched_entity and (found_entity is not matched_entity):
+ LOG(Color.colored('purple', 'Renaming entity port:'), port, 'to', matched_entity)
+ if not matched_entity:
+ matched_entity = found_entity
+ if matched_entity:
+ if kind and matched_entity.kind != kind and rename:
+ matched_entity.rename(kind)
+ matched_entity.add_port(proto, port, from_msg=from_msg)
+ return matched_entity
+
+ if kind is None or rename:
+ for l in Entity.entities.values():
+ for e in l:
+ if e.has_port(port):
+ if kind and e.kind != kind and rename:
+ e.rename(kind)
+ return e
+
+ if kind is None:
+ return None
+ else:
+ l = Entity.entities.get(kind)
+ if l:
+ for e in l:
+ if e.has_port(port):
+ return e
+ e = Entity(kind)
+ e.add_port(proto, port, from_msg=from_msg)
+ return e
+
+ def absorb(s, other_entity):
+ '''Merge two entities to one, adopting the other's ports'''
+ for port in other_entity.ports:
+ s.add_port(port)
+ del other_entity
+
+ def label(s):
+ idx = ''
+ if s.idx:
+ idx = str(s.idx + 1)
+ return f'{s.kind}{idx}'
+
+ def __repr__(s):
+ return s.label()
+ def __str__(s):
+ return repr(s)
+
+ def kind_idx(s):
+ '''this entity kind's position in the currently known entity kinds:
+ For 'BSC', if we've seen BTS, BSC and MSC, return 1.'''
+ return Entity.KINDS_SORTING_EXIST.index(s.kind)
+
+ def idx_in_all(s):
+ '''this entity kind's position in all currently known entities:
+ For the second 'BSC', if we've seen 2 BTS, 3 BSC and 1 MSC, return 2 (BTS) + 1 (second BSC) = 3.'''
+ idx = 0
+ for k in Entity.KINDS_SORTING_EXIST:
+ if k == s.kind:
+ idx += Entity.entities.get(s.kind).index(s)
+ return idx
+ idx += Entity.count_entities(k)
+ return idx
+
+ def idx_in_kind(s):
+ '''this entity kind's position in the list of entities of the same kind'''
+ return Entity.entities.get(s.kind).index(s)
+
+ def check_update_state(s):
+ if s.state_version == Entity.state_version:
+ return
+ Entity.calculate_textcolumns()
+ s.state_version = Entity.state_version
+
+ def labelcolumn(s, one_column_per_kind=False, mid=True):
+ s.check_update_state()
+ if one_column_per_kind:
+ midcol = s.labelcol_one_per_kind
+ else:
+ midcol = s.labelcol_one_per_entity
+
+ if mid:
+ ret = midcol
+ else:
+ ret = int(midcol - (len(s.label()) // 2))
+ return ret
+
+ @classmethod
+ def calculate_textcolumns(cls):
+ '''In text rendering of a ladder diagram, return the text column for this entity,
+ if rendering each entity in its own column (not sharing one column per entity kind)'''
+ entity_col = 0
+ kind_col = 0
+ for k in Entity.KINDS_SORTING_EXIST:
+ l = Entity.entities.get(k)
+
+ kind_col += len(k) // 2
+ for e in l:
+ e.labelcol_one_per_kind = kind_col
+ Entity.textcol_one_per_kind = kind_col + len(e.label()) + Entity.spacing
+
+ entity_col += len(e.kind)//2
+ e.labelcol_one_per_entity = entity_col
+ entity_col += len(e.label()) + Entity.spacing
+ Entity.textcol_one_per_entity = entity_col
+
+ kind_col += len(k) + Entity.spacing
+
+ def has_port(s, port, proto=None):
+ if proto:
+ if port in s.ports.get(proto, []):
+ return proto
+ return None
+ for proto,l in s.ports.items():
+ if port in l:
+ return proto
+ return None
+
+ def remove_port(s, port):
+ for proto,l in s.ports.items():
+ if port in l:
+ l.remove(port)
+ port.entity = None
+ return
+
+ def add_port(s, proto, port, from_msg=None):
+ if port.entity and port.entity is not s:
+ port.entity.remove_port(port)
+ if s.has_port(port, proto=proto):
+ return
+ s.ports.add(proto, port)
+ port.entity = s
+ port.proto = proto
+ LOG(Color.colored('cyan', 'New port:'), port)
+
+
+class Subscriber:
+ next_ident = 1
+ next_tmsi_idx = 1
+ imsis = {}
+ tmsis = {}
+ imeis = {}
+ msisdns = {}
+
+ def __init__(s, imsi:str=None, tmsi=None, imei=None, msisdn=None):
+ set_instance_vars_from_args()
+ s.subscriber_conns = UniqueList()
+ s.tmsis = set()
+ s.tmsi_idx = 0
+ s.set_last_tmsi(tmsi)
+ s.ident = Subscriber.next_ident
+ Subscriber.next_ident += 1
+ Subscriber.update_dicts(s)
+
+ def set_last_tmsi(s, tmsi):
+ if tmsi is None:
+ return
+ s.tmsi = tmsi
+ if (not isinstance(tmsi, str)) or len(str(tmsi)) < 8:
+ raise Exception('Invalid TMSI: ' + str(tmsi))
+ s.tmsi_idx = Subscriber.next_tmsi_idx
+ Subscriber.next_tmsi_idx += 1
+ s.tmsis.add(tmsi)
+ Subscriber.tmsis[s.tmsi] = s
+
+ @classmethod
+ def update_dicts(cls, s):
+ if s.imsi:
+ Subscriber.imsis[s.imsi] = s
+ for tmsi in s.tmsis:
+ Subscriber.tmsis[tmsi] = s
+ if s.imei:
+ Subscriber.imeis[s.imei] = s
+ if s.msisdn:
+ Subscriber.msisdns[s.msisdn] = s
+
+ def label(s, full=False):
+ l = []
+ if full or (not s.imsi and not s.tmsi and not s.imei and not s.msisdn):
+ l.append(f'subscr{s.ident}')
+ if s.imsi and (full or not s.msisdn):
+ l.append(f'imsi{s.imsi}')
+
+ if s.tmsi and (full or not s.imsi):
+ l.append(f'tmsi{s.tmsi}')
+ if s.imei and (full or (not s.imsi and not s.tmsi)):
+ l.append(f'imei{s.imei}')
+ if s.msisdn:
+ l.append(f'msisdn{s.msisdn}')
+ return Color.colored(s.ident, ':'.join(l))
+
+ def __repr__(s):
+ return s.label(full=True)
+ def __str__(s):
+ return s.label()
+
+ @classmethod
+ def identify_subscriber(cls, msg:Message):
+ imsi = msg.get_trait(('dtap','bssmap','rsl','gsup'), 'imsi')
+ tmsi = tmsi_standardize(msg.get_trait(('dtap','bssmap','rsl'), 'tmsi'))
+ imei = msg.get_trait('dtap', 'imei')
+ msisdn = msg.get_trait('gsup', 'msisdn')
+
+ if not (imsi or tmsi or imei or msisdn):
+ return
+ #if not msg.src_conns and not msg.dst_conns:
+ # return
+
+ # could start out with subscr = None, but to use a few less subscriber.ids start out with a present
+ # subscriber, if any.
+ subscr_conn = msg.src_subscriber_conn() or msg.dst_subscriber_conn()
+ if subscr_conn is not None:
+ subscr = subscr_conn.subscriber
+ else:
+ subscr = None
+
+ if imsi:
+ subscr = Subscriber.merge(Subscriber.by_imsi(imsi), subscr)
+ if tmsi:
+ subscr = Subscriber.merge(Subscriber.by_tmsi(tmsi), subscr)
+ if imei:
+ subscr = Subscriber.merge(Subscriber.by_imei(imei), subscr)
+ if msisdn:
+ subscr = Subscriber.merge(Subscriber.by_msisdn(msisdn), subscr)
+
+ if subscr is None:
+ return
+ if subscr_conn is None:
+ subscr_conn = SubscriberConn()
+ subscr_conn.set_subscriber(subscr)
+
+ for c in (msg.src_conns + msg.dst_conns):
+ if c.subscriber_conn is None:
+ subscr_conn.add_conn(c)
+ c.subscriber_conn = subscr_conn
+ else:
+ c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, subscr_conn)
+
+ @classmethod
+ def by_imsi(cls, imsi):
+ subscr = Subscriber.imsis.get(imsi)
+ if not subscr:
+ subscr = Subscriber(imsi=imsi)
+ return subscr
+
+ @classmethod
+ def by_tmsi(cls, tmsi):
+ subscr = Subscriber.tmsis.get(tmsi)
+ if not subscr:
+ subscr = Subscriber(tmsi=tmsi)
+ return subscr
+
+ @classmethod
+ def by_imei(cls, imei):
+ subscr = Subscriber.imeis.get(imei)
+ if not subscr:
+ subscr = Subscriber(imei=imei)
+ return subscr
+
+ @classmethod
+ def by_msisdn(cls, msisdn):
+ subscr = Subscriber.msisdns.get(msisdn)
+ if not subscr:
+ subscr = Subscriber(msisdn=msisdn)
+ return subscr
+
+ @classmethod
+ def merge(cls, a, b):
+ assert a is None or isinstance(a, Subscriber)
+ assert b is None or isinstance(b, Subscriber)
+ if a is None and b is None:
+ return None
+ if a is None:
+ return b
+ if b is None or b is a:
+ return a
+
+ if not a.imsi and a.ident > b.ident:
+ return cls.merge(b, a)
+
+ if a.imsi and b.imsi and a.imsi != b.imsi:
+ out_error(f'cannot absorb, subscriber would change IMSI: {b.imsi} -> {a.imsi}')
+ return None
+ if a.imei and b.imei and a.imei != b.imei:
+ out_error(f'cannot absorb, subscriber would change IMEI: {b.imei} -> {a.imei}')
+ return None
+
+ if b.imsi:
+ a.imsi = b.imsi
+ b.imsi = None
+
+ if b.tmsis:
+ a.tmsis.update(b.tmsis)
+ b.tmsis = set()
+ if b.tmsi_idx > a.tmsi_idx:
+ a.set_last_tmsi(b.tmsi)
+ b.tmsi = None
+
+ if b.imei:
+ a.imei = b.imei
+ b.imei = None
+
+ if b.msisdn:
+ if a.msisdn and a.msisdn != b.msisdn:
+ LOG(f'subscriber {a} changes MSISDN: {a.msisdn} -> {b.msisdn}')
+ a.msisdn = b.msisdn
+ b.msisdn = None
+
+ Subscriber.update_dicts(a)
+
+ for sc in b.subscriber_conns:
+ a.add_subscriber_conn(sc)
+ b.subscriber_conns = []
+ return a
+
+ def add_subscriber_conn(s, subscriber_conn):
+ if subscriber_conn.subscriber is s:
+ return
+ if subscriber_conn.subscriber:
+ subscriber_conn.subscriber.subscriber_conns.remove(subscriber_conn)
+ s.subscriber_conns.append(subscriber_conn)
+ subscriber_conn.subscriber = s
+ assert subscriber_conn in subscriber_conn.subscriber.subscriber_conns
+
+ def find_entity(s, kind, with_port=None):
+ for subscriber_conn in reversed(s.subscriber_conns):
+ found, found_subscriber_conn = subscriber_conn.find_entity(kind, ask_subscriber=False,
+ with_port=with_port)
+ if found is not None:
+ return found, found_subscriber_conn
+ return None, None
+
+class SubscriberConn:
+ '''A SubscriberConn is a collection of conns that feed into each other.
+ For example, the Abis and the BSSMAP link for the same subscriber are related, as well as the MGCP and
+ RTP spoken for that subscriber.
+ If a subscriber disconnects and connects again, that is a new separate SubscriberConn;
+ also if a subscriber would concurrently attach in twice somehow, that would be separate SubscriberConn
+ instances.
+
+ Note that a Message's src_conns and a dst_conns are not necessarily listed in the same SubscriberConn, for example
+ for RTP, SIP or SMS, the messages may pass from one subscriber to another.'''
+
+ def __init__(s, subscriber=None):
+ set_instance_vars_from_args()
+ s.conns = UniqueList()
+
+ @classmethod
+ def merge(cls, a, b):
+ assert a is None or isinstance(a, SubscriberConn)
+ assert b is None or isinstance(b, SubscriberConn)
+ if a is None and b is None:
+ return None
+ if a is None:
+ return b
+ if b is None or b is a:
+ return a
+ b_subscr = b.subscriber
+ if b.subscriber:
+ b.subscriber.subscriber_conns.remove(b)
+ b.subscriber = None
+ a.subscriber = Subscriber.merge(a.subscriber, b_subscr)
+ if a.subscriber:
+ a.subscriber.subscriber_conns.append(a)
+ for conn in b.conns:
+ conn.subscriber_conn = a
+ a.conns.append(conn)
+ b.conns = None
+ return a
+
+ def find_entity(s, kind, with_port=None, ask_subscriber=True):
+ if ask_subscriber and s.subscriber is not None:
+ return s.subscriber.find_entity(kind, with_port=with_port)
+ if with_port is not None and isinstance(with_port, str):
+ with_port = [with_port]
+ for conn in reversed(s.conns):
+ if with_port is not None and conn.port.proto not in with_port:
+ continue
+ if conn.port.entity is not None and conn.port.entity.kind == kind:
+ return conn.port.entity, s
+ return None, None
+
+ def find_message(s, proto, trait, val):
+ for conn in reversed(s.conns):
+ for msgs in (conn.tx_messages, conn.rx_messages):
+ for msg in msgs:
+ for p, t, v in msg.get_traits(proto, trait):
+ if val is None or val == v:
+ return msg
+ return None
+
+ def set_subscriber(s, subscriber):
+ s.subscriber = subscriber
+ if subscriber is not None:
+ subscriber.subscriber_conns.append(s)
+
+ def add_conn(s, conn):
+ s.conns.append(conn)
+ conn.subscriber_conn = s
+
+ def __repr__(s):
+ return f'{s.subscriber}~{s.conns}'
+
+
+class Layer_tcp(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ traits = Traits(
+ src=IpPort.from_tcp_source(p),
+ dst=IpPort.from_tcp_dest(p),
+ )
+ super().__init__(msg=msg, proto='tcp', msgtype=None, traits=traits, minor=True)
+
+class Layer_udp(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ traits = Traits(
+ src=IpPort.from_udp_source(p),
+ dst=IpPort.from_udp_dest(p),
+ )
+ super().__init__(msg=msg, proto='udp', msgtype=None, traits=traits, minor=True)
+
+class Layer_sctp(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ traits = Traits(
+ src=IpPort.from_sctp_source(p),
+ dst=IpPort.from_sctp_dest(p),
+ stream_id = p.get('sctp.data_sid'),
+ stream_seq = p.get('sctp.data_ssn'),
+ )
+ super().__init__(msg=msg, proto='sctp', msgtype=None, traits=traits, minor=True)
+
+class Layer_rtp(Layer):
+ def __init__(s, msg:Message):
+ pt = msg.p.get('rtp.p_type')
+
+ iuup_msgtype = sane_showname(msg.p.get('iuup.pdu_type.showname'))
+
+ if iuup_msgtype is not None:
+ msgtype = f'{pt}.{iuup_msgtype}'
+ else:
+ msgtype = pt
+
+ traits = Traits(
+ pt=pt,
+ iuup=iuup_msgtype
+ )
+ super().__init__(msg=msg, proto='rtp', msgtype=msgtype, traits=traits)
+
+ def collapse(s, messages, my_idx):
+ msgtype = s.msg.get_trait('rtp', 'msgtype')
+ src = s.msg.src
+ dst = s.msg.dst
+ for i in reversed(range(my_idx)):
+ prev_msg = messages[i]
+ if not prev_msg:
+ continue
+ if prev_msg.finalized:
+ break
+ if not 'rtp' in prev_msg.layers:
+ if prev_msg.is_minor():
+ continue
+ else:
+ break
+ if prev_msg.get_trait('rtp', 'msgtype') != msgtype:
+ continue
+ if s.msg.same_src_dst(prev_msg, forward=True):
+ # found a recent RTP similar RTP packet, combine
+ prev_msg.count += 1
+ messages[my_idx] = None
+ prev_msg.absorb_msg(s.msg)
+ return prev_msg
+ if 1 and s.msg.same_src_dst(prev_msg, reverse=True):
+ # same but backwards
+ prev_msg.count_back += 1
+ messages[my_idx] = None
+ prev_msg.absorb_msg(s.msg)
+ return prev_msg
+ return s.msg
+
+ # identify_entities: RTP ports are identified by watching RSL and MGCP, see Layer_gsm_abis_rsl.identify_conns_rtp
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+
+ conn = Conn.find('rtp', msg.src, conn_id=msg.src.key())
+ if conn is not None:
+ conn.add_message(msg)
+
+ conn = Conn.find('rtp', msg.dst, conn_id=msg.dst.key())
+ if conn is not None:
+ conn.add_message(msg)
+
+class Layer_mgcp(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ verb = p.get('mgcp.req_verb')
+ rsp = p.get('mgcp.rsp_rspstring')
+ msgtype = verb or rsp or '?'
+ tid = p.get('mgcp.transid', '')
+
+ rtp_port = None
+ sdp_rtp_ip = p.get('sdp.connection_info_address')
+ sdp_rtp_port = p.get('sdp.media_port')
+ if sdp_rtp_ip and sdp_rtp_port:
+ rtp_port = IpPort.get(sdp_rtp_ip, sdp_rtp_port)
+
+ if rsp:
+ endp = p.get('mgcp.param_specificendpointid')
+ else:
+ endp = p.get('mgcp.req_endpoint')
+ if endp and endp.startswith('rtpbridge/*@'):
+ endp = None
+ traits = Traits(
+ tid=tid,
+ endp=endp,
+ ci=p.get('mgcp.param_connectionid'),
+ verb=verb,
+ rsp=rsp,
+ rtp_port=rtp_port,
+ )
+
+ s.tid = tid
+ super().__init__(msg=msg, proto='mgcp', msgtype=msgtype, traits=traits)
+
+ def label(s):
+ return f'mgcp.{s.tid}.{s.msgtype}'
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ if msg.get_trait('mgcp', 'verb') == 'CRCX':
+ dst_kind = 'MGW'
+ if msg.src_entity_is('BSC'):
+ dst_kind = 'MGW@BSC'
+ elif msg.src_entity_is('MSC'):
+ dst_kind = 'MGW@MSC'
+ return Message.EntityIdent(proto='mgcp', dst_kind=dst_kind)
+ elif msg.get_trait('mgcp', 'rsp') and msg.src_entity_is('MGW', 'MGW@BSC','MGW@MSC'):
+ rtp = msg.get_trait('mgcp', 'rtp_port')
+ if rtp:
+ msg.src.entity.add_port('rtp', rtp)
+ return None
+
+ @classmethod
+ def find_req(cls, messages, my_idx):
+ msg = messages[my_idx]
+ for match in find_same_trait(msg, messages, my_idx, 'mgcp', 'tid'):
+ if match.get_trait('mgcp', 'rsp'):
+ continue
+ if not match.same_src_dst(msg, reverse=True):
+ continue
+ return match
+ return None
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ proto = 'mgcp'
+
+ if msg.get_trait('mgcp', 'verb'):
+ endp = msg.get_trait('mgcp', 'endp')
+ mgw_port = msg.dst
+ endp_conn = Conn.find(proto, mgw_port, endp)
+ if endp_conn is not None:
+ endp_conn.add_message(msg)
+ for c in (msg.src_conns + msg.dst_conns):
+ endp_conn.merge_subscr_conns(c)
+
+ ci = msg.get_trait('mgcp', 'ci')
+ conn_id = f'{endp}:{ci}'
+ conn = Conn.find(proto, mgw_port, conn_id)
+ if conn:
+ conn.add_message(msg)
+ for c in (msg.src_conns + msg.dst_conns + msg.meta_conns):
+ conn.merge_subscr_conns(c)
+
+ if msg.get_trait('mgcp', 'rsp') == 'OK':
+ req = cls.find_req(messages, my_idx)
+ if req is None:
+ ERR('MGCP response without request')
+ return
+ mgw_port = req.dst
+
+ verb = req.get_trait('mgcp', 'verb')
+ if verb == 'CRCX':
+ # The MGCP connection
+ endp = msg.get_trait('mgcp', 'endp') or req.get_trait('mgcp', 'endp')
+ ci = msg.get_trait('mgcp', 'ci')
+ if not endp or not ci:
+ ERR('MGCP CRCX with endp =', endp, 'ci =', ci)
+
+ # creating two levels of conn: a meta conn with just endp,
+ # and a proper one with endp+ci
+ # endp:
+ endp_conn = Conn.find(proto, mgw_port, endp)
+ if not endp_conn:
+ endp_conn = Conn.open(proto, mgw_port, conn_id=endp, start_msg=msg)
+
+ # endp+ci:
+ conn_id = f'{endp}:{ci}'
+ conn = Conn.find(proto, mgw_port, conn_id)
+ if not conn:
+ conn = Conn.open(proto, mgw_port, conn_id, msg)
+ conn.add_message(req)
+ conn.add_message(msg)
+ conn.merge_subscr_conns(endp_conn)
+
+ # The RTP connection set up by MGCP
+ rtp_port = msg.get_trait('mgcp', 'rtp_port')
+ if rtp_port:
+ rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False,
+ entity=conn.port.entity)
+ rtp_conn.merge_subscr_conns(conn)
+ # bssmap or ranap layer will mention this rtp_port in their Assignment / RAB-Assignment
+
+ else:
+ endp = req.get_trait('mgcp', 'endp')
+ ci = req.get_trait('mgcp', 'ci')
+ Conn.message(proto, mgw_port, endp, msg)
+
+ conn_id = f'{endp}:{ci}'
+ Conn.message(proto, mgw_port, conn_id, msg)
+
+ if verb == 'DLCX':
+ # The MGCP connection
+ endp = req.get_trait('mgcp', 'endp')
+ ci = req.get_trait('mgcp', 'ci')
+ if not endp:
+ ERR('MGCP DLCX without endp')
+
+ def close_ci(ci):
+ ci_conn_id = f'{endp}:{ci}'
+ # go through all RTP ports created in this conn
+ ci_conn = Conn.find(proto, mgw_port, ci_conn_id)
+ if ci_conn is not None:
+ all_rtp_ports = UniqueList()
+ for msgs in (ci_conn.rx_messages, ci_conn.tx_messages):
+ for msg in msgs:
+ all_rtp_ports.append(msg.get_trait('mgcp', 'rtp_port'))
+
+ for rtp_port in all_rtp_ports:
+ Conn.close('rtp', rtp_port, conn_id=rtp_port.key(), close_msg=msg, if_exists=True)
+ Conn.close(proto, mgw_port, ci_conn_id, msg)
+
+ if ci:
+ close_ci(ci)
+ else:
+ endp_conn = Conn.find(proto, mgw_port, endp)
+ all_ci = UniqueList()
+ if endp_conn:
+ for msgs in (endp_conn.tx_messages, endp_conn.rx_messages):
+ for msg in msgs:
+ all_ci.append(msg.get_trait('mgcp', 'ci'))
+ for ci in all_ci:
+ close_ci(ci)
+
+ Conn.close(proto, mgw_port, endp, msg, if_exists=True)
+
+
+ elif msg.get_trait('mgcp', 'verb') == 'MDCX':
+ # The RTP connection set up by BSC or MSC
+ rtp_port = msg.get_trait('mgcp', 'rtp_port')
+ if rtp_port and Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) is None:
+ rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False)
+ for c in msg.src_conns:
+ rtp_conn.merge_subscr_conns(c)
+
+class Layer_sccp(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ msgtype = p.get('sccp.message_type.showname')
+ traits = Traits(
+ src_lref=p.get('sccp.slr'),
+ dst_lref=p.get('sccp.dlr'),
+ )
+ super().__init__(msg=msg, proto='sccp', msgtype=msgtype, traits=traits, hidden=True)
+
+ def collapse(s, messages, my_idx):
+ msg = s.msg
+
+ # cut out STP hop
+ if SCCP_COLLAPSE_STP:
+ src = msg.src
+ t = msg.timestamp
+ for i in reversed(range(my_idx)):
+ prev_msg = messages[i]
+ if not prev_msg:
+ continue
+ if t - prev_msg.timestamp > 1:
+ break
+ if prev_msg.absorbed:
+ continue
+ prev_sccp = prev_msg.layers.get(s.proto, None)
+ if prev_sccp is None:
+ continue
+ #if src != prev_msg.dst:
+ # continue
+ if s.msgtype != prev_sccp.msgtype:
+ continue
+ if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True):
+ continue
+ if not msg.same_traits(prev_msg, 'sctp', 'stream_id'):
+ continue
+ if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')):
+ continue
+ if not msg.same_traits(prev_msg, 'm3ua', 'message_length'):
+ continue
+
+ prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst'))
+ prev_msg.dst = msg.dst
+ prev_msg.absorb_msg(msg)
+ messages[i] = None
+ messages[my_idx] = prev_msg
+ Entity.add_to_blacklist(src)
+ return prev_msg
+ return msg
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ proto = 'sccp'
+ msgtype = msg.get_trait(proto, 'msgtype')
+ if SCCP_COLLAPSE_STP and not msg.absorbed:
+ return
+ src_id = msg.get_trait(proto, 'src_lref')
+ dst_id = msg.get_trait(proto, 'dst_lref')
+ if msgtype == 'Connection-Request':
+ Conn.open(proto, msg.src, src_id, msg)
+ elif msgtype == 'Connection-Confirm':
+ Conn.open(proto, msg.src, src_id, msg,
+ counterparts=[Conn.find(proto, msg.dst, dst_id)])
+ elif msgtype in ('Release-Complete',):
+ Conn.close(proto, msg.src, src_id, msg)
+ else:
+ if src_id:
+ Conn.message(proto, msg.src, src_id, msg)
+ if dst_id:
+ Conn.message(proto, msg.dst, dst_id, msg)
+
+
+class Layer_m3ua(Layer):
+ def __init__(s, msg:Message):
+ traits = Traits(
+ opc = msg.p.get('m3ua.protocol_data_opc'),
+ dpc = msg.p.get('m3ua.protocol_data_dpc'),
+ message_length = msg.p.get('m3ua.message_length'),
+ )
+ super().__init__(msg=msg, proto='m3ua', msgtype=None, traits=traits, minor=True)
+
+# wireshark commonly falsely classifies a BSSMAP Ciphering Mode Command as RNSAP PDU
+class Layer_rnsap(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ traits = Traits()
+ msgtype = 'Cipher Mode Command'
+ super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits)
+
+class Layer_bssap(Layer):
+ 'BSSAP, either BSS Management (see Layer_gsm_a_bssmap) or Direct Transfer (see Layer_gsm_a_dtap)'
+ def __init__(s, msg:Message):
+ msgtype = msg.p.get('bssap.msgtype.showname')
+ msgtype = msg.p.get('bssap.pdu_type.showname')
+
+ traits = Traits(
+ msgtype_nr=int(msg.p.get('bssap.pdu_type'), 16),
+ )
+
+ super().__init__(msg=msg, proto='bssap', msgtype=msgtype, traits=traits, minor=True)
+
+class Layer_bssgp(Layer):
+ def __init__(s, msg:Message):
+ msgtype = sane_msgtype(msg.p.get('bssgp.pdu_type.showname'))
+ traits = Traits(
+ tlli=msg.p.get('bssgp.gsm_a_rr_tlli'),
+ )
+ super().__init__(msg=msg, proto='bssgp', msgtype=msgtype, traits=traits)
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ if msg.get_trait('bssgp', 'msgtype') == 'FLOW-CONTROL-BVC':
+ return Message.EntityIdent(proto='bssgp', src_kind='PCU', dst_kind='SGSN')
+ return None
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ proto = 'bssgp'
+ msgtype = msg.get_trait('dtap', 'msgtype')
+ tlli = msg.get_trait('bssgp', 'tlli')
+ conn_id = tlli
+ if not conn_id:
+ return
+ if msgtype == 'Attach-Request':
+ Conn.open(proto, msg.src, conn_id, msg,
+ counterparts=[Conn.open(proto, msg.dst, conn_id, msg)])
+ elif msgtype == 'Attach-Accept':
+ conn = Conn.close(proto, msg.src, conn_id, msg)
+ new_conn_id = msg.get_trait('dtap', 'tmsi')
+ new_conn = Conn.open(proto, msg.src, new_conn_id, msg,
+ counterparts=[Conn.open(proto, msg.dst, new_conn_id, msg)])
+ new_conn.merge_subscr_conns(conn)
+ elif msgtype == 'Attach-Complete':
+ Conn.close(proto, msg.src, conn_id, msg)
+ else:
+ conn = Conn.message(proto, msg.src, conn_id, msg)
+
+
+
+
+class Layer_hnbap(Layer):
+ def __init__(s, msg:Message):
+ def strip_till_dash(dashstr):
+ if not dashstr or not '-' in dashstr:
+ return dashstr
+ dash = dashstr.rindex('-')
+ return dashstr[dash+1:]
+
+ msgtype = strip_till_dash(msg.p.get('hnbap.procedurecode.showname'))
+ pdutype = strip_till_dash(sane_msgtype(msg.p.get('hnbap.hnbap_pdu.showname')))
+ pdutype_nr = msg.p.get('hnbap.hnbap_pdu')
+ traits = Traits(
+ msgtype_nr=int(msg.p.get('hnbap.procedurecode')),
+ pdutype=pdutype,
+ pdutype_nr=int(pdutype_nr),
+ )
+ super().__init__(msg=msg, proto='hnbap', msgtype=msgtype, traits=traits)
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ if (msg.get_trait('hnbap', 'msgtype') in ('Register', 'HNBRegister', 'UERegister')) and (msg.get_trait('hnbap', 'pdutype_nr') == 0):
+ return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW')
+ return None
+
+class Layer_rua(Layer):
+ def __init__(s, msg:Message):
+ def strip_till_dash(dashstr):
+ if not dashstr or not '-' in dashstr:
+ return dashstr
+ dash = dashstr.rindex('-')
+ return dashstr[dash+1:]
+
+ msgtype = strip_till_dash(msg.p.get('rua.procedurecode.showname'))
+ pdutype = strip_till_dash(sane_msgtype(msg.p.get('rua.rua_pdu.showname')))
+ pdutype_nr = msg.p.get('rua.rua_pdu')
+ cn_domain_i = msg.p.get('rua.cn_domainindicator')
+ cn_domain = None
+ if cn_domain_i == '0':
+ cn_domain = 'cs'
+ elif cn_domain_i == '1':
+ cn_domain = 'ps'
+ traits = Traits(
+ msgtype_nr=int(msg.p.get('rua.procedurecode')),
+ pdutype=pdutype,
+ cn_domain=cn_domain,
+ rua_ctx=msg.p.get('rua.context_id'),
+ )
+ super().__init__(msg=msg, proto='rua', msgtype=msgtype, traits=traits, cap_p_name='rua')
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ proto = 'rua'
+ msgtype = msg.get_trait(proto, 'msgtype')
+ conn_id = (msg.get_trait(proto, 'cn_domain') or '?') + ':' + (msg.get_trait(proto, 'rua_ctx') or '?')
+ if msgtype == 'Connect':
+ conn = Conn.open(proto, msg.src, conn_id, msg)
+ Conn.open(proto, msg.dst, conn_id, msg, counterparts=[conn])
+ elif msgtype == 'Disconnect':
+ Conn.close(proto, msg.dst, conn_id, msg)
+ else:
+ Conn.message(proto, msg.src, conn_id, msg)
+
+
+class Layer_ranap(Layer):
+ def __init__(s, msg:Message):
+ msgtype = msg.p.get('ranap.rab_assignmentrequest_element'
+ ) or msg.p.get('ranap.rab_assignmentresponse_element'
+ ) or msg.p.get('ranap.initiatingmessage_element')
+
+ rtp_port = None
+ ip = msg.p.get('ranap.nsap_ipv4_addr')
+ port_bin = msg.p.get('ranap.bindingid.binary_value')
+ if ip and port_bin and len(port_bin) >= 2:
+ port = int.from_bytes(port_bin[:2], "big")
+ rtp_port = IpPort.get(ip, port)
+
+ traits = Traits(
+ rtp_port=rtp_port,
+ )
+ super().__init__(msg=msg, proto='ranap', msgtype=msgtype, traits=traits)
+
+ def collapse(s, messages, my_idx):
+ msg = s.msg
+
+ # cut out HNBGW hop
+ if IUH_COLLAPSE_HNBGW:
+ src = msg.src
+ t = msg.timestamp
+ for i in reversed(range(my_idx)):
+ prev_msg = messages[i]
+ if not prev_msg:
+ continue
+ if t - prev_msg.timestamp > 1:
+ break
+ if src != prev_msg.dst:
+ continue
+ if msg.src.entity is not prev_msg.dst.entity:
+ continue
+ # DOESNT WORK
+ if not msg.same_traits(prev_msg, 'ranap', None):
+ continue
+ if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True):
+ continue
+ if not msg.same_traits(prev_msg, 'sctp', 'stream_id'):
+ continue
+ if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')):
+ continue
+
+ prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst'))
+ prev_msg.dst = msg.dst
+ prev_msg.absorb_msg(msg)
+ messages[i] = None
+ messages[my_idx] = prev_msg
+ Entity.add_to_blacklist(src)
+ return prev_msg
+ return msg
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ ids = []
+ ids.append(s.identify_attach(msg, messages, my_idx))
+
+ msgtype = msg.get_trait('ranap', 'msgtype')
+ rtp_port = msg.get_trait('ranap', 'rtp_port')
+
+ if rtp_port and msgtype == 'RAB-AssignmentRequest':
+ # associate the MSC's MGCP port, but take care to not say the STP is an MSC
+ crcx_ok = msg.find_message('mgcp', 'rtp_port', rtp_port)
+ if crcx_ok and crcx_ok.src_entity_is('MGW'):
+ mgw = crcx_ok.src.entity
+
+ msc = None
+ if msg.src_entity_is('MSC'):
+ msc = msg.src.entity
+
+ ids.append(Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True,
+ dst_entity=msc, dst_port=crcx_ok.dst if msc else None))
+
+ if rtp_port and msgtype == 'RAB-AssignmentResponse' and msg.src_entity_is('hNodeB'):
+ ids.append(Message.EntityIdent(proto='rtp', src_kind='hNodeB', src_port=rtp_port, src_entity=msg.src.entity))
+
+ return ids
+
+ def identify_attach(s, msg:Message, messages, my_idx):
+ ids = []
+ dst_kind = None
+ msgtype = msg.get_trait('dtap', 'msgtype')
+ if msgtype in DTAP_COMPL_L3:
+ dst_kind = 'MSC'
+ proto = 'IuCS'
+ elif msgtype in GMM_COMPL_L3:
+ dst_kind = 'SGSN'
+ proto = 'IuPS'
+
+ if not dst_kind:
+ return None
+
+ if 'rua' in msg.layers:
+ return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW')
+
+ # don't mistake the STP as MSC or SGSN
+ if SCCP_COLLAPSE_STP and not msg.absorbed:
+ return None
+ if not SCCP_COLLAPSE_STP and msg.src_entity_is('HNBGW'):
+ return None
+
+ # FIXME: below only makes sense with SCCP_COLLAPSE_STP == True
+ if not SCCP_COLLAPSE_STP:
+ return None
+
+ # find a HNBGW that has recently received the same LU,
+ # associate IuCS port
+ src_entity = None
+ for match in find_same_trait(msg, messages, my_idx, 'dtap', None):
+ if 'rua' not in match.layers:
+ continue
+ if not match.dst_entity_is('HNBGW'):
+ continue
+ src_entity = match.dst.entity
+ if src_entity:
+ break
+
+ return Message.EntityIdent(proto=proto, src_kind='HNBGW', src_entity=src_entity, dst_kind=dst_kind)
+
+ @classmethod
+ def identify_conns(cls, messages, my_idx):
+ msg = messages[my_idx]
+
+ if SCCP_COLLAPSE_STP and not msg.absorbed:
+ return
+
+ rtp_port = msg.get_trait('ranap', 'rtp_port')
+ if rtp_port:
+ rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key())
+ if rtp_conn:
+ for c in msg.src_conns:
+ rtp_conn.merge_subscr_conns(c)
+
+
+class Layer_gsm_a_bssmap(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ msgtype = p.get('gsm_a_bssmap.msgtype.showname')
+
+ rtp_port = None
+ ip = p.get('gsm_a_bssmap.aoip_trans_ipv4')
+ port = p.get('gsm_a_bssmap.aoip_trans_port')
+ if ip and port:
+ rtp_port = IpPort.get(ip, port)
+
+ tmsi = tmsi_standardize(p.get('gsm_a_bssmap.gsm_a_tmsi'))
+
+ traits = Traits(
+ msgtype_nr=str_to_int(p.get('gsm_a_bssmap.msgtype')),
+ rtp_port=rtp_port,
+ imsi=p.get('gsm_a_bssmap.e212_imsi'),
+ tmsi=tmsi,
+ )
+ super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_bssmap')
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ # don't mistake the STP as MSC or BSC
+ if SCCP_COLLAPSE_STP and not msg.absorbed:
+ return None
+
+ msgtype = msg.get_trait('bssmap', 'msgtype')
+ if msgtype in ('Complete-Layer-3-Information', 'Clear-Complete'):
+
+ # associate BSC BSSMAP port with BSC RSL port
+ src_entity = None
+ for match in find_same_trait(msg, messages, my_idx, 'dtap', ('tmsi', 'imsi')):
+ if 'rsl' not in match.layers:
+ continue
+ if not match.dst.entity or match.dst.entity.kind != 'BSC':
+ continue
+ src_entity = match.dst.entity
+ if src_entity:
+ break
+
+ return Message.EntityIdent(proto='bssmap', src_kind='BSC', dst_kind='MSC', src_entity = src_entity)
+
+ if msgtype == 'Assignment-Request':
+ # This Assignment-Request's rtp_port should match an earlier MGCP CRCX-OK message, and we now
+ # know that this MSC asked for it.
+ rtp_port = msg.get_trait('bssmap', 'rtp_port')
+ if not rtp_port:
+ return None
+
+ def cond(prev_msg):
+ return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port
+ crcx_ok = None
+ for prev_msg in find_recent_msg(msg, messages, my_idx, cond):
+ crcx_ok = prev_msg
+ break
+ if not crcx_ok:
+ return None
+ msc = msg.src.entity
+ mgw = crcx_ok.src.entity
+ return Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True,
+ dst_port=crcx_ok.dst, dst_entity=msc, dst_kind='MSC')
+
+
+ return None
+
+ @classmethod
+ def identify_conns(cls, messages, my_idx):
+ msg = messages[my_idx]
+ msgtype = msg.get_trait('bssmap', 'msgtype')
+
+ if SCCP_COLLAPSE_STP and not msg.absorbed:
+ return
+
+ # Paging does not have a proper end, it may never be answered.
+ # The RSL paging command hopefully happens, and closes this Conn.
+ if msgtype == 'Paging':
+ imsi = msg.get_trait('bssmap', 'imsi')
+ if imsi is not None:
+ c = Conn.open('bssmap', msg.dst, f'page_imsi{imsi}', msg)
+ Conn.close_conn(c, msg)
+ tmsi = msg.get_trait('bssmap', 'tmsi')
+ if tmsi is not None:
+ c = Conn.open('bssmap', msg.dst, f'page_tmsi{tmsi}', msg)
+ Conn.close_conn(c, msg)
+
+ rtp_port = msg.get_trait('bssmap', 'rtp_port')
+ if rtp_port is None:
+ return
+
+ rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key())
+ if rtp_conn is None:
+ return
+ for c in msg.src_conns:
+ rtp_conn.merge_subscr_conns(c)
+
+ def cond(prev_msg):
+ return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port
+ crcx_ok = None
+ for prev_msg in find_recent_msg(msg, messages, my_idx, cond):
+ for c in (prev_msg.src_conns + prev_msg.dst_conns):
+ rtp_conn.merge_subscr_conns(c)
+
+
+class Layer_gsm_abis_rsl(Layer):
+ def __init__(s, msg:Message):
+ p = msg.p
+ msgtype = sane_msgtype(p.get('gsm_abis_rsl.msg_type.showname'))
+ msgtype_nr = p.get('gsm_abis_rsl.msg_type')
+ msgtype_nr = str_to_int(msgtype_nr)
+
+ ch = None
+ ch_imm_ass = None
+
+ # For Immediate Assignment, the assigned TS/chan is more interesting
+ ts = p.get('gsm_a_ccch.gsm_a_rr_timeslot')
+ cbits = (p.get('gsm_a_ccch.gsm_a_rr_sdcch4_sdcchc4_cbch')
+ or p.get('gsm_a_ccch.gsm_a_rr_sdcch8_sdcchc8_cbch'))
+ try:
+ ch_ts = int(ts)
+ ch_cbits = int(cbits)
+ ch_imm_ass = f'{ch_ts}-{ch_cbits}'
+ except:
+ pass
+
+ # normal RSL messages on a given TS/chan
+ ts = p.get('gsm_abis_rsl.ch_no_tn')
+ cbits = p.get('gsm_abis_rsl.ch_no_cbits')
+ if ts is not None and cbits is not None:
+ try:
+ ch_ts = int(ts)
+ ch_cbits = int(cbits)
+ ch = f'{ch_ts}-{ch_cbits}'
+ except:
+ raise
+
+ ch_assign = None
+ new_ch_ts = p.get('gsm_a_dtap.gsm_a_rr_timeslot')
+ new_ch_ss = p.get('gsm_a_dtap.gsm_a_rr_tch_facch_sacchf')
+ if new_ch_ts and new_ch_ss:
+ ch_assign = f'{new_ch_ts}-{new_ch_ss}'
+
+ rtp_local_port = None
+ ipacc_rtp_local_ip = p.get('gsm_abis_rsl.ipacc_local_ip')
+ ipacc_rtp_local_port = p.get('gsm_abis_rsl.ipacc_local_port')
+ if ipacc_rtp_local_ip and ipacc_rtp_local_port:
+ rtp_local_port = IpPort.get(ipacc_rtp_local_ip, ipacc_rtp_local_port)
+
+ rtp_remote_port = None
+ ipacc_rtp_remote_ip = p.get('gsm_abis_rsl.ipacc_remote_ip')
+ ipacc_rtp_remote_port = p.get('gsm_abis_rsl.ipacc_remote_port')
+ if ipacc_rtp_remote_ip and ipacc_rtp_remote_port:
+ rtp_remote_port = IpPort.get(ipacc_rtp_remote_ip, ipacc_rtp_remote_port)
+
+ req_ref_l = (p.get('gsm_abis_rsl.req_ref_ra'),
+ p.get('gsm_abis_rsl.req_ref_t1prim'),
+ p.get('gsm_abis_rsl.req_ref_t2'),
+ p.get('gsm_abis_rsl.req_ref_t3'))
+ if not all(req_ref_l):
+ req_ref_l = (p.get('gsm_a_ccch.gsm_a_rr_ra'),
+ p.get('gsm_a_ccch.gsm_a_rr_t1prim'),
+ p.get('gsm_a_ccch.gsm_a_rr_t2'),
+ p.get('gsm_a_ccch.gsm_a_rr_t3'))
+ req_ref = None
+ if all(req_ref_l):
+ req_ref = '-'.join(req_ref_l)
+
+ try:
+ page_tmsi = tmsi_standardize(msg.p.cap_p.gsm_abis_rsl._all_fields['3gpp.tmsi'])
+ except:
+ page_tmsi = None
+
+ traits = Traits(
+ msgtype_nr=msgtype_nr,
+ ch=ch,
+ ch_imm_ass=ch_imm_ass,
+ ch_assign=ch_assign,
+ chan_type=p.get('gsm_abis_rsl.ch_type'),
+ rtp_port=rtp_local_port,
+ rtp_remote_port=rtp_remote_port,
+ tmsi=tmsi_standardize(p.get('gsm_abis_rsl.gsm_a_tmsi')),
+ imsi=p.get('gsm_abis_rsl.gsm_a_imsi'),
+ page_tmsi=page_tmsi,
+ arfcn = p.get('gsm_a_dtap.gsm_a_rr_single_channel_arfcn') or p.get('gsm_abis_rsl.gsm_a_rr_single_channel_arfcn'),
+ req_ref = req_ref,
+ )
+
+ super().__init__(msg=msg, proto='rsl', msgtype=msgtype, traits=traits, cap_p_name='gsm_abis_rsl')
+ # ignore CCCH Load INDication
+ #if msgtype_nr == 18:
+ # msg.hide = True
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ ids = []
+ msgtype = msg.get_trait('rsl', 'msgtype')
+
+ if msgtype in ('RF-RESource-INDication', 'CCCH-LOAD-INDication', 'CHANnel-ReQuireD', ):
+ # INDication from BTS to BSC
+ ids.append(Message.EntityIdent(proto='rsl', src_kind='BTS', dst_kind='BSC'))
+ if msgtype in ('CHANnel-ACTIVation', 'IMMEDIATE-ASSIGN-COMMAND'):
+ # from BSC to BTS
+ ids.append(Message.EntityIdent(proto='rsl', dst_kind='BTS', src_kind='BSC'))
+
+ rtp_port = msg.get_trait('rsl', 'rtp_port')
+ if rtp_port and msg.src_entity_is('BTS') and msgtype in (
+ 'ip.access-CRCX-ACK', 'ip.access-MDCX-ACK'):
+ ids.append(Message.EntityIdent(proto='rtp', src_kind='BTS', src_entity=msg.src.entity, src_port=rtp_port))
+
+ if msgtype == 'ip.access-MDCX':
+ # This ip.a MDCX's rtp_remote_port should match an earlier MGCP CRCX-OK message, and we now
+ # know that this BSC asked for it.
+
+ rtp_port = msg.get_trait('rsl', 'rtp_remote_port')
+ if not rtp_port:
+ return None
+
+ def cond(prev_msg):
+ return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port
+ crcx_ok = None
+ for prev_msg in find_recent_msg(msg, messages, my_idx, cond):
+ crcx_ok = prev_msg
+ break
+ if not crcx_ok:
+ return None
+ bsc = msg.src.entity
+ mgw = crcx_ok.src.entity
+ ids.append( Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@BSC', rename=True,
+ dst_port=crcx_ok.dst, dst_entity=bsc, dst_kind='BSC') )
+
+ return ids
+
+ def collapse(s, messages, my_idx):
+ # combine duplicates like rsl.CCCH-LOAD-INDication
+ for i in reversed(range(my_idx)):
+ prev_msg = messages[i]
+ if not prev_msg:
+ continue
+ if prev_msg.finalized:
+ break
+ # stop combining at any non-rsl (and non-minor) message
+ if not 'rsl' in prev_msg.layers:
+ if all(l.minor for l in prev_msg.layers.values()):
+ continue
+ else:
+ break
+ if not same_nonempty(prev_msg.get_traits('rsl'), s.msg.get_traits('rsl')):
+ continue
+ if s.msg.same_src_dst(prev_msg, forward=True):
+ # found a recent similar packet, combine
+ prev_msg.count += 1
+ messages[my_idx] = None
+ prev_msg.absorb_msg(s.msg)
+ return prev_msg
+ return s.msg
+
+ @classmethod
+ def identify_conns_ra(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+
+ msgtype = msg.get_trait('rsl', 'msgtype')
+
+ bts = msg.entity('BTS')
+ if bts is None:
+ return None
+ bts_port = msg.get_port('BTS')
+ if bts_port is None:
+ return None
+ ra = msg.get_trait('rsl', 'req_ref')
+ if ra is None:
+ return None
+ bts_ra = f'{bts_port.entity.label()}.ra{ra}'
+
+ proto = 'rsl'
+ conn_id = bts_ra
+
+ conn = None
+
+ if msgtype == 'CHANnel-ReQuireD':
+ if not msg.src_entity_is('BTS'):
+ return
+ bts = msg.src.entity
+ bts_port = msg.src
+ bsc = msg.dst.entity
+ bsc_port = msg.dst
+ conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts)
+ Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn])
+
+ elif msgtype in ('CHANnel-ACTIVation'):
+ conn = Conn.message(proto, msg.dst, conn_id, msg)
+
+ elif msgtype == 'IMMEDIATE-ASSIGN-COMMAND':
+ # the RA token has fulfilled its use as soon as an IMM ASS happened
+ Conn.close(proto, msg.dst, conn_id, msg)
+
+ else:
+ Conn.message(proto, msg.dst, conn_id, msg)
+
+ if conn:
+ for c in (msg.src_conns + msg.dst_conns):
+ conn.merge_subscr_conns(c)
+
+ @classmethod
+ def identify_conns_ch(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ proto = 'rsl'
+ msgtype = msg.get_trait('rsl', 'msgtype')
+
+ # For Immediate Assignment, the assigned TS/chan is the one to match on
+ if msgtype == 'IMMEDIATE-ASSIGN-COMMAND':
+ ch = msg.get_trait('rsl', 'ch_imm_ass')
+ else:
+ ch = msg.get_trait('rsl', 'ch')
+
+ if ch is None:
+ return None
+
+ if msgtype in ('CHANnel-ACTIVation') or msg.src_entity_is('BSC'):
+ bsc = msg.src.entity
+ bsc_port = msg.src
+ bts = msg.dst.entity
+ bts_port = msg.dst
+ elif msg.src_entity_is('BTS'):
+ bts = msg.src.entity
+ bts_port = msg.src
+ bsc = msg.dst.entity
+ bsc_port = msg.dst
+ else:
+ return None
+
+ if bts_port.entity is None:
+ return None
+
+ bts_ch = f'{bts_port.entity.label()}.ch{ch}'
+ conn_id = bts_ch
+
+ if msgtype == 'CHANnel-ACTIVation':
+ conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts)
+ Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn])
+ elif msgtype == 'RF-CHANnel-RELease-ACKnowledge':
+ conn = Conn.close(proto, msg.src, conn_id, msg)
+ else:
+ conn = Conn.message(proto, bts_port, conn_id, msg)
+ if conn:
+ for c in (msg.src_conns + msg.dst_conns):
+ conn.merge_subscr_conns(c)
+
+ # when changing to a new ch, e.g. a regular Assignment Command to change from SDCCH to TCH,
+ # associate the new channel with the conn
+ ch_assign = msg.get_trait('rsl', 'ch_assign')
+ if ch_assign:
+ conn_id = f'{bts_port.entity.label()}.ch{ch_assign}'
+ conn = Conn.find(proto, bts_port, conn_id)
+ if conn:
+ conn.add_message(msg)
+ for c in (msg.src_conns + msg.dst_conns):
+ conn.merge_subscr_conns(c)
+
+
+ @classmethod
+ def identify_conns_rtp(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ # BTS RTP
+ rtp_port = msg.get_trait('rsl', 'rtp_port')
+ if rtp_port and msg.src_entity_is('BTS') and msg.get_trait('rsl','msgtype') == 'ip.access-CRCX-ACK':
+ rtp_port.entity = msg.src.entity
+ rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, add_message=False)
+ for c in (msg.src_conns + msg.dst_conns):
+ rtp_conn.merge_subscr_conns(c)
+
+ # MGW@BSC RTP towards BTS
+ rtp_port = msg.get_trait('rsl', 'rtp_remote_port')
+ if rtp_port:
+ rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key())
+ if rtp_conn:
+ rtp_conn.add_message(msg)
+ for c in (msg.src_conns + msg.dst_conns):
+ rtp_conn.merge_subscr_conns(c)
+
+ @classmethod
+ def identify_conns_paging(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ msgtype = msg.get_trait('rsl', 'msgtype')
+
+ if msgtype != 'PAGING-CoMmanD':
+ return
+
+ tmsi = msg.get_trait('rsl', 'page_tmsi')
+ if tmsi is None:
+ return
+ conn_id = f'page_tmsi{tmsi}'
+
+ subscr = Subscriber.by_tmsi(tmsi)
+ if subscr is None:
+ return
+
+ rsl_conn = Conn.open('rsl', msg.src, conn_id, msg)
+ subscr_conn = SubscriberConn()
+ subscr.add_subscriber_conn(subscr_conn)
+ subscr_conn.add_conn(rsl_conn)
+
+ # Paging does not have a proper end, it may never be answered.
+ # A Conn wants to be closed at some point. Just close it directly.
+ Conn.close_conn(rsl_conn, msg)
+
+ # Expecting a recent Paging on BSSMAP
+ bssmap_paging = None
+ def cond(prev_msg):
+ return (prev_msg.get_trait('bssmap', 'msgtype') == 'Paging'
+ and prev_msg.get_trait('bssmap', 'tmsi') == tmsi)
+ for match in find_recent_msg(msg, messages, my_idx, cond):
+ bssmap_paging = match
+ break
+
+ if bssmap_paging is None:
+ return
+
+ bssmap_conn = Conn.find('bssmap', bssmap_paging.dst, conn_id, find_in_closed_conns=True)
+ if bssmap_conn:
+ rsl_conn.merge_subscr_conns(bssmap_conn)
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ cls.identify_conns_ra(messages, my_idx)
+ cls.identify_conns_ch(messages, my_idx)
+ cls.identify_conns_rtp(messages, my_idx)
+ cls.identify_conns_paging(messages, my_idx)
+
+class Layer_gsm_a_dtap(Layer):
+ def __init__(s, msg:Message):
+ dtap = msg.p.get('gsm_a_dtap')
+ assert dtap is not None
+
+ msgtype = None
+ msgtype_nr = None
+ for f in dtap._get_all_fields_with_alternates():
+ if f.name.startswith('gsm_a.dtap.msg_') and f.name.endswith('_type'):
+ msgtype = f.showname_value
+ try:
+ msgtype_nr = int(f.raw_value)
+ except:
+ pass
+ traits = Traits(
+ msgtype_nr=msgtype_nr,
+ imsi=msg.p.get('gsm_a_dtap.e212_imsi') or msg.p.get('gsm_a_dtap.gsm_a_imsi'),
+ tmsi=tmsi_standardize(msg.p.get('gsm_a_dtap.gsm_a_tmsi') or msg.p.get('gsm_a_dtap.3gpp_tmsi')),
+ imei=msg.p.get('gsm_a_dtap.gsm_a_imei'),
+ to_msisdn=msg.p.get('gsm_a_dtap.cld_party_bcd_num'),
+ )
+ super().__init__(msg=msg, proto='dtap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_dtap')
+
+
+class Layer_gsup(Layer):
+ def __init__(s, msg:Message):
+ msgtype = sane_msgtype(msg.p.get('gsup.msg_type.showname'))
+ msisdn = None
+ if '-forwardSM-' not in msgtype:
+ msisdn = msg.p.get('gsup.e164_msisdn')
+ to_msisdn = msg.p.get('gsup.gsm_sms_tp_da')
+
+ msgtype_nr = None
+ is_request = None
+ try:
+ msgtype_nr = int(msg.p.get('gsup.msg_type'))
+ is_request = not (msgtype_nr & 0x3)
+ except:
+ pass
+
+ session_state = None
+ try:
+ session_state = int(msg.p.get('gsup.session_state'))
+ except:
+ pass
+
+ traits = Traits(
+ imsi=msg.p.get('gsup.e212_imsi'),
+ msgtype_nr=msgtype_nr,
+ is_request=is_request,
+ cn_domain=sane_showname(msg.p.get('gsup.cn_domain.showname')),
+ msisdn=msisdn,
+ to_msisdn=to_msisdn,
+ source_name=msg.p.get('gsup.source_name_text'),
+ destination_name=msg.p.get('gsup.destination_name_text'),
+ session_id=msg.p.get('gsup.session_id'),
+ session_state=session_state,
+ )
+ super().__init__(msg=msg, proto='gsup', msgtype=msgtype, traits=traits)
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+ imsi = msg.get_trait('gsup', 'imsi')
+ is_request = msg.get_trait('gsup', 'is_request')
+ session_id = msg.get_trait('gsup', 'session_id')
+ session_state = msg.get_trait('gsup', 'session_state')
+ if not imsi:
+ return
+ proto = 'gsup'
+ if session_id:
+ conn_id = f'{imsi}:{session_id}'
+ have = Conn.find(proto, msg.src, conn_id) or Conn.find(proto, msg.dst, conn_id)
+ if have is None:
+ have = Conn.open(proto, msg.src, conn_id, msg,
+ counterparts=[Conn.open(proto, msg.dst, conn_id, msg)])
+ if session_state is not None and session_state == 0x3:
+ Conn.close(proto, msg.src, conn_id, msg)
+ else:
+ have.add_message(msg)
+
+ else:
+ conn_id = f'{imsi}'
+
+ if is_request:
+ have = Conn.find(proto, msg.src, conn_id)
+ if have is None:
+ Conn.open(proto, msg.src, conn_id, msg)
+ else:
+ have.add_message(msg)
+ else:
+ have = Conn.find(proto, msg.dst, conn_id)
+ if have is None:
+ # it's a stray response? anyway create a conn for association with a subscriber
+ Conn.open(proto, msg.dst, conn_id, msg)
+ Conn.close(proto, msg.dst, conn_id, msg)
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ if msg.get_trait('gsup', 'msgtype') in ('SendAuthInfo-Request', 'UpdateLocation-Request', 'PurgeMS-Request'):
+ cn = msg.get_trait('gsup', 'cn_domain')
+ src_kind = None
+ src_entity = None
+ src_subscr_conn = None
+ if msg.get_trait('gsup', 'source_name'):
+ # proxy forwarding
+ src_kind = 'HLR'
+ traits = [name for proto, name, result in msg.get_traits('gsup') if name not in ('source_name', 'destination_name')]
+ for match in find_same_trait(msg, messages, my_idx, 'gsup', traits):
+ if not match.dst_entity_is('HLR'):
+ continue
+ src_entity = match.dst.entity
+ break
+ else:
+ if cn == 'CS':
+ src_kind = 'MSC'
+ elif cn == 'PS':
+ src_kind = 'SGSN'
+
+ if src_kind:
+ # associate MSC GSUP port with MSC BSSMAP port
+ imsi = msg.get_trait('gsup', 'imsi')
+ if imsi:
+ subscr = Subscriber.by_imsi(imsi)
+ def cond(prev_msg):
+ return (prev_msg.dst_entity_is(src_kind)
+ and prev_msg.is_subscriber_related(subscr))
+ for match in find_recent_msg(msg, messages, my_idx, cond):
+ src_entity = match.dst.entity
+ break
+
+ # i forgot whatever this does:
+ if not src_entity:
+ src_entity, src_subscr_conn = s.msg.find_entity(src_kind,
+ with_port=('bssgp', 'IuPS'))
+ else:
+ # no cn_domain in the GSUP message, try to guess
+ msc, msc_subscr_conn = s.msg.find_entity('MSC')
+ sgsn, sgsn_subscr_conn = s.msg.find_entity('SGSN')
+ if msc and not sgsn:
+ src_entity = msc
+ src_subscr_conn = msc_subscr_conn
+ if sgsn and not msc:
+ src_entity = sgsn
+ src_subscr_conn = sgsn_subscr_conn
+
+ if src_subscr_conn is not None and msg.src_conns:
+ for c in msg.src_conns:
+ c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, src_subscr_conn)
+
+ return Message.EntityIdent(proto='gsup', src_kind=src_kind, dst_kind='HLR', src_entity=src_entity, dst_entity=msg.dst.entity)
+
+class Layer_sip(Layer):
+ def __init__(s, msg:Message):
+ method = msg.p.get('sip.method')
+ cseq_method = msg.p.get('sip.cseq_method')
+ status_code = msg.p.get('sip.status_code')
+ status_line = msg.p.get('sip.status_line')
+ if status_line:
+ if '--' in status_line:
+ status_line = status_line[:status_line.index('--')]
+ status_line = status_line.strip()
+ status = status_line.split()[-1]
+ else:
+ status = status_code
+ if status:
+ msgtype = f'{cseq_method}-{status}'
+ elif method:
+ msgtype = method
+ else:
+ msgtype = cseq_method
+
+ sip_from_host = msg.p.get('sip.from_host')
+ sip_from_port = msg.p.get('sip.from_port')
+ sip_from = IpPort.get(sip_from_host, sip_from_port)
+
+ rtp_port = None
+ ip = msg.p.get('sip.sdp_connection_info_address')
+ port = msg.p.get('sip.sdp_media_port')
+ if ip and port:
+ rtp_port = IpPort.get(ip, port)
+
+ server = msg.p.get('sip.Server')
+ agent = msg.p.get('sip.User-Agent')
+
+ traits = Traits(
+ sip_agent=server or agent,
+ sip_from=sip_from,
+ call_id = msg.p.get('sip.call_id'),
+ method = cseq_method,
+ seq = msg.p.get('sip.cseq_seq'),
+ to_msisdn = msg.p.get('sip.to_user'),
+ from_msisdn = msg.p.get('sip.from_user'),
+ from_tag = msg.p.get('sip.from_tag'),
+ r_uri = msg.p.get('sip.r_uri'),
+ status_code = status_code,
+ rtp_port=rtp_port,
+ )
+ super().__init__(msg=msg, proto='sip', msgtype=msgtype, traits=traits)
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ src_kind = 'SIP'
+ sip_from = msg.get_trait('sip', 'sip_from')
+ agent = msg.get_trait('sip', 'sip_agent')
+
+ rename = False
+ if agent and sip_from and sip_from == msg.src:
+ rename = 'src'
+ if agent.startswith('kamailio'):
+ src_kind = 'PBX'
+ elif agent.startswith('sofia'):
+ src_kind = 'SIPCON'
+ else:
+ rename = False
+
+ return Message.EntityIdent(proto='sip', src_kind=src_kind, dst_kind='SIP', rename=rename)
+
+ @classmethod
+ def identify_conns(cls, messages:list, my_idx:int):
+ msg = messages[my_idx]
+
+ msgtype = msg.get_trait('sip', 'msgtype')
+ call_id = msg.get_trait('sip', 'call_id')
+ sip_from = msg.get_trait('sip', 'sip_from')
+
+ if msgtype in ('INVITE','INVITE-OK') and sip_from and sip_from == msg.src:
+ conn = Conn.open('sip', msg.src, call_id, start_msg=msg)
+ rtp_port = msg.get_trait('sip', 'rtp_port')
+ if rtp_port is None:
+ return
+
+ rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key())
+ if rtp_conn is None:
+ return
+ if not conn.subscriber_conn:
+ conn.merge_subscr_conns(rtp_conn)
+ else:
+ conn = Conn.message('sip', msg.src, call_id, msg)
+
+ if conn is None:
+ return
+
+
+class Layer_gsmtap_log(Layer):
+ def __init__(s, msg:Message):
+ app = msg.p.get('gsmtap_log.ident')
+ level = msg.p.get('gsmtap_log.level')
+ level_str = sane_showname(msg.p.get('gsmtap_log.level.showname'))
+ logmsg = msg.p.get('gsmtap_log.string')
+ cat = msg.p.get('gsmtap_log.subsys')
+
+ if level_str != 'ERROR':
+ return
+
+ return
+
+ msgtype = f'{app}.{level_str}'
+ msg.log(Color.colored('red', logmsg))
+
+ traits = Traits(
+ msgtype=msgtype,
+ level=level,
+ app=app,
+ cat=cat,
+ logmsg=logmsg,
+ )
+ super().__init__(msg=msg, proto='log', msgtype=msgtype, traits=traits)
+
+ def identify_entities(s, msg:Message, messages, my_idx):
+ app = msg.get_trait('log', 'app')
+ if app.startswith('Osmo'):
+ app = app[4:]
+ return Message.EntityIdent(proto='log', src_kind=app, dst_kind='LOG')
+
+
+class MessageFilter:
+ def __init__(s, layer=None, idx=None, values=[], negate=False):
+ set_instance_vars_from_args()
+
+ def matches(s, msg:Message):
+ r = s._matches(msg)
+ if s.negate:
+ return not r
+ return r
+
+ def _matches(s, msg:Message):
+ if s.layer and s.layer not in msg.layers:
+ return False
+ if s.idx and not (msg.p.idx == s.idx or any(a.p.idx == s.idx for a in msg.absorbed)):
+ return False
+ if s.values:
+ layers = s.layer or msg.layers.keys()
+ for k,v in s.values:
+ for proto, name, result in msg.get_traits(layers, k):
+ if v is None and name == k:
+ return True
+ if result is None or result == v:
+ return True
+ p_layers = None
+ if s.layer:
+ msg_layer = msg.layers.get(s.layer)
+ if msg_layer:
+ p_layers = [msg_layer.cap_p_name]
+ else:
+ p_layers = [s.layer]
+ else:
+ p_layers = [layer.cap_p_name for layer in msg.layers.values()]
+ for k,v in s.values:
+ for p_layer in p_layers:
+ p_val = msg.p.get(p_layer + '.' + k)
+ if p_val is None:
+ continue
+ if v is None:
+ return True
+ if v == str(p_val):
+ return True
+ return False
+ return True
+
+ @classmethod
+ def debug(cls, flt_list, msg:Message):
+ layers = set()
+ for flt in flt_list:
+ if flt.negate:
+ continue
+ if flt.layer is None:
+ layers.update(msg.layers.keys())
+ else:
+ layers.add(flt.layer)
+ for layer_name, layer in msg.layers.items():
+ if layer_name not in layers:
+ continue
+ LOG(dir_p(msg.p, layer.cap_p_name))
+
+ def __repr__(s):
+ t = []
+ if s.negate:
+ t.append('NOT')
+ if s.layer:
+ t.append(f'layer {s.layer!r}')
+ if s.idx:
+ t.append(f'packet nr {s.idx}')
+ if s.values:
+ t_v = [(f'{k} == {v!r}' if v is not None else f'has {k}') for k,v in s.values]
+ t.append('values: ' + (' or '.join(t_v)))
+ return ', '.join(t)
+
+ word_re = re.compile('^[a-zA-Z0-9_=-]*')
+ @classmethod
+ def parse(cls, spec_str):
+ if spec_str is None:
+ return []
+ if not spec_str:
+ return [MessageFilter()]
+ filters = []
+ token = None
+ try:
+ for token in spec_str.split(','):
+ negate = False
+ if token.startswith('!'):
+ token = token[1:]
+ negate = True
+ layer = cls.word_re.match(token).group()
+ flt = MessageFilter(layer=layer, negate=negate)
+ rest = cls.word_re.split(token)[1]
+ while rest:
+ char = rest[0]
+ rest = rest[1:]
+ word = cls.word_re.match(rest).group()
+ rest = cls.word_re.split(rest)[1]
+ if char == '#':
+ flt.idx = int(word)
+ elif char == '.':
+ if not flt.values:
+ flt.values = []
+ if '=' in word:
+ name = word[:word.index('=')]
+ val = word[word.index('=')+1:]
+ flt.values.append((name, val))
+ else:
+ flt.values.append((word, None))
+ else:
+ raise Exception('Unknown token: %r' % (char + word))
+ filters.append(flt)
+ return filters
+ except:
+ out_error('Some mistake in message filter: %r in token %r' % (spec_str, token))
+ raise
+
+ @classmethod
+ def match(cls, flt_list, msg):
+ any_match = False
+ for flt in flt_list:
+ r = flt.matches(msg)
+ if flt.negate and not r:
+ return False
+ any_match = any_match or r
+ return any_match
+
+
+ DOC = '''messagefilter examples, for --filter-msg and --debug:
+ dtap
+ All messages that contain a DTAP layer.
+ dtap.msgtype=Identity-Request
+ All DTAP msgtype=Identity-Request messages; for debug, show
+ dtap layer.
+ .msgtype=Identity-Request
+ All msgtype=Identity-Request messages; for debug, show all
+ layers. (Value names can be either parsed traits or raw packet
+ names.)
+ sccp.src_lref=0x00010000.dst_lref=0x00010000
+ All messages with an SCCP layer and either src_lref or dst_lref
+ == 0x00010000.
+ .imsi
+ All messages where any layer contains a value named 'imsi'.
+ '#123'
+ Message number 123 (don't forget to quote for the shell).
+ 'gsup,#614,sccp.src_lref=0x00010000'
+ Show all GSUP messages, packet number 614 and all SCCP with
+ given source local reference.
+ '!rsl.msgtype=CCCH-LOAD-INDication'
+ Don't show CCCH-LOAD-INDication message types (quote for the
+ shell).
+ '''
+
+class UI:
+ def __init__(s, opts, finalize_after_seconds=5):
+ set_instance_vars_from_args()
+
+ s.messages = []
+ s.finalized_idx = -1
+
+ s.show_traits = None
+ if s.opts.show_traits:
+ if s.opts.show_traits == 'all':
+ s.show_traits = True
+ else:
+ s.show_traits = s.opts.show_traits.split(',')
+ s.show_conns = None
+ if s.opts.show_conns:
+ if s.opts.show_conns == 'all':
+ s.show_conns = True
+ else:
+ s.show_conns = s.opts.show_conns.split(',')
+ s.filter_msg = MessageFilter.parse(s.opts.filter_msg)
+ for flt in s.filter_msg:
+ out_text_now('Filter-msg:', flt)
+ s.debug = MessageFilter.parse(s.opts.debug)
+ for dbg in s.debug:
+ out_text_now('Debug:', dbg)
+
+ s.filter_subscr = []
+ if s.opts.filter_subscr:
+ tokens = s.opts.filter_subscr.split(',')
+ names = ('imsi', '0x', 'imei', 'msisdn', 'tmsi')
+ for token in tokens:
+ handled = False
+ for name in names:
+ if token.startswith(name):
+ token_val = token[len(name):]
+ if not token_val.isdigit():
+ continue
+ s.filter_subscr.append(name + token[len(name):])
+ handled = True
+ break
+ if not handled:
+ s.filter_subscr.append(token)
+ s.filter_subscr.extend([name + token for name in names])
+
+ def out_text_now(s, *args, **kwargs):
+ 'to be implemented by child class'
+ assert False
+
+ def out_error(s, *args, **kwargs):
+ 'to be implemented by child class'
+ assert False
+
+ def msg_finalized(s, msg, apply_filter=True):
+ 'to be implemented by child class'
+ assert False
+
+ def flush_msg(s, msg):
+ msg.finalized = True
+ s.msg_finalized(msg)
+
+ def msg_filter(s, msg):
+ '''Return True when the message passes active message filtering, False if it should be hidden'''
+ if s.filter_msg and not MessageFilter.match(s.filter_msg, msg):
+ return False
+ if all(l.minor for l in msg.layers.values()):
+ return False
+ if msg.hide:
+ return False
+ if s.filter_subscr:
+ match = False
+ match_vals = set()
+ for subscr in msg.related_subscribers():
+ match_vals.update((f'imsi{subscr.imsi}', f'imei{subscr.imei}', f'msisdn{subscr.msisdn}'))
+ match_vals.update(subscr.tmsis)
+ match_vals.update(f'tmsi{tmsi[2:]}' for tmsi in subscr.tmsis)
+ match_vals.update(f'tmsi{tmsi}' for tmsi in subscr.tmsis)
+
+ if not any(token in match_vals for token in s.filter_subscr):
+ return False
+ return True
+
+ def flush(s, timestamp_now=0, finalize_after_seconds=0):
+ flush_t = timestamp_now - finalize_after_seconds
+ for i in range(s.finalized_idx+1, len(s.messages)):
+ msg = s.messages[i]
+ if not msg:
+ continue
+ if timestamp_now and msg.timestamp > flush_t:
+ break
+ s.finalized_idx = i
+ s.flush_msg(msg)
+
+ def start(s):
+ pass
+
+ def stop(s):
+ pass
+
+ def add_msg(s, msg):
+ global g_current_msg
+ s.flush(msg.timestamp, s.finalize_after_seconds)
+ try:
+ if s.debug and any(dbg_filter.matches(msg) for dbg_filter in s.debug):
+ msg.debug = True
+ g_current_msg = msg
+ if not msg.layers:
+ return
+ s.messages.append(msg)
+ idx = len(s.messages) - 1
+ changed_msg = msg.collapse(s.messages, idx)
+ # if the received message was absorbed by another, continue to identify the modified message using the
+ # new index
+ if changed_msg is not None and changed_msg is not msg:
+ msg = changed_msg
+ idx = s.messages.index(msg)
+ msg.identify_entities(s.messages, idx)
+ msg.identify_conns(s.messages, idx)
+ Subscriber.identify_subscriber(msg)
+
+ except:
+ s.flush()
+
+ out_error('Exception')
+ raise
+
+ def process_messages(s, msg_src):
+ for msg in msg_src.next():
+ if 0 and msg.p.idx in (3676,):
+ msg.log('rsl all_fields ', msg.p.cap_p.gsm_abis_rsl._all_fields)
+ msg.log(msg.p.all_str('gsm_abis_rsl'))
+
+ s.add_msg(msg)
+ s.flush()
+ msg_src.done()
+
+class UI_Plain(UI):
+ def out_text_now(s, *args, **kwargs):
+ print(to_text(*args, **kwargs))
+
+ def out_error(s, *args, **kwargs):
+ s.out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs)
+ if g_current_msg:
+ s.out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True))
+ s.out_text_now(trace())
+
+ def msg_print(s, msg, apply_filter=True):
+ if apply_filter and not s.msg_filter(msg):
+ return
+ s.out_text_now(msg.str(ladder=True, one_column_per_kind=True, show_traits=s.show_traits, show_conns=s.show_conns))
+ if s.debug and MessageFilter.match(s.debug, msg):
+ MessageFilter.debug(s.debug, msg)
+
+ def msg_finalized(s, msg):
+ s.msg_print(msg, apply_filter=True)
+
+class UI_Quiet(UI_Plain):
+ def msg_finalized(s, msg):
+ pass
+
+class UI_Curses(UI):
+ pass
+
+class MsgSource:
+ def __init__(s, opts):
+ set_instance_vars_from_args()
+ s.start_t = None
+ s.p_min_t = None
+ s.p_max_t = None
+ s.end_t = None
+
+ def _next_cap_p(s):
+ assert False
+
+ def next(s) -> Message:
+ p_idx = 0
+ s.start_t = time.time()
+ warn_t = s.start_t
+ warn_p_t = None
+ for cap_p in s._next_cap_p():
+ p_idx += 1
+ if p_idx < s.opts.packet_start:
+ continue
+ if s.opts.packet_count and (p_idx - s.opts.packet_start) > s.opts.packet_count:
+ break
+ if s.opts.packet_end and p_idx > s.opts.packet_end:
+ break
+ msg = Message.parse(Packet(p_idx, cap_p))
+ if msg is None or not msg.layers:
+ continue
+ s.p_min_t = msg.timestamp if s.p_min_t is None else min(s.p_min_t, msg.timestamp)
+ s.p_max_t = msg.timestamp if s.p_max_t is None else max(s.p_max_t, msg.timestamp)
+
+ now = time.time()
+ if warn_p_t is None or now > warn_t + 3:
+ if warn_p_t:
+ packet_time = s.p_max_t - warn_p_t
+ real_time = now - warn_t
+ if real_time > (1.3 * packet_time):
+ out_text_now(f'! taking longer to calculate than packets arrive by {100.*(real_time - packet_time)/packet_time:.1f}%')
+ warn_t = now
+ warn_p_t = s.p_max_t
+
+ yield msg
+
+ def done(s):
+ if s.start_t is None or s.p_min_t is None:
+ out_text_now('Nothing processed.')
+ s.end_t = time.time()
+ out_text_now(f'packet time: {s.p_max_t - s.p_min_t:.1f}s in real time: {s.end_t - s.start_t:.1f}s')
+
+class MsgSource_File(MsgSource):
+ def __init__(s, path, opts):
+ set_instance_vars_from_args()
+ super().__init__(opts)
+
+ def _next_cap_p(s):
+ for cap_p in pyshark.FileCapture(s.path):
+ yield cap_p
+
+class MsgSource_Live(MsgSource):
+ def __init__(s, iface, opts):
+ set_instance_vars_from_args()
+ super().__init__(opts)
+
+ def _next_cap_p(s):
+ for cap_p in pyshark.LiveCapture(s.iface).sniff_continuously():
+ yield cap_p
+
+class MsgSource_Pipe(MsgSource):
+ def __init__(s, opts):
+ super().__init__(opts)
+
+ def _next_cap_p(s):
+ from pyshark.capture.pipe_capture import PipeCapture
+ for cap_p in PipeCapture(sys.stdin):
+ yield cap_p
+
+def run_tests():
+ def out_test(*args, **kwargs):
+ print(*args, **kwargs)
+
+ d = dddict()
+ d.sset(('a', 'b', 'c'), 'abc')
+ d.sset(('a', 'b', 'd'), 'abd')
+ out_test(d)
+ assert d == {'a': {'b': {'c': 'abc', 'd': 'abd'}}}
+ def verify_gget(keys, expect):
+ val = d.gget(keys)
+ out_test('gget:', keys,'=',val)
+ assert val == expect
+ verify_gget(('a', 'b', 'c'), 'abc')
+ verify_gget(('a', 'b', 'd'), 'abd')
+ verify_gget(('a', 'b', 'x'), None)
+ verify_gget(('a', 'b'), {'c': 'abc', 'd': 'abd'})
+ verify_gget(('a',), {'b': {'c': 'abc', 'd': 'abd'}})
+ verify_gget(('x',), None)
+ def verify_ppop(keys, expect):
+ try:
+ val = d.ppop(keys)
+ assert expect is not None
+ except KeyError:
+ assert expect is None
+ out_test('ppop:', keys,'=',None)
+ return
+ out_test('ppop:', keys,'=',val)
+ assert val == expect
+ assert d.gget(keys) is None
+ verify_ppop(('a', 'b', 'c'), 'abc')
+ verify_ppop(('a', 'b', 'd'), 'abd')
+ verify_ppop(('a', 'b', 'x'), None)
+ d.sset(('a', 'b', 'c'), 'abc')
+ d.sset(('a', 'b', 'd'), 'abd')
+ verify_ppop(('a', 'b'), {'c': 'abc', 'd': 'abd'})
+ d.sset(('a', 'b', 'c'), 'abc')
+ d.sset(('a', 'b', 'd'), 'abd')
+ verify_ppop(('a',), {'b': {'c': 'abc', 'd': 'abd'}})
+ verify_ppop(('x',), None)
+
+SUBSCRIBERFILTER_DOC = '''subscriberfilter examples, for --filter-subscr:
+ 123
+ Show subscriber where any value matches 123 (probably only MSISDN will
+ match, because '123' is too short for IMSI etc).
+ imsi123456789012345
+ imei123456789012345
+ msisdn123456789012345
+ tmsi1234abcd
+ Show subscriber with the given IMSI/IMEI/MSISDN/TMSI.
+ imsi123456,imsi987654
+ Show both these IMSIs.
+ imsi123456,msisdn123,1234abcd,imei987654
+ Show all of these subscribers: IMSI 123456, MSISDN 123, TMSI 0x1234abcd
+ and IMEI 987654.
+'''
+def parse_args():
+ import argparse
+ parser = argparse.ArgumentParser(description=__doc__
+ + '\n' + SUBSCRIBERFILTER_DOC
+ +'\n' + MessageFilter.DOC,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument('--pcap-file', '-f', metavar='file')
+ parser.add_argument('--live-capture', '-l', metavar='interface')
+ parser.add_argument('--stdin-capture', '-i', action='store_true')
+ parser.add_argument('--packet-start', '-S', default=0, type=int)
+ parser.add_argument('--packet-count', '-C', default=0, type=int)
+ parser.add_argument('--packet-end', '-E', default=0, type=int)
+ parser.add_argument('--ui', '-u', metavar='USER-INTERFACE',
+ help='How to display messages: plain / p = print to stdout as plain-text;'
+ ' curses / c = interactive curses interface; none / n = quiet',
+ default='plain')
+ parser.add_argument('--filter-subscr', default=None,
+ help='Show only messages related to the given subscriberfilter')
+ parser.add_argument('--filter-msg', default=None, metavar='messagefilter',
+ help='Show only messages matching this messagefilter')
+ parser.add_argument('--show-traits', default=None)
+ parser.add_argument('--show-conns', default=None, help="'all' for all, or specific conn names (comma separated)")
+ parser.add_argument('--collapse-stp', '-s', default=None, help="BSSAP via STP may appear duplicated. '-s1' collapses the duplicates, -s0 does not.")
+ parser.add_argument('--test', action='store_true')
+ parser.add_argument('--debug', metavar='messagefilter',
+ help='Show a lot more info on messages matching this messagefilter')
+ return parser.parse_args()
+
+def main():
+ opts = parse_args()
+
+ if opts.test:
+ run_tests()
+ else:
+ if (opts.collapse_stp or '').upper() in ['1', 'Y', 'YES', 'TRUE']:
+ SCCP_COLLAPSE_STP = True
+ if (opts.collapse_stp or '').upper() in ['0', 'N', 'NO', 'FALSE']:
+ SCCP_COLLAPSE_STP = False
+ ui_class = None
+ ui_type = opts.ui or 'none'
+ if 'plain'.startswith(ui_type):
+ ui_class = UI_Plain
+ elif 'curses'.startswith(ui_type):
+ ui_class = UI_Curses
+ elif 'none'.startswith(ui_type):
+ ui_class = UI_Quiet
+ else:
+ ERR('Unknown UI type:', repr(ui_type))
+ return 1
+ g_ui = ui_class(opts)
+ msg_source = None
+ if opts.pcap_file:
+ msg_source = MsgSource_File(opts.pcap_file, opts)
+ elif opts.live_capture:
+ msg_source = MsgSource_Live(opts.live_capture, opts)
+ elif opts.stdin_capture:
+ msg_source = MsgSource_Pipe(opts)
+ else:
+ ERR('No message source, try `-l any` or `-f my.pcap`')
+ return 1
+ g_ui.process_messages(msg_source)
+ g_ui.flush()
+ if Conn.open_conns:
+ print('still open conns:', repr(Conn.open_conns))
+ return 0
+
+if __name__ == '__main__':
+ if False:
+ import cProfile
+ cProfile.run('main()', sort='tottime')
+ else:
+ exit(main())
+# vim: noexpandtab tabstop=8 shiftwidth=8