mirror of
https://github.com/BOSWatch/BW3-Core.git
synced 2025-12-06 07:12:04 +01:00
146 lines
5.7 KiB
Python
146 lines
5.7 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
r"""!
|
|
____ ____ ______ __ __ __ _____
|
|
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
|
|
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
|
|
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
|
|
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
|
|
German BOS Information Script
|
|
by Bastian Schroll
|
|
|
|
@file: bosmon.py
|
|
@date: 14.01.2024
|
|
@author: Justin Kurowski | BW2 version: Jens Herrmann
|
|
@description: Bosmon Connector Port form BOSWatch2 to BOSWatch 3
|
|
"""
|
|
import logging
|
|
from plugin.pluginBase import PluginBase
|
|
|
|
# ###################### #
|
|
# Custom plugin includes #
|
|
|
|
import requests
|
|
from basicauth import encode
|
|
|
|
# ###################### #
|
|
|
|
logging.debug("- %s loaded", __name__)
|
|
|
|
|
|
class BoswatchPlugin(PluginBase):
|
|
r"""!Description of the Plugin"""
|
|
def __init__(self, config):
|
|
r"""!Do not change anything here!"""
|
|
super().__init__(__name__, config) # you can access the config class on 'self.config'
|
|
|
|
# Auskommentierte sachen folgen noch! #
|
|
|
|
def fms(self, bwPacket):
|
|
r"""!Called on FMS alarm
|
|
|
|
@param bwPacket: bwPacket instance
|
|
Remove if not implemented"""
|
|
BM_hostname=self.config.get("hostname")
|
|
BM_port=self.config.get("port")
|
|
BM_user=self.config.get("user")
|
|
BM_passwd=self.config.get("passwd")
|
|
BM_channel=self.config.get("channel")
|
|
|
|
get_FMS=bwPacket.get("fms")
|
|
get_status=bwPacket.get("status")
|
|
get_direction=bwPacket.get("direction")
|
|
get_tacticalInfo=bwPacket.get("tacticalInfo")
|
|
|
|
self._BosmonRequest_FMS(BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_FMS, get_status, get_direction, get_tacticalInfo)
|
|
|
|
def pocsag(self, bwPacket):
|
|
r"""!Called on POCSAG alarm
|
|
|
|
@param bwPacket: bwPacket instance
|
|
Remove if not implemented"""
|
|
BM_hostname=self.config.get("hostname")
|
|
BM_port=self.config.get("port")
|
|
BM_user=self.config.get("user")
|
|
BM_passwd=self.config.get("passwd")
|
|
BM_channel=self.config.get("channel")
|
|
|
|
get_ric=bwPacket.get("ric")
|
|
get_subric=bwPacket.get("subricText")
|
|
get_message=bwPacket.get("message")
|
|
|
|
self._BosmonRequest_Poc(BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_ric, get_subric, get_message)
|
|
|
|
def zvei(self, bwPacket):
|
|
r"""!Called on ZVEI alarm
|
|
|
|
@param bwPacket: bwPacket instance
|
|
Remove if not implemented"""
|
|
BM_hostname=self.config.get("hostname")
|
|
BM_port=self.config.get("port")
|
|
BM_user=self.config.get("user")
|
|
BM_passwd=self.config.get("passwd")
|
|
BM_channel=self.config.get("channel")
|
|
|
|
get_zvei_adress=bwPacket.get("tone")
|
|
|
|
self._BosmonRequest_Zvei(BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_zvei_adress)
|
|
|
|
def _BosmonRequest_Poc(self, BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_ric, get_subric, get_message):
|
|
|
|
url = 'http://'+BM_hostname+':'+BM_port+'/telegramin/'+BM_channel+'/input.xml'
|
|
|
|
payload = 'type=pocsag&address='+get_ric+'&flags=0&function='+get_subric+'&message='+get_message
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
'Authorization': encode(BM_user, BM_passwd)
|
|
}
|
|
|
|
response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
def _BosmonRequest_FMS(self, BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_FMS, get_status, get_direction, get_tacticalInfo):
|
|
|
|
url = 'http://'+BM_hostname+':'+BM_port+'/telegramin/'+BM_channel+'/input.xml'
|
|
|
|
payload = 'type=fms&address='+get_FMS+'&flags=0&status='+get_status+'&info='._getInfo(get_tacticalInfo, get_direction)
|
|
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
'Authorization': encode(BM_user, BM_passwd)
|
|
}
|
|
|
|
response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
def _getInfo(self, get_tacticalInfo, get_direction):
|
|
|
|
# BosMon-Telegramin expected assembly group, direction and tsi in one field
|
|
# structure (binary as hex in base10):
|
|
# Byte 1: assembly group; Byte 2: Direction; Byte 3+4: tactic short info
|
|
info = 0
|
|
# assembly group:
|
|
info = info + 1 # + b0001 (Assumption: is in every time 1 (no output from multimon-ng))
|
|
# direction:
|
|
if get_direction == "1":
|
|
info = info + 2 # + b0010
|
|
# tsi:
|
|
if "IV" in get_tacticalInfo:
|
|
info = info + 12 # + b1100
|
|
elif "III" in get_tacticalInfo:
|
|
info = info + 8 # + b1000
|
|
elif "II" in get_tacticalInfo:
|
|
info = info + 4 # + b0100
|
|
# "I" is nothing to do + b0000
|
|
|
|
return info
|
|
|
|
def _BosmonRequest_Zvei(self, BM_hostname, BM_port, BM_user, BM_passwd, BM_channel, get_zvei_adress):
|
|
|
|
url = 'http://'+BM_hostname+':'+BM_port+'/telegramin/'+BM_channel+'/input.xml'
|
|
|
|
payload = 'type=pocsag&address='+get_zvei_adress+'&flags=0'
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
'Authorization': encode(BM_user, BM_passwd)
|
|
}
|
|
|
|
response = requests.request("POST", url, headers=headers, data=payload) |