mirror of
https://github.com/BOSWatch/BW3-Core.git
synced 2025-12-06 07:12:04 +01:00
Compare commits
4 commits
16dbd731e8
...
017e882363
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
017e882363 | ||
|
|
2e5479cde2 | ||
|
|
523329a9bb | ||
|
|
6a0a59c3ac |
|
|
@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|
|||
|----|------------|-------|
|
||||
|botToken|Der Api-Key des Telegram-Bots||
|
||||
|chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen||
|
||||
|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer|
|
||||
|message_fms|Format der Nachricht für FMS|`{FMS}`|
|
||||
|message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`|
|
||||
|message_zvei|Format der Nachricht für ZVEI|`{TONE}`|
|
||||
|message_msg|Format der Nachricht für MSG||
|
||||
|queue|Aktivieren/Deaktivieren der MessageQueue|true|
|
||||
|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5|
|
||||
|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]|
|
||||
|max_delay|Maximale Verzögerung|60 [Sek.]|
|
||||
|
||||
**Beispiel:**
|
||||
```yaml
|
||||
|
|
@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|
|||
res: telegram
|
||||
config:
|
||||
message_pocsag: "{RIC}({SRIC})\n{MSG}"
|
||||
startup_message: "Server up and running!"
|
||||
botToken: "BOT_TOKEN"
|
||||
chatIds:
|
||||
- "CHAT_ID"
|
||||
|
|
|
|||
|
|
@ -10,126 +10,202 @@ r"""!
|
|||
by Bastian Schroll
|
||||
|
||||
@file: telegram.py
|
||||
@date: 20.02.2020
|
||||
@author: Jan Speller
|
||||
@description: Telegram Plugin
|
||||
@date: 12.07.2025
|
||||
@author: Claus Schichl nach der Idee von Jan Speller
|
||||
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
import queue
|
||||
import requests
|
||||
from plugin.pluginBase import PluginBase
|
||||
|
||||
# ###################### #
|
||||
# Custom plugin includes #
|
||||
from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError)
|
||||
from telegram.ext import messagequeue as mq
|
||||
from telegram.utils.request import Request
|
||||
import telegram.bot
|
||||
# ###################### #
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
# Setup Logging
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
level=logging.INFO
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MQBot(telegram.bot.Bot):
|
||||
'''A subclass of Bot which delegates send method handling to MQ'''
|
||||
# ===========================
|
||||
# TelegramSender-Klasse
|
||||
# ===========================
|
||||
|
||||
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
|
||||
super(MQBot, self).__init__(*args, **kwargs)
|
||||
# below 2 attributes should be provided for decorator usage
|
||||
self._is_messages_queued_default = is_queued_def
|
||||
self._msg_queue = mqueue or mq.MessageQueue()
|
||||
class TelegramSender:
|
||||
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
|
||||
self.bot_token = bot_token
|
||||
self.chat_ids = chat_ids
|
||||
self.max_retries = max_retries if max_retries is not None else 5
|
||||
self.initial_delay = initial_delay if initial_delay is not None else 2
|
||||
self.max_delay = max_delay if max_delay is not None else 300
|
||||
self.msg_queue = queue.Queue()
|
||||
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
|
||||
self._worker.start()
|
||||
|
||||
def __del__(self):
|
||||
def send_message(self, text):
|
||||
for chat_id in self.chat_ids:
|
||||
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
|
||||
|
||||
def send_location(self, latitude, longitude):
|
||||
for chat_id in self.chat_ids:
|
||||
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
|
||||
|
||||
def _worker_loop(self):
|
||||
delay = self.initial_delay
|
||||
|
||||
while True:
|
||||
try:
|
||||
self._msg_queue.stop()
|
||||
except:
|
||||
pass
|
||||
msg_type, chat_id, content, retry_count = self.msg_queue.get()
|
||||
|
||||
@mq.queuedmessage
|
||||
def send_message(self, *args, **kwargs):
|
||||
'''Wrapped method would accept new `queued` and `isgroup`
|
||||
OPTIONAL arguments'''
|
||||
return super(MQBot, self).send_message(*args, **kwargs)
|
||||
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
|
||||
|
||||
if success:
|
||||
delay = self.initial_delay
|
||||
|
||||
elif permanent_failure:
|
||||
logger.error("Permanenter Fehler – Nachricht wird verworfen.")
|
||||
|
||||
elif retry_count >= self.max_retries:
|
||||
logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.")
|
||||
|
||||
else:
|
||||
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
|
||||
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
|
||||
|
||||
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
|
||||
wait_time = custom_delay if custom_delay is not None else delay
|
||||
time.sleep(wait_time)
|
||||
|
||||
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
|
||||
delay = min(delay * 2, self.max_delay)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Fehler im Telegram-Worker: {e}")
|
||||
time.sleep(5)
|
||||
|
||||
def _send_to_telegram(self, msg_type, chat_id, content):
|
||||
if msg_type == "text":
|
||||
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
|
||||
payload = {
|
||||
'chat_id': chat_id,
|
||||
'text': content
|
||||
}
|
||||
elif msg_type == "location":
|
||||
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
|
||||
payload = {
|
||||
'chat_id': chat_id,
|
||||
**content
|
||||
}
|
||||
else:
|
||||
logger.error("Unbekannter Nachrichtentyp.")
|
||||
return False, True, None # Unbekannter Typ = permanent falsch
|
||||
|
||||
try:
|
||||
custom_delay = None # Standardwert für Rückgabe, außer bei 429
|
||||
|
||||
response = requests.post(url, data=payload, timeout=10)
|
||||
|
||||
if response.status_code == 429:
|
||||
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
|
||||
logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.")
|
||||
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
|
||||
|
||||
if response.status_code == 400:
|
||||
logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.")
|
||||
return False, True, custom_delay # Permanent fehlerhaft
|
||||
|
||||
if response.status_code == 401:
|
||||
logger.critical("Ungültiger Bot-Token – bitte prüfen!")
|
||||
return False, True, custom_delay # Permanent fehlerhaft
|
||||
|
||||
response.raise_for_status()
|
||||
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
|
||||
return True, False, custom_delay
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
|
||||
return False, False, custom_delay
|
||||
|
||||
|
||||
# ===========================
|
||||
# BoswatchPlugin-Klasse
|
||||
# ===========================
|
||||
|
||||
|
||||
class BoswatchPlugin(PluginBase):
|
||||
r"""!Description of the Plugin"""
|
||||
|
||||
def __init__(self, config):
|
||||
r"""!Do not change anything here!"""
|
||||
super().__init__(__name__, config) # you can access the config class on 'self.config'
|
||||
|
||||
def onLoad(self):
|
||||
r"""!Called by import of the plugin"""
|
||||
if self.config.get("queue", default=True):
|
||||
q = mq.MessageQueue()
|
||||
request = Request(con_pool_size=8)
|
||||
self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q)
|
||||
print('queue')
|
||||
else:
|
||||
self.bot = telegram.Bot(token=self.config.get("botToken"))
|
||||
print('normal')
|
||||
bot_token = self.config.get("botToken")
|
||||
chat_ids = self.config.get("chatIds", default=[])
|
||||
|
||||
if not bot_token or not chat_ids:
|
||||
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
|
||||
return
|
||||
|
||||
# Konfigurierbare Parameter mit Fallback-Defaults
|
||||
max_retries = self.config.get("max_retries")
|
||||
initial_delay = self.config.get("initial_delay")
|
||||
max_delay = self.config.get("max_delay")
|
||||
|
||||
self.sender = TelegramSender(
|
||||
bot_token=bot_token,
|
||||
chat_ids=chat_ids,
|
||||
max_retries=max_retries,
|
||||
initial_delay=initial_delay,
|
||||
max_delay=max_delay
|
||||
)
|
||||
|
||||
startup_message = self.config.get("startup_message")
|
||||
if startup_message and startup_message.strip():
|
||||
self.sender.send_message(startup_message)
|
||||
|
||||
def setup(self):
|
||||
r"""!Called before alarm
|
||||
Remove if not implemented"""
|
||||
pass
|
||||
|
||||
def fms(self, bwPacket):
|
||||
r"""!Called on FMS alarm
|
||||
|
||||
@param bwPacket: bwPacket instance"""
|
||||
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
|
||||
self._sendMessage(msg)
|
||||
self.sender.send_message(msg)
|
||||
|
||||
def pocsag(self, bwPacket):
|
||||
r"""!Called on POCSAG alarm
|
||||
|
||||
@param bwPacket: bwPacket instance"""
|
||||
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
|
||||
self._sendMessage(msg)
|
||||
self.sender.send_message(msg)
|
||||
|
||||
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
|
||||
logging.debug("Found coordinates in packet")
|
||||
(lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon"))
|
||||
self._sendLocation(lat, lon)
|
||||
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
|
||||
logger.debug("Koordinaten gefunden – sende Standort.")
|
||||
self.sender.send_location(lat, lon)
|
||||
|
||||
def zvei(self, bwPacket):
|
||||
r"""!Called on ZVEI alarm
|
||||
|
||||
@param bwPacket: bwPacket instance"""
|
||||
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
|
||||
self._sendMessage(msg)
|
||||
self.sender.send_message(msg)
|
||||
|
||||
def msg(self, bwPacket):
|
||||
r"""!Called on MSG packet
|
||||
|
||||
@param bwPacket: bwPacket instance"""
|
||||
msg = self.parseWildcards(self.config.get("message_msg"))
|
||||
self._sendMessage(msg)
|
||||
self.sender.send_message(msg)
|
||||
|
||||
def _sendMessage(self, message):
|
||||
for chatId in self.config.get("chatIds", default=[]):
|
||||
try:
|
||||
# Send Message via Telegram
|
||||
logging.info("Sending message to " + chatId)
|
||||
self.bot.send_message(chat_id=chatId, text=message)
|
||||
def teardown(self):
|
||||
r"""!Called after alarm
|
||||
Remove if not implemented"""
|
||||
pass
|
||||
|
||||
except Unauthorized:
|
||||
logging.exception("Error while sending Telegram Message, please Check your api-key")
|
||||
except (TimedOut, NetworkError):
|
||||
logging.exception("Error while sending Telegram Message, please Check your connectivity")
|
||||
except (BadRequest, TelegramError):
|
||||
logging.exception("Error while sending Telegram Message")
|
||||
except Exception as e:
|
||||
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
|
||||
|
||||
def _sendLocation(self, lat, lon):
|
||||
for chatId in self.config.get("chatIds", default=[]):
|
||||
try:
|
||||
# Send Location via Telegram
|
||||
if lat is not None and lon is not None:
|
||||
logging.info("Sending location to " + chatId)
|
||||
self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon)
|
||||
|
||||
except Unauthorized:
|
||||
logging.exception("Error while sending Telegram Message, please Check your api-key")
|
||||
except (TimedOut, NetworkError):
|
||||
logging.exception("Error while sending Telegram Message, please Check your connectivity")
|
||||
except (BadRequest, TelegramError):
|
||||
logging.exception("Error while sending Telegram Message")
|
||||
except Exception as e:
|
||||
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
|
||||
def onUnload(self):
|
||||
r"""!Called by destruction of the plugin
|
||||
Remove if not implemented"""
|
||||
pass
|
||||
|
|
|
|||
Loading…
Reference in a new issue