[upd/telegram]: improve stability, safety and production robustness

Updated the Telegram plugin to handle high-load scenarios and prevent
resource exhaustion. Key focus areas were message formatting,
concurrency management, and configuration resilience.

- Implement bounded message queue (max 100) with non-blocking drops to prevent memory leaks
- Add graceful shutdown logic with worker thread joining and queue draining
- Add self-healing initialization (`_ensure_sender`) to handle race conditions during startup
- Implement robust escaping/sanitization for HTML and MarkdownV2 parse modes
- Enforce Telegram's 4096 character limit with graceful truncation
- Enhance error diagnostics for API responses (Rate limiting, 4xx/5xx errors)
- Validate and sanitize GPS coordinates (range and type checking)
- Decouple logging from global config by using module-level logger

Behavioral Changes:
- BREAKING: Location messages now require `coordinates: true` in config (previously default)
- Messages are dropped with an error log when the queue is full (prevents system hang)
- Invalid HTML/Markdown characters are now automatically escaped to prevent API errors
This commit is contained in:
KoenigMjr 2025-12-19 18:56:35 +01:00
parent eee8aa6684
commit edf7a6afe8
2 changed files with 197 additions and 127 deletions

View file

@ -2,12 +2,12 @@
---
## Beschreibung
Dieses Plugin ermöglicht das Versenden von Telegram-Nachrichten für verschiedene Alarmierungsarten.
Wenn im eingehenden Paket die Felder `lat` und `lon` vorhanden sind (z. B. durch das [Geocoding](../modul/geocoding.md) Modul), wird zusätzlich automatisch der Standort als Telegram-Location gesendet.
Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und exponentiellem Backoff, um die Vorgaben der Telegram API einzuhalten und Nachrichtenverluste zu verhindern. Die Retry-Parameter (max_retries, initial_delay, max_delay) können in der Konfiguration angepasst werden.
Dieses Plugin ermöglicht den Versand von Telegram-Nachrichten für verschiedene Alarmierungsarten. Um eine hohe Stabilität im BOS-Betrieb zu gewährleisten, erfolgt der Versand asynchron über eine interne Warteschlange (Queue) mit Überlastschutz.
## Unterstütze Alarmtypen
Das Plugin hält die Vorgaben der Telegram API automatisch ein: Eine integrierte Retry-Logik mit exponentiellem Backoff verhindert Nachrichtenverluste bei temporären Netzwerkproblemen. Zudem werden Nachrichten, die das Telegram-Limit von 4.096 Zeichen überschreiten, automatisch gekürzt. Wenn Standortdaten (lat/lon) vorhanden sind, kann das Plugin diese als native Karte senden (erfordert [Geocoding-Modul](../modul/geocoding.md) und Aktivierung via coordinates).
## Unterstützte Alarmtypen
- FMS
- POCSAG
- ZVEI
@ -26,11 +26,12 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
|message_fms|Formatvorlage für FMS-Alarm|`{FMS}`|
|message_pocsag|Formatvorlage für POCSAG|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Formatvorlage für ZVEI|`{TONE}`|
|message_msg|Formatvorlage für MSG-Nachricht|-|
|message_msg|Formatvorlage für MSG-Nachricht|`{MSG}`|
|max_retries|Anzahl Wiederholungsversuche bei Fehlern|5|
|initial_delay|Initiale Wartezeit bei Wiederholungsversuchen|2 [Sek.]|
|max_delay|Maximale Retry-Verzögerung|300 [Sek.]|
|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), Case-sensitive!|leer|
|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), !Case-sensitive! Empfehlung: HTML|leer|
|coordinates|Aktiviert die Verarbeitung von Standortdaten|false|
**Beispiel:**
```yaml
@ -38,6 +39,7 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
name: Telegram Plugin
res: telegram
config:
coordinates: true
message_pocsag: |
<b>POCSAG Alarm:</b>
RIC: <b>{RIC}</b> ({SRIC})
@ -49,21 +51,44 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
- "CHAT_ID"
```
Hinweis:
### parse_mode
Über parse_mode kannst du Telegram-Formatierungen verwenden:
- HTML: `<b>fett</b>`, `<i>kursiv</i>`, `<u>unterstrichen</u>`, `<s>durchgestrichen</s>`, ...
- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw. (Escape-Regeln beachten)
- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw.
**Wichtig**: Bei MarkdownV2 werden alle Sonderzeichen innerhalb der Wildcards (wie {MSG}) automatisch escaped. Das verhindert zwar API-Fehler, macht aber eine bewusste Formatierung innerhalb des Funktextes unmöglich.
**Nutze HTML**, wenn du fettgedruckte oder kursive Elemente in deinem Template verwenden möchtest, ohne dass der Inhalt der Nachricht verändert wird.
```yaml
# EMPFOHLEN: HTML für Formatierung
parse_mode: "HTML"
message_pocsag: |
<b>POCSAG Alarm:</b>
RIC: {RIC}
{MSG}
# NICHT EMPFOHLEN: MarkdownV2
# (alle Sonderzeichen in {MSG} werden escaped)
parse_mode: "MarkdownV2"
message_pocsag: "*Alarm*\nRIC: {RIC}"
```
Block-Strings (|) eignen sich perfekt für mehrzeilige Nachrichten und vermeiden Escape-Zeichen wie \n
### coordinates
Der Versand von Standorten ist standardmäßig deaktiviert (`coordinates: false`), um unnötige Warnmeldungen im Log zu vermeiden, wenn keine Koordinaten im Datenpaket enthalten sind. Setze diesen Wert nur auf `true`, wenn du sicherstellst, dass die Alarmierung Koordinaten liefert (z.B. durch einen vorgeschalteten Geocoder).
**Verhalten beim Standortversand:**
Bei aktivierten Koordinaten sendet das Plugin zusätzlich zum Alarmtext eine native Telegram-Karte als separate Nachricht. Es sind keine Wildcards im Nachrichtentext erforderlich; die Karte wird automatisch unter dem Text gepostet.
---
## Modul Abhängigkeiten
OPTIONAL, nur für POCSAG-Locationversand: Aus dem Modul [Geocoding](../modul/geocoding.md):
- `lat`
- `lon`
---
## Externe Abhängigkeiten
keine
requests

View file

@ -10,9 +10,9 @@ r"""!
by Bastian Schroll
@file: telegram.py
@date: 17.11.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
@date: 17.01.2026
@author: Claus Schichl
@description: Telegram-Plugin
"""
import logging
@ -20,13 +20,10 @@ import time
import threading
import queue
import requests
import re
from plugin.pluginBase import PluginBase
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
# Setup logging
logger = logging.getLogger(__name__)
@ -36,181 +33,229 @@ logger = logging.getLogger(__name__)
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None):
self._stop_event = threading.Event()
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue(maxsize=100) # max 100 messages
self.parse_mode = parse_mode
self.msg_queue = queue.Queue()
# Start Worker Thread
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
clean_text = self._escape_text(text, self.parse_mode)
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
try:
self.msg_queue.put(("text", chat_id, clean_text, 0), block=False)
except queue.Full:
logger.error(f"Message queue full for chat_id {chat_id}, message discarded: {clean_text[:50]}...")
def send_location(self, latitude, longitude):
try:
# Use validated float values, not original strings
lat = float(latitude)
lon = float(longitude)
# check Telegram API Limits
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
logger.error(f"Invalid coordinates: lat={lat}, lon={lon} (out of range)")
return
except (TypeError, ValueError) as e:
logger.error(f"Invalid coordinate format: {e}, location skipped")
return
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
try:
self.msg_queue.put(("location", chat_id, {"latitude": lat, "longitude": lon}, 0), block=False)
except queue.Full:
logger.error(f"Location queue full for chat_id {chat_id}, location discarded: {lat}, {lon}")
@staticmethod
def _escape_text(text, parse_mode):
if not text:
return ""
if parse_mode == "HTML":
protected_tags = {}
tag_pattern = r'<(/?)(\w+)>'
allowed = ["b", "strong", "i", "em", "code", "pre", "u", "s"]
def protect_tag(match):
tag_full = match.group(0)
tag_name = match.group(2)
if tag_name in allowed:
placeholder = f"__TAG_{len(protected_tags)}__"
protected_tags[placeholder] = tag_full
return placeholder
return tag_full
text = re.sub(tag_pattern, protect_tag, text)
text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
for placeholder, original_tag in protected_tags.items():
text = text.replace(placeholder, original_tag)
elif parse_mode == "MarkdownV2":
# Escape all MarkdownV2 special characters (Telegram API requirement)
# See: https://core.telegram.org/bots/api#markdownv2-style
escape_chars = r'_*[]()~`>#+-=|{}.!'
text = "".join(['\\' + char if char in escape_chars else char for char in text])
return text[:4090] + "[...]" if len(text) > 4096 else text
def _worker_loop(self):
delay = self.initial_delay
while True:
while not self._stop_event.is_set():
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
msg_type, chat_id, content, retry_count = self.msg_queue.get(timeout=1)
success, permanent_failure, custom_delay, error_details = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
elif permanent_failure:
logger.error("Permanenter Fehler Nachricht wird verworfen.")
elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht Nachricht wird verworfen.")
elif permanent_failure or retry_count >= self.max_retries:
logger.warning(f"Discarding message for {chat_id}: {error_details}")
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# use the Telegram-provided value (retry_after) if available
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# increase delay for the next attempt (exponential backoff)
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
delay = min(delay * 2, self.max_delay)
except queue.Empty:
continue
except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
logger.error(f"Error in Telegram worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
url = f"https://api.telegram.org/bot{self.bot_token}/"
url += "sendMessage" if msg_type == "text" else "sendLocation"
payload = {'chat_id': chat_id}
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content,
}
payload['text'] = content
if self.parse_mode:
payload['parse_mode'] = self.parse_mode
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # unknown message type = permanent failure
payload.update(content)
try:
custom_delay = None # standardvalue for return, except in case of 429
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 200:
logger.info(f"Successfully sent to Chat-ID {chat_id}")
return True, False, None, None
# Rate limiting
if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gives exact wait time
retry_after = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate limited for {chat_id}, retry after {retry_after}s")
return False, False, retry_after, f"Rate Limit (retry after {retry_after}s)"
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # permanent failure
return False, response.status_code < 500, None, f"HTTP {response.status_code}"
except Exception as e:
return False, False, None, str(e)
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # permanent failure
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay
except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay
def shutdown(self):
r"""Graceful shutdown with queue draining"""
logger.info("Shutting down Telegram sender...")
self._stop_event.set()
timeout = time.time() + 5
while not self.msg_queue.empty() and time.time() < timeout:
time.sleep(0.1)
remaining = self.msg_queue.qsize()
if remaining > 0:
logger.warning(f"{remaining} messages in queue discarded during shutdown")
self._worker.join(timeout=5)
# ===========================
# BoswatchPlugin-Class
# ===========================
logging.debug("- %s loaded", __name__)
class BoswatchPlugin(PluginBase):
r"""!Description of the Plugin"""
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
def _ensure_sender(self):
r""" checking with hasattr if self.sender already exists"""
if not hasattr(self, 'sender') or self.sender is None:
token = self.config.get("botToken")
ids = self.config.get("chatIds", default=[])
if token and ids:
self.sender = TelegramSender(
bot_token=token,
chat_ids=ids,
max_retries=self.config.get("max_retries"),
initial_delay=self.config.get("initial_delay"),
max_delay=self.config.get("max_delay"),
parse_mode=self.config.get("parse_mode")
)
logger.debug("TelegramSender initialized via Self-Healing")
else:
missing = []
if not token:
missing.append("botToken")
if not ids:
missing.append("chatIds")
logger.error(f"Telegram configuration incomplete! Missing: {', '.join(missing)}. Plugin will not send messages.")
def _has_sender(self):
r"""Check if sender is available and properly initialized"""
return hasattr(self, 'sender') and self.sender is not None
def onLoad(self):
r"""!Called by import of the plugin"""
bot_token = self.config.get("botToken")
chat_ids = self.config.get("chatIds", default=[])
if not bot_token or not chat_ids:
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return
# configurable parameters with fallback defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
parse_mode = self.config.get("parse_mode")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay,
parse_mode=parse_mode
)
startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)
self._ensure_sender()
if not self._has_sender():
logger.warning("Telegram plugin loaded but not configured. Messages will be discarded.")
else:
startup = self.config.get("startup_message")
if startup and startup.strip():
self.sender.send_message(startup)
def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass
r"""!Called before alarm"""
# ensure sender exists before alarms are processed
self._ensure_sender()
def fms(self, bwPacket):
r"""!Called on FMS alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self.sender.send_message(msg)
r"""!Called on FMS alarm"""
if self._has_sender():
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self.sender.send_message(msg)
def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self.sender.send_message(msg)
r"""!Called on POCSAG alarm"""
if self._has_sender():
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self.sender.send_message(msg)
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
logger.debug("Koordinaten gefunden sende Standort.")
self.sender.send_location(lat, lon)
if self.config.get("coordinates", default=False):
lat = bwPacket.get("lat")
lon = bwPacket.get("lon")
if lat is not None and lon is not None:
self.sender.send_location(lat, lon)
def zvei(self, bwPacket):
r"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self.sender.send_message(msg)
r"""!Called on ZVEI alarm"""
if self._has_sender():
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self.sender.send_message(msg)
def msg(self, bwPacket):
r"""!Called on MSG packet
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg"))
self.sender.send_message(msg)
r"""!Called on MSG packet"""
if self._has_sender():
msg = self.parseWildcards(self.config.get("message_msg", default="{MSG}"))
self.sender.send_message(msg)
def teardown(self):
r"""!Called after alarm
Remove if not implemented"""
r"""!Called after alarm"""
pass
def onUnload(self):
r"""!Called by destruction of the plugin
Remove if not implemented"""
pass
r"""!Called by destruction of the plugin"""
if self._has_sender():
self.sender.shutdown()
logger.info("Telegram plugin unloaded")