From 995769341f9d834bbe03a61552530e2cc5fea4c6 Mon Sep 17 00:00:00 2001 From: ha7ilm Date: Thu, 3 Dec 2015 11:21:29 +0100 Subject: [PATCH] ddcd is now supported. --- config_rtl.py | 103 ------- config_webrx.py | 26 +- openwebrx.py | 48 ++-- plugins/dsp/csdr/plugin.py | 23 +- rtl_mus.py | 539 ------------------------------------- 5 files changed, 57 insertions(+), 682 deletions(-) delete mode 100644 config_rtl.py delete mode 100644 rtl_mus.py diff --git a/config_rtl.py b/config_rtl.py deleted file mode 100644 index d31d9518..00000000 --- a/config_rtl.py +++ /dev/null @@ -1,103 +0,0 @@ -''' - This file is part of RTL Multi-User Server, - that makes multi-user access to your DVB-T dongle used as an SDR. - Copyright (c) 2013-2015 by Andras Retzler - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . - - ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - - In addition, as a special exception, the copyright holders - state that config_rtl.py and config_webrx.py are not part of the - Corresponding Source defined in GNU AGPL version 3 section 1. - - (It means that you do not have to redistribute config_rtl.py and - config_webrx.py if you make any changes to these two configuration files, - and use them for running your own web service with OpenWebRX.) -''' - -my_ip='127.0.0.1' # leave blank for listening on all interfaces -my_listening_port = 4951 - -rtl_tcp_host,rtl_tcp_port='localhost',8888 - -send_first="" -#send_first=chr(9)+chr(0)+chr(0)+chr(0)+chr(1) # set direct sampling - -setuid_on_start = 0 # we normally start with root privileges and setuid() to another user -uid = 999 # determine by issuing: $ id -u username -ignore_clients_without_commands = 1 # we won't serve data to telnet sessions and things like that - # we'll start to serve data after getting the first valid command - -freq_allowed_ranges = [[0,2200000000]] - -client_cant_set_until=0 -first_client_can_set=True # openwebrx - spectrum thread will set things on start # no good, clients set parameters and things -buffer_size=25000000 # per client -log_file_path = "/dev/null" # Might be set to /dev/null to turn off logging - -''' -Allow any host to connect: - use_ip_access_control=0 - -Allow from specific ranges: - use_ip_access_control=1 - order_allow_deny=0 # deny and then allow - denied_ip_ranges=() # deny from all - allowed_ip_ranges=('192.168.','44.','127.0.0.1') # allow only from ... - -Deny from specific ranges: - use_ip_access_control=1 - order_allow_deny=0 # allow and then deny - allowed_ip_ranges=() # allow from all - denied_ip_ranges=('192.168.') # deny any hosts from ... -''' -use_ip_access_control=1 #You may want to open up the I/Q server to the public, then set this to zero. -order_allow_deny=0 -denied_ip_ranges=() # deny from all -allowed_ip_ranges=('127.0.0.1') # allow only local connections (from openwebrx). -allow_gain_set=1 - -use_dsp_command=False # you can process raw I/Q data with a custom command that starts a process that we can pipe the data into, and also pipe out of. -debug_dsp_command=False # show sample rate before and after the dsp command -dsp_command="" - -''' -Example DSP commands: - * Compress I/Q data with FLAC: - flac --force-raw-format --channels 2 --sample-rate=250000 --sign=unsigned --bps=8 --endian=little -o - - - * Decompress FLAC-coded I/Q data: - flac --force-raw-format --decode --endian=little --sign=unsigned - - -''' -watchdog_interval=0 -reconnect_interval=10 -''' -If there's no input I/Q data after N seconds, input will be filled with zero samples, -so that GNU Radio won't fail in OpenWebRX. It may reconnect rtl_tcp_thread. -If watchdog_interval is 0, then watchdog thread is not started. - -''' -cache_full_behaviour=2 -''' - 0 = drop samples - 1 = close client - 2 = openwebrx: don't care about that client until it wants samples again (gr-osmosdr bug workaround) -''' - -rtl_tcp_password=None -''' -This one applies to a special version of rtl_tcp that has authentication. -# You can find more info here: https://github.com/ha7ilm/rtl-sdr -# If it is set to a string (e.g. rtl_tcp_password="changeme"), rtl_mus will try to authenticate against the rtl_tcp server. -''' diff --git a/config_webrx.py b/config_webrx.py index faea1c05..9c394451 100644 --- a/config_webrx.py +++ b/config_webrx.py @@ -76,38 +76,28 @@ ppm = 0 audio_compression="adpcm" #valid values: "adpcm", "none" fft_compression="adpcm" #valid values: "adpcm", "none" -start_rtl_thread=True - # ==== I/Q sources (uncomment the appropriate) ==== # >> RTL-SDR via rtl_sdr iq_server_port = 4951 -start_rtl_command="rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} - | ddcd -p{iq_server_port} -d3".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm, iq_server_port=iq_server_port) -format_conversion="csdr convert_u8_f" +receiver_commandline="rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} - | csdr convert_u8_f".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) -#start_rtl_command="hackrf_transfer -s {samp_rate} -f {center_freq} -g {rf_gain} -l16 -a0 -r hackrf_pipe & cat hackrf_pipe | nc -vvl 127.0.0.1 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) -#format_conversion="csdr convert_i8_f" -## To use a HackRF, first run "mkfifo hackrf_pipe" in the OpenWebRX directory. -## You should also use the csdr git repo from here: -## git clone https://github.com/sgentle/csdr -## git checkout origin/signed_char +# >> HackRF +#receiver_commandline="hackrf_transfer -s {samp_rate} -f {center_freq} -g {rf_gain} -l16 -a0 -r hackrf_pipe & cat hackrf_pipe | csdr convert_s8_f".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) # >> Sound card SDR (needs ALSA) #I did not have the chance to properly test it. #samp_rate = 96000 -#start_rtl_command="arecord -f S16_LE -r {samp_rate} -c2 - | nc -vvl 127.0.0.1 8888".format(samp_rate=samp_rate) -#format_conversion="csdr convert_i16_f | csdr gain_ff 30" +#receiver_commandline="arecord -f S16_LE -r {samp_rate} -c2 - | csdr convert_i16_f | csdr gain_ff 30".format(samp_rate=samp_rate) -# >> RTL_SDR via rtl_tcp -#start_rtl_command="rtl_tcp -s {samp_rate} -f {center_freq} -g {rf_gain} -P {ppm} -p 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) -#format_conversion="csdr convert_u8_f" +# >> remote rtl_tcp server +#receiver_commandline="nc remote_address 1234 | csdr convert_u8_f" # >> /dev/urandom test signal source #samp_rate = 2400000 -#start_rtl_command="cat /dev/urandom | (pv -qL `python -c 'print int({samp_rate} * 2.2)'` 2>&1) | nc -vvl 127.0.0.1 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate) -#format_conversion="csdr convert_u8_f" +#receiver_commandline="cat /dev/urandom | (pv -qL `python -c 'print int({samp_rate} * 2.2)'` 2>&1) | nc -vvl 127.0.0.1 8888 | csdr convert_u8_f".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate) #You can use other SDR hardware as well, by giving your own command that outputs the I/Q samples... @@ -121,4 +111,6 @@ client_audio_buffer_size = 5 start_freq = center_freq start_mod = "nfm" #nfm, am, lsb, usb, cw +ddc_method = "td" + diff --git a/openwebrx.py b/openwebrx.py index 5fa9bf12..428236e4 100755 --- a/openwebrx.py +++ b/openwebrx.py @@ -44,7 +44,6 @@ from collections import namedtuple import Queue import ctypes -#import rtl_mus import rxws import uuid import signal @@ -73,19 +72,25 @@ def import_all_plugins(directory): class MultiThreadHTTPServer(ThreadingMixIn, HTTPServer): pass -def handle_signal(signal, frame): - global spectrum_dsp, rtl_command_proc +def handle_signal(mysignal, frame): + global spectrum_dsp, receiver_proc print "[openwebrx] Ctrl+C: aborting." - cleanup_clients(True) - spectrum_dsp.stop() - if rtl_command_proc: rtl_command_proc.kill() + try: + cleanup_clients(True) + except: + print "[openwebrx] cleanup_clients failed" + try: + spectrum_dsp.stop() + except: + print "[openwebrx] spectrum_dsp.stop() failed" + if receiver_proc: os.killpg(os.getpgid(receiver_proc.pid), signal.SIGTERM) os._exit(1) #not too graceful exit -rtl_command_proc=rtl_thread=spectrum_dsp=server_fail=None +receiver_proc=rtl_thread=spectrum_dsp=server_fail=None def main(): global clients, clients_mutex, pypy, lock_try_time, avatar_ctime, cfg - global serverfail, rtl_thread, rtl_command_proc + global serverfail, rtl_thread, receiver_proc print print "OpenWebRX - Open Source SDR Web App for Everyone! | for license see LICENSE file in the package" print "_________________________________________________________________________________________________" @@ -117,13 +122,25 @@ def main(): pass #Start rtl thread - if cfg.start_rtl_thread: - rtl_command_proc=subprocess.Popen(cfg.start_rtl_command, shell=True) - #rtl_thread=threading.Thread(target = lambda:subprocess.Popen(cfg.start_rtl_command, shell=True), args=()) - #rtl_thread.start() - print "[openwebrx-main] Started rtl_thread: "+cfg.start_rtl_command + test_dsp=getattr(plugins.dsp,cfg.dsp_plugin).plugin.dsp_plugin() + test_dsp.set_samp_rate(cfg.samp_rate) - """ #Now we use ddcd instead of this. + #cfg.receiver_commandline += "| ddcd -d{decimation} -t{transition_bw} -a127.0.0.1 -p4951 -m{method} 2>&1 | tee /dev/fd/2 | (head -n 10; cat >/dev/null)".format(decimation=test_dsp.decimation, transition_bw=test_dsp.ddc_transition_bw(), method=cfg.ddc_method) + cfg.receiver_commandline += "| ddcd -d{decimation} -t{transition_bw} -a127.0.0.1 -p4951 -m{method}".format(decimation=test_dsp.decimation, transition_bw=test_dsp.ddc_transition_bw(), method=cfg.ddc_method) + print "[openwebrx-main] Running receiver_commandline: "+cfg.receiver_commandline + receiver_proc=subprocess.Popen(cfg.receiver_commandline, stdout=subprocess.PIPE, shell=True) + time.sleep(2) + """ + receiver_stdout_data="" + while True: + receiver_new_data=receiver_proc.stdout.read(1) + if len(receiver_new_data)==0: break + receiver_stdout_data+=receiver_new_data + if "ddcd: listening on" in receiver_stdout_data: break + print "[openwebrx-main] ddcd is ready." + """ + + """ #Now we use ddcd instead of rtl_mus, ncat, or others #Run rtl_mus.py in a different OS thread python_command="pypy" if pypy else "python2" rtl_mus_cmd = python_command+" rtl_mus.py config_rtl" @@ -244,11 +261,11 @@ def spectrum_thread_function(): dsp.set_fft_size(cfg.fft_size) dsp.set_fft_fps(cfg.fft_fps) dsp.set_fft_compression(cfg.fft_compression) - dsp.set_format_conversion(cfg.format_conversion) sleep_sec=0.87/cfg.fft_fps print "[openwebrx-spectrum] Spectrum thread initialized successfully." dsp.start() dsp.read(8) #dummy read to skip bufsize & preamble + #dsp.write("bypass=1\n") #set DDC to I/Q mode print "[openwebrx-spectrum] Spectrum thread started." bytes_to_read=int(dsp.get_fft_bytes_to_read()) while True: @@ -395,7 +412,6 @@ class WebRXHandler(BaseHTTPRequestHandler): dsp=getattr(plugins.dsp,cfg.dsp_plugin).plugin.dsp_plugin() dsp_initialized=False dsp.set_audio_compression(cfg.audio_compression) - dsp.set_format_conversion(cfg.format_conversion) dsp.set_offset_freq(0) dsp.set_bpf(-4000,4000) dsp.nc_port=cfg.iq_server_port diff --git a/plugins/dsp/csdr/plugin.py b/plugins/dsp/csdr/plugin.py index 35361c3f..890c2995 100644 --- a/plugins/dsp/csdr/plugin.py +++ b/plugins/dsp/csdr/plugin.py @@ -43,7 +43,6 @@ class dsp_plugin: self.fft_compression = "none" self.demodulator = "nfm" self.name = "csdr" - self.format_conversion = "csdr convert_u8_f" self.base_bufsize = 512 self.nc_port = 4951 try: @@ -52,14 +51,14 @@ class dsp_plugin: print "[openwebrx-plugin:csdr] error: netcat not found, please install netcat!" def chain(self,which): - any_chain_base="nc -v localhost {nc_port} | csdr setbuf {start_bufsize} | csdr through | "+self.format_conversion+(" | " if self.format_conversion!="" else "") ##"csdr flowcontrol {flowcontrol} auto 1.5 10 | " + any_chain_base="nc -v localhost {nc_port} | csdr setbuf {start_bufsize} | csdr through | " ##"csdr flowcontrol {flowcontrol} auto 1.5 10 | " if which == "fft": - fft_chain_base = "sleep 1.5; "+any_chain_base+"csdr fft_cc {fft_size} {fft_block_size} | csdr logpower_cf -70 | csdr fft_exchange_sides_ff {fft_size}" + fft_chain_base = "(echo bypass=1; cat) |"+ any_chain_base+"csdr fft_cc {fft_size} {fft_block_size} | csdr logpower_cf -70 | csdr fft_exchange_sides_ff {fft_size}" if self.fft_compression=="adpcm": return fft_chain_base+" | csdr compress_fft_adpcm_f_u8 {fft_size}" else: return fft_chain_base - chain_begin=any_chain_base+"csdr shift_addition_cc --fifo {shift_pipe} | csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING | csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING | " + chain_begin=any_chain_base+"csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING | " chain_end = "" if self.audio_compression=="adpcm": chain_end = " | csdr encode_ima_adpcm_i16_u8" @@ -121,9 +120,13 @@ class dsp_plugin: def set_offset_freq(self,offset_freq): self.offset_freq=offset_freq + new_shift_rate=(-float(self.offset_freq)/self.samp_rate) if self.running: - self.shift_pipe_file.write("%g\n"%(-float(self.offset_freq)/self.samp_rate)) - self.shift_pipe_file.flush() + if self.shift_pipe: + self.shift_pipe_file.write("%g\n"%new_shift_rate) + self.shift_pipe_file.flush() + else: + self.write("shift=%g\n"%new_shift_rate) def set_bpf(self,low_cut,high_cut): self.low_cut=low_cut @@ -168,7 +171,7 @@ class dsp_plugin: #code.interact(local=locals()) my_env=os.environ.copy() my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; - self.process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) + self.process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) self.running = True #open control pipes for csdr and send initialization data @@ -181,8 +184,14 @@ class dsp_plugin: def read(self,size): return self.process.stdout.read(size) + + def write(self, data): + self.process.stdin.write(data) + self.process.stdin.flush() def stop(self): + print "[openwebrx-dsp-plugin:csdr] killing pgid", os.getpgid(self.process.pid) + self.process.stdin.close() os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) #if(self.process.poll()!=None):return # returns None while subprocess is running #while(self.process.poll()==None): diff --git a/rtl_mus.py b/rtl_mus.py deleted file mode 100644 index 9f5e230e..00000000 --- a/rtl_mus.py +++ /dev/null @@ -1,539 +0,0 @@ -''' - This file is part of RTL Multi-User Server, - that makes multi-user access to your DVB-T dongle used as an SDR. - Copyright (c) 2013-2015 by Andras Retzler - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . - ------ - -2013-11? Asyncore version -2014-03 Fill with null on no data - -''' - -import socket -import sys -import array -import time -import logging -import os -import time -import subprocess -import fcntl -import thread -import pdb -import asyncore -import multiprocessing -import signal -#pypy compatiblity -try: import dl -except: pass - -import code -import traceback - -def handle_signal(signal, frame): - log.info("Ctrl+C: aborting.") - os._exit(1) #not too graceful exit - -def ip_match(this,ip_ranges,for_allow): - if not len(ip_ranges): - return 1 #empty list matches all ip addresses - for ip_range in ip_ranges: - #print this[0:len(ip_range)], ip_range - if this[0:len(ip_range)]==ip_range: - return 1 - return 0 - -def ip_access_control(ip): - if(not cfg.use_ip_access_control): return 1 - allowed=0 - if(cfg.order_allow_deny): - if ip_match(ip,cfg.allowed_ip_ranges,1): allowed=1 - if ip_match(ip,cfg.denied_ip_ranges,0): allowed=0 - else: - if ip_match(ip,cfg.denied_ip_ranges,0): - allowed=0 - if ip_match(ip,cfg.allowed_ip_ranges,1): - allowed=1 - return allowed - -def add_data_to_clients(new_data): - # might be called from: - # -> dsp_read - # -> rtl_tcp_asyncore.handle_read - global clients - global clients_mutex - clients_mutex.acquire() - for client in clients: - #print "client %d size: %d"%(client[0].ident,client[0].waiting_data.qsize()) - if(client[0].waiting_data.full()): - if cfg.cache_full_behaviour == 0: - log.error("client cache full, dropping samples: "+str(client[0].ident)+"@"+client[0].socket[1][0]) - while not client[0].waiting_data.empty(): # clear queue - client[0].waiting_data.get(False, None) - elif cfg.cache_full_behaviour == 1: - #rather closing client: - log.error("client cache full, dropping client: "+str(client[0].ident)+"@"+client[0].socket[1][0]) - client[0].close(False) - elif cfg.cache_full_behaviour == 2: - pass #client cache full, just not taking care - else: log.error("invalid value for cfg.cache_full_behaviour") - else: - client[0].waiting_data.put(new_data) - clients_mutex.release() - -def dsp_read_thread(): - global proc - global dsp_data_count - while True: - try: - my_buffer=proc.stdout.read(1024) - except IOError: - log.error("DSP subprocess is not ready for reading.") - time.sleep(1) - continue - add_data_to_clients(my_buffer) - if cfg.debug_dsp_command: - dsp_data_count+=len(my_buffer) - -def dsp_write_thread(): - global proc - global dsp_input_queue - global original_data_count - while True: - try: - my_buffer=dsp_input_queue.get(timeout=0.3) - except: - continue - proc.stdin.write(my_buffer) - proc.stdin.flush() - if cfg.debug_dsp_command: - original_data_count+=len(my_buffer) - -class client_handler(asyncore.dispatcher): - - def __init__(self,client_param): - self.client=client_param - self.client[0].asyncore=self - self.sent_dongle_id=False - self.last_waiting_buffer="" - asyncore.dispatcher.__init__(self, self.client[0].socket[0]) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - def handle_read(self): - global commands - new_command = self.recv(5) - if len(new_command)>=5: - if handle_command(new_command, self.client): - commands.put(new_command) - - def handle_error(self): - exc_type, exc_value, exc_traceback = sys.exc_info() - log.info("client error: "+str(self.client[0].ident)+"@"+self.client[0].socket[1][0]) - traceback.print_tb(exc_traceback) - self.close() - - def handle_close(self): - self.client[0].close() - log.info("client disconnected: "+str(self.client[0].ident)+"@"+self.client[0].socket[1][0]) - - def writable(self): - #print "queryWritable",not self.client[0].waiting_data.empty() - return not self.client[0].waiting_data.empty() - - def handle_write(self): - global last_waiting - global rtl_dongle_identifier - global sample_rate - if not self.sent_dongle_id: - self.send(rtl_dongle_identifier) - self.sent_dongle_id=True - return - #print "write2client",self.client[0].waiting_data.qsize() - next=self.last_waiting_buffer+self.client[0].waiting_data.get() - sent=asyncore.dispatcher.send(self, next) - self.last_waiting_buffer=next[sent:] - -class server_asyncore(asyncore.dispatcher): - - def __init__(self): - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((cfg.my_ip, cfg.my_listening_port)) - self.listen(5) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - log.info("Server listening on port: "+str(cfg.my_listening_port)) - - def handle_accept(self): - global max_client_id - global clients_mutex - global clients - my_client=[client()] - my_client[0].socket=self.accept() - if (my_client[0].socket is None): # not sure if required - return - if (ip_access_control(my_client[0].socket[1][0])): - my_client[0].ident=max_client_id - max_client_id+=1 - my_client[0].start_time=time.time() - my_client[0].waiting_data=multiprocessing.Queue(500) - clients_mutex.acquire() - clients.append(my_client) - clients_mutex.release() - handler = client_handler(my_client) - log.info("client accepted: "+str(len(clients)-1)+"@"+my_client[0].socket[1][0]+":"+str(my_client[0].socket[1][1])+" users now: "+str(len(clients))) - else: - log.info("client denied: "+str(len(clients)-1)+"@"+my_client[0].socket[1][0]+":"+str(my_client[0].socket[1][1])+" blocked by ip") - my_client.socket.close() - -rtl_tcp_resetting=False #put me away - -def rtl_tcp_asyncore_reset(timeout): - global rtl_tcp_core - global rtl_tcp_resetting - if rtl_tcp_resetting: return - #print "rtl_tcp_asyncore_reset" - rtl_tcp_resetting=True - time.sleep(timeout) - try: - rtl_tcp_core.close() - except: - pass - try: - del rtl_tcp_core - except: - pass - rtl_tcp_core=rtl_tcp_asyncore() - #print asyncore.socket_map - rtl_tcp_resetting=False - -class rtl_tcp_asyncore(asyncore.dispatcher): - def __init__(self): - global server_missing_logged - asyncore.dispatcher.__init__(self) - self.password_sent = False - self.ok=True - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - try: - self.connect((cfg.rtl_tcp_host, cfg.rtl_tcp_port)) - self.socket.settimeout(0.1) - except: - log.error("rtl_tcp connection refused. Retrying.") - thread.start_new_thread(rtl_tcp_asyncore_reset, (1,)) - self.close() - return - - def handle_error(self): - global server_missing_logged - global rtl_tcp_connected - rtl_tcp_connected=False - exc_type, exc_value, exc_traceback = sys.exc_info() - self.ok=False - server_is_missing=hasattr(exc_value,"errno") and exc_value.errno==111 - if (not server_is_missing) or (not server_missing_logged): - log.error("with rtl_tcp host connection: "+str(exc_value)) - #traceback.print_tb(exc_traceback) - server_missing_logged|=server_is_missing - try: - self.close() - except: - pass - thread.start_new_thread(rtl_tcp_asyncore_reset, (2,)) - - def handle_connect(self): - global server_missing_logged - global rtl_tcp_connected - self.socket.settimeout(0.1) - self.password_sent = False - rtl_tcp_connected=True - if self.ok: - log.info("rtl_tcp host connection estabilished") - server_missing_logged=False - - def handle_close(self): - global rtl_tcp_connected - global rtl_tcp_core - rtl_tcp_connected=False - log.error("rtl_tcp host connection has closed, now trying to reopen") - try: - self.close() - except: - pass - thread.start_new_thread(rtl_tcp_asyncore_reset, (2,)) - - def handle_read(self): - global rtl_dongle_identifier - global dsp_input_queue - global watchdog_data_count - if(len(rtl_dongle_identifier)==0): - rtl_dongle_identifier=self.recv(12) - return - new_data_buffer=self.recv(1024*16) - if cfg.watchdog_interval: - watchdog_data_count+=1024*16 - if cfg.use_dsp_command: - dsp_input_queue.put(new_data_buffer) - #print "did put anyway" - else: - add_data_to_clients(new_data_buffer) - - def writable(self): - - #check if any new commands to write - global commands - return (not self.password_sent and cfg.rtl_tcp_password != None) or not commands.empty() - - def handle_write(self): - if(not self.password_sent and cfg.rtl_tcp_password != None): - log.info("Sending rtl_tcp_password...") - self.send(cfg.rtl_tcp_password) - self.password_sent = True - global commands - while not commands.empty(): - mcmd=commands.get() - self.send(mcmd) - -def xxd(data): - #diagnostic purposes only - output="" - for d in data: - output+=hex(ord(d))[2:].zfill(2)+" " - return output - -def handle_command(command, client_param): - global sample_rate - client=client_param[0] - param=array.array("I", command[1:5])[0] - param=socket.ntohl(param) - command_id=ord(command[0]) - client_info=str(client.ident)+"@"+client.socket[1][0]+":"+str(client.socket[1][1]) - if(time.time()-client.start_time client can't set anything until "+str(cfg.client_cant_set_until)+" seconds") - return 0 - if command_id == 1: - if max(map((lambda r: param>=r[0] and param<=r[1]),cfg.freq_allowed_ranges)): - log.debug("allow: "+client_info+" -> set freq "+str(param)) - return 1 - else: - log.debug("deny: "+client_info+" -> set freq - out of range: "+str(param)) - elif command_id == 2: - log.debug("deny: "+client_info+" -> set sample rate: "+str(param)) - sample_rate=param - return 0 # ordinary clients are not allowed to do this - elif command_id == 3: - log.debug("deny/allow: "+client_info+" -> set gain mode: "+str(param)) - return cfg.allow_gain_set - elif command_id == 4: - log.debug("deny/allow: "+client_info+" -> set gain: "+str(param)) - return cfg.allow_gain_set - elif command_id == 5: - log.debug("deny: "+client_info+" -> set freq correction: "+str(param)) - return 0 - elif command_id == 6: - log.debug("deny/allow: set if stage gain") - return cfg.allow_gain_set - elif command_id == 7: - log.debug("deny: set test mode") - return 0 - elif command_id == 8: - log.debug("deny/allow: set agc mode") - return cfg.allow_gain_set - elif command_id == 9: - log.debug("deny: set direct sampling") - return 0 - elif command_id == 10: - log.debug("deny: set offset tuning") - return 0 - elif command_id == 11: - log.debug("deny: set rtl xtal") - return 0 - elif command_id == 12: - log.debug("deny: set tuner xtal") - return 0 - elif command_id == 13: - log.debug("deny/allow: set tuner gain by index") - return cfg.allow_gain_set - else: - log.debug("deny: "+client_info+" sent an ivalid command: "+str(param)) - return 0 - -def watchdog_thread(): - global rtl_tcp_connected - global rtl_tcp_core - global watchdog_data_count - global sample_rate - zero_buffer_size=16348 - second_frac=10 - zero_buffer='\x7f'*zero_buffer_size - watchdog_data_count=0 - rtl_tcp_connected=False - null_fill=False - time.sleep(4) # wait before activating this thread - log.info("watchdog started") - first_start=True - n=0 - while True: - wait_altogether=cfg.watchdog_interval if rtl_tcp_connected or first_start else cfg.reconnect_interval - first_start=False - if null_fill: - log.error("watchdog: filling buffer with zeros.") - while wait_altogether>0: - wait_altogether-=1.0/second_frac - for i in range(0,((2*sample_rate)/second_frac)/zero_buffer_size): - add_data_to_clients(zero_buffer) - n+=len(zero_buffer) - time.sleep(0) #yield - if watchdog_data_count: break - if watchdog_data_count: break - time.sleep(1.0/second_frac) - #print "sent altogether",n - else: - time.sleep(wait_altogether) - null_fill=not watchdog_data_count - if not watchdog_data_count: - log.error("watchdog: restarting rtl_tcp_asyncore() now.") - rtl_tcp_asyncore_reset(0) - watchdog_data_count=0 - - - -def dsp_debug_thread(): - global dsp_data_count - global original_data_count - while 1: - time.sleep(1) - print "[rtl-mus] DSP | Original data: "+str(int(original_data_count/1000))+"kB/sec | Processed data: "+str(int(dsp_data_count/1000))+"kB/sec" - dsp_data_count = original_data_count=0 - -class client: - ident=None #id - to_close=False - waiting_data=None - start_time=None - socket=None - asyncore=None - - def close(self, use_mutex=True): - global clients_mutex - global clients - if use_mutex: clients_mutex.acquire() - correction=0 - for i in range(0,len(clients)): - i-=correction - if clients[i][0].ident==self.ident: - try: - self.socket.close() - except: - pass - try: - self.asyncore.close() - del self.asyncore - except: - pass - del clients[i] - correction+=1 - if use_mutex: clients_mutex.release() - - -def main(): - global server_missing_logged - global rtl_dongle_identifier - global log - global clients - global clients_mutex - global original_data_count - global dsp_input_queue - global dsp_data_count - global proc - global commands - global max_client_id - global rtl_tcp_core - global sample_rate - - #Set signal handler - signal.signal(signal.SIGINT, handle_signal) #http://stackoverflow.com/questions/1112343/how-do-i-capture-sigint-in-python - - # set up logging - log = logging.getLogger("rtl_mus") - log.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.DEBUG) - stream_handler.setFormatter(formatter) - log.addHandler(stream_handler) - file_handler = logging.FileHandler(cfg.log_file_path) - file_handler.setLevel(logging.INFO) - file_handler.setFormatter(formatter) - log.addHandler(file_handler) - log.info("Server is UP") - - server_missing_logged=0 # Not to flood the screen with messages related to rtl_tcp disconnect - rtl_dongle_identifier='' # rtl_tcp sends some identifier on dongle type and gain values in the first few bytes right after connection - clients=[] - dsp_data_count=original_data_count=0 - commands=multiprocessing.Queue() - dsp_input_queue=multiprocessing.Queue() - clients_mutex=multiprocessing.Lock() - max_client_id=0 - sample_rate=250000 # so far only watchdog thread uses it to fill buffer up with zeros on missing input - - # start dsp threads - if cfg.use_dsp_command: - print "[rtl_mus] Opening DSP process..." - proc = subprocess.Popen (cfg.dsp_command.split(" "), stdin = subprocess.PIPE, stdout = subprocess.PIPE) #!! should fix the split :-S - dsp_read_thread_v=thread.start_new_thread(dsp_read_thread, ()) - dsp_write_thread_v=thread.start_new_thread(dsp_write_thread, ()) - if cfg.debug_dsp_command: - dsp_debug_thread_v=thread.start_new_thread(dsp_debug_thread,()) - - # start watchdog thread - if cfg.watchdog_interval != 0: - watchdog_thread_v=thread.start_new_thread(watchdog_thread,()) - - # start asyncores - rtl_tcp_core = rtl_tcp_asyncore() - server_core = server_asyncore() - - asyncore.loop(0.1) - - -if __name__=="__main__": - print - print "rtl_mus: Multi-User I/Q Data Server for RTL-SDR v0.22, made at HA5KFU Amateur Radio Club (http://ha5kfu.hu)" - print " code by Andras Retzler, HA7ILM " - print " distributed under GNU GPL v3" - print - - try: - for libcpath in ["/lib/i386-linux-gnu/libc.so.6","/lib/libc.so.6"]: - if os.path.exists(libcpath): - libc = dl.open(libcpath) - libc.call("prctl", 15, "rtl_mus", 0, 0, 0) - break - except: - pass - - # === Load configuration script === - if len(sys.argv)==1: - print "[rtl_mus] Warning! Configuration script not specified. I will use: \"config_rtl.py\"" - config_script="config_rtl" - else: - config_script=sys.argv[1] - cfg=__import__(config_script) - if cfg.setuid_on_start: - os.setuid(cfg.uid) - main()