From 0b9387af08298d4b38855f8d635ac48bfac4c1e5 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:25:28 +0100 Subject: [PATCH 1/6] [feat/multicast]: add multi-instance multicast module with active trigger system Introduce a robust multicast processing module for POCSAG that correlates empty tone-RICs (recipients) with subsequent text-RICs (content). Key Features: - Four Output Modes: Internally supports 'complete', 'incomplete', 'single', and 'control'. Functional alarms are delivered as the first three, while technical 'control' packets (Delimiters/NetIdent) are filtered by default. - Active Trigger System: Implements a loss-free deferred delivery mechanism using a loopback socket (TCP) to re-inject wakeup packets, flushing the internal queue during auto-clear timeouts. - Shared State & Multi-Instance: State is shared across instances but separated by frequency to prevent crosstalk in multi-frequency setups. - Data Aggregation: Automatically generates '{FIELD}_list' wildcards (e.g., RIC_LIST, DESCRIPTION_LIST) for all collected recipients, enabling consolidated notifications in downstream plugins. - Dynamic Filtering: Automatically blocks Delimiter and NetIdent RICs from reaching subsequent plugins if they are defined in the configuration. Infrastructural Changes: - ModuleBase: Expanded return semantics to support: * False: Explicitly blocks/drops a packet. * List: Allows a module to expand one input into multiple output packets. - PluginBase: Updated to handle lists of packets, ensuring a full setup->alarm->teardown lifecycle for every individual element. --- docu/docs/modul/multicast.md | 394 +++++++++++++++++++++ docu/mkdocs.yml | 3 +- module/moduleBase.py | 26 ++ module/multicast.py | 638 +++++++++++++++++++++++++++++++++++ plugin/pluginBase.py | 9 + 5 files changed, 1069 insertions(+), 1 deletion(-) create mode 100644 docu/docs/modul/multicast.md create mode 100644 module/multicast.py diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md new file mode 100644 index 0000000..def5284 --- /dev/null +++ b/docu/docs/modul/multicast.md @@ -0,0 +1,394 @@ +#
Multicast
+--- + +## Beschreibung +Mit diesem Modul können Multicast-Alarme verarbeitet werden. Dabei wird eine Alarmnachricht automatisch an mehrere Empfänger (RICs) verteilt. + +### Funktionsweise +Multicast-Alarme funktionieren in zwei bis drei Phasen: + +1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks und löscht vorherige wartende Tone-RICs. Der Delimiter selbst wird nicht als Alarm ausgegeben (automatische Filterung). Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt. + +2. **Tone-RIC-Phase**: Mehrere RICs empfangen (meist) leere Nachrichten. Diese definieren die Empfänger und "registrieren" sich im Modul als Empfänger für die nächste Multicast-Nachricht. + +3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Diese wird dann automatisch an alle zuvor gesammelten Tone-RICs verteilt. + +**Beispiel:** +``` +10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC +10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1 +10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2 +10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC + +Generierte Alarme: +→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" +→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" +``` + +**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert. + +Das Modul unterstützt: + +- Mehrere Startmarker (Delimiter) +- Mehrere Text-RICs +- Netzident-RIC zur Paketfilterung +- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear) +- Active Trigger System zur verlustfreien Paketauslieferung +- Wildcards für spätere Weiterverarbeitung +- Frequenz-basierte Trennung +- Multi-Instanz-Betrieb mit geteiltem Zustand + +Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgegeben. + +**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben. + +## Unterstützte Alarmtypen +- POCSAG + +## Resource +`multicast` + +## Konfiguration + +|Feld|Beschreibung|Default| +|----|------------|-------| +|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10| +|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden automatisch gefiltert wenn konfiguriert)|leer| +|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer| +|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden automatisch gefiltert wenn konfiguriert)|leer| +|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer| +|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1| +|triggerPort|Port für Loopback-Trigger|8080| + +**Achtung:** Zahlen welche führende Nullen enthalten müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'` + +### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration) +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + textRics: '0299001,0310001' +``` +In diesem Modus werden **alle leeren Nachrichten** als toneRics behandelt (keine `delimiterRics` angegeben). + +### Konfigurationsbeispiel 2: Mit Delimiter-Trenner (empfohlen) +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' +``` +In diesem Modus wird **0988988 als Trenner (= Delimiter)** behandelt und **alle anderen leeren Nachrichten als Empfänger**. + +### Konfigurationsbeispiel 3: Mit expliziter Trigger-RIC +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' + triggerRic: '9999999' + triggerHost: '127.0.0.1' + triggerPort: 8080 +``` +Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket. + +### Konfigurationsbeispiel 4: Mit Netzident-Filterung +```yaml +- type: module + res: multicast + name: Multicast Handler + config: + autoClearTimeout: 10 + delimiterRics: '0988988' + textRics: '0299001,0310001' + netIdentRics: '0000001' +``` +Filtert Netzident-Pakete (RIC 0000001) automatisch aus der Weiterverarbeitung. + +## Integration in Router-Konfiguration + +Das Multicast-Modul sollte **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden: + +```yaml +- name: Router POCSAG + route: + - type: module + res: filter.modeFilter + name: Filter POCSAG + config: + allowed: + - pocsag + + # Multicast-Modul hier einfügen + - type: module + res: multicast + name: Multicast Handler + config: + textRics: '0299001,0310001' + delimiterRics: '0288088' + autoClearTimeout: 10 + + # Weitere Module und Plugins + - type: plugin + res: mysql + config: + # ... + + - type: plugin + res: telegram + config: + # ... +``` + +## Beispielhafte Verwendung in Router-Konfigurationen +Das Multicast-Modul gibt für jedes RIC ein eigenes Paket aus UND generiert für konsistente Verarbeitung Listenfelder. +Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfelder für eine gesammelte Ausgabe zu verwenden. Vor der weiteren Verarbeitung in Plugins empfiehlt sich eventuell eine Filterung mittels [RegEx-Filter](regex_filter.md). +Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter. + + +### Beispiel (Zusätzliche Wildcards werden noch später in diesem Readme erklärt): +```yaml +router: +- name: Router POCSAG + route: + - type: module + res: filter.modeFilter + config: + [...] + - type: module + res: filter.doubleFilter + config: + [...] + - type: module + res: descriptor + config: + [...] + - type: module + res: multicast + name: Multicast + config: + autoClearTimeout: 10 + delimiterRics: '0123456' # Start eines Multicast-Alarms + textRics: '9909909' # Text-RIC + - type: router + res: RouterMySQL + - type: router + res: RouterTelegram + +- name: RouterMySQL + route: + - type: module + res: filter.regexFilter + name: Filter MySQL + config: + - name: "Multicast Mode complete or single" + checks: + - field: multicastMode + regex: ^(complete|single)$ + - type: plugin + res: mysql + config: + [...] + +- name: RouterTelegram + route: + - type: module + res: filter.regexFilter + name: Multicast Recipient Index Filter # 1. Paket, da ist alles drin für einen kombinierten Alarm und ist immer vorhanden + config: + - name: "Multicast 1 Paket pro Alarm-Paket" + checks: + - field: multicastRecipientIndex + regex: ^1$ + - type: plugin + res: telegram + config: + message_pocsag: | + {CNAME} + {MSG} + Alarmierte Einheiten [{MCAST_COUNT}]: {DESCRIPTION_LIST} + RICs: {RIC_LIST} + {TIME} + [...] + +``` + +--- +## Modul Abhängigkeiten +- keine + +--- +## Externe Abhängigkeiten +- keine + +--- +## Paket Modifikationen + +### Hinzugefügte Felder bei Multicast-Alarmen: +- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte: + + - `complete`: Vollständiges Multicast-Packet + - `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC) + - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) + - `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs + +- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte: + + - `delimiter`: Startmarker-Paket (wird automatisch gefiltert wenn delimiterRics konfiguriert) + - `recipient`: tatsächlicher Empfänger + - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) + - `netident`: Netzwerk-Identifikations-Paket (wird automatisch gefiltert wenn netIdentRics konfiguriert) + +- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs +- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt +- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik: + + - Empfänger haben den Index 1 bis n. + - Delimiter/Singles haben Index 1 (da sie alleinstehen). + +- `_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. ric_list, message_list) + +### Veränderte Felder bei Multicast-Alarmen: +- `ric`: Wird durch Empfänger-RIC ersetzt +- `subric`: Wird durch Empfänger-SubRIC ersetzt +- `subricText`: Wird durch Empfänger-SubRIC-Text ersetzt +- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC + +### Rückgabewerte: +- **False**: Paket wurde blockiert (z.B. Delimiter/Netident-Filterung), Router stoppt Verarbeitung +- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln +- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort + +--- +## Zusätzliche Wildcards + +Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung: + +|Wildcard|Beschreibung|Beispiel| +|--------|------------|--------| +|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001| +|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3| +|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...| +|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control| +|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident| + +## Erweiterung der Listen-Wildcards +Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen: + +|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel| +|--------|------------|--------|--------| +|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"| +|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"| +|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger.|description_list|"FF Musterstadt, BF Beispiel"| +|{_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|_list|{FREQUENCY_LIST}, {BITRATE_LIST}| + +**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~). + +### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin: +```yaml +- type: plugin + res: telegram + config: + message_pocsag: | + {CNAME} + {MSG} + RIC: {RIC} / SubRIC: {SRIC} + Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE}) + {TIME} +``` + +--- +## Funktionsweise im Detail + +### Active Trigger System (Verlustfreie Paketauslieferung) + +Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**: + +1. **Deferred Delivery**: Bei einem Auto-Clear-Timeout werden die incomplete-Pakete nicht sofort ausgegeben, sondern in einer internen Queue gespeichert. + +2. **Wakeup-Trigger**: Das Modul sendet ein spezielles Trigger-Paket via Loopback-Socket (Standard: 127.0.0.1:8080) zurück an den BOSWatch-Server. + +3. **Queue-Flush**: Beim Empfang des Trigger-Pakets werden alle gespeicherten Pakete aus der Queue ausgegeben. + +**Trigger-RIC Auswahl** (3-stufige Fallback-Kette): +- **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet +- **Dynamisch**: Wenn nicht konfiguriert, wird die erste Tone-RIC der aktuellen Gruppe verwendet +- **Fallback**: Falls keine Tone-RICs vorhanden sind (sollte nicht vorkommen), wird "9999999" verwendet + +**Beispiel-Ablauf bei Auto-Clear:** +``` +10:31:16 - Tone-RIC empfangen (0234567) +10:31:16 - Tone-RIC empfangen (0345678) +10:31:26 - Auto-Clear-Timeout (10s) erreicht + → Incomplete-Pakete in Queue gespeichert + → Trigger-Paket via Loopback gesendet (RIC: 0234567) +10:31:26 - Trigger-Paket durch BOSWatch-Server verarbeitet + → Queue-Flush: Incomplete-Pakete werden ausgegeben +``` + +**Vorteile:** +- Keine Pakete gehen verloren, auch bei hoher Systemlast +- Saubere Trennung von Verarbeitung und Ausgabe +- Ermöglicht zeitlich versetzte Ausgabe ohne Race Conditions + +### Zeitbasierte Verarbeitung + +1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert +2. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System) +3. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs +4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe) + +### Frequenz-Trennung + +Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung von Alarmen verschiedener Sender. + +**Beispiel:** +``` +Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678] +Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890] +→ Werden getrennt verarbeitet, keine Vermischung möglich +``` + +### SubRIC-Erhaltung + +**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC aus der Tone-RIC-Phase. Dies ist entscheidend, da SubRICs unterschiedliche Bedeutungen haben können, z.B.: + +- SubRIC 1 (a): Alarmierung +- SubRIC 2 (b): Information +- SubRIC 3 (c): Probealarm +- SubRIC 4 (d): Sirenenalarm + +**Beispiel:** +``` +Eingehende Tone-RICs: +- RIC: 0234567 SubRIC: 4 (Sirenenalarm) +- RIC: 0345678 SubRIC: 3 (Probealarm) + +Text-RIC: RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" + +Ausgegebene Multicast-Pakete: +→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!) +→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!) +``` + +### Automatische Filterung + +- **Delimiter-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `delimiterRics` konfiguriert ist +- **Netzident-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `netIdentRics` konfiguriert ist +- **Filterung nach multicastRole**: Ermöglicht saubere nachgelagerte Verarbeitung ohne manuelle Filter + +### Multi-Instanz-Betrieb + +Das Modul unterstützt mehrere parallele Instanzen mit geteiltem Zustand: + +- **Shared State**: Alle Instanzen teilen sich den Tone-RIC-Speicher (frequenz-basiert) +- **Instance-Specific Timeouts**: Jede Instanz kann eigene `autoClearTimeout`-Werte haben +- **Global Cleanup Thread**: Ein globaler Thread prüft alle Instanzen auf Timeouts +- **Thread-Safe**: Alle Operationen sind thread-sicher mit Locks geschützt \ No newline at end of file diff --git a/docu/mkdocs.yml b/docu/mkdocs.yml index c2602dd..73d112e 100644 --- a/docu/mkdocs.yml +++ b/docu/mkdocs.yml @@ -20,10 +20,11 @@ nav: - Changelog: changelog.md - Module: - Descriptor: modul/descriptor.md + - Double Filter: modul/double_filter.md - Geocoding: modul/geocoding.md - Mode Filter: modul/mode_filter.md + - Multicast: modul/multicast.md - Regex Filter: modul/regex_filter.md - - Double Filter: modul/double_filter.md - Plugins: - Http: plugin/http.md - Telegram: plugin/telegram.md diff --git a/module/moduleBase.py b/module/moduleBase.py index 1ce8ccd..568a24a 100644 --- a/module/moduleBase.py +++ b/module/moduleBase.py @@ -56,6 +56,32 @@ class ModuleBase(ABC): @param bwPacket: A BOSWatch packet instance @return bwPacket or False""" + + # --- FIX: Multicast list support for Module --- + if isinstance(bwPacket, list): + result_packets = [] + for single_packet in bwPacket: + # Recursive call for single packet + processed = self._run(single_packet) + + # new logic: + if processed is False: + # filter called 'False' -> packet discarded + continue + elif processed is None: + # module returned None -> keep packet unchanged + result_packets.append(single_packet) + elif isinstance(processed, list): + # module returned new list -> extend + result_packets.extend(processed) + else: + # module returned modified packet -> add + result_packets.append(processed) + + # if list is not empty, return it. else False (filter all). + return result_packets if result_packets else False + # ----------------------------------------------- + self._runCount += 1 logging.debug("[%s] run #%d", self._moduleName, self._runCount) diff --git a/module/multicast.py b/module/multicast.py new file mode 100644 index 0000000..d8ae05e --- /dev/null +++ b/module/multicast.py @@ -0,0 +1,638 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +r"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: multicast.py +@date: 26.01.2025 +@author: Claus Schichl +@description: multicast module +""" + +import logging +import time +import threading +import socket +import json +import datetime +from collections import defaultdict +from module.moduleBase import ModuleBase +from boswatch.packet import Packet + +logging.debug("- %s loaded", __name__) + + +class BoswatchModule(ModuleBase): + r"""!Multicast module with multi-instance support and active trigger mechanism + + This module handles multicast alarm distribution. + It manages the correlation between tone-RICs (recipients) and text-RICs (message content), + ensuring reliable alarm delivery even in complex multi-frequency scenarios. + """ + + # CLASS VARIABLES - SHARED STATE + _tone_ric_packets = defaultdict(list) + _last_tone_ric_time = defaultdict(float) + _processing_text_ric = defaultdict(bool) + _processing_text_ric_started = defaultdict(float) + + # SYSTEM VARIABLES + _lock = threading.Lock() + _cleanup_thread = None + _running = False + _wildcards_registered = set() + _packet_queue = [] + _queue_lock = threading.Lock() + _instances = [] + + # Trigger defaults + _TRIGGER_HOST = "127.0.0.1" + _TRIGGER_PORT = 8080 + _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###" + _DEFAULT_TRIGGER_RIC = "9999999" + +# ============================================================ +# LIFECYCLE METHODS +# ============================================================ + + def __init__(self, config): + super().__init__(__name__, config) + + def onLoad(self): + r"""!Initialize module configuration and start the global cleanup thread. + + @param None + @return None""" + self._my_frequencies = set() + self.instance_id = hex(id(self))[-4:] + self.name = f"MCAST_{self.instance_id}" + + self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10)) + self._hard_timeout = self._auto_clear_timeout * 3 + + def parse_list(key): + val = self.config.get(key) + if val: + return [x.strip() for x in str(val).split(",") if x.strip()] + return [] + + self._delimiter_rics = parse_list("delimiterRics") + self._text_rics = parse_list("textRics") + self._netident_rics = parse_list("netIdentRics") + + trigger_ric_cfg = self.config.get("triggerRic") + if trigger_ric_cfg: + self._trigger_ric = str(trigger_ric_cfg).strip() + self._trigger_ric_mode = "explicit" + else: + self._trigger_ric = None + self._trigger_ric_mode = "dynamic" + + self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST) + self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT)) + + self._block_delimiter = bool(self._delimiter_rics) + self._block_netident = bool(self._netident_rics) + + logging.info("[%s] Multicast module loaded", self.name) + + with BoswatchModule._lock: + if self not in BoswatchModule._instances: + BoswatchModule._instances.append(self) + + if not BoswatchModule._running: + BoswatchModule._running = True + BoswatchModule._cleanup_thread = threading.Thread( + target=BoswatchModule._global_cleanup_worker, daemon=True + ) + BoswatchModule._cleanup_thread.start() + logging.info("Global multicast cleanup thread started") + +# ============================================================ +# MAIN PROCESSING +# ============================================================ + + def doWork(self, bwPacket): + r"""!Process an incoming packet and handle multicast logic. + + @param bwPacket: A BOSWatch packet instance or list of packets + @return bwPacket, a list of packets, or False if blocked""" + if isinstance(bwPacket, list): + result_packets = [] + for single_packet in bwPacket: + processed = self.doWork(single_packet) + if processed is None: + result_packets.append(single_packet) + elif processed is not False: + if isinstance(processed, list): + result_packets.extend(processed) + else: + result_packets.append(processed) + return result_packets if result_packets else None + + packet_dict = self._get_packet_data(bwPacket) + msg = packet_dict.get("message") + ric = packet_dict.get("ric") + freq = packet_dict.get("frequency", "default") + mode = packet_dict.get("mode") + + if msg == BoswatchModule._MAGIC_WAKEUP_MSG: + if self._trigger_ric and ric != self._trigger_ric: + pass + else: + logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric) + queued = self._get_queued_packets() + return queued if queued else False + + if mode != "pocsag": + queued = self._get_queued_packets() + return queued if queued else None + + self._my_frequencies.add(freq) + + is_text_ric = False + if self._text_rics: + is_text_ric = ric in self._text_rics and msg and msg.strip() + else: + with BoswatchModule._lock: + is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0 + + if is_text_ric: + with BoswatchModule._lock: + BoswatchModule._processing_text_ric[freq] = True + BoswatchModule._processing_text_ric_started[freq] = time.time() + + queued_packets = self._get_queued_packets() + incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq) + + if self._netident_rics and ric in self._netident_rics: + self._set_mcast_metadata(bwPacket, "control", "netident", ric) + result = self._combine_results(incomplete_packets, queued_packets, [bwPacket]) + return self._filter_output(result) + + if self._delimiter_rics and ric in self._delimiter_rics: + delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket) + result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets) + return self._filter_output(result) + + if not msg or not msg.strip(): + self._add_tone_ric_packet(freq, packet_dict) + result = self._combine_results(incomplete_packets, queued_packets, False) + return self._filter_output(result) + + if is_text_ric and msg: + logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric) + alarm_packets = self._distribute_complete(freq, packet_dict) + with BoswatchModule._lock: + BoswatchModule._processing_text_ric[freq] = False + BoswatchModule._processing_text_ric_started.pop(freq, None) + + if not alarm_packets: + logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric) + normal = self._enrich_normal_alarm(bwPacket, packet_dict) + result = self._combine_results(normal, incomplete_packets, queued_packets) + else: + result = self._combine_results(alarm_packets, incomplete_packets, queued_packets) + return self._filter_output(result) + + if msg: + normal = self._enrich_normal_alarm(bwPacket, packet_dict) + result = self._combine_results(normal, incomplete_packets, queued_packets) + return self._filter_output(result) + + result = self._combine_results(incomplete_packets, queued_packets) + return self._filter_output(result) + +# ============================================================ +# PACKET PROCESSING HELPERS (called by doWork) +# ============================================================ + + def _get_packet_data(self, bwPacket): + r"""!Safely extract all fields from packet as a dictionary. + + Handles both dict objects and Packet instances. + Dynamically extracts all fields including those added by other modules. + + @param bwPacket: Packet instance or dict + @return dict: Complete dictionary of all packet fields""" + # 1. Fall: Es ist bereits ein Dictionary + if isinstance(bwPacket, dict): + return bwPacket.copy() + + # 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet) + if hasattr(bwPacket, '_packet'): + return bwPacket._packet.copy() + + # 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet' + try: + return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')} + except Exception as e: + logging.warning("[%s] Error: %s", self.name, e) + return {} + + def _filter_output(self, result): + r"""!Apply multicastRole filtering before output. + + @param result: Single packet, list of packets, None or False + @return Final packet(s) or False if blocked""" + if result is None or result is False: + return result + + def get_role(packet): + """Helper to extract multicastRole from Packet object""" + packet_dict = self._get_packet_data(packet) + return packet_dict.get("multicastRole") + + if isinstance(result, list): + filtered = [p for p in result if self._should_output_packet(get_role(p))] + if not filtered: + logging.debug("All packets filtered out by multicastRole") + return False + return filtered if len(filtered) > 1 else filtered[0] + else: + if self._should_output_packet(get_role(result)): + return result + logging.debug("Packet filtered out: multicastRole=%s", get_role(result)) + return False + + def _combine_results(self, *results): + r"""!Combine multiple result sources into a single list or status. + + @param results: Multiple packet objects, lists, or booleans + @return combined list, False or None""" + combined = [] + has_false = False + for result in results: + if result is False: + has_false = True + continue + if result is None: + continue + if isinstance(result, list): + combined.extend(result) + else: + combined.append(result) + if combined: + return combined + return False if has_false else None + + def _should_output_packet(self, multicast_role): + r"""!Check if packet should be output based on role. + + @param multicast_role: The role string to check + @return bool: True if allowed""" + if self._block_delimiter and multicast_role == "delimiter": + return False + if self._block_netident and multicast_role == "netident": + return False + return True + +# ============================================================ +# TONE-RIC BUFFER MANAGEMENT +# ============================================================ + + def _add_tone_ric_packet(self, freq, packet_dict): + r"""!Add a tone-RIC to the shared buffer. + + @param freq: Frequency identifier + @param packet_dict: Dictionary containing packet data + @return None""" + with BoswatchModule._lock: + stored_packet = packet_dict.copy() + stored_packet['_multicast_timestamp'] = time.time() + BoswatchModule._tone_ric_packets[freq].append(stored_packet) + BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp'] + logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq) + + def _get_queued_packets(self): + r"""!Pop and return all packets currently in the static queue. + + @param None + @return list: List of packets or None""" + with BoswatchModule._queue_lock: + if BoswatchModule._packet_queue: + packets = BoswatchModule._packet_queue[:] + BoswatchModule._packet_queue.clear() + return packets + return None + +# ============================================================ +# MULTICAST PACKET CREATION +# ============================================================ + + def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1): + r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness. + + @param recipient_dict: Source dictionary + @param packet: Target Packet object + @param index: Packet index (1-based) - shifts timestamp by milliseconds + @return None""" + for k, v in recipient_dict.items(): + if k.startswith('_'): + continue + if k == 'timestamp' and index > 1: + try: + dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S') + dt_shifted = dt + datetime.timedelta(milliseconds=index - 1) + packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S')) + except: + packet.set(k, str(v)) + else: + packet.set(k, str(v)) + + def _distribute_complete(self, freq, text_packet_dict): + r"""!Create full multicast packets with message content. + + @param freq: Frequency identifier + @param text_packet_dict: Data of the message-carrying packet + @return list: List of fully populated Packet instances""" + with BoswatchModule._lock: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts)) + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + + if not recipient_dicts: + return [] + text_ric = text_packet_dict.get("ric") + message_text = text_packet_dict.get("message") + alarm_packets = [] + + for idx, recipient_dict in enumerate(recipient_dicts, 1): + p = Packet() + self._copy_packet_dict_to_packet(recipient_dict, p, idx) + p.set("message", message_text) + self._apply_list_tags(p, recipient_dicts) + self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx) + alarm_packets.append(p) + + logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric) + return alarm_packets + + def _create_incomplete_multicast(self, freq, recipient_dicts): + r"""!Generate multicast packets for timeouts (no text message). + + @param freq: Frequency identifier + @param recipient_dicts: List of recipient data dictionaries + @return list: List of incomplete Packet instances""" + if not recipient_dicts: + return [] + first_ric = recipient_dicts[0].get("ric", "unknown") + incomplete_packets = [] + for idx, recipient_dict in enumerate(recipient_dicts, 1): + p = Packet() + self._copy_packet_dict_to_packet(recipient_dict, p, idx) + p.set("message", "") + self._apply_list_tags(p, recipient_dicts) + self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx) + incomplete_packets.append(p) + return incomplete_packets + + def _enrich_normal_alarm(self, bwPacket, packet_dict): + r"""!Enrich a standard single alarm with multicast metadata. + + @param bwPacket: Target Packet object + @param packet_dict: Source data dictionary + @return list: List containing the enriched packet""" + self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1) + self._apply_list_tags(bwPacket, [packet_dict]) + self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1") + logging.debug(f"Erstelle Single-Alarm für RIC {packet_dict.get('ric')}") + return [bwPacket] + + def _handle_delimiter(self, freq, ric, bwPacket=None): + r"""!Handle delimiter packet and clear orphaned tone-RICs. + + @param freq: Frequency identifier + @param ric: Delimiter RIC + @param bwPacket: Optional delimiter packet instance + @return list: Incomplete packets or delimiter control packet""" + with BoswatchModule._lock: + orphaned = BoswatchModule._tone_ric_packets[freq].copy() + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + BoswatchModule._processing_text_ric[freq] = False + + if orphaned: + age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time()) + + logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds) + return self._create_incomplete_multicast(freq, orphaned) + if bwPacket is not None: + self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0") + return [bwPacket] + return None + +# ============================================================ +# PACKET METADATA HELPERS +# ============================================================ + + def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"): + r"""!Helper to set standard multicast fields and register wildcards. + + @param packet: The Packet instance to modify + @param mode: multicastMode (complete, incomplete, single, control) + @param role: multicastRole (recipient, single, delimiter, netident) + @param source: The originating RIC + @param count: Total number of recipients + @param index: Current recipient index + @return None""" + logging.debug(f"Setze Metadata - Mode: {mode}, Role: {role} für RIC: {source}") + mapping = { + "multicastMode": (mode, "{MCAST_MODE}"), + "multicastRole": (role, "{MCAST_ROLE}"), + "multicastSourceRic": (source, "{MCAST_SOURCE}"), + "multicastRecipientCount": (str(count), "{MCAST_COUNT}"), + "multicastRecipientIndex": (str(index), "{MCAST_INDEX}") + } + for key, (val, wildcard) in mapping.items(): + packet.set(key, val) + self._register_wildcard_safe(wildcard, key) + + def _apply_list_tags(self, packet, recipient_dicts): + r"""!Helper to aggregate fields from all recipients into comma-separated lists. + + @param packet: The target Packet instance + @param recipient_dicts: List of dictionaries of all recipients in this group + @return None""" + all_fields = set() + for r in recipient_dicts: + all_fields.update(k for k in r.keys() if not k.startswith('_')) + + for f in sorted(all_fields): + list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts]) + list_key = f"{f}_list" + packet.set(list_key, list_val) + self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key) + + def _register_wildcard_safe(self, wildcard, field): + r"""!Register wildcard if not already globally registered. + + @param wildcard: The wildcard string (e.g. {MCAST_MODE}) + @param field: The packet field name + @return None""" + if wildcard not in BoswatchModule._wildcards_registered: + self.registerWildcard(wildcard, field) + BoswatchModule._wildcards_registered.add(wildcard) + +# ============================================================ +# CLEANUP & TIMEOUT MANAGEMENT +# ============================================================ + + @staticmethod + def _global_cleanup_worker(): + r"""!Static background thread that ticks all active module instances. + + @param None + @return None""" + logging.info("Global multicast cleanup ticker active") + while BoswatchModule._running: + time.sleep(1) + with BoswatchModule._lock: + active_instances = BoswatchModule._instances[:] + for instance in active_instances: + try: + instance._perform_instance_tick() + except Exception as e: + logging.error("Error in instance cleanup: %s", e) + if int(time.time()) % 60 == 0: + BoswatchModule._cleanup_hard_timeout_global() + + def _perform_instance_tick(self): + r"""!Tick-entry point for this specific instance. + + @param None + @return None""" + self._check_all_my_frequencies() + + def _check_all_my_frequencies(self): + r"""!Monitor timeouts for all frequencies assigned to this instance. + + @param None + @return None""" + incomplete_packets = [] + trigger_data = [] + + with BoswatchModule._lock: + current_time = time.time() + for freq in list(self._my_frequencies): + if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + continue + + if BoswatchModule._processing_text_ric.get(freq, False): + flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time) + if flag_age > 2: + BoswatchModule._processing_text_ric[freq] = False + BoswatchModule._processing_text_ric_started.pop(freq, None) + else: + continue + + last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + if current_time - last_time > self._auto_clear_timeout: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC) + trigger_data.append((freq, safe_ric)) + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + + logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout) + packets = self._create_incomplete_multicast(freq, recipient_dicts) + if packets: + incomplete_packets.extend(packets) + + if incomplete_packets: + with BoswatchModule._queue_lock: + BoswatchModule._packet_queue.extend(incomplete_packets) + for freq, safe_ric in trigger_data: + self._send_wakeup_trigger(freq, safe_ric) + + def _check_instance_auto_clear(self, freq): + r"""!Check if frequency has exceeded timeout (called from doWork). + + @param freq: Frequency identifier + @return list: Incomplete packets if timeout exceeded, else None""" + with BoswatchModule._lock: + if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + return None + last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + if time.time() - last_time > self._auto_clear_timeout: + recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + BoswatchModule._tone_ric_packets[freq].clear() + BoswatchModule._last_tone_ric_time.pop(freq, None) + logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts)) + return self._create_incomplete_multicast(freq, recipient_dicts) + return None + + @staticmethod + def _cleanup_hard_timeout_global(): + r"""!Global failsafe for really old packets (ignores instance config). + + @param None + @return None""" + with BoswatchModule._lock: + current_time = time.time() + max_hard_timeout = 120 + if BoswatchModule._instances: + max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances) + for freq in list(BoswatchModule._tone_ric_packets.keys()): + BoswatchModule._tone_ric_packets[freq] = [ + p for p in BoswatchModule._tone_ric_packets[freq] + if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout + ] + if not BoswatchModule._tone_ric_packets[freq]: + del BoswatchModule._tone_ric_packets[freq] + +# ============================================================ +# TRIGGER SYSTEM +# ============================================================ + + def _send_wakeup_trigger(self, freq, fallback_ric): + r"""!Send a loopback trigger via socket to wake up the system. + + @param freq: Frequency identifier + @param fallback_ric: RIC to use if no explicit trigger RIC is configured + @return None""" + try: + trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric + payload = { + "timestamp": time.time(), + "mode": "pocsag", + "bitrate": "1200", + "ric": trigger_ric, + "subric": "1", + "subricText": "a", + "message": BoswatchModule._MAGIC_WAKEUP_MSG, + "clientName": "MulticastTrigger", + "inputSource": "loopback", + "frequency": freq + } + json_str = json.dumps(payload) + header = f"{len(json_str):<10}" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(1.0) + sock.connect((self._trigger_host, self._trigger_port)) + sock.sendall(header.encode('utf-8')) + sock.sendall(json_str.encode('utf-8')) + logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric) + except Exception as e: + logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e) + +# ============================================================ +# LIFECYCLE (End) +# ============================================================ + + def onUnload(self): + r"""!Unregister instance from the global cleanup process. + + @param None + @return None""" + with BoswatchModule._lock: + if self in BoswatchModule._instances: + BoswatchModule._instances.remove(self) + logging.debug("[%s] Multicast instance unloaded", self.name) diff --git a/plugin/pluginBase.py b/plugin/pluginBase.py index d0cc5b0..28b90e7 100644 --- a/plugin/pluginBase.py +++ b/plugin/pluginBase.py @@ -65,6 +65,15 @@ class PluginBase(ABC): The alarm() method serves the BOSWatch packet to the plugin. @param bwPacket: A BOSWatch packet instance""" + + # --- FIX: Multicast list support --- + if isinstance(bwPacket, list): + # if we got a list of packets, we have to run each packet through the complete alarm process (Setup -> Alarm -> Teardown) + for single_packet in bwPacket: + self._run(single_packet) + return None + # --------------------------------------------------------------------- + self._runCount += 1 logging.debug("[%s] run #%d", self._pluginName, self._runCount) From e53cb8cff2ed64d58ba8f4594c7fce1732436f92 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Thu, 26 Feb 2026 09:10:36 +0100 Subject: [PATCH 2/6] [feat/multicast] adding Config-Editor --- docu/docs/config-editor.html | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docu/docs/config-editor.html b/docu/docs/config-editor.html index aa6db3c..b50a0c3 100644 --- a/docu/docs/config-editor.html +++ b/docu/docs/config-editor.html @@ -569,6 +569,21 @@ router: ] }, + "multicast": { + kind: "module", + title: "Multicast", + creator: "BW3 Dev Team", + fields: [ + { key: "autoClearTimeout", type: "number", label: "autoClearTimeout", default: 10, required: false }, + { key: "delimiterRics", type: "text", label: "delimiterRics", required: false }, + { key: "textRics", type: "text", label: "textRics", required: false }, + { key: "netIdentRics", type: "text", label: "netIdentRics", required: false }, + { key: "triggerRic", type: "text", label: "triggerRic", required: false }, + { key: "triggerHost", type: "text", label: "triggerHost", default: "127.0.0.1", required: false }, + { key: "triggerPort", type: "number", label: "triggerPort", default: 8080, required: false }, + ] + }, + // Plugins "http": { kind: "plugin", From 7dabc786f61c76e1f9d9f82f75b3576b42cfa170 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Thu, 5 Mar 2026 13:10:31 +0100 Subject: [PATCH 3/6] [feat/multicast] correction of wakeup_trigger sending method to official architecture --- module/multicast.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/module/multicast.py b/module/multicast.py index d8ae05e..51e672b 100644 --- a/module/multicast.py +++ b/module/multicast.py @@ -18,12 +18,12 @@ r"""! import logging import time import threading -import socket import json import datetime from collections import defaultdict from module.moduleBase import ModuleBase from boswatch.packet import Packet +from boswatch.network.client import TCPClient logging.debug("- %s loaded", __name__) @@ -593,11 +593,7 @@ class BoswatchModule(ModuleBase): # ============================================================ def _send_wakeup_trigger(self, freq, fallback_ric): - r"""!Send a loopback trigger via socket to wake up the system. - - @param freq: Frequency identifier - @param fallback_ric: RIC to use if no explicit trigger RIC is configured - @return None""" + r"""!Send a loopback trigger using the standard TCPClient class.""" try: trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric payload = { @@ -613,13 +609,21 @@ class BoswatchModule(ModuleBase): "frequency": freq } json_str = json.dumps(payload) - header = f"{len(json_str):<10}" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(1.0) - sock.connect((self._trigger_host, self._trigger_port)) - sock.sendall(header.encode('utf-8')) - sock.sendall(json_str.encode('utf-8')) - logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric) + + # using BOSWatch-Architecture + client = TCPClient(timeout=2) + if client.connect(self._trigger_host, self._trigger_port): + # 1. Send + client.transmit(json_str) + + # 2. Recieve (getting [ack] and prevents connection reset) + client.receive(timeout=1) + + client.disconnect() + logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric) + else: + logging.error("[%s] Could not connect to local server for wakeup", self.name) + except Exception as e: logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e) From 1ebcbf23e9d30edb8f5884d1098f6d40309b428e Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Thu, 5 Mar 2026 21:39:27 +0100 Subject: [PATCH 4/6] [feat/multicast] Code quality fixes and cleanup - Remove redundant list-handling block in doWork() - already handled by moduleBase._run() - Fix bare except to except (ValueError, TypeError) in _copy_packet_dict_to_packet() - Replace f-string logging with lazy %-style in _enrich_normal_alarm() and _set_mcast_metadata() - Remove unused _trigger_ric_mode variable - Simplify instance name from dynamic MCAST_{id} to static "Multicast" (no debug value) - Update doWork() docstring to reflect single-packet-only parameter - Add extension hook comment to _perform_instance_tick() --- module/multicast.py | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/module/multicast.py b/module/multicast.py index 51e672b..55f8665 100644 --- a/module/multicast.py +++ b/module/multicast.py @@ -70,8 +70,7 @@ class BoswatchModule(ModuleBase): @param None @return None""" self._my_frequencies = set() - self.instance_id = hex(id(self))[-4:] - self.name = f"MCAST_{self.instance_id}" + self.name = "Multicast" self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10)) self._hard_timeout = self._auto_clear_timeout * 3 @@ -89,10 +88,8 @@ class BoswatchModule(ModuleBase): trigger_ric_cfg = self.config.get("triggerRic") if trigger_ric_cfg: self._trigger_ric = str(trigger_ric_cfg).strip() - self._trigger_ric_mode = "explicit" else: self._trigger_ric = None - self._trigger_ric_mode = "dynamic" self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST) self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT)) @@ -121,21 +118,8 @@ class BoswatchModule(ModuleBase): def doWork(self, bwPacket): r"""!Process an incoming packet and handle multicast logic. - @param bwPacket: A BOSWatch packet instance or list of packets + @param bwPacket: A BOSWatch packet instance @return bwPacket, a list of packets, or False if blocked""" - if isinstance(bwPacket, list): - result_packets = [] - for single_packet in bwPacket: - processed = self.doWork(single_packet) - if processed is None: - result_packets.append(single_packet) - elif processed is not False: - if isinstance(processed, list): - result_packets.extend(processed) - else: - result_packets.append(processed) - return result_packets if result_packets else None - packet_dict = self._get_packet_data(bwPacket) msg = packet_dict.get("message") ric = packet_dict.get("ric") @@ -341,7 +325,7 @@ class BoswatchModule(ModuleBase): dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S') dt_shifted = dt + datetime.timedelta(milliseconds=index - 1) packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S')) - except: + except (ValueError, TypeError): packet.set(k, str(v)) else: packet.set(k, str(v)) @@ -403,7 +387,7 @@ class BoswatchModule(ModuleBase): self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1) self._apply_list_tags(bwPacket, [packet_dict]) self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1") - logging.debug(f"Erstelle Single-Alarm für RIC {packet_dict.get('ric')}") + logging.debug("Erstelle Single-Alarm für RIC %s", packet_dict.get('ric')) return [bwPacket] def _handle_delimiter(self, freq, ric, bwPacket=None): @@ -443,7 +427,7 @@ class BoswatchModule(ModuleBase): @param count: Total number of recipients @param index: Current recipient index @return None""" - logging.debug(f"Setze Metadata - Mode: {mode}, Role: {role} für RIC: {source}") + logging.debug("Setze Metadata - Mode: %s, Role: %s für RIC: %s", mode, role, source) mapping = { "multicastMode": (mode, "{MCAST_MODE}"), "multicastRole": (role, "{MCAST_ROLE}"), @@ -507,6 +491,9 @@ class BoswatchModule(ModuleBase): def _perform_instance_tick(self): r"""!Tick-entry point for this specific instance. + Acts as an extension hook for future per-instance tick logic + (e.g. statistics, heartbeat, watchdog). Do not call directly. + @param None @return None""" self._check_all_my_frequencies() From 08d09b4f50a91a696427e026149159e930bd6485 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Sat, 28 Mar 2026 15:41:54 +0100 Subject: [PATCH 5/6] [feat/multicast] refactor: move packet filtering from module to downstream Remove internal filtering of delimiter and netident packets from the multicast module. All packets are now passed through with multicastRole metadata set, allowing downstream filters (e.g. filter.regexFilter) to handle filtering as needed. Tone-RICs remain internally consumed as they carry no alarm-relevant information outside the module. Update documentation to reflect new behavior and add regexFilter example for filtering by multicastRole. --- docu/docs/modul/multicast.md | 47 +++++++++++----- module/multicast.py | 100 ++++++++++++++--------------------- 2 files changed, 74 insertions(+), 73 deletions(-) diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md index def5284..151c6e3 100644 --- a/docu/docs/modul/multicast.md +++ b/docu/docs/modul/multicast.md @@ -31,14 +31,14 @@ Das Modul unterstützt: - Mehrere Startmarker (Delimiter) - Mehrere Text-RICs -- Netzident-RIC zur Paketfilterung +- Netzident-RIC zur Paketmarkierung - Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear) - Active Trigger System zur verlustfreien Paketauslieferung - Wildcards für spätere Weiterverarbeitung - Frequenz-basierte Trennung - Multi-Instanz-Betrieb mit geteiltem Zustand -Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgegeben. +Hinweis: Der Delimiter-RIC (0123456) wird mit multicastRole: delimiter markiert und durchgereicht. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern. **Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben. @@ -53,9 +53,9 @@ Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgeg |Feld|Beschreibung|Default| |----|------------|-------| |autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10| -|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden automatisch gefiltert wenn konfiguriert)|leer| +|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden mit multicastRole: delimiter markiert)|leer| |textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer| -|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden automatisch gefiltert wenn konfiguriert)|leer| +|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden mit multicastRole: netident markiert)|leer| |triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer| |triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1| |triggerPort|Port für Loopback-Trigger|8080| @@ -110,7 +110,7 @@ Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket. textRics: '0299001,0310001' netIdentRics: '0000001' ``` -Filtert Netzident-Pakete (RIC 0000001) automatisch aus der Weiterverarbeitung. +Markiert Netzident-Pakete (RIC 0000001) mit multicastRole: netident. Downstream-Filter können sie gezielt ausfiltern (z.B. RegEx-Filter). ## Integration in Router-Konfiguration @@ -241,10 +241,10 @@ router: - `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte: - - `delimiter`: Startmarker-Paket (wird automatisch gefiltert wenn delimiterRics konfiguriert) + - `delimiter`: Startmarker-Paket - `recipient`: tatsächlicher Empfänger - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) - - `netident`: Netzwerk-Identifikations-Paket (wird automatisch gefiltert wenn netIdentRics konfiguriert) + - `netident`: Netzwerk-Identifikations-Paket - `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs - `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt @@ -262,7 +262,7 @@ router: - `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC ### Rückgabewerte: -- **False**: Paket wurde blockiert (z.B. Delimiter/Netident-Filterung), Router stoppt Verarbeitung +- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket - **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln - **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort @@ -378,11 +378,34 @@ Ausgegebene Multicast-Pakete: → RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!) ``` -### Automatische Filterung +### Paketmarkierung statt interner Filterung -- **Delimiter-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `delimiterRics` konfiguriert ist -- **Netzident-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `netIdentRics` konfiguriert ist -- **Filterung nach multicastRole**: Ermöglicht saubere nachgelagerte Verarbeitung ohne manuelle Filter +Das Modul filtert keine inhaltlich relevanten Pakete. +Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und +weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, +z.B. mit `filter.regexFilter`. + +Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden +intern im Buffer gesammelt und bei einem complete-Alarm in die +Listenfelder aggregiert. Sie erscheinen nie als eigenständige Pakete +im Router. + +Pakete und ihre Rollen: +- **Delimiter-Pakete**: Erhalten `multicastRole: delimiter` +- **Netzident-Pakete**: Erhalten `multicastRole: netident` +- **Empfänger-Pakete**: Erhalten `multicastRole: recipient` +- **Einzelalarme**: Erhalten `multicastRole: single` + +Beispiel-Filter um Delimiter und Netident auszublenden: +```yaml +- type: module + res: filter.regexFilter + config: + - name: "Nur echte Alarme" + checks: + - field: multicastRole + regex: ^(recipient|single)$ +``` ### Multi-Instanz-Betrieb diff --git a/module/multicast.py b/module/multicast.py index 55f8665..4cecd54 100644 --- a/module/multicast.py +++ b/module/multicast.py @@ -10,7 +10,7 @@ r"""! by Bastian Schroll @file: multicast.py -@date: 26.01.2025 +@date: 28.03.2026 @author: Claus Schichl @description: multicast module """ @@ -70,7 +70,8 @@ class BoswatchModule(ModuleBase): @param None @return None""" self._my_frequencies = set() - self.name = "Multicast" + self.instance_id = hex(id(self))[-4:] + self.name = f"MCAST_{self.instance_id}" self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10)) self._hard_timeout = self._auto_clear_timeout * 3 @@ -94,9 +95,6 @@ class BoswatchModule(ModuleBase): self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST) self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT)) - self._block_delimiter = bool(self._delimiter_rics) - self._block_netident = bool(self._netident_rics) - logging.info("[%s] Multicast module loaded", self.name) with BoswatchModule._lock: @@ -118,28 +116,44 @@ class BoswatchModule(ModuleBase): def doWork(self, bwPacket): r"""!Process an incoming packet and handle multicast logic. - @param bwPacket: A BOSWatch packet instance - @return bwPacket, a list of packets, or False if blocked""" + Enriches packets with multicast metadata (mode, role, source). + Does NOT filter - all packets pass through, downstream modules handle filtering. + + @param bwPacket: A BOSWatch packet instance or list of packets + @return bwPacket, a list of packets, or None if no processing""" + if isinstance(bwPacket, list): + result_packets = [] + for single_packet in bwPacket: + processed = self.doWork(single_packet) + if processed is not None and processed is not False: + if isinstance(processed, list): + result_packets.extend(processed) + else: + result_packets.append(processed) + return result_packets if result_packets else None + packet_dict = self._get_packet_data(bwPacket) msg = packet_dict.get("message") ric = packet_dict.get("ric") freq = packet_dict.get("frequency", "default") mode = packet_dict.get("mode") + # Handle wakeup triggers if msg == BoswatchModule._MAGIC_WAKEUP_MSG: if self._trigger_ric and ric != self._trigger_ric: - pass - else: - logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric) - queued = self._get_queued_packets() - return queued if queued else False + return None + logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric) + queued = self._get_queued_packets() + return queued if queued else None + # Only process POCSAG if mode != "pocsag": queued = self._get_queued_packets() return queued if queued else None self._my_frequencies.add(freq) + # Determine if this is a text-RIC is_text_ric = False if self._text_rics: is_text_ric = ric in self._text_rics and msg and msg.strip() @@ -155,21 +169,23 @@ class BoswatchModule(ModuleBase): queued_packets = self._get_queued_packets() incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq) + # === CONTROL PACKETS (netident, delimiter) === + # Mark and pass through - no filtering! + if self._netident_rics and ric in self._netident_rics: self._set_mcast_metadata(bwPacket, "control", "netident", ric) - result = self._combine_results(incomplete_packets, queued_packets, [bwPacket]) - return self._filter_output(result) + return self._combine_results(incomplete_packets, queued_packets, [bwPacket]) if self._delimiter_rics and ric in self._delimiter_rics: delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket) - result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets) - return self._filter_output(result) + return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets) + # === TONE-RICs (no message) === if not msg or not msg.strip(): self._add_tone_ric_packet(freq, packet_dict) - result = self._combine_results(incomplete_packets, queued_packets, False) - return self._filter_output(result) + return self._combine_results(incomplete_packets, queued_packets, False) + # === TEXT-RICs (with message) === if is_text_ric and msg: logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric) alarm_packets = self._distribute_complete(freq, packet_dict) @@ -180,18 +196,16 @@ class BoswatchModule(ModuleBase): if not alarm_packets: logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric) normal = self._enrich_normal_alarm(bwPacket, packet_dict) - result = self._combine_results(normal, incomplete_packets, queued_packets) + return self._combine_results(normal, incomplete_packets, queued_packets) else: - result = self._combine_results(alarm_packets, incomplete_packets, queued_packets) - return self._filter_output(result) + return self._combine_results(alarm_packets, incomplete_packets, queued_packets) + # === SINGLE ALARM (message but no text-RICs configured) === if msg: normal = self._enrich_normal_alarm(bwPacket, packet_dict) - result = self._combine_results(normal, incomplete_packets, queued_packets) - return self._filter_output(result) + return self._combine_results(normal, incomplete_packets, queued_packets) - result = self._combine_results(incomplete_packets, queued_packets) - return self._filter_output(result) + return self._combine_results(incomplete_packets, queued_packets) # ============================================================ # PACKET PROCESSING HELPERS (called by doWork) @@ -220,31 +234,6 @@ class BoswatchModule(ModuleBase): logging.warning("[%s] Error: %s", self.name, e) return {} - def _filter_output(self, result): - r"""!Apply multicastRole filtering before output. - - @param result: Single packet, list of packets, None or False - @return Final packet(s) or False if blocked""" - if result is None or result is False: - return result - - def get_role(packet): - """Helper to extract multicastRole from Packet object""" - packet_dict = self._get_packet_data(packet) - return packet_dict.get("multicastRole") - - if isinstance(result, list): - filtered = [p for p in result if self._should_output_packet(get_role(p))] - if not filtered: - logging.debug("All packets filtered out by multicastRole") - return False - return filtered if len(filtered) > 1 else filtered[0] - else: - if self._should_output_packet(get_role(result)): - return result - logging.debug("Packet filtered out: multicastRole=%s", get_role(result)) - return False - def _combine_results(self, *results): r"""!Combine multiple result sources into a single list or status. @@ -266,17 +255,6 @@ class BoswatchModule(ModuleBase): return combined return False if has_false else None - def _should_output_packet(self, multicast_role): - r"""!Check if packet should be output based on role. - - @param multicast_role: The role string to check - @return bool: True if allowed""" - if self._block_delimiter and multicast_role == "delimiter": - return False - if self._block_netident and multicast_role == "netident": - return False - return True - # ============================================================ # TONE-RIC BUFFER MANAGEMENT # ============================================================ From dada0d635b43373189942efbcdcafeded4cec0f1 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Sun, 12 Apr 2026 15:27:41 +0200 Subject: [PATCH 6/6] [feat/multicast]: refactor: isolate module state by moving class variables to instance scope This commit refactors the internal state management to ensure true multi-instance capability. While previous commits shared state via class variables, this change encapsulates all buffers and flags within the individual instance. Key changes: - Migration of state: Moved `_tone_ric_packets`, `_last_tone_ric_time`, and processing flags from class variables to instance variables (`self`). - Thread Isolation: Shifted the cleanup logic to a per-instance `_cleanup_worker` thread, ensuring that timeouts are managed independently for each route/configuration. - Wildcard Safety: Isolated `_wildcards_registered` to prevent registration conflicts between multiple multicast instances. - Robust Hard-Timeout: Simplified `_cleanup_hard_timeout` to act strictly on the instance's own state. This refactoring resolves the "architectural dinosaur" of shared class state, making the module fully thread-safe and reliable for complex multi-route and multi-frequency deployments. --- docu/docs/modul/multicast.md | 384 +++++++++++++++++++---------------- module/multicast.py | 210 ++++++++----------- 2 files changed, 296 insertions(+), 298 deletions(-) diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md index 151c6e3..6cd8463 100644 --- a/docu/docs/modul/multicast.md +++ b/docu/docs/modul/multicast.md @@ -2,45 +2,30 @@ --- ## Beschreibung -Mit diesem Modul können Multicast-Alarme verarbeitet werden. Dabei wird eine Alarmnachricht automatisch an mehrere Empfänger (RICs) verteilt. +Das Multicast-Modul verarbeitet komplexe Alarmsequenzen, bei denen eine Nachricht (Text-RIC) an eine Liste zuvor gesendeter Empfänger (Tone-RICs) verteilt wird. Es sorgt dafür, dass jeder Empfänger ein individuelles Paket mit dem Alarmtext erhält. -### Funktionsweise -Multicast-Alarme funktionieren in zwei bis drei Phasen: - -1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks und löscht vorherige wartende Tone-RICs. Der Delimiter selbst wird nicht als Alarm ausgegeben (automatische Filterung). Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt. - -2. **Tone-RIC-Phase**: Mehrere RICs empfangen (meist) leere Nachrichten. Diese definieren die Empfänger und "registrieren" sich im Modul als Empfänger für die nächste Multicast-Nachricht. - -3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Diese wird dann automatisch an alle zuvor gesammelten Tone-RICs verteilt. - -**Beispiel:** -``` -10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC -10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1 -10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2 -10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC - -Generierte Alarme: -→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" -→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" -``` - -**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert. +Das Modul filtert keine inhaltlich relevanten Pakete. Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, z.B. mit `filter.regexFilter`. Das Modul unterstützt: -- Mehrere Startmarker (Delimiter) -- Mehrere Text-RICs -- Netzident-RIC zur Paketmarkierung -- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear) -- Active Trigger System zur verlustfreien Paketauslieferung -- Wildcards für spätere Weiterverarbeitung -- Frequenz-basierte Trennung -- Multi-Instanz-Betrieb mit geteiltem Zustand +- **Multi-Instance Support:** Vollständige Isolation bei parallelem Betrieb in verschiedenen Routen. +- **Frequenz-Trennung:** Verhindert die Vermischung von Alarmen auf unterschiedlichen Kanälen. +- **Active Trigger System:** Nutzt TCP-Loopback, um auch bei Inaktivität des Funkkanals Timeouts sicher zu verarbeiten. +- **Dynamische Listen:** Generiert aggregierte Listenfelder (z. B. {RIC_LIST}) für Sammel-Alarmierungen. +- **Metadaten-Enrichment:** Markiert Pakete präzise für nachgelagerte Filter (z. B. RegEx). -Hinweis: Der Delimiter-RIC (0123456) wird mit multicastRole: delimiter markiert und durchgereicht. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern. +### Funktionsweise +Multicast-Alarme funktionieren in zwei bis vier Phasen: +**Wichtig:** Das Modul arbeitet verzögert bei der Ausgabe der Text-RICs, um die Pakete anzureichern. -**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben. +1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks. Er wird als technisches Paket (`multicastRole: delimiter`) **sofort** durchgereicht, leert aber intern den RAM-Puffer für neue Empfänger. Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern. + +2. **Tone-RIC-Phase**: Eingehende leere Nachrichten werden **nicht direkt als Pakete ausgegeben**, sondern im RAM zwischengespeichert. Das Modul gibt hier `False` zurück, wodurch der Router die Verarbeitung für dieses spezifische Paket vorerst pausiert. + +3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Sobald eine Text-RIC empfangen wird, "kopiert" das Modul diesen Text in jedes einzelne der gespeicherten Tone-RIC-Pakete. Diese werden dann als **Liste von Paketen** gesammelt an den Router als `multicastMode: complete` übergeben. Falls keine Tone-RICs im Puffer liegen (z.B. Einzelalarm), wird die Text-RIC als `multicastMode: single` ausgegeben. +**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Ausnahme: Einzelalarm (Single) + +4. **Timeout-Phase (Auto-Clear - Optional):** Läuft der `autoClearTimeout` ab, ohne dass ein Text-RIC eintrifft, werden die gepufferten RICs als `multicastMode: incomplete` (ohne Text) ausgegeben. ## Unterstützte Alarmtypen - POCSAG @@ -52,15 +37,15 @@ Hinweis: Der Delimiter-RIC (0123456) wird mit multicastRole: delimiter markiert |Feld|Beschreibung|Default| |----|------------|-------| -|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10| -|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden mit multicastRole: delimiter markiert)|leer| +|autoClearTimeout|Zeit in Sekunden, nach der Tone-RICs ohne Text-Eingang als `incomplete` ausgegeben werden|10| +|delimiterRics|Komma-getrennte Liste von Startmarkern (leert Puffer, `multicastRole: delimiter`)|leer| |textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer| -|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden mit multicastRole: netident markiert)|leer| +|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (`multicastRole: netident`)|leer| |triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer| |triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1| -|triggerPort|Port für Loopback-Trigger|8080| +|triggerPort|Port für Loopback-Trigger (entspricht meist Server-Port)|8080| -**Achtung:** Zahlen welche führende Nullen enthalten müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'` +**Hinweis:** Zahlen mit führenden Nullen müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'`. ### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration) ```yaml @@ -112,9 +97,156 @@ Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket. ``` Markiert Netzident-Pakete (RIC 0000001) mit multicastRole: netident. Downstream-Filter können sie gezielt ausfiltern (z.B. RegEx-Filter). +--- +## Modul Abhängigkeiten +- keine + +--- +## Externe Abhängigkeiten +- keine + +--- +## Paket Modifikationen + +### Hinzugefügte Felder +- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte: + - `complete`: Vollständiges Multicast-Packet + - `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC) (Timeout) + - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) + - `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs (Technik) +- `multicastRole`: + - `delimiter` + - `netident` + - `recipient` (Empfänger) + - `single` +- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik: + - Bei **recipient**: Zählt hoch (z.B. 1 von 5, 2 von 5...) + - Bei **delimiter / netident / single**: Immer **1**, da sie als eigenständige technische Pakete zählen +- `multicastRecipientCount` (string): Gesamtanzahl der Empfänger des Multicasts +- `_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. `ric_list`, `subric_list`) + +### Ergänzte Felder (von Text-RIC an Tone-RIC): +- `message`: Der Text wird aus der Text-RIC übernommen und in die Empfänger-Pakete eingefügt (Bei incomplete-Modus leer) +- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs + +### Erhaltene Felder (Tone-RIC): +Diese Felder bleiben **unverändert** bestehen, damit die Zuordnung zum Endgerät korrekt bleibt: +- `ric` +- `subric` +- alle bereits zuvor hinzugefügten Felder (z.B. Descriptor-Modul) + +### Rückgabewerte: +- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket (Verhindert die Ausgabe leerer Nachrichten). Allerdings: Das Paket wird im RAM geparkt. +- **Liste von Paketen**: Tritt ein, sobald eine Text-RIC die Verteilung auslöst oder ein Timeout abläuft. Der Router verarbeitet jedes Element der Liste (die nun angereicherten Tone-RICs) als eigenständigen Alarm. +- **None**: Der Router verarbeitet das Original-Paket normal weiter. + +--- + +## Zusätzliche Wildcards + +Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung: + +|Wildcard|Beschreibung|Beispiel| +|--------|------------|--------| +|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001| +|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3| +|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...| +|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control| +|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident| + +### Erweiterung der Listen-Wildcards +Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen. Im Folgenden ein paar Beispiele: + +|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel| +|--------|------------|--------|--------| +|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"| +|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"| +|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger (BEISPIEL! **NUR** bei vorher durchlaufenen Descriptor-Modul)|description_list|"FF Musterstadt, BF Beispiel"| +|{_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|_list|{FREQUENCY_LIST}, {BITRATE_LIST}| + +**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~). + +### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin: +```yaml +- type: plugin + res: telegram + config: + message_pocsag: | + {CNAME} + {MSG} + RIC: {RIC} / SubRIC: {SRIC} + Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE}) + {TIME} +``` + +--- +# Funktionsweise im Detail + +## Grundsätzliche Funktion + +**Beispiel:** +``` +10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC +10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1 +10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2 +10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC + +Generierte Alarme: +→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!) +→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!) +``` + +**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert. + +### Logik der hinzugefügten Felder + +Um die Logik der Felder multicastMode, multicastRole, etc. zu verstehen, hilft eine tabellarische Gegenüberstellung: + +**1) Szenario Der "Feldstärke-Alarm" (Netident/Delimiter)** + +|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count| +|--------|-------|-------|---------|------|--|--| +|Delimiter|0288088|control|delimiter|0288088|1|1| +|Netzident|0000001|control|netident|0000001|1|1| + +Hinweis: Diese Pakete dienen der Systemsteuerung. Der Index ist immer 1, da sie "Einzelereignisse" im technischen Ablauf sind. +Beide RIC werden unmittelbar nach der Verarbeitung weitergereicht, d.h. es wird nicht auf die Netzident-RIC gewartet, um die Delmitier-RIC weiterzureichen. + +**2) Szenario Echter Multicast-Alarm (Vollständig)** +Hier sieht man den Ablauf: +- Der Delimiter leert den Speicher und wird mit den ergänzenden Feldern angereichert und sofort weitergegeben. +- Zwei Tone-RICs sammeln sich an +- Die Text-RIC löst die Verteilung aus + +**Beachte:** Die Text-RIC (0456789) dient als Nachrichtenträger und erscheint nicht als eigenes Paket im Output. + +|Phase|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count| +|-----|---------|---|-------------|-------------|---------|-----|-----| +|Start|Delimiter|0288088|control|delimiter|0288088|1|1| +|Sammler|Tone-RIC 1|0234567|-|(interner Buffer)|-|-|-| +|Sammler|Tone-RIC 2|0345678|-|(interner Buffer)|-|-|-| +|Auslöser|Text-RIC|0456789|(wird verteilt - keine Ausgabe)|(Nachrichtenträger)|-|-|-| +|Output 1|Alarm-Paket|0234567|complete|recipient|0456789|1|2| +|Output 2|Alarm-Paket|0345678|complete|recipient|0456789|2|2| + +**3) Szenario Unvollständiger Alarm (Incomplete / Timeout)** +In diesem Fall fehlt die Text-RIC. Das System wartet bis zum Timeout und schickt dann die Empfänger mit leerer Nachricht raus (getriggert durch das Active Trigger System). + +|Phase|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count| +|-----|---------|---|-------------|-------------|---------|-----|-----| +|Start|Delimiter|0288088|control|delimiter|0288088|1|1| +|Sammler|Tone-RIC 1|0234567|-|(interner Buffer)|-|-|-| +|Sammler|Tone-RIC 2|0345678|-|(interner Buffer)|-|-|-| +|Event|Timeout|Kein Text|Auto-Clear nach standardmäßig 10s|-|-|-|-| +|Output 1|Incomplete|0234567|incomplete|recipient|0234567|1|2| +|Output 2|Incomplete|0345678|incomplete|recipient|0234567|2|2| + +Der Delimiter wird mit den ergänzenden Feldern angereichert und sofort weitergegeben. + +--- ## Integration in Router-Konfiguration -Das Multicast-Modul sollte **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden: +Das Multicast-Modul muss **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden: ```yaml - name: Router POCSAG @@ -153,7 +285,7 @@ Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfel Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter. -### Beispiel (Zusätzliche Wildcards werden noch später in diesem Readme erklärt): +### Beispiel (siehe auch "Zusätzliche Wildcards"): ```yaml router: - name: Router POCSAG @@ -162,10 +294,6 @@ router: res: filter.modeFilter config: [...] - - type: module - res: filter.doubleFilter - config: - [...] - type: module res: descriptor config: @@ -177,6 +305,10 @@ router: autoClearTimeout: 10 delimiterRics: '0123456' # Start eines Multicast-Alarms textRics: '9909909' # Text-RIC + - type: module + res: filter.doubleFilter + config: + [...] - type: router res: RouterMySQL - type: router @@ -221,101 +353,15 @@ router: ``` --- -## Modul Abhängigkeiten -- keine ---- -## Externe Abhängigkeiten -- keine +## Das Active Trigger System (Verlustfreie Paketauslieferung) +BOSWatch arbeitet **synchron**. Das bedeutet: Der Router "schläft", wenn kein Funk-Paket von außen eingeht. Ein Timeout im Hintergrund-Thread des Moduls kann den schlafenden Router nicht von alleine aufwecken, um die im RAM wartenden Pakete (`incomplete`) herauszuschieben. ---- -## Paket Modifikationen +**Lösung:** +Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen** -### Hinzugefügte Felder bei Multicast-Alarmen: -- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte: - - - `complete`: Vollständiges Multicast-Packet - - `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC) - - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) - - `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs - -- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte: - - - `delimiter`: Startmarker-Paket - - `recipient`: tatsächlicher Empfänger - - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC) - - `netident`: Netzwerk-Identifikations-Paket - -- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs -- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt -- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik: - - - Empfänger haben den Index 1 bis n. - - Delimiter/Singles haben Index 1 (da sie alleinstehen). - -- `_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. ric_list, message_list) - -### Veränderte Felder bei Multicast-Alarmen: -- `ric`: Wird durch Empfänger-RIC ersetzt -- `subric`: Wird durch Empfänger-SubRIC ersetzt -- `subricText`: Wird durch Empfänger-SubRIC-Text ersetzt -- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC - -### Rückgabewerte: -- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket -- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln -- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort - ---- -## Zusätzliche Wildcards - -Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung: - -|Wildcard|Beschreibung|Beispiel| -|--------|------------|--------| -|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001| -|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3| -|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...| -|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control| -|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident| - -## Erweiterung der Listen-Wildcards -Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen: - -|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel| -|--------|------------|--------|--------| -|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"| -|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"| -|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger.|description_list|"FF Musterstadt, BF Beispiel"| -|{_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|_list|{FREQUENCY_LIST}, {BITRATE_LIST}| - -**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~). - -### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin: -```yaml -- type: plugin - res: telegram - config: - message_pocsag: | - {CNAME} - {MSG} - RIC: {RIC} / SubRIC: {SRIC} - Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE}) - {TIME} -``` - ---- -## Funktionsweise im Detail - -### Active Trigger System (Verlustfreie Paketauslieferung) - -Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**: - -1. **Deferred Delivery**: Bei einem Auto-Clear-Timeout werden die incomplete-Pakete nicht sofort ausgegeben, sondern in einer internen Queue gespeichert. - -2. **Wakeup-Trigger**: Das Modul sendet ein spezielles Trigger-Paket via Loopback-Socket (Standard: 127.0.0.1:8080) zurück an den BOSWatch-Server. - -3. **Queue-Flush**: Beim Empfang des Trigger-Pakets werden alle gespeicherten Pakete aus der Queue ausgegeben. +**Ausführung** +Das Modul sendet über via TCP (Loopback) ein minimales Trigger-Paket an den eigenen BOSWatch-Server. Dieser empfängt es wie einen normalen Funk-Alarm, weckt den Router auf und das Modul kann die wartenden Alarme (`incomplete`) sicher ausliefern. **Trigger-RIC Auswahl** (3-stufige Fallback-Kette): - **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet @@ -341,8 +387,8 @@ Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine ### Zeitbasierte Verarbeitung 1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert -2. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System) -3. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs +2. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs +3. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System) 4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe) ### Frequenz-Trennung @@ -353,44 +399,18 @@ Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung v ``` Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678] Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890] -→ Werden getrennt verarbeitet, keine Vermischung möglich +→ Werden getrennt verarbeitet, keine Vermischung möglich (wichtig für Multi-Client mit Single-Server) ``` -### SubRIC-Erhaltung - -**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC aus der Tone-RIC-Phase. Dies ist entscheidend, da SubRICs unterschiedliche Bedeutungen haben können, z.B.: - -- SubRIC 1 (a): Alarmierung -- SubRIC 2 (b): Information -- SubRIC 3 (c): Probealarm -- SubRIC 4 (d): Sirenenalarm - -**Beispiel:** -``` -Eingehende Tone-RICs: -- RIC: 0234567 SubRIC: 4 (Sirenenalarm) -- RIC: 0345678 SubRIC: 3 (Probealarm) - -Text-RIC: RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" - -Ausgegebene Multicast-Pakete: -→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!) -→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!) -``` - -### Paketmarkierung statt interner Filterung +## Paketmarkierung statt interner Filterung Das Modul filtert keine inhaltlich relevanten Pakete. -Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und -weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, -z.B. mit `filter.regexFilter`. +Alle Pakete werden mit `multicastRole` markiert und weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, z.B. mit `filter.regexFilter`. -Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden -intern im Buffer gesammelt und bei einem complete-Alarm in die -Listenfelder aggregiert. Sie erscheinen nie als eigenständige Pakete -im Router. +Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden zuerst intern im Buffer gesammelt und bei einem complete-Alarm (und incomplete) in die Listenfelder aggregiert. Die Listenfelder werden an alle **Tone-RICs** angehängt und anschließend jede **Tone-RIC** angereichert ausgegeben. Pakete und ihre Rollen: + - **Delimiter-Pakete**: Erhalten `multicastRole: delimiter` - **Netzident-Pakete**: Erhalten `multicastRole: netident` - **Empfänger-Pakete**: Erhalten `multicastRole: recipient` @@ -404,14 +424,26 @@ Beispiel-Filter um Delimiter und Netident auszublenden: - name: "Nur echte Alarme" checks: - field: multicastRole - regex: ^(recipient|single)$ + regex: ^(recipient|single)$multicastIndex ``` + +--- + +## Zusammenfassung: Was passiert mit den Daten? + +- Der Delimiter: Wird sofort markiert und als technisches Paket weitergereicht. Er sorgt dafür, dass keine "Leichen" von alten, abgebrochenen Alarmen im Speicher liegen. +- Die Tone-RICs: Diese werden vom Modul "geschluckt" (return False). Sie verschwinden kurzzeitig aus dem Datenfluss und warten im RAM. +- Die Text-RIC: Wenn sie eintrifft, nimmt das Modul ihren Text (B3 WOHNHAUS) und kopiert ihn in die Tone-RIC-Pakete im RAM. Anschließend erfolgt die Ausgabe der Tone-RICs. +- Die Metadaten: Erst beim Erzeugen der Output-Pakete werden Felder wie multicastRecipientIndex berechnet, damit nachfolgende Plugins wissen, dass diese Pakete zusammengehören. + +Technischer Hinweis: +Da die Text-RIC im complete-Fall "verbraucht" wird, um die Tone-RICs zu füllen, wird sie nicht als separates zusätzliches Paket ausgegeben. Das verhindert Dopplungen in der Datenbank. Nur wenn gar keine Tone-RICs da sind, wird die Text-RIC als `single` ausgegeben. +In diesem Fall (Single) ist es völlig egal, ob die Text-RIC mit Delimiter oder ohne empfangen wird - die Delimiter-RIC wird als Delimiter gekennzeichnet, das Text-RIC als Single (in `multicastMode` sowie in `multicastRole`, `multicastRecipientIndex: 1`, `multicastRecipientCount: 1`). + ### Multi-Instanz-Betrieb +Das Modul unterstützt unbegrenzte parallele Instanzen durch vollständige Isolation: -Das Modul unterstützt mehrere parallele Instanzen mit geteiltem Zustand: - -- **Shared State**: Alle Instanzen teilen sich den Tone-RIC-Speicher (frequenz-basiert) -- **Instance-Specific Timeouts**: Jede Instanz kann eigene `autoClearTimeout`-Werte haben -- **Global Cleanup Thread**: Ein globaler Thread prüft alle Instanzen auf Timeouts -- **Thread-Safe**: Alle Operationen sind thread-sicher mit Locks geschützt \ No newline at end of file +- **Encapsulated State:** Jede Instanz verwaltet ihren eigenen Tone-RIC-Speicher. Es gibt keine Vermischung zwischen verschiedenen Routen. +- **Isolated Cleanup:** Jede Instanz startet einen eigenen, internen Cleanup-Thread für präzises Timeout-Management. +- **Instance IDs:** Zur besseren Nachverfolgung im Log erhält jede Instanz eine eindeutige ID (z.B. MCAST_a1b2). diff --git a/module/multicast.py b/module/multicast.py index 4cecd54..e641bdf 100644 --- a/module/multicast.py +++ b/module/multicast.py @@ -10,7 +10,7 @@ r"""! by Bastian Schroll @file: multicast.py -@date: 28.03.2026 +@date: 13.04.2026 @author: Claus Schichl @description: multicast module """ @@ -36,21 +36,6 @@ class BoswatchModule(ModuleBase): ensuring reliable alarm delivery even in complex multi-frequency scenarios. """ - # CLASS VARIABLES - SHARED STATE - _tone_ric_packets = defaultdict(list) - _last_tone_ric_time = defaultdict(float) - _processing_text_ric = defaultdict(bool) - _processing_text_ric_started = defaultdict(float) - - # SYSTEM VARIABLES - _lock = threading.Lock() - _cleanup_thread = None - _running = False - _wildcards_registered = set() - _packet_queue = [] - _queue_lock = threading.Lock() - _instances = [] - # Trigger defaults _TRIGGER_HOST = "127.0.0.1" _TRIGGER_PORT = 8080 @@ -95,20 +80,26 @@ class BoswatchModule(ModuleBase): self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST) self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT)) + # --- Per-instance state (replaces all former class-variables) --- + # Key: frequency string (e.g. "85.125M") + self._tone_ric_packets = defaultdict(list) # buffered tone-RICs per frequency + self._last_tone_ric_time = defaultdict(float) # last arrival time per frequency + self._processing_text_ric = defaultdict(bool) # text-RIC currently being processed? + self._processing_text_ric_started = defaultdict(float) # when did processing start? + self._wildcards_registered = set() # avoid double-registering wildcards + self._packet_queue = [] # deferred packets waiting for trigger + + # --- Locks (only needed within this instance, no cross-instance sharing) --- + self._lock = threading.Lock() + self._queue_lock = threading.Lock() + + # --- Per-instance cleanup thread --- + self._running = True + self._cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True) + self._cleanup_thread.start() + logging.info("[%s] Multicast module loaded", self.name) - with BoswatchModule._lock: - if self not in BoswatchModule._instances: - BoswatchModule._instances.append(self) - - if not BoswatchModule._running: - BoswatchModule._running = True - BoswatchModule._cleanup_thread = threading.Thread( - target=BoswatchModule._global_cleanup_worker, daemon=True - ) - BoswatchModule._cleanup_thread.start() - logging.info("Global multicast cleanup thread started") - # ============================================================ # MAIN PROCESSING # ============================================================ @@ -158,13 +149,13 @@ class BoswatchModule(ModuleBase): if self._text_rics: is_text_ric = ric in self._text_rics and msg and msg.strip() else: - with BoswatchModule._lock: - is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0 + with self._lock: + is_text_ric = msg and msg.strip() and len(self._tone_ric_packets[freq]) > 0 if is_text_ric: - with BoswatchModule._lock: - BoswatchModule._processing_text_ric[freq] = True - BoswatchModule._processing_text_ric_started[freq] = time.time() + with self._lock: + self._processing_text_ric[freq] = True + self._processing_text_ric_started[freq] = time.time() queued_packets = self._get_queued_packets() incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq) @@ -189,9 +180,9 @@ class BoswatchModule(ModuleBase): if is_text_ric and msg: logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric) alarm_packets = self._distribute_complete(freq, packet_dict) - with BoswatchModule._lock: - BoswatchModule._processing_text_ric[freq] = False - BoswatchModule._processing_text_ric_started.pop(freq, None) + with self._lock: + self._processing_text_ric[freq] = False + self._processing_text_ric_started.pop(freq, None) if not alarm_packets: logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric) @@ -265,22 +256,22 @@ class BoswatchModule(ModuleBase): @param freq: Frequency identifier @param packet_dict: Dictionary containing packet data @return None""" - with BoswatchModule._lock: + with self._lock: stored_packet = packet_dict.copy() stored_packet['_multicast_timestamp'] = time.time() - BoswatchModule._tone_ric_packets[freq].append(stored_packet) - BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp'] - logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq) + self._tone_ric_packets[freq].append(stored_packet) + self._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp'] + logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(self._tone_ric_packets[freq]), freq) def _get_queued_packets(self): r"""!Pop and return all packets currently in the static queue. @param None @return list: List of packets or None""" - with BoswatchModule._queue_lock: - if BoswatchModule._packet_queue: - packets = BoswatchModule._packet_queue[:] - BoswatchModule._packet_queue.clear() + with self._queue_lock: + if self._packet_queue: + packets = self._packet_queue[:] + self._packet_queue.clear() return packets return None @@ -314,11 +305,11 @@ class BoswatchModule(ModuleBase): @param freq: Frequency identifier @param text_packet_dict: Data of the message-carrying packet @return list: List of fully populated Packet instances""" - with BoswatchModule._lock: - recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() - logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts)) - BoswatchModule._tone_ric_packets[freq].clear() - BoswatchModule._last_tone_ric_time.pop(freq, None) + with self._lock: + recipient_dicts = self._tone_ric_packets[freq].copy() + logging.debug("Text RIC found. Matching against %d stored RICs", len(recipient_dicts)) + self._tone_ric_packets[freq].clear() + self._last_tone_ric_time.pop(freq, None) if not recipient_dicts: return [] @@ -365,7 +356,7 @@ class BoswatchModule(ModuleBase): self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1) self._apply_list_tags(bwPacket, [packet_dict]) self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1") - logging.debug("Erstelle Single-Alarm für RIC %s", packet_dict.get('ric')) + logging.debug("Creating single-alarm for RIC %s", packet_dict.get('ric')) return [bwPacket] def _handle_delimiter(self, freq, ric, bwPacket=None): @@ -375,11 +366,11 @@ class BoswatchModule(ModuleBase): @param ric: Delimiter RIC @param bwPacket: Optional delimiter packet instance @return list: Incomplete packets or delimiter control packet""" - with BoswatchModule._lock: - orphaned = BoswatchModule._tone_ric_packets[freq].copy() - BoswatchModule._tone_ric_packets[freq].clear() - BoswatchModule._last_tone_ric_time.pop(freq, None) - BoswatchModule._processing_text_ric[freq] = False + with self._lock: + orphaned = self._tone_ric_packets[freq].copy() + self._tone_ric_packets[freq].clear() + self._last_tone_ric_time.pop(freq, None) + self._processing_text_ric[freq] = False if orphaned: age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time()) @@ -405,7 +396,7 @@ class BoswatchModule(ModuleBase): @param count: Total number of recipients @param index: Current recipient index @return None""" - logging.debug("Setze Metadata - Mode: %s, Role: %s für RIC: %s", mode, role, source) + logging.debug("setting Metadata - Mode: %s, Role: %s, Index: %s of %s for RIC: %s", mode, role, index, count, source) mapping = { "multicastMode": (mode, "{MCAST_MODE}"), "multicastRole": (role, "{MCAST_ROLE}"), @@ -439,42 +430,25 @@ class BoswatchModule(ModuleBase): @param wildcard: The wildcard string (e.g. {MCAST_MODE}) @param field: The packet field name @return None""" - if wildcard not in BoswatchModule._wildcards_registered: + if wildcard not in self._wildcards_registered: self.registerWildcard(wildcard, field) - BoswatchModule._wildcards_registered.add(wildcard) + self._wildcards_registered.add(wildcard) # ============================================================ # CLEANUP & TIMEOUT MANAGEMENT # ============================================================ - @staticmethod - def _global_cleanup_worker(): - r"""!Static background thread that ticks all active module instances. - - @param None - @return None""" - logging.info("Global multicast cleanup ticker active") - while BoswatchModule._running: + def _cleanup_worker(self): + r"""!Per-instance background thread for timeout management.""" + logging.info("[%s] Cleanup thread started", self.name) + while self._running: time.sleep(1) - with BoswatchModule._lock: - active_instances = BoswatchModule._instances[:] - for instance in active_instances: - try: - instance._perform_instance_tick() - except Exception as e: - logging.error("Error in instance cleanup: %s", e) + try: + self._check_all_my_frequencies() + except Exception as e: + logging.error("[%s] Error in cleanup thread: %s", self.name, e) if int(time.time()) % 60 == 0: - BoswatchModule._cleanup_hard_timeout_global() - - def _perform_instance_tick(self): - r"""!Tick-entry point for this specific instance. - - Acts as an extension hook for future per-instance tick logic - (e.g. statistics, heartbeat, watchdog). Do not call directly. - - @param None - @return None""" - self._check_all_my_frequencies() + self._cleanup_hard_timeout() def _check_all_my_frequencies(self): r"""!Monitor timeouts for all frequencies assigned to this instance. @@ -484,27 +458,27 @@ class BoswatchModule(ModuleBase): incomplete_packets = [] trigger_data = [] - with BoswatchModule._lock: + with self._lock: current_time = time.time() for freq in list(self._my_frequencies): - if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]: continue - if BoswatchModule._processing_text_ric.get(freq, False): - flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time) + if self._processing_text_ric.get(freq, False): + flag_age = current_time - self._processing_text_ric_started.get(freq, current_time) if flag_age > 2: - BoswatchModule._processing_text_ric[freq] = False - BoswatchModule._processing_text_ric_started.pop(freq, None) + self._processing_text_ric[freq] = False + self._processing_text_ric_started.pop(freq, None) else: continue - last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + last_time = self._last_tone_ric_time.get(freq, 0) if current_time - last_time > self._auto_clear_timeout: - recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() + recipient_dicts = self._tone_ric_packets[freq].copy() safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC) trigger_data.append((freq, safe_ric)) - BoswatchModule._tone_ric_packets[freq].clear() - BoswatchModule._last_tone_ric_time.pop(freq, None) + self._tone_ric_packets[freq].clear() + self._last_tone_ric_time.pop(freq, None) logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout) packets = self._create_incomplete_multicast(freq, recipient_dicts) @@ -512,8 +486,8 @@ class BoswatchModule(ModuleBase): incomplete_packets.extend(packets) if incomplete_packets: - with BoswatchModule._queue_lock: - BoswatchModule._packet_queue.extend(incomplete_packets) + with self._queue_lock: + self._packet_queue.extend(incomplete_packets) for freq, safe_ric in trigger_data: self._send_wakeup_trigger(freq, safe_ric) @@ -522,36 +496,30 @@ class BoswatchModule(ModuleBase): @param freq: Frequency identifier @return list: Incomplete packets if timeout exceeded, else None""" - with BoswatchModule._lock: - if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]: + with self._lock: + if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]: return None - last_time = BoswatchModule._last_tone_ric_time.get(freq, 0) + last_time = self._last_tone_ric_time.get(freq, 0) if time.time() - last_time > self._auto_clear_timeout: - recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy() - BoswatchModule._tone_ric_packets[freq].clear() - BoswatchModule._last_tone_ric_time.pop(freq, None) + recipient_dicts = self._tone_ric_packets[freq].copy() + self._tone_ric_packets[freq].clear() + self._last_tone_ric_time.pop(freq, None) logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts)) return self._create_incomplete_multicast(freq, recipient_dicts) return None - @staticmethod - def _cleanup_hard_timeout_global(): - r"""!Global failsafe for really old packets (ignores instance config). - - @param None - @return None""" - with BoswatchModule._lock: + def _cleanup_hard_timeout(self): + r"""!Failsafe for really old packets.""" + with self._lock: current_time = time.time() - max_hard_timeout = 120 - if BoswatchModule._instances: - max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances) - for freq in list(BoswatchModule._tone_ric_packets.keys()): - BoswatchModule._tone_ric_packets[freq] = [ - p for p in BoswatchModule._tone_ric_packets[freq] - if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout + for freq in list(self._tone_ric_packets.keys()): + self._tone_ric_packets[freq] = [ + p for p in self._tone_ric_packets[freq] + if current_time - p.get('_multicast_timestamp', 0) < self._hard_timeout ] - if not BoswatchModule._tone_ric_packets[freq]: - del BoswatchModule._tone_ric_packets[freq] + # cleaning empty frequencies + if not self._tone_ric_packets[freq]: + del self._tone_ric_packets[freq] # ============================================================ # TRIGGER SYSTEM @@ -568,7 +536,7 @@ class BoswatchModule(ModuleBase): "ric": trigger_ric, "subric": "1", "subricText": "a", - "message": BoswatchModule._MAGIC_WAKEUP_MSG, + "message": self._MAGIC_WAKEUP_MSG, "clientName": "MulticastTrigger", "inputSource": "loopback", "frequency": freq @@ -601,7 +569,5 @@ class BoswatchModule(ModuleBase): @param None @return None""" - with BoswatchModule._lock: - if self in BoswatchModule._instances: - BoswatchModule._instances.remove(self) + self._running = False logging.debug("[%s] Multicast instance unloaded", self.name)