diff --git a/docu/docs/modul/descriptor.md b/docu/docs/modul/descriptor.md index 43444d3..8a2ec4e 100644 --- a/docu/docs/modul/descriptor.md +++ b/docu/docs/modul/descriptor.md @@ -24,12 +24,14 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md) |descrField|Name des Feldes im BW Paket in welchem die Beschreibung gespeichert werden soll|| |wildcard|Optional: Es kann für das angelegte `descrField` automatisch ein Wildcard registriert werden|None| |descriptions|Liste der Beschreibungen|| +|csvPath|Pfad der CSV-Datei (relativ zum Projektverzeichnis)|| #### `descriptions:` |Feld|Beschreibung|Default| |----|------------|-------| |for|Inhalt im `scanField` auf welchem geprüft werden soll|| |add|Beschreibungstext welcher im `descrField` hinterlegt werden soll|| +|isRegex|Muss explizit auf `true` gesetzt werden, falls RegEx verwendet wird|false| **Beispiel:** ```yaml @@ -44,6 +46,9 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md) add: FF DescriptorTest - for: '05678' # führende Nullen in '' ! add: FF TestDescription + - for: '890(1[1-9]|2[0-9])' # Regex-Pattern in '' ! + add: Feuerwehr Wache \\1 (BF) + isRegex: true - scanField: status descrField: fmsStatDescr wildcard: "{STATUSTEXT}" @@ -55,6 +60,62 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md) - ... ``` +**Wichtige Punkte für YAML-Regex:** +- Apostroph: Regex-Pattern sollten in `'` stehen, um YAML-Parsing-Probleme zu vermeiden +- isRegex-Flag: Muss explizit auf `true` gesetzt werden +- Escaping: Backslashes müssen in YAML doppelt escaped werden (`\\1` statt `\1`) +- Regex-Gruppen: Mit `\\1`, `\\2` etc. können Teile des Matches in der Beschreibung verwendet werden + +#### `csvPath:` + +**Beispiel:** +``` +- type: module + res: descriptor + config: + - scanField: tone + descrField: description + wildcard: "{DESCR}" + csvPath: "config/descriptions_tone.csv" +``` + +`csvPath` gibt den Pfad zu einer CSV-Datei an, relativ zum Projektverzeichnis (z. B. `"config/descriptions_tone.csv"`). + +Eine neue CSV-Datei (z. B. `descriptions_tone.csv`) hat folgendes Format: + +**Beispiel** +``` +for,add,isRegex +11111,KBI Landkreis Z,false +12345,FF A-Dorf,false +23456,FF B-Dorf,false +^3456[0-9]$,FF Grossdorf, true +``` + +In der Spalte isRegex kann **zusätzlich** angegeben werden, ob der Wert in for als regulärer Ausdruck interpretiert werden soll (true/false). Standardmäßig ist `false`. +Wenn `isRegex` auf `true` gesetzt ist, wird der Wert aus `for` als regulärer Ausdruck ausgewertet. + +### Kombination von YAML- und CSV-Konfiguration + +Beide Varianten können parallel genutzt werden. In diesem Fall werden zuerst die Beschreibungen aus der YAML-Konfiguration und zusätzlich die Beschreibungen aus der angegebenen CSV-Datei geladen. + +**Beispiel** + +``` +- type: module + res: descriptor + config: + - scanField: tone + descrField: description + wildcard: "{DESCR}" + descriptions: + - for: 12345 + add: FF YAML-Test + - for: '05678' # führende Nullen in '' ! + add: FF YAML-Nullen + csvPath: "config/descriptions_tone.csv" +``` + --- ## Modul Abhängigkeiten - keine @@ -70,4 +131,4 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md) --- ## Zusätzliche Wildcards -- Von der Konfiguration abhängig +- Von der Konfiguration abhängig \ No newline at end of file diff --git a/docu/docs/plugin/telegram.md b/docu/docs/plugin/telegram.md index bd82296..8c23bbd 100644 --- a/docu/docs/plugin/telegram.md +++ b/docu/docs/plugin/telegram.md @@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram |----|------------|-------| |botToken|Der Api-Key des Telegram-Bots|| |chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen|| +|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer| |message_fms|Format der Nachricht für FMS|`{FMS}`| |message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`| |message_zvei|Format der Nachricht für ZVEI|`{TONE}`| |message_msg|Format der Nachricht für MSG|| -|queue|Aktivieren/Deaktivieren der MessageQueue|true| +|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5| +|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]| +|max_delay|Maximale Verzögerung|60 [Sek.]| **Beispiel:** ```yaml @@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram res: telegram config: message_pocsag: "{RIC}({SRIC})\n{MSG}" + startup_message: "Server up and running!" botToken: "BOT_TOKEN" chatIds: - "CHAT_ID" diff --git a/module/descriptor.py b/module/descriptor.py index 132384a..b8b565c 100644 --- a/module/descriptor.py +++ b/module/descriptor.py @@ -10,11 +10,14 @@ r"""! by Bastian Schroll @file: descriptor.py -@date: 27.10.2019 +@date: 04.08.2025 @author: Bastian Schroll -@description: Module to add descriptions to bwPackets +@description: Module to add descriptions to bwPackets with CSV and Regex support """ import logging +import csv +import re +import os from module.moduleBase import ModuleBase # ###################### # @@ -26,31 +29,172 @@ logging.debug("- %s loaded", __name__) class BoswatchModule(ModuleBase): - r"""!Adds descriptions to bwPackets""" + r"""!Adds descriptions to bwPackets with CSV and Regex support""" def __init__(self, config): r"""!Do not change anything here!""" super().__init__(__name__, config) # you can access the config class on 'self.config' def onLoad(self): r"""!Called by import of the plugin""" - for descriptor in self.config: - if descriptor.get("wildcard", default=None): - self.registerWildcard(descriptor.get("wildcard"), descriptor.get("descrField")) + # Initialize unified cache for all descriptors + self.unified_cache = {} + + # Process each descriptor configuration + for descriptor_config in self.config: + scan_field = descriptor_config.get("scanField") + descr_field = descriptor_config.get("descrField") + descriptor_key = f"{scan_field}_{descr_field}" + + # Register wildcard if specified + if descriptor_config.get("wildcard", default=None): + self.registerWildcard(descriptor_config.get("wildcard"), descr_field) + + # Initialize cache for this descriptor + self.unified_cache[descriptor_key] = [] + + # Load YAML descriptions first (for backward compatibility) + yaml_descriptions = descriptor_config.get("descriptions", default=None) + if yaml_descriptions: + # yaml_descriptions is a Config object, we need to iterate properly + for desc in yaml_descriptions: + entry = { + 'for': str(desc.get("for", default="")), + 'add': desc.get("add", default=""), + 'isRegex': desc.get("isRegex", default=False) # Default: False + } + # Handle string 'true'/'false' values + if isinstance(entry['isRegex'], str): + entry['isRegex'] = entry['isRegex'].lower() == 'true' + + self.unified_cache[descriptor_key].append(entry) + logging.debug("Added YAML entry: %s -> %s", entry['for'], entry['add']) + logging.info("Loaded %d YAML descriptions for %s", len(yaml_descriptions), descriptor_key) + + # Load CSV descriptions if csvPath is specified + csv_path = descriptor_config.get("csvPath", default=None) + if csv_path: + self._load_csv_data(csv_path, descriptor_key) + + logging.info("Total entries for %s: %d", descriptor_key, len(self.unified_cache[descriptor_key])) + + def _load_csv_data(self, csv_path, descriptor_key): + r"""!Load CSV data for a descriptor and add to unified cache""" + try: + if not os.path.isfile(csv_path): + logging.error("CSV file not found: %s", csv_path) + return + + csv_count = 0 + with open(csv_path, 'r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + # Set default values if columns are missing + entry = { + 'for': str(row.get('for', '')), + 'add': row.get('add', ''), + 'isRegex': row.get('isRegex', 'false').lower() == 'true' # Default: False + } + self.unified_cache[descriptor_key].append(entry) + csv_count += 1 + + logging.info("Loaded %d entries from CSV: %s for %s", csv_count, csv_path, descriptor_key) + + except Exception as e: + logging.error("Error loading CSV file %s: %s", csv_path, str(e)) + + def _find_description(self, descriptor_key, scan_value, bw_packet): + r"""!Find matching description for a scan value with Regex group support.""" + descriptions = self.unified_cache.get(descriptor_key, []) + scan_value_str = str(scan_value) + + # Search for matching description + for desc in descriptions: + description_text = desc.get('add', '') + match_pattern = desc.get('for', '') + is_regex = desc.get('isRegex', False) + + if is_regex: + # Regex matching + try: + match = re.search(match_pattern, scan_value_str) + if match: + # Expand regex groups (\1, \2) in the description + expanded_description = match.expand(description_text) + + # Replace standard wildcards like {TONE} + final_description = self._replace_wildcards(expanded_description, bw_packet) + + logging.debug("Regex match '%s' -> '%s' for descriptor '%s'", + match_pattern, final_description, descriptor_key) + return final_description + except re.error as e: + logging.error("Invalid regex pattern '%s': %s", match_pattern, str(e)) + continue + else: + # Exact match + if match_pattern == scan_value_str: + # Replace standard wildcards like {TONE} + final_description = self._replace_wildcards(description_text, bw_packet) + logging.debug("Exact match '%s' -> '%s' for descriptor '%s'", + match_pattern, final_description, descriptor_key) + return final_description + + return None + + def _replace_wildcards(self, text, bw_packet): + r"""!Replace all available wildcards in description text dynamically.""" + if not text or '{' not in text: + return text + + result = text + + # Search for wildcards in the format {KEY} and replace them with values from the bw_packet + found_wildcards = re.findall(r"\{([A-Z0-9_]+)\}", result) + + for key in found_wildcards: + key_lower = key.lower() + value = bw_packet.get(key_lower) + + if value is not None: + result = result.replace(f"{{{key}}}", str(value)) + logging.debug("Replaced wildcard {%s} with value '%s'", key, value) + + return result def doWork(self, bwPacket): r"""!start an run of the module. @param bwPacket: A BOSWatch packet instance""" - for descriptor in self.config: - if not bwPacket.get(descriptor.get("scanField")): - break # scanField is not available in this packet - bwPacket.set(descriptor.get("descrField"), bwPacket.get(descriptor.get("scanField"))) - for description in descriptor.get("descriptions"): - if str(description.get("for")) == bwPacket.get(descriptor.get("scanField")): - logging.debug("Description '%s' added in packet field '%s'", - description.get("add"), descriptor.get("descrField")) - bwPacket.set(descriptor.get("descrField"), description.get("add")) - break # this descriptor has found a description - run next descriptor + logging.debug("Processing packet with mode: %s", bwPacket.get("mode")) + + # Process each descriptor configuration + for descriptor_config in self.config: + scan_field = descriptor_config.get("scanField") + descr_field = descriptor_config.get("descrField") + descriptor_key = f"{scan_field}_{descr_field}" + + logging.debug("Processing descriptor: scanField='%s', descrField='%s'", scan_field, descr_field) + + # Check if scanField is present in packet + scan_value = bwPacket.get(scan_field) + if scan_value is None: + logging.debug("scanField '%s' not found in packet, skipping", scan_field) + continue # scanField not available in this packet - try next descriptor + + # Set default value (content of scanField) + bwPacket.set(descr_field, str(scan_value)) + logging.debug("Set default value '%s' for field '%s'", scan_value, descr_field) + + # Search for matching description in unified cache + description = self._find_description(descriptor_key, scan_value, bwPacket) + + if description: + bwPacket.set(descr_field, description) + logging.info("Description set: '%s' -> '%s'", scan_value, description) + else: + logging.debug("No description found for value '%s' in field '%s'", scan_value, scan_field) + + logging.debug("Returning modified packet") return bwPacket def onUnload(self): diff --git a/plugin/telegram.py b/plugin/telegram.py index 80cd5dc..a73a222 100644 --- a/plugin/telegram.py +++ b/plugin/telegram.py @@ -10,126 +10,202 @@ r"""! by Bastian Schroll @file: telegram.py -@date: 20.02.2020 -@author: Jan Speller -@description: Telegram Plugin +@date: 12.07.2025 +@author: Claus Schichl nach der Idee von Jan Speller +@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten """ + import logging +import time +import threading +import queue +import requests from plugin.pluginBase import PluginBase -# ###################### # -# Custom plugin includes # -from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError) -from telegram.ext import messagequeue as mq -from telegram.utils.request import Request -import telegram.bot -# ###################### # - -logging.debug("- %s loaded", __name__) +# Setup Logging +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) -class MQBot(telegram.bot.Bot): - '''A subclass of Bot which delegates send method handling to MQ''' +# =========================== +# TelegramSender-Klasse +# =========================== - def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs): - super(MQBot, self).__init__(*args, **kwargs) - # below 2 attributes should be provided for decorator usage - self._is_messages_queued_default = is_queued_def - self._msg_queue = mqueue or mq.MessageQueue() +class TelegramSender: + def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None): + self.bot_token = bot_token + self.chat_ids = chat_ids + self.max_retries = max_retries if max_retries is not None else 5 + self.initial_delay = initial_delay if initial_delay is not None else 2 + self.max_delay = max_delay if max_delay is not None else 300 + self.msg_queue = queue.Queue() + self._worker = threading.Thread(target=self._worker_loop, daemon=True) + self._worker.start() + + def send_message(self, text): + for chat_id in self.chat_ids: + self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0 + + def send_location(self, latitude, longitude): + for chat_id in self.chat_ids: + self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0)) + + def _worker_loop(self): + delay = self.initial_delay + + while True: + try: + msg_type, chat_id, content, retry_count = self.msg_queue.get() + + success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content) + + if success: + delay = self.initial_delay + + elif permanent_failure: + logger.error("Permanenter Fehler – Nachricht wird verworfen.") + + elif retry_count >= self.max_retries: + logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.") + + else: + logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).") + self.msg_queue.put((msg_type, chat_id, content, retry_count + 1)) + + # Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden + wait_time = custom_delay if custom_delay is not None else delay + time.sleep(wait_time) + + # Erhöhe Delay für den nächsten Versuch (exponentielles Backoff) + delay = min(delay * 2, self.max_delay) + + except Exception as e: + logger.exception(f"Fehler im Telegram-Worker: {e}") + time.sleep(5) + + def _send_to_telegram(self, msg_type, chat_id, content): + if msg_type == "text": + url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" + payload = { + 'chat_id': chat_id, + 'text': content + } + elif msg_type == "location": + url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation" + payload = { + 'chat_id': chat_id, + **content + } + else: + logger.error("Unbekannter Nachrichtentyp.") + return False, True, None # Unbekannter Typ = permanent falsch - def __del__(self): try: - self._msg_queue.stop() - except: - pass + custom_delay = None # Standardwert für Rückgabe, außer bei 429 - @mq.queuedmessage - def send_message(self, *args, **kwargs): - '''Wrapped method would accept new `queued` and `isgroup` - OPTIONAL arguments''' - return super(MQBot, self).send_message(*args, **kwargs) + response = requests.post(url, data=payload, timeout=10) + + if response.status_code == 429: + custom_delay = response.json().get("parameters", {}).get("retry_after", 5) + logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.") + return False, False, custom_delay # Telegram gibt genaue Wartezeit vor + + if response.status_code == 400: + logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.") + return False, True, custom_delay # Permanent fehlerhaft + + if response.status_code == 401: + logger.critical("Ungültiger Bot-Token – bitte prüfen!") + return False, True, custom_delay # Permanent fehlerhaft + + response.raise_for_status() + logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}") + return True, False, custom_delay + + except requests.RequestException as e: + logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}") + return False, False, custom_delay + + +# =========================== +# BoswatchPlugin-Klasse +# =========================== class BoswatchPlugin(PluginBase): - r"""!Description of the Plugin""" - def __init__(self, config): r"""!Do not change anything here!""" super().__init__(__name__, config) # you can access the config class on 'self.config' def onLoad(self): r"""!Called by import of the plugin""" - if self.config.get("queue", default=True): - q = mq.MessageQueue() - request = Request(con_pool_size=8) - self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q) - print('queue') - else: - self.bot = telegram.Bot(token=self.config.get("botToken")) - print('normal') + bot_token = self.config.get("botToken") + chat_ids = self.config.get("chatIds", default=[]) + + if not bot_token or not chat_ids: + logger.error("botToken oder chatIds fehlen in der Konfiguration!") + return + + # Konfigurierbare Parameter mit Fallback-Defaults + max_retries = self.config.get("max_retries") + initial_delay = self.config.get("initial_delay") + max_delay = self.config.get("max_delay") + + self.sender = TelegramSender( + bot_token=bot_token, + chat_ids=chat_ids, + max_retries=max_retries, + initial_delay=initial_delay, + max_delay=max_delay + ) + + startup_message = self.config.get("startup_message") + if startup_message and startup_message.strip(): + self.sender.send_message(startup_message) + + def setup(self): + r"""!Called before alarm + Remove if not implemented""" + pass def fms(self, bwPacket): r"""!Called on FMS alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}")) - self._sendMessage(msg) + self.sender.send_message(msg) def pocsag(self, bwPacket): r"""!Called on POCSAG alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}")) - self._sendMessage(msg) + self.sender.send_message(msg) if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None: - logging.debug("Found coordinates in packet") - (lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon")) - self._sendLocation(lat, lon) + lat, lon = bwPacket.get("lat"), bwPacket.get("lon") + logger.debug("Koordinaten gefunden – sende Standort.") + self.sender.send_location(lat, lon) def zvei(self, bwPacket): r"""!Called on ZVEI alarm - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}")) - self._sendMessage(msg) + self.sender.send_message(msg) def msg(self, bwPacket): r"""!Called on MSG packet - @param bwPacket: bwPacket instance""" msg = self.parseWildcards(self.config.get("message_msg")) - self._sendMessage(msg) + self.sender.send_message(msg) - def _sendMessage(self, message): - for chatId in self.config.get("chatIds", default=[]): - try: - # Send Message via Telegram - logging.info("Sending message to " + chatId) - self.bot.send_message(chat_id=chatId, text=message) + def teardown(self): + r"""!Called after alarm + Remove if not implemented""" + pass - except Unauthorized: - logging.exception("Error while sending Telegram Message, please Check your api-key") - except (TimedOut, NetworkError): - logging.exception("Error while sending Telegram Message, please Check your connectivity") - except (BadRequest, TelegramError): - logging.exception("Error while sending Telegram Message") - except Exception as e: - logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e)) - - def _sendLocation(self, lat, lon): - for chatId in self.config.get("chatIds", default=[]): - try: - # Send Location via Telegram - if lat is not None and lon is not None: - logging.info("Sending location to " + chatId) - self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon) - - except Unauthorized: - logging.exception("Error while sending Telegram Message, please Check your api-key") - except (TimedOut, NetworkError): - logging.exception("Error while sending Telegram Message, please Check your connectivity") - except (BadRequest, TelegramError): - logging.exception("Error while sending Telegram Message") - except Exception as e: - logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e)) + def onUnload(self): + r"""!Called by destruction of the plugin + Remove if not implemented""" + pass