diff --git a/boswatch/inputSource/__init__.py b/boswatch/inputSource/__init__.py new file mode 100644 index 0000000..836e3e8 --- /dev/null +++ b/boswatch/inputSource/__init__.py @@ -0,0 +1,2 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- diff --git a/boswatch/inputSource/inputBase.py b/boswatch/inputSource/inputBase.py new file mode 100644 index 0000000..f8d5588 --- /dev/null +++ b/boswatch/inputSource/inputBase.py @@ -0,0 +1,65 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: inoutSource.py +@date: 28.10.2018 +@author: Bastian Schroll +@description: Base class for boswatch input sources +""" +import time +import logging +import threading + +logging.debug("- %s loaded", __name__) + + +class InputBase: + """!Base class for handling inout sources""" + + def __init__(self, inputQueue, inputConfig, decoderConfig): + """!Build a new InputSource class + + @param inputQueue: Python queue object to store input data + @param inputConfig: ConfigYaml object with the inoutSource config + @param decoderConfig: ConfigYaml object with the decoder config""" + self._inputThread = None + self._isRunning = False + self._inputQueue = inputQueue + self._inputConfig = inputConfig + self._decoderConfig = decoderConfig + + def start(self): + """!Start the input source thread""" + logging.debug("starting input thread") + self._isRunning = True + self._inputThread = threading.Thread(target=self._runThread, name="inputThread", + args=(self._inputQueue, self._inputConfig, self._decoderConfig)) + self._inputThread.daemon = True + self._inputThread.start() + + def _runThread(self, dataQueue, sdrConfig, decoderConfig): + """!Thread routine of the input source has to be inherit""" + logging.fatal("input thread routine not implemented") + exit(1) + + def shutdown(self): + """!Stop the input source thread""" + if self._isRunning: + logging.debug("wait for stopping the input thread") + self._isRunning = False + self._inputThread.join() + logging.debug("input thread stopped") + + def addToQueue(self, data): + """!Adds alarm data to the queue for further processing during boswatch client""" + self._inputQueue.put_nowait((data, time.time())) + logging.debug("Add received data to queue") + print(data) diff --git a/boswatch/inputSource/inputSource.py b/boswatch/inputSource/inputSource.py deleted file mode 100644 index a4b8766..0000000 --- a/boswatch/inputSource/inputSource.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -"""! - ____ ____ ______ __ __ __ _____ - / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / - / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < - / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / -/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ - German BOS Information Script - by Bastian Schroll - -@file: sdrInput.py -@date: 28.10.2018 -@author: Bastian Schroll -@description: Input source for sdr with rtl_fm -""" -import time -import logging -import threading -from boswatch.utils import paths -from boswatch.processManager import ProcessManager - -logging.debug("- %s loaded", __name__) - - -class SdrInput: - """!Worker class to check internet connection""" - - def __init__(self): - self._isRunning = False - self._mmThread = None - - def start(self, packetQueue, inputConfig, decoderConfig): - self._isRunning = True - self._mmThread = threading.Thread(target=self._handleSDRInput, name="mmReader", - args=(packetQueue, inputConfig, decoderConfig)) - self._mmThread.daemon = True - self._mmThread.start() - - def shutdown(self): - self._isRunning = False - self._mmThread.join() - - def _handleSDRInput(self, dataQueue, sdrConfig, decoderConfig): # todo exception handling inside - sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm"))) - sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id - sdrProc.addArgument("-f " + sdrConfig.get("frequency")) # frequencies - sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm - sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch - sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain - sdrProc.addArgument("-M fm") # set mode to fm - sdrProc.addArgument("-E DC") # set DC filter - sdrProc.addArgument("-s 22050") # bit rate of audio stream - sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a")) - sdrProc.start() - - mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True) - if decoderConfig.get("fms", default=0): - mmProc.addArgument("-a FMSFSK") - if decoderConfig.get("zvei", default=0): - mmProc.addArgument("-a ZVEI1") - if decoderConfig.get("poc512", default=0): - mmProc.addArgument("-a POCSAG512") - if decoderConfig.get("poc1200", default=0): - mmProc.addArgument("-a POCSAG1200") - if decoderConfig.get("poc2400", default=0): - mmProc.addArgument("-a POCSAG2400") - mmProc.addArgument("-f alpha") - mmProc.addArgument("-t raw -") - mmProc.setStdin(sdrProc.stdout) - mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a")) - mmProc.start() - - logging.info("start decoding") - while self._isRunning: - if not sdrProc.isRunning: - logging.warning("rtl_fm was down - try to restart") - sdrProc.start() - elif not mmProc.isRunning: - logging.warning("multimon was down - try to restart") - mmProc.start() - elif sdrProc.isRunning and mmProc.isRunning: - line = mmProc.readline() - if line: - dataQueue.put_nowait((line, time.time())) - logging.debug("Add data to queue") - print(line) - logging.debug("stopping thread") - mmProc.stop() - sdrProc.stop() diff --git a/boswatch/inputSource/sdrInput.py b/boswatch/inputSource/sdrInput.py new file mode 100644 index 0000000..8f9ce1e --- /dev/null +++ b/boswatch/inputSource/sdrInput.py @@ -0,0 +1,77 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: sdrInput.py +@date: 28.10.2018 +@author: Bastian Schroll +@description: Input source for sdr with rtl_fm +""" +import logging +from boswatch.utils import paths +from boswatch.processManager import ProcessManager +from boswatch.inputSource.inputBase import InputBase + +logging.debug("- %s loaded", __name__) + + +class SdrInput(InputBase): + """!Class for the sdr input source""" + + def _runThread(self, dataQueue, sdrConfig, decoderConfig): + sdrProc = None + mmProc = None + try: + sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm"))) + sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id + sdrProc.addArgument("-f " + str(sdrConfig.get("frequency"))) # frequencies + sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm + sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch + sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain + sdrProc.addArgument("-M fm") # set mode to fm + sdrProc.addArgument("-E DC") # set DC filter + sdrProc.addArgument("-s 22050") # bit rate of audio stream + sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a")) + sdrProc.start() + + mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True) + if decoderConfig.get("fms", default=0): + mmProc.addArgument("-a FMSFSK") + if decoderConfig.get("zvei", default=0): + mmProc.addArgument("-a ZVEI1") + if decoderConfig.get("poc512", default=0): + mmProc.addArgument("-a POCSAG512") + if decoderConfig.get("poc1200", default=0): + mmProc.addArgument("-a POCSAG1200") + if decoderConfig.get("poc2400", default=0): + mmProc.addArgument("-a POCSAG2400") + mmProc.addArgument("-f alpha") + mmProc.addArgument("-t raw -") + mmProc.setStdin(sdrProc.stdout) + mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a")) + mmProc.start() + + logging.info("start decoding") + while self._isRunning: + if not sdrProc.isRunning: + logging.warning("rtl_fm was down - try to restart") + sdrProc.start() + elif not mmProc.isRunning: + logging.warning("multimon was down - try to restart") + mmProc.start() + elif sdrProc.isRunning and mmProc.isRunning: + line = mmProc.readline() + if line: + self.addToQueue(line) + except: + logging.exception("error in sdr input routine") + finally: + mmProc.stop() + sdrProc.stop() diff --git a/boswatch/processManager.py b/boswatch/processManager.py index 2e703be..f261fdc 100644 --- a/boswatch/processManager.py +++ b/boswatch/processManager.py @@ -32,9 +32,6 @@ class ProcessManager: self._processHandle = None self._textMode = textMode - def __del__(self): - self.stop() - def addArgument(self, arg): """!add a new argument diff --git a/bw_client.py b/bw_client.py index 587e22e..14da3af 100644 --- a/bw_client.py +++ b/bw_client.py @@ -32,8 +32,6 @@ logging.debug("BOSWatch client has started ...") logging.debug("Import python modules") import argparse logging.debug("- argparse") -import threading -logging.debug("- threading") import queue logging.debug("- queue") import time @@ -43,10 +41,10 @@ logging.debug("Import BOSWatch modules") from boswatch.configYaml import ConfigYAML from boswatch.network.client import TCPClient from boswatch.network.broadcast import BroadcastClient -from boswatch.processManager import ProcessManager from boswatch.decoder.decoder import Decoder from boswatch.utils import header from boswatch.utils import misc +from boswatch.inputSource.sdrInput import SdrInput header.logoToLog() header.infoToLog() @@ -67,8 +65,9 @@ if not bwConfig.loadConfigFile(paths.CONFIG_PATH + args.config): exit(1) # ========== CLIENT CODE ========== -mmThread = None bwClient = None +inputSource = None +inputQueue = queue.Queue() try: ip = bwConfig.get("server", "ip", default="127.0.0.1") @@ -80,68 +79,15 @@ try: ip = broadcastClient.serverIP port = broadcastClient.serverPort - # ========== INPUT CODE ========== - def handleSDRInput(dataQueue, sdrConfig, decoderConfig): # todo exception handling inside - sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm"))) - sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id - sdrProc.addArgument("-f " + sdrConfig.get("frequency")) # frequencies - sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm - sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch - sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain - sdrProc.addArgument("-M fm") # set mode to fm - sdrProc.addArgument("-E DC") # set DC filter - sdrProc.addArgument("-s 22050") # bit rate of audio stream - sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a")) - sdrProc.start() - # sdrProc.skipLinesUntil("Output at") - - mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True) - if decoderConfig.get("fms", default=0): - mmProc.addArgument("-a FMSFSK") - if decoderConfig.get("zvei", default=0): - mmProc.addArgument("-a ZVEI1") - if decoderConfig.get("poc512", default=0): - mmProc.addArgument("-a POCSAG512") - if decoderConfig.get("poc1200", default=0): - mmProc.addArgument("-a POCSAG1200") - if decoderConfig.get("poc2400", default=0): - mmProc.addArgument("-a POCSAG2400") - mmProc.addArgument("-f alpha") - mmProc.addArgument("-t raw -") - mmProc.setStdin(sdrProc.stdout) - mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a")) - mmProc.start() - # mmProc.skipLinesUntil("Available demodulators:") - - logging.info("start decoding") - while inputThreadRunning: - if not sdrProc.isRunning: - logging.warning("rtl_fm was down - try to restart") - sdrProc.start() - # sdrProc.skipLinesUntil("Output at") # last line form rtl_fm before data - elif not mmProc.isRunning: - logging.warning("multimon was down - try to restart") - mmProc.start() - # mmProc.skipLinesUntil("Available demodulators:") # last line from mm before data - elif sdrProc.isRunning and mmProc.isRunning: - line = mmProc.readline() - if line: - dataQueue.put_nowait((line, time.time())) - logging.debug("Add data to queue") - print(line) - logging.debug("stopping thread") - mmProc.stop() - sdrProc.stop() - # ========== INPUT CODE ========== - - inputQueue = queue.Queue() - if not args.test: - inputThreadRunning = True - mmThread = threading.Thread(target=handleSDRInput, name="mmReader", - args=(inputQueue, bwConfig.get("inputSource", "sdr"), bwConfig.get("decoder"))) - mmThread.daemon = True - mmThread.start() + logging.debug("loading input source: %s", bwConfig.get("client", "inputSource")) + if bwConfig.get("client", "inputSource") == "sdr": + inputSource = SdrInput(inputQueue, bwConfig.get("inputSource", "sdr"), bwConfig.get("decoder")) + else: + logging.fatal("Invalid input source: %s", bwConfig.get("client", "inputSource")) + exit(1) + + inputSource.start() else: logging.warning("STARTING TESTMODE!") logging.debug("reading testdata from file") @@ -200,9 +146,8 @@ except: # pragma: no cover logging.exception("BOSWatch interrupted by an error") finally: logging.debug("Starting shutdown routine") + if inputSource: + inputSource.shutdown() if bwClient: bwClient.disconnect() - inputThreadRunning = False - if mmThread: - mmThread.join() logging.debug("BOSWatch client has stopped ...")