mirror of
https://github.com/BOSWatch/BW3-Core.git
synced 2026-04-20 05:33:49 +00:00
Merge branch 'develop' into update_check
This commit is contained in:
commit
be73d3c381
60 changed files with 1013 additions and 345 deletions
|
|
@ -33,6 +33,10 @@ class ConfigYAML:
|
|||
else:
|
||||
yield item
|
||||
|
||||
def __len__(self):
|
||||
"""!returns the length of an config element"""
|
||||
return len(self._config)
|
||||
|
||||
def __str__(self):
|
||||
"""!Returns the string representation of the internal config dict"""
|
||||
return str(self._config)
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ class Decoder:
|
|||
|
||||
@param data: data to decode
|
||||
@return bwPacket instance"""
|
||||
logging.debug("search decoder")
|
||||
data = str(data)
|
||||
if "FMS" in data:
|
||||
return FmsDecoder.decode(data)
|
||||
|
|
|
|||
|
|
@ -62,10 +62,9 @@ class FmsDecoder:
|
|||
bwPacket.set("directionText", directionText)
|
||||
bwPacket.set("tacticalInfo", tacticalInfo)
|
||||
|
||||
logging.debug(bwPacket)
|
||||
return bwPacket
|
||||
|
||||
logging.warning("no valid data")
|
||||
logging.warning("no valid FMS")
|
||||
return None
|
||||
logging.warning("CRC Error")
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -56,10 +56,9 @@ class PocsagDecoder:
|
|||
bwPacket.set("subricText", subricText)
|
||||
bwPacket.set("message", message)
|
||||
|
||||
logging.debug(bwPacket)
|
||||
return bwPacket
|
||||
|
||||
logging.warning("no valid data")
|
||||
logging.warning("no valid POCSAG")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
|
|
|
|||
|
|
@ -44,10 +44,9 @@ class ZveiDecoder:
|
|||
bwPacket.set("mode", "zvei")
|
||||
bwPacket.set("zvei", ZveiDecoder._solveDoubleTone(data[7:12]))
|
||||
|
||||
logging.debug(bwPacket)
|
||||
return bwPacket
|
||||
|
||||
logging.warning("no valid data")
|
||||
logging.warning("no valid ZVEI")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
|
|
|
|||
2
boswatch/inputSource/__init__.py
Normal file
2
boswatch/inputSource/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
65
boswatch/inputSource/inputBase.py
Normal file
65
boswatch/inputSource/inputBase.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
____ ____ ______ __ __ __ _____
|
||||
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
|
||||
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
|
||||
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
|
||||
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
|
||||
German BOS Information Script
|
||||
by Bastian Schroll
|
||||
|
||||
@file: inoutSource.py
|
||||
@date: 28.10.2018
|
||||
@author: Bastian Schroll
|
||||
@description: Base class for boswatch input sources
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
|
||||
class InputBase(ABC):
|
||||
"""!Base class for handling inout sources"""
|
||||
|
||||
def __init__(self, inputQueue, inputConfig, decoderConfig):
|
||||
"""!Build a new InputSource class
|
||||
|
||||
@param inputQueue: Python queue object to store input data
|
||||
@param inputConfig: ConfigYaml object with the inoutSource config
|
||||
@param decoderConfig: ConfigYaml object with the decoder config"""
|
||||
self._inputThread = None
|
||||
self._isRunning = False
|
||||
self._inputQueue = inputQueue
|
||||
self._inputConfig = inputConfig
|
||||
self._decoderConfig = decoderConfig
|
||||
|
||||
def start(self):
|
||||
"""!Start the input source thread"""
|
||||
logging.debug("starting input thread")
|
||||
self._isRunning = True
|
||||
self._inputThread = threading.Thread(target=self._runThread, name="inputThread",
|
||||
args=(self._inputQueue, self._inputConfig, self._decoderConfig))
|
||||
self._inputThread.daemon = True
|
||||
self._inputThread.start()
|
||||
|
||||
@abstractmethod
|
||||
def _runThread(self, dataQueue, sdrConfig, decoderConfig):
|
||||
"""!Thread routine of the input source has to be inherit"""
|
||||
|
||||
def shutdown(self):
|
||||
"""!Stop the input source thread"""
|
||||
if self._isRunning:
|
||||
logging.debug("wait for stopping the input thread")
|
||||
self._isRunning = False
|
||||
self._inputThread.join()
|
||||
logging.debug("input thread stopped")
|
||||
|
||||
def addToQueue(self, data):
|
||||
"""!Adds alarm data to the queue for further processing during boswatch client"""
|
||||
self._inputQueue.put_nowait((data, time.time()))
|
||||
logging.debug("Add received data to queue")
|
||||
print(data)
|
||||
77
boswatch/inputSource/sdrInput.py
Normal file
77
boswatch/inputSource/sdrInput.py
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
"""!
|
||||
____ ____ ______ __ __ __ _____
|
||||
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
|
||||
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
|
||||
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
|
||||
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
|
||||
German BOS Information Script
|
||||
by Bastian Schroll
|
||||
|
||||
@file: sdrInput.py
|
||||
@date: 28.10.2018
|
||||
@author: Bastian Schroll
|
||||
@description: Input source for sdr with rtl_fm
|
||||
"""
|
||||
import logging
|
||||
from boswatch.utils import paths
|
||||
from boswatch.processManager import ProcessManager
|
||||
from boswatch.inputSource.inputBase import InputBase
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
|
||||
class SdrInput(InputBase):
|
||||
"""!Class for the sdr input source"""
|
||||
|
||||
def _runThread(self, dataQueue, sdrConfig, decoderConfig):
|
||||
sdrProc = None
|
||||
mmProc = None
|
||||
try:
|
||||
sdrProc = ProcessManager(str(sdrConfig.get("rtlPath", default="rtl_fm")))
|
||||
sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id
|
||||
sdrProc.addArgument("-f " + str(sdrConfig.get("frequency"))) # frequencies
|
||||
sdrProc.addArgument("-p " + str(sdrConfig.get("error", default="0"))) # frequency error in ppm
|
||||
sdrProc.addArgument("-l " + str(sdrConfig.get("squelch", default="1"))) # squelch
|
||||
sdrProc.addArgument("-g " + str(sdrConfig.get("gain", default="100"))) # gain
|
||||
sdrProc.addArgument("-M fm") # set mode to fm
|
||||
sdrProc.addArgument("-E DC") # set DC filter
|
||||
sdrProc.addArgument("-s 22050") # bit rate of audio stream
|
||||
sdrProc.setStderr(open(paths.LOG_PATH + "rtl_fm.log", "a"))
|
||||
sdrProc.start()
|
||||
|
||||
mmProc = ProcessManager(str(sdrConfig.get("mmPath", default="multimon-ng")), textMode=True)
|
||||
if decoderConfig.get("fms", default=0):
|
||||
mmProc.addArgument("-a FMSFSK")
|
||||
if decoderConfig.get("zvei", default=0):
|
||||
mmProc.addArgument("-a ZVEI1")
|
||||
if decoderConfig.get("poc512", default=0):
|
||||
mmProc.addArgument("-a POCSAG512")
|
||||
if decoderConfig.get("poc1200", default=0):
|
||||
mmProc.addArgument("-a POCSAG1200")
|
||||
if decoderConfig.get("poc2400", default=0):
|
||||
mmProc.addArgument("-a POCSAG2400")
|
||||
mmProc.addArgument("-f alpha")
|
||||
mmProc.addArgument("-t raw -")
|
||||
mmProc.setStdin(sdrProc.stdout)
|
||||
mmProc.setStderr(open(paths.LOG_PATH + "multimon-ng.log", "a"))
|
||||
mmProc.start()
|
||||
|
||||
logging.info("start decoding")
|
||||
while self._isRunning:
|
||||
if not sdrProc.isRunning:
|
||||
logging.warning("rtl_fm was down - try to restart")
|
||||
sdrProc.start()
|
||||
elif not mmProc.isRunning:
|
||||
logging.warning("multimon was down - try to restart")
|
||||
mmProc.start()
|
||||
elif sdrProc.isRunning and mmProc.isRunning:
|
||||
line = mmProc.readline()
|
||||
if line:
|
||||
self.addToQueue(line)
|
||||
except:
|
||||
logging.exception("error in sdr input routine")
|
||||
finally:
|
||||
mmProc.stop()
|
||||
sdrProc.stop()
|
||||
|
|
@ -57,6 +57,7 @@ class TCPClient:
|
|||
@return True or False"""
|
||||
try:
|
||||
if self.isConnected:
|
||||
self._sock.shutdown(socket.SHUT_RDWR)
|
||||
self._sock.close()
|
||||
logging.debug("disconnected")
|
||||
return True
|
||||
|
|
@ -72,29 +73,35 @@ class TCPClient:
|
|||
@param data: data to send to the server
|
||||
@return True or False"""
|
||||
try:
|
||||
logging.debug("transmitting: %s", data)
|
||||
header = str(len(data)).ljust(HEADERSIZE)
|
||||
self._sock.sendall(bytes(header + data, "utf-8"))
|
||||
logging.debug("transmitting:\n%s", data)
|
||||
data = data.encode("utf-8")
|
||||
header = str(len(data)).ljust(HEADERSIZE).encode("utf-8")
|
||||
self._sock.sendall(header + data)
|
||||
logging.debug("transmitted...")
|
||||
return True
|
||||
except socket.error as e:
|
||||
logging.error(e)
|
||||
return False
|
||||
|
||||
def receive(self):
|
||||
def receive(self, timeout=1):
|
||||
"""!Receive data from the server
|
||||
|
||||
@param timeout: to wait for incoming data in seconds
|
||||
@return received data"""
|
||||
try:
|
||||
read, _, _ = select.select([self._sock], [], [], 1)
|
||||
read, _, _ = select.select([self._sock], [], [], timeout)
|
||||
if not read: # check if there is something to read
|
||||
return False
|
||||
header = self._sock.recv(HEADERSIZE)
|
||||
|
||||
header = self._sock.recv(HEADERSIZE).decode("utf-8")
|
||||
if not len(header): # check if there data
|
||||
return False
|
||||
length = int(header.decode("utf-8").strip())
|
||||
|
||||
length = int(header.strip())
|
||||
received = self._sock.recv(length).decode("utf-8")
|
||||
logging.debug("received %d bytes: %s", length, received)
|
||||
|
||||
logging.debug("recv header: '%s'", header)
|
||||
logging.debug("received %d bytes: %s", len(received), received)
|
||||
return received
|
||||
except socket.error as e:
|
||||
logging.error(e)
|
||||
|
|
@ -104,12 +111,17 @@ class TCPClient:
|
|||
def isConnected(self):
|
||||
"""!Property of client connected state"""
|
||||
try:
|
||||
aliveMsg = "<alive>"
|
||||
header = str(len(aliveMsg)).ljust(HEADERSIZE)
|
||||
self._sock.sendall(bytes(header + aliveMsg, "utf-8"))
|
||||
return True
|
||||
if self._sock:
|
||||
_, write, _ = select.select([], [self._sock], [], 0.1)
|
||||
if write:
|
||||
data = "<keep-alive>".encode("utf-8")
|
||||
header = str(len(data)).ljust(HEADERSIZE).encode("utf-8")
|
||||
self._sock.sendall(header + data)
|
||||
return True
|
||||
return False
|
||||
except socket.error as e:
|
||||
if e.errno is 32: # broken pipe - no one will read from this pipe anymore
|
||||
return False
|
||||
logging.error(e)
|
||||
return False
|
||||
if e.errno != 32:
|
||||
logging.exception(e)
|
||||
return False
|
||||
except ValueError:
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -27,10 +27,10 @@ class NetCheck:
|
|||
"""!Create a new NetCheck instance
|
||||
|
||||
@param hostname: host against connection check is running ("https://www.google.com/")
|
||||
@param timout: timout for connection check in sec. (1)"""
|
||||
@param timeout: timeout for connection check in sec. (1)"""
|
||||
self._hostname = hostname
|
||||
self._timeout = timeout
|
||||
self._connectionState = False
|
||||
self.connectionState = False
|
||||
self.checkConn() # initiate a first check
|
||||
|
||||
def checkConn(self):
|
||||
|
|
@ -40,14 +40,9 @@ class NetCheck:
|
|||
try:
|
||||
urlopen(self._hostname, timeout=self._timeout)
|
||||
logging.debug("%s is reachable", self._hostname)
|
||||
self._connectionState = True
|
||||
self.connectionState = True
|
||||
return True
|
||||
except: # todo find right exception type
|
||||
logging.warning("%s is not reachable", self._hostname)
|
||||
self._connectionState = False
|
||||
self.connectionState = False
|
||||
return False
|
||||
|
||||
@property
|
||||
def connectionState(self):
|
||||
"""!Property for the last connection state from checkConn()"""
|
||||
return self._connectionState
|
||||
|
|
|
|||
|
|
@ -46,17 +46,18 @@ class _ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
|
|||
if not read:
|
||||
continue # nothing to read on the socket
|
||||
|
||||
header = self.request.recv(HEADERSIZE)
|
||||
header = self.request.recv(HEADERSIZE).decode("utf-8")
|
||||
if not len(header):
|
||||
break # empty data -> socked closed
|
||||
|
||||
length = int(header.decode("utf-8").strip())
|
||||
length = int(header.strip())
|
||||
data = self.request.recv(length).decode("utf-8")
|
||||
|
||||
if data == "<alive>":
|
||||
if data == "<keep-alive>":
|
||||
continue
|
||||
|
||||
logging.debug("%s recv %d bytes: %s", req_name, length, data)
|
||||
logging.debug("%s recv header: '%s'", req_name, header)
|
||||
logging.debug("%s recv %d bytes:\n%s", req_name, len(data), data)
|
||||
|
||||
# add a new entry and the decoded data dict as an string in utf-8 and an timestamp
|
||||
self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) # queue is threadsafe
|
||||
|
|
@ -64,15 +65,15 @@ class _ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
|
|||
|
||||
logging.debug("%s send: [ack]", req_name)
|
||||
|
||||
data = "[ack]"
|
||||
header = str(len(data)).ljust(HEADERSIZE)
|
||||
self.request.sendall(bytes(header + data, "utf-8"))
|
||||
self.request.close()
|
||||
data = "[ack]".encode("utf-8")
|
||||
header = str(len(data)).ljust(HEADERSIZE).encode("utf-8")
|
||||
self.request.sendall(header + data)
|
||||
|
||||
except socket.error as e:
|
||||
logging.error(e)
|
||||
return False
|
||||
finally:
|
||||
self.request.close()
|
||||
del self.server.clientsConnected[threading.current_thread().name]
|
||||
logging.info("Client disconnected: %s", self.client_address[0])
|
||||
|
||||
|
|
@ -115,7 +116,7 @@ class TCPServer:
|
|||
@return True or False"""
|
||||
if not self.isRunning:
|
||||
try:
|
||||
socketserver.TCPServer.allow_reuse_address = False # because we can start two instances on same port elsewhere
|
||||
socketserver.TCPServer.allow_reuse_address = True # because we can start two instances on same port elsewhere
|
||||
self._server = _ThreadedTCPServer(("", port), _ThreadedTCPRequestHandler)
|
||||
self._server.timeout = self._timeout
|
||||
self._server.alarmQueue = self._alarmQueue
|
||||
|
|
@ -144,6 +145,7 @@ class TCPServer:
|
|||
if self.isRunning:
|
||||
self._server.shutdown()
|
||||
self._server.isActive = False
|
||||
self._server.server_close()
|
||||
self._server_thread.join()
|
||||
self._server_thread = None
|
||||
self._server = None
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ class Packet:
|
|||
|
||||
@param fieldName: Name of the data to set
|
||||
@param value: Value to set"""
|
||||
self._packet[fieldName] = value
|
||||
self._packet[fieldName] = str(value)
|
||||
|
||||
def get(self, fieldName):
|
||||
"""!Returns the value from a single field.
|
||||
|
|
@ -52,7 +52,7 @@ class Packet:
|
|||
@param fieldName: Name of the field
|
||||
@return Value or None"""
|
||||
try:
|
||||
return self._packet[fieldName]
|
||||
return str(self._packet[fieldName])
|
||||
except:
|
||||
logging.warning("field not found: %s", fieldName)
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -32,9 +32,6 @@ class ProcessManager:
|
|||
self._processHandle = None
|
||||
self._textMode = textMode
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
|
||||
def addArgument(self, arg):
|
||||
"""!add a new argument
|
||||
|
||||
|
|
|
|||
|
|
@ -15,24 +15,22 @@
|
|||
@description: Class for a single BOSWatch packet router route point
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
|
||||
class Route:
|
||||
"""!Class for single routing points"""
|
||||
def __init__(self, name, callback):
|
||||
def __init__(self, name, callback, statsCallback=None, cleanupCallback=None):
|
||||
"""!Create a instance of an route point
|
||||
|
||||
@param name: name of the route point
|
||||
@param callback: instance of the callback function
|
||||
@param statsCallback: instance of the callback to get statistics (None)
|
||||
@param cleanupCallback: instance of the callback to run a cleanup method (None)
|
||||
"""
|
||||
self._name = name
|
||||
self._callback = callback
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""!Property to get the route point name"""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def callback(self):
|
||||
"""!Porperty to get the callback function instance"""
|
||||
return self._callback
|
||||
self.name = name
|
||||
self.callback = callback
|
||||
self.statistics = statsCallback
|
||||
self.cleanup = cleanupCallback
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
"""
|
||||
import logging
|
||||
import copy
|
||||
import time
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
|
|
@ -26,17 +27,25 @@ class Router:
|
|||
"""!Create a new router
|
||||
|
||||
@param name: name of the router"""
|
||||
self._name = name
|
||||
self._routeList = []
|
||||
logging.debug("[%s] new router", self._name)
|
||||
self.name = name
|
||||
self.routeList = []
|
||||
|
||||
# for time counting
|
||||
self._cumTime = 0
|
||||
self._routerTime = 0
|
||||
|
||||
# for statistics
|
||||
self._runCount = 0
|
||||
|
||||
logging.debug("[%s] add new router", self.name)
|
||||
|
||||
def addRoute(self, route):
|
||||
"""!Adds a route point to the router
|
||||
|
||||
@param route: instance of the Route class
|
||||
"""
|
||||
logging.debug("[%s] add route: %s", self._name, route.name)
|
||||
self._routeList.append(route)
|
||||
logging.debug("[%s] add route: %s", self.name, route.name)
|
||||
self.routeList.append(route)
|
||||
|
||||
def runRouter(self, bwPacket):
|
||||
"""!Run the router
|
||||
|
|
@ -44,29 +53,37 @@ class Router:
|
|||
@param bwPacket: instance of Packet class
|
||||
@return a instance of Packet class
|
||||
"""
|
||||
logging.debug("[%s] started", self._name)
|
||||
for routeObject in self._routeList:
|
||||
logging.debug("[%s] -> run route: %s", self._name, routeObject)
|
||||
self._runCount += 1
|
||||
tmpTime = time.time()
|
||||
|
||||
logging.debug("[%s] started", self.name)
|
||||
|
||||
for routeObject in self.routeList:
|
||||
logging.debug("[%s] -> run route: %s", self.name, routeObject.name)
|
||||
bwPacket_tmp = routeObject.callback(copy.deepcopy(bwPacket)) # copy bwPacket to prevent edit the original
|
||||
|
||||
if bwPacket_tmp is None: # returning None doesnt change the bwPacket
|
||||
continue
|
||||
|
||||
if bwPacket_tmp is False: # returning False stops the router immediately
|
||||
logging.debug("[%s] stopped", self._name)
|
||||
logging.debug("[%s] stopped", self.name)
|
||||
break
|
||||
|
||||
bwPacket = bwPacket_tmp
|
||||
logging.debug("[%s] <- bwPacket returned: %s", self._name, bwPacket)
|
||||
logging.debug("[%s] finished", self._name)
|
||||
logging.debug("[%s] bwPacket returned", self.name)
|
||||
logging.debug("[%s] finished", self.name)
|
||||
|
||||
self._routerTime = time.time() - tmpTime
|
||||
self._cumTime += self._routerTime
|
||||
|
||||
return bwPacket
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""!Property to get the name of the router"""
|
||||
return self._name
|
||||
def _getStatistics(self):
|
||||
"""!Returns statistical information's from last router run
|
||||
|
||||
@property
|
||||
def routeList(self):
|
||||
"""!Property to get a list of all route points of this router"""
|
||||
return self._routeList
|
||||
@return Statistics as pyton dict"""
|
||||
stats = {"type": "router",
|
||||
"runCount": self._runCount,
|
||||
"cumTime": self._cumTime,
|
||||
"moduleTime": self._routerTime}
|
||||
return stats
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
# todo think about implement threading for routers and the plugin calls (THREAD SAFETY!!!)
|
||||
import logging
|
||||
import importlib
|
||||
import time
|
||||
from boswatch.configYaml import ConfigYAML
|
||||
from boswatch.router.router import Router
|
||||
from boswatch.router.route import Route
|
||||
|
|
@ -30,16 +31,10 @@ class RouterManager:
|
|||
def __init__(self):
|
||||
"""!Create new router"""
|
||||
self._routerDict = {}
|
||||
|
||||
def __del__(self):
|
||||
"""!Destroy the internal routerDict
|
||||
All routers and route point instances will be destroyed too
|
||||
Also destroys all instances from modules or plugins"""
|
||||
# destroy all routers (also destroys all instances of modules/plugins)
|
||||
del self._routerDict
|
||||
self._startTime = int(time.time())
|
||||
|
||||
# if there is an error, router list would be empty (see tmp variable)
|
||||
def buildRouter(self, config):
|
||||
def buildRouters(self, config):
|
||||
"""!Initialize Routers from given config file
|
||||
|
||||
@param config: instance of ConfigYaml class
|
||||
|
|
@ -64,33 +59,42 @@ class RouterManager:
|
|||
|
||||
for route in router.get("route"):
|
||||
routeType = route.get("type")
|
||||
routeName = route.get("name")
|
||||
routeRes = route.get("res")
|
||||
routeName = route.get("name", default=routeRes)
|
||||
|
||||
routeConfig = route.get("config", default=ConfigYAML()) # if no config - build a empty
|
||||
|
||||
if routeType is None or routeName is None:
|
||||
if routeType is None or routeRes is None:
|
||||
logging.error("type or name not found in route: %s", route)
|
||||
return False
|
||||
|
||||
try:
|
||||
if routeType == "plugin":
|
||||
importedFile = importlib.import_module(routeType + "." + routeName)
|
||||
importedFile = importlib.import_module(routeType + "." + routeRes)
|
||||
loadedClass = importedFile.BoswatchPlugin(routeConfig)
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName, loadedClass._run))
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName,
|
||||
loadedClass._run,
|
||||
loadedClass._getStatistics,
|
||||
loadedClass._cleanup))
|
||||
|
||||
elif routeType == "module":
|
||||
importedFile = importlib.import_module(routeType + "." + routeName)
|
||||
importedFile = importlib.import_module(routeType + "." + routeRes)
|
||||
loadedClass = importedFile.BoswatchModule(routeConfig)
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName, loadedClass._run))
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName,
|
||||
loadedClass._run,
|
||||
loadedClass._getStatistics,
|
||||
loadedClass._cleanup))
|
||||
|
||||
elif routeType == "router":
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName, routerDict_tmp[routeName].runRouter))
|
||||
routerDict_tmp[routerName].addRoute(Route(routeName, routerDict_tmp[routeRes].runRouter))
|
||||
|
||||
else:
|
||||
logging.error("unknown type '%s' in %s", routeType, route)
|
||||
return False
|
||||
|
||||
except ModuleNotFoundError:
|
||||
logging.error("%s not found: %s", route.get("type"), route.get("name"))
|
||||
# except ModuleNotFoundError: # only since Py3.6
|
||||
except ImportError:
|
||||
logging.error("%s not found: %s", route.get("type"), route.get("res"))
|
||||
return False
|
||||
|
||||
logging.debug("finished building routers")
|
||||
|
|
@ -98,7 +102,7 @@ class RouterManager:
|
|||
self._showRouterRoute()
|
||||
return True
|
||||
|
||||
def runRouter(self, routerRunList, bwPacket):
|
||||
def runRouters(self, routerRunList, bwPacket):
|
||||
"""!Run given Routers
|
||||
|
||||
@param routerRunList: string or list of router names in string form
|
||||
|
|
@ -112,6 +116,16 @@ class RouterManager:
|
|||
else:
|
||||
logging.warning("unknown router: %s", routerName)
|
||||
|
||||
self._saveStats() # write stats to stats file
|
||||
|
||||
def cleanup(self):
|
||||
"""!Run cleanup routines for all loaded route points"""
|
||||
for name, routerObject in self._routerDict.items():
|
||||
logging.debug("Start cleanup for %s", name)
|
||||
for routePoint in routerObject.routeList:
|
||||
if routePoint.cleanup:
|
||||
routePoint.cleanup()
|
||||
|
||||
def _showRouterRoute(self):
|
||||
"""!Show the routes of all routers"""
|
||||
for name, routerObject in self._routerDict.items():
|
||||
|
|
@ -120,3 +134,27 @@ class RouterManager:
|
|||
for routePoint in routerObject.routeList:
|
||||
counter += 1
|
||||
logging.debug(" %d. %s", counter, routePoint.name)
|
||||
|
||||
def _saveStats(self):
|
||||
"""!Save current statistics to file"""
|
||||
lines = []
|
||||
for name, routerObject in self._routerDict.items():
|
||||
lines.append("[" + name + "]")
|
||||
lines.append(" - Route points: " + str(len(routerObject.routeList)))
|
||||
lines.append(" - Runs: " + str(routerObject._getStatistics()['runCount']))
|
||||
for routePoint in routerObject.routeList:
|
||||
lines.append("[+] " + routePoint.name)
|
||||
if routePoint.statistics:
|
||||
if routePoint.statistics()['type'] == "module":
|
||||
lines.append(" - Runs: " + str(routePoint.statistics()['runCount']))
|
||||
lines.append(" - Run errors: " + str(routePoint.statistics()['moduleErrorCount']))
|
||||
elif routePoint.statistics()['type'] == "plugin":
|
||||
lines.append(" - Runs: " + str(routePoint.statistics()['runCount']))
|
||||
lines.append(" - Setup errors: " + str(routePoint.statistics()['setupErrorCount']))
|
||||
lines.append(" - Alarm errors: " + str(routePoint.statistics()['alarmErrorCount']))
|
||||
lines.append(" - Teardown errors: " + str(routePoint.statistics()['teardownErrorCount']))
|
||||
lines.append("")
|
||||
|
||||
with open("stats_" + str(self._startTime) + ".txt", "w") as stats:
|
||||
for line in lines:
|
||||
stats.write(line + "\n")
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ class RepeatedTimer:
|
|||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
self._start = 0
|
||||
self._overdueCount = 0
|
||||
self._lostEvents = 0
|
||||
self.overdueCount = 0
|
||||
self.lostEvents = 0
|
||||
self._isRunning = False
|
||||
self._event = Event()
|
||||
self._thread = None
|
||||
|
|
@ -88,8 +88,8 @@ class RepeatedTimer:
|
|||
lostEvents = int(runTime / self._interval)
|
||||
logging.warning("timer overdue! interval: %0.3f sec. - runtime: %0.3f sec. - "
|
||||
"%d events lost - next call in: %0.3f sec.", self._interval, runTime, lostEvents, self.restTime)
|
||||
self._lostEvents += lostEvents
|
||||
self._overdueCount += 1
|
||||
self.lostEvents += lostEvents
|
||||
self.overdueCount += 1
|
||||
logging.debug("repeatedTimer thread stopped: %s", self._thread.name)
|
||||
self._thread = None # set to none after leave teh thread (running recognize)
|
||||
|
||||
|
|
@ -104,13 +104,3 @@ class RepeatedTimer:
|
|||
def restTime(self):
|
||||
"""!Property to get remaining time till next call"""
|
||||
return self._interval - ((time.time() - self._start) % self._interval)
|
||||
|
||||
@property
|
||||
def overdueCount(self):
|
||||
"""!Property to get a count over all overdues"""
|
||||
return self._overdueCount
|
||||
|
||||
@property
|
||||
def lostEvents(self):
|
||||
"""!Property to get a count over all lost events"""
|
||||
return self._lostEvents
|
||||
|
|
|
|||
|
|
@ -12,28 +12,43 @@
|
|||
@file: wildcard.py
|
||||
@date: 15.01.2018
|
||||
@author: Bastian Schroll
|
||||
@description: Little Helper to replace wildcards in stings
|
||||
@description: Functions to replace wildcards in stings
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
|
||||
# from boswatch.module import file
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
# todo check function and document + write an test
|
||||
# todo maybe can be a module instead of a native boswatch piece
|
||||
# idea: maybe this can be a class with a register_wildcard() method
|
||||
# so the list with wildcards can be modified by other modules
|
||||
# todo check function - write an test
|
||||
|
||||
_additionalWildcards = {}
|
||||
|
||||
|
||||
def registerWildcard(wildcard, bwPacketField):
|
||||
"""!Register a new additional wildcard
|
||||
|
||||
@param wildcard: New wildcard string with format: '{WILDCARD}'
|
||||
@param bwPacketField: Field of the bwPacket which is used for wildcard replacement"""
|
||||
if wildcard in _additionalWildcards:
|
||||
logging.error("wildcard always registered: %s", wildcard)
|
||||
return
|
||||
logging.debug("register new wildcard %s for field: %s", wildcard, bwPacketField)
|
||||
_additionalWildcards[wildcard] = bwPacketField
|
||||
|
||||
|
||||
def replaceWildcards(message, bwPacket):
|
||||
"""!Replace the wildcards in a given message
|
||||
|
||||
@param message: Message in which wildcards should be replaced
|
||||
@param bwPacket: bwPacket instance with the replacement information
|
||||
@return Input message with the replaced wildcards"""
|
||||
_wildcards = {
|
||||
# formatting wildcards
|
||||
# todo check if br and par are needed - if not also change config
|
||||
"{BR}": "\r\n",
|
||||
"{LPAR}": "(",
|
||||
"{RPAR}": ")",
|
||||
"{TIME}": time.time(),
|
||||
"{TIME}": time.strftime("%d.%m.%Y %H:%M:%S"),
|
||||
|
||||
# info wildcards
|
||||
# server
|
||||
|
|
@ -78,7 +93,10 @@ def replaceWildcards(message, bwPacket):
|
|||
|
||||
# message for MSG packet is done in poc
|
||||
}
|
||||
for wildcard in _wildcards:
|
||||
message = message.replace(wildcard, _wildcards.get(wildcard))
|
||||
for wildcard, field in _wildcards.items():
|
||||
message = message.replace(wildcard, field)
|
||||
|
||||
for wildcard, field in _additionalWildcards.items():
|
||||
message = message.replace(wildcard, bwPacket.getField(field))
|
||||
|
||||
return message
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue