adding sdrInput and inputBase classes

This commit is contained in:
Bastian Schroll 2019-10-28 21:20:27 +01:00
parent 2f5184742f
commit bf16a5c82f
No known key found for this signature in database
GPG key ID: 0AE96912A20E9F5F
6 changed files with 157 additions and 161 deletions

View file

@ -0,0 +1,2 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-

View file

@ -0,0 +1,65 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: inoutSource.py
@date: 28.10.2018
@author: Bastian Schroll
@description: Base class for boswatch input sources
"""
import time
import logging
import threading
logging.debug("- %s loaded", __name__)
class InputBase:
"""!Base class for handling inout sources"""
def __init__(self, inputQueue, inputConfig, decoderConfig):
"""!Build a new InputSource class
@param inputQueue: Python queue object to store input data
@param inputConfig: ConfigYaml object with the inoutSource config
@param decoderConfig: ConfigYaml object with the decoder config"""
self._inputThread = None
self._isRunning = False
self._inputQueue = inputQueue
self._inputConfig = inputConfig
self._decoderConfig = decoderConfig
def start(self):
"""!Start the input source thread"""
logging.debug("starting input thread")
self._isRunning = True
self._inputThread = threading.Thread(target=self._runThread, name="inputThread",
args=(self._inputQueue, self._inputConfig, self._decoderConfig))
self._inputThread.daemon = True
self._inputThread.start()
def _runThread(self, dataQueue, sdrConfig, decoderConfig):
"""!Thread routine of the input source has to be inherit"""
logging.fatal("input thread routine not implemented")
exit(1)
def shutdown(self):
"""!Stop the input source thread"""
if self._isRunning:
logging.debug("wait for stopping the input thread")
self._isRunning = False
self._inputThread.join()
logging.debug("input thread stopped")
def addToQueue(self, data):
"""!Adds alarm data to the queue for further processing during boswatch client"""
self._inputQueue.put_nowait((data, time.time()))
logging.debug("Add received data to queue")
print(data)

View file

@ -1,90 +0,0 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: sdrInput.py
@date: 28.10.2018
@author: Bastian Schroll
@description: Input source for sdr with rtl_fm
"""
import time
import logging
import threading
from boswatch.utils import paths
from boswatch.processManager import ProcessManager
logging.debug("- %s loaded", __name__)
class SdrInput:
"""!Worker class to check internet connection"""
def __init__(self):
self._isRunning = False
self._mmThread = None
def start(self, packetQueue, inputConfig, decoderConfig):
self._isRunning = True
self._mmThread = threading.Thread(target=self._handleSDRInput, name="mmReader",
args=(packetQueue, inputConfig, decoderConfig))
self._mmThread.daemon = True
self._mmThread.start()
def shutdown(self):
self._isRunning = False
self._mmThread.join()
def _handleSDRInput(self, dataQueue, sdrConfig, decoderConfig): # todo exception handling inside
sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm")))
sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id
sdrProc.addArgument("-f " + sdrConfig.get("frequency")) # frequencies
sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm
sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch
sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain
sdrProc.addArgument("-M fm") # set mode to fm
sdrProc.addArgument("-E DC") # set DC filter
sdrProc.addArgument("-s 22050") # bit rate of audio stream
sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a"))
sdrProc.start()
mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True)
if decoderConfig.get("fms", default=0):
mmProc.addArgument("-a FMSFSK")
if decoderConfig.get("zvei", default=0):
mmProc.addArgument("-a ZVEI1")
if decoderConfig.get("poc512", default=0):
mmProc.addArgument("-a POCSAG512")
if decoderConfig.get("poc1200", default=0):
mmProc.addArgument("-a POCSAG1200")
if decoderConfig.get("poc2400", default=0):
mmProc.addArgument("-a POCSAG2400")
mmProc.addArgument("-f alpha")
mmProc.addArgument("-t raw -")
mmProc.setStdin(sdrProc.stdout)
mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a"))
mmProc.start()
logging.info("start decoding")
while self._isRunning:
if not sdrProc.isRunning:
logging.warning("rtl_fm was down - try to restart")
sdrProc.start()
elif not mmProc.isRunning:
logging.warning("multimon was down - try to restart")
mmProc.start()
elif sdrProc.isRunning and mmProc.isRunning:
line = mmProc.readline()
if line:
dataQueue.put_nowait((line, time.time()))
logging.debug("Add data to queue")
print(line)
logging.debug("stopping thread")
mmProc.stop()
sdrProc.stop()

View file

@ -0,0 +1,77 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: sdrInput.py
@date: 28.10.2018
@author: Bastian Schroll
@description: Input source for sdr with rtl_fm
"""
import logging
from boswatch.utils import paths
from boswatch.processManager import ProcessManager
from boswatch.inputSource.inputBase import InputBase
logging.debug("- %s loaded", __name__)
class SdrInput(InputBase):
"""!Class for the sdr input source"""
def _runThread(self, dataQueue, sdrConfig, decoderConfig):
sdrProc = None
mmProc = None
try:
sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm")))
sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id
sdrProc.addArgument("-f " + str(sdrConfig.get("frequency"))) # frequencies
sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm
sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch
sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain
sdrProc.addArgument("-M fm") # set mode to fm
sdrProc.addArgument("-E DC") # set DC filter
sdrProc.addArgument("-s 22050") # bit rate of audio stream
sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a"))
sdrProc.start()
mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True)
if decoderConfig.get("fms", default=0):
mmProc.addArgument("-a FMSFSK")
if decoderConfig.get("zvei", default=0):
mmProc.addArgument("-a ZVEI1")
if decoderConfig.get("poc512", default=0):
mmProc.addArgument("-a POCSAG512")
if decoderConfig.get("poc1200", default=0):
mmProc.addArgument("-a POCSAG1200")
if decoderConfig.get("poc2400", default=0):
mmProc.addArgument("-a POCSAG2400")
mmProc.addArgument("-f alpha")
mmProc.addArgument("-t raw -")
mmProc.setStdin(sdrProc.stdout)
mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a"))
mmProc.start()
logging.info("start decoding")
while self._isRunning:
if not sdrProc.isRunning:
logging.warning("rtl_fm was down - try to restart")
sdrProc.start()
elif not mmProc.isRunning:
logging.warning("multimon was down - try to restart")
mmProc.start()
elif sdrProc.isRunning and mmProc.isRunning:
line = mmProc.readline()
if line:
self.addToQueue(line)
except:
logging.exception("error in sdr input routine")
finally:
mmProc.stop()
sdrProc.stop()

View file

@ -32,9 +32,6 @@ class ProcessManager:
self._processHandle = None self._processHandle = None
self._textMode = textMode self._textMode = textMode
def __del__(self):
self.stop()
def addArgument(self, arg): def addArgument(self, arg):
"""!add a new argument """!add a new argument

View file

@ -32,8 +32,6 @@ logging.debug("BOSWatch client has started ...")
logging.debug("Import python modules") logging.debug("Import python modules")
import argparse import argparse
logging.debug("- argparse") logging.debug("- argparse")
import threading
logging.debug("- threading")
import queue import queue
logging.debug("- queue") logging.debug("- queue")
import time import time
@ -43,10 +41,10 @@ logging.debug("Import BOSWatch modules")
from boswatch.configYaml import ConfigYAML from boswatch.configYaml import ConfigYAML
from boswatch.network.client import TCPClient from boswatch.network.client import TCPClient
from boswatch.network.broadcast import BroadcastClient from boswatch.network.broadcast import BroadcastClient
from boswatch.processManager import ProcessManager
from boswatch.decoder.decoder import Decoder from boswatch.decoder.decoder import Decoder
from boswatch.utils import header from boswatch.utils import header
from boswatch.utils import misc from boswatch.utils import misc
from boswatch.inputSource.sdrInput import SdrInput
header.logoToLog() header.logoToLog()
header.infoToLog() header.infoToLog()
@ -67,8 +65,9 @@ if not bwConfig.loadConfigFile(paths.CONFIG_PATH + args.config):
exit(1) exit(1)
# ========== CLIENT CODE ========== # ========== CLIENT CODE ==========
mmThread = None
bwClient = None bwClient = None
inputSource = None
inputQueue = queue.Queue()
try: try:
ip = bwConfig.get("server", "ip", default="127.0.0.1") ip = bwConfig.get("server", "ip", default="127.0.0.1")
@ -80,68 +79,15 @@ try:
ip = broadcastClient.serverIP ip = broadcastClient.serverIP
port = broadcastClient.serverPort port = broadcastClient.serverPort
# ========== INPUT CODE ==========
def handleSDRInput(dataQueue, sdrConfig, decoderConfig): # todo exception handling inside
sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm")))
sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id
sdrProc.addArgument("-f " + sdrConfig.get("frequency")) # frequencies
sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm
sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch
sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain
sdrProc.addArgument("-M fm") # set mode to fm
sdrProc.addArgument("-E DC") # set DC filter
sdrProc.addArgument("-s 22050") # bit rate of audio stream
sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a"))
sdrProc.start()
# sdrProc.skipLinesUntil("Output at")
mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True)
if decoderConfig.get("fms", default=0):
mmProc.addArgument("-a FMSFSK")
if decoderConfig.get("zvei", default=0):
mmProc.addArgument("-a ZVEI1")
if decoderConfig.get("poc512", default=0):
mmProc.addArgument("-a POCSAG512")
if decoderConfig.get("poc1200", default=0):
mmProc.addArgument("-a POCSAG1200")
if decoderConfig.get("poc2400", default=0):
mmProc.addArgument("-a POCSAG2400")
mmProc.addArgument("-f alpha")
mmProc.addArgument("-t raw -")
mmProc.setStdin(sdrProc.stdout)
mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a"))
mmProc.start()
# mmProc.skipLinesUntil("Available demodulators:")
logging.info("start decoding")
while inputThreadRunning:
if not sdrProc.isRunning:
logging.warning("rtl_fm was down - try to restart")
sdrProc.start()
# sdrProc.skipLinesUntil("Output at") # last line form rtl_fm before data
elif not mmProc.isRunning:
logging.warning("multimon was down - try to restart")
mmProc.start()
# mmProc.skipLinesUntil("Available demodulators:") # last line from mm before data
elif sdrProc.isRunning and mmProc.isRunning:
line = mmProc.readline()
if line:
dataQueue.put_nowait((line, time.time()))
logging.debug("Add data to queue")
print(line)
logging.debug("stopping thread")
mmProc.stop()
sdrProc.stop()
# ========== INPUT CODE ==========
inputQueue = queue.Queue()
if not args.test: if not args.test:
inputThreadRunning = True logging.debug("loading input source: %s", bwConfig.get("client", "inputSource"))
mmThread = threading.Thread(target=handleSDRInput, name="mmReader", if bwConfig.get("client", "inputSource") == "sdr":
args=(inputQueue, bwConfig.get("inputSource", "sdr"), bwConfig.get("decoder"))) inputSource = SdrInput(inputQueue, bwConfig.get("inputSource", "sdr"), bwConfig.get("decoder"))
mmThread.daemon = True else:
mmThread.start() logging.fatal("Invalid input source: %s", bwConfig.get("client", "inputSource"))
exit(1)
inputSource.start()
else: else:
logging.warning("STARTING TESTMODE!") logging.warning("STARTING TESTMODE!")
logging.debug("reading testdata from file") logging.debug("reading testdata from file")
@ -200,9 +146,8 @@ except: # pragma: no cover
logging.exception("BOSWatch interrupted by an error") logging.exception("BOSWatch interrupted by an error")
finally: finally:
logging.debug("Starting shutdown routine") logging.debug("Starting shutdown routine")
if inputSource:
inputSource.shutdown()
if bwClient: if bwClient:
bwClient.disconnect() bwClient.disconnect()
inputThreadRunning = False
if mmThread:
mmThread.join()
logging.debug("BOSWatch client has stopped ...") logging.debug("BOSWatch client has stopped ...")