#!/usr/bin/env python3 # osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. # Copyright (C) 2019 by Neels Hofmeyr # # All Rights Reserved # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. '''osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. Copyright (C) 2019 by Neels Hofmeyr SPDX-License-Identifier: GPL-2.0+ This tool uses tshark (pyshark) to analyze a pcap file or a live network capture to: - Associate almost all messages with a subscriber. It is possible to filter by subscriber. - Separate the different network elements (BSC, MSC, hNodeB, ...). - Output a ladder diagram. - Combine repetitive messages. - Combine/abstract messages into short activity summary. Examples: osmo-gsm-shark -f trace.pcapng osmo-gsm-shark -l any osmo-gsm-shark -l any --filter-subscr 901701234567123 osmo-gsm-shark -f trace.pcapng --filter-msg dtap ''' import collections import pyshark import re import sys import types import time import traceback SHOW_ALL_DEBUG = False SHOW_ALL_LAYERS = False SCCP_COLLAPSE_STP = True IUH_COLLAPSE_HNBGW = True # doesnt work DTAP_COMPL_L3 = ('Location-Updating-Request', 'CM-Service-Request', 'Paging-Response', 'IMSI-Detach-Indication') GMM_COMPL_L3 = ('Attach-Request', 'Detach-Request') class Color: codes = ( ('red', '\033[1;31m'), ('green', '\033[1;32m'), ('yellow', '\033[1;33m'), ('blue', '\033[1;34m'), ('purple', '\033[1;35m'), ('cyan', '\033[1;36m'), ('darkred', '\033[31m'), ('darkgreen', '\033[32m'), ('darkyellow', '\033[33m'), ('darkblue', '\033[34m'), ('darkpurple', '\033[35m'), ('darkcyan', '\033[36m'), ('darkgrey', '\033[1;30m'), ('brightwhite', '\033[1;37m'), ) codes_dict = dict(codes) end = '\033[0;m' def colored(code, text): if type(code) is int: code = Color.codes[code % len(Color.codes)][1] else: code = Color.codes_dict[code] return f'{code}{text}{Color.end}' def set_instance_vars_from_args(*ignore, self='s'): f = sys._getframe(1).f_locals s = f.get(self) for k,v in f.items(): if v is s: continue if k in ignore: continue setattr(s, k, v) def same_nonempty(a, b): if isinstance(a, types.GeneratorType): return list(a) == list(b) return a and a == b def str_drop(a_str, drop_str): if a_str and a_str.startswith(drop_str): return a_str[len(drop_str):] return a_str def sane_msgtype(msgtype): if not msgtype: return msgtype return sane_showname(msgtype).replace(' ','-') re_msgtype_label = re.compile('message.type *', re.I) def sane_showname(showname): if not showname: return showname if ': ' in showname: showname = showname[showname.index(': ')+1:] if '(' in showname: showname = showname[:showname.index('(')] showname = re_msgtype_label.sub('', showname) return showname.strip() def dir_vals(elem): strs = [] for name in dir(elem): if name.startswith('_'): continue try: strs.append('%r=%r' % (name, getattr(elem, name))) except: strs.append('%r=' % (name)) return '\n' + '\n'.join(strs) def dir_p(p, name): return f'=== {name}\n{dir_vals(p.get(name))}\n---{name}' def str_to_int(nr_str): 'convert hex or decimal string to int' if not nr_str: return None elif nr_str.startswith('0x'): return int(nr_str, 16) else: return int(nr_str, 10) def out_text(*args, **kwargs): print(*args, **kwargs) g_ui = None g_current_msg = None g_debug_full = False def to_text(*args, **kwargs): if kwargs: args = list(args) + [repr(kwargs)] return ' '.join(str(arg) for arg in args) def out_text_now(*args, **kwargs): if g_ui is not None: g_ui.out_text_now(*args, **kwargs) else: print(to_text(*args, **kwargs)) def LOG(*args, **kwargs): if g_current_msg is not None: g_current_msg.log(*args, **kwargs) else: out_text_now(*args, **kwargs) def DBG(*args, **kwargs): if g_current_msg is not None: g_current_msg.dbg(*args, **kwargs) else: out_text_now(*args, **kwargs) def ERR(*args, **kwargs): LOG(Color.colored('red', '***** ERROR:'), *args, **kwargs) def trace(): return ''.join(traceback.format_stack()) def because_str(because): if not because: return '-' t = ['BECAUSE'] for b in because: if isinstance(b, tuple) or isinstance(b, list): t.extend(because_str(b).splitlines()) elif isinstance(b, str): t.append(b) elif isinstance(b, Message): t.append(b.str(show_traits=True, show_conns=True)) else: t.append(str(b)) return '\n|'.join(t) def out_error(*args, **kwargs): if g_ui is not None: g_ui.out_error(*args, **kwargs) else: out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs) if g_current_msg: out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True)) out_text_now(trace()) def tmsi_standardize(tmsi): try: return format(int(tmsi), '08x') except: return None # a dict containing a list for each key; l.add(name, item) adds item to the list at key=name. class listdict(dict): '''A dict where each entry is a list of items''' def _have(ld, name): l = ld.get(name) if not l: l = [] ld[name] = l return l def add(ld, name, item): l = ld._have(name) l.append(item) return ld def add_dict(ld, d): for k,v in d.items(): ld.add(k, v) def update(ld, other_ld): for name, items in other_ld.items(): ld.extend(name, items) return ld def extend(ld, name, vals): l = ld._have(name) l.extend(vals) return ld def remove(ld, name, item): l = ld.get(name) if item in l: l.remove(item) def _have(ld, name): l = ld.get(name) if not l: l = [] ld[name] = l return l def add(ld, name, item): l = ld._have(name) l.append(item) return ld def add_dict(ld, d): for k,v in d.items(): ld.add(k, v) def update(ld, other_ld): for name, items in other_ld.items(): ld.extend(name, items) return ld def extend(ld, name, vals): l = ld._have(name) l.extend(vals) return ld def remove(ld, name, item): l = ld.get(name) if item in l: l.remove(item) class UniqueList(list): def append(s, item): if item in s or item is None: return False super().append(item) return True def insert(s, idx, item): if item in s or item is None: return False super().insert(idx, item) return True def extend(s, items): added = 0 for item in items: if s.append(item): added += 1 return added class NamedIds(dict): def __init__(s, start_id:int=1): set_instance_vars_from_args() def next(s, name): next_id = s.get(name, s.start_id) s[name] = next_id + 1 return next_id class Packet: def __init__(s, idx, cap_p): set_instance_vars_from_args() # sanitize impossible attr with dot in its name, # seen gsm_a.bssmap and gsm_a.dtap for name in dir(s.cap_p): if '.' in name: new_name = name.replace('.', '_') elif not name: new_name = 'unnamed' else: continue setattr(s.cap_p, new_name, getattr(s.cap_p, name)) @classmethod def pget(cls, cap_p, tokens, ifnone=None): if cap_p is None or len(tokens) < 1: return ifnone p_field = getattr(cap_p, tokens[0], None) if p_field is None: p_field = getattr(cap_p, '.'.join(tokens), None) if p_field is None: return ifnone if len(tokens) > 1: return Packet.pget(p_field, tokens[1:], ifnone=ifnone) return p_field def get(s, field, ifnone=None): return Packet.pget(s.cap_p, field.split('.'), ifnone=ifnone) def str(s, elem_name=None): strs = [] if elem_name: elem = s.get(elem_name) else: elem = s.cap_p return dir_vals(elem); def field_names(s, elem_name=None, elem=None): strs = ['', f'=== {elem_name} ==='] if elem is None: elem = s.get(elem_name) if not elem: strs.append('None') else: for f in elem._get_all_fields_with_alternates(): for n in dir(f): if n.startswith('_'): continue strs.append('%r=%r' % (n, getattr(f, n))) return '\n'.join(strs) def all_str(s, elem_name=None, elem=None, depth=1000): strs = [] if elem is None: if elem_name: elem = s.get(elem_name) else: elem = s.cap_p elem_name = '/' strs.append('%s:' % elem_name) for name in dir(elem): if name.startswith('_') or name.endswith('_value') or name in [ 'sort', 'reverse', 'remove', 'pop', 'insert', 'index', 'extend', 'count', 'copy', 'clear', 'append', 'zfill', 'max', 'min', 'resolution', ]: continue try: full_name = '%s.%s' % (elem_name, name) val = getattr(elem, name) if callable(val) or name in ['base16_value']: continue strs.append('%r=%r' % (full_name, val)) if depth and type(val) not in [int, str]: strs.append(s.all_str(full_name, val, depth-1)) except: pass if hasattr(elem, '_get_all_fields_with_alternates'): for f in elem._get_all_fields_with_alternates(): for n in dir(f): if n.startswith('_'): continue full_name = '%r[%r]' % (elem_name, n) try: val = getattr(f, n) except: continue if callable(val): continue strs.append('%s=%r' % (full_name, val)) if depth: strs.append(s.all_str(full_name, val, depth-1)) return '\n'.join(strs) class IpPort: all_ports = {} @classmethod def _key(cls, ip, port): return f'{ip}:{port}' @classmethod def get(cls, ip, port, entity=None, proto=None, create=True): o = IpPort.all_ports.get(IpPort._key(ip, port)) if o is None and create: return IpPort(ip, port, entity, proto) if o is not None: if entity is not None: if o.entity is not None and o.entity is not entity: ERR('Port changes:', o, 'to', entity) entity.add_port(o.proto, o) return o def __init__(s, ip:str=None, port:str=None, entity=None, proto=None): set_instance_vars_from_args() IpPort.all_ports[s.key()] = s def __repr__(s): r = '' tokens = [] if s.entity: tokens.append(f'{s.entity}') if s.proto: tokens.append(f'{s.proto}') tokens.append(s.key()) return '-'.join(tokens) def __eq__(s, other): return s is other def __hash__(s): return hash(s.key()) def key(s): return IpPort._key(s.ip, s.port) @classmethod def from_sdp(p:Packet): ip = p.get('sdp.connection_info_address') port = p.get('sdp.media_port') return IpPort.get(ip, port) @classmethod def _from_ip(cls, p:Packet, src_dst:str, port:int): ip = p.get('ip.' + src_dst) if ip is None: ipv6 = p.get('ipv6.' + src_dst) if ipv6 is not None: ip = '[' + ipv6 + ']' return IpPort.get(ip, port) @classmethod def from_ip_source(cls, p:Packet, port:int): return cls._from_ip(p, 'src', port) @classmethod def from_ip_dest(cls, p:Packet, port:int): return cls._from_ip(p, 'dst', port) @classmethod def from_udp_source(cls, p:Packet): return cls.from_ip_source(p, p.get('udp.srcport')) @classmethod def from_udp_dest(cls, p:Packet): return cls.from_ip_dest(p, p.get('udp.dstport')) @classmethod def from_tcp_source(cls, p:Packet): return cls.from_ip_source(p, p.get('tcp.srcport')) @classmethod def from_tcp_dest(cls, p:Packet): return cls.from_ip_dest(p, p.get('tcp.dstport')) @classmethod def from_sctp_source(cls, p:Packet): return cls.from_ip_source(p, p.get('sctp.srcport')) @classmethod def from_sctp_dest(cls, p:Packet): return cls.from_ip_dest(p, p.get('sctp.dstport')) class Message: pass class Trait: def __init__(s, **kwargs): if len(kwargs) > 1: raise Exception('only one trait allowed per Trait(): %r' % kwargs) for k, v in kwargs.items(): s.name = k s.val = v def __repr__(s): return '%r=%r' % (s.name, s.val) class Traits(collections.OrderedDict): def __init__(s, *args, **kwargs): for arg in args: s.add(arg) s.set_vals(**kwargs) def add(s, trait:Trait): s[trait.name] = trait def set(s, name, val): if val is not None: s.add(Trait(**{name: val})) def set_vals(s, **kwargs): for k,v in kwargs.items(): if v is None: continue if type(v) not in (str, int, float, bool, IpPort): v = str(v) s.set(k, v) def __repr__(s): strs = [] for k,trait in s.items(): assert k == trait.name strs.append(repr(trait)) return '{%s}' % ', '.join(strs) def find_recent_msg(msg:Message, messages:list, my_idx:int, condition_func, max_t=1): for i in reversed(range(my_idx)): prev_msg = messages[i] if not prev_msg: continue if prev_msg.finalized: return None if msg.timestamp - prev_msg.timestamp > max_t: return None if condition_func(prev_msg): yield prev_msg return None def find_same_trait(msg:Message, messages:list, my_idx:int, proto:str, name:str, max_t=1, operator=any): def same_traits(prev_msg): return msg.same_traits(prev_msg, proto, name, operator=operator) return find_recent_msg(msg, messages, my_idx, same_traits, max_t) class dddict(dict): @classmethod def _get(cls, d, keys, create=False): if not keys: return d k = keys[0] v = d.get(k) if len(keys) > 0: if v is None: if create: v = {} d[k] = v else: return None r = cls._get(v, keys[1:], create=create) return r def gget(s, keys, create=None): v = dddict._get(s, keys, create=False) if v is None: if create is None: return None else: return s.sset(keys, create) return v def sset(s, keys, item): d = dddict._get(s, keys[:-1], create=True) d[keys[-1]] = item return item def ppop(s, keys): d = dddict._get(s, keys[:-1]) if d is None: return None r = d.pop(keys[-1]) if not d: if len(keys) > 1: s.ppop(keys[:-1]) return r @classmethod def _count(cls, d): if isinstance(d, dict): count = 0 for val in d.values(): count += cls._count(val) return count return 1 def count(s): return dddict._count(s) @classmethod def _all(cls, d): if isinstance(d, dict): for val in d.values(): yield from cls._all(val) else: yield d def all(s): return dddict._all(s) class Conn: open_conns = dddict() closed_conns = dddict() '''One end of a time-limited connection for a protocol layer''' def __init__(s, proto:str, port:IpPort, conn_id:str, start_msg:Message, close_msg:Message=None, idx=-1, entity=None, counterparts:list=[], subscriber_conn=None, merge_counterparts=True, add_message=True, meta=False): set_instance_vars_from_args('entity', 'add_message', 'merge_counterparts') s.tx_messages = UniqueList() s.rx_messages = UniqueList() if s.subscriber_conn: s.subscriber_conn.conns.append(s) s.keys = (proto, port.key(), conn_id) Conn.open_conns.sset(s.keys, s) # A counterpart is the same conn seen from the other side. # For example, if a BSC opens a conn, the conterpart is the MSC's perspective on the same conn. s.counterparts = UniqueList() for cp in counterparts: if cp is None: continue s.counterparts.append(cp) cp.counterparts.append(s) if entity: entity.add_port(proto, port, from_msg=start_msg) if add_message: s.add_message(start_msg) if not counterparts: LOG(Color.colored('green', f'New conn (now {Conn.count_open_conns()})'), s.proto, '%r' % s.conn_id) if merge_counterparts: for other_conn in s.counterparts: s.merge_subscr_conns(other_conn) @classmethod def _find(cls, keys, find_in_closed_conns=False): ret = Conn.open_conns.gget(keys) if ret is not None: return ret if find_in_closed_conns: return Conn.closed_conns.gget(keys) return None @classmethod def find(cls, proto:str, port:IpPort, conn_id:str, find_in_closed_conns=False): return cls._find((proto, port.key(), conn_id), find_in_closed_conns=find_in_closed_conns) @classmethod def open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs): exists = cls.find(proto, port, conn_id) if exists is not None: ERR('Conn already open:', type(exists), repr(exists)) LOG(trace()) return Conn(proto, port, conn_id, *args, **kwargs) @classmethod def find_or_open(cls, proto:str, port:IpPort, conn_id:str, *args, **kwargs): exists = cls.find(proto, port, conn_id) if exists is not None: return exists return Conn(proto, port, conn_id, *args, **kwargs) @classmethod def open_2way(cls, proto:str, conn_id:str, msg:Message, *args, **kwargs): conn1 = cls.open(proto, msg.src, conn_id, msg, *args, **kwargs) cls.open(proto, msg.dst, conn_id, msg, *args, counterparts=[conn1], **kwargs) return conn1 @classmethod def close_conn(cls, conn, msg): conn._add_message(msg) conn.close_msg = msg Conn.open_conns.ppop(conn.keys) l = Conn.closed_conns.gget(conn.keys, create=[]) l.append(conn) LOG(Color.colored('blue', f'Close conn ({Conn.count_open_conns()} left)'), conn) @classmethod def close(cls, proto, port, conn_id, close_msg, close_counterparts=True, if_exists=False): conn = Conn.find(proto, port, conn_id) if conn is None or not conn.is_open(): if if_exists == False: ERR('Cannot close, conn not open:', Conn.key_label(proto, port, conn_id)) return None Conn.close_conn(conn, close_msg) assert Conn.find(proto, port, conn_id) is None if close_counterparts: for cp in conn.counterparts: if cp.is_open(): Conn.close_conn(cp, close_msg) return conn @classmethod def message(cls, proto, port, conn_id, msg): conn = cls.find(proto, port, conn_id) if conn is None: return None conn.add_message(msg) return conn def _add_message(s, msg): if s.port == msg.src: if s.meta: msg.meta_conns.append(s) else: msg.src_conns.append(s) s.tx_messages.append(msg) elif s.port == msg.dst: if s.meta: msg.meta_conns.append(s) else: msg.dst_conns.append(s) s.rx_messages.append(msg) def add_message(s, msg, add_to_counterparts=True): if s.close_msg: out_error('Message on a closed conn', s, msg) s._add_message(msg) if add_to_counterparts: for cp in s.counterparts: cp._add_message(msg) @classmethod def key_label(cls, proto, port, conn_id): if conn_id: return f'{proto}:{conn_id}@{port}' else: return f'{proto}@{port}' def label(s): return s.key_label(s.proto, s.port, s.conn_id) def __repr__(s): tokens = [s.label()] for r in s.counterparts: if r is None: tokens.append('None') else: tokens.append(r.label()) return ' -> '.join(tokens) def merge_subscr_conns(s, other_conn): if other_conn is None: return if not isinstance(other_conn, Conn): for item in other_conn: s.merge_subscr_conns(item) return assert isinstance(other_conn, Conn) if s.subscriber_conn is not None and s.subscriber_conn is other_conn.subscriber_conn: return s.subscriber_conn = SubscriberConn.merge(s.subscriber_conn, other_conn.subscriber_conn) if s.subscriber_conn is None: s.subscriber_conn = SubscriberConn() s.subscriber_conn.add_conn(s) other_conn.subscriber_conn = s.subscriber_conn s.subscriber_conn.add_conn(other_conn) def find_entity(s, kind, with_port=None): if s.subscriber_conn: return s.subscriber_conn.find_entity(kind, with_port=with_port) return None, None def is_open(s): return s.close_msg is None and Conn._find(s.keys) is s @classmethod def count_open_conns(cls): count = 0 have = set() for conn in cls.open_conns.all(): do_count = True for conn2 in conn.counterparts: if conn2.label() in have: do_count = False have.add(conn2.label()) if conn.label() in have: do_count = False have.add(conn.label()) if do_count: count += 1 return count class Layer: _classes = {} traits = None proto = None cap_p_name = None def __init__(s, msg:Message, proto:str, msgtype:str, traits:Traits, minor=False, hidden=False, cap_p_name:str=None): set_instance_vars_from_args() if proto in msg.layers: raise Exception(f'duplicate proto {proto} for message') msg.layers[proto] = s s.msgtype = sane_msgtype(s.msgtype) if not s.cap_p_name: s.cap_p_name = s.proto s.__class__.proto = s.proto s.__class__.cap_p_name = s.cap_p_name def label(s): if s.msgtype: return f'{s.proto}.{s.msgtype}' else: return s.proto def identify_entities(s, msg:Message, messages, my_idx): '''return a list of Message.EntityIdent instances describing source and/or destination entity that message msg identifies.''' return None @classmethod def identify_conns(s, messages, my_idx): pass def collapse(s, messages, my_idx): '''return the message itself if it remains in messages, if another absorbed it return the other message, or if if it is dropped completely return None''' return messages[my_idx] @classmethod def classes(cls): if not Layer._classes: for cls in Layer.__subclasses__(): name = cls.__name__ if not name.startswith('Layer_'): continue proto_name = name[len('Layer_'):] Layer._classes[proto_name] = cls #cls.proto = proto_name return Layer._classes @classmethod def parse(cls, msg:Message): for proto_name,child_class in Layer.classes().items(): if not msg.p.get(proto_name): continue child_class(msg) class Message: def __init__(s, p:Packet, finalized=False): set_instance_vars_from_args() s.layers = collections.OrderedDict() s.count = 1 s.count_back = 0 s.timestamp = float(p.cap_p.sniff_timestamp) s.hide = False s.src = None s.dst = None s.src_conns = [] s.dst_conns = [] s.meta_conns = [] s.absorbed = UniqueList() s.strong_relation = True s.debug = SHOW_ALL_DEBUG s._log = [] def log(s, *text, **kwtext): s._log.append(to_text(*text, *kwtext)) def dbg(s, *text, **kwtext): if s.debug: s._log.append(to_text(*text, *kwtext)) def is_minor(s): return all(l.minor for l in s.layers.values()) def get_trait(s, proto:str, name:str, ifnone=None): # allow alternative lists for proto, like s.get_trait(('tcp', 'udp'), 'src') if proto is None: proto = s.layers.keys() if type(proto) is not str: for proto_ in proto: val = s.get_trait(proto_, name, None) if val is not None: return val return ifnone # allow alternative lists for name, like s.get_trait('tcp', ('src', 'dst)) if type(name) is not str: for name_ in name: val = s.get_trait(proto, name_, None) if val is not None: return val return ifnone if name == 'timestamp': return int(s.timestamp) layer = s.layers.get(proto, None) if not layer: return ifnone if name == 'msgtype': return layer.msgtype trait = layer.traits.get(name, None) if trait is None: return ifnone if trait.val is None: return ifnone return trait.val def get_traits(s, proto=None, names=None, proto_and_names=None): pn = [] if proto or names: if proto is None or isinstance(proto, str): proto = [proto] for p in proto: pn.append((p, names)) if proto_and_names: pn.extend(proto_and_names) for proto_, names in pn: if proto_ is None: proto_ = s.layers.keys() if isinstance(proto_, str): proto_ = [proto_] for proto in proto_: if names is None: l = s.layers.get(proto, None) if not l: continue names = l.traits.keys() if type(names) is str: names = [names] for name in names: result = s.get_trait(proto, name, ifnone=None) if result is not None: yield (proto, name, result) def get_all_traits(s, proto:str): layer = s.layers.get(proto) if not layer: return {} return layer.traits def same_traits(s, other_msg, proto:str, name:str, allow_unset=False, operator=all): if type(proto) is not str: return operator( s.same_traits(other_msg, proto_, name, allow_unset=allow_unset) for proto_ in proto ) if name is None: my_traits = s.get_all_traits(proto) other_traits = other_msg.get_all_traits(proto) names = set(my_traits.keys()) names.update(other_traits.keys()) name = list(names) if type(name) is not str: return operator( s.same_traits(other_msg, proto, name_, allow_unset=allow_unset) for name_ in name ) val = s.get_trait(proto, name) other_val = other_msg.get_trait(proto, name) if not allow_unset: if val is None or other_val is None: return False return val == other_val def set_trait(s, proto, name, val): layer = s.layers.get(proto, None) if layer is None: layer = Layer(s, proto, None, Traits(Trait(name, val))) else: layer.traits.set(name, val) def collapse(s, messages, my_idx): '''iterate backwards over recent messages and see if messages can be combined''' orig_msg = messages[my_idx] for layer in s.layers.values(): msg = layer.collapse(messages, my_idx) if orig_msg is not msg: break return msg def absorb_msg(s, other_msg): global g_current_msg if g_current_msg is other_msg: g_current_msg = s s._log.extend(other_msg._log) if other_msg and other_msg is not s: s.absorbed.append(other_msg) if other_msg.absorbed: other_absorbed = other_msg.absorbed other_msg.absorbed = [] for oa in other_absorbed: s.absorb_msg(oa) def identify_conns(s, messages, my_idx): msg = messages[my_idx] for layer_class in Layer.classes().values(): if layer_class.proto not in msg.layers: continue layer_class.identify_conns(messages, my_idx) class EntityIdent: def __init__(s, proto=None, src_port=None, src_kind=None, src_entity=None, dst_port=None, dst_kind=None, dst_entity=None, rename=False): set_instance_vars_from_args() def identify_entities(s, messages, my_idx): '''From protocol and message discriminators, see if we can identify the src and dst port of the message to be of a specific core network entity.''' for layer in s.layers.values(): try: identifieds = layer.identify_entities(s, messages, my_idx) if identifieds is None: continue if isinstance(identifieds, Message.EntityIdent): identifieds = [identifieds] for ident in identifieds: if ident is None: continue Entity.find_or_create(ident.proto, ident.src_kind, ident.src_port or s.src, ident.src_entity, from_msg=s, rename=(ident.rename is True or ident.rename == 'src')) Entity.find_or_create(ident.proto, ident.dst_kind, ident.dst_port or s.dst, ident.dst_entity, from_msg=s, rename=(ident.rename is True or ident.rename == 'dst')) except: raise Exception(f'In layer {layer} {s}') def find_entity(s, kind, with_port=None): for conn in (s.src_conns + s.dst_conns): ret = conn.find_entity(kind, with_port=with_port) if ret and ret[0] is not None: return ret return None, None def get_port(s, entity_kind:str): if s.src_entity_is(entity_kind): return s.src elif s.dst_entity_is(entity_kind): return s.dst return None def entity(s, *kinds): if s.src.entity and s.src.entity.kind in kinds: return s.src.entity if s.dst.entity and s.dst.entity.kind in kinds: return s.dst.entity def src_entity_is(s, *kinds): return s.src.entity and s.src.entity.kind in kinds def dst_entity_is(s, *kinds): return s.dst.entity and s.dst.entity.kind in kinds def same_src_dst(s, other, forward=None, reverse=None): # assume forward and reverse if neither are set. # if one of them is set to True, assume the other as False. if forward is None and reverse is None: forward = True reverse = True a = (s.src.key(), s.dst.key()) b = (other.src.key(), other.dst.key()) if forward and reverse: return set(a) == set(b) elif forward: return a == b elif reverse: return a == tuple(reversed(b)) else: return False @classmethod def parse(cls, p:Packet): msg = Message(p) Layer.parse(msg) msg.src = msg.get_trait(('tcp','udp','sctp'), 'src') msg.dst = msg.get_trait(('tcp','udp','sctp'), 'dst') if msg.src is None or msg.dst is None: return None assert isinstance(msg.src, IpPort) assert isinstance(msg.dst, IpPort) return msg def label(s): label = [] for l in s.layers.values(): if not SHOW_ALL_LAYERS: if l.minor: continue if l.hidden and not all((ll.minor or ll.hidden) for ll in s.layers.values()): continue label.insert(0, l.label()) return '/'.join(label) def related_subscribers(s): subscribers = UniqueList() src_sc = s.src_subscriber_conn() if src_sc: subscribers.append(src_sc.subscriber) dst_sc = s.dst_subscriber_conn() if dst_sc: subscribers.append(dst_sc.subscriber) for a in s.absorbed: subscribers.extend(a.related_subscribers()) return subscribers def is_subscriber_related(s, subscriber): return subscriber in s.related_subscribers() def __repr__(s): return s.__str__() def __str__(s): return s.str() def str(s, ladder=False, one_column_per_kind=False, show_traits=True, show_conns=True): t_name = [] t_name.extend(subscr.label() for subscr in s.related_subscribers()) name = s.label() if name: t_name.append(name) src = str(s.src) dst = str(s.dst) if s.src.entity is not None: src_str = s.src.entity.label() else: src_str = src if s.count and dst == src: dst_str = '(self)' elif s.dst.entity is not None: dst_str = s.dst.entity.label() else: dst_str = dst src_pos = 0 dst_pos = 0 if s.src.entity: src_pos = s.src.entity.labelcolumn(one_column_per_kind) if s.dst.entity: dst_pos = s.dst.entity.labelcolumn(one_column_per_kind) # allows injecting informational fake messages (Entity.news) if not s.count and not s.count_back: dst_pos = src_pos if not s.src.entity and not s.dst.entity: if src > dst: src_pos = dst_pos + 1 else: dst_pos = src_pos + 1 if not ladder: if src_pos > dst_pos: src_pos = 1 dst_pos = 0 else: src_pos = 0 dst_pos = 1 if src_pos <= dst_pos: left_pos = src_pos right_pos = dst_pos left_label = src_str right_label = dst_str to_left_count = s.count_back to_right_count = s.count else: left_pos = dst_pos right_pos = src_pos left_label = dst_str right_label = src_str to_left_count = s.count to_right_count = s.count_back left_strs = [] left_strs.append(left_label) if to_left_count: left_strs.append('<') if to_left_count > 1: left_strs.append(f'{to_left_count}') right_strs = [] if to_right_count: if to_right_count > 1: right_strs.append(f'{to_right_count}') right_strs.append('>') right_strs.append(right_label) real_left_pos = max(0, left_pos - (len(left_label)//2)) real_right_pos = right_pos - (len(right_label)//2) + len(right_label) + 1 - (len(right_label)&1) left_str = ''.join(left_strs) right_str = ''.join(right_strs) mid_gap = real_right_pos - real_left_pos - len(right_str) - len(left_str) mid_gap = max(1, mid_gap) if not ladder: mid_name_margin = 6 else: mid_name_margin = mid_gap - len(name) if to_left_count or to_right_count: if mid_name_margin > 50: mid_gap_strs = ['-' * int(mid_name_margin // 2), name, '-' * int(mid_name_margin - (mid_name_margin//2))] name = '' else: mid_gap_strs = ['-' * int(mid_gap)] else: mid_gap_strs = [] t = [' ' * int(real_left_pos), left_str,] t.extend(mid_gap_strs) t.append(right_str) if ladder: t = [''.join(t)] right_end = len(t[0]) label_pos = Entity.textcol_one_per_kind if one_column_per_kind else Entity.textcol_one_per_entity diff = label_pos - right_end if diff > 0: t.append(' ' * int(diff)) if show_traits: if isinstance(show_traits, str): show_traits = [show_traits] for proto,l in s.layers.items(): if not l.traits: continue if (show_traits is not True) and (proto not in show_traits): continue t_name.append('%s%s' % (proto, l.traits)) if show_conns: conns = set() for c in (s.src_conns + s.dst_conns): conns.add(f'{c.proto}:{c.conn_id}') #conns.add(f'{c}') t_name.append(' '.join(conns)) idxs = [s.p.idx] + [a.p.idx for a in s.absorbed] if len(idxs) <= 3: t_name.append('+'.join(str(i) for i in sorted(idxs))) else: t_name.append(f'{min(idxs)}-{max(idxs)}') t_name.append(f'{s.timestamp:.3f}') t.append(' ') t = [''.join(t)] indent = '\n' + (' ' * len(t[0]) + ' | ') t.append(' '.join(t_name)) for l in s._log: t.append(indent) t.append(l) return ''.join(t) def src_subscriber_conn(s): for conn in s.src_conns: if conn.subscriber_conn is not None: return conn.subscriber_conn return None def dst_subscriber_conn(s): for conn in s.dst_conns: if conn.subscriber_conn is not None: return conn.subscriber_conn return None def find_message(s, proto, trait, value): for subscr_conn in (s.src_subscriber_conn(), s.dst_subscriber_conn()): if subscr_conn is None: continue msg = subscr_conn.find_message(proto, trait, value) if msg: return msg return None class Entity: '''A core network program like BSC, MSC, ...''' KINDS_SORTING = ('MS', 'BTS', 'PCU', 'hNodeB', 'BSC', 'MGW@BSC', 'HNBGW', 'STP', 'MSC', 'MGW@MSC', 'MGW', 'SGSN', 'HLR', 'SIP', 'SIPcon', 'PBX', 'GGSN') KINDS_SORTING_EXIST = () entities = listdict() state_version = 1 # whether to update cached text columns spacing = 5 label_spacing = 2 textcol_one_per_kind = 0 textcol_one_per_entity = 0 # proxy / forwarding addresses to ignore, like the STP port blacklist = [] def __init__(s, kind:str): set_instance_vars_from_args() s.idx = None s.state_version = 0 s.ports = listdict() s.labelcol_one_per_kind = 0 s.labelcol_one_per_entity = 0 Entity.add(s) @classmethod def add(cls, entity): Entity.entities.add(entity.kind, entity) entity.idx = entity.idx_in_kind() if entity.kind not in Entity.KINDS_SORTING_EXIST: # a new kind has come up, refresh Entity.KINDS_SORTING_EXIST exist = [] for k in Entity.KINDS_SORTING: if k in Entity.entities.keys(): exist.append(k) for k in Entity.entities.keys(): if k not in exist: exist.append(k) Entity.KINDS_SORTING_EXIST = tuple(exist) Entity.state_version += 1 @classmethod def count_entities(cls, kind): l = Entity.entities.get(kind) return len(l) @classmethod def add_to_blacklist(cls, port:IpPort): if port in cls.blacklist: return cls.blacklist.append(port); def rename(s, to_kind): Entity.entities.remove(s.kind, s) was_kind = s.kind s.kind = to_kind Entity.add(s) for proto,l in s.ports.items(): for port in l: LOG(Color.colored('yellow', 'Rename port'), 'from', was_kind, 'to', s, proto, port) @classmethod def find_or_create(cls, proto, kind, port, matched_entity=None, from_msg=None, rename=False): if not port: return None if port in Entity.blacklist: return None found_entity = port.entity if found_entity and matched_entity and (found_entity is not matched_entity): LOG(Color.colored('purple', 'Renaming entity port:'), port, 'to', matched_entity) if not matched_entity: matched_entity = found_entity if matched_entity: if kind and matched_entity.kind != kind and rename: matched_entity.rename(kind) matched_entity.add_port(proto, port, from_msg=from_msg) return matched_entity if kind is None or rename: for l in Entity.entities.values(): for e in l: if e.has_port(port): if kind and e.kind != kind and rename: e.rename(kind) return e if kind is None: return None else: l = Entity.entities.get(kind) if l: for e in l: if e.has_port(port): return e e = Entity(kind) e.add_port(proto, port, from_msg=from_msg) return e def absorb(s, other_entity): '''Merge two entities to one, adopting the other's ports''' for port in other_entity.ports: s.add_port(port) del other_entity def label(s): idx = '' if s.idx: idx = str(s.idx + 1) return f'{s.kind}{idx}' def __repr__(s): return s.label() def __str__(s): return repr(s) def kind_idx(s): '''this entity kind's position in the currently known entity kinds: For 'BSC', if we've seen BTS, BSC and MSC, return 1.''' return Entity.KINDS_SORTING_EXIST.index(s.kind) def idx_in_all(s): '''this entity kind's position in all currently known entities: For the second 'BSC', if we've seen 2 BTS, 3 BSC and 1 MSC, return 2 (BTS) + 1 (second BSC) = 3.''' idx = 0 for k in Entity.KINDS_SORTING_EXIST: if k == s.kind: idx += Entity.entities.get(s.kind).index(s) return idx idx += Entity.count_entities(k) return idx def idx_in_kind(s): '''this entity kind's position in the list of entities of the same kind''' return Entity.entities.get(s.kind).index(s) def check_update_state(s): if s.state_version == Entity.state_version: return Entity.calculate_textcolumns() s.state_version = Entity.state_version def labelcolumn(s, one_column_per_kind=False, mid=True): s.check_update_state() if one_column_per_kind: midcol = s.labelcol_one_per_kind else: midcol = s.labelcol_one_per_entity if mid: ret = midcol else: ret = int(midcol - (len(s.label()) // 2)) return ret @classmethod def calculate_textcolumns(cls): '''In text rendering of a ladder diagram, return the text column for this entity, if rendering each entity in its own column (not sharing one column per entity kind)''' entity_col = 0 kind_col = 0 for k in Entity.KINDS_SORTING_EXIST: l = Entity.entities.get(k) kind_col += len(k) // 2 for e in l: e.labelcol_one_per_kind = kind_col Entity.textcol_one_per_kind = kind_col + len(e.label()) + Entity.spacing entity_col += len(e.kind)//2 e.labelcol_one_per_entity = entity_col entity_col += len(e.label()) + Entity.spacing Entity.textcol_one_per_entity = entity_col kind_col += len(k) + Entity.spacing def has_port(s, port, proto=None): if proto: if port in s.ports.get(proto, []): return proto return None for proto,l in s.ports.items(): if port in l: return proto return None def remove_port(s, port): for proto,l in s.ports.items(): if port in l: l.remove(port) port.entity = None return def add_port(s, proto, port, from_msg=None): if port.entity and port.entity is not s: port.entity.remove_port(port) if s.has_port(port, proto=proto): return s.ports.add(proto, port) port.entity = s port.proto = proto LOG(Color.colored('cyan', 'New port:'), port) class Subscriber: next_ident = 1 next_tmsi_idx = 1 imsis = {} tmsis = {} imeis = {} msisdns = {} def __init__(s, imsi:str=None, tmsi=None, imei=None, msisdn=None): set_instance_vars_from_args() s.subscriber_conns = UniqueList() s.tmsis = set() s.tmsi_idx = 0 s.set_last_tmsi(tmsi) s.ident = Subscriber.next_ident Subscriber.next_ident += 1 Subscriber.update_dicts(s) def set_last_tmsi(s, tmsi): if tmsi is None: return s.tmsi = tmsi if (not isinstance(tmsi, str)) or len(str(tmsi)) < 8: raise Exception('Invalid TMSI: ' + str(tmsi)) s.tmsi_idx = Subscriber.next_tmsi_idx Subscriber.next_tmsi_idx += 1 s.tmsis.add(tmsi) Subscriber.tmsis[s.tmsi] = s @classmethod def update_dicts(cls, s): if s.imsi: Subscriber.imsis[s.imsi] = s for tmsi in s.tmsis: Subscriber.tmsis[tmsi] = s if s.imei: Subscriber.imeis[s.imei] = s if s.msisdn: Subscriber.msisdns[s.msisdn] = s def label(s, full=False): l = [] if full or (not s.imsi and not s.tmsi and not s.imei and not s.msisdn): l.append(f'subscr{s.ident}') if s.imsi and (full or not s.msisdn): l.append(f'imsi{s.imsi}') if s.tmsi and (full or not s.imsi): l.append(f'tmsi{s.tmsi}') if s.imei and (full or (not s.imsi and not s.tmsi)): l.append(f'imei{s.imei}') if s.msisdn: l.append(f'msisdn{s.msisdn}') return Color.colored(s.ident, ':'.join(l)) def __repr__(s): return s.label(full=True) def __str__(s): return s.label() @classmethod def identify_subscriber(cls, msg:Message): imsi = msg.get_trait(('dtap','bssmap','rsl','gsup'), 'imsi') tmsi = tmsi_standardize(msg.get_trait(('dtap','bssmap','rsl'), 'tmsi')) imei = msg.get_trait('dtap', 'imei') msisdn = msg.get_trait('gsup', 'msisdn') if not (imsi or tmsi or imei or msisdn): return #if not msg.src_conns and not msg.dst_conns: # return # could start out with subscr = None, but to use a few less subscriber.ids start out with a present # subscriber, if any. subscr_conn = msg.src_subscriber_conn() or msg.dst_subscriber_conn() if subscr_conn is not None: subscr = subscr_conn.subscriber else: subscr = None if imsi: subscr = Subscriber.merge(Subscriber.by_imsi(imsi), subscr) if tmsi: subscr = Subscriber.merge(Subscriber.by_tmsi(tmsi), subscr) if imei: subscr = Subscriber.merge(Subscriber.by_imei(imei), subscr) if msisdn: subscr = Subscriber.merge(Subscriber.by_msisdn(msisdn), subscr) if subscr is None: return if subscr_conn is None: subscr_conn = SubscriberConn() subscr_conn.set_subscriber(subscr) for c in (msg.src_conns + msg.dst_conns): if c.subscriber_conn is None: subscr_conn.add_conn(c) c.subscriber_conn = subscr_conn else: c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, subscr_conn) @classmethod def by_imsi(cls, imsi): subscr = Subscriber.imsis.get(imsi) if not subscr: subscr = Subscriber(imsi=imsi) return subscr @classmethod def by_tmsi(cls, tmsi): subscr = Subscriber.tmsis.get(tmsi) if not subscr: subscr = Subscriber(tmsi=tmsi) return subscr @classmethod def by_imei(cls, imei): subscr = Subscriber.imeis.get(imei) if not subscr: subscr = Subscriber(imei=imei) return subscr @classmethod def by_msisdn(cls, msisdn): subscr = Subscriber.msisdns.get(msisdn) if not subscr: subscr = Subscriber(msisdn=msisdn) return subscr @classmethod def merge(cls, a, b): assert a is None or isinstance(a, Subscriber) assert b is None or isinstance(b, Subscriber) if a is None and b is None: return None if a is None: return b if b is None or b is a: return a if not a.imsi and a.ident > b.ident: return cls.merge(b, a) if a.imsi and b.imsi and a.imsi != b.imsi: out_error(f'cannot absorb, subscriber would change IMSI: {b.imsi} -> {a.imsi}') return None if a.imei and b.imei and a.imei != b.imei: out_error(f'cannot absorb, subscriber would change IMEI: {b.imei} -> {a.imei}') return None if b.imsi: a.imsi = b.imsi b.imsi = None if b.tmsis: a.tmsis.update(b.tmsis) b.tmsis = set() if b.tmsi_idx > a.tmsi_idx: a.set_last_tmsi(b.tmsi) b.tmsi = None if b.imei: a.imei = b.imei b.imei = None if b.msisdn: if a.msisdn and a.msisdn != b.msisdn: LOG(f'subscriber {a} changes MSISDN: {a.msisdn} -> {b.msisdn}') a.msisdn = b.msisdn b.msisdn = None Subscriber.update_dicts(a) for sc in b.subscriber_conns: a.add_subscriber_conn(sc) b.subscriber_conns = [] return a def add_subscriber_conn(s, subscriber_conn): if subscriber_conn.subscriber is s: return if subscriber_conn.subscriber: subscriber_conn.subscriber.subscriber_conns.remove(subscriber_conn) s.subscriber_conns.append(subscriber_conn) subscriber_conn.subscriber = s assert subscriber_conn in subscriber_conn.subscriber.subscriber_conns def find_entity(s, kind, with_port=None): for subscriber_conn in reversed(s.subscriber_conns): found, found_subscriber_conn = subscriber_conn.find_entity(kind, ask_subscriber=False, with_port=with_port) if found is not None: return found, found_subscriber_conn return None, None class SubscriberConn: '''A SubscriberConn is a collection of conns that feed into each other. For example, the Abis and the BSSMAP link for the same subscriber are related, as well as the MGCP and RTP spoken for that subscriber. If a subscriber disconnects and connects again, that is a new separate SubscriberConn; also if a subscriber would concurrently attach in twice somehow, that would be separate SubscriberConn instances. Note that a Message's src_conns and a dst_conns are not necessarily listed in the same SubscriberConn, for example for RTP, SIP or SMS, the messages may pass from one subscriber to another.''' def __init__(s, subscriber=None): set_instance_vars_from_args() s.conns = UniqueList() @classmethod def merge(cls, a, b): assert a is None or isinstance(a, SubscriberConn) assert b is None or isinstance(b, SubscriberConn) if a is None and b is None: return None if a is None: return b if b is None or b is a: return a b_subscr = b.subscriber if b.subscriber: b.subscriber.subscriber_conns.remove(b) b.subscriber = None a.subscriber = Subscriber.merge(a.subscriber, b_subscr) if a.subscriber: a.subscriber.subscriber_conns.append(a) for conn in b.conns: conn.subscriber_conn = a a.conns.append(conn) b.conns = None return a def find_entity(s, kind, with_port=None, ask_subscriber=True): if ask_subscriber and s.subscriber is not None: return s.subscriber.find_entity(kind, with_port=with_port) if with_port is not None and isinstance(with_port, str): with_port = [with_port] for conn in reversed(s.conns): if with_port is not None and conn.port.proto not in with_port: continue if conn.port.entity is not None and conn.port.entity.kind == kind: return conn.port.entity, s return None, None def find_message(s, proto, trait, val): for conn in reversed(s.conns): for msgs in (conn.tx_messages, conn.rx_messages): for msg in msgs: for p, t, v in msg.get_traits(proto, trait): if val is None or val == v: return msg return None def set_subscriber(s, subscriber): s.subscriber = subscriber if subscriber is not None: subscriber.subscriber_conns.append(s) def add_conn(s, conn): s.conns.append(conn) conn.subscriber_conn = s def __repr__(s): return f'{s.subscriber}~{s.conns}' class Layer_tcp(Layer): def __init__(s, msg:Message): p = msg.p traits = Traits( src=IpPort.from_tcp_source(p), dst=IpPort.from_tcp_dest(p), ) super().__init__(msg=msg, proto='tcp', msgtype=None, traits=traits, minor=True) class Layer_udp(Layer): def __init__(s, msg:Message): p = msg.p traits = Traits( src=IpPort.from_udp_source(p), dst=IpPort.from_udp_dest(p), ) super().__init__(msg=msg, proto='udp', msgtype=None, traits=traits, minor=True) class Layer_sctp(Layer): def __init__(s, msg:Message): p = msg.p traits = Traits( src=IpPort.from_sctp_source(p), dst=IpPort.from_sctp_dest(p), stream_id = p.get('sctp.data_sid'), stream_seq = p.get('sctp.data_ssn'), ) super().__init__(msg=msg, proto='sctp', msgtype=None, traits=traits, minor=True) class Layer_rtp(Layer): def __init__(s, msg:Message): pt = msg.p.get('rtp.p_type') iuup_msgtype = sane_showname(msg.p.get('iuup.pdu_type.showname')) if iuup_msgtype is not None: msgtype = f'{pt}.{iuup_msgtype}' else: msgtype = pt traits = Traits( pt=pt, iuup=iuup_msgtype ) super().__init__(msg=msg, proto='rtp', msgtype=msgtype, traits=traits) def collapse(s, messages, my_idx): msgtype = s.msg.get_trait('rtp', 'msgtype') src = s.msg.src dst = s.msg.dst for i in reversed(range(my_idx)): prev_msg = messages[i] if not prev_msg: continue if prev_msg.finalized: break if not 'rtp' in prev_msg.layers: if prev_msg.is_minor(): continue else: break if prev_msg.get_trait('rtp', 'msgtype') != msgtype: continue if s.msg.same_src_dst(prev_msg, forward=True): # found a recent RTP similar RTP packet, combine prev_msg.count += 1 messages[my_idx] = None prev_msg.absorb_msg(s.msg) return prev_msg if 1 and s.msg.same_src_dst(prev_msg, reverse=True): # same but backwards prev_msg.count_back += 1 messages[my_idx] = None prev_msg.absorb_msg(s.msg) return prev_msg return s.msg # identify_entities: RTP ports are identified by watching RSL and MGCP, see Layer_gsm_abis_rsl.identify_conns_rtp @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] conn = Conn.find('rtp', msg.src, conn_id=msg.src.key()) if conn is not None: conn.add_message(msg) conn = Conn.find('rtp', msg.dst, conn_id=msg.dst.key()) if conn is not None: conn.add_message(msg) class Layer_mgcp(Layer): def __init__(s, msg:Message): p = msg.p verb = p.get('mgcp.req_verb') rsp = p.get('mgcp.rsp_rspstring') msgtype = verb or rsp or '?' tid = p.get('mgcp.transid', '') rtp_port = None sdp_rtp_ip = p.get('sdp.connection_info_address') sdp_rtp_port = p.get('sdp.media_port') if sdp_rtp_ip and sdp_rtp_port: rtp_port = IpPort.get(sdp_rtp_ip, sdp_rtp_port) if rsp: endp = p.get('mgcp.param_specificendpointid') else: endp = p.get('mgcp.req_endpoint') if endp and endp.startswith('rtpbridge/*@'): endp = None traits = Traits( tid=tid, endp=endp, ci=p.get('mgcp.param_connectionid'), verb=verb, rsp=rsp, rtp_port=rtp_port, ) s.tid = tid super().__init__(msg=msg, proto='mgcp', msgtype=msgtype, traits=traits) def label(s): return f'mgcp.{s.tid}.{s.msgtype}' def identify_entities(s, msg:Message, messages, my_idx): if msg.get_trait('mgcp', 'verb') == 'CRCX': dst_kind = 'MGW' if msg.src_entity_is('BSC'): dst_kind = 'MGW@BSC' elif msg.src_entity_is('MSC'): dst_kind = 'MGW@MSC' return Message.EntityIdent(proto='mgcp', dst_kind=dst_kind) elif msg.get_trait('mgcp', 'rsp') and msg.src_entity_is('MGW', 'MGW@BSC','MGW@MSC'): rtp = msg.get_trait('mgcp', 'rtp_port') if rtp: msg.src.entity.add_port('rtp', rtp) return None @classmethod def find_req(cls, messages, my_idx): msg = messages[my_idx] for match in find_same_trait(msg, messages, my_idx, 'mgcp', 'tid'): if match.get_trait('mgcp', 'rsp'): continue if not match.same_src_dst(msg, reverse=True): continue return match return None @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] proto = 'mgcp' if msg.get_trait('mgcp', 'verb'): endp = msg.get_trait('mgcp', 'endp') mgw_port = msg.dst endp_conn = Conn.find(proto, mgw_port, endp) if endp_conn is not None: endp_conn.add_message(msg) for c in (msg.src_conns + msg.dst_conns): endp_conn.merge_subscr_conns(c) ci = msg.get_trait('mgcp', 'ci') conn_id = f'{endp}:{ci}' conn = Conn.find(proto, mgw_port, conn_id) if conn: conn.add_message(msg) for c in (msg.src_conns + msg.dst_conns + msg.meta_conns): conn.merge_subscr_conns(c) if msg.get_trait('mgcp', 'rsp') == 'OK': req = cls.find_req(messages, my_idx) if req is None: ERR('MGCP response without request') return mgw_port = req.dst verb = req.get_trait('mgcp', 'verb') if verb == 'CRCX': # The MGCP connection endp = msg.get_trait('mgcp', 'endp') or req.get_trait('mgcp', 'endp') ci = msg.get_trait('mgcp', 'ci') if not endp or not ci: ERR('MGCP CRCX with endp =', endp, 'ci =', ci) # creating two levels of conn: a meta conn with just endp, # and a proper one with endp+ci # endp: endp_conn = Conn.find(proto, mgw_port, endp) if not endp_conn: endp_conn = Conn.open(proto, mgw_port, conn_id=endp, start_msg=msg) # endp+ci: conn_id = f'{endp}:{ci}' conn = Conn.find(proto, mgw_port, conn_id) if not conn: conn = Conn.open(proto, mgw_port, conn_id, msg) conn.add_message(req) conn.add_message(msg) conn.merge_subscr_conns(endp_conn) # The RTP connection set up by MGCP rtp_port = msg.get_trait('mgcp', 'rtp_port') if rtp_port: rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False, entity=conn.port.entity) rtp_conn.merge_subscr_conns(conn) # bssmap or ranap layer will mention this rtp_port in their Assignment / RAB-Assignment else: endp = req.get_trait('mgcp', 'endp') ci = req.get_trait('mgcp', 'ci') Conn.message(proto, mgw_port, endp, msg) conn_id = f'{endp}:{ci}' Conn.message(proto, mgw_port, conn_id, msg) if verb == 'DLCX': # The MGCP connection endp = req.get_trait('mgcp', 'endp') ci = req.get_trait('mgcp', 'ci') if not endp: ERR('MGCP DLCX without endp') def close_ci(ci): ci_conn_id = f'{endp}:{ci}' # go through all RTP ports created in this conn ci_conn = Conn.find(proto, mgw_port, ci_conn_id) if ci_conn is not None: all_rtp_ports = UniqueList() for msgs in (ci_conn.rx_messages, ci_conn.tx_messages): for msg in msgs: all_rtp_ports.append(msg.get_trait('mgcp', 'rtp_port')) for rtp_port in all_rtp_ports: Conn.close('rtp', rtp_port, conn_id=rtp_port.key(), close_msg=msg, if_exists=True) Conn.close(proto, mgw_port, ci_conn_id, msg) if ci: close_ci(ci) else: endp_conn = Conn.find(proto, mgw_port, endp) all_ci = UniqueList() if endp_conn: for msgs in (endp_conn.tx_messages, endp_conn.rx_messages): for msg in msgs: all_ci.append(msg.get_trait('mgcp', 'ci')) for ci in all_ci: close_ci(ci) Conn.close(proto, mgw_port, endp, msg, if_exists=True) elif msg.get_trait('mgcp', 'verb') == 'MDCX': # The RTP connection set up by BSC or MSC rtp_port = msg.get_trait('mgcp', 'rtp_port') if rtp_port and Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) is None: rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, merge_counterparts=False) for c in msg.src_conns: rtp_conn.merge_subscr_conns(c) class Layer_sccp(Layer): def __init__(s, msg:Message): p = msg.p msgtype = p.get('sccp.message_type.showname') traits = Traits( src_lref=p.get('sccp.slr'), dst_lref=p.get('sccp.dlr'), ) super().__init__(msg=msg, proto='sccp', msgtype=msgtype, traits=traits, hidden=True) def collapse(s, messages, my_idx): msg = s.msg # cut out STP hop if SCCP_COLLAPSE_STP: src = msg.src t = msg.timestamp for i in reversed(range(my_idx)): prev_msg = messages[i] if not prev_msg: continue if t - prev_msg.timestamp > 1: break if prev_msg.absorbed: continue prev_sccp = prev_msg.layers.get(s.proto, None) if prev_sccp is None: continue #if src != prev_msg.dst: # continue if s.msgtype != prev_sccp.msgtype: continue if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): continue if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): continue if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): continue if not msg.same_traits(prev_msg, 'm3ua', 'message_length'): continue prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) prev_msg.dst = msg.dst prev_msg.absorb_msg(msg) messages[i] = None messages[my_idx] = prev_msg Entity.add_to_blacklist(src) return prev_msg return msg @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] proto = 'sccp' msgtype = msg.get_trait(proto, 'msgtype') if SCCP_COLLAPSE_STP and not msg.absorbed: return src_id = msg.get_trait(proto, 'src_lref') dst_id = msg.get_trait(proto, 'dst_lref') if msgtype == 'Connection-Request': Conn.open(proto, msg.src, src_id, msg) elif msgtype == 'Connection-Confirm': Conn.open(proto, msg.src, src_id, msg, counterparts=[Conn.find(proto, msg.dst, dst_id)]) elif msgtype in ('Release-Complete',): Conn.close(proto, msg.src, src_id, msg) else: if src_id: Conn.message(proto, msg.src, src_id, msg) if dst_id: Conn.message(proto, msg.dst, dst_id, msg) class Layer_m3ua(Layer): def __init__(s, msg:Message): traits = Traits( opc = msg.p.get('m3ua.protocol_data_opc'), dpc = msg.p.get('m3ua.protocol_data_dpc'), message_length = msg.p.get('m3ua.message_length'), ) super().__init__(msg=msg, proto='m3ua', msgtype=None, traits=traits, minor=True) # wireshark commonly falsely classifies a BSSMAP Ciphering Mode Command as RNSAP PDU class Layer_rnsap(Layer): def __init__(s, msg:Message): p = msg.p traits = Traits() msgtype = 'Cipher Mode Command' super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits) class Layer_bssap(Layer): 'BSSAP, either BSS Management (see Layer_gsm_a_bssmap) or Direct Transfer (see Layer_gsm_a_dtap)' def __init__(s, msg:Message): msgtype = msg.p.get('bssap.msgtype.showname') msgtype = msg.p.get('bssap.pdu_type.showname') traits = Traits( msgtype_nr=int(msg.p.get('bssap.pdu_type'), 16), ) super().__init__(msg=msg, proto='bssap', msgtype=msgtype, traits=traits, minor=True) class Layer_bssgp(Layer): def __init__(s, msg:Message): msgtype = sane_msgtype(msg.p.get('bssgp.pdu_type.showname')) traits = Traits( tlli=msg.p.get('bssgp.gsm_a_rr_tlli'), ) super().__init__(msg=msg, proto='bssgp', msgtype=msgtype, traits=traits) def identify_entities(s, msg:Message, messages, my_idx): if msg.get_trait('bssgp', 'msgtype') == 'FLOW-CONTROL-BVC': return Message.EntityIdent(proto='bssgp', src_kind='PCU', dst_kind='SGSN') return None @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] proto = 'bssgp' msgtype = msg.get_trait('dtap', 'msgtype') tlli = msg.get_trait('bssgp', 'tlli') conn_id = tlli if not conn_id: return if msgtype == 'Attach-Request': Conn.open(proto, msg.src, conn_id, msg, counterparts=[Conn.open(proto, msg.dst, conn_id, msg)]) elif msgtype == 'Attach-Accept': conn = Conn.close(proto, msg.src, conn_id, msg) new_conn_id = msg.get_trait('dtap', 'tmsi') new_conn = Conn.open(proto, msg.src, new_conn_id, msg, counterparts=[Conn.open(proto, msg.dst, new_conn_id, msg)]) new_conn.merge_subscr_conns(conn) elif msgtype == 'Attach-Complete': Conn.close(proto, msg.src, conn_id, msg) else: conn = Conn.message(proto, msg.src, conn_id, msg) class Layer_hnbap(Layer): def __init__(s, msg:Message): def strip_till_dash(dashstr): if not dashstr or not '-' in dashstr: return dashstr dash = dashstr.rindex('-') return dashstr[dash+1:] msgtype = strip_till_dash(msg.p.get('hnbap.procedurecode.showname')) pdutype = strip_till_dash(sane_msgtype(msg.p.get('hnbap.hnbap_pdu.showname'))) pdutype_nr = msg.p.get('hnbap.hnbap_pdu') traits = Traits( msgtype_nr=int(msg.p.get('hnbap.procedurecode')), pdutype=pdutype, pdutype_nr=int(pdutype_nr), ) super().__init__(msg=msg, proto='hnbap', msgtype=msgtype, traits=traits) def identify_entities(s, msg:Message, messages, my_idx): if (msg.get_trait('hnbap', 'msgtype') in ('Register', 'HNBRegister', 'UERegister')) and (msg.get_trait('hnbap', 'pdutype_nr') == 0): return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW') return None class Layer_rua(Layer): def __init__(s, msg:Message): def strip_till_dash(dashstr): if not dashstr or not '-' in dashstr: return dashstr dash = dashstr.rindex('-') return dashstr[dash+1:] msgtype = strip_till_dash(msg.p.get('rua.procedurecode.showname')) pdutype = strip_till_dash(sane_msgtype(msg.p.get('rua.rua_pdu.showname'))) pdutype_nr = msg.p.get('rua.rua_pdu') cn_domain_i = msg.p.get('rua.cn_domainindicator') cn_domain = None if cn_domain_i == '0': cn_domain = 'cs' elif cn_domain_i == '1': cn_domain = 'ps' traits = Traits( msgtype_nr=int(msg.p.get('rua.procedurecode')), pdutype=pdutype, cn_domain=cn_domain, rua_ctx=msg.p.get('rua.context_id'), ) super().__init__(msg=msg, proto='rua', msgtype=msgtype, traits=traits, cap_p_name='rua') @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] proto = 'rua' msgtype = msg.get_trait(proto, 'msgtype') conn_id = (msg.get_trait(proto, 'cn_domain') or '?') + ':' + (msg.get_trait(proto, 'rua_ctx') or '?') if msgtype == 'Connect': conn = Conn.open(proto, msg.src, conn_id, msg) Conn.open(proto, msg.dst, conn_id, msg, counterparts=[conn]) elif msgtype == 'Disconnect': Conn.close(proto, msg.dst, conn_id, msg) else: Conn.message(proto, msg.src, conn_id, msg) class Layer_ranap(Layer): def __init__(s, msg:Message): msgtype = msg.p.get('ranap.rab_assignmentrequest_element' ) or msg.p.get('ranap.rab_assignmentresponse_element' ) or msg.p.get('ranap.initiatingmessage_element') rtp_port = None ip = msg.p.get('ranap.nsap_ipv4_addr') port_bin = msg.p.get('ranap.bindingid.binary_value') if ip and port_bin and len(port_bin) >= 2: port = int.from_bytes(port_bin[:2], "big") rtp_port = IpPort.get(ip, port) traits = Traits( rtp_port=rtp_port, ) super().__init__(msg=msg, proto='ranap', msgtype=msgtype, traits=traits) def collapse(s, messages, my_idx): msg = s.msg # cut out HNBGW hop if IUH_COLLAPSE_HNBGW: src = msg.src t = msg.timestamp for i in reversed(range(my_idx)): prev_msg = messages[i] if not prev_msg: continue if t - prev_msg.timestamp > 1: break if src != prev_msg.dst: continue if msg.src.entity is not prev_msg.dst.entity: continue # DOESNT WORK if not msg.same_traits(prev_msg, 'ranap', None): continue if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): continue if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): continue if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): continue prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) prev_msg.dst = msg.dst prev_msg.absorb_msg(msg) messages[i] = None messages[my_idx] = prev_msg Entity.add_to_blacklist(src) return prev_msg return msg def identify_entities(s, msg:Message, messages, my_idx): ids = [] ids.append(s.identify_attach(msg, messages, my_idx)) msgtype = msg.get_trait('ranap', 'msgtype') rtp_port = msg.get_trait('ranap', 'rtp_port') if rtp_port and msgtype == 'RAB-AssignmentRequest': # associate the MSC's MGCP port, but take care to not say the STP is an MSC crcx_ok = msg.find_message('mgcp', 'rtp_port', rtp_port) if crcx_ok and crcx_ok.src_entity_is('MGW'): mgw = crcx_ok.src.entity msc = None if msg.src_entity_is('MSC'): msc = msg.src.entity ids.append(Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True, dst_entity=msc, dst_port=crcx_ok.dst if msc else None)) if rtp_port and msgtype == 'RAB-AssignmentResponse' and msg.src_entity_is('hNodeB'): ids.append(Message.EntityIdent(proto='rtp', src_kind='hNodeB', src_port=rtp_port, src_entity=msg.src.entity)) return ids def identify_attach(s, msg:Message, messages, my_idx): ids = [] dst_kind = None msgtype = msg.get_trait('dtap', 'msgtype') if msgtype in DTAP_COMPL_L3: dst_kind = 'MSC' proto = 'IuCS' elif msgtype in GMM_COMPL_L3: dst_kind = 'SGSN' proto = 'IuPS' if not dst_kind: return None if 'rua' in msg.layers: return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW') # don't mistake the STP as MSC or SGSN if SCCP_COLLAPSE_STP and not msg.absorbed: return None if not SCCP_COLLAPSE_STP and msg.src_entity_is('HNBGW'): return None # FIXME: below only makes sense with SCCP_COLLAPSE_STP == True if not SCCP_COLLAPSE_STP: return None # find a HNBGW that has recently received the same LU, # associate IuCS port src_entity = None for match in find_same_trait(msg, messages, my_idx, 'dtap', None): if 'rua' not in match.layers: continue if not match.dst_entity_is('HNBGW'): continue src_entity = match.dst.entity if src_entity: break return Message.EntityIdent(proto=proto, src_kind='HNBGW', src_entity=src_entity, dst_kind=dst_kind) @classmethod def identify_conns(cls, messages, my_idx): msg = messages[my_idx] if SCCP_COLLAPSE_STP and not msg.absorbed: return rtp_port = msg.get_trait('ranap', 'rtp_port') if rtp_port: rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key()) if rtp_conn: for c in msg.src_conns: rtp_conn.merge_subscr_conns(c) class Layer_gsm_a_bssmap(Layer): def __init__(s, msg:Message): p = msg.p msgtype = p.get('gsm_a_bssmap.msgtype.showname') rtp_port = None ip = p.get('gsm_a_bssmap.aoip_trans_ipv4') port = p.get('gsm_a_bssmap.aoip_trans_port') if ip and port: rtp_port = IpPort.get(ip, port) tmsi = tmsi_standardize(p.get('gsm_a_bssmap.gsm_a_tmsi')) traits = Traits( msgtype_nr=str_to_int(p.get('gsm_a_bssmap.msgtype')), rtp_port=rtp_port, imsi=p.get('gsm_a_bssmap.e212_imsi'), tmsi=tmsi, ) super().__init__(msg=msg, proto='bssmap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_bssmap') def identify_entities(s, msg:Message, messages, my_idx): # don't mistake the STP as MSC or BSC if SCCP_COLLAPSE_STP and not msg.absorbed: return None msgtype = msg.get_trait('bssmap', 'msgtype') if msgtype in ('Complete-Layer-3-Information', 'Clear-Complete'): # associate BSC BSSMAP port with BSC RSL port src_entity = None for match in find_same_trait(msg, messages, my_idx, 'dtap', ('tmsi', 'imsi')): if 'rsl' not in match.layers: continue if not match.dst.entity or match.dst.entity.kind != 'BSC': continue src_entity = match.dst.entity if src_entity: break return Message.EntityIdent(proto='bssmap', src_kind='BSC', dst_kind='MSC', src_entity = src_entity) if msgtype == 'Assignment-Request': # This Assignment-Request's rtp_port should match an earlier MGCP CRCX-OK message, and we now # know that this MSC asked for it. rtp_port = msg.get_trait('bssmap', 'rtp_port') if not rtp_port: return None def cond(prev_msg): return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port crcx_ok = None for prev_msg in find_recent_msg(msg, messages, my_idx, cond): crcx_ok = prev_msg break if not crcx_ok: return None msc = msg.src.entity mgw = crcx_ok.src.entity return Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@MSC', rename=True, dst_port=crcx_ok.dst, dst_entity=msc, dst_kind='MSC') return None @classmethod def identify_conns(cls, messages, my_idx): msg = messages[my_idx] msgtype = msg.get_trait('bssmap', 'msgtype') if SCCP_COLLAPSE_STP and not msg.absorbed: return # Paging does not have a proper end, it may never be answered. # The RSL paging command hopefully happens, and closes this Conn. if msgtype == 'Paging': imsi = msg.get_trait('bssmap', 'imsi') if imsi is not None: c = Conn.open('bssmap', msg.dst, f'page_imsi{imsi}', msg) Conn.close_conn(c, msg) tmsi = msg.get_trait('bssmap', 'tmsi') if tmsi is not None: c = Conn.open('bssmap', msg.dst, f'page_tmsi{tmsi}', msg) Conn.close_conn(c, msg) rtp_port = msg.get_trait('bssmap', 'rtp_port') if rtp_port is None: return rtp_conn = Conn.find('rtp', rtp_port, rtp_port.key()) if rtp_conn is None: return for c in msg.src_conns: rtp_conn.merge_subscr_conns(c) def cond(prev_msg): return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port crcx_ok = None for prev_msg in find_recent_msg(msg, messages, my_idx, cond): for c in (prev_msg.src_conns + prev_msg.dst_conns): rtp_conn.merge_subscr_conns(c) class Layer_gsm_abis_rsl(Layer): def __init__(s, msg:Message): p = msg.p msgtype = sane_msgtype(p.get('gsm_abis_rsl.msg_type.showname')) msgtype_nr = p.get('gsm_abis_rsl.msg_type') msgtype_nr = str_to_int(msgtype_nr) ch = None ch_imm_ass = None # For Immediate Assignment, the assigned TS/chan is more interesting ts = p.get('gsm_a_ccch.gsm_a_rr_timeslot') cbits = (p.get('gsm_a_ccch.gsm_a_rr_sdcch4_sdcchc4_cbch') or p.get('gsm_a_ccch.gsm_a_rr_sdcch8_sdcchc8_cbch')) try: ch_ts = int(ts) ch_cbits = int(cbits) ch_imm_ass = f'{ch_ts}-{ch_cbits}' except: pass # normal RSL messages on a given TS/chan ts = p.get('gsm_abis_rsl.ch_no_tn') cbits = p.get('gsm_abis_rsl.ch_no_cbits') if ts is not None and cbits is not None: try: ch_ts = int(ts) ch_cbits = int(cbits) ch = f'{ch_ts}-{ch_cbits}' except: raise ch_assign = None new_ch_ts = p.get('gsm_a_dtap.gsm_a_rr_timeslot') new_ch_ss = p.get('gsm_a_dtap.gsm_a_rr_tch_facch_sacchf') if new_ch_ts and new_ch_ss: ch_assign = f'{new_ch_ts}-{new_ch_ss}' rtp_local_port = None ipacc_rtp_local_ip = p.get('gsm_abis_rsl.ipacc_local_ip') ipacc_rtp_local_port = p.get('gsm_abis_rsl.ipacc_local_port') if ipacc_rtp_local_ip and ipacc_rtp_local_port: rtp_local_port = IpPort.get(ipacc_rtp_local_ip, ipacc_rtp_local_port) rtp_remote_port = None ipacc_rtp_remote_ip = p.get('gsm_abis_rsl.ipacc_remote_ip') ipacc_rtp_remote_port = p.get('gsm_abis_rsl.ipacc_remote_port') if ipacc_rtp_remote_ip and ipacc_rtp_remote_port: rtp_remote_port = IpPort.get(ipacc_rtp_remote_ip, ipacc_rtp_remote_port) req_ref_l = (p.get('gsm_abis_rsl.req_ref_ra'), p.get('gsm_abis_rsl.req_ref_t1prim'), p.get('gsm_abis_rsl.req_ref_t2'), p.get('gsm_abis_rsl.req_ref_t3')) if not all(req_ref_l): req_ref_l = (p.get('gsm_a_ccch.gsm_a_rr_ra'), p.get('gsm_a_ccch.gsm_a_rr_t1prim'), p.get('gsm_a_ccch.gsm_a_rr_t2'), p.get('gsm_a_ccch.gsm_a_rr_t3')) req_ref = None if all(req_ref_l): req_ref = '-'.join(req_ref_l) try: page_tmsi = tmsi_standardize(msg.p.cap_p.gsm_abis_rsl._all_fields['3gpp.tmsi']) except: page_tmsi = None traits = Traits( msgtype_nr=msgtype_nr, ch=ch, ch_imm_ass=ch_imm_ass, ch_assign=ch_assign, chan_type=p.get('gsm_abis_rsl.ch_type'), rtp_port=rtp_local_port, rtp_remote_port=rtp_remote_port, tmsi=tmsi_standardize(p.get('gsm_abis_rsl.gsm_a_tmsi')), imsi=p.get('gsm_abis_rsl.gsm_a_imsi'), page_tmsi=page_tmsi, arfcn = p.get('gsm_a_dtap.gsm_a_rr_single_channel_arfcn') or p.get('gsm_abis_rsl.gsm_a_rr_single_channel_arfcn'), req_ref = req_ref, ) super().__init__(msg=msg, proto='rsl', msgtype=msgtype, traits=traits, cap_p_name='gsm_abis_rsl') # ignore CCCH Load INDication #if msgtype_nr == 18: # msg.hide = True def identify_entities(s, msg:Message, messages, my_idx): ids = [] msgtype = msg.get_trait('rsl', 'msgtype') if msgtype in ('RF-RESource-INDication', 'CCCH-LOAD-INDication', 'CHANnel-ReQuireD', ): # INDication from BTS to BSC ids.append(Message.EntityIdent(proto='rsl', src_kind='BTS', dst_kind='BSC')) if msgtype in ('CHANnel-ACTIVation', 'IMMEDIATE-ASSIGN-COMMAND'): # from BSC to BTS ids.append(Message.EntityIdent(proto='rsl', dst_kind='BTS', src_kind='BSC')) rtp_port = msg.get_trait('rsl', 'rtp_port') if rtp_port and msg.src_entity_is('BTS') and msgtype in ( 'ip.access-CRCX-ACK', 'ip.access-MDCX-ACK'): ids.append(Message.EntityIdent(proto='rtp', src_kind='BTS', src_entity=msg.src.entity, src_port=rtp_port)) if msgtype == 'ip.access-MDCX': # This ip.a MDCX's rtp_remote_port should match an earlier MGCP CRCX-OK message, and we now # know that this BSC asked for it. rtp_port = msg.get_trait('rsl', 'rtp_remote_port') if not rtp_port: return None def cond(prev_msg): return prev_msg.get_trait('mgcp', 'rtp_port') == rtp_port crcx_ok = None for prev_msg in find_recent_msg(msg, messages, my_idx, cond): crcx_ok = prev_msg break if not crcx_ok: return None bsc = msg.src.entity mgw = crcx_ok.src.entity ids.append( Message.EntityIdent(proto='mgcp', src_port=crcx_ok.src, src_entity=mgw, src_kind='MGW@BSC', rename=True, dst_port=crcx_ok.dst, dst_entity=bsc, dst_kind='BSC') ) return ids def collapse(s, messages, my_idx): # combine duplicates like rsl.CCCH-LOAD-INDication for i in reversed(range(my_idx)): prev_msg = messages[i] if not prev_msg: continue if prev_msg.finalized: break # stop combining at any non-rsl (and non-minor) message if not 'rsl' in prev_msg.layers: if all(l.minor for l in prev_msg.layers.values()): continue else: break if not same_nonempty(prev_msg.get_traits('rsl'), s.msg.get_traits('rsl')): continue if s.msg.same_src_dst(prev_msg, forward=True): # found a recent similar packet, combine prev_msg.count += 1 messages[my_idx] = None prev_msg.absorb_msg(s.msg) return prev_msg return s.msg @classmethod def identify_conns_ra(cls, messages:list, my_idx:int): msg = messages[my_idx] msgtype = msg.get_trait('rsl', 'msgtype') bts = msg.entity('BTS') if bts is None: return None bts_port = msg.get_port('BTS') if bts_port is None: return None ra = msg.get_trait('rsl', 'req_ref') if ra is None: return None bts_ra = f'{bts_port.entity.label()}.ra{ra}' proto = 'rsl' conn_id = bts_ra conn = None if msgtype == 'CHANnel-ReQuireD': if not msg.src_entity_is('BTS'): return bts = msg.src.entity bts_port = msg.src bsc = msg.dst.entity bsc_port = msg.dst conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts) Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn]) elif msgtype in ('CHANnel-ACTIVation'): conn = Conn.message(proto, msg.dst, conn_id, msg) elif msgtype == 'IMMEDIATE-ASSIGN-COMMAND': # the RA token has fulfilled its use as soon as an IMM ASS happened Conn.close(proto, msg.dst, conn_id, msg) else: Conn.message(proto, msg.dst, conn_id, msg) if conn: for c in (msg.src_conns + msg.dst_conns): conn.merge_subscr_conns(c) @classmethod def identify_conns_ch(cls, messages:list, my_idx:int): msg = messages[my_idx] proto = 'rsl' msgtype = msg.get_trait('rsl', 'msgtype') # For Immediate Assignment, the assigned TS/chan is the one to match on if msgtype == 'IMMEDIATE-ASSIGN-COMMAND': ch = msg.get_trait('rsl', 'ch_imm_ass') else: ch = msg.get_trait('rsl', 'ch') if ch is None: return None if msgtype in ('CHANnel-ACTIVation') or msg.src_entity_is('BSC'): bsc = msg.src.entity bsc_port = msg.src bts = msg.dst.entity bts_port = msg.dst elif msg.src_entity_is('BTS'): bts = msg.src.entity bts_port = msg.src bsc = msg.dst.entity bsc_port = msg.dst else: return None if bts_port.entity is None: return None bts_ch = f'{bts_port.entity.label()}.ch{ch}' conn_id = bts_ch if msgtype == 'CHANnel-ACTIVation': conn = Conn.open(proto, bts_port, conn_id, msg, entity=bts) Conn.open(proto, bsc_port, conn_id, msg, entity=bsc, counterparts=[conn]) elif msgtype == 'RF-CHANnel-RELease-ACKnowledge': conn = Conn.close(proto, msg.src, conn_id, msg) else: conn = Conn.message(proto, bts_port, conn_id, msg) if conn: for c in (msg.src_conns + msg.dst_conns): conn.merge_subscr_conns(c) # when changing to a new ch, e.g. a regular Assignment Command to change from SDCCH to TCH, # associate the new channel with the conn ch_assign = msg.get_trait('rsl', 'ch_assign') if ch_assign: conn_id = f'{bts_port.entity.label()}.ch{ch_assign}' conn = Conn.find(proto, bts_port, conn_id) if conn: conn.add_message(msg) for c in (msg.src_conns + msg.dst_conns): conn.merge_subscr_conns(c) @classmethod def identify_conns_rtp(cls, messages:list, my_idx:int): msg = messages[my_idx] # BTS RTP rtp_port = msg.get_trait('rsl', 'rtp_port') if rtp_port and msg.src_entity_is('BTS') and msg.get_trait('rsl','msgtype') == 'ip.access-CRCX-ACK': rtp_port.entity = msg.src.entity rtp_conn = Conn.open('rtp', rtp_port, conn_id=rtp_port.key(), start_msg=msg, add_message=False) for c in (msg.src_conns + msg.dst_conns): rtp_conn.merge_subscr_conns(c) # MGW@BSC RTP towards BTS rtp_port = msg.get_trait('rsl', 'rtp_remote_port') if rtp_port: rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) if rtp_conn: rtp_conn.add_message(msg) for c in (msg.src_conns + msg.dst_conns): rtp_conn.merge_subscr_conns(c) @classmethod def identify_conns_paging(cls, messages:list, my_idx:int): msg = messages[my_idx] msgtype = msg.get_trait('rsl', 'msgtype') if msgtype != 'PAGING-CoMmanD': return tmsi = msg.get_trait('rsl', 'page_tmsi') if tmsi is None: return conn_id = f'page_tmsi{tmsi}' subscr = Subscriber.by_tmsi(tmsi) if subscr is None: return rsl_conn = Conn.open('rsl', msg.src, conn_id, msg) subscr_conn = SubscriberConn() subscr.add_subscriber_conn(subscr_conn) subscr_conn.add_conn(rsl_conn) # Paging does not have a proper end, it may never be answered. # A Conn wants to be closed at some point. Just close it directly. Conn.close_conn(rsl_conn, msg) # Expecting a recent Paging on BSSMAP bssmap_paging = None def cond(prev_msg): return (prev_msg.get_trait('bssmap', 'msgtype') == 'Paging' and prev_msg.get_trait('bssmap', 'tmsi') == tmsi) for match in find_recent_msg(msg, messages, my_idx, cond): bssmap_paging = match break if bssmap_paging is None: return bssmap_conn = Conn.find('bssmap', bssmap_paging.dst, conn_id, find_in_closed_conns=True) if bssmap_conn: rsl_conn.merge_subscr_conns(bssmap_conn) @classmethod def identify_conns(cls, messages:list, my_idx:int): cls.identify_conns_ra(messages, my_idx) cls.identify_conns_ch(messages, my_idx) cls.identify_conns_rtp(messages, my_idx) cls.identify_conns_paging(messages, my_idx) class Layer_gsm_a_dtap(Layer): def __init__(s, msg:Message): dtap = msg.p.get('gsm_a_dtap') assert dtap is not None msgtype = None msgtype_nr = None for f in dtap._get_all_fields_with_alternates(): if f.name.startswith('gsm_a.dtap.msg_') and f.name.endswith('_type'): msgtype = f.showname_value try: msgtype_nr = int(f.raw_value) except: pass traits = Traits( msgtype_nr=msgtype_nr, imsi=msg.p.get('gsm_a_dtap.e212_imsi') or msg.p.get('gsm_a_dtap.gsm_a_imsi'), tmsi=tmsi_standardize(msg.p.get('gsm_a_dtap.gsm_a_tmsi') or msg.p.get('gsm_a_dtap.3gpp_tmsi')), imei=msg.p.get('gsm_a_dtap.gsm_a_imei'), to_msisdn=msg.p.get('gsm_a_dtap.cld_party_bcd_num'), ) super().__init__(msg=msg, proto='dtap', msgtype=msgtype, traits=traits, cap_p_name='gsm_a_dtap') class Layer_gsup(Layer): def __init__(s, msg:Message): msgtype = sane_msgtype(msg.p.get('gsup.msg_type.showname')) msisdn = None if '-forwardSM-' not in msgtype: msisdn = msg.p.get('gsup.e164_msisdn') to_msisdn = msg.p.get('gsup.gsm_sms_tp_da') msgtype_nr = None is_request = None try: msgtype_nr = int(msg.p.get('gsup.msg_type')) is_request = not (msgtype_nr & 0x3) except: pass session_state = None try: session_state = int(msg.p.get('gsup.session_state')) except: pass traits = Traits( imsi=msg.p.get('gsup.e212_imsi'), msgtype_nr=msgtype_nr, is_request=is_request, cn_domain=sane_showname(msg.p.get('gsup.cn_domain.showname')), msisdn=msisdn, to_msisdn=to_msisdn, source_name=msg.p.get('gsup.source_name_text'), destination_name=msg.p.get('gsup.destination_name_text'), session_id=msg.p.get('gsup.session_id'), session_state=session_state, ) super().__init__(msg=msg, proto='gsup', msgtype=msgtype, traits=traits) @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] imsi = msg.get_trait('gsup', 'imsi') is_request = msg.get_trait('gsup', 'is_request') session_id = msg.get_trait('gsup', 'session_id') session_state = msg.get_trait('gsup', 'session_state') if not imsi: return proto = 'gsup' if session_id: conn_id = f'{imsi}:{session_id}' have = Conn.find(proto, msg.src, conn_id) or Conn.find(proto, msg.dst, conn_id) if have is None: have = Conn.open(proto, msg.src, conn_id, msg, counterparts=[Conn.open(proto, msg.dst, conn_id, msg)]) if session_state is not None and session_state == 0x3: Conn.close(proto, msg.src, conn_id, msg) else: have.add_message(msg) else: conn_id = f'{imsi}' if is_request: have = Conn.find(proto, msg.src, conn_id) if have is None: Conn.open(proto, msg.src, conn_id, msg) else: have.add_message(msg) else: have = Conn.find(proto, msg.dst, conn_id) if have is None: # it's a stray response? anyway create a conn for association with a subscriber Conn.open(proto, msg.dst, conn_id, msg) Conn.close(proto, msg.dst, conn_id, msg) def identify_entities(s, msg:Message, messages, my_idx): if msg.get_trait('gsup', 'msgtype') in ('SendAuthInfo-Request', 'UpdateLocation-Request', 'PurgeMS-Request'): cn = msg.get_trait('gsup', 'cn_domain') src_kind = None src_entity = None src_subscr_conn = None if msg.get_trait('gsup', 'source_name'): # proxy forwarding src_kind = 'HLR' traits = [name for proto, name, result in msg.get_traits('gsup') if name not in ('source_name', 'destination_name')] for match in find_same_trait(msg, messages, my_idx, 'gsup', traits): if not match.dst_entity_is('HLR'): continue src_entity = match.dst.entity break else: if cn == 'CS': src_kind = 'MSC' elif cn == 'PS': src_kind = 'SGSN' if src_kind: # associate MSC GSUP port with MSC BSSMAP port imsi = msg.get_trait('gsup', 'imsi') if imsi: subscr = Subscriber.by_imsi(imsi) def cond(prev_msg): return (prev_msg.dst_entity_is(src_kind) and prev_msg.is_subscriber_related(subscr)) for match in find_recent_msg(msg, messages, my_idx, cond): src_entity = match.dst.entity break # i forgot whatever this does: if not src_entity: src_entity, src_subscr_conn = s.msg.find_entity(src_kind, with_port=('bssgp', 'IuPS')) else: # no cn_domain in the GSUP message, try to guess msc, msc_subscr_conn = s.msg.find_entity('MSC') sgsn, sgsn_subscr_conn = s.msg.find_entity('SGSN') if msc and not sgsn: src_entity = msc src_subscr_conn = msc_subscr_conn if sgsn and not msc: src_entity = sgsn src_subscr_conn = sgsn_subscr_conn if src_subscr_conn is not None and msg.src_conns: for c in msg.src_conns: c.subscriber_conn = SubscriberConn.merge(c.subscriber_conn, src_subscr_conn) return Message.EntityIdent(proto='gsup', src_kind=src_kind, dst_kind='HLR', src_entity=src_entity, dst_entity=msg.dst.entity) class Layer_sip(Layer): def __init__(s, msg:Message): method = msg.p.get('sip.method') cseq_method = msg.p.get('sip.cseq_method') status_code = msg.p.get('sip.status_code') status_line = msg.p.get('sip.status_line') if status_line: if '--' in status_line: status_line = status_line[:status_line.index('--')] status_line = status_line.strip() status = status_line.split()[-1] else: status = status_code if status: msgtype = f'{cseq_method}-{status}' elif method: msgtype = method else: msgtype = cseq_method sip_from_host = msg.p.get('sip.from_host') sip_from_port = msg.p.get('sip.from_port') sip_from = IpPort.get(sip_from_host, sip_from_port) rtp_port = None ip = msg.p.get('sip.sdp_connection_info_address') port = msg.p.get('sip.sdp_media_port') if ip and port: rtp_port = IpPort.get(ip, port) server = msg.p.get('sip.Server') agent = msg.p.get('sip.User-Agent') traits = Traits( sip_agent=server or agent, sip_from=sip_from, call_id = msg.p.get('sip.call_id'), method = cseq_method, seq = msg.p.get('sip.cseq_seq'), to_msisdn = msg.p.get('sip.to_user'), from_msisdn = msg.p.get('sip.from_user'), from_tag = msg.p.get('sip.from_tag'), r_uri = msg.p.get('sip.r_uri'), status_code = status_code, rtp_port=rtp_port, ) super().__init__(msg=msg, proto='sip', msgtype=msgtype, traits=traits) def identify_entities(s, msg:Message, messages, my_idx): src_kind = 'SIP' sip_from = msg.get_trait('sip', 'sip_from') agent = msg.get_trait('sip', 'sip_agent') rename = False if agent and sip_from and sip_from == msg.src: rename = 'src' if agent.startswith('kamailio'): src_kind = 'PBX' elif agent.startswith('sofia'): src_kind = 'SIPCON' else: rename = False return Message.EntityIdent(proto='sip', src_kind=src_kind, dst_kind='SIP', rename=rename) @classmethod def identify_conns(cls, messages:list, my_idx:int): msg = messages[my_idx] msgtype = msg.get_trait('sip', 'msgtype') call_id = msg.get_trait('sip', 'call_id') sip_from = msg.get_trait('sip', 'sip_from') if msgtype in ('INVITE','INVITE-OK') and sip_from and sip_from == msg.src: conn = Conn.open('sip', msg.src, call_id, start_msg=msg) rtp_port = msg.get_trait('sip', 'rtp_port') if rtp_port is None: return rtp_conn = Conn.find('rtp', rtp_port, conn_id=rtp_port.key()) if rtp_conn is None: return if not conn.subscriber_conn: conn.merge_subscr_conns(rtp_conn) else: conn = Conn.message('sip', msg.src, call_id, msg) if conn is None: return class Layer_gsmtap_log(Layer): def __init__(s, msg:Message): app = msg.p.get('gsmtap_log.ident') level = msg.p.get('gsmtap_log.level') level_str = sane_showname(msg.p.get('gsmtap_log.level.showname')) logmsg = msg.p.get('gsmtap_log.string') cat = msg.p.get('gsmtap_log.subsys') if level_str != 'ERROR': return return msgtype = f'{app}.{level_str}' msg.log(Color.colored('red', logmsg)) traits = Traits( msgtype=msgtype, level=level, app=app, cat=cat, logmsg=logmsg, ) super().__init__(msg=msg, proto='log', msgtype=msgtype, traits=traits) def identify_entities(s, msg:Message, messages, my_idx): app = msg.get_trait('log', 'app') if app.startswith('Osmo'): app = app[4:] return Message.EntityIdent(proto='log', src_kind=app, dst_kind='LOG') class MessageFilter: def __init__(s, layer=None, idx=None, values=[], negate=False): set_instance_vars_from_args() def matches(s, msg:Message): r = s._matches(msg) if s.negate: return not r return r def _matches(s, msg:Message): if s.layer and s.layer not in msg.layers: return False if s.idx and not (msg.p.idx == s.idx or any(a.p.idx == s.idx for a in msg.absorbed)): return False if s.values: layers = s.layer or msg.layers.keys() for k,v in s.values: for proto, name, result in msg.get_traits(layers, k): if v is None and name == k: return True if result is None or result == v: return True p_layers = None if s.layer: msg_layer = msg.layers.get(s.layer) if msg_layer: p_layers = [msg_layer.cap_p_name] else: p_layers = [s.layer] else: p_layers = [layer.cap_p_name for layer in msg.layers.values()] for k,v in s.values: for p_layer in p_layers: p_val = msg.p.get(p_layer + '.' + k) if p_val is None: continue if v is None: return True if v == str(p_val): return True return False return True @classmethod def debug(cls, flt_list, msg:Message): layers = set() for flt in flt_list: if flt.negate: continue if flt.layer is None: layers.update(msg.layers.keys()) else: layers.add(flt.layer) for layer_name, layer in msg.layers.items(): if layer_name not in layers: continue LOG(dir_p(msg.p, layer.cap_p_name)) def __repr__(s): t = [] if s.negate: t.append('NOT') if s.layer: t.append(f'layer {s.layer!r}') if s.idx: t.append(f'packet nr {s.idx}') if s.values: t_v = [(f'{k} == {v!r}' if v is not None else f'has {k}') for k,v in s.values] t.append('values: ' + (' or '.join(t_v))) return ', '.join(t) word_re = re.compile('^[a-zA-Z0-9_=-]*') @classmethod def parse(cls, spec_str): if spec_str is None: return [] if not spec_str: return [MessageFilter()] filters = [] token = None try: for token in spec_str.split(','): negate = False if token.startswith('!'): token = token[1:] negate = True layer = cls.word_re.match(token).group() flt = MessageFilter(layer=layer, negate=negate) rest = cls.word_re.split(token)[1] while rest: char = rest[0] rest = rest[1:] word = cls.word_re.match(rest).group() rest = cls.word_re.split(rest)[1] if char == '#': flt.idx = int(word) elif char == '.': if not flt.values: flt.values = [] if '=' in word: name = word[:word.index('=')] val = word[word.index('=')+1:] flt.values.append((name, val)) else: flt.values.append((word, None)) else: raise Exception('Unknown token: %r' % (char + word)) filters.append(flt) return filters except: out_error('Some mistake in message filter: %r in token %r' % (spec_str, token)) raise @classmethod def match(cls, flt_list, msg): any_match = False for flt in flt_list: r = flt.matches(msg) if flt.negate and not r: return False any_match = any_match or r return any_match DOC = '''messagefilter examples, for --filter-msg and --debug: dtap All messages that contain a DTAP layer. dtap.msgtype=Identity-Request All DTAP msgtype=Identity-Request messages; for debug, show dtap layer. .msgtype=Identity-Request All msgtype=Identity-Request messages; for debug, show all layers. (Value names can be either parsed traits or raw packet names.) sccp.src_lref=0x00010000.dst_lref=0x00010000 All messages with an SCCP layer and either src_lref or dst_lref == 0x00010000. .imsi All messages where any layer contains a value named 'imsi'. '#123' Message number 123 (don't forget to quote for the shell). 'gsup,#614,sccp.src_lref=0x00010000' Show all GSUP messages, packet number 614 and all SCCP with given source local reference. '!rsl.msgtype=CCCH-LOAD-INDication' Don't show CCCH-LOAD-INDication message types (quote for the shell). ''' class UI: def __init__(s, opts, finalize_after_seconds=5): set_instance_vars_from_args() s.messages = [] s.finalized_idx = -1 s.show_traits = None if s.opts.show_traits: if s.opts.show_traits == 'all': s.show_traits = True else: s.show_traits = s.opts.show_traits.split(',') s.show_conns = None if s.opts.show_conns: if s.opts.show_conns == 'all': s.show_conns = True else: s.show_conns = s.opts.show_conns.split(',') s.filter_msg = MessageFilter.parse(s.opts.filter_msg) for flt in s.filter_msg: out_text_now('Filter-msg:', flt) s.debug = MessageFilter.parse(s.opts.debug) for dbg in s.debug: out_text_now('Debug:', dbg) s.filter_subscr = [] if s.opts.filter_subscr: tokens = s.opts.filter_subscr.split(',') names = ('imsi', '0x', 'imei', 'msisdn', 'tmsi') for token in tokens: handled = False for name in names: if token.startswith(name): token_val = token[len(name):] if not token_val.isdigit(): continue s.filter_subscr.append(name + token[len(name):]) handled = True break if not handled: s.filter_subscr.append(token) s.filter_subscr.extend([name + token for name in names]) def out_text_now(s, *args, **kwargs): 'to be implemented by child class' assert False def out_error(s, *args, **kwargs): 'to be implemented by child class' assert False def msg_finalized(s, msg, apply_filter=True): 'to be implemented by child class' assert False def flush_msg(s, msg): msg.finalized = True s.msg_finalized(msg) def msg_filter(s, msg): '''Return True when the message passes active message filtering, False if it should be hidden''' if s.filter_msg and not MessageFilter.match(s.filter_msg, msg): return False if all(l.minor for l in msg.layers.values()): return False if msg.hide: return False if s.filter_subscr: match = False match_vals = set() for subscr in msg.related_subscribers(): match_vals.update((f'imsi{subscr.imsi}', f'imei{subscr.imei}', f'msisdn{subscr.msisdn}')) match_vals.update(subscr.tmsis) match_vals.update(f'tmsi{tmsi[2:]}' for tmsi in subscr.tmsis) match_vals.update(f'tmsi{tmsi}' for tmsi in subscr.tmsis) if not any(token in match_vals for token in s.filter_subscr): return False return True def flush(s, timestamp_now=0, finalize_after_seconds=0): flush_t = timestamp_now - finalize_after_seconds for i in range(s.finalized_idx+1, len(s.messages)): msg = s.messages[i] if not msg: continue if timestamp_now and msg.timestamp > flush_t: break s.finalized_idx = i s.flush_msg(msg) def start(s): pass def stop(s): pass def add_msg(s, msg): global g_current_msg s.flush(msg.timestamp, s.finalize_after_seconds) try: if s.debug and any(dbg_filter.matches(msg) for dbg_filter in s.debug): msg.debug = True g_current_msg = msg if not msg.layers: return s.messages.append(msg) idx = len(s.messages) - 1 changed_msg = msg.collapse(s.messages, idx) # if the received message was absorbed by another, continue to identify the modified message using the # new index if changed_msg is not None and changed_msg is not msg: msg = changed_msg idx = s.messages.index(msg) msg.identify_entities(s.messages, idx) msg.identify_conns(s.messages, idx) Subscriber.identify_subscriber(msg) except: s.flush() out_error('Exception') raise def process_messages(s, msg_src): for msg in msg_src.next(): if 0 and msg.p.idx in (3676,): msg.log('rsl all_fields ', msg.p.cap_p.gsm_abis_rsl._all_fields) msg.log(msg.p.all_str('gsm_abis_rsl')) s.add_msg(msg) s.flush() msg_src.done() class UI_Plain(UI): def out_text_now(s, *args, **kwargs): print(to_text(*args, **kwargs)) def out_error(s, *args, **kwargs): s.out_text_now(Color.colored('red', '*** ERROR:'), *args, **kwargs) if g_current_msg: s.out_text_now(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(show_traits=True, show_conns=True)) s.out_text_now(trace()) def msg_print(s, msg, apply_filter=True): if apply_filter and not s.msg_filter(msg): return s.out_text_now(msg.str(ladder=True, one_column_per_kind=True, show_traits=s.show_traits, show_conns=s.show_conns)) if s.debug and MessageFilter.match(s.debug, msg): MessageFilter.debug(s.debug, msg) def msg_finalized(s, msg): s.msg_print(msg, apply_filter=True) class UI_Quiet(UI_Plain): def msg_finalized(s, msg): pass class UI_Curses(UI): pass class MsgSource: def __init__(s, opts): set_instance_vars_from_args() s.start_t = None s.p_min_t = None s.p_max_t = None s.end_t = None def _next_cap_p(s): assert False def next(s) -> Message: p_idx = 0 s.start_t = time.time() warn_t = s.start_t warn_p_t = None for cap_p in s._next_cap_p(): p_idx += 1 if p_idx < s.opts.packet_start: continue if s.opts.packet_count and (p_idx - s.opts.packet_start) > s.opts.packet_count: break if s.opts.packet_end and p_idx > s.opts.packet_end: break msg = Message.parse(Packet(p_idx, cap_p)) if msg is None or not msg.layers: continue s.p_min_t = msg.timestamp if s.p_min_t is None else min(s.p_min_t, msg.timestamp) s.p_max_t = msg.timestamp if s.p_max_t is None else max(s.p_max_t, msg.timestamp) now = time.time() if warn_p_t is None or now > warn_t + 3: if warn_p_t: packet_time = s.p_max_t - warn_p_t real_time = now - warn_t if real_time > (1.3 * packet_time): out_text_now(f'! taking longer to calculate than packets arrive by {100.*(real_time - packet_time)/packet_time:.1f}%') warn_t = now warn_p_t = s.p_max_t yield msg def done(s): if s.start_t is None or s.p_min_t is None: out_text_now('Nothing processed.') s.end_t = time.time() out_text_now(f'packet time: {s.p_max_t - s.p_min_t:.1f}s in real time: {s.end_t - s.start_t:.1f}s') class MsgSource_File(MsgSource): def __init__(s, path, opts): set_instance_vars_from_args() super().__init__(opts) def _next_cap_p(s): for cap_p in pyshark.FileCapture(s.path): yield cap_p class MsgSource_Live(MsgSource): def __init__(s, iface, opts): set_instance_vars_from_args() super().__init__(opts) def _next_cap_p(s): for cap_p in pyshark.LiveCapture(s.iface).sniff_continuously(): yield cap_p class MsgSource_Pipe(MsgSource): def __init__(s, opts): super().__init__(opts) def _next_cap_p(s): from pyshark.capture.pipe_capture import PipeCapture for cap_p in PipeCapture(sys.stdin): yield cap_p def run_tests(): def out_test(*args, **kwargs): print(*args, **kwargs) d = dddict() d.sset(('a', 'b', 'c'), 'abc') d.sset(('a', 'b', 'd'), 'abd') out_test(d) assert d == {'a': {'b': {'c': 'abc', 'd': 'abd'}}} def verify_gget(keys, expect): val = d.gget(keys) out_test('gget:', keys,'=',val) assert val == expect verify_gget(('a', 'b', 'c'), 'abc') verify_gget(('a', 'b', 'd'), 'abd') verify_gget(('a', 'b', 'x'), None) verify_gget(('a', 'b'), {'c': 'abc', 'd': 'abd'}) verify_gget(('a',), {'b': {'c': 'abc', 'd': 'abd'}}) verify_gget(('x',), None) def verify_ppop(keys, expect): try: val = d.ppop(keys) assert expect is not None except KeyError: assert expect is None out_test('ppop:', keys,'=',None) return out_test('ppop:', keys,'=',val) assert val == expect assert d.gget(keys) is None verify_ppop(('a', 'b', 'c'), 'abc') verify_ppop(('a', 'b', 'd'), 'abd') verify_ppop(('a', 'b', 'x'), None) d.sset(('a', 'b', 'c'), 'abc') d.sset(('a', 'b', 'd'), 'abd') verify_ppop(('a', 'b'), {'c': 'abc', 'd': 'abd'}) d.sset(('a', 'b', 'c'), 'abc') d.sset(('a', 'b', 'd'), 'abd') verify_ppop(('a',), {'b': {'c': 'abc', 'd': 'abd'}}) verify_ppop(('x',), None) SUBSCRIBERFILTER_DOC = '''subscriberfilter examples, for --filter-subscr: 123 Show subscriber where any value matches 123 (probably only MSISDN will match, because '123' is too short for IMSI etc). imsi123456789012345 imei123456789012345 msisdn123456789012345 tmsi1234abcd Show subscriber with the given IMSI/IMEI/MSISDN/TMSI. imsi123456,imsi987654 Show both these IMSIs. imsi123456,msisdn123,1234abcd,imei987654 Show all of these subscribers: IMSI 123456, MSISDN 123, TMSI 0x1234abcd and IMEI 987654. ''' def parse_args(): import argparse parser = argparse.ArgumentParser(description=__doc__ + '\n' + SUBSCRIBERFILTER_DOC +'\n' + MessageFilter.DOC, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('--pcap-file', '-f', metavar='file') parser.add_argument('--live-capture', '-l', metavar='interface') parser.add_argument('--stdin-capture', '-i', action='store_true') parser.add_argument('--packet-start', '-S', default=0, type=int) parser.add_argument('--packet-count', '-C', default=0, type=int) parser.add_argument('--packet-end', '-E', default=0, type=int) parser.add_argument('--ui', '-u', metavar='USER-INTERFACE', help='How to display messages: plain / p = print to stdout as plain-text;' ' curses / c = interactive curses interface; none / n = quiet', default='plain') parser.add_argument('--filter-subscr', default=None, help='Show only messages related to the given subscriberfilter') parser.add_argument('--filter-msg', default=None, metavar='messagefilter', help='Show only messages matching this messagefilter') parser.add_argument('--show-traits', default=None) parser.add_argument('--show-conns', default=None, help="'all' for all, or specific conn names (comma separated)") parser.add_argument('--collapse-stp', '-s', default=None, help="BSSAP via STP may appear duplicated. '-s1' collapses the duplicates, -s0 does not.") parser.add_argument('--test', action='store_true') parser.add_argument('--debug', metavar='messagefilter', help='Show a lot more info on messages matching this messagefilter') return parser.parse_args() def main(): opts = parse_args() if opts.test: run_tests() else: if (opts.collapse_stp or '').upper() in ['1', 'Y', 'YES', 'TRUE']: SCCP_COLLAPSE_STP = True if (opts.collapse_stp or '').upper() in ['0', 'N', 'NO', 'FALSE']: SCCP_COLLAPSE_STP = False ui_class = None ui_type = opts.ui or 'none' if 'plain'.startswith(ui_type): ui_class = UI_Plain elif 'curses'.startswith(ui_type): ui_class = UI_Curses elif 'none'.startswith(ui_type): ui_class = UI_Quiet else: ERR('Unknown UI type:', repr(ui_type)) return 1 g_ui = ui_class(opts) msg_source = None if opts.pcap_file: msg_source = MsgSource_File(opts.pcap_file, opts) elif opts.live_capture: msg_source = MsgSource_Live(opts.live_capture, opts) elif opts.stdin_capture: msg_source = MsgSource_Pipe(opts) else: ERR('No message source, try `-l any` or `-f my.pcap`') return 1 g_ui.process_messages(msg_source) g_ui.flush() if Conn.open_conns: print('still open conns:', repr(Conn.open_conns)) return 0 if __name__ == '__main__': if False: import cProfile cProfile.run('main()', sort='tottime') else: exit(main()) # vim: noexpandtab tabstop=8 shiftwidth=8