#!/usr/bin/env python2 # -*- coding: utf-8 -*- # @file # @author (C) 2014-2016 by Piotr Krysik # @section LICENSE # # Gr-gsm is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # Gr-gsm is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with gr-gsm; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # # ################################################## # GNU Radio Python Flow Graph # Title: Gr-gsm Livemon # Author: Piotr Krysik # Description: Interactive monitor of a single C0 channel with analysis performed by Wireshark (command to run wireshark: sudo wireshark -k -f udp -Y gsmtap -i lo) # Generated: Sat Aug 26 12:54:55 2017 ################################################## if __name__ == '__main__': import ctypes import sys if sys.platform.startswith('linux'): try: x11 = ctypes.cdll.LoadLibrary('libX11.so') x11.XInitThreads() except: print "Warning: failed to XInitThreads()" from PyQt4 import Qt from gnuradio import blocks from gnuradio import eng_notation from gnuradio import gr from gnuradio import qtgui from gnuradio.eng_option import eng_option from gnuradio.filter import firdes from gnuradio.qtgui import Range, RangeWidget from math import pi from optparse import OptionParser import grgsm import osmosdr import pmt import sip import sys import time class grgsm_livemon(gr.top_block, Qt.QWidget): def __init__(self, args="", fc=941.8e6, gain=30, osr=4, ppm=0, samp_rate=2000000.052982, shiftoff=400e3, serverport="4729"): gr.top_block.__init__(self, "Gr-gsm Livemon") Qt.QWidget.__init__(self) self.setWindowTitle("Gr-gsm Livemon") try: self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) except: pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) self.top_scroll_layout.addWidget(self.top_scroll) self.top_scroll.setWidgetResizable(True) self.top_widget = Qt.QWidget() self.top_scroll.setWidget(self.top_widget) self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_grid_layout = Qt.QGridLayout() self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "grgsm_livemon") self.restoreGeometry(self.settings.value("geometry").toByteArray()) ################################################## # Parameters ################################################## self.args = args self.fc = fc self.gain = gain self.osr = osr self.ppm = ppm self.samp_rate = samp_rate self.shiftoff = shiftoff self.serverport = serverport ################################################## # Variables ################################################## self.ppm_slider = ppm_slider = ppm self.g_slider = g_slider = gain self.fc_slider = fc_slider = fc ################################################## # Blocks ################################################## self._ppm_slider_range = Range(-150, 150, 0.1, ppm, 100) self._ppm_slider_win = RangeWidget(self._ppm_slider_range, self.set_ppm_slider, "PPM Offset", "counter", float) self.top_layout.addWidget(self._ppm_slider_win) self._g_slider_range = Range(0, 50, 0.5, gain, 100) self._g_slider_win = RangeWidget(self._g_slider_range, self.set_g_slider, "Gain", "counter", float) self.top_layout.addWidget(self._g_slider_win) self._fc_slider_range = Range(800e6, 1990e6, 2e5, fc, 100) self._fc_slider_win = RangeWidget(self._fc_slider_range, self.set_fc_slider, "Frequency", "counter_slider", float) self.top_layout.addWidget(self._fc_slider_win) self.rtlsdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + args ) self.rtlsdr_source_0.set_sample_rate(samp_rate) self.rtlsdr_source_0.set_center_freq(fc_slider-shiftoff, 0) self.rtlsdr_source_0.set_freq_corr(ppm_slider, 0) self.rtlsdr_source_0.set_dc_offset_mode(2, 0) self.rtlsdr_source_0.set_iq_balance_mode(2, 0) self.rtlsdr_source_0.set_gain_mode(False, 0) self.rtlsdr_source_0.set_gain(g_slider, 0) self.rtlsdr_source_0.set_if_gain(20, 0) self.rtlsdr_source_0.set_bb_gain(20, 0) self.rtlsdr_source_0.set_antenna("", 0) self.rtlsdr_source_0.set_bandwidth(250e3+abs(shiftoff), 0) self.qtgui_freq_sink_x_0 = qtgui.freq_sink_c( 1024, #size firdes.WIN_BLACKMAN_hARRIS, #wintype fc_slider, #fc samp_rate, #bw "", #name 1 #number of inputs ) self.qtgui_freq_sink_x_0.set_update_time(0.10) self.qtgui_freq_sink_x_0.set_y_axis(-140, 10) self.qtgui_freq_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") self.qtgui_freq_sink_x_0.enable_autoscale(False) self.qtgui_freq_sink_x_0.enable_grid(False) self.qtgui_freq_sink_x_0.set_fft_average(1.0) self.qtgui_freq_sink_x_0.enable_control_panel(False) if not True: self.qtgui_freq_sink_x_0.disable_legend() if "complex" == "float" or "complex" == "msg_float": self.qtgui_freq_sink_x_0.set_plot_pos_half(not True) labels = ["", "", "", "", "", "", "", "", "", ""] widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] colors = ["blue", "red", "green", "black", "cyan", "magenta", "yellow", "dark red", "dark green", "dark blue"] alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for i in xrange(1): if len(labels[i]) == 0: self.qtgui_freq_sink_x_0.set_line_label(i, "Data {0}".format(i)) else: self.qtgui_freq_sink_x_0.set_line_label(i, labels[i]) self.qtgui_freq_sink_x_0.set_line_width(i, widths[i]) self.qtgui_freq_sink_x_0.set_line_color(i, colors[i]) self.qtgui_freq_sink_x_0.set_line_alpha(i, alphas[i]) self._qtgui_freq_sink_x_0_win = sip.wrapinstance(self.qtgui_freq_sink_x_0.pyqwidget(), Qt.QWidget) self.top_layout.addWidget(self._qtgui_freq_sink_x_0_win) self.gsm_sdcch8_demapper_0 = grgsm.gsm_sdcch8_demapper( timeslot_nr=1, ) self.gsm_receiver_0 = grgsm.receiver(4, ([0]), ([]), False) self.gsm_message_printer_1 = grgsm.message_printer(pmt.intern(""), False, False, False) self.gsm_input_0 = grgsm.gsm_input( ppm=ppm-int(ppm), osr=4, fc=fc, samp_rate_in=samp_rate, ) self.gsm_decryption_0 = grgsm.decryption(([]), 1) self.gsm_control_channels_decoder_0_0 = grgsm.control_channels_decoder() self.gsm_control_channels_decoder_0 = grgsm.control_channels_decoder() self.gsm_clock_offset_control_0 = grgsm.clock_offset_control(fc-shiftoff, samp_rate, osr) self.gsm_bcch_ccch_sdcch4_demapper_0 = grgsm.gsm_bcch_ccch_sdcch4_demapper( timeslot_nr=0, ) self.blocks_socket_pdu_0_0 = blocks.socket_pdu("UDP_SERVER", "127.0.0.1", serverport, 10000, False) self.blocks_socket_pdu_0 = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000, False) self.blocks_rotator_cc_0 = blocks.rotator_cc(-2*pi*shiftoff/samp_rate) ################################################## # Connections ################################################## self.msg_connect((self.blocks_socket_pdu_0_0, 'pdus'), (self.gsm_message_printer_1, 'msgs')) self.msg_connect((self.gsm_bcch_ccch_sdcch4_demapper_0, 'bursts'), (self.gsm_control_channels_decoder_0, 'bursts')) self.msg_connect((self.gsm_clock_offset_control_0, 'ctrl'), (self.gsm_input_0, 'ctrl_in')) self.msg_connect((self.gsm_control_channels_decoder_0, 'msgs'), (self.blocks_socket_pdu_0, 'pdus')) self.msg_connect((self.gsm_control_channels_decoder_0_0, 'msgs'), (self.blocks_socket_pdu_0, 'pdus')) self.msg_connect((self.gsm_decryption_0, 'bursts'), (self.gsm_control_channels_decoder_0_0, 'bursts')) self.msg_connect((self.gsm_receiver_0, 'C0'), (self.gsm_bcch_ccch_sdcch4_demapper_0, 'bursts')) self.msg_connect((self.gsm_receiver_0, 'measurements'), (self.gsm_clock_offset_control_0, 'measurements')) self.msg_connect((self.gsm_receiver_0, 'C0'), (self.gsm_sdcch8_demapper_0, 'bursts')) self.msg_connect((self.gsm_sdcch8_demapper_0, 'bursts'), (self.gsm_decryption_0, 'bursts')) self.connect((self.blocks_rotator_cc_0, 0), (self.gsm_input_0, 0)) self.connect((self.blocks_rotator_cc_0, 0), (self.qtgui_freq_sink_x_0, 0)) self.connect((self.gsm_input_0, 0), (self.gsm_receiver_0, 0)) self.connect((self.rtlsdr_source_0, 0), (self.blocks_rotator_cc_0, 0)) def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "grgsm_livemon") self.settings.setValue("geometry", self.saveGeometry()) event.accept() def get_args(self): return self.args def set_args(self, args): self.args = args def get_fc(self): return self.fc def set_fc(self, fc): self.fc = fc self.set_fc_slider(self.fc) self.gsm_input_0.set_fc(self.fc) def get_gain(self): return self.gain def set_gain(self, gain): self.gain = gain self.set_g_slider(self.gain) def get_osr(self): return self.osr def set_osr(self, osr): self.osr = osr def get_ppm(self): return self.ppm def set_ppm(self, ppm): self.ppm = ppm self.set_ppm_slider(self.ppm) self.gsm_input_0.set_ppm(self.ppm-int(self.ppm)) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.blocks_rotator_cc_0.set_phase_inc(-2*pi*self.shiftoff/self.samp_rate) self.gsm_input_0.set_samp_rate_in(self.samp_rate) self.qtgui_freq_sink_x_0.set_frequency_range(self.fc_slider, self.samp_rate) self.rtlsdr_source_0.set_sample_rate(self.samp_rate) def get_shiftoff(self): return self.shiftoff def set_shiftoff(self, shiftoff): self.shiftoff = shiftoff self.blocks_rotator_cc_0.set_phase_inc(-2*pi*self.shiftoff/self.samp_rate) self.rtlsdr_source_0.set_center_freq(self.fc_slider-self.shiftoff, 0) self.rtlsdr_source_0.set_bandwidth(250e3+abs(self.shiftoff), 0) def get_serverport(self): return self.serverport def set_serverport(self, serverport): self.serverport = serverport def get_ppm_slider(self): return self.ppm_slider def set_ppm_slider(self, ppm_slider): self.ppm_slider = ppm_slider self.rtlsdr_source_0.set_freq_corr(self.ppm_slider, 0) def get_g_slider(self): return self.g_slider def set_g_slider(self, g_slider): self.g_slider = g_slider self.rtlsdr_source_0.set_gain(self.g_slider, 0) def get_fc_slider(self): return self.fc_slider def set_fc_slider(self, fc_slider): self.fc_slider = fc_slider self.qtgui_freq_sink_x_0.set_frequency_range(self.fc_slider, self.samp_rate) self.rtlsdr_source_0.set_center_freq(self.fc_slider-self.shiftoff, 0) def argument_parser(): parser = OptionParser(option_class=eng_option, usage="%prog: [options]") parser.add_option( "", "--args", dest="args", type="string", default="", help="Set Device Arguments [default=%default]") parser.add_option( "-f", "--fc", dest="fc", type="eng_float", default=eng_notation.num_to_str(941.8e6), help="Set GSM channel's central frequency [default=%default]") parser.add_option( "-g", "--gain", dest="gain", type="eng_float", default=eng_notation.num_to_str(30), help="Set gain [default=%default]") parser.add_option( "", "--osr", dest="osr", type="intx", default=4, help="Set OverSampling Ratio [default=%default]") parser.add_option( "-p", "--ppm", dest="ppm", type="eng_float", default=eng_notation.num_to_str(0), help="Set ppm [default=%default]") parser.add_option( "-s", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2000000.052982), help="Set samp_rate [default=%default]") parser.add_option( "-o", "--shiftoff", dest="shiftoff", type="eng_float", default=eng_notation.num_to_str(400e3), help="Set Frequency Shiftoff [default=%default]") parser.add_option( "", "--serverport", dest="serverport", type="string", default="4729", help="Set UDP server listening port [default=%default]") return parser def main(top_block_cls=grgsm_livemon, options=None): if options is None: options, _ = argument_parser().parse_args() from distutils.version import StrictVersion if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"): style = gr.prefs().get_string('qtgui', 'style', 'raster') Qt.QApplication.setGraphicsSystem(style) qapp = Qt.QApplication(sys.argv) tb = top_block_cls(args=options.args, fc=options.fc, gain=options.gain, osr=options.osr, ppm=options.ppm, samp_rate=options.samp_rate, shiftoff=options.shiftoff, serverport=options.serverport) tb.start() tb.show() def quitting(): tb.stop() tb.wait() qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting) qapp.exec_() if __name__ == '__main__': main()