diff --git a/docu/docs/plugin/telegram.md b/docu/docs/plugin/telegram.md index bd82296..8c23bbd 100644 --- a/docu/docs/plugin/telegram.md +++ b/docu/docs/plugin/telegram.md @@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram |----|------------|-------| |botToken|Der Api-Key des Telegram-Bots|| |chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen|| +|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer| |message_fms|Format der Nachricht für FMS|`{FMS}`| |message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`| |message_zvei|Format der Nachricht für ZVEI|`{TONE}`| |message_msg|Format der Nachricht für MSG|| -|queue|Aktivieren/Deaktivieren der MessageQueue|true| +|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5| +|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]| +|max_delay|Maximale Verzögerung|60 [Sek.]| **Beispiel:** ```yaml @@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram res: telegram config: message_pocsag: "{RIC}({SRIC})\n{MSG}" + startup_message: "Server up and running!" botToken: "BOT_TOKEN" chatIds: - "CHAT_ID" diff --git a/plugin/telegram.py b/plugin/telegram.py index 80cd5dc..a73a222 100644 --- a/plugin/telegram.py +++ b/plugin/telegram.py @@ -10,126 +10,202 @@ r"""! by Bastian Schroll @file: telegram.py -@date: 20.02.2020 -@author: Jan Speller -@description: Telegram Plugin +@date: 12.07.2025 +@author: Claus Schichl nach der Idee von Jan Speller +@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten """ + import logging +import time +import threading +import queue +import requests from plugin.pluginBase import PluginBase -# ###################### # -# Custom plugin includes # -from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError) -from telegram.ext import messagequeue as mq -from telegram.utils.request import Request -import telegram.bot -# ###################### # - -logging.debug("- %s loaded", __name__) +# Setup Logging +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) -class MQBot(telegram.bot.Bot): - '''A subclass of Bot which delegates send method handling to MQ''' +# =========================== +# TelegramSender-Klasse +# =========================== - def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs): - super(MQBot, self).__init__(*args, **kwargs) - # below 2 attributes should be provided for decorator usage - self._is_messages_queued_default = is_queued_def - self._msg_queue = mqueue or mq.MessageQueue() +class TelegramSender: + def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None): + self.bot_token = bot_token + self.chat_ids = chat_ids + self.max_retries = max_retries if max_retries is not None else 5 + self.initial_delay = initial_delay if initial_delay is not None else 2 + self.max_delay = max_delay if max_delay is not None else 300 + self.msg_queue = queue.Queue() + self._worker = threading.Thread(target=self._worker_loop, daemon=True) + self._worker.start() + + def send_message(self, text): + for chat_id in self.chat_ids: + self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0 + + def send_location(self, latitude, longitude): + for chat_id in self.chat_ids: + self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0)) + + def _worker_loop(self): + delay = self.initial_delay + + while True: + try: + msg_type, chat_id, content, retry_count = self.msg_queue.get() + + success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content) + + if success: + delay = self.initial_delay + + elif permanent_failure: + logger.error("Permanenter Fehler – Nachricht wird verworfen.") + + elif retry_count >= self.max_retries: + logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.") + + else: + logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).") + self.msg_queue.put((msg_type, chat_id, content, retry_count + 1)) + + # Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden + wait_time = custom_delay if custom_delay is not None else delay + time.sleep(wait_time) + + # Erhöhe Delay für den nächsten Versuch (exponentielles Backoff) + delay = min(delay * 2, self.max_delay) + + except Exception as e: + logger.exception(f"Fehler im Telegram-Worker: {e}") + time.sleep(5) + + def _send_to_telegram(self, msg_type, chat_id, content): + if msg_type == "text": + url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" + payload = { + 'chat_id': chat_id, + 'text': content + } + elif msg_type == "location": + url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation" + payload = { + 'chat_id': chat_id, + **content + } + else: + logger.error("Unbekannter Nachrichtentyp.") + return False, True, None # Unbekannter Typ = permanent falsch - def __del__(self): try: - self._msg_queue.stop() - except: - pass + custom_delay = None # Standardwert für Rückgabe, außer bei 429 - @mq.queuedmessage - def send_message(self, *args, **kwargs): - '''Wrapped method would accept new `queued` and `isgroup` - OPTIONAL arguments''' - return super(MQBot, self).send_message(*args, **kwargs) + response = requests.post(url, data=payload, timeout=10) + + if response.status_code == 429: + custom_delay = response.json().get("parameters", {}).get("retry_after", 5) + logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.") + return False, False, custom_delay # Telegram gibt genaue Wartezeit vor + + if response.status_code == 400: + logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.") + return False, True, custom_delay # Permanent fehlerhaft + + if response.status_code == 401: + logger.critical("Ungültiger Bot-Token – bitte prüfen!") + return False, True, custom_delay # Permanent fehlerhaft + + response.raise_for_status() + logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}") + return True, False, custom_delay + + except requests.RequestException as e: + logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}") + return False, False, custom_delay + + +# =========================== +# BoswatchPlugin-Klasse +# =========================== class BoswatchPlugin(PluginBase): - r"""!Description of the Plugin""" - def __init__(self, config): r"""!Do not change anything here!""" super().__init__(__name__, config) # you can access the config class on 'self.config' def onLoad(self): r"""!Called by import of the plugin""" - if self.config.get("queue", default=True): - q = mq.MessageQueue() - request = Request(con_pool_size=8) - self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q) - print('queue') - else: - self.bot = telegram.Bot(token=self.config.get("botToken")) - print('normal') + bot_token = self.config.get("botToken") + chat_ids = self.config.get("chatIds", default=[]) + + if not bot_token or not chat_ids: + logger.error("botToken oder chatIds fehlen in der Konfiguration!") + return + + # Konfigurierbare Parameter mit Fallback-Defaults + max_retries = self.config.get("max_retries") + initial_delay = self.config.get("initial_delay") + max_delay = self.config.get("max_delay") + + self.sender = TelegramSender( + bot_token=bot_token, + chat_ids=chat_ids, + max_retries=max_retries, + initial_delay=initial_delay, + max_delay=max_delay + ) + + startup_message = self.config.get("startup_message") + if startup_message and startup_message.strip(): + self.sender.send_message(startup_message) + + def setup(self): + r"""!Called before alarm + Remove if not implemented""" + pass def fms(self, bwPacket): r"""!Called on FMS alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}")) - self._sendMessage(msg) + self.sender.send_message(msg) def pocsag(self, bwPacket): r"""!Called on POCSAG alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}")) - self._sendMessage(msg) + self.sender.send_message(msg) if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None: - logging.debug("Found coordinates in packet") - (lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon")) - self._sendLocation(lat, lon) + lat, lon = bwPacket.get("lat"), bwPacket.get("lon") + logger.debug("Koordinaten gefunden – sende Standort.") + self.sender.send_location(lat, lon) def zvei(self, bwPacket): r"""!Called on ZVEI alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}")) - self._sendMessage(msg) + self.sender.send_message(msg) def msg(self, bwPacket): r"""!Called on MSG packet - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_msg")) - self._sendMessage(msg) + self.sender.send_message(msg) - def _sendMessage(self, message): - for chatId in self.config.get("chatIds", default=[]): - try: - # Send Message via Telegram - logging.info("Sending message to " + chatId) - self.bot.send_message(chat_id=chatId, text=message) + def teardown(self): + r"""!Called after alarm + Remove if not implemented""" + pass - except Unauthorized: - logging.exception("Error while sending Telegram Message, please Check your api-key") - except (TimedOut, NetworkError): - logging.exception("Error while sending Telegram Message, please Check your connectivity") - except (BadRequest, TelegramError): - logging.exception("Error while sending Telegram Message") - except Exception as e: - logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e)) - - def _sendLocation(self, lat, lon): - for chatId in self.config.get("chatIds", default=[]): - try: - # Send Location via Telegram - if lat is not None and lon is not None: - logging.info("Sending location to " + chatId) - self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon) - - except Unauthorized: - logging.exception("Error while sending Telegram Message, please Check your api-key") - except (TimedOut, NetworkError): - logging.exception("Error while sending Telegram Message, please Check your connectivity") - except (BadRequest, TelegramError): - logging.exception("Error while sending Telegram Message") - except Exception as e: - logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e)) + def onUnload(self): + r"""!Called by destruction of the plugin + Remove if not implemented""" + pass