Telegram mit Warteschlange

Durch Einbau einer Warteschlange kein Datenverlust bei belegter API (Sendelimit ca. 30 Nachrichten/min, gibt aber Soft-Limit)

Exponentielles Backoff mit Maximalgrenze
Retry-Zähler mit Abbruch bei zu vielen Fehlversuchen
Kein Wiederholen bei permanenten Fehlern (400/401)
dynamische Zeitanpassung bei 429 Fehlern

Fehlerrobustheit verbessert hinsichtlich Connection Error

neues Plugin ohne telegram-bot

* Timeout (timeout=10),
* HTTP-Fehlerprüfung (raise_for_status()),
* Retry-Logik (3 Versuche mit wachsender Wartezeit),
* Sauberem Logging mit logger statt print).

send_location aus altem Skript übernommen und angepasst
This commit is contained in:
KoenigMjr 2025-07-11 22:24:39 +02:00
parent a1cb545c39
commit 6a0a59c3ac

View file

@ -10,126 +10,202 @@ r"""!
by Bastian Schroll by Bastian Schroll
@file: telegram.py @file: telegram.py
@date: 20.02.2020 @date: 12.07.2025
@author: Jan Speller @author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram Plugin @description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
""" """
import logging import logging
import time
import threading
import queue
import requests
from plugin.pluginBase import PluginBase from plugin.pluginBase import PluginBase
# ###################### # # Setup Logging
# Custom plugin includes # logging.basicConfig(
from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError) format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
from telegram.ext import messagequeue as mq level=logging.INFO
from telegram.utils.request import Request )
import telegram.bot logger = logging.getLogger(__name__)
# ###################### #
logging.debug("- %s loaded", __name__)
class MQBot(telegram.bot.Bot): # ===========================
'''A subclass of Bot which delegates send method handling to MQ''' # TelegramSender-Klasse
# ===========================
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs): class TelegramSender:
super(MQBot, self).__init__(*args, **kwargs) def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
# below 2 attributes should be provided for decorator usage self.bot_token = bot_token
self._is_messages_queued_default = is_queued_def self.chat_ids = chat_ids
self._msg_queue = mqueue or mq.MessageQueue() self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
def send_location(self, latitude, longitude):
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
def _worker_loop(self):
delay = self.initial_delay
while True:
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
elif permanent_failure:
logger.error("Permanenter Fehler Nachricht wird verworfen.")
elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht Nachricht wird verworfen.")
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
delay = min(delay * 2, self.max_delay)
except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
}
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch
def __del__(self):
try: try:
self._msg_queue.stop() custom_delay = None # Standardwert für Rückgabe, außer bei 429
except:
pass
@mq.queuedmessage response = requests.post(url, data=payload, timeout=10)
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup` if response.status_code == 429:
OPTIONAL arguments''' custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
return super(MQBot, self).send_message(*args, **kwargs) logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay
except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay
# ===========================
# BoswatchPlugin-Klasse
# ===========================
class BoswatchPlugin(PluginBase): class BoswatchPlugin(PluginBase):
r"""!Description of the Plugin"""
def __init__(self, config): def __init__(self, config):
r"""!Do not change anything here!""" r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config' super().__init__(__name__, config) # you can access the config class on 'self.config'
def onLoad(self): def onLoad(self):
r"""!Called by import of the plugin""" r"""!Called by import of the plugin"""
if self.config.get("queue", default=True): bot_token = self.config.get("botToken")
q = mq.MessageQueue() chat_ids = self.config.get("chatIds", default=[])
request = Request(con_pool_size=8)
self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q) if not bot_token or not chat_ids:
print('queue') logger.error("botToken oder chatIds fehlen in der Konfiguration!")
else: return
self.bot = telegram.Bot(token=self.config.get("botToken"))
print('normal') # Konfigurierbare Parameter mit Fallback-Defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
)
startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)
def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass
def fms(self, bwPacket): def fms(self, bwPacket):
r"""!Called on FMS alarm r"""!Called on FMS alarm
@param bwPacket: bwPacket instance""" @param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}")) msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self._sendMessage(msg) self.sender.send_message(msg)
def pocsag(self, bwPacket): def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm r"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance""" @param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}")) msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self._sendMessage(msg) self.sender.send_message(msg)
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None: if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
logging.debug("Found coordinates in packet") lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
(lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon")) logger.debug("Koordinaten gefunden sende Standort.")
self._sendLocation(lat, lon) self.sender.send_location(lat, lon)
def zvei(self, bwPacket): def zvei(self, bwPacket):
r"""!Called on ZVEI alarm r"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance""" @param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}")) msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self._sendMessage(msg) self.sender.send_message(msg)
def msg(self, bwPacket): def msg(self, bwPacket):
r"""!Called on MSG packet r"""!Called on MSG packet
@param bwPacket: bwPacket instance""" @param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg")) msg = self.parseWildcards(self.config.get("message_msg"))
self._sendMessage(msg) self.sender.send_message(msg)
def _sendMessage(self, message): def teardown(self):
for chatId in self.config.get("chatIds", default=[]): r"""!Called after alarm
try: Remove if not implemented"""
# Send Message via Telegram pass
logging.info("Sending message to " + chatId)
self.bot.send_message(chat_id=chatId, text=message)
except Unauthorized: def onUnload(self):
logging.exception("Error while sending Telegram Message, please Check your api-key") r"""!Called by destruction of the plugin
except (TimedOut, NetworkError): Remove if not implemented"""
logging.exception("Error while sending Telegram Message, please Check your connectivity") pass
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def _sendLocation(self, lat, lon):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Location via Telegram
if lat is not None and lon is not None:
logging.info("Sending location to " + chatId)
self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon)
except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))