Compare commits

...

4 commits

Author SHA1 Message Date
Bastian Schroll 017e882363
Merge pull request #134 from KoenigMjr/feature/telegram-neu
Some checks failed
build_docs / Build documentation (push) Has been cancelled
CodeQL / CodeQL-Build (push) Has been cancelled
pytest / build (ubuntu-latest, 3.10) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.11) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.12) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.13) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.9) (push) Has been cancelled
build_docs / deploy (push) Has been cancelled
Telegram-Plugin Refactor
2025-10-22 08:53:40 +02:00
Bastian Schroll 2e5479cde2
Merge branch 'develop' into feature/telegram-neu 2025-10-21 13:39:04 +02:00
KoenigMjr 523329a9bb Doku-Ergänzung
update zur neuen Telegram Version

*in Konfiguration hinzugefügt:*
Startup_message
max_retries
initial_delay
max_delay

*gelöscht:*
queue

*im Beispiel:*
Startup_message hinzugefügt
2025-08-08 21:11:48 +02:00
KoenigMjr 6a0a59c3ac Telegram mit Warteschlange
Durch Einbau einer Warteschlange kein Datenverlust bei belegter API (Sendelimit ca. 30 Nachrichten/min, gibt aber Soft-Limit)

Exponentielles Backoff mit Maximalgrenze
Retry-Zähler mit Abbruch bei zu vielen Fehlversuchen
Kein Wiederholen bei permanenten Fehlern (400/401)
dynamische Zeitanpassung bei 429 Fehlern

Fehlerrobustheit verbessert hinsichtlich Connection Error

neues Plugin ohne telegram-bot

* Timeout (timeout=10),
* HTTP-Fehlerprüfung (raise_for_status()),
* Retry-Logik (3 Versuche mit wachsender Wartezeit),
* Sauberem Logging mit logger statt print).

send_location aus altem Skript übernommen und angepasst
2025-08-08 21:11:48 +02:00
2 changed files with 161 additions and 81 deletions

View file

@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|----|------------|-------|
|botToken|Der Api-Key des Telegram-Bots||
|chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen||
|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer|
|message_fms|Format der Nachricht für FMS|`{FMS}`|
|message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Format der Nachricht für ZVEI|`{TONE}`|
|message_msg|Format der Nachricht für MSG||
|queue|Aktivieren/Deaktivieren der MessageQueue|true|
|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5|
|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]|
|max_delay|Maximale Verzögerung|60 [Sek.]|
**Beispiel:**
```yaml
@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
res: telegram
config:
message_pocsag: "{RIC}({SRIC})\n{MSG}"
startup_message: "Server up and running!"
botToken: "BOT_TOKEN"
chatIds:
- "CHAT_ID"

View file

@ -10,126 +10,202 @@ r"""!
by Bastian Schroll
@file: telegram.py
@date: 20.02.2020
@author: Jan Speller
@description: Telegram Plugin
@date: 12.07.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
"""
import logging
import time
import threading
import queue
import requests
from plugin.pluginBase import PluginBase
# ###################### #
# Custom plugin includes #
from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError)
from telegram.ext import messagequeue as mq
from telegram.utils.request import Request
import telegram.bot
# ###################### #
logging.debug("- %s loaded", __name__)
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
class MQBot(telegram.bot.Bot):
'''A subclass of Bot which delegates send method handling to MQ'''
# ===========================
# TelegramSender-Klasse
# ===========================
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
super(MQBot, self).__init__(*args, **kwargs)
# below 2 attributes should be provided for decorator usage
self._is_messages_queued_default = is_queued_def
self._msg_queue = mqueue or mq.MessageQueue()
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
def send_location(self, latitude, longitude):
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
def _worker_loop(self):
delay = self.initial_delay
while True:
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
elif permanent_failure:
logger.error("Permanenter Fehler Nachricht wird verworfen.")
elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht Nachricht wird verworfen.")
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
delay = min(delay * 2, self.max_delay)
except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
}
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch
def __del__(self):
try:
self._msg_queue.stop()
except:
pass
custom_delay = None # Standardwert für Rückgabe, außer bei 429
@mq.queuedmessage
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup`
OPTIONAL arguments'''
return super(MQBot, self).send_message(*args, **kwargs)
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay
except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay
# ===========================
# BoswatchPlugin-Klasse
# ===========================
class BoswatchPlugin(PluginBase):
r"""!Description of the Plugin"""
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
def onLoad(self):
r"""!Called by import of the plugin"""
if self.config.get("queue", default=True):
q = mq.MessageQueue()
request = Request(con_pool_size=8)
self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q)
print('queue')
else:
self.bot = telegram.Bot(token=self.config.get("botToken"))
print('normal')
bot_token = self.config.get("botToken")
chat_ids = self.config.get("chatIds", default=[])
if not bot_token or not chat_ids:
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return
# Konfigurierbare Parameter mit Fallback-Defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
)
startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)
def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass
def fms(self, bwPacket):
r"""!Called on FMS alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self._sendMessage(msg)
self.sender.send_message(msg)
def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self._sendMessage(msg)
self.sender.send_message(msg)
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
logging.debug("Found coordinates in packet")
(lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon"))
self._sendLocation(lat, lon)
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
logger.debug("Koordinaten gefunden sende Standort.")
self.sender.send_location(lat, lon)
def zvei(self, bwPacket):
r"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self._sendMessage(msg)
self.sender.send_message(msg)
def msg(self, bwPacket):
r"""!Called on MSG packet
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg"))
self._sendMessage(msg)
self.sender.send_message(msg)
def _sendMessage(self, message):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Message via Telegram
logging.info("Sending message to " + chatId)
self.bot.send_message(chat_id=chatId, text=message)
def teardown(self):
r"""!Called after alarm
Remove if not implemented"""
pass
except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def _sendLocation(self, lat, lon):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Location via Telegram
if lat is not None and lon is not None:
logging.info("Sending location to " + chatId)
self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon)
except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def onUnload(self):
r"""!Called by destruction of the plugin
Remove if not implemented"""
pass