This commit is contained in:
KoenigMjr 2026-03-28 14:42:39 +00:00 committed by GitHub
commit 8877e490ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1076 additions and 1 deletions

View file

@ -569,6 +569,21 @@ router:
]
},
"multicast": {
kind: "module",
title: "Multicast",
creator: "BW3 Dev Team",
fields: [
{ key: "autoClearTimeout", type: "number", label: "autoClearTimeout", default: 10, required: false },
{ key: "delimiterRics", type: "text", label: "delimiterRics", required: false },
{ key: "textRics", type: "text", label: "textRics", required: false },
{ key: "netIdentRics", type: "text", label: "netIdentRics", required: false },
{ key: "triggerRic", type: "text", label: "triggerRic", required: false },
{ key: "triggerHost", type: "text", label: "triggerHost", default: "127.0.0.1", required: false },
{ key: "triggerPort", type: "number", label: "triggerPort", default: 8080, required: false },
]
},
// Plugins
"http": {
kind: "plugin",

View file

@ -0,0 +1,417 @@
# <center>Multicast</center>
---
## Beschreibung
Mit diesem Modul können Multicast-Alarme verarbeitet werden. Dabei wird eine Alarmnachricht automatisch an mehrere Empfänger (RICs) verteilt.
### Funktionsweise
Multicast-Alarme funktionieren in zwei bis drei Phasen:
1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks und löscht vorherige wartende Tone-RICs. Der Delimiter selbst wird nicht als Alarm ausgegeben (automatische Filterung). Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt.
2. **Tone-RIC-Phase**: Mehrere RICs empfangen (meist) leere Nachrichten. Diese definieren die Empfänger und "registrieren" sich im Modul als Empfänger für die nächste Multicast-Nachricht.
3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Diese wird dann automatisch an alle zuvor gesammelten Tone-RICs verteilt.
**Beispiel:**
```
10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC
10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1
10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2
10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC
Generierte Alarme:
→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS"
→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS"
```
**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert.
Das Modul unterstützt:
- Mehrere Startmarker (Delimiter)
- Mehrere Text-RICs
- Netzident-RIC zur Paketmarkierung
- Automatische Bereinigung alter Tone-RICs (Fehlerfall: Auto-Clear)
- Active Trigger System zur verlustfreien Paketauslieferung
- Wildcards für spätere Weiterverarbeitung
- Frequenz-basierte Trennung
- Multi-Instanz-Betrieb mit geteiltem Zustand
Hinweis: Der Delimiter-RIC (0123456) wird mit multicastRole: delimiter markiert und durchgereicht. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern.
**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Falls keine Tone-RICs vorhanden sind, wird die Text-RIC als `multicastMode: single` ausgegeben.
## Unterstützte Alarmtypen
- POCSAG
## Resource
`multicast`
## Konfiguration
|Feld|Beschreibung|Default|
|----|------------|-------|
|autoClearTimeout|Auto-Clear Timeout in Sekunden - Nicht zugestellte Empfänger werden nach dieser Zeit als incomplete ausgegeben|10|
|delimiterRics|Komma-getrennte Liste von Startmarkern, die einen Multicast-Block beginnen (leert sofort vorherige Empfänger und werden mit multicastRole: delimiter markiert)|leer|
|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer|
|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (werden mit multicastRole: netident markiert)|leer|
|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer|
|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1|
|triggerPort|Port für Loopback-Trigger|8080|
**Achtung:** Zahlen welche führende Nullen enthalten müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'`
### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration)
```yaml
- type: module
res: multicast
name: Multicast Handler
config:
textRics: '0299001,0310001'
```
In diesem Modus werden **alle leeren Nachrichten** als toneRics behandelt (keine `delimiterRics` angegeben).
### Konfigurationsbeispiel 2: Mit Delimiter-Trenner (empfohlen)
```yaml
- type: module
res: multicast
name: Multicast Handler
config:
autoClearTimeout: 10
delimiterRics: '0988988'
textRics: '0299001,0310001'
```
In diesem Modus wird **0988988 als Trenner (= Delimiter)** behandelt und **alle anderen leeren Nachrichten als Empfänger**.
### Konfigurationsbeispiel 3: Mit expliziter Trigger-RIC
```yaml
- type: module
res: multicast
name: Multicast Handler
config:
autoClearTimeout: 10
delimiterRics: '0988988'
textRics: '0299001,0310001'
triggerRic: '9999999'
triggerHost: '127.0.0.1'
triggerPort: 8080
```
Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket.
### Konfigurationsbeispiel 4: Mit Netzident-Filterung
```yaml
- type: module
res: multicast
name: Multicast Handler
config:
autoClearTimeout: 10
delimiterRics: '0988988'
textRics: '0299001,0310001'
netIdentRics: '0000001'
```
Markiert Netzident-Pakete (RIC 0000001) mit multicastRole: netident. Downstream-Filter können sie gezielt ausfiltern (z.B. RegEx-Filter).
## Integration in Router-Konfiguration
Das Multicast-Modul sollte **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden:
```yaml
- name: Router POCSAG
route:
- type: module
res: filter.modeFilter
name: Filter POCSAG
config:
allowed:
- pocsag
# Multicast-Modul hier einfügen
- type: module
res: multicast
name: Multicast Handler
config:
textRics: '0299001,0310001'
delimiterRics: '0288088'
autoClearTimeout: 10
# Weitere Module und Plugins
- type: plugin
res: mysql
config:
# ...
- type: plugin
res: telegram
config:
# ...
```
## Beispielhafte Verwendung in Router-Konfigurationen
Das Multicast-Modul gibt für jedes RIC ein eigenes Paket aus UND generiert für konsistente Verarbeitung Listenfelder.
Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfelder für eine gesammelte Ausgabe zu verwenden. Vor der weiteren Verarbeitung in Plugins empfiehlt sich eventuell eine Filterung mittels [RegEx-Filter](regex_filter.md).
Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter.
### Beispiel (Zusätzliche Wildcards werden noch später in diesem Readme erklärt):
```yaml
router:
- name: Router POCSAG
route:
- type: module
res: filter.modeFilter
config:
[...]
- type: module
res: filter.doubleFilter
config:
[...]
- type: module
res: descriptor
config:
[...]
- type: module
res: multicast
name: Multicast
config:
autoClearTimeout: 10
delimiterRics: '0123456' # Start eines Multicast-Alarms
textRics: '9909909' # Text-RIC
- type: router
res: RouterMySQL
- type: router
res: RouterTelegram
- name: RouterMySQL
route:
- type: module
res: filter.regexFilter
name: Filter MySQL
config:
- name: "Multicast Mode complete or single"
checks:
- field: multicastMode
regex: ^(complete|single)$
- type: plugin
res: mysql
config:
[...]
- name: RouterTelegram
route:
- type: module
res: filter.regexFilter
name: Multicast Recipient Index Filter # 1. Paket, da ist alles drin für einen kombinierten Alarm und ist immer vorhanden
config:
- name: "Multicast 1 Paket pro Alarm-Paket"
checks:
- field: multicastRecipientIndex
regex: ^1$
- type: plugin
res: telegram
config:
message_pocsag: |
<b>{CNAME}</b>
{MSG}
Alarmierte Einheiten [{MCAST_COUNT}]: {DESCRIPTION_LIST}
RICs: {RIC_LIST}
{TIME}
[...]
```
---
## Modul Abhängigkeiten
- keine
---
## Externe Abhängigkeiten
- keine
---
## Paket Modifikationen
### Hinzugefügte Felder bei Multicast-Alarmen:
- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte:
- `complete`: Vollständiges Multicast-Packet
- `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC)
- `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
- `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs
- `multicastRole` (string): Beschreibt die Rolle dieses Pakets innerhalb des Multicast-Ablaufs, besitzt einen der Werte:
- `delimiter`: Startmarker-Paket
- `recipient`: tatsächlicher Empfänger
- `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
- `netident`: Netzwerk-Identifikations-Paket
- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs
- `multicastRecipientCount` (string): Anzahl der Empfänger insgesamt
- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik:
- Empfänger haben den Index 1 bis n.
- Delimiter/Singles haben Index 1 (da sie alleinstehen).
- `<FELD>_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. ric_list, message_list)
### Veränderte Felder bei Multicast-Alarmen:
- `ric`: Wird durch Empfänger-RIC ersetzt
- `subric`: Wird durch Empfänger-SubRIC ersetzt
- `subricText`: Wird durch Empfänger-SubRIC-Text ersetzt
- `message`: Bei incomplete-Modus leer, sonst Text von Text-RIC
### Rückgabewerte:
- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket
- **Liste von Paketen**: Multicast-Verteilung, Router verarbeitet jedes Paket einzeln
- **None**: Normaler Alarm, Router fährt mit unveränderten Paket fort
---
## Zusätzliche Wildcards
Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung:
|Wildcard|Beschreibung|Beispiel|
|--------|------------|--------|
|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001|
|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3|
|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...|
|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control|
|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident|
## Erweiterung der Listen-Wildcards
Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen:
|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel|
|--------|------------|--------|--------|
|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"|
|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"|
|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger.|description_list|"FF Musterstadt, BF Beispiel"|
|{<FELD>_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|<feld>_list|{FREQUENCY_LIST}, {BITRATE_LIST}|
**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~).
### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin:
```yaml
- type: plugin
res: telegram
config:
message_pocsag: |
{CNAME}
{MSG}
RIC: {RIC} / SubRIC: {SRIC}
Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE})
{TIME}
```
---
## Funktionsweise im Detail
### Active Trigger System (Verlustfreie Paketauslieferung)
Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**:
1. **Deferred Delivery**: Bei einem Auto-Clear-Timeout werden die incomplete-Pakete nicht sofort ausgegeben, sondern in einer internen Queue gespeichert.
2. **Wakeup-Trigger**: Das Modul sendet ein spezielles Trigger-Paket via Loopback-Socket (Standard: 127.0.0.1:8080) zurück an den BOSWatch-Server.
3. **Queue-Flush**: Beim Empfang des Trigger-Pakets werden alle gespeicherten Pakete aus der Queue ausgegeben.
**Trigger-RIC Auswahl** (3-stufige Fallback-Kette):
- **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet
- **Dynamisch**: Wenn nicht konfiguriert, wird die erste Tone-RIC der aktuellen Gruppe verwendet
- **Fallback**: Falls keine Tone-RICs vorhanden sind (sollte nicht vorkommen), wird "9999999" verwendet
**Beispiel-Ablauf bei Auto-Clear:**
```
10:31:16 - Tone-RIC empfangen (0234567)
10:31:16 - Tone-RIC empfangen (0345678)
10:31:26 - Auto-Clear-Timeout (10s) erreicht
→ Incomplete-Pakete in Queue gespeichert
→ Trigger-Paket via Loopback gesendet (RIC: 0234567)
10:31:26 - Trigger-Paket durch BOSWatch-Server verarbeitet
→ Queue-Flush: Incomplete-Pakete werden ausgegeben
```
**Vorteile:**
- Keine Pakete gehen verloren, auch bei hoher Systemlast
- Saubere Trennung von Verarbeitung und Ausgabe
- Ermöglicht zeitlich versetzte Ausgabe ohne Race Conditions
### Zeitbasierte Verarbeitung
1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert
2. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System)
3. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs
4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe)
### Frequenz-Trennung
Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung von Alarmen verschiedener Sender.
**Beispiel:**
```
Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678]
Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890]
→ Werden getrennt verarbeitet, keine Vermischung möglich
```
### SubRIC-Erhaltung
**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC aus der Tone-RIC-Phase. Dies ist entscheidend, da SubRICs unterschiedliche Bedeutungen haben können, z.B.:
- SubRIC 1 (a): Alarmierung
- SubRIC 2 (b): Information
- SubRIC 3 (c): Probealarm
- SubRIC 4 (d): Sirenenalarm
**Beispiel:**
```
Eingehende Tone-RICs:
- RIC: 0234567 SubRIC: 4 (Sirenenalarm)
- RIC: 0345678 SubRIC: 3 (Probealarm)
Text-RIC: RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS"
Ausgegebene Multicast-Pakete:
→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!)
→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!)
```
### Paketmarkierung statt interner Filterung
Das Modul filtert keine inhaltlich relevanten Pakete.
Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und
weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert,
z.B. mit `filter.regexFilter`.
Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden
intern im Buffer gesammelt und bei einem complete-Alarm in die
Listenfelder aggregiert. Sie erscheinen nie als eigenständige Pakete
im Router.
Pakete und ihre Rollen:
- **Delimiter-Pakete**: Erhalten `multicastRole: delimiter`
- **Netzident-Pakete**: Erhalten `multicastRole: netident`
- **Empfänger-Pakete**: Erhalten `multicastRole: recipient`
- **Einzelalarme**: Erhalten `multicastRole: single`
Beispiel-Filter um Delimiter und Netident auszublenden:
```yaml
- type: module
res: filter.regexFilter
config:
- name: "Nur echte Alarme"
checks:
- field: multicastRole
regex: ^(recipient|single)$
```
### Multi-Instanz-Betrieb
Das Modul unterstützt mehrere parallele Instanzen mit geteiltem Zustand:
- **Shared State**: Alle Instanzen teilen sich den Tone-RIC-Speicher (frequenz-basiert)
- **Instance-Specific Timeouts**: Jede Instanz kann eigene `autoClearTimeout`-Werte haben
- **Global Cleanup Thread**: Ein globaler Thread prüft alle Instanzen auf Timeouts
- **Thread-Safe**: Alle Operationen sind thread-sicher mit Locks geschützt

View file

@ -20,10 +20,11 @@ nav:
- Changelog: changelog.md
- Module:
- Descriptor: modul/descriptor.md
- Double Filter: modul/double_filter.md
- Geocoding: modul/geocoding.md
- Mode Filter: modul/mode_filter.md
- Multicast: modul/multicast.md
- Regex Filter: modul/regex_filter.md
- Double Filter: modul/double_filter.md
- Plugins:
- Http: plugin/http.md
- Telegram: plugin/telegram.md

View file

@ -56,6 +56,32 @@ class ModuleBase(ABC):
@param bwPacket: A BOSWatch packet instance
@return bwPacket or False"""
# --- FIX: Multicast list support for Module ---
if isinstance(bwPacket, list):
result_packets = []
for single_packet in bwPacket:
# Recursive call for single packet
processed = self._run(single_packet)
# new logic:
if processed is False:
# filter called 'False' -> packet discarded
continue
elif processed is None:
# module returned None -> keep packet unchanged
result_packets.append(single_packet)
elif isinstance(processed, list):
# module returned new list -> extend
result_packets.extend(processed)
else:
# module returned modified packet -> add
result_packets.append(processed)
# if list is not empty, return it. else False (filter all).
return result_packets if result_packets else False
# -----------------------------------------------
self._runCount += 1
logging.debug("[%s] run #%d", self._moduleName, self._runCount)

607
module/multicast.py Normal file
View file

@ -0,0 +1,607 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
r"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: multicast.py
@date: 28.03.2026
@author: Claus Schichl
@description: multicast module
"""
import logging
import time
import threading
import json
import datetime
from collections import defaultdict
from module.moduleBase import ModuleBase
from boswatch.packet import Packet
from boswatch.network.client import TCPClient
logging.debug("- %s loaded", __name__)
class BoswatchModule(ModuleBase):
r"""!Multicast module with multi-instance support and active trigger mechanism
This module handles multicast alarm distribution.
It manages the correlation between tone-RICs (recipients) and text-RICs (message content),
ensuring reliable alarm delivery even in complex multi-frequency scenarios.
"""
# CLASS VARIABLES - SHARED STATE
_tone_ric_packets = defaultdict(list)
_last_tone_ric_time = defaultdict(float)
_processing_text_ric = defaultdict(bool)
_processing_text_ric_started = defaultdict(float)
# SYSTEM VARIABLES
_lock = threading.Lock()
_cleanup_thread = None
_running = False
_wildcards_registered = set()
_packet_queue = []
_queue_lock = threading.Lock()
_instances = []
# Trigger defaults
_TRIGGER_HOST = "127.0.0.1"
_TRIGGER_PORT = 8080
_MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
_DEFAULT_TRIGGER_RIC = "9999999"
# ============================================================
# LIFECYCLE METHODS
# ============================================================
def __init__(self, config):
super().__init__(__name__, config)
def onLoad(self):
r"""!Initialize module configuration and start the global cleanup thread.
@param None
@return None"""
self._my_frequencies = set()
self.instance_id = hex(id(self))[-4:]
self.name = f"MCAST_{self.instance_id}"
self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
self._hard_timeout = self._auto_clear_timeout * 3
def parse_list(key):
val = self.config.get(key)
if val:
return [x.strip() for x in str(val).split(",") if x.strip()]
return []
self._delimiter_rics = parse_list("delimiterRics")
self._text_rics = parse_list("textRics")
self._netident_rics = parse_list("netIdentRics")
trigger_ric_cfg = self.config.get("triggerRic")
if trigger_ric_cfg:
self._trigger_ric = str(trigger_ric_cfg).strip()
else:
self._trigger_ric = None
self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
logging.info("[%s] Multicast module loaded", self.name)
with BoswatchModule._lock:
if self not in BoswatchModule._instances:
BoswatchModule._instances.append(self)
if not BoswatchModule._running:
BoswatchModule._running = True
BoswatchModule._cleanup_thread = threading.Thread(
target=BoswatchModule._global_cleanup_worker, daemon=True
)
BoswatchModule._cleanup_thread.start()
logging.info("Global multicast cleanup thread started")
# ============================================================
# MAIN PROCESSING
# ============================================================
def doWork(self, bwPacket):
r"""!Process an incoming packet and handle multicast logic.
Enriches packets with multicast metadata (mode, role, source).
Does NOT filter - all packets pass through, downstream modules handle filtering.
@param bwPacket: A BOSWatch packet instance or list of packets
@return bwPacket, a list of packets, or None if no processing"""
if isinstance(bwPacket, list):
result_packets = []
for single_packet in bwPacket:
processed = self.doWork(single_packet)
if processed is not None and processed is not False:
if isinstance(processed, list):
result_packets.extend(processed)
else:
result_packets.append(processed)
return result_packets if result_packets else None
packet_dict = self._get_packet_data(bwPacket)
msg = packet_dict.get("message")
ric = packet_dict.get("ric")
freq = packet_dict.get("frequency", "default")
mode = packet_dict.get("mode")
# Handle wakeup triggers
if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
if self._trigger_ric and ric != self._trigger_ric:
return None
logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
queued = self._get_queued_packets()
return queued if queued else None
# Only process POCSAG
if mode != "pocsag":
queued = self._get_queued_packets()
return queued if queued else None
self._my_frequencies.add(freq)
# Determine if this is a text-RIC
is_text_ric = False
if self._text_rics:
is_text_ric = ric in self._text_rics and msg and msg.strip()
else:
with BoswatchModule._lock:
is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0
if is_text_ric:
with BoswatchModule._lock:
BoswatchModule._processing_text_ric[freq] = True
BoswatchModule._processing_text_ric_started[freq] = time.time()
queued_packets = self._get_queued_packets()
incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
# === CONTROL PACKETS (netident, delimiter) ===
# Mark and pass through - no filtering!
if self._netident_rics and ric in self._netident_rics:
self._set_mcast_metadata(bwPacket, "control", "netident", ric)
return self._combine_results(incomplete_packets, queued_packets, [bwPacket])
if self._delimiter_rics and ric in self._delimiter_rics:
delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
# === TONE-RICs (no message) ===
if not msg or not msg.strip():
self._add_tone_ric_packet(freq, packet_dict)
return self._combine_results(incomplete_packets, queued_packets, False)
# === TEXT-RICs (with message) ===
if is_text_ric and msg:
logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
alarm_packets = self._distribute_complete(freq, packet_dict)
with BoswatchModule._lock:
BoswatchModule._processing_text_ric[freq] = False
BoswatchModule._processing_text_ric_started.pop(freq, None)
if not alarm_packets:
logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
return self._combine_results(normal, incomplete_packets, queued_packets)
else:
return self._combine_results(alarm_packets, incomplete_packets, queued_packets)
# === SINGLE ALARM (message but no text-RICs configured) ===
if msg:
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
return self._combine_results(normal, incomplete_packets, queued_packets)
return self._combine_results(incomplete_packets, queued_packets)
# ============================================================
# PACKET PROCESSING HELPERS (called by doWork)
# ============================================================
def _get_packet_data(self, bwPacket):
r"""!Safely extract all fields from packet as a dictionary.
Handles both dict objects and Packet instances.
Dynamically extracts all fields including those added by other modules.
@param bwPacket: Packet instance or dict
@return dict: Complete dictionary of all packet fields"""
# 1. Fall: Es ist bereits ein Dictionary
if isinstance(bwPacket, dict):
return bwPacket.copy()
# 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet)
if hasattr(bwPacket, '_packet'):
return bwPacket._packet.copy()
# 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet'
try:
return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')}
except Exception as e:
logging.warning("[%s] Error: %s", self.name, e)
return {}
def _combine_results(self, *results):
r"""!Combine multiple result sources into a single list or status.
@param results: Multiple packet objects, lists, or booleans
@return combined list, False or None"""
combined = []
has_false = False
for result in results:
if result is False:
has_false = True
continue
if result is None:
continue
if isinstance(result, list):
combined.extend(result)
else:
combined.append(result)
if combined:
return combined
return False if has_false else None
# ============================================================
# TONE-RIC BUFFER MANAGEMENT
# ============================================================
def _add_tone_ric_packet(self, freq, packet_dict):
r"""!Add a tone-RIC to the shared buffer.
@param freq: Frequency identifier
@param packet_dict: Dictionary containing packet data
@return None"""
with BoswatchModule._lock:
stored_packet = packet_dict.copy()
stored_packet['_multicast_timestamp'] = time.time()
BoswatchModule._tone_ric_packets[freq].append(stored_packet)
BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq)
def _get_queued_packets(self):
r"""!Pop and return all packets currently in the static queue.
@param None
@return list: List of packets or None"""
with BoswatchModule._queue_lock:
if BoswatchModule._packet_queue:
packets = BoswatchModule._packet_queue[:]
BoswatchModule._packet_queue.clear()
return packets
return None
# ============================================================
# MULTICAST PACKET CREATION
# ============================================================
def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.
@param recipient_dict: Source dictionary
@param packet: Target Packet object
@param index: Packet index (1-based) - shifts timestamp by milliseconds
@return None"""
for k, v in recipient_dict.items():
if k.startswith('_'):
continue
if k == 'timestamp' and index > 1:
try:
dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
except (ValueError, TypeError):
packet.set(k, str(v))
else:
packet.set(k, str(v))
def _distribute_complete(self, freq, text_packet_dict):
r"""!Create full multicast packets with message content.
@param freq: Frequency identifier
@param text_packet_dict: Data of the message-carrying packet
@return list: List of fully populated Packet instances"""
with BoswatchModule._lock:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts))
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
if not recipient_dicts:
return []
text_ric = text_packet_dict.get("ric")
message_text = text_packet_dict.get("message")
alarm_packets = []
for idx, recipient_dict in enumerate(recipient_dicts, 1):
p = Packet()
self._copy_packet_dict_to_packet(recipient_dict, p, idx)
p.set("message", message_text)
self._apply_list_tags(p, recipient_dicts)
self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
alarm_packets.append(p)
logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
return alarm_packets
def _create_incomplete_multicast(self, freq, recipient_dicts):
r"""!Generate multicast packets for timeouts (no text message).
@param freq: Frequency identifier
@param recipient_dicts: List of recipient data dictionaries
@return list: List of incomplete Packet instances"""
if not recipient_dicts:
return []
first_ric = recipient_dicts[0].get("ric", "unknown")
incomplete_packets = []
for idx, recipient_dict in enumerate(recipient_dicts, 1):
p = Packet()
self._copy_packet_dict_to_packet(recipient_dict, p, idx)
p.set("message", "")
self._apply_list_tags(p, recipient_dicts)
self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
incomplete_packets.append(p)
return incomplete_packets
def _enrich_normal_alarm(self, bwPacket, packet_dict):
r"""!Enrich a standard single alarm with multicast metadata.
@param bwPacket: Target Packet object
@param packet_dict: Source data dictionary
@return list: List containing the enriched packet"""
self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
self._apply_list_tags(bwPacket, [packet_dict])
self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
logging.debug("Erstelle Single-Alarm für RIC %s", packet_dict.get('ric'))
return [bwPacket]
def _handle_delimiter(self, freq, ric, bwPacket=None):
r"""!Handle delimiter packet and clear orphaned tone-RICs.
@param freq: Frequency identifier
@param ric: Delimiter RIC
@param bwPacket: Optional delimiter packet instance
@return list: Incomplete packets or delimiter control packet"""
with BoswatchModule._lock:
orphaned = BoswatchModule._tone_ric_packets[freq].copy()
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
BoswatchModule._processing_text_ric[freq] = False
if orphaned:
age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
return self._create_incomplete_multicast(freq, orphaned)
if bwPacket is not None:
self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
return [bwPacket]
return None
# ============================================================
# PACKET METADATA HELPERS
# ============================================================
def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
r"""!Helper to set standard multicast fields and register wildcards.
@param packet: The Packet instance to modify
@param mode: multicastMode (complete, incomplete, single, control)
@param role: multicastRole (recipient, single, delimiter, netident)
@param source: The originating RIC
@param count: Total number of recipients
@param index: Current recipient index
@return None"""
logging.debug("Setze Metadata - Mode: %s, Role: %s für RIC: %s", mode, role, source)
mapping = {
"multicastMode": (mode, "{MCAST_MODE}"),
"multicastRole": (role, "{MCAST_ROLE}"),
"multicastSourceRic": (source, "{MCAST_SOURCE}"),
"multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
"multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
}
for key, (val, wildcard) in mapping.items():
packet.set(key, val)
self._register_wildcard_safe(wildcard, key)
def _apply_list_tags(self, packet, recipient_dicts):
r"""!Helper to aggregate fields from all recipients into comma-separated lists.
@param packet: The target Packet instance
@param recipient_dicts: List of dictionaries of all recipients in this group
@return None"""
all_fields = set()
for r in recipient_dicts:
all_fields.update(k for k in r.keys() if not k.startswith('_'))
for f in sorted(all_fields):
list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
list_key = f"{f}_list"
packet.set(list_key, list_val)
self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
def _register_wildcard_safe(self, wildcard, field):
r"""!Register wildcard if not already globally registered.
@param wildcard: The wildcard string (e.g. {MCAST_MODE})
@param field: The packet field name
@return None"""
if wildcard not in BoswatchModule._wildcards_registered:
self.registerWildcard(wildcard, field)
BoswatchModule._wildcards_registered.add(wildcard)
# ============================================================
# CLEANUP & TIMEOUT MANAGEMENT
# ============================================================
@staticmethod
def _global_cleanup_worker():
r"""!Static background thread that ticks all active module instances.
@param None
@return None"""
logging.info("Global multicast cleanup ticker active")
while BoswatchModule._running:
time.sleep(1)
with BoswatchModule._lock:
active_instances = BoswatchModule._instances[:]
for instance in active_instances:
try:
instance._perform_instance_tick()
except Exception as e:
logging.error("Error in instance cleanup: %s", e)
if int(time.time()) % 60 == 0:
BoswatchModule._cleanup_hard_timeout_global()
def _perform_instance_tick(self):
r"""!Tick-entry point for this specific instance.
Acts as an extension hook for future per-instance tick logic
(e.g. statistics, heartbeat, watchdog). Do not call directly.
@param None
@return None"""
self._check_all_my_frequencies()
def _check_all_my_frequencies(self):
r"""!Monitor timeouts for all frequencies assigned to this instance.
@param None
@return None"""
incomplete_packets = []
trigger_data = []
with BoswatchModule._lock:
current_time = time.time()
for freq in list(self._my_frequencies):
if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
continue
if BoswatchModule._processing_text_ric.get(freq, False):
flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time)
if flag_age > 2:
BoswatchModule._processing_text_ric[freq] = False
BoswatchModule._processing_text_ric_started.pop(freq, None)
else:
continue
last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
if current_time - last_time > self._auto_clear_timeout:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
trigger_data.append((freq, safe_ric))
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
packets = self._create_incomplete_multicast(freq, recipient_dicts)
if packets:
incomplete_packets.extend(packets)
if incomplete_packets:
with BoswatchModule._queue_lock:
BoswatchModule._packet_queue.extend(incomplete_packets)
for freq, safe_ric in trigger_data:
self._send_wakeup_trigger(freq, safe_ric)
def _check_instance_auto_clear(self, freq):
r"""!Check if frequency has exceeded timeout (called from doWork).
@param freq: Frequency identifier
@return list: Incomplete packets if timeout exceeded, else None"""
with BoswatchModule._lock:
if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
return None
last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
if time.time() - last_time > self._auto_clear_timeout:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
return self._create_incomplete_multicast(freq, recipient_dicts)
return None
@staticmethod
def _cleanup_hard_timeout_global():
r"""!Global failsafe for really old packets (ignores instance config).
@param None
@return None"""
with BoswatchModule._lock:
current_time = time.time()
max_hard_timeout = 120
if BoswatchModule._instances:
max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances)
for freq in list(BoswatchModule._tone_ric_packets.keys()):
BoswatchModule._tone_ric_packets[freq] = [
p for p in BoswatchModule._tone_ric_packets[freq]
if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout
]
if not BoswatchModule._tone_ric_packets[freq]:
del BoswatchModule._tone_ric_packets[freq]
# ============================================================
# TRIGGER SYSTEM
# ============================================================
def _send_wakeup_trigger(self, freq, fallback_ric):
r"""!Send a loopback trigger using the standard TCPClient class."""
try:
trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
payload = {
"timestamp": time.time(),
"mode": "pocsag",
"bitrate": "1200",
"ric": trigger_ric,
"subric": "1",
"subricText": "a",
"message": BoswatchModule._MAGIC_WAKEUP_MSG,
"clientName": "MulticastTrigger",
"inputSource": "loopback",
"frequency": freq
}
json_str = json.dumps(payload)
# using BOSWatch-Architecture
client = TCPClient(timeout=2)
if client.connect(self._trigger_host, self._trigger_port):
# 1. Send
client.transmit(json_str)
# 2. Recieve (getting [ack] and prevents connection reset)
client.receive(timeout=1)
client.disconnect()
logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric)
else:
logging.error("[%s] Could not connect to local server for wakeup", self.name)
except Exception as e:
logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
# ============================================================
# LIFECYCLE (End)
# ============================================================
def onUnload(self):
r"""!Unregister instance from the global cleanup process.
@param None
@return None"""
with BoswatchModule._lock:
if self in BoswatchModule._instances:
BoswatchModule._instances.remove(self)
logging.debug("[%s] Multicast instance unloaded", self.name)

View file

@ -65,6 +65,15 @@ class PluginBase(ABC):
The alarm() method serves the BOSWatch packet to the plugin.
@param bwPacket: A BOSWatch packet instance"""
# --- FIX: Multicast list support ---
if isinstance(bwPacket, list):
# if we got a list of packets, we have to run each packet through the complete alarm process (Setup -> Alarm -> Teardown)
for single_packet in bwPacket:
self._run(single_packet)
return None
# ---------------------------------------------------------------------
self._runCount += 1
logging.debug("[%s] run #%d", self._pluginName, self._runCount)