BW3-Core/plugin/telegram.py
KoenigMjr 6a0a59c3ac Telegram mit Warteschlange
Durch Einbau einer Warteschlange kein Datenverlust bei belegter API (Sendelimit ca. 30 Nachrichten/min, gibt aber Soft-Limit)

Exponentielles Backoff mit Maximalgrenze
Retry-Zähler mit Abbruch bei zu vielen Fehlversuchen
Kein Wiederholen bei permanenten Fehlern (400/401)
dynamische Zeitanpassung bei 429 Fehlern

Fehlerrobustheit verbessert hinsichtlich Connection Error

neues Plugin ohne telegram-bot

* Timeout (timeout=10),
* HTTP-Fehlerprüfung (raise_for_status()),
* Retry-Logik (3 Versuche mit wachsender Wartezeit),
* Sauberem Logging mit logger statt print).

send_location aus altem Skript übernommen und angepasst
2025-08-08 21:11:48 +02:00

212 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# -*- coding: utf-8 -*-
r"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: telegram.py
@date: 12.07.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
"""
import logging
import time
import threading
import queue
import requests
from plugin.pluginBase import PluginBase
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# ===========================
# TelegramSender-Klasse
# ===========================
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
def send_location(self, latitude, longitude):
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
def _worker_loop(self):
delay = self.initial_delay
while True:
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
elif permanent_failure:
logger.error("Permanenter Fehler Nachricht wird verworfen.")
elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht Nachricht wird verworfen.")
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
delay = min(delay * 2, self.max_delay)
except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
}
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch
try:
custom_delay = None # Standardwert für Rückgabe, außer bei 429
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay
except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay
# ===========================
# BoswatchPlugin-Klasse
# ===========================
class BoswatchPlugin(PluginBase):
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
def onLoad(self):
r"""!Called by import of the plugin"""
bot_token = self.config.get("botToken")
chat_ids = self.config.get("chatIds", default=[])
if not bot_token or not chat_ids:
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return
# Konfigurierbare Parameter mit Fallback-Defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
)
startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)
def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass
def fms(self, bwPacket):
r"""!Called on FMS alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self.sender.send_message(msg)
def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self.sender.send_message(msg)
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
logger.debug("Koordinaten gefunden sende Standort.")
self.sender.send_location(lat, lon)
def zvei(self, bwPacket):
r"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self.sender.send_message(msg)
def msg(self, bwPacket):
r"""!Called on MSG packet
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg"))
self.sender.send_message(msg)
def teardown(self):
r"""!Called after alarm
Remove if not implemented"""
pass
def onUnload(self):
r"""!Called by destruction of the plugin
Remove if not implemented"""
pass