From 0b9387af08298d4b38855f8d635ac48bfac4c1e5 Mon Sep 17 00:00:00 2001
From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com>
Date: Tue, 25 Nov 2025 16:25:28 +0100
Subject: [PATCH 1/5] [feat/multicast]: add multi-instance multicast module
with active trigger system
Introduce a robust multicast processing module for POCSAG that correlates
empty tone-RICs (recipients) with subsequent text-RICs (content).
Key Features:
- Four Output Modes: Internally supports 'complete', 'incomplete', 'single',
and 'control'. Functional alarms are delivered as the first three, while
technical 'control' packets (Delimiters/NetIdent) are filtered by default.
- Active Trigger System: Implements a loss-free deferred delivery mechanism
using a loopback socket (TCP) to re-inject wakeup packets, flushing the
internal queue during auto-clear timeouts.
- Shared State & Multi-Instance: State is shared across instances but
separated by frequency to prevent crosstalk in multi-frequency setups.
- Data Aggregation: Automatically generates '{FIELD}_list' wildcards (e.g.,
RIC_LIST, DESCRIPTION_LIST) for all collected recipients, enabling
consolidated notifications in downstream plugins.
- Dynamic Filtering: Automatically blocks Delimiter and NetIdent RICs from
reaching subsequent plugins if they are defined in the configuration.
Infrastructural Changes:
- ModuleBase: Expanded return semantics to support:
* False: Explicitly blocks/drops a packet.
* List: Allows a module to expand one input into multiple output packets.
- PluginBase: Updated to handle lists of packets, ensuring a full
setup->alarm->teardown lifecycle for every individual element.
---
docu/docs/modul/multicast.md | 394 +++++++++++++++++++++
docu/mkdocs.yml | 3 +-
module/moduleBase.py | 26 ++
module/multicast.py | 638 +++++++++++++++++++++++++++++++++++
plugin/pluginBase.py | 9 +
5 files changed, 1069 insertions(+), 1 deletion(-)
create mode 100644 docu/docs/modul/multicast.md
create mode 100644 module/multicast.py
diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md
new file mode 100644
index 0000000..def5284
--- /dev/null
+++ b/docu/docs/modul/multicast.md
@@ -0,0 +1,394 @@
+#
Multicast
+---
+
+## Beschreibung
+Mit diesem Modul können Multicast-Alarme verarbeitet werden. Dabei wird eine Alarmnachricht automatisch an mehrere Empfänger (RICs) verteilt.
+
+### Funktionsweise
+Multicast-Alarme funktionieren in zwei bis drei Phasen:
+
+1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks und löscht vorherige wartende Tone-RICs. Der Delimiter selbst wird nicht als Alarm ausgegeben (automatische Filterung). Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt.
+
+2. **Tone-RIC-Phase**: Mehrere RICs empfangen (meist) leere Nachrichten. Diese definieren die Empfänger und "registrieren" sich im Modul als Empfänger für die nächste Multicast-Nachricht.
+
+3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Diese wird dann automatisch an alle zuvor gesammelten Tone-RICs verteilt.
+
+**Beispiel:**
+```
+10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC
+10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1
+10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2
+10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC
+
+Generierte Alarme:
+→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS"
+→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS"
+```
+
+**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert.
+
+Das Modul unterstützt:
+
+- Mehrere Startmarker (Delimiter)
+- Mehrere Text-RICs
+- Netzident-RIC zur Paketfilterung
+- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear)
+- Active Trigger System zur verlustfreien Paketauslieferung
+- Wildcards für spätere Weiterverarbeitung
+- Frequenz-basierte Trennung
+- Multi-Instanz-Betrieb mit geteiltem Zustand
+
+Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgegeben.
+
+**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben.
+
+## Unterstützte Alarmtypen
+- POCSAG
+
+## Resource
+`multicast`
+
+## Konfiguration
+
+|Feld|Beschreibung|Default|
+|----|------------|-------|
+|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10|
+|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden automatisch gefiltert wenn konfiguriert)|leer|
+|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer|
+|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden automatisch gefiltert wenn konfiguriert)|leer|
+|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer|
+|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1|
+|triggerPort|Port für Loopback-Trigger|8080|
+
+**Achtung:** Zahlen welche führende Nullen enthalten müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'`
+
+### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration)
+```yaml
+- type: module
+ res: multicast
+ name: Multicast Handler
+ config:
+ textRics: '0299001,0310001'
+```
+In diesem Modus werden **alle leeren Nachrichten** als toneRics behandelt (keine `delimiterRics` angegeben).
+
+### Konfigurationsbeispiel 2: Mit Delimiter-Trenner (empfohlen)
+```yaml
+- type: module
+ res: multicast
+ name: Multicast Handler
+ config:
+ autoClearTimeout: 10
+ delimiterRics: '0988988'
+ textRics: '0299001,0310001'
+```
+In diesem Modus wird **0988988 als Trenner (= Delimiter)** behandelt und **alle anderen leeren Nachrichten als Empfänger**.
+
+### Konfigurationsbeispiel 3: Mit expliziter Trigger-RIC
+```yaml
+- type: module
+ res: multicast
+ name: Multicast Handler
+ config:
+ autoClearTimeout: 10
+ delimiterRics: '0988988'
+ textRics: '0299001,0310001'
+ triggerRic: '9999999'
+ triggerHost: '127.0.0.1'
+ triggerPort: 8080
+```
+Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket.
+
+### Konfigurationsbeispiel 4: Mit Netzident-Filterung
+```yaml
+- type: module
+ res: multicast
+ name: Multicast Handler
+ config:
+ autoClearTimeout: 10
+ delimiterRics: '0988988'
+ textRics: '0299001,0310001'
+ netIdentRics: '0000001'
+```
+Filtert Netzident-Pakete (RIC 0000001) automatisch aus der Weiterverarbeitung.
+
+## Integration in Router-Konfiguration
+
+Das Multicast-Modul sollte **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden:
+
+```yaml
+- name: Router POCSAG
+ route:
+ - type: module
+ res: filter.modeFilter
+ name: Filter POCSAG
+ config:
+ allowed:
+ - pocsag
+
+ # Multicast-Modul hier einfügen
+ - type: module
+ res: multicast
+ name: Multicast Handler
+ config:
+ textRics: '0299001,0310001'
+ delimiterRics: '0288088'
+ autoClearTimeout: 10
+
+ # Weitere Module und Plugins
+ - type: plugin
+ res: mysql
+ config:
+ # ...
+
+ - type: plugin
+ res: telegram
+ config:
+ # ...
+```
+
+## Beispielhafte Verwendung in Router-Konfigurationen
+Das Multicast-Modul gibt für jedes RIC ein eigenes Paket aus UND generiert für konsistente Verarbeitung Listenfelder.
+Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfelder für eine gesammelte Ausgabe zu verwenden. Vor der weiteren Verarbeitung in Plugins empfiehlt sich eventuell eine Filterung mittels [RegEx-Filter](regex_filter.md).
+Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter.
+
+
+### Beispiel (Zusätzliche Wildcards werden noch später in diesem Readme erklärt):
+```yaml
+router:
+- name: Router POCSAG
+ route:
+ - type: module
+ res: filter.modeFilter
+ config:
+ [...]
+ - type: module
+ res: filter.doubleFilter
+ config:
+ [...]
+ - type: module
+ res: descriptor
+ config:
+ [...]
+ - type: module
+ res: multicast
+ name: Multicast
+ config:
+ autoClearTimeout: 10
+ delimiterRics: '0123456' # Start eines Multicast-Alarms
+ textRics: '9909909' # Text-RIC
+ - type: router
+ res: RouterMySQL
+ - type: router
+ res: RouterTelegram
+
+- name: RouterMySQL
+ route:
+ - type: module
+ res: filter.regexFilter
+ name: Filter MySQL
+ config:
+ - name: "Multicast Mode complete or single"
+ checks:
+ - field: multicastMode
+ regex: ^(complete|single)$
+ - type: plugin
+ res: mysql
+ config:
+ [...]
+
+- name: RouterTelegram
+ route:
+ - type: module
+ res: filter.regexFilter
+ name: Multicast Recipient Index Filter # 1. Paket, da ist alles drin für einen kombinierten Alarm und ist immer vorhanden
+ config:
+ - name: "Multicast 1 Paket pro Alarm-Paket"
+ checks:
+ - field: multicastRecipientIndex
+ regex: ^1$
+ - type: plugin
+ res: telegram
+ config:
+ message_pocsag: |
+ {CNAME}
+ {MSG}
+ Alarmierte Einheiten [{MCAST_COUNT}]: {DESCRIPTION_LIST}
+ RICs: {RIC_LIST}
+ {TIME}
+ [...]
+
+```
+
+---
+## Modul Abhängigkeiten
+- keine
+
+---
+## Externe Abhängigkeiten
+- keine
+
+---
+## Paket Modifikationen
+
+### Hinzugefügte Felder bei Multicast-Alarmen:
+- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte:
+
+ - `complete`: Vollständiges Multicast-Packet
+ - `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC)
+ - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
+ - `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs
+
+- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte:
+
+ - `delimiter`: Startmarker-Paket (wird automatisch gefiltert wenn delimiterRics konfiguriert)
+ - `recipient`: tatsächlicher Empfänger
+ - `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
+ - `netident`: Netzwerk-Identifikations-Paket (wird automatisch gefiltert wenn netIdentRics konfiguriert)
+
+- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs
+- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt
+- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik:
+
+ - Empfänger haben den Index 1 bis n.
+ - Delimiter/Singles haben Index 1 (da sie alleinstehen).
+
+- `_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. ric_list, message_list)
+
+### Veränderte Felder bei Multicast-Alarmen:
+- `ric`: Wird durch Empfänger-RIC ersetzt
+- `subric`: Wird durch Empfänger-SubRIC ersetzt
+- `subricText`: Wird durch Empfänger-SubRIC-Text ersetzt
+- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC
+
+### Rückgabewerte:
+- **False**: Paket wurde blockiert (z.B. Delimiter/Netident-Filterung), Router stoppt Verarbeitung
+- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln
+- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort
+
+---
+## Zusätzliche Wildcards
+
+Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung:
+
+|Wildcard|Beschreibung|Beispiel|
+|--------|------------|--------|
+|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001|
+|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3|
+|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...|
+|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control|
+|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident|
+
+## Erweiterung der Listen-Wildcards
+Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen:
+
+|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel|
+|--------|------------|--------|--------|
+|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"|
+|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"|
+|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger.|description_list|"FF Musterstadt, BF Beispiel"|
+|{_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|_list|{FREQUENCY_LIST}, {BITRATE_LIST}|
+
+**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~).
+
+### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin:
+```yaml
+- type: plugin
+ res: telegram
+ config:
+ message_pocsag: |
+ {CNAME}
+ {MSG}
+ RIC: {RIC} / SubRIC: {SRIC}
+ Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE})
+ {TIME}
+```
+
+---
+## Funktionsweise im Detail
+
+### Active Trigger System (Verlustfreie Paketauslieferung)
+
+Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**:
+
+1. **Deferred Delivery**: Bei einem Auto-Clear-Timeout werden die incomplete-Pakete nicht sofort ausgegeben, sondern in einer internen Queue gespeichert.
+
+2. **Wakeup-Trigger**: Das Modul sendet ein spezielles Trigger-Paket via Loopback-Socket (Standard: 127.0.0.1:8080) zurück an den BOSWatch-Server.
+
+3. **Queue-Flush**: Beim Empfang des Trigger-Pakets werden alle gespeicherten Pakete aus der Queue ausgegeben.
+
+**Trigger-RIC Auswahl** (3-stufige Fallback-Kette):
+- **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet
+- **Dynamisch**: Wenn nicht konfiguriert, wird die erste Tone-RIC der aktuellen Gruppe verwendet
+- **Fallback**: Falls keine Tone-RICs vorhanden sind (sollte nicht vorkommen), wird "9999999" verwendet
+
+**Beispiel-Ablauf bei Auto-Clear:**
+```
+10:31:16 - Tone-RIC empfangen (0234567)
+10:31:16 - Tone-RIC empfangen (0345678)
+10:31:26 - Auto-Clear-Timeout (10s) erreicht
+ → Incomplete-Pakete in Queue gespeichert
+ → Trigger-Paket via Loopback gesendet (RIC: 0234567)
+10:31:26 - Trigger-Paket durch BOSWatch-Server verarbeitet
+ → Queue-Flush: Incomplete-Pakete werden ausgegeben
+```
+
+**Vorteile:**
+- Keine Pakete gehen verloren, auch bei hoher Systemlast
+- Saubere Trennung von Verarbeitung und Ausgabe
+- Ermöglicht zeitlich versetzte Ausgabe ohne Race Conditions
+
+### Zeitbasierte Verarbeitung
+
+1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert
+2. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System)
+3. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs
+4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe)
+
+### Frequenz-Trennung
+
+Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung von Alarmen verschiedener Sender.
+
+**Beispiel:**
+```
+Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678]
+Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890]
+→ Werden getrennt verarbeitet, keine Vermischung möglich
+```
+
+### SubRIC-Erhaltung
+
+**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC aus der Tone-RIC-Phase. Dies ist entscheidend, da SubRICs unterschiedliche Bedeutungen haben können, z.B.:
+
+- SubRIC 1 (a): Alarmierung
+- SubRIC 2 (b): Information
+- SubRIC 3 (c): Probealarm
+- SubRIC 4 (d): Sirenenalarm
+
+**Beispiel:**
+```
+Eingehende Tone-RICs:
+- RIC: 0234567 SubRIC: 4 (Sirenenalarm)
+- RIC: 0345678 SubRIC: 3 (Probealarm)
+
+Text-RIC: RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS"
+
+Ausgegebene Multicast-Pakete:
+→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!)
+→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!)
+```
+
+### Automatische Filterung
+
+- **Delimiter-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `delimiterRics` konfiguriert ist
+- **Netzident-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `netIdentRics` konfiguriert ist
+- **Filterung nach multicastRole**: Ermöglicht saubere nachgelagerte Verarbeitung ohne manuelle Filter
+
+### Multi-Instanz-Betrieb
+
+Das Modul unterstützt mehrere parallele Instanzen mit geteiltem Zustand:
+
+- **Shared State**: Alle Instanzen teilen sich den Tone-RIC-Speicher (frequenz-basiert)
+- **Instance-Specific Timeouts**: Jede Instanz kann eigene `autoClearTimeout`-Werte haben
+- **Global Cleanup Thread**: Ein globaler Thread prüft alle Instanzen auf Timeouts
+- **Thread-Safe**: Alle Operationen sind thread-sicher mit Locks geschützt
\ No newline at end of file
diff --git a/docu/mkdocs.yml b/docu/mkdocs.yml
index c2602dd..73d112e 100644
--- a/docu/mkdocs.yml
+++ b/docu/mkdocs.yml
@@ -20,10 +20,11 @@ nav:
- Changelog: changelog.md
- Module:
- Descriptor: modul/descriptor.md
+ - Double Filter: modul/double_filter.md
- Geocoding: modul/geocoding.md
- Mode Filter: modul/mode_filter.md
+ - Multicast: modul/multicast.md
- Regex Filter: modul/regex_filter.md
- - Double Filter: modul/double_filter.md
- Plugins:
- Http: plugin/http.md
- Telegram: plugin/telegram.md
diff --git a/module/moduleBase.py b/module/moduleBase.py
index 1ce8ccd..568a24a 100644
--- a/module/moduleBase.py
+++ b/module/moduleBase.py
@@ -56,6 +56,32 @@ class ModuleBase(ABC):
@param bwPacket: A BOSWatch packet instance
@return bwPacket or False"""
+
+ # --- FIX: Multicast list support for Module ---
+ if isinstance(bwPacket, list):
+ result_packets = []
+ for single_packet in bwPacket:
+ # Recursive call for single packet
+ processed = self._run(single_packet)
+
+ # new logic:
+ if processed is False:
+ # filter called 'False' -> packet discarded
+ continue
+ elif processed is None:
+ # module returned None -> keep packet unchanged
+ result_packets.append(single_packet)
+ elif isinstance(processed, list):
+ # module returned new list -> extend
+ result_packets.extend(processed)
+ else:
+ # module returned modified packet -> add
+ result_packets.append(processed)
+
+ # if list is not empty, return it. else False (filter all).
+ return result_packets if result_packets else False
+ # -----------------------------------------------
+
self._runCount += 1
logging.debug("[%s] run #%d", self._moduleName, self._runCount)
diff --git a/module/multicast.py b/module/multicast.py
new file mode 100644
index 0000000..d8ae05e
--- /dev/null
+++ b/module/multicast.py
@@ -0,0 +1,638 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+r"""!
+ ____ ____ ______ __ __ __ _____
+ / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
+ / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
+ / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
+/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
+ German BOS Information Script
+ by Bastian Schroll
+
+@file: multicast.py
+@date: 26.01.2025
+@author: Claus Schichl
+@description: multicast module
+"""
+
+import logging
+import time
+import threading
+import socket
+import json
+import datetime
+from collections import defaultdict
+from module.moduleBase import ModuleBase
+from boswatch.packet import Packet
+
+logging.debug("- %s loaded", __name__)
+
+
+class BoswatchModule(ModuleBase):
+ r"""!Multicast module with multi-instance support and active trigger mechanism
+
+ This module handles multicast alarm distribution.
+ It manages the correlation between tone-RICs (recipients) and text-RICs (message content),
+ ensuring reliable alarm delivery even in complex multi-frequency scenarios.
+ """
+
+ # CLASS VARIABLES - SHARED STATE
+ _tone_ric_packets = defaultdict(list)
+ _last_tone_ric_time = defaultdict(float)
+ _processing_text_ric = defaultdict(bool)
+ _processing_text_ric_started = defaultdict(float)
+
+ # SYSTEM VARIABLES
+ _lock = threading.Lock()
+ _cleanup_thread = None
+ _running = False
+ _wildcards_registered = set()
+ _packet_queue = []
+ _queue_lock = threading.Lock()
+ _instances = []
+
+ # Trigger defaults
+ _TRIGGER_HOST = "127.0.0.1"
+ _TRIGGER_PORT = 8080
+ _MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
+ _DEFAULT_TRIGGER_RIC = "9999999"
+
+# ============================================================
+# LIFECYCLE METHODS
+# ============================================================
+
+ def __init__(self, config):
+ super().__init__(__name__, config)
+
+ def onLoad(self):
+ r"""!Initialize module configuration and start the global cleanup thread.
+
+ @param None
+ @return None"""
+ self._my_frequencies = set()
+ self.instance_id = hex(id(self))[-4:]
+ self.name = f"MCAST_{self.instance_id}"
+
+ self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
+ self._hard_timeout = self._auto_clear_timeout * 3
+
+ def parse_list(key):
+ val = self.config.get(key)
+ if val:
+ return [x.strip() for x in str(val).split(",") if x.strip()]
+ return []
+
+ self._delimiter_rics = parse_list("delimiterRics")
+ self._text_rics = parse_list("textRics")
+ self._netident_rics = parse_list("netIdentRics")
+
+ trigger_ric_cfg = self.config.get("triggerRic")
+ if trigger_ric_cfg:
+ self._trigger_ric = str(trigger_ric_cfg).strip()
+ self._trigger_ric_mode = "explicit"
+ else:
+ self._trigger_ric = None
+ self._trigger_ric_mode = "dynamic"
+
+ self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
+ self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
+
+ self._block_delimiter = bool(self._delimiter_rics)
+ self._block_netident = bool(self._netident_rics)
+
+ logging.info("[%s] Multicast module loaded", self.name)
+
+ with BoswatchModule._lock:
+ if self not in BoswatchModule._instances:
+ BoswatchModule._instances.append(self)
+
+ if not BoswatchModule._running:
+ BoswatchModule._running = True
+ BoswatchModule._cleanup_thread = threading.Thread(
+ target=BoswatchModule._global_cleanup_worker, daemon=True
+ )
+ BoswatchModule._cleanup_thread.start()
+ logging.info("Global multicast cleanup thread started")
+
+# ============================================================
+# MAIN PROCESSING
+# ============================================================
+
+ def doWork(self, bwPacket):
+ r"""!Process an incoming packet and handle multicast logic.
+
+ @param bwPacket: A BOSWatch packet instance or list of packets
+ @return bwPacket, a list of packets, or False if blocked"""
+ if isinstance(bwPacket, list):
+ result_packets = []
+ for single_packet in bwPacket:
+ processed = self.doWork(single_packet)
+ if processed is None:
+ result_packets.append(single_packet)
+ elif processed is not False:
+ if isinstance(processed, list):
+ result_packets.extend(processed)
+ else:
+ result_packets.append(processed)
+ return result_packets if result_packets else None
+
+ packet_dict = self._get_packet_data(bwPacket)
+ msg = packet_dict.get("message")
+ ric = packet_dict.get("ric")
+ freq = packet_dict.get("frequency", "default")
+ mode = packet_dict.get("mode")
+
+ if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
+ if self._trigger_ric and ric != self._trigger_ric:
+ pass
+ else:
+ logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
+ queued = self._get_queued_packets()
+ return queued if queued else False
+
+ if mode != "pocsag":
+ queued = self._get_queued_packets()
+ return queued if queued else None
+
+ self._my_frequencies.add(freq)
+
+ is_text_ric = False
+ if self._text_rics:
+ is_text_ric = ric in self._text_rics and msg and msg.strip()
+ else:
+ with BoswatchModule._lock:
+ is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0
+
+ if is_text_ric:
+ with BoswatchModule._lock:
+ BoswatchModule._processing_text_ric[freq] = True
+ BoswatchModule._processing_text_ric_started[freq] = time.time()
+
+ queued_packets = self._get_queued_packets()
+ incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
+
+ if self._netident_rics and ric in self._netident_rics:
+ self._set_mcast_metadata(bwPacket, "control", "netident", ric)
+ result = self._combine_results(incomplete_packets, queued_packets, [bwPacket])
+ return self._filter_output(result)
+
+ if self._delimiter_rics and ric in self._delimiter_rics:
+ delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
+ result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
+ return self._filter_output(result)
+
+ if not msg or not msg.strip():
+ self._add_tone_ric_packet(freq, packet_dict)
+ result = self._combine_results(incomplete_packets, queued_packets, False)
+ return self._filter_output(result)
+
+ if is_text_ric and msg:
+ logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
+ alarm_packets = self._distribute_complete(freq, packet_dict)
+ with BoswatchModule._lock:
+ BoswatchModule._processing_text_ric[freq] = False
+ BoswatchModule._processing_text_ric_started.pop(freq, None)
+
+ if not alarm_packets:
+ logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
+ normal = self._enrich_normal_alarm(bwPacket, packet_dict)
+ result = self._combine_results(normal, incomplete_packets, queued_packets)
+ else:
+ result = self._combine_results(alarm_packets, incomplete_packets, queued_packets)
+ return self._filter_output(result)
+
+ if msg:
+ normal = self._enrich_normal_alarm(bwPacket, packet_dict)
+ result = self._combine_results(normal, incomplete_packets, queued_packets)
+ return self._filter_output(result)
+
+ result = self._combine_results(incomplete_packets, queued_packets)
+ return self._filter_output(result)
+
+# ============================================================
+# PACKET PROCESSING HELPERS (called by doWork)
+# ============================================================
+
+ def _get_packet_data(self, bwPacket):
+ r"""!Safely extract all fields from packet as a dictionary.
+
+ Handles both dict objects and Packet instances.
+ Dynamically extracts all fields including those added by other modules.
+
+ @param bwPacket: Packet instance or dict
+ @return dict: Complete dictionary of all packet fields"""
+ # 1. Fall: Es ist bereits ein Dictionary
+ if isinstance(bwPacket, dict):
+ return bwPacket.copy()
+
+ # 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet)
+ if hasattr(bwPacket, '_packet'):
+ return bwPacket._packet.copy()
+
+ # 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet'
+ try:
+ return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')}
+ except Exception as e:
+ logging.warning("[%s] Error: %s", self.name, e)
+ return {}
+
+ def _filter_output(self, result):
+ r"""!Apply multicastRole filtering before output.
+
+ @param result: Single packet, list of packets, None or False
+ @return Final packet(s) or False if blocked"""
+ if result is None or result is False:
+ return result
+
+ def get_role(packet):
+ """Helper to extract multicastRole from Packet object"""
+ packet_dict = self._get_packet_data(packet)
+ return packet_dict.get("multicastRole")
+
+ if isinstance(result, list):
+ filtered = [p for p in result if self._should_output_packet(get_role(p))]
+ if not filtered:
+ logging.debug("All packets filtered out by multicastRole")
+ return False
+ return filtered if len(filtered) > 1 else filtered[0]
+ else:
+ if self._should_output_packet(get_role(result)):
+ return result
+ logging.debug("Packet filtered out: multicastRole=%s", get_role(result))
+ return False
+
+ def _combine_results(self, *results):
+ r"""!Combine multiple result sources into a single list or status.
+
+ @param results: Multiple packet objects, lists, or booleans
+ @return combined list, False or None"""
+ combined = []
+ has_false = False
+ for result in results:
+ if result is False:
+ has_false = True
+ continue
+ if result is None:
+ continue
+ if isinstance(result, list):
+ combined.extend(result)
+ else:
+ combined.append(result)
+ if combined:
+ return combined
+ return False if has_false else None
+
+ def _should_output_packet(self, multicast_role):
+ r"""!Check if packet should be output based on role.
+
+ @param multicast_role: The role string to check
+ @return bool: True if allowed"""
+ if self._block_delimiter and multicast_role == "delimiter":
+ return False
+ if self._block_netident and multicast_role == "netident":
+ return False
+ return True
+
+# ============================================================
+# TONE-RIC BUFFER MANAGEMENT
+# ============================================================
+
+ def _add_tone_ric_packet(self, freq, packet_dict):
+ r"""!Add a tone-RIC to the shared buffer.
+
+ @param freq: Frequency identifier
+ @param packet_dict: Dictionary containing packet data
+ @return None"""
+ with BoswatchModule._lock:
+ stored_packet = packet_dict.copy()
+ stored_packet['_multicast_timestamp'] = time.time()
+ BoswatchModule._tone_ric_packets[freq].append(stored_packet)
+ BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
+ logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq)
+
+ def _get_queued_packets(self):
+ r"""!Pop and return all packets currently in the static queue.
+
+ @param None
+ @return list: List of packets or None"""
+ with BoswatchModule._queue_lock:
+ if BoswatchModule._packet_queue:
+ packets = BoswatchModule._packet_queue[:]
+ BoswatchModule._packet_queue.clear()
+ return packets
+ return None
+
+# ============================================================
+# MULTICAST PACKET CREATION
+# ============================================================
+
+ def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
+ r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.
+
+ @param recipient_dict: Source dictionary
+ @param packet: Target Packet object
+ @param index: Packet index (1-based) - shifts timestamp by milliseconds
+ @return None"""
+ for k, v in recipient_dict.items():
+ if k.startswith('_'):
+ continue
+ if k == 'timestamp' and index > 1:
+ try:
+ dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
+ dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
+ packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
+ except:
+ packet.set(k, str(v))
+ else:
+ packet.set(k, str(v))
+
+ def _distribute_complete(self, freq, text_packet_dict):
+ r"""!Create full multicast packets with message content.
+
+ @param freq: Frequency identifier
+ @param text_packet_dict: Data of the message-carrying packet
+ @return list: List of fully populated Packet instances"""
+ with BoswatchModule._lock:
+ recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
+ logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts))
+ BoswatchModule._tone_ric_packets[freq].clear()
+ BoswatchModule._last_tone_ric_time.pop(freq, None)
+
+ if not recipient_dicts:
+ return []
+ text_ric = text_packet_dict.get("ric")
+ message_text = text_packet_dict.get("message")
+ alarm_packets = []
+
+ for idx, recipient_dict in enumerate(recipient_dicts, 1):
+ p = Packet()
+ self._copy_packet_dict_to_packet(recipient_dict, p, idx)
+ p.set("message", message_text)
+ self._apply_list_tags(p, recipient_dicts)
+ self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
+ alarm_packets.append(p)
+
+ logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
+ return alarm_packets
+
+ def _create_incomplete_multicast(self, freq, recipient_dicts):
+ r"""!Generate multicast packets for timeouts (no text message).
+
+ @param freq: Frequency identifier
+ @param recipient_dicts: List of recipient data dictionaries
+ @return list: List of incomplete Packet instances"""
+ if not recipient_dicts:
+ return []
+ first_ric = recipient_dicts[0].get("ric", "unknown")
+ incomplete_packets = []
+ for idx, recipient_dict in enumerate(recipient_dicts, 1):
+ p = Packet()
+ self._copy_packet_dict_to_packet(recipient_dict, p, idx)
+ p.set("message", "")
+ self._apply_list_tags(p, recipient_dicts)
+ self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
+ incomplete_packets.append(p)
+ return incomplete_packets
+
+ def _enrich_normal_alarm(self, bwPacket, packet_dict):
+ r"""!Enrich a standard single alarm with multicast metadata.
+
+ @param bwPacket: Target Packet object
+ @param packet_dict: Source data dictionary
+ @return list: List containing the enriched packet"""
+ self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
+ self._apply_list_tags(bwPacket, [packet_dict])
+ self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
+ logging.debug(f"Erstelle Single-Alarm für RIC {packet_dict.get('ric')}")
+ return [bwPacket]
+
+ def _handle_delimiter(self, freq, ric, bwPacket=None):
+ r"""!Handle delimiter packet and clear orphaned tone-RICs.
+
+ @param freq: Frequency identifier
+ @param ric: Delimiter RIC
+ @param bwPacket: Optional delimiter packet instance
+ @return list: Incomplete packets or delimiter control packet"""
+ with BoswatchModule._lock:
+ orphaned = BoswatchModule._tone_ric_packets[freq].copy()
+ BoswatchModule._tone_ric_packets[freq].clear()
+ BoswatchModule._last_tone_ric_time.pop(freq, None)
+ BoswatchModule._processing_text_ric[freq] = False
+
+ if orphaned:
+ age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
+
+ logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
+ return self._create_incomplete_multicast(freq, orphaned)
+ if bwPacket is not None:
+ self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
+ return [bwPacket]
+ return None
+
+# ============================================================
+# PACKET METADATA HELPERS
+# ============================================================
+
+ def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
+ r"""!Helper to set standard multicast fields and register wildcards.
+
+ @param packet: The Packet instance to modify
+ @param mode: multicastMode (complete, incomplete, single, control)
+ @param role: multicastRole (recipient, single, delimiter, netident)
+ @param source: The originating RIC
+ @param count: Total number of recipients
+ @param index: Current recipient index
+ @return None"""
+ logging.debug(f"Setze Metadata - Mode: {mode}, Role: {role} für RIC: {source}")
+ mapping = {
+ "multicastMode": (mode, "{MCAST_MODE}"),
+ "multicastRole": (role, "{MCAST_ROLE}"),
+ "multicastSourceRic": (source, "{MCAST_SOURCE}"),
+ "multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
+ "multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
+ }
+ for key, (val, wildcard) in mapping.items():
+ packet.set(key, val)
+ self._register_wildcard_safe(wildcard, key)
+
+ def _apply_list_tags(self, packet, recipient_dicts):
+ r"""!Helper to aggregate fields from all recipients into comma-separated lists.
+
+ @param packet: The target Packet instance
+ @param recipient_dicts: List of dictionaries of all recipients in this group
+ @return None"""
+ all_fields = set()
+ for r in recipient_dicts:
+ all_fields.update(k for k in r.keys() if not k.startswith('_'))
+
+ for f in sorted(all_fields):
+ list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
+ list_key = f"{f}_list"
+ packet.set(list_key, list_val)
+ self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
+
+ def _register_wildcard_safe(self, wildcard, field):
+ r"""!Register wildcard if not already globally registered.
+
+ @param wildcard: The wildcard string (e.g. {MCAST_MODE})
+ @param field: The packet field name
+ @return None"""
+ if wildcard not in BoswatchModule._wildcards_registered:
+ self.registerWildcard(wildcard, field)
+ BoswatchModule._wildcards_registered.add(wildcard)
+
+# ============================================================
+# CLEANUP & TIMEOUT MANAGEMENT
+# ============================================================
+
+ @staticmethod
+ def _global_cleanup_worker():
+ r"""!Static background thread that ticks all active module instances.
+
+ @param None
+ @return None"""
+ logging.info("Global multicast cleanup ticker active")
+ while BoswatchModule._running:
+ time.sleep(1)
+ with BoswatchModule._lock:
+ active_instances = BoswatchModule._instances[:]
+ for instance in active_instances:
+ try:
+ instance._perform_instance_tick()
+ except Exception as e:
+ logging.error("Error in instance cleanup: %s", e)
+ if int(time.time()) % 60 == 0:
+ BoswatchModule._cleanup_hard_timeout_global()
+
+ def _perform_instance_tick(self):
+ r"""!Tick-entry point for this specific instance.
+
+ @param None
+ @return None"""
+ self._check_all_my_frequencies()
+
+ def _check_all_my_frequencies(self):
+ r"""!Monitor timeouts for all frequencies assigned to this instance.
+
+ @param None
+ @return None"""
+ incomplete_packets = []
+ trigger_data = []
+
+ with BoswatchModule._lock:
+ current_time = time.time()
+ for freq in list(self._my_frequencies):
+ if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
+ continue
+
+ if BoswatchModule._processing_text_ric.get(freq, False):
+ flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time)
+ if flag_age > 2:
+ BoswatchModule._processing_text_ric[freq] = False
+ BoswatchModule._processing_text_ric_started.pop(freq, None)
+ else:
+ continue
+
+ last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
+ if current_time - last_time > self._auto_clear_timeout:
+ recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
+ safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
+ trigger_data.append((freq, safe_ric))
+ BoswatchModule._tone_ric_packets[freq].clear()
+ BoswatchModule._last_tone_ric_time.pop(freq, None)
+
+ logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
+ packets = self._create_incomplete_multicast(freq, recipient_dicts)
+ if packets:
+ incomplete_packets.extend(packets)
+
+ if incomplete_packets:
+ with BoswatchModule._queue_lock:
+ BoswatchModule._packet_queue.extend(incomplete_packets)
+ for freq, safe_ric in trigger_data:
+ self._send_wakeup_trigger(freq, safe_ric)
+
+ def _check_instance_auto_clear(self, freq):
+ r"""!Check if frequency has exceeded timeout (called from doWork).
+
+ @param freq: Frequency identifier
+ @return list: Incomplete packets if timeout exceeded, else None"""
+ with BoswatchModule._lock:
+ if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
+ return None
+ last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
+ if time.time() - last_time > self._auto_clear_timeout:
+ recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
+ BoswatchModule._tone_ric_packets[freq].clear()
+ BoswatchModule._last_tone_ric_time.pop(freq, None)
+ logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
+ return self._create_incomplete_multicast(freq, recipient_dicts)
+ return None
+
+ @staticmethod
+ def _cleanup_hard_timeout_global():
+ r"""!Global failsafe for really old packets (ignores instance config).
+
+ @param None
+ @return None"""
+ with BoswatchModule._lock:
+ current_time = time.time()
+ max_hard_timeout = 120
+ if BoswatchModule._instances:
+ max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances)
+ for freq in list(BoswatchModule._tone_ric_packets.keys()):
+ BoswatchModule._tone_ric_packets[freq] = [
+ p for p in BoswatchModule._tone_ric_packets[freq]
+ if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout
+ ]
+ if not BoswatchModule._tone_ric_packets[freq]:
+ del BoswatchModule._tone_ric_packets[freq]
+
+# ============================================================
+# TRIGGER SYSTEM
+# ============================================================
+
+ def _send_wakeup_trigger(self, freq, fallback_ric):
+ r"""!Send a loopback trigger via socket to wake up the system.
+
+ @param freq: Frequency identifier
+ @param fallback_ric: RIC to use if no explicit trigger RIC is configured
+ @return None"""
+ try:
+ trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
+ payload = {
+ "timestamp": time.time(),
+ "mode": "pocsag",
+ "bitrate": "1200",
+ "ric": trigger_ric,
+ "subric": "1",
+ "subricText": "a",
+ "message": BoswatchModule._MAGIC_WAKEUP_MSG,
+ "clientName": "MulticastTrigger",
+ "inputSource": "loopback",
+ "frequency": freq
+ }
+ json_str = json.dumps(payload)
+ header = f"{len(json_str):<10}"
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ sock.settimeout(1.0)
+ sock.connect((self._trigger_host, self._trigger_port))
+ sock.sendall(header.encode('utf-8'))
+ sock.sendall(json_str.encode('utf-8'))
+ logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric)
+ except Exception as e:
+ logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
+
+# ============================================================
+# LIFECYCLE (End)
+# ============================================================
+
+ def onUnload(self):
+ r"""!Unregister instance from the global cleanup process.
+
+ @param None
+ @return None"""
+ with BoswatchModule._lock:
+ if self in BoswatchModule._instances:
+ BoswatchModule._instances.remove(self)
+ logging.debug("[%s] Multicast instance unloaded", self.name)
diff --git a/plugin/pluginBase.py b/plugin/pluginBase.py
index d0cc5b0..28b90e7 100644
--- a/plugin/pluginBase.py
+++ b/plugin/pluginBase.py
@@ -65,6 +65,15 @@ class PluginBase(ABC):
The alarm() method serves the BOSWatch packet to the plugin.
@param bwPacket: A BOSWatch packet instance"""
+
+ # --- FIX: Multicast list support ---
+ if isinstance(bwPacket, list):
+ # if we got a list of packets, we have to run each packet through the complete alarm process (Setup -> Alarm -> Teardown)
+ for single_packet in bwPacket:
+ self._run(single_packet)
+ return None
+ # ---------------------------------------------------------------------
+
self._runCount += 1
logging.debug("[%s] run #%d", self._pluginName, self._runCount)
From e53cb8cff2ed64d58ba8f4594c7fce1732436f92 Mon Sep 17 00:00:00 2001
From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com>
Date: Thu, 26 Feb 2026 09:10:36 +0100
Subject: [PATCH 2/5] [feat/multicast] adding Config-Editor
---
docu/docs/config-editor.html | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/docu/docs/config-editor.html b/docu/docs/config-editor.html
index aa6db3c..b50a0c3 100644
--- a/docu/docs/config-editor.html
+++ b/docu/docs/config-editor.html
@@ -569,6 +569,21 @@ router:
]
},
+ "multicast": {
+ kind: "module",
+ title: "Multicast",
+ creator: "BW3 Dev Team",
+ fields: [
+ { key: "autoClearTimeout", type: "number", label: "autoClearTimeout", default: 10, required: false },
+ { key: "delimiterRics", type: "text", label: "delimiterRics", required: false },
+ { key: "textRics", type: "text", label: "textRics", required: false },
+ { key: "netIdentRics", type: "text", label: "netIdentRics", required: false },
+ { key: "triggerRic", type: "text", label: "triggerRic", required: false },
+ { key: "triggerHost", type: "text", label: "triggerHost", default: "127.0.0.1", required: false },
+ { key: "triggerPort", type: "number", label: "triggerPort", default: 8080, required: false },
+ ]
+ },
+
// Plugins
"http": {
kind: "plugin",
From 7dabc786f61c76e1f9d9f82f75b3576b42cfa170 Mon Sep 17 00:00:00 2001
From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com>
Date: Thu, 5 Mar 2026 13:10:31 +0100
Subject: [PATCH 3/5] [feat/multicast] correction of wakeup_trigger sending
method to official architecture
---
module/multicast.py | 30 +++++++++++++++++-------------
1 file changed, 17 insertions(+), 13 deletions(-)
diff --git a/module/multicast.py b/module/multicast.py
index d8ae05e..51e672b 100644
--- a/module/multicast.py
+++ b/module/multicast.py
@@ -18,12 +18,12 @@ r"""!
import logging
import time
import threading
-import socket
import json
import datetime
from collections import defaultdict
from module.moduleBase import ModuleBase
from boswatch.packet import Packet
+from boswatch.network.client import TCPClient
logging.debug("- %s loaded", __name__)
@@ -593,11 +593,7 @@ class BoswatchModule(ModuleBase):
# ============================================================
def _send_wakeup_trigger(self, freq, fallback_ric):
- r"""!Send a loopback trigger via socket to wake up the system.
-
- @param freq: Frequency identifier
- @param fallback_ric: RIC to use if no explicit trigger RIC is configured
- @return None"""
+ r"""!Send a loopback trigger using the standard TCPClient class."""
try:
trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
payload = {
@@ -613,13 +609,21 @@ class BoswatchModule(ModuleBase):
"frequency": freq
}
json_str = json.dumps(payload)
- header = f"{len(json_str):<10}"
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
- sock.settimeout(1.0)
- sock.connect((self._trigger_host, self._trigger_port))
- sock.sendall(header.encode('utf-8'))
- sock.sendall(json_str.encode('utf-8'))
- logging.debug("[%s] Wakeup trigger sent for freq %s (RIC=%s)", self.name, freq, trigger_ric)
+
+ # using BOSWatch-Architecture
+ client = TCPClient(timeout=2)
+ if client.connect(self._trigger_host, self._trigger_port):
+ # 1. Send
+ client.transmit(json_str)
+
+ # 2. Recieve (getting [ack] and prevents connection reset)
+ client.receive(timeout=1)
+
+ client.disconnect()
+ logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric)
+ else:
+ logging.error("[%s] Could not connect to local server for wakeup", self.name)
+
except Exception as e:
logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
From 1ebcbf23e9d30edb8f5884d1098f6d40309b428e Mon Sep 17 00:00:00 2001
From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com>
Date: Thu, 5 Mar 2026 21:39:27 +0100
Subject: [PATCH 4/5] [feat/multicast] Code quality fixes and cleanup
- Remove redundant list-handling block in doWork() - already handled by moduleBase._run()
- Fix bare except to except (ValueError, TypeError) in _copy_packet_dict_to_packet()
- Replace f-string logging with lazy %-style in _enrich_normal_alarm() and _set_mcast_metadata()
- Remove unused _trigger_ric_mode variable
- Simplify instance name from dynamic MCAST_{id} to static "Multicast" (no debug value)
- Update doWork() docstring to reflect single-packet-only parameter
- Add extension hook comment to _perform_instance_tick()
---
module/multicast.py | 29 ++++++++---------------------
1 file changed, 8 insertions(+), 21 deletions(-)
diff --git a/module/multicast.py b/module/multicast.py
index 51e672b..55f8665 100644
--- a/module/multicast.py
+++ b/module/multicast.py
@@ -70,8 +70,7 @@ class BoswatchModule(ModuleBase):
@param None
@return None"""
self._my_frequencies = set()
- self.instance_id = hex(id(self))[-4:]
- self.name = f"MCAST_{self.instance_id}"
+ self.name = "Multicast"
self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
self._hard_timeout = self._auto_clear_timeout * 3
@@ -89,10 +88,8 @@ class BoswatchModule(ModuleBase):
trigger_ric_cfg = self.config.get("triggerRic")
if trigger_ric_cfg:
self._trigger_ric = str(trigger_ric_cfg).strip()
- self._trigger_ric_mode = "explicit"
else:
self._trigger_ric = None
- self._trigger_ric_mode = "dynamic"
self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
@@ -121,21 +118,8 @@ class BoswatchModule(ModuleBase):
def doWork(self, bwPacket):
r"""!Process an incoming packet and handle multicast logic.
- @param bwPacket: A BOSWatch packet instance or list of packets
+ @param bwPacket: A BOSWatch packet instance
@return bwPacket, a list of packets, or False if blocked"""
- if isinstance(bwPacket, list):
- result_packets = []
- for single_packet in bwPacket:
- processed = self.doWork(single_packet)
- if processed is None:
- result_packets.append(single_packet)
- elif processed is not False:
- if isinstance(processed, list):
- result_packets.extend(processed)
- else:
- result_packets.append(processed)
- return result_packets if result_packets else None
-
packet_dict = self._get_packet_data(bwPacket)
msg = packet_dict.get("message")
ric = packet_dict.get("ric")
@@ -341,7 +325,7 @@ class BoswatchModule(ModuleBase):
dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
- except:
+ except (ValueError, TypeError):
packet.set(k, str(v))
else:
packet.set(k, str(v))
@@ -403,7 +387,7 @@ class BoswatchModule(ModuleBase):
self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
self._apply_list_tags(bwPacket, [packet_dict])
self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
- logging.debug(f"Erstelle Single-Alarm für RIC {packet_dict.get('ric')}")
+ logging.debug("Erstelle Single-Alarm für RIC %s", packet_dict.get('ric'))
return [bwPacket]
def _handle_delimiter(self, freq, ric, bwPacket=None):
@@ -443,7 +427,7 @@ class BoswatchModule(ModuleBase):
@param count: Total number of recipients
@param index: Current recipient index
@return None"""
- logging.debug(f"Setze Metadata - Mode: {mode}, Role: {role} für RIC: {source}")
+ logging.debug("Setze Metadata - Mode: %s, Role: %s für RIC: %s", mode, role, source)
mapping = {
"multicastMode": (mode, "{MCAST_MODE}"),
"multicastRole": (role, "{MCAST_ROLE}"),
@@ -507,6 +491,9 @@ class BoswatchModule(ModuleBase):
def _perform_instance_tick(self):
r"""!Tick-entry point for this specific instance.
+ Acts as an extension hook for future per-instance tick logic
+ (e.g. statistics, heartbeat, watchdog). Do not call directly.
+
@param None
@return None"""
self._check_all_my_frequencies()
From 08d09b4f50a91a696427e026149159e930bd6485 Mon Sep 17 00:00:00 2001
From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com>
Date: Sat, 28 Mar 2026 15:41:54 +0100
Subject: [PATCH 5/5] [feat/multicast] refactor: move packet filtering from
module to downstream
Remove internal filtering of delimiter and netident packets from the
multicast module. All packets are now passed through with multicastRole
metadata set, allowing downstream filters (e.g. filter.regexFilter) to
handle filtering as needed.
Tone-RICs remain internally consumed as they carry no alarm-relevant
information outside the module.
Update documentation to reflect new behavior and add regexFilter
example for filtering by multicastRole.
---
docu/docs/modul/multicast.md | 47 +++++++++++-----
module/multicast.py | 100 ++++++++++++++---------------------
2 files changed, 74 insertions(+), 73 deletions(-)
diff --git a/docu/docs/modul/multicast.md b/docu/docs/modul/multicast.md
index def5284..151c6e3 100644
--- a/docu/docs/modul/multicast.md
+++ b/docu/docs/modul/multicast.md
@@ -31,14 +31,14 @@ Das Modul unterstützt:
- Mehrere Startmarker (Delimiter)
- Mehrere Text-RICs
-- Netzident-RIC zur Paketfilterung
+- Netzident-RIC zur Paketmarkierung
- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear)
- Active Trigger System zur verlustfreien Paketauslieferung
- Wildcards für spätere Weiterverarbeitung
- Frequenz-basierte Trennung
- Multi-Instanz-Betrieb mit geteiltem Zustand
-Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgegeben.
+Hinweis: Der Delimiter-RIC (0123456) wird mit multicastRole: delimiter markiert und durchgereicht. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern.
**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben.
@@ -53,9 +53,9 @@ Hinweis: Der Delimiter-RIC (0123456) wird automatisch gefiltert und nicht ausgeg
|Feld|Beschreibung|Default|
|----|------------|-------|
|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10|
-|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden automatisch gefiltert wenn konfiguriert)|leer|
+|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden mit multicastRole: delimiter markiert)|leer|
|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer|
-|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden automatisch gefiltert wenn konfiguriert)|leer|
+|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden mit multicastRole: netident markiert)|leer|
|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer|
|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1|
|triggerPort|Port für Loopback-Trigger|8080|
@@ -110,7 +110,7 @@ Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket.
textRics: '0299001,0310001'
netIdentRics: '0000001'
```
-Filtert Netzident-Pakete (RIC 0000001) automatisch aus der Weiterverarbeitung.
+Markiert Netzident-Pakete (RIC 0000001) mit multicastRole: netident. Downstream-Filter können sie gezielt ausfiltern (z.B. RegEx-Filter).
## Integration in Router-Konfiguration
@@ -241,10 +241,10 @@ router:
- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte:
- - `delimiter`: Startmarker-Paket (wird automatisch gefiltert wenn delimiterRics konfiguriert)
+ - `delimiter`: Startmarker-Paket
- `recipient`: tatsächlicher Empfänger
- `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
- - `netident`: Netzwerk-Identifikations-Paket (wird automatisch gefiltert wenn netIdentRics konfiguriert)
+ - `netident`: Netzwerk-Identifikations-Paket
- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs
- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt
@@ -262,7 +262,7 @@ router:
- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC
### Rückgabewerte:
-- **False**: Paket wurde blockiert (z.B. Delimiter/Netident-Filterung), Router stoppt Verarbeitung
+- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket
- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln
- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort
@@ -378,11 +378,34 @@ Ausgegebene Multicast-Pakete:
→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!)
```
-### Automatische Filterung
+### Paketmarkierung statt interner Filterung
-- **Delimiter-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `delimiterRics` konfiguriert ist
-- **Netzident-Pakete**: Werden automatisch gefiltert (nicht weitergegeben), wenn `netIdentRics` konfiguriert ist
-- **Filterung nach multicastRole**: Ermöglicht saubere nachgelagerte Verarbeitung ohne manuelle Filter
+Das Modul filtert keine inhaltlich relevanten Pakete.
+Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und
+weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert,
+z.B. mit `filter.regexFilter`.
+
+Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden
+intern im Buffer gesammelt und bei einem complete-Alarm in die
+Listenfelder aggregiert. Sie erscheinen nie als eigenständige Pakete
+im Router.
+
+Pakete und ihre Rollen:
+- **Delimiter-Pakete**: Erhalten `multicastRole: delimiter`
+- **Netzident-Pakete**: Erhalten `multicastRole: netident`
+- **Empfänger-Pakete**: Erhalten `multicastRole: recipient`
+- **Einzelalarme**: Erhalten `multicastRole: single`
+
+Beispiel-Filter um Delimiter und Netident auszublenden:
+```yaml
+- type: module
+ res: filter.regexFilter
+ config:
+ - name: "Nur echte Alarme"
+ checks:
+ - field: multicastRole
+ regex: ^(recipient|single)$
+```
### Multi-Instanz-Betrieb
diff --git a/module/multicast.py b/module/multicast.py
index 55f8665..4cecd54 100644
--- a/module/multicast.py
+++ b/module/multicast.py
@@ -10,7 +10,7 @@ r"""!
by Bastian Schroll
@file: multicast.py
-@date: 26.01.2025
+@date: 28.03.2026
@author: Claus Schichl
@description: multicast module
"""
@@ -70,7 +70,8 @@ class BoswatchModule(ModuleBase):
@param None
@return None"""
self._my_frequencies = set()
- self.name = "Multicast"
+ self.instance_id = hex(id(self))[-4:]
+ self.name = f"MCAST_{self.instance_id}"
self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
self._hard_timeout = self._auto_clear_timeout * 3
@@ -94,9 +95,6 @@ class BoswatchModule(ModuleBase):
self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
- self._block_delimiter = bool(self._delimiter_rics)
- self._block_netident = bool(self._netident_rics)
-
logging.info("[%s] Multicast module loaded", self.name)
with BoswatchModule._lock:
@@ -118,28 +116,44 @@ class BoswatchModule(ModuleBase):
def doWork(self, bwPacket):
r"""!Process an incoming packet and handle multicast logic.
- @param bwPacket: A BOSWatch packet instance
- @return bwPacket, a list of packets, or False if blocked"""
+ Enriches packets with multicast metadata (mode, role, source).
+ Does NOT filter - all packets pass through, downstream modules handle filtering.
+
+ @param bwPacket: A BOSWatch packet instance or list of packets
+ @return bwPacket, a list of packets, or None if no processing"""
+ if isinstance(bwPacket, list):
+ result_packets = []
+ for single_packet in bwPacket:
+ processed = self.doWork(single_packet)
+ if processed is not None and processed is not False:
+ if isinstance(processed, list):
+ result_packets.extend(processed)
+ else:
+ result_packets.append(processed)
+ return result_packets if result_packets else None
+
packet_dict = self._get_packet_data(bwPacket)
msg = packet_dict.get("message")
ric = packet_dict.get("ric")
freq = packet_dict.get("frequency", "default")
mode = packet_dict.get("mode")
+ # Handle wakeup triggers
if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
if self._trigger_ric and ric != self._trigger_ric:
- pass
- else:
- logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
- queued = self._get_queued_packets()
- return queued if queued else False
+ return None
+ logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
+ queued = self._get_queued_packets()
+ return queued if queued else None
+ # Only process POCSAG
if mode != "pocsag":
queued = self._get_queued_packets()
return queued if queued else None
self._my_frequencies.add(freq)
+ # Determine if this is a text-RIC
is_text_ric = False
if self._text_rics:
is_text_ric = ric in self._text_rics and msg and msg.strip()
@@ -155,21 +169,23 @@ class BoswatchModule(ModuleBase):
queued_packets = self._get_queued_packets()
incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
+ # === CONTROL PACKETS (netident, delimiter) ===
+ # Mark and pass through - no filtering!
+
if self._netident_rics and ric in self._netident_rics:
self._set_mcast_metadata(bwPacket, "control", "netident", ric)
- result = self._combine_results(incomplete_packets, queued_packets, [bwPacket])
- return self._filter_output(result)
+ return self._combine_results(incomplete_packets, queued_packets, [bwPacket])
if self._delimiter_rics and ric in self._delimiter_rics:
delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
- result = self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
- return self._filter_output(result)
+ return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
+ # === TONE-RICs (no message) ===
if not msg or not msg.strip():
self._add_tone_ric_packet(freq, packet_dict)
- result = self._combine_results(incomplete_packets, queued_packets, False)
- return self._filter_output(result)
+ return self._combine_results(incomplete_packets, queued_packets, False)
+ # === TEXT-RICs (with message) ===
if is_text_ric and msg:
logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
alarm_packets = self._distribute_complete(freq, packet_dict)
@@ -180,18 +196,16 @@ class BoswatchModule(ModuleBase):
if not alarm_packets:
logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
- result = self._combine_results(normal, incomplete_packets, queued_packets)
+ return self._combine_results(normal, incomplete_packets, queued_packets)
else:
- result = self._combine_results(alarm_packets, incomplete_packets, queued_packets)
- return self._filter_output(result)
+ return self._combine_results(alarm_packets, incomplete_packets, queued_packets)
+ # === SINGLE ALARM (message but no text-RICs configured) ===
if msg:
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
- result = self._combine_results(normal, incomplete_packets, queued_packets)
- return self._filter_output(result)
+ return self._combine_results(normal, incomplete_packets, queued_packets)
- result = self._combine_results(incomplete_packets, queued_packets)
- return self._filter_output(result)
+ return self._combine_results(incomplete_packets, queued_packets)
# ============================================================
# PACKET PROCESSING HELPERS (called by doWork)
@@ -220,31 +234,6 @@ class BoswatchModule(ModuleBase):
logging.warning("[%s] Error: %s", self.name, e)
return {}
- def _filter_output(self, result):
- r"""!Apply multicastRole filtering before output.
-
- @param result: Single packet, list of packets, None or False
- @return Final packet(s) or False if blocked"""
- if result is None or result is False:
- return result
-
- def get_role(packet):
- """Helper to extract multicastRole from Packet object"""
- packet_dict = self._get_packet_data(packet)
- return packet_dict.get("multicastRole")
-
- if isinstance(result, list):
- filtered = [p for p in result if self._should_output_packet(get_role(p))]
- if not filtered:
- logging.debug("All packets filtered out by multicastRole")
- return False
- return filtered if len(filtered) > 1 else filtered[0]
- else:
- if self._should_output_packet(get_role(result)):
- return result
- logging.debug("Packet filtered out: multicastRole=%s", get_role(result))
- return False
-
def _combine_results(self, *results):
r"""!Combine multiple result sources into a single list or status.
@@ -266,17 +255,6 @@ class BoswatchModule(ModuleBase):
return combined
return False if has_false else None
- def _should_output_packet(self, multicast_role):
- r"""!Check if packet should be output based on role.
-
- @param multicast_role: The role string to check
- @return bool: True if allowed"""
- if self._block_delimiter and multicast_role == "delimiter":
- return False
- if self._block_netident and multicast_role == "netident":
- return False
- return True
-
# ============================================================
# TONE-RIC BUFFER MANAGEMENT
# ============================================================