BW3-Core/plugin/divera.py

139 lines
5.5 KiB
Python
Raw Normal View History

2022-01-23 10:54:36 +01:00
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: divera.py
@date: 16.01.2022
@author: Lars Gremme
@description: Divera247 Plugin
"""
import logging
from plugin.pluginBase import PluginBase
# ###################### #
# Custom plugin includes #
import asyncio
from aiohttp import ClientSession
import urllib
# ###################### #
logging.debug("- %s loaded", __name__)
class BoswatchPlugin(PluginBase):
"""!Description of the Plugin"""
def __init__(self, config):
"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
def fms(self, bwPacket):
"""!Called on FMS alarm
@param bwPacket: bwPacket instance
Remove if not implemented"""
fms_data = self.config.get("fms")
apicall = urllib.parse.urlencode({
"accesskey": self.config.get("accesskey", default=""),
"vehicle_ric": self.parseWildcards(fms_data.get("vehicle", default="")),
"status_id": bwPacket.get("status"),
"status_note": bwPacket.get("directionText"),
"title": self.parseWildcards(fms_data.get("title", default="{FMS}")),
"text": self.parseWildcards(fms_data.get("message", default="{FMS}")),
"priority": fms_data.get("priority", default="false"),
})
apipath = "/api/fms"
2022-01-23 12:25:30 +01:00
self._makeRequests(apipath, apicall)
2022-01-23 10:54:36 +01:00
def pocsag(self, bwPacket):
"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance
Remove if not implemented"""
poc_data = self.config.get("pocsag")
apicall = urllib.parse.urlencode({
"accesskey": self.config.get("accesskey", default=""),
"title": self.parseWildcards(poc_data.get("title", default="{RIC}({SRIC})\n{MSG}")),
"ric": self.parseWildcards(poc_data.get("ric", default="")),
"text": self.parseWildcards(poc_data.get("message", default="{MSG}")),
"priority": poc_data.get("priority", default="false"),
})
apipath = "/api/alarm"
2022-01-23 12:25:30 +01:00
self._makeRequests(apipath, apicall)
2022-01-23 10:54:36 +01:00
def zvei(self, bwPacket):
"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance
Remove if not implemented"""
zvei_data = self.config.get("zvei")
apicall = urllib.parse.urlencode({
"accesskey": self.config.get("accesskey", default=""),
"title": self.parseWildcards(zvei_data.get("title", default="{TONE}")),
"ric": self.parseWildcards(zvei_data.get("ric", default="{TONE}")),
"text": self.parseWildcards(zvei_data.get("message", default="{TONE}")),
"priority": zvei_data.get("priority", default="false"),
})
apipath = "/api/alarm"
2022-01-23 12:25:30 +01:00
self._makeRequests(apipath, apicall)
2022-01-23 10:54:36 +01:00
def msg(self, bwPacket):
"""!Called on MSG packet
@param bwPacket: bwPacket instance
Remove if not implemented"""
msg_data = self.config.get("msg")
apicall = urllib.parse.urlencode({
"accesskey": self.config.get("accesskey", default=""),
"title": self.parseWildcards(msg_data.get("title", default="{MSG}")),
"ric": self.parseWildcards(msg_data.get("ric", default="")),
"text": self.parseWildcards(msg_data.get("message", default="{MSG}")),
"priority": msg_data.get("priority", default="false"),
})
apipath = "/api/alarm"
2022-01-23 12:25:30 +01:00
self._makeRequests(apipath, apicall)
2022-01-23 10:54:36 +01:00
2022-01-23 12:25:30 +01:00
def _makeRequests(self, apipath, apicall):
2022-01-23 10:54:36 +01:00
"""Parses wildcard urls and handles asynchronus requests
@param urls: array of urls"""
url = "https://www.divera247.com"
request = url + apipath + "?" + apicall
loop = asyncio.get_event_loop()
2022-01-23 12:25:30 +01:00
future = asyncio.ensure_future(self._asyncRequests(request))
2022-01-23 10:54:36 +01:00
loop.run_until_complete(future)
2022-01-23 12:25:30 +01:00
async def _asyncRequests(self, url):
2022-01-23 10:54:36 +01:00
"""Handles asynchronus requests
@param urls: array of urls to send requests to"""
tasks = []
async with ClientSession() as session:
logging.debug("Generated URL: [{}]".format(url))
2022-01-23 12:25:30 +01:00
task = asyncio.ensure_future(self._fetch(url, session))
2022-01-23 10:54:36 +01:00
tasks.append(task)
responses = asyncio.gather(*tasks)
await responses
2022-01-23 12:25:30 +01:00
async def _fetch(self, url, session):
2022-01-23 10:54:36 +01:00
"""Fetches requests
@param url: url
@param session: Clientsession instance"""
logging.debug("Post URL: [{}]".format(url))
async with session.post(url) as response:
logging.info("{} returned [{}]".format(response.url, response.status))
return await response.read()