Merge pull request #16 from BOSWatch/process_manager

Merge some changes
This commit is contained in:
Bastian Schroll 2019-10-08 20:31:16 +02:00 committed by GitHub
commit 055acfeca5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 360 additions and 113 deletions

38
_demo_procMan.py Normal file
View file

@ -0,0 +1,38 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
"""
from boswatch.processManager import ProcessManager
from boswatch.decoder.decoder import Decoder
import logging.config
logging.config.fileConfig("config/logger_client.ini")
sdrProc = ProcessManager("/usr/bin/rtl_fm")
sdrProc.addArgument("-f 85M")
sdrProc.addArgument("-m fm")
sdrProc.start(True)
mmProc = ProcessManager("/opt/multimon/multimon-ng", textMode=True)
# mmProc.addArgument("-i")
# mmProc.addArgument("-a POCSAG1200 -a FMSFSK -a ZVEI1")
mmProc.addArgument("-f aplha")
mmProc.addArgument("-t raw /dev/stdin -")
mmProc.setStdin(sdrProc.stdout)
mmProc.start(True)
mmProc.skipLines(5)
while 1:
if not mmProc.isRunning:
logging.warning("multimon was down - try to restart")
mmProc.start()
mmProc.skipLines(5)
line = mmProc.readline()
if line:
print(line)

View file

@ -34,6 +34,7 @@ class ConfigYAML:
yield item
def __str__(self):
"""!Returns the string representation of the internal config dict"""
return str(self._config)
def loadConfigFile(self, configPath):
@ -54,6 +55,12 @@ class ConfigYAML:
return False
def get(self, *args, default=None):
"""!Get a single value from the config
or a value set in a new configYAML class instance
@param *args: Config section (one ore more strings)
@param default: Default value if section not found (None)
@return: A single value, a value set in an configYAML instance, the default value"""
tmp = self._config
try:
for arg in args:

View file

@ -16,9 +16,9 @@
"""
import logging
from boswatch.decoder.fmsdecoder import FmsDecoder
from boswatch.decoder.pocsagdecoder import PocsagDecoder
from boswatch.decoder.zveidecoder import ZveiDecoder
from boswatch.decoder.fmsDecoder import FmsDecoder
from boswatch.decoder.pocsagDecoder import PocsagDecoder
from boswatch.decoder.zveiDecoder import ZveiDecoder
logging.debug("- %s loaded", __name__)
@ -32,6 +32,7 @@ class Decoder:
@param data: data to decode
@return bwPacket instance"""
logging.debug("search decoder")
data = str(data)
if "FMS" in data:
return FmsDecoder.decode(data)
elif "POCSAG" in data:

View file

@ -70,20 +70,20 @@ class PocsagDecoder:
@return bitrate
@return ric
@return subric"""
bitrate, ric, subric = 0, 0, 0
bitrate, ric, subric = "0", "0", "0"
if "POCSAG512:" in data:
bitrate = 512
bitrate = "512"
ric = data[20:27].replace(" ", "").zfill(7)
subric = str(int(data[39]) + 1)
elif "POCSAG1200:" in data:
bitrate = 1200
bitrate = "1200"
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)
elif "POCSAG2400:" in data:
bitrate = 2400
bitrate = "2400"
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)

View file

@ -103,5 +103,9 @@ class TCPClient:
def isConnected(self):
"""!Property of client connected state"""
if self._sock:
return True
try:
self._sock.sendall(bytes("", "utf-8"))
return True
except AttributeError:
pass
return False

View file

@ -16,7 +16,6 @@
"""
import logging
import time
from boswatch import version
logging.debug("- %s loaded", __name__)
@ -58,38 +57,6 @@ class Packet:
logging.warning("field not found: %s", fieldName)
return None
def addClientData(self, config):
"""!Add the client information to the decoded data
This function adds the following data to the bwPacket:
- clientName
- clientVersion
- clientBuildDate
- clientBranch
- inputSource
- frequency"""
logging.debug("add client data to bwPacket")
self.set("clientName", config.get("client", "name"))
self.set("clientVersion", version.client)
self.set("clientBuildDate", version.date)
self.set("clientBranch", version.branch)
self.set("inputSource", config.get("client", "inputSource"))
self.set("frequency", config.get("inputSource", "sdr", "frequency"))
def addServerData(self, config):
"""!Add the server information to the decoded data
This function adds the following data to the bwPacket:
- serverName
- serverVersion
- serverBuildDate
- serverBranch"""
logging.debug("add server data to bwPacket")
self.set("serverName", config.get("server", "name"))
self.set("serverVersion", version.server)
self.set("serverBuildDate", version.date)
self.set("serverBranch", version.branch)
def printInfo(self):
"""!Print a info message to the log on INFO level.
Contains the most useful info about this packet.

127
boswatch/processManager.py Normal file
View file

@ -0,0 +1,127 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: processManager.py
@date: 04.03.2018
@author: Bastian Schroll
@description: Class for managing sub processes
"""
import logging
import subprocess
logging.debug("- %s loaded", __name__)
class ProcessManager:
"""!class to manage a extern sub process"""
def __init__(self, process, textMode=False):
logging.debug("create process instance %s - textMode: %s", process, textMode)
self._args = []
self._args.append(process)
self._stdin = None
self._stdout = subprocess.PIPE
self._stderr = subprocess.STDOUT
self._processHandle = None
self._textMode = textMode
def addArgument(self, arg):
"""!add a new argument
@param arg: argument to add as string"""
logging.debug("add argument to process: %s -> %s", self._args[0], arg)
for splitArg in arg.split():
self._args.append(splitArg)
def clearArguments(self):
"""!clear all arguments"""
self._args = self._args[0:1] # kept first element (process name)
def start(self, startAsShell=False):
"""!start the new process
@return: True or False"""
logging.debug("start new process: %s %s", self._args[0], self._args[1:])
try:
self._processHandle = subprocess.Popen(self._args,
stdin=self._stdin,
stdout=self._stdout,
stderr=self._stderr,
universal_newlines=self._textMode,
shell=startAsShell)
if not self.isRunning:
logging.error("cannot start")
return False
logging.debug("process started with PID %d", self._processHandle.pid)
return True
except FileNotFoundError:
logging.error("File not found: %s", self._args[0])
return False
def stop(self):
"""!Stop the process by sending SIGTERM and wait for ending"""
logging.debug("stopping process: %s", self._args[0])
if self.isRunning:
self._processHandle.terminate()
while self.isRunning:
pass
logging.debug("process %s returned %d", self._args[0], self._processHandle.returncode)
def readline(self):
"""!Read one line from stdout stream
@return singe line or None"""
if self.isRunning and self._stdout is not None:
try:
line = self._processHandle.stdout.readline().strip()
except UnicodeDecodeError:
return None
return line
return None
def skipLines(self, line_cnt=1):
logging.debug("skip %d lines from output", line_cnt)
while line_cnt:
self.readline()
line_cnt -= 1
def setStdin(self, stdin):
"""!Set the stdin stream instance"""
self._stdin = stdin
def setStdout(self, stdout):
"""!Set the stdout stream instance"""
self._stdout = stdout
def setStderr(self, stderr):
"""!Set the stderr stream instance"""
self._stderr = stderr
@property
def stdout(self):
"""!Property to get the stdout stream"""
return self._processHandle.stdout
@property
def stderr(self):
"""!Property to get the stderr stream"""
return self._processHandle.stderr
@property
def isRunning(self):
"""!Property to get process running state
@return True or False"""
if self._processHandle:
if self._processHandle.poll() is None:
return True
return False

View file

@ -58,7 +58,7 @@ class Router:
bwPacket = bwPacket_tmp
logging.debug("[%s] <- bwPacket returned: %s", self._name, bwPacket)
logging.debug("[%s] ended", self._name)
logging.debug("[%s] finished", self._name)
return bwPacket
@property

View file

@ -17,7 +17,7 @@
import logging
import platform # for python version nr
import boswatch.version
from boswatch.utils import version
logging.debug("- %s loaded", __name__)
@ -43,19 +43,19 @@ def infoToLog():
@return True or False on error"""
logging.debug("BOSWatch and environment information")
logging.debug("- Client version: %d.%d.%d",
boswatch.version.client["major"],
boswatch.version.client["minor"],
boswatch.version.client["patch"])
version.client["major"],
version.client["minor"],
version.client["patch"])
logging.debug("- Server version: %d.%d.%d",
boswatch.version.server["major"],
boswatch.version.server["minor"],
boswatch.version.server["patch"])
version.server["major"],
version.server["minor"],
version.server["patch"])
logging.debug("- Branch: %s",
boswatch.version.branch)
version.branch)
logging.debug("- Release date: %02d.%02d.%4d",
boswatch.version.date["day"],
boswatch.version.date["month"],
boswatch.version.date["year"])
version.date["day"],
version.date["month"],
version.date["year"])
logging.debug("- Python version: %s", platform.python_version())
logging.debug("- Python build: %s", platform.python_build())
logging.debug("- System: %s", platform.system())

54
boswatch/utils/misc.py Normal file
View file

@ -0,0 +1,54 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: misc.py
@date: 11.03.2019
@author: Bastian Schroll
@description: Some misc functions
"""
import logging
from boswatch.utils import version
logging.debug("- %s loaded", __name__)
def addClientDataToPacket(bwPacket, config):
"""!Add the client information to the decoded data
This function adds the following data to the bwPacket:
- clientName
- clientVersion
- clientBuildDate
- clientBranch
- inputSource
- frequency"""
logging.debug("add client data to bwPacket")
bwPacket.set("clientName", config.get("client", "name"))
bwPacket.set("clientVersion", version.client)
bwPacket.set("clientBuildDate", version.date)
bwPacket.set("clientBranch", version.branch)
bwPacket.set("inputSource", config.get("client", "inoutSource"))
bwPacket.set("frequency", config.get("inputSource", "sdr", "frequency"))
def addServerDataToPacket(bwPacket, config):
"""!Add the server information to the decoded data
This function adds the following data to the bwPacket:
- serverName
- serverVersion
- serverBuildDate
- serverBranch"""
logging.debug("add server data to bwPacket")
bwPacket.set("serverName", config.get("server", "name"))
bwPacket.set("serverVersion", version.server)
bwPacket.set("serverBuildDate", version.date)
bwPacket.set("serverBranch", version.branch)

View file

@ -32,8 +32,10 @@ logging.debug("BOSWatch client has started ...")
logging.debug("Import python modules")
import argparse
logging.debug("- argparse")
import subprocess
logging.debug("- subprocess")
import threading
logging.debug("- threading")
import queue
logging.debug("- queue")
import time
logging.debug("- time")
@ -41,8 +43,10 @@ logging.debug("Import BOSWatch modules")
from boswatch.configYaml import ConfigYAML
from boswatch.network.client import TCPClient
from boswatch.network.broadcast import BroadcastClient
from boswatch.processManager import ProcessManager
from boswatch.decoder.decoder import Decoder
from boswatch.utils import header
from boswatch.utils import misc
header.logoToLog()
@ -56,6 +60,7 @@ parser = argparse.ArgumentParser(prog="bw_client.py",
epilog="""More options you can find in the extern client.ini
file in the folder /config""")
parser.add_argument("-c", "--config", help="Name to configuration File", required=True)
parser.add_argument("-t", "--test", help="Start Client with testdata-set")
args = parser.parse_args()
bwConfig = ConfigYAML()
@ -63,10 +68,8 @@ if not bwConfig.loadConfigFile(paths.CONFIG_PATH + args.config):
logging.error("cannot load config file")
exit(1)
# ############################# begin client system
# ========== CLIENT CODE ==========
try:
ip = bwConfig.get("server", "ip", default="127.0.0.1")
port = bwConfig.get("server", "port", default="8080")
@ -76,41 +79,75 @@ try:
ip = broadcastClient.serverIP
port = broadcastClient.serverPort
bwClient = TCPClient()
if bwClient.connect(ip, port):
inputQueue = queue.Queue()
testFile = open(paths.TEST_PATH + "testdata.list", "r")
# ========== INPUT CODE ==========
def handleSDRInput(dataQueue, config):
sdrProc = ProcessManager("/usr/bin/rtl_fm")
sdrProc.addArgument("-f 85M")
sdrProc.addArgument("-m fm")
sdrProc.start(True)
mmProc = ProcessManager("/opt/multimon/multimon-ng", textMode=True)
# mmProc.addArgument("-i")
# mmProc.addArgument("-a POCSAG1200 -a FMSFSK -a ZVEI1")
mmProc.addArgument("-f aplha")
mmProc.addArgument("-t raw /dev/stdin -")
mmProc.setStdin(sdrProc.stdout)
# mmProc.addArgument("./poc1200.raw")
mmProc.start(True)
mmProc.skipLines(5)
while 1:
if not mmProc.isRunning:
logging.warning("multimon was down - try to restart")
mmProc.start()
mmProc.skipLines(5)
line = mmProc.readline()
if line:
dataQueue.put_nowait((line, time.time()))
logging.debug("Add data to queue")
print(line)
# ========== INPUT CODE ==========
for testData in testFile:
mmThread = threading.Thread(target=handleSDRInput, name="mmReader", args=(inputQueue, bwConfig.get("inputSource", "sdr")))
mmThread.daemon = True
mmThread.start()
if (len(testData.rstrip(' \t\n\r')) == 0) or ("#" in testData[0]):
continue
bwClient = TCPClient()
bwClient.connect(ip, port)
while 1:
logging.debug("Test: %s", testData)
bwPacket = Decoder.decode(testData)
if not bwClient.isConnected:
logging.warning("connection to server lost - sleep %d seconds", bwConfig.get("client", "reconnectDelay", default="3"))
time.sleep(bwConfig.get("client", "reconnectDelay", default="3"))
bwClient.connect(ip, port)
if bwPacket:
bwPacket.printInfo()
bwPacket.addClientData(bwConfig)
bwClient.transmit(str(bwPacket))
elif not inputQueue.empty():
data = inputQueue.get()
logging.info("get data from queue (waited %0.3f sec.)", time.time() - data[1])
logging.debug("%s packet(s) still waiting in queue", inputQueue.qsize())
# todo should we do this in an thread, to not block receiving ??? but then we should use transmit() and receive() with Lock()
failedTransmits = 0
while not bwClient.receive() == "[ack]": # wait for ack or timeout
if failedTransmits >= 3:
logging.error("cannot transmit after 3 retires")
break
failedTransmits += 1
logging.warning("attempt %d to resend packet", failedTransmits)
bwClient.transmit(str(bwPacket)) # try to resend
bwPacket = Decoder.decode(data[0])
inputQueue.task_done()
if bwPacket is None:
continue
bwPacket.printInfo()
misc.addClientDataToPacket(bwPacket, bwConfig)
for sendCnt in range(bwConfig.get("client", "sendTries", default="3")):
bwClient.transmit(str(bwPacket))
if bwClient.receive() == "[ack-]":
logging.debug("ack ok")
break
logging.warning("cannot send packet - sleep %d seconds", bwConfig.get("client", "sendDelay", default="3"))
time.sleep(bwConfig.get("client", "sendDelay", default="3"))
else:
time.sleep(0.1) # reduce cpu load (wait 100ms)
# in worst case a packet have to wait 100ms until it will be processed
bwClient.disconnect()
break
# test for server ####################################
except KeyboardInterrupt: # pragma: no cover
logging.warning("Keyboard interrupt")
@ -118,5 +155,7 @@ except SystemExit: # pragma: no cover
logging.error("BOSWatch interrupted by an error")
except: # pragma: no cover
logging.exception("BOSWatch interrupted by an error")
finally: # pragma: no cover
logging.debug("BOSWatch has ended ...")
finally:
logging.debug("Starting shutdown routine")
bwClient.disconnect()
logging.debug("BOSWatch client has stopped ...")

View file

@ -44,6 +44,7 @@ from boswatch.packet import Packet
from boswatch.utils import header
from boswatch.network.broadcast import BroadcastServer
from boswatch.router.routerManager import RouterManager
from boswatch.utils import misc
header.logoToLog()
@ -93,7 +94,7 @@ try:
bwPacket = Packet((data[1]))
bwPacket.set("clientIP", data[0])
bwPacket.addServerData(bwConfig)
misc.addServerDataToPacket(bwPacket, bwConfig)
bwRoutMan.runRouter(bwConfig.get("alarmRouter"), bwPacket)
@ -105,23 +106,9 @@ except SystemExit: # pragma: no cover
logging.error("BOSWatch interrupted by an error")
except: # pragma: no cover
logging.exception("BOSWatch interrupted by an error")
finally: # pragma: no cover
finally:
logging.debug("Starting shutdown routine")
del bwRoutMan
try:
bwServer.stop()
except NameError:
pass
except:
raise
try:
bcServer.stop()
except NameError:
pass
except:
raise
logging.debug("BOSWatch has ended ...")
bwServer.stop()
bcServer.stop()
logging.debug("BOSWatch server has stopped ...")

View file

@ -11,6 +11,9 @@ client:
name: BW3 Client # name of the BW3 Client instance
inputSource: sdr # atm only 'sdr' is possible
useBroadcast: no # use broadcast to find server automatically
reconnectDelay: 3 # time in seconds to delay reconnect try
sendTries: 3 # how often should tried to send a packet
sendDelay: 3 # time in seconds to delay the resend try
server: # only used if useBroadcast = no
ip: 127.0.0.1

View file

@ -11,8 +11,11 @@ zwingend in die Konfiguration eingetragen werden.
|Feld|Beschreibung|Default|
|----|------------|-------|
|name|Name zur Identifizierung der Client Instanz||
|inputSource|Art der zu nutzenden Input Quelle (aktuell nur `sdr`)||
|inputSource|Art der zu nutzenden Input Quelle (aktuell nur `sdr`)|sdr|
|useBroadcast|Verbindungsdaten per [Broadcast](information/broadcast.md) beziehen|no|
|reconnectDelay|Verzögerung für erneuten Verbindungsversuch zum Server|3|
|sendTries|Anzahl der Sendeversuche eines Pakets|3|
|sendDelay|Verzögerung für einen erneuten Sendeversuch|3|
---
### `server:`
@ -103,8 +106,8 @@ Jeder Router kann eine beliebige Anzahl einzelner Routenpunkte enthalten. Diese
|Feld|Beschreibung|Default|
|----|------------|-------|
|type|Art des Routenpunktes (module, plugin, router)||
|name|Zu ladende Resource (Siehe weiter unten)||
|config|Konfigurationseinstellungen des Routenpunktes (Siehe weiter unten)||
|name|Zu ladende Resource (Siehe entsprechende Kapitel)||
|config|Konfigurationseinstellungen des Routenpunktes (Siehe entsprechende Kapitel)||
**Beispiel:**
```yaml
@ -121,4 +124,6 @@ router:
---
## Module/Plugins
Für die Konfiguration der Module und Plugins ist in den entsprechenden Kategorien eine ausführliche Beschreibung zu finden.
|Feld|Beschreibung|Default|
|----|------------|-------|
|allowed|Liste der erlaubten Paket Typen `fms` `zvei` `pocsag`||

View file

@ -1,7 +1,13 @@
# <center>BOSWatch 3</center>
---
<center>![BOSWatch](img/bw3.png "BOSWatch 3 Logo")</center>
<center>
![BOSWatch](img/bw3.png "BOSWatch 3 Logo")
Falls du uns unterstützen möchtest würden wir uns über eine Spende freuen.
Server, Hosting, Domain sowie Kaffee kosten leider Geld ;-)
[![](https://www.paypalobjects.com/de_DE/DE/i/btn/btn_donateCC_LG.gif)](https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=CLK9VBN2MSLZY&source=url)
</center>
**Es wird darauf hingewiesen, dass für die Teilnahme am BOS-Funk nur nach den Technischen Richtlinien der BOS zugelassene Funkanlagen verwendet werden dürfen.**
**Der BOS-Funk ist ein nichtöffentlicher mobiler Landfunk. Privatpersonen gehören nicht zum Kreis der berechtigten Funkteilnehmer.** _(Quelle: TR-BOS)_

View file

@ -667,7 +667,7 @@ MAX_INITIALIZER_LINES = 30
# list will mention the files that were used to generate the documentation.
# The default value is: YES.
SHOW_USED_FILES = YES
SHOW_USED_FILES = NO
# Set the SHOW_FILES tag to NO to disable the generation of the Files page. This
# will remove the Files entry from the Quick Index and from the Folder Tree View
@ -681,7 +681,7 @@ SHOW_FILES = NO
# Folder Tree View (if specified).
# The default value is: YES.
SHOW_NAMESPACES = YES
SHOW_NAMESPACES = NO
# The FILE_VERSION_FILTER tag can be used to specify a program or script that
# doxygen should invoke to get the current version for each file (typically from
@ -757,7 +757,7 @@ WARN_IF_DOC_ERROR = YES
# parameter documentation, but not about the absence of documentation.
# The default value is: NO.
WARN_NO_PARAMDOC = NO
WARN_NO_PARAMDOC = YES
# If the WARN_AS_ERROR tag is set to YES then doxygen will immediately stop when
# a warning is encountered.

View file

@ -40,6 +40,15 @@ class BoswatchModule(Module):
@param bwPacket: A BOSWatch packet instance
@return bwPacket or False"""
if bwPacket.get("mode") == "fms":
pass
elif bwPacket.get("mode") == "zvei":
pass
elif bwPacket.get("mode") == "pocsag":
pass
elif bwPacket.get("mode") == "msg":
pass
return bwPacket
def onUnload(self):

View file

@ -17,7 +17,7 @@
import logging
import time
from boswatch.utils import wildcard
from boswatch import wildcard
logging.debug("- %s loaded", __name__)

View file

@ -20,7 +20,7 @@ import logging
import time
import pytest
from boswatch.utils.timer import RepeatedTimer
from boswatch.timer import RepeatedTimer
def setup_method(method):