From 0b9387af08298d4b38855f8d635ac48bfac4c1e5 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:25:28 +0100 Subject: [PATCH] [feat/multicast]: add multi-instance multicast module with active trigger system Introduce a robust multicast processing module for POCSAG that correlates empty tone-RICs (recipients) with subsequent text-RICs (content). Key Features: - Four Output Modes: Internally supports 'complete', 'incomplete', 'single', and 'control'. Functional alarms are delivered as the first three, while technical 'control' packets (Delimiters/NetIdent) are filtered by default. - Active Trigger System: Implements a loss-free deferred delivery mechanism using a loopback socket (TCP) to re-inject wakeup packets, flushing the internal queue during auto-clear timeouts. - Shared State & Multi-Instance: State is shared across instances but separated by frequency to prevent crosstalk in multi-frequency setups. - Data Aggregation: Automatically generates '{FIELD}_list' wildcards (e.g., RIC_LIST, DESCRIPTION_LIST) for all collected recipients, enabling consolidated notifications in downstream plugins. - Dynamic Filtering: Automatically blocks Delimiter and NetIdent RICs from reaching subsequent plugins if they are defined in the configuration. Infrastructural Changes: - ModuleBase: Expanded return semantics to support: * False: Explicitly blocks/drops a packet. * List: Allows a module to expand one input into multiple output packets. - PluginBase: Updated to handle lists of packets, ensuring a full setup->alarm->teardown lifecycle for every individual element. --- docu/docs/modul/multicast.md | 394 +++++++++++++++++++++ docu/mkdocs.yml | 3 +- module/moduleBase.py | 26 ++ module/multicast.py | 638 +++++++++++++++++++++++++++++++++++ plugin/pluginBase.py | 9 + 5 files changed, 1069 insertions(+), 1 deletion(-) create mode 100644 docu/docs/modul/multicast.md create mode 100644 module/multicast.py diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md new file mode 100644 index 0000000..def5284 --- /dev/null +++ b/docu/docs/modul/multicast.md @@ -0,0 +1,394 @@ +#
Multicast
+--- + +## Beschreibung +Mit diesem Modul können Multicast-Alarme verarbeitet werden. Dabei wird eine Alarmnachricht automatisch an mehrere Empfänger (RICs) verteilt. + +### Funktionsweise +Multicast-Alarme funktionieren in zwei bis drei Phasen: + +1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks und löscht vorherige wartende Tone-RICs. Der Delimiter selbst wird nicht als Alarm ausgegeben (automatische Filterung). Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt. + +2. **Tone-RIC-Phase**: Mehrere RICs empfangen (meist) leere Nachrichten. Diese definieren die Empfänger und "registrieren" sich im Modul als Empfänger für die nächste Multicast-Nachricht. + +3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Diese wird dann automatisch an alle zuvor gesammelten Tone-RICs verteilt. + +**Beispiel:** +``` +10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC +10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1 +10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2 +10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC + +Generierte Alarme: +→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" +→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" +``` + +**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert. + +Das Modul unterstützt: + +- Mehrere Startmarker (Delimiter) +- Mehrere Text-RICs +- Netzident-RIC zur Paketfilterung +- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear) +- Active Trigger System zur verlustfreien Paketauslieferung +- Wildcards für spätere Weiterverarbeitung +- Frequenz-basierte Trennung +- Multi-Instanz-Betrieb mit geteiltem Zustand + +Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgegeben. + +**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben. + +## Unterstützte Alarmtypen +- POCSAG + +## Resource +`multicast` + +## Konfiguration + +|Feld|Beschreibung|Default| +|----|------------|-------| +|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10| +|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden automatisch gefiltert wenn konfiguriert)|leer| +|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer| +|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden automatisch gefiltert wenn konfiguriert)|leer| +|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer| +|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1| +|triggerPort|Port für Loopback-Trigger|8080| + +**Achtung:** Zahlen welche führende Nullen enthalten müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'` + +### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration) +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + textRics: '0299001,0310001' +``` +In diesem Modus werden **alle leeren Nachrichten** als toneRics behandelt (keine `delimiterRics` angegeben). + +### Konfigurationsbeispiel 2: Mit Delimiter-Trenner (empfohlen) +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' +``` +In diesem Modus wird **0988988 als Trenner (= Delimiter)** behandelt und **alle anderen leeren Nachrichten als Empfänger**. + +### Konfigurationsbeispiel 3: Mit expliziter Trigger-RIC +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' + triggerRic: '9999999' + triggerHost: '127.0.0.1' + triggerPort: 8080 +``` +Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket. + +### Konfigurationsbeispiel 4: Mit Netzident-Filterung +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' + netIdentRics: '0000001' +``` +Filtert Netzident-Pakete (RIC 0000001) automatisch aus der Weiterverarbeitung. + +## Integration in Router-Konfiguration + +Das Multicast-Modul sollte **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden: + +```yaml +- name: Router POCSAG + route: + - type: module + res: filter.modeFilter + name: Filter POCSAG + config: + allowed: + - pocsag + + # Multicast-Modul hier einfügen + - type: module + res: multicast + name: Multicast Handler + config: + textRics: '0299001,0310001' + delimiterRics: '0288088' + autoClearTimeout: 10 + + # Weitere Module und Plugins + - type: plugin + res: mysql + config: + # ... + + - type: plugin + res: telegram + config: + # ... +``` + +## Beispielhafte Verwendung in Router-Konfigurationen +Das Multicast-Modul gibt für jedes RIC ein eigenes Paket aus UND generiert für konsistente Verarbeitung Listenfelder. +Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfelder für eine gesammelte Ausgabe zu verwenden. Vor der weiteren Verarbeitung in Plugins empfiehlt sich eventuell eine Filterung mittels [RegEx-Filter](regex_filter.md). +Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter. + + +### Beispiel (Zusätzliche Wildcards werden noch später in diesem Readme erklärt): +```yaml +router: +- name: Router POCSAG + route: + - type: module + res: filter.modeFilter + config: + [...] + - type: module + res: filter.doubleFilter + config: + [...] + - type: module + res: descriptor + config: + [...] + - type: module + res: multicast + name: Multicast + config: + autoClearTimeout: 10 + delimiterRics: '0123456' # Start eines Multicast-Alarms + textRics: '9909909' # Text-RIC + - type: router + res: RouterMySQL + - type: router + res: RouterTelegram + +- name: RouterMySQL + route: + - type: module + res: filter.regexFilter + name: Filter MySQL + config: + - name: "Multicast Mode complete or single" + checks: + - field: multicastMode + regex: ^(complete|single)$ + - type: plugin + res: mysql + config: + [...] + +- name: RouterTelegram + route: + - type: module + res: filter.regexFilter + name: Multicast Recipient Index Filter # 1. Paket, da ist alles drin für einen kombinierten Alarm und ist immer vorhanden + config: + - name: "Multicast 1 Paket pro Alarm-Paket" + checks: + - field: multicastRecipientIndex + regex: ^1$ + - type: plugin + res: telegram + config: + message_pocsag: | + {CNAME} + {MSG} + Alarmierte Einheiten [{MCAST_COUNT}]: {DESCRIPTION_LIST} + RICs: {RIC_LIST} + {TIME} + [...] + +``` + +--- +## Modul Abhängigkeiten +- keine + +--- +## Externe Abhängigkeiten +- keine + +--- +## Paket Modifikationen + +### Hinzugefügte Felder bei Multicast-Alarmen: +- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte: + + - `complete`: Vollständiges Multicast-Packet + - `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC) + - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) + - `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs + +- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte: + + - `delimiter`: Startmarker-Paket (wird automatisch gefiltert wenn delimiterRics konfiguriert) + - `recipient`: tatsächlicher Empfänger + - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) + - `netident`: Netzwerk-Identifikations-Paket (wird automatisch gefiltert wenn netIdentRics konfiguriert) + +- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs +- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt +- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik: + + - Empfänger haben den Index 1 bis n. + - Delimiter/Singles haben Index 1 (da sie alleinstehen). + +- `_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. ric_list, message_list) + +### Veränderte Felder bei Multicast-Alarmen: +- `ric`: Wird durch Empfänger-RIC ersetzt +- `subric`: Wird durch Empfänger-SubRIC ersetzt +- `subricText`: Wird durch Empfänger-SubRIC-Text ersetzt +- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC + +### Rückgabewerte: +- **False**: Paket wurde blockiert (z.B. Delimiter/Netident-Filterung), Router stoppt Verarbeitung +- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln +- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort + +--- +## Zusätzliche Wildcards + +Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung: + +|Wildcard|Beschreibung|Beispiel| +|--------|------------|--------| +|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001| +|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3| +|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...| +|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control| +|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident| + +## Erweiterung der Listen-Wildcards +Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen: + +|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel| +|--------|------------|--------|--------| +|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"| +|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"| +|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger.|description_list|"FF Musterstadt, BF Beispiel"| +|{_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|_list|{FREQUENCY_LIST}, {BITRATE_LIST}| + +**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~). + +### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin: +```yaml +- type: plugin + res: telegram + config: + message_pocsag: | + {CNAME} + {MSG} + RIC: {RIC} / SubRIC: {SRIC} + Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE}) + {TIME} +``` + +--- +## Funktionsweise im Detail + +### Active Trigger System (Verlustfreie Paketauslieferung) + +Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**: + +1. **Deferred Delivery**: Bei einem Auto-Clear-Timeout werden die incomplete-Pakete nicht sofort ausgegeben, sondern in einer internen Queue gespeichert. + +2. **Wakeup-Trigger**: Das Modul sendet ein spezielles Trigger-Paket via Loopback-Socket (Standard: 127.0.0.1:8080) zurück an den BOSWatch-Server. + +3. **Queue-Flush**: Beim Empfang des Trigger-Pakets werden alle gespeicherten Pakete aus der Queue ausgegeben. + +**Trigger-RIC Auswahl** (3-stufige Fallback-Kette): +- **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet +- **Dynamisch**: Wenn nicht konfiguriert, wird die erste Tone-RIC der aktuellen Gruppe verwendet +- **Fallback**: Falls keine Tone-RICs vorhanden sind (sollte nicht vorkommen), wird "9999999" verwendet + +**Beispiel-Ablauf bei Auto-Clear:** +``` +10:31:16 - Tone-RIC empfangen (0234567) +10:31:16 - Tone-RIC empfangen (0345678) +10:31:26 - Auto-Clear-Timeout (10s) erreicht + → Incomplete-Pakete in Queue gespeichert + → Trigger-Paket via Loopback gesendet (RIC: 0234567) +10:31:26 - Trigger-Paket durch BOSWatch-Server verarbeitet + → Queue-Flush: Incomplete-Pakete werden ausgegeben +``` + +**Vorteile:** +- Keine Pakete gehen verloren, auch bei hoher Systemlast +- Saubere Trennung von Verarbeitung und Ausgabe +- Ermöglicht zeitlich versetzte Ausgabe ohne Race Conditions + +### Zeitbasierte Verarbeitung + +1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert +2. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System) +3. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs +4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe) + +### Frequenz-Trennung + +Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung von Alarmen verschiedener Sender. + +**Beispiel:** +``` +Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678] +Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890] +→ Werden getrennt verarbeitet, keine Vermischung möglich +``` + +### SubRIC-Erhaltung + +**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC aus der Tone-RIC-Phase. Dies ist entscheidend, da SubRICs unterschiedliche Bedeutungen haben können, z.B.: + +- SubRIC 1 (a): Alarmierung +- SubRIC 2 (b): Information +- SubRIC 3 (c): Probealarm +- SubRIC 4 (d): Sirenenalarm + +**Beispiel:** +``` +Eingehende Tone-RICs: +- RIC: 0234567 SubRIC: 4 (Sirenenalarm) +- RIC: 0345678 SubRIC: 3 (Probealarm) + +Text-RIC: RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" + +Ausgegebene Multicast-Pakete: +→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!) +→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!) +``` + +### Automatische Filterung + +- **Delimiter-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `delimiterRics` konfiguriert ist +- **Netzident-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `netIdentRics` konfiguriert ist +- **Filterung nach multicastRole**: Ermöglicht saubere nachgelagerte Verarbeitung ohne manuelle Filter + +### Multi-Instanz-Betrieb + +Das Modul unterstützt mehrere parallele Instanzen mit geteiltem Zustand: + +- **Shared State**: Alle Instanzen teilen sich den Tone-RIC-Speicher (frequenz-basiert) +- **Instance-Specific Timeouts**: Jede Instanz kann eigene `autoClearTimeout`-Werte haben +- **Global Cleanup Thread**: Ein globaler Thread prüft alle Instanzen auf Timeouts +- **Thread-Safe**: Alle Operationen sind thread-sicher mit Locks geschützt \ No newline at end of file diff --git a/docu/mkdocs.yml b/docu/mkdocs.yml index c2602dd..73d112e 100644 --- a/docu/mkdocs.yml +++ b/docu/mkdocs.yml @@ -20,10 +20,11 @@ nav: - Changelog: changelog.md - Module: - Descriptor: modul/descriptor.md + - Double Filter: modul/double_filter.md - Geocoding: modul/geocoding.md - Mode Filter: modul/mode_filter.md + - Multicast: modul/multicast.md - Regex Filter: modul/regex_filter.md - - Double Filter: modul/double_filter.md - Plugins: - Http: plugin/http.md - Telegram: plugin/telegram.md diff --git a/module/moduleBase.py b/module/moduleBase.py index 1ce8ccd..568a24a 100644 --- a/module/moduleBase.py +++ b/module/moduleBase.py @@ -56,6 +56,32 @@ class ModuleBase(ABC): @param bwPacket: A BOSWatch packet instance @return bwPacket or False""" + + # --- FIX: Multicast list support for Module --- + if isinstance(bwPacket, list): + result_packets = [] + for single_packet in bwPacket: + # Recursive call for single packet + processed = self._run(single_packet) + + # new logic: + if processed is False: + # filter called 'False' -> packet discarded + continue + elif processed is None: + # module returned None -> keep packet unchanged + result_packets.append(single_packet) + elif isinstance(processed, list): + # module returned new list -> extend + result_packets.extend(processed) + else: + # module returned modified packet -> add + result_packets.append(processed) + + # if list is not empty, return it. else False (filter all). + return result_packets if result_packets else False + # ----------------------------------------------- + self._runCount += 1 logging.debug("[%s] run #%d", self._moduleName, self._runCount) diff --git a/module/multicast.py b/module/multicast.py new file mode 100644 index 0000000..d8ae05e --- /dev/null +++ b/module/multicast.py @@ -0,0 +1,638 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +r"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: multicast.py +@date: 26.01.2025 +@author: Claus Schichl +@description: multicast module +""" + +import logging +import time +import threading +import socket +import json +import datetime +from collections import defaultdict +from module.moduleBase import ModuleBase +from boswatch.packet import Packet + +logging.debug("- %s loaded", __name__) + + +class BoswatchModule(ModuleBase): + r"""!Multicast module with multi-instance support and active trigger mechanism + + This module handles multicast alarm distribution. + It manages the correlation between tone-RICs (recipients) and text-RICs (message content), + ensuring reliable alarm delivery even in complex multi-frequency scenarios. + """ + + # CLASS VARIABLES - SHARED STATE + _tone_ric_packets = defaultdict(list) + _last_tone_ric_time = defaultdict(float) + _processing_text_ric = defaultdict(bool) + _processing_text_ric_started = defaultdict(float) + + # SYSTEM VARIABLES + _lock = threading.Lock() + _cleanup_thread = None + _running = False + _wildcards_registered = set() + _packet_queue = [] + _queue_lock = threading.Lock() + _instances = [] + + # Trigger defaults + _TRIGGER_HOST = "127.0.0.1" + _TRIGGER_PORT = 8080 + _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###" + _DEFAULT_TRIGGER_RIC = "9999999" + +# ============================================================ +# LIFECYCLE METHODS +# ============================================================ + + def __init__(self, config): + super().__init__(__name__, config) + + def onLoad(self): + r"""!Initialize module configuration and start the global cleanup thread. + + @param None + @return None""" + self._my_frequencies = set() + self.instance_id = hex(id(self))[-4:] + self.name = f"MCAST_{self.instance_id}" + + self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10)) + self._hard_timeout = self._auto_clear_timeout * 3 + + def parse_list(key): + val = self.config.get(key) + if val: + return [x.strip() for x in str(val).split(",") if x.strip()] + return [] + + self._delimiter_rics = parse_list("delimiterRics") + self._text_rics = parse_list("textRics") + self._netident_rics = parse_list("netIdentRics") + + trigger_ric_cfg = self.config.get("triggerRic") + if trigger_ric_cfg: + self._trigger_ric = str(trigger_ric_cfg).strip() + self._trigger_ric_mode = "explicit" + else: + self._trigger_ric = None + self._trigger_ric_mode = "dynamic" + + self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST) + self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT)) + + self._block_delimiter = bool(self._delimiter_rics) + self._block_netident = bool(self._netident_rics) + + logging.info("[%s] Multicast module loaded", self.name) + + with BoswatchModule._lock: + if self not in BoswatchModule._instances: + BoswatchModule._instances.append(self) + + if not BoswatchModule._running: + BoswatchModule._running = True + BoswatchModule._cleanup_thread = threading.Thread( + target=BoswatchModule._global_cleanup_worker, daemon=True + ) + BoswatchModule._cleanup_thread.start() + logging.info("Global multicast cleanup thread started") + +# ============================================================ +# MAIN PROCESSING +# ============================================================ + + def doWork(self, bwPacket): + r"""!Process an incoming packet and handle multicast logic. + + @param bwPacket: A BOSWatch packet instance or list of packets + @return bwPacket, a list of packets, or False if blocked""" + if isinstance(bwPacket, list): + result_packets = [] + for single_packet in bwPacket: + processed = self.doWork(single_packet) + if processed is None: + result_packets.append(single_packet) + elif processed is not False: + if isinstance(processed, list): + result_packets.extend(processed) + else: + result_packets.append(processed) + return result_packets if result_packets else None + + packet_dict = self._get_packet_data(bwPacket) + msg = packet_dict.get("message") + ric = packet_dict.get("ric") + freq = packet_dict.get("frequency", "default") + mode = packet_dict.get("mode") + + if msg == BoswatchModule._MAGIC_WAKEUP_MSG: + if self._trigger_ric and ric != self._trigger_ric: + pass + else: + logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric) + queued = self._get_queued_packets() + return queued if queued else False + + if mode != "pocsag": + queued = self._get_queued_packets() + return queued if queued else None + + self._my_frequencies.add(freq) + + is_text_ric = False + if self._text_rics: + is_text_ric = ric in self._text_rics and msg and msg.strip() + else: + with BoswatchModule._lock: + is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0 + + if is_text_ric: + with BoswatchModule._lock: + BoswatchModule._processing_text_ric[freq] = True + BoswatchModule._processing_text_ric_started[freq] = time.time() + + queued_packets = self._get_queued_packets() + incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq) + + if self._netident_rics and ric in self._netident_rics: + self._set_mcast_metadata(bwPacket, "control", "netident", ric) + result = self._combine_results(incomplete_packets, queued_packets, [bwPacket]) + return self._filter_output(result) + + if self._delimiter_rics and ric in self._delimiter_rics: + delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket) + result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets) + return self._filter_output(result) + + if not msg or not msg.strip(): + self._add_tone_ric_packet(freq, packet_dict) + result = self._combine_results(incomplete_packets, queued_packets, False) + return self._filter_output(result) + + if is_text_ric and msg: + logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric) + alarm_packets = self._distribute_complete(freq, packet_dict) + with BoswatchModule._lock: + BoswatchModule._processing_text_ric[freq] = False + BoswatchModule._processing_text_ric_started.pop(freq, None) + + if not alarm_packets: + logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric) + normal = self._enrich_normal_alarm(bwPacket, packet_dict) + result = self._combine_results(normal, incomplete_packets, queued_packets) + else: + result = self._combine_results(alarm_packets, incomplete_packets, queued_packets) + return self._filter_output(result) + + if msg: + normal = self._enrich_normal_alarm(bwPacket, packet_dict) + result = self._combine_results(normal, incomplete_packets, queued_packets) + return self._filter_output(result) + + result = self._combine_results(incomplete_packets, queued_packets) + return self._filter_output(result) + +# ============================================================ +# PACKET PROCESSING HELPERS (called by doWork) +# ============================================================ + + def _get_packet_data(self, bwPacket): + r"""!Safely extract all fields from packet as a dictionary. + + Handles both dict objects and Packet instances. + Dynamically extracts all fields including those added by other modules. + + @param bwPacket: Packet instance or dict + @return dict: Complete dictionary of all packet fields""" + # 1. Fall: Es ist bereits ein Dictionary + if isinstance(bwPacket, dict): + return bwPacket.copy() + + # 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet) + if hasattr(bwPacket, '_packet'): + return bwPacket._packet.copy() + + # 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet' + try: + return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')} + except Exception as e: + logging.warning("[%s] Error: %s", self.name, e) + return {} + + def _filter_output(self, result): + r"""!Apply multicastRole filtering before output. + + @param result: Single packet, list of packets, None or False + @return Final packet(s) or False if blocked""" + if result is None or result is False: + return result + + def get_role(packet): + """Helper to extract multicastRole from Packet object""" + packet_dict = self._get_packet_data(packet) + return packet_dict.get("multicastRole") + + if isinstance(result, list): + filtered = [p for p in result if self._should_output_packet(get_role(p))] + if not filtered: + logging.debug("All packets filtered out by multicastRole") + return False + return filtered if len(filtered) > 1 else filtered[0] + else: + if self._should_output_packet(get_role(result)): + return result + logging.debug("Packet filtered out: multicastRole=%s", get_role(result)) + return False + + def _combine_results(self, *results): + r"""!Combine multiple result sources into a single list or status. + + @param results: Multiple packet objects, lists, or booleans + @return combined list, False or None""" + combined = [] + has_false = False + for result in results: + if result is False: + has_false = True + continue + if result is None: + continue + if isinstance(result, list): + combined.extend(result) + else: + combined.append(result) + if combined: + return combined + return False if has_false else None + + def _should_output_packet(self, multicast_role): + r"""!Check if packet should be output based on role. + + @param multicast_role: The role string to check + @return bool: True if allowed""" + if self._block_delimiter and multicast_role == "delimiter": + return False + if self._block_netident and multicast_role == "netident": + return False + return True + +# ============================================================ +# TONE-RIC BUFFER MANAGEMENT +# ============================================================ + + def _add_tone_ric_packet(self, freq, packet_dict): + r"""!Add a tone-RIC to the shared buffer. + + @param freq: Frequency identifier + @param packet_dict: Dictionary containing packet data + @return None""" + with BoswatchModule._lock: + stored_packet = packet_dict.copy() + stored_packet['_multicast_timestamp'] = time.time() + BoswatchModule._tone_ric_packets[freq].append(stored_packet) + BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp'] + logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq) + + def _get_queued_packets(self): + r"""!Pop and return all packets currently in the static queue. + + @param None + @return list: List of packets or None""" + with BoswatchModule._queue_lock: + if BoswatchModule._packet_queue: + packets = BoswatchModule._packet_queue[:] + BoswatchModule._packet_queue.clear() + return packets + return None + +# ============================================================ +# MULTICAST PACKET CREATION +# ============================================================ + + def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1): + r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness. + + @param recipient_dict: Source dictionary + @param packet: Target Packet object + @param index: Packet index (1-based) - shifts timestamp by milliseconds + @return None""" + for k, v in recipient_dict.items(): + if k.startswith('_'): + continue + if k == 'timestamp' and index > 1: + try: + dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S') + dt_shifted = dt + datetime.timedelta(milliseconds=index - 1) + packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S')) + except: + packet.set(k, str(v)) + else: + packet.set(k, str(v)) + + def _distribute_complete(self, freq, text_packet_dict): + r"""!Create full multicast packets with message content. + + @param freq: Frequency identifier + @param text_packet_dict: Data of the message-carrying packet + @return list: List of fully populated Packet instances""" + with BoswatchModule._lock: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts)) + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + + if not recipient_dicts: + return [] + text_ric = text_packet_dict.get("ric") + message_text = text_packet_dict.get("message") + alarm_packets = [] + + for idx, recipient_dict in enumerate(recipient_dicts, 1): + p = Packet() + self._copy_packet_dict_to_packet(recipient_dict, p, idx) + p.set("message", message_text) + self._apply_list_tags(p, recipient_dicts) + self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx) + alarm_packets.append(p) + + logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric) + return alarm_packets + + def _create_incomplete_multicast(self, freq, recipient_dicts): + r"""!Generate multicast packets for timeouts (no text message). + + @param freq: Frequency identifier + @param recipient_dicts: List of recipient data dictionaries + @return list: List of incomplete Packet instances""" + if not recipient_dicts: + return [] + first_ric = recipient_dicts[0].get("ric", "unknown") + incomplete_packets = [] + for idx, recipient_dict in enumerate(recipient_dicts, 1): + p = Packet() + self._copy_packet_dict_to_packet(recipient_dict, p, idx) + p.set("message", "") + self._apply_list_tags(p, recipient_dicts) + self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx) + incomplete_packets.append(p) + return incomplete_packets + + def _enrich_normal_alarm(self, bwPacket, packet_dict): + r"""!Enrich a standard single alarm with multicast metadata. + + @param bwPacket: Target Packet object + @param packet_dict: Source data dictionary + @return list: List containing the enriched packet""" + self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1) + self._apply_list_tags(bwPacket, [packet_dict]) + self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1") + logging.debug(f"Erstelle Single-Alarm für RIC {packet_dict.get('ric')}") + return [bwPacket] + + def _handle_delimiter(self, freq, ric, bwPacket=None): + r"""!Handle delimiter packet and clear orphaned tone-RICs. + + @param freq: Frequency identifier + @param ric: Delimiter RIC + @param bwPacket: Optional delimiter packet instance + @return list: Incomplete packets or delimiter control packet""" + with BoswatchModule._lock: + orphaned = BoswatchModule._tone_ric_packets[freq].copy() + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + BoswatchModule._processing_text_ric[freq] = False + + if orphaned: + age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time()) + + logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds) + return self._create_incomplete_multicast(freq, orphaned) + if bwPacket is not None: + self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0") + return [bwPacket] + return None + +# ============================================================ +# PACKET METADATA HELPERS +# ============================================================ + + def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"): + r"""!Helper to set standard multicast fields and register wildcards. + + @param packet: The Packet instance to modify + @param mode: multicastMode (complete, incomplete, single, control) + @param role: multicastRole (recipient, single, delimiter, netident) + @param source: The originating RIC + @param count: Total number of recipients + @param index: Current recipient index + @return None""" + logging.debug(f"Setze Metadata - Mode: {mode}, Role: {role} für RIC: {source}") + mapping = { + "multicastMode": (mode, "{MCAST_MODE}"), + "multicastRole": (role, "{MCAST_ROLE}"), + "multicastSourceRic": (source, "{MCAST_SOURCE}"), + "multicastRecipientCount": (str(count), "{MCAST_COUNT}"), + "multicastRecipientIndex": (str(index), "{MCAST_INDEX}") + } + for key, (val, wildcard) in mapping.items(): + packet.set(key, val) + self._register_wildcard_safe(wildcard, key) + + def _apply_list_tags(self, packet, recipient_dicts): + r"""!Helper to aggregate fields from all recipients into comma-separated lists. + + @param packet: The target Packet instance + @param recipient_dicts: List of dictionaries of all recipients in this group + @return None""" + all_fields = set() + for r in recipient_dicts: + all_fields.update(k for k in r.keys() if not k.startswith('_')) + + for f in sorted(all_fields): + list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts]) + list_key = f"{f}_list" + packet.set(list_key, list_val) + self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key) + + def _register_wildcard_safe(self, wildcard, field): + r"""!Register wildcard if not already globally registered. + + @param wildcard: The wildcard string (e.g. {MCAST_MODE}) + @param field: The packet field name + @return None""" + if wildcard not in BoswatchModule._wildcards_registered: + self.registerWildcard(wildcard, field) + BoswatchModule._wildcards_registered.add(wildcard) + +# ============================================================ +# CLEANUP & TIMEOUT MANAGEMENT +# ============================================================ + + @staticmethod + def _global_cleanup_worker(): + r"""!Static background thread that ticks all active module instances. + + @param None + @return None""" + logging.info("Global multicast cleanup ticker active") + while BoswatchModule._running: + time.sleep(1) + with BoswatchModule._lock: + active_instances = BoswatchModule._instances[:] + for instance in active_instances: + try: + instance._perform_instance_tick() + except Exception as e: + logging.error("Error in instance cleanup: %s", e) + if int(time.time()) % 60 == 0: + BoswatchModule._cleanup_hard_timeout_global() + + def _perform_instance_tick(self): + r"""!Tick-entry point for this specific instance. + + @param None + @return None""" + self._check_all_my_frequencies() + + def _check_all_my_frequencies(self): + r"""!Monitor timeouts for all frequencies assigned to this instance. + + @param None + @return None""" + incomplete_packets = [] + trigger_data = [] + + with BoswatchModule._lock: + current_time = time.time() + for freq in list(self._my_frequencies): + if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + continue + + if BoswatchModule._processing_text_ric.get(freq, False): + flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time) + if flag_age > 2: + BoswatchModule._processing_text_ric[freq] = False + BoswatchModule._processing_text_ric_started.pop(freq, None) + else: + continue + + last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + if current_time - last_time > self._auto_clear_timeout: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC) + trigger_data.append((freq, safe_ric)) + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + + logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout) + packets = self._create_incomplete_multicast(freq, recipient_dicts) + if packets: + incomplete_packets.extend(packets) + + if incomplete_packets: + with BoswatchModule._queue_lock: + BoswatchModule._packet_queue.extend(incomplete_packets) + for freq, safe_ric in trigger_data: + self._send_wakeup_trigger(freq, safe_ric) + + def _check_instance_auto_clear(self, freq): + r"""!Check if frequency has exceeded timeout (called from doWork). + + @param freq: Frequency identifier + @return list: Incomplete packets if timeout exceeded, else None""" + with BoswatchModule._lock: + if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + return None + last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + if time.time() - last_time > self._auto_clear_timeout: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts)) + return self._create_incomplete_multicast(freq, recipient_dicts) + return None + + @staticmethod + def _cleanup_hard_timeout_global(): + r"""!Global failsafe for really old packets (ignores instance config). + + @param None + @return None""" + with BoswatchModule._lock: + current_time = time.time() + max_hard_timeout = 120 + if BoswatchModule._instances: + max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances) + for freq in list(BoswatchModule._tone_ric_packets.keys()): + BoswatchModule._tone_ric_packets[freq] = [ + p for p in BoswatchModule._tone_ric_packets[freq] + if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout + ] + if not BoswatchModule._tone_ric_packets[freq]: + del BoswatchModule._tone_ric_packets[freq] + +# ============================================================ +# TRIGGER SYSTEM +# ============================================================ + + def _send_wakeup_trigger(self, freq, fallback_ric): + r"""!Send a loopback trigger via socket to wake up the system. + + @param freq: Frequency identifier + @param fallback_ric: RIC to use if no explicit trigger RIC is configured + @return None""" + try: + trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric + payload = { + "timestamp": time.time(), + "mode": "pocsag", + "bitrate": "1200", + "ric": trigger_ric, + "subric": "1", + "subricText": "a", + "message": BoswatchModule._MAGIC_WAKEUP_MSG, + "clientName": "MulticastTrigger", + "inputSource": "loopback", + "frequency": freq + } + json_str = json.dumps(payload) + header = f"{len(json_str):<10}" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(1.0) + sock.connect((self._trigger_host, self._trigger_port)) + sock.sendall(header.encode('utf-8')) + sock.sendall(json_str.encode('utf-8')) + logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric) + except Exception as e: + logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e) + +# ============================================================ +# LIFECYCLE (End) +# ============================================================ + + def onUnload(self): + r"""!Unregister instance from the global cleanup process. + + @param None + @return None""" + with BoswatchModule._lock: + if self in BoswatchModule._instances: + BoswatchModule._instances.remove(self) + logging.debug("[%s] Multicast instance unloaded", self.name) diff --git a/plugin/pluginBase.py b/plugin/pluginBase.py index d0cc5b0..28b90e7 100644 --- a/plugin/pluginBase.py +++ b/plugin/pluginBase.py @@ -65,6 +65,15 @@ class PluginBase(ABC): The alarm() method serves the BOSWatch packet to the plugin. @param bwPacket: A BOSWatch packet instance""" + + # --- FIX: Multicast list support --- + if isinstance(bwPacket, list): + # if we got a list of packets, we have to run each packet through the complete alarm process (Setup -> Alarm -> Teardown) + for single_packet in bwPacket: + self._run(single_packet) + return None + # --------------------------------------------------------------------- + self._runCount += 1 logging.debug("[%s] run #%d", self._pluginName, self._runCount)