#!/usr/bin/env python2 # -*- coding: utf-8 -*- ################################################## # GNU Radio Python Flow Graph # Title: Gr-gsm Livemon # Author: Piotr Krysik # Description: Interactive monitor of a single C0 channel with analysis performed by Wireshark (command to run wireshark: sudo wireshark -k -f udp -Y gsmtap -i lo) # Generated: Mon Jul 18 18:08:34 2016 ################################################## if __name__ == '__main__': import ctypes import sys if sys.platform.startswith('linux'): try: x11 = ctypes.cdll.LoadLibrary('libX11.so') x11.XInitThreads() except: print "Warning: failed to XInitThreads()" from PyQt4 import Qt from gnuradio import blocks from gnuradio import eng_notation from gnuradio import gr from gnuradio import qtgui from gnuradio.eng_option import eng_option from gnuradio.filter import firdes from gnuradio.qtgui import Range, RangeWidget from math import pi from optparse import OptionParser import grgsm import osmosdr import pmt import sip import sys import time class grgsm_livemon(gr.top_block, Qt.QWidget): def __init__(self, args="", fc=939.4e6, gain=30, ppm=0, samp_rate=2000000.052982, shiftoff=400e3, osr=4): gr.top_block.__init__(self, "Gr-gsm Livemon") Qt.QWidget.__init__(self) self.setWindowTitle("Gr-gsm Livemon") try: self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) except: pass self.top_scroll_layout = Qt.QVBoxLayout() self.setLayout(self.top_scroll_layout) self.top_scroll = Qt.QScrollArea() self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) self.top_scroll_layout.addWidget(self.top_scroll) self.top_scroll.setWidgetResizable(True) self.top_widget = Qt.QWidget() self.top_scroll.setWidget(self.top_widget) self.top_layout = Qt.QVBoxLayout(self.top_widget) self.top_grid_layout = Qt.QGridLayout() self.top_layout.addLayout(self.top_grid_layout) self.settings = Qt.QSettings("GNU Radio", "grgsm_livemon") self.restoreGeometry(self.settings.value("geometry").toByteArray()) ################################################## # Parameters ################################################## self.args = args self.fc = fc self.gain = gain self.ppm = ppm self.samp_rate = samp_rate self.shiftoff = shiftoff self.osr = osr ################################################## # Variables ################################################## self.ppm_slider = ppm_slider = ppm self.g_slider = g_slider = gain self.fc_slider = fc_slider = fc ################################################## # Blocks ################################################## self._ppm_slider_range = Range(-150, 150, 1, ppm, 100) self._ppm_slider_win = RangeWidget(self._ppm_slider_range, self.set_ppm_slider, "PPM Offset", "counter", float) self.top_layout.addWidget(self._ppm_slider_win) self._g_slider_range = Range(0, 50, 0.5, gain, 100) self._g_slider_win = RangeWidget(self._g_slider_range, self.set_g_slider, "Gain", "counter", float) self.top_layout.addWidget(self._g_slider_win) self._fc_slider_range = Range(925e6, 1990e6, 2e5, fc, 100) self._fc_slider_win = RangeWidget(self._fc_slider_range, self.set_fc_slider, "Frequency", "counter_slider", float) self.top_layout.addWidget(self._fc_slider_win) self.rtlsdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + args ) self.rtlsdr_source_0.set_sample_rate(samp_rate) self.rtlsdr_source_0.set_center_freq(fc_slider-shiftoff, 0) self.rtlsdr_source_0.set_freq_corr(ppm_slider, 0) self.rtlsdr_source_0.set_dc_offset_mode(2, 0) self.rtlsdr_source_0.set_iq_balance_mode(2, 0) self.rtlsdr_source_0.set_gain_mode(False, 0) self.rtlsdr_source_0.set_gain(g_slider, 0) self.rtlsdr_source_0.set_if_gain(20, 0) self.rtlsdr_source_0.set_bb_gain(20, 0) self.rtlsdr_source_0.set_antenna("", 0) self.rtlsdr_source_0.set_bandwidth(250e3+abs(shiftoff), 0) self.qtgui_freq_sink_x_0 = qtgui.freq_sink_c( 1024, #size firdes.WIN_BLACKMAN_hARRIS, #wintype fc_slider, #fc samp_rate, #bw "", #name 1 #number of inputs ) self.qtgui_freq_sink_x_0.set_update_time(0.10) self.qtgui_freq_sink_x_0.set_y_axis(-140, 10) self.qtgui_freq_sink_x_0.set_trigger_mode(qtgui.TRIG_MODE_FREE, 0.0, 0, "") self.qtgui_freq_sink_x_0.enable_autoscale(False) self.qtgui_freq_sink_x_0.enable_grid(False) self.qtgui_freq_sink_x_0.set_fft_average(1.0) self.qtgui_freq_sink_x_0.enable_control_panel(False) if not True: self.qtgui_freq_sink_x_0.disable_legend() if "complex" == "float" or "complex" == "msg_float": self.qtgui_freq_sink_x_0.set_plot_pos_half(not True) labels = ["", "", "", "", "", "", "", "", "", ""] widths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] colors = ["blue", "red", "green", "black", "cyan", "magenta", "yellow", "dark red", "dark green", "dark blue"] alphas = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] for i in xrange(1): if len(labels[i]) == 0: self.qtgui_freq_sink_x_0.set_line_label(i, "Data {0}".format(i)) else: self.qtgui_freq_sink_x_0.set_line_label(i, labels[i]) self.qtgui_freq_sink_x_0.set_line_width(i, widths[i]) self.qtgui_freq_sink_x_0.set_line_color(i, colors[i]) self.qtgui_freq_sink_x_0.set_line_alpha(i, alphas[i]) self._qtgui_freq_sink_x_0_win = sip.wrapinstance(self.qtgui_freq_sink_x_0.pyqwidget(), Qt.QWidget) self.top_layout.addWidget(self._qtgui_freq_sink_x_0_win) self.gsm_sdcch8_demapper_0 = grgsm.gsm_sdcch8_demapper( timeslot_nr=1, ) self.gsm_receiver_0 = grgsm.receiver(4, ([0]), ([]), False) self.gsm_message_printer_1 = grgsm.message_printer(pmt.intern(""), False, False, False) self.gsm_input_0 = grgsm.gsm_input( ppm=0, osr=4, fc=fc, samp_rate_in=samp_rate, ) self.gsm_decryption_0 = grgsm.decryption(([]), 1) self.gsm_control_channels_decoder_0_0 = grgsm.control_channels_decoder() self.gsm_control_channels_decoder_0 = grgsm.control_channels_decoder() self.gsm_clock_offset_control_0 = grgsm.clock_offset_control(fc-shiftoff, samp_rate, osr) self.gsm_bcch_ccch_demapper_0 = grgsm.gsm_bcch_ccch_demapper( timeslot_nr=0, ) self.blocks_socket_pdu_0_0 = blocks.socket_pdu("UDP_SERVER", "127.0.0.1", "4729", 10000, False) self.blocks_socket_pdu_0 = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000, False) self.blocks_rotator_cc_0 = blocks.rotator_cc(-2*pi*shiftoff/samp_rate) ################################################## # Connections ################################################## self.msg_connect((self.gsm_bcch_ccch_demapper_0, 'bursts'), (self.gsm_control_channels_decoder_0, 'bursts')) self.msg_connect((self.gsm_clock_offset_control_0, 'ctrl'), (self.gsm_input_0, 'ctrl_in')) self.msg_connect((self.gsm_control_channels_decoder_0, 'msgs'), (self.blocks_socket_pdu_0, 'pdus')) self.msg_connect((self.gsm_control_channels_decoder_0, 'msgs'), (self.gsm_message_printer_1, 'msgs')) self.msg_connect((self.gsm_control_channels_decoder_0_0, 'msgs'), (self.blocks_socket_pdu_0, 'pdus')) self.msg_connect((self.gsm_control_channels_decoder_0_0, 'msgs'), (self.gsm_message_printer_1, 'msgs')) self.msg_connect((self.gsm_decryption_0, 'bursts'), (self.gsm_control_channels_decoder_0_0, 'bursts')) self.msg_connect((self.gsm_receiver_0, 'C0'), (self.gsm_bcch_ccch_demapper_0, 'bursts')) self.msg_connect((self.gsm_receiver_0, 'measurements'), (self.gsm_clock_offset_control_0, 'measurements')) self.msg_connect((self.gsm_receiver_0, 'C0'), (self.gsm_sdcch8_demapper_0, 'bursts')) self.msg_connect((self.gsm_sdcch8_demapper_0, 'bursts'), (self.gsm_decryption_0, 'bursts')) self.connect((self.blocks_rotator_cc_0, 0), (self.gsm_input_0, 0)) self.connect((self.blocks_rotator_cc_0, 0), (self.qtgui_freq_sink_x_0, 0)) self.connect((self.gsm_input_0, 0), (self.gsm_receiver_0, 0)) self.connect((self.rtlsdr_source_0, 0), (self.blocks_rotator_cc_0, 0)) def closeEvent(self, event): self.settings = Qt.QSettings("GNU Radio", "grgsm_livemon") self.settings.setValue("geometry", self.saveGeometry()) event.accept() def get_args(self): return self.args def set_args(self, args): self.args = args def get_fc(self): return self.fc def set_fc(self, fc): self.fc = fc self.set_fc_slider(self.fc) self.gsm_input_0.set_fc(self.fc) def get_gain(self): return self.gain def set_gain(self, gain): self.gain = gain self.set_g_slider(self.gain) def get_ppm(self): return self.ppm def set_ppm(self, ppm): self.ppm = ppm self.set_ppm_slider(self.ppm) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.gsm_input_0.set_samp_rate_in(self.samp_rate) self.qtgui_freq_sink_x_0.set_frequency_range(self.fc_slider, self.samp_rate) self.rtlsdr_source_0.set_sample_rate(self.samp_rate) self.blocks_rotator_cc_0.set_phase_inc(-2*pi*self.shiftoff/self.samp_rate) def get_shiftoff(self): return self.shiftoff def set_shiftoff(self, shiftoff): self.shiftoff = shiftoff self.rtlsdr_source_0.set_center_freq(self.fc_slider-self.shiftoff, 0) self.rtlsdr_source_0.set_bandwidth(250e3+abs(self.shiftoff), 0) self.blocks_rotator_cc_0.set_phase_inc(-2*pi*self.shiftoff/self.samp_rate) def get_osr(self): return self.osr def set_osr(self, osr): self.osr = osr def get_ppm_slider(self): return self.ppm_slider def set_ppm_slider(self, ppm_slider): self.ppm_slider = ppm_slider self.rtlsdr_source_0.set_freq_corr(self.ppm_slider, 0) def get_g_slider(self): return self.g_slider def set_g_slider(self, g_slider): self.g_slider = g_slider self.rtlsdr_source_0.set_gain(self.g_slider, 0) def get_fc_slider(self): return self.fc_slider def set_fc_slider(self, fc_slider): self.fc_slider = fc_slider self.qtgui_freq_sink_x_0.set_frequency_range(self.fc_slider, self.samp_rate) self.rtlsdr_source_0.set_center_freq(self.fc_slider-self.shiftoff, 0) def argument_parser(): parser = OptionParser(option_class=eng_option, usage="%prog: [options]") parser.add_option( "", "--args", dest="args", type="string", default="", help="Set Device Arguments [default=%default]") parser.add_option( "-f", "--fc", dest="fc", type="eng_float", default=eng_notation.num_to_str(939.4e6), help="Set fc [default=%default]") parser.add_option( "-g", "--gain", dest="gain", type="eng_float", default=eng_notation.num_to_str(30), help="Set gain [default=%default]") parser.add_option( "-p", "--ppm", dest="ppm", type="intx", default=0, help="Set ppm [default=%default]") parser.add_option( "-s", "--samp-rate", dest="samp_rate", type="eng_float", default=eng_notation.num_to_str(2000000.052982), help="Set samp_rate [default=%default]") parser.add_option( "-o", "--shiftoff", dest="shiftoff", type="eng_float", default=eng_notation.num_to_str(400e3), help="Set shiftoff [default=%default]") parser.add_option( "", "--osr", dest="osr", type="intx", default=4, help="Set OSR [default=%default]") return parser def main(top_block_cls=grgsm_livemon, options=None): if options is None: options, _ = argument_parser().parse_args() from distutils.version import StrictVersion if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"): style = gr.prefs().get_string('qtgui', 'style', 'raster') Qt.QApplication.setGraphicsSystem(style) qapp = Qt.QApplication(sys.argv) tb = top_block_cls(args=options.args, fc=options.fc, gain=options.gain, ppm=options.ppm, samp_rate=options.samp_rate, shiftoff=options.shiftoff, osr=options.osr) tb.start() tb.show() def quitting(): tb.stop() tb.wait() qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting) qapp.exec_() if __name__ == '__main__': main()