diff --git a/_demo_procMan.py b/_demo_procMan.py new file mode 100644 index 0000000..d407ad1 --- /dev/null +++ b/_demo_procMan.py @@ -0,0 +1,38 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +""" + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll +""" + +from boswatch.processManager import ProcessManager +from boswatch.decoder.decoder import Decoder +import logging.config +logging.config.fileConfig("config/logger_client.ini") + +sdrProc = ProcessManager("/usr/bin/rtl_fm") +sdrProc.addArgument("-f 85M") +sdrProc.addArgument("-m fm") +sdrProc.start(True) + +mmProc = ProcessManager("/opt/multimon/multimon-ng", textMode=True) +# mmProc.addArgument("-i") +# mmProc.addArgument("-a POCSAG1200 -a FMSFSK -a ZVEI1") +mmProc.addArgument("-f aplha") +mmProc.addArgument("-t raw /dev/stdin -") +mmProc.setStdin(sdrProc.stdout) +mmProc.start(True) +mmProc.skipLines(5) +while 1: + if not mmProc.isRunning: + logging.warning("multimon was down - try to restart") + mmProc.start() + mmProc.skipLines(5) + line = mmProc.readline() + if line: + print(line) diff --git a/boswatch/configYaml.py b/boswatch/configYaml.py index 5e7239f..5a0cab2 100644 --- a/boswatch/configYaml.py +++ b/boswatch/configYaml.py @@ -34,6 +34,7 @@ class ConfigYAML: yield item def __str__(self): + """!Returns the string representation of the internal config dict""" return str(self._config) def loadConfigFile(self, configPath): @@ -54,6 +55,12 @@ class ConfigYAML: return False def get(self, *args, default=None): + """!Get a single value from the config + or a value set in a new configYAML class instance + + @param *args: Config section (one ore more strings) + @param default: Default value if section not found (None) + @return: A single value, a value set in an configYAML instance, the default value""" tmp = self._config try: for arg in args: diff --git a/boswatch/decoder/decoder.py b/boswatch/decoder/decoder.py index 5baca9b..50ae598 100644 --- a/boswatch/decoder/decoder.py +++ b/boswatch/decoder/decoder.py @@ -16,9 +16,9 @@ """ import logging -from boswatch.decoder.fmsdecoder import FmsDecoder -from boswatch.decoder.pocsagdecoder import PocsagDecoder -from boswatch.decoder.zveidecoder import ZveiDecoder +from boswatch.decoder.fmsDecoder import FmsDecoder +from boswatch.decoder.pocsagDecoder import PocsagDecoder +from boswatch.decoder.zveiDecoder import ZveiDecoder logging.debug("- %s loaded", __name__) @@ -32,6 +32,7 @@ class Decoder: @param data: data to decode @return bwPacket instance""" logging.debug("search decoder") + data = str(data) if "FMS" in data: return FmsDecoder.decode(data) elif "POCSAG" in data: diff --git a/boswatch/decoder/fmsdecoder.py b/boswatch/decoder/fmsDecoder.py similarity index 100% rename from boswatch/decoder/fmsdecoder.py rename to boswatch/decoder/fmsDecoder.py diff --git a/boswatch/decoder/pocsagdecoder.py b/boswatch/decoder/pocsagDecoder.py similarity index 95% rename from boswatch/decoder/pocsagdecoder.py rename to boswatch/decoder/pocsagDecoder.py index 1264953..fdd7634 100644 --- a/boswatch/decoder/pocsagdecoder.py +++ b/boswatch/decoder/pocsagDecoder.py @@ -70,20 +70,20 @@ class PocsagDecoder: @return bitrate @return ric @return subric""" - bitrate, ric, subric = 0, 0, 0 + bitrate, ric, subric = "0", "0", "0" if "POCSAG512:" in data: - bitrate = 512 + bitrate = "512" ric = data[20:27].replace(" ", "").zfill(7) subric = str(int(data[39]) + 1) elif "POCSAG1200:" in data: - bitrate = 1200 + bitrate = "1200" ric = data[21:28].replace(" ", "").zfill(7) subric = str(int(data[40]) + 1) elif "POCSAG2400:" in data: - bitrate = 2400 + bitrate = "2400" ric = data[21:28].replace(" ", "").zfill(7) subric = str(int(data[40]) + 1) diff --git a/boswatch/decoder/zveidecoder.py b/boswatch/decoder/zveiDecoder.py similarity index 100% rename from boswatch/decoder/zveidecoder.py rename to boswatch/decoder/zveiDecoder.py diff --git a/boswatch/network/client.py b/boswatch/network/client.py index 808ae51..7f9417a 100644 --- a/boswatch/network/client.py +++ b/boswatch/network/client.py @@ -103,5 +103,9 @@ class TCPClient: def isConnected(self): """!Property of client connected state""" if self._sock: - return True + try: + self._sock.sendall(bytes("", "utf-8")) + return True + except AttributeError: + pass return False diff --git a/boswatch/packet.py b/boswatch/packet.py index a0fcce8..815d30b 100644 --- a/boswatch/packet.py +++ b/boswatch/packet.py @@ -16,7 +16,6 @@ """ import logging import time -from boswatch import version logging.debug("- %s loaded", __name__) @@ -58,38 +57,6 @@ class Packet: logging.warning("field not found: %s", fieldName) return None - def addClientData(self, config): - """!Add the client information to the decoded data - - This function adds the following data to the bwPacket: - - clientName - - clientVersion - - clientBuildDate - - clientBranch - - inputSource - - frequency""" - logging.debug("add client data to bwPacket") - self.set("clientName", config.get("client", "name")) - self.set("clientVersion", version.client) - self.set("clientBuildDate", version.date) - self.set("clientBranch", version.branch) - self.set("inputSource", config.get("client", "inputSource")) - self.set("frequency", config.get("inputSource", "sdr", "frequency")) - - def addServerData(self, config): - """!Add the server information to the decoded data - - This function adds the following data to the bwPacket: - - serverName - - serverVersion - - serverBuildDate - - serverBranch""" - logging.debug("add server data to bwPacket") - self.set("serverName", config.get("server", "name")) - self.set("serverVersion", version.server) - self.set("serverBuildDate", version.date) - self.set("serverBranch", version.branch) - def printInfo(self): """!Print a info message to the log on INFO level. Contains the most useful info about this packet. diff --git a/boswatch/processManager.py b/boswatch/processManager.py new file mode 100644 index 0000000..16b0dcf --- /dev/null +++ b/boswatch/processManager.py @@ -0,0 +1,127 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: processManager.py +@date: 04.03.2018 +@author: Bastian Schroll +@description: Class for managing sub processes +""" +import logging +import subprocess + +logging.debug("- %s loaded", __name__) + + +class ProcessManager: + """!class to manage a extern sub process""" + def __init__(self, process, textMode=False): + logging.debug("create process instance %s - textMode: %s", process, textMode) + self._args = [] + self._args.append(process) + self._stdin = None + self._stdout = subprocess.PIPE + self._stderr = subprocess.STDOUT + self._processHandle = None + self._textMode = textMode + + def addArgument(self, arg): + """!add a new argument + + @param arg: argument to add as string""" + logging.debug("add argument to process: %s -> %s", self._args[0], arg) + for splitArg in arg.split(): + self._args.append(splitArg) + + def clearArguments(self): + """!clear all arguments""" + self._args = self._args[0:1] # kept first element (process name) + + def start(self, startAsShell=False): + """!start the new process + + @return: True or False""" + logging.debug("start new process: %s %s", self._args[0], self._args[1:]) + try: + self._processHandle = subprocess.Popen(self._args, + stdin=self._stdin, + stdout=self._stdout, + stderr=self._stderr, + universal_newlines=self._textMode, + shell=startAsShell) + if not self.isRunning: + logging.error("cannot start") + return False + logging.debug("process started with PID %d", self._processHandle.pid) + return True + + except FileNotFoundError: + logging.error("File not found: %s", self._args[0]) + return False + + def stop(self): + """!Stop the process by sending SIGTERM and wait for ending""" + logging.debug("stopping process: %s", self._args[0]) + if self.isRunning: + self._processHandle.terminate() + while self.isRunning: + pass + logging.debug("process %s returned %d", self._args[0], self._processHandle.returncode) + + def readline(self): + """!Read one line from stdout stream + + @return singe line or None""" + if self.isRunning and self._stdout is not None: + try: + line = self._processHandle.stdout.readline().strip() + except UnicodeDecodeError: + return None + + return line + return None + + def skipLines(self, line_cnt=1): + logging.debug("skip %d lines from output", line_cnt) + while line_cnt: + self.readline() + line_cnt -= 1 + + def setStdin(self, stdin): + """!Set the stdin stream instance""" + self._stdin = stdin + + def setStdout(self, stdout): + """!Set the stdout stream instance""" + self._stdout = stdout + + def setStderr(self, stderr): + """!Set the stderr stream instance""" + self._stderr = stderr + + @property + def stdout(self): + """!Property to get the stdout stream""" + return self._processHandle.stdout + + @property + def stderr(self): + """!Property to get the stderr stream""" + return self._processHandle.stderr + + @property + def isRunning(self): + """!Property to get process running state + + @return True or False""" + if self._processHandle: + if self._processHandle.poll() is None: + return True + return False diff --git a/boswatch/router/router.py b/boswatch/router/router.py index 599e9b2..9f84b19 100644 --- a/boswatch/router/router.py +++ b/boswatch/router/router.py @@ -58,7 +58,7 @@ class Router: bwPacket = bwPacket_tmp logging.debug("[%s] <- bwPacket returned: %s", self._name, bwPacket) - logging.debug("[%s] ended", self._name) + logging.debug("[%s] finished", self._name) return bwPacket @property diff --git a/boswatch/utils/timer.py b/boswatch/timer.py similarity index 100% rename from boswatch/utils/timer.py rename to boswatch/timer.py diff --git a/boswatch/utils/header.py b/boswatch/utils/header.py index 38cbc90..2b969f7 100644 --- a/boswatch/utils/header.py +++ b/boswatch/utils/header.py @@ -17,7 +17,7 @@ import logging import platform # for python version nr -import boswatch.version +from boswatch.utils import version logging.debug("- %s loaded", __name__) @@ -43,19 +43,19 @@ def infoToLog(): @return True or False on error""" logging.debug("BOSWatch and environment information") logging.debug("- Client version: %d.%d.%d", - boswatch.version.client["major"], - boswatch.version.client["minor"], - boswatch.version.client["patch"]) + version.client["major"], + version.client["minor"], + version.client["patch"]) logging.debug("- Server version: %d.%d.%d", - boswatch.version.server["major"], - boswatch.version.server["minor"], - boswatch.version.server["patch"]) + version.server["major"], + version.server["minor"], + version.server["patch"]) logging.debug("- Branch: %s", - boswatch.version.branch) + version.branch) logging.debug("- Release date: %02d.%02d.%4d", - boswatch.version.date["day"], - boswatch.version.date["month"], - boswatch.version.date["year"]) + version.date["day"], + version.date["month"], + version.date["year"]) logging.debug("- Python version: %s", platform.python_version()) logging.debug("- Python build: %s", platform.python_build()) logging.debug("- System: %s", platform.system()) diff --git a/boswatch/utils/misc.py b/boswatch/utils/misc.py new file mode 100644 index 0000000..c146f33 --- /dev/null +++ b/boswatch/utils/misc.py @@ -0,0 +1,54 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: misc.py +@date: 11.03.2019 +@author: Bastian Schroll +@description: Some misc functions +""" +import logging +from boswatch.utils import version + +logging.debug("- %s loaded", __name__) + + +def addClientDataToPacket(bwPacket, config): + """!Add the client information to the decoded data + + This function adds the following data to the bwPacket: + - clientName + - clientVersion + - clientBuildDate + - clientBranch + - inputSource + - frequency""" + logging.debug("add client data to bwPacket") + bwPacket.set("clientName", config.get("client", "name")) + bwPacket.set("clientVersion", version.client) + bwPacket.set("clientBuildDate", version.date) + bwPacket.set("clientBranch", version.branch) + bwPacket.set("inputSource", config.get("client", "inoutSource")) + bwPacket.set("frequency", config.get("inputSource", "sdr", "frequency")) + + +def addServerDataToPacket(bwPacket, config): + """!Add the server information to the decoded data + + This function adds the following data to the bwPacket: + - serverName + - serverVersion + - serverBuildDate + - serverBranch""" + logging.debug("add server data to bwPacket") + bwPacket.set("serverName", config.get("server", "name")) + bwPacket.set("serverVersion", version.server) + bwPacket.set("serverBuildDate", version.date) + bwPacket.set("serverBranch", version.branch) diff --git a/boswatch/version.py b/boswatch/utils/version.py similarity index 100% rename from boswatch/version.py rename to boswatch/utils/version.py diff --git a/boswatch/utils/wildcard.py b/boswatch/wildcard.py similarity index 100% rename from boswatch/utils/wildcard.py rename to boswatch/wildcard.py diff --git a/bw_client.py b/bw_client.py index 8e52634..aed6701 100644 --- a/bw_client.py +++ b/bw_client.py @@ -32,8 +32,10 @@ logging.debug("BOSWatch client has started ...") logging.debug("Import python modules") import argparse logging.debug("- argparse") -import subprocess -logging.debug("- subprocess") +import threading +logging.debug("- threading") +import queue +logging.debug("- queue") import time logging.debug("- time") @@ -41,8 +43,10 @@ logging.debug("Import BOSWatch modules") from boswatch.configYaml import ConfigYAML from boswatch.network.client import TCPClient from boswatch.network.broadcast import BroadcastClient +from boswatch.processManager import ProcessManager from boswatch.decoder.decoder import Decoder from boswatch.utils import header +from boswatch.utils import misc header.logoToLog() @@ -56,6 +60,7 @@ parser = argparse.ArgumentParser(prog="bw_client.py", epilog="""More options you can find in the extern client.ini file in the folder /config""") parser.add_argument("-c", "--config", help="Name to configuration File", required=True) +parser.add_argument("-t", "--test", help="Start Client with testdata-set") args = parser.parse_args() bwConfig = ConfigYAML() @@ -63,10 +68,8 @@ if not bwConfig.loadConfigFile(paths.CONFIG_PATH + args.config): logging.error("cannot load config file") exit(1) - -# ############################# begin client system +# ========== CLIENT CODE ========== try: - ip = bwConfig.get("server", "ip", default="127.0.0.1") port = bwConfig.get("server", "port", default="8080") @@ -76,41 +79,75 @@ try: ip = broadcastClient.serverIP port = broadcastClient.serverPort - bwClient = TCPClient() - if bwClient.connect(ip, port): + inputQueue = queue.Queue() - testFile = open(paths.TEST_PATH + "testdata.list", "r") + # ========== INPUT CODE ========== + def handleSDRInput(dataQueue, config): + sdrProc = ProcessManager("/usr/bin/rtl_fm") + sdrProc.addArgument("-f 85M") + sdrProc.addArgument("-m fm") + sdrProc.start(True) + mmProc = ProcessManager("/opt/multimon/multimon-ng", textMode=True) + # mmProc.addArgument("-i") + # mmProc.addArgument("-a POCSAG1200 -a FMSFSK -a ZVEI1") + mmProc.addArgument("-f aplha") + mmProc.addArgument("-t raw /dev/stdin -") + mmProc.setStdin(sdrProc.stdout) + # mmProc.addArgument("./poc1200.raw") + mmProc.start(True) + mmProc.skipLines(5) while 1: + if not mmProc.isRunning: + logging.warning("multimon was down - try to restart") + mmProc.start() + mmProc.skipLines(5) + line = mmProc.readline() + if line: + dataQueue.put_nowait((line, time.time())) + logging.debug("Add data to queue") + print(line) + # ========== INPUT CODE ========== - for testData in testFile: + mmThread = threading.Thread(target=handleSDRInput, name="mmReader", args=(inputQueue, bwConfig.get("inputSource", "sdr"))) + mmThread.daemon = True + mmThread.start() - if (len(testData.rstrip(' \t\n\r')) == 0) or ("#" in testData[0]): - continue + bwClient = TCPClient() + bwClient.connect(ip, port) + while 1: - logging.debug("Test: %s", testData) - bwPacket = Decoder.decode(testData) + if not bwClient.isConnected: + logging.warning("connection to server lost - sleep %d seconds", bwConfig.get("client", "reconnectDelay", default="3")) + time.sleep(bwConfig.get("client", "reconnectDelay", default="3")) + bwClient.connect(ip, port) - if bwPacket: - bwPacket.printInfo() - bwPacket.addClientData(bwConfig) - bwClient.transmit(str(bwPacket)) + elif not inputQueue.empty(): + data = inputQueue.get() + logging.info("get data from queue (waited %0.3f sec.)", time.time() - data[1]) + logging.debug("%s packet(s) still waiting in queue", inputQueue.qsize()) - # todo should we do this in an thread, to not block receiving ??? but then we should use transmit() and receive() with Lock() - failedTransmits = 0 - while not bwClient.receive() == "[ack]": # wait for ack or timeout - if failedTransmits >= 3: - logging.error("cannot transmit after 3 retires") - break - failedTransmits += 1 - logging.warning("attempt %d to resend packet", failedTransmits) - bwClient.transmit(str(bwPacket)) # try to resend + bwPacket = Decoder.decode(data[0]) + inputQueue.task_done() + if bwPacket is None: + continue + + bwPacket.printInfo() + misc.addClientDataToPacket(bwPacket, bwConfig) + + for sendCnt in range(bwConfig.get("client", "sendTries", default="3")): + bwClient.transmit(str(bwPacket)) + if bwClient.receive() == "[ack-]": logging.debug("ack ok") + break + logging.warning("cannot send packet - sleep %d seconds", bwConfig.get("client", "sendDelay", default="3")) + time.sleep(bwConfig.get("client", "sendDelay", default="3")) + + else: + time.sleep(0.1) # reduce cpu load (wait 100ms) + # in worst case a packet have to wait 100ms until it will be processed - bwClient.disconnect() - break -# test for server #################################### except KeyboardInterrupt: # pragma: no cover logging.warning("Keyboard interrupt") @@ -118,5 +155,7 @@ except SystemExit: # pragma: no cover logging.error("BOSWatch interrupted by an error") except: # pragma: no cover logging.exception("BOSWatch interrupted by an error") -finally: # pragma: no cover - logging.debug("BOSWatch has ended ...") +finally: + logging.debug("Starting shutdown routine") + bwClient.disconnect() + logging.debug("BOSWatch client has stopped ...") diff --git a/bw_server.py b/bw_server.py index 833f456..5564154 100644 --- a/bw_server.py +++ b/bw_server.py @@ -44,6 +44,7 @@ from boswatch.packet import Packet from boswatch.utils import header from boswatch.network.broadcast import BroadcastServer from boswatch.router.routerManager import RouterManager +from boswatch.utils import misc header.logoToLog() @@ -93,7 +94,7 @@ try: bwPacket = Packet((data[1])) bwPacket.set("clientIP", data[0]) - bwPacket.addServerData(bwConfig) + misc.addServerDataToPacket(bwPacket, bwConfig) bwRoutMan.runRouter(bwConfig.get("alarmRouter"), bwPacket) @@ -105,23 +106,9 @@ except SystemExit: # pragma: no cover logging.error("BOSWatch interrupted by an error") except: # pragma: no cover logging.exception("BOSWatch interrupted by an error") -finally: # pragma: no cover - +finally: logging.debug("Starting shutdown routine") del bwRoutMan - - try: - bwServer.stop() - except NameError: - pass - except: - raise - - try: - bcServer.stop() - except NameError: - pass - except: - raise - - logging.debug("BOSWatch has ended ...") + bwServer.stop() + bcServer.stop() + logging.debug("BOSWatch server has stopped ...") diff --git a/config/client.yaml b/config/client.yaml index a3e33c1..5f84774 100644 --- a/config/client.yaml +++ b/config/client.yaml @@ -11,6 +11,9 @@ client: name: BW3 Client # name of the BW3 Client instance inputSource: sdr # atm only 'sdr' is possible useBroadcast: no # use broadcast to find server automatically + reconnectDelay: 3 # time in seconds to delay reconnect try + sendTries: 3 # how often should tried to send a packet + sendDelay: 3 # time in seconds to delay the resend try server: # only used if useBroadcast = no ip: 127.0.0.1 diff --git a/docu/docs/config.md b/docu/docs/config.md index d15cb35..1f0f116 100644 --- a/docu/docs/config.md +++ b/docu/docs/config.md @@ -11,8 +11,11 @@ zwingend in die Konfiguration eingetragen werden. |Feld|Beschreibung|Default| |----|------------|-------| |name|Name zur Identifizierung der Client Instanz|| -|inputSource|Art der zu nutzenden Input Quelle (aktuell nur `sdr`)|| +|inputSource|Art der zu nutzenden Input Quelle (aktuell nur `sdr`)|sdr| |useBroadcast|Verbindungsdaten per [Broadcast](information/broadcast.md) beziehen|no| +|reconnectDelay|Verzögerung für erneuten Verbindungsversuch zum Server|3| +|sendTries|Anzahl der Sendeversuche eines Pakets|3| +|sendDelay|Verzögerung für einen erneuten Sendeversuch|3| --- ### `server:` @@ -103,8 +106,8 @@ Jeder Router kann eine beliebige Anzahl einzelner Routenpunkte enthalten. Diese |Feld|Beschreibung|Default| |----|------------|-------| |type|Art des Routenpunktes (module, plugin, router)|| -|name|Zu ladende Resource (Siehe weiter unten)|| -|config|Konfigurationseinstellungen des Routenpunktes (Siehe weiter unten)|| +|name|Zu ladende Resource (Siehe entsprechende Kapitel)|| +|config|Konfigurationseinstellungen des Routenpunktes (Siehe entsprechende Kapitel)|| **Beispiel:** ```yaml @@ -121,4 +124,6 @@ router: --- ## Module/Plugins -Für die Konfiguration der Module und Plugins ist in den entsprechenden Kategorien eine ausführliche Beschreibung zu finden. +|Feld|Beschreibung|Default| +|----|------------|-------| +|allowed|Liste der erlaubten Paket Typen `fms` `zvei` `pocsag`|| diff --git a/docu/docs/index.md b/docu/docs/index.md index f62eb8c..a6840b4 100644 --- a/docu/docs/index.md +++ b/docu/docs/index.md @@ -1,7 +1,13 @@ #