mirror of
https://github.com/BOSWatch/BW3-Core.git
synced 2026-04-21 06:03:50 +00:00
Merge dada0d635b into 7d4cb57a6e
This commit is contained in:
commit
4772fa10aa
6 changed files with 1074 additions and 1 deletions
|
|
@ -569,6 +569,21 @@ router:
|
|||
]
|
||||
},
|
||||
|
||||
"multicast": {
|
||||
kind: "module",
|
||||
title: "Multicast",
|
||||
creator: "BW3 Dev Team",
|
||||
fields: [
|
||||
{ key: "autoClearTimeout", type: "number", label: "autoClearTimeout", default: 10, required: false },
|
||||
{ key: "delimiterRics", type: "text", label: "delimiterRics", required: false },
|
||||
{ key: "textRics", type: "text", label: "textRics", required: false },
|
||||
{ key: "netIdentRics", type: "text", label: "netIdentRics", required: false },
|
||||
{ key: "triggerRic", type: "text", label: "triggerRic", required: false },
|
||||
{ key: "triggerHost", type: "text", label: "triggerHost", default: "127.0.0.1", required: false },
|
||||
{ key: "triggerPort", type: "number", label: "triggerPort", default: 8080, required: false },
|
||||
]
|
||||
},
|
||||
|
||||
// Plugins
|
||||
"http": {
|
||||
kind: "plugin",
|
||||
|
|
|
|||
449
docu/docs/modul/multicast.md
Normal file
449
docu/docs/modul/multicast.md
Normal file
|
|
@ -0,0 +1,449 @@
|
|||
# <center>Multicast</center>
|
||||
---
|
||||
|
||||
## Beschreibung
|
||||
Das Multicast-Modul verarbeitet komplexe Alarmsequenzen, bei denen eine Nachricht (Text-RIC) an eine Liste zuvor gesendeter Empfänger (Tone-RICs) verteilt wird. Es sorgt dafür, dass jeder Empfänger ein individuelles Paket mit dem Alarmtext erhält.
|
||||
|
||||
Das Modul filtert keine inhaltlich relevanten Pakete. Alle Pakete mit Alarminhalt werden mit `multicastRole` markiert und weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, z.B. mit `filter.regexFilter`.
|
||||
|
||||
Das Modul unterstützt:
|
||||
|
||||
- **Multi-Instance Support:** Vollständige Isolation bei parallelem Betrieb in verschiedenen Routen.
|
||||
- **Frequenz-Trennung:** Verhindert die Vermischung von Alarmen auf unterschiedlichen Kanälen.
|
||||
- **Active Trigger System:** Nutzt TCP-Loopback, um auch bei Inaktivität des Funkkanals Timeouts sicher zu verarbeiten.
|
||||
- **Dynamische Listen:** Generiert aggregierte Listenfelder (z. B. {RIC_LIST}) für Sammel-Alarmierungen.
|
||||
- **Metadaten-Enrichment:** Markiert Pakete präzise für nachgelagerte Filter (z. B. RegEx).
|
||||
|
||||
### Funktionsweise
|
||||
Multicast-Alarme funktionieren in zwei bis vier Phasen:
|
||||
**Wichtig:** Das Modul arbeitet verzögert bei der Ausgabe der Text-RICs, um die Pakete anzureichern.
|
||||
|
||||
1. **Delimiter-Phase (Optional)**: Ein spezieller Delimiter-RIC markiert den Start eines neuen Multicast-Blocks. Er wird als technisches Paket (`multicastRole: delimiter`) **sofort** durchgereicht, leert aber intern den RAM-Puffer für neue Empfänger. Diese Phase ist optional - ohne Delimiter werden alle leeren Nachrichten als Tone-RICs behandelt. Downstream-Filter (z.B. filter.regexFilter) können ihn bei Bedarf ausfiltern.
|
||||
|
||||
2. **Tone-RIC-Phase**: Eingehende leere Nachrichten werden **nicht direkt als Pakete ausgegeben**, sondern im RAM zwischengespeichert. Das Modul gibt hier `False` zurück, wodurch der Router die Verarbeitung für dieses spezifische Paket vorerst pausiert.
|
||||
|
||||
3. **Text-RIC**: Ein spezieller Message-RIC empfängt die eigentliche Alarmnachricht. Sobald eine Text-RIC empfangen wird, "kopiert" das Modul diesen Text in jedes einzelne der gespeicherten Tone-RIC-Pakete. Diese werden dann als **Liste von Paketen** gesammelt an den Router als `multicastMode: complete` übergeben. Falls keine Tone-RICs im Puffer liegen (z.B. Einzelalarm), wird die Text-RIC als `multicastMode: single` ausgegeben.
|
||||
**Wichtig:** Die Text-RIC (Message-RIC) wird **nicht als separates Paket ausgegeben**. Sie dient nur als Nachrichtenträger, der seinen Text an alle gesammelten Tone-RICs verteilt. Ausnahme: Einzelalarm (Single)
|
||||
|
||||
4. **Timeout-Phase (Auto-Clear - Optional):** Läuft der `autoClearTimeout` ab, ohne dass ein Text-RIC eintrifft, werden die gepufferten RICs als `multicastMode: incomplete` (ohne Text) ausgegeben.
|
||||
|
||||
## Unterstützte Alarmtypen
|
||||
- POCSAG
|
||||
|
||||
## Resource
|
||||
`multicast`
|
||||
|
||||
## Konfiguration
|
||||
|
||||
|Feld|Beschreibung|Default|
|
||||
|----|------------|-------|
|
||||
|autoClearTimeout|Zeit in Sekunden, nach der Tone-RICs ohne Text-Eingang als `incomplete` ausgegeben werden|10|
|
||||
|delimiterRics|Komma-getrennte Liste von Startmarkern (leert Puffer, `multicastRole: delimiter`)|leer|
|
||||
|textRics|Komma-getrennte Liste von RICs, die den Alarmtext tragen|leer|
|
||||
|netIdentRics|Komma-getrennte Liste von Netzwerk-Identifikations-RICs (`multicastRole: netident`)|leer|
|
||||
|triggerRic|RIC für das Wakeup-Trigger-Paket (optional, bei leer: dynamisch = erste Tone-RIC)|leer|
|
||||
|triggerHost|IP-Adresse für Loopback-Trigger|127.0.0.1|
|
||||
|triggerPort|Port für Loopback-Trigger (entspricht meist Server-Port)|8080|
|
||||
|
||||
**Hinweis:** Zahlen mit führenden Nullen müssen in Anführungszeichen gesetzt werden, z.B. `'0012345'`.
|
||||
|
||||
### Konfigurationsbeispiel 1: Automatische Delimiter-Erkennung (oder nicht verfügbar im Netzwerk) (= Minimalkonfiguration)
|
||||
```yaml
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast Handler
|
||||
config:
|
||||
textRics: '0299001,0310001'
|
||||
```
|
||||
In diesem Modus werden **alle leeren Nachrichten** als toneRics behandelt (keine `delimiterRics` angegeben).
|
||||
|
||||
### Konfigurationsbeispiel 2: Mit Delimiter-Trenner (empfohlen)
|
||||
```yaml
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast Handler
|
||||
config:
|
||||
autoClearTimeout: 10
|
||||
delimiterRics: '0988988'
|
||||
textRics: '0299001,0310001'
|
||||
```
|
||||
In diesem Modus wird **0988988 als Trenner (= Delimiter)** behandelt und **alle anderen leeren Nachrichten als Empfänger**.
|
||||
|
||||
### Konfigurationsbeispiel 3: Mit expliziter Trigger-RIC
|
||||
```yaml
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast Handler
|
||||
config:
|
||||
autoClearTimeout: 10
|
||||
delimiterRics: '0988988'
|
||||
textRics: '0299001,0310001'
|
||||
triggerRic: '9999999'
|
||||
triggerHost: '127.0.0.1'
|
||||
triggerPort: 8080
|
||||
```
|
||||
Verwendet eine feste RIC (9999999) für das interne Wakeup-Trigger-Paket.
|
||||
|
||||
### Konfigurationsbeispiel 4: Mit Netzident-Filterung
|
||||
```yaml
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast Handler
|
||||
config:
|
||||
autoClearTimeout: 10
|
||||
delimiterRics: '0988988'
|
||||
textRics: '0299001,0310001'
|
||||
netIdentRics: '0000001'
|
||||
```
|
||||
Markiert Netzident-Pakete (RIC 0000001) mit multicastRole: netident. Downstream-Filter können sie gezielt ausfiltern (z.B. RegEx-Filter).
|
||||
|
||||
---
|
||||
## Modul Abhängigkeiten
|
||||
- keine
|
||||
|
||||
---
|
||||
## Externe Abhängigkeiten
|
||||
- keine
|
||||
|
||||
---
|
||||
## Paket Modifikationen
|
||||
|
||||
### Hinzugefügte Felder
|
||||
- `multicastMode`(string): Beschreibt das Ergebnis der Multicast-Verarbeitung, besitzt einen der Werte:
|
||||
- `complete`: Vollständiges Multicast-Packet
|
||||
- `incomplete`: Unvollständiges Multicast-Packet (meist fehlt die Text-RIC) (Timeout)
|
||||
- `single`: Einzelner, "normaler" Alarm (Tone-RIC = Text-RIC)
|
||||
- `control`: Netzwerk-Ident-RIC oder andere Verwaltung-RICs (Technik)
|
||||
- `multicastRole`:
|
||||
- `delimiter`
|
||||
- `netident`
|
||||
- `recipient` (Empfänger)
|
||||
- `single`
|
||||
- `multicastRecipientIndex` (string): Index dieses Empfängers (1-N), folgende Logik:
|
||||
- Bei **recipient**: Zählt hoch (z.B. 1 von 5, 2 von 5...)
|
||||
- Bei **delimiter / netident / single**: Immer **1**, da sie als eigenständige technische Pakete zählen
|
||||
- `multicastRecipientCount` (string): Gesamtanzahl der Empfänger des Multicasts
|
||||
- `<FELD>_list` (string): Liste von Werten aus allen Empfänger-RICs für jedes Originalfeld (z.B. `ric_list`, `subric_list`)
|
||||
|
||||
### Ergänzte Felder (von Text-RIC an Tone-RIC):
|
||||
- `message`: Der Text wird aus der Text-RIC übernommen und in die Empfänger-Pakete eingefügt (Bei incomplete-Modus leer)
|
||||
- `multicastSourceRic` (string): RIC des ursprünglichen Message-RICs
|
||||
|
||||
### Erhaltene Felder (Tone-RIC):
|
||||
Diese Felder bleiben **unverändert** bestehen, damit die Zuordnung zum Endgerät korrekt bleibt:
|
||||
- `ric`
|
||||
- `subric`
|
||||
- alle bereits zuvor hinzugefügten Felder (z.B. Descriptor-Modul)
|
||||
|
||||
### Rückgabewerte:
|
||||
- **False**: Paket wurde intern konsumiert (z.B. Tone-RIC wurde in den Buffer aufgenommen), Router stoppt Verarbeitung für dieses Paket (Verhindert die Ausgabe leerer Nachrichten). Allerdings: Das Paket wird im RAM geparkt.
|
||||
- **Liste von Paketen**: Tritt ein, sobald eine Text-RIC die Verteilung auslöst oder ein Timeout abläuft. Der Router verarbeitet jedes Element der Liste (die nun angereicherten Tone-RICs) als eigenständigen Alarm.
|
||||
- **None**: Der Router verarbeitet das Original-Paket normal weiter.
|
||||
|
||||
---
|
||||
|
||||
## Zusätzliche Wildcards
|
||||
|
||||
Folgende Wildcards stehen in allen nachfolgenden Plugins zur Verfügung:
|
||||
|
||||
|Wildcard|Beschreibung|Beispiel|
|
||||
|--------|------------|--------|
|
||||
|{MCAST_SOURCE}|RIC des ursprünglichen Message-RICs|0299001|
|
||||
|{MCAST_COUNT}|Gesamtanzahl der Empfänger dieses Multicasts.|3|
|
||||
|{MCAST_INDEX}|Index des Empfängers (1-basiert für Recipients, 0 für Control-Pakete)|0, 1, 2, 3, ...|
|
||||
|{MCAST_MODE}|Art der Multicast-Verarbeitung durch das Modul|complete, incomplete, single, control|
|
||||
|{MCAST_ROLE}|Rolle des Pakets im Multicast-Ablauf|recipient, single, delimiter, netident|
|
||||
|
||||
### Erweiterung der Listen-Wildcards
|
||||
Das Modul generiert Wildcards für alle gesammelten Felder (RICs, SubRICs, etc.) in Listenform. Diese sind besonders nützlich, um eine kombinierte Ausgabe (z.B. in Telegram) zu erstellen. Im Folgenden ein paar Beispiele:
|
||||
|
||||
|Wildcard|Beschreibung|Zugrundeliegendes Feld|Beispiel|
|
||||
|--------|------------|--------|--------|
|
||||
|{RIC_LIST}|Liste aller RICs der Empfänger (durch Komma getrennt).|ric_list|"0299001, 0299002"|
|
||||
|{SUBRIC_LIST}|Liste aller SubRICs der Empfänger|subric_list|"4, 3"|
|
||||
|{DESCRIPTION_LIST}|Liste aller (deskriptiven) Namen der Empfänger (BEISPIEL! **NUR** bei vorher durchlaufenen Descriptor-Modul)|description_list|"FF Musterstadt, BF Beispiel"|
|
||||
|{<FELD>_LIST}|Liste der Werte für jedes Originalfeld aus dem Paket|<feld>_list|{FREQUENCY_LIST}, {BITRATE_LIST}|
|
||||
|
||||
**Wichtig:** Verwende die **originalen Feldnamen** (z.B. `frequency_list`), nicht die Wildcard-Namen (z.B. ~~`FREQ_list`~~).
|
||||
|
||||
### Verwendungsbeispiel in Plugins, z.B. Telegram-Plugin:
|
||||
```yaml
|
||||
- type: plugin
|
||||
res: telegram
|
||||
config:
|
||||
message_pocsag: |
|
||||
{CNAME}
|
||||
{MSG}
|
||||
RIC: {RIC} / SubRIC: {SRIC}
|
||||
Multicast: {MCAST_INDEX}/{MCAST_COUNT} (Quelle: {MCAST_SOURCE})
|
||||
{TIME}
|
||||
```
|
||||
|
||||
---
|
||||
# Funktionsweise im Detail
|
||||
|
||||
## Grundsätzliche Funktion
|
||||
|
||||
**Beispiel:**
|
||||
```
|
||||
10:31:16 - RIC: 0123456 SubRIC: 1 Message: (leer) → Delimiter-RIC
|
||||
10:31:16 - RIC: 0234567 SubRIC: 4 Message: (leer) → Empfänger 1
|
||||
10:31:16 - RIC: 0345678 SubRIC: 3 Message: (leer) → Empfänger 2
|
||||
10:31:17 - RIC: 0456789 SubRIC: 1 Message: "B3 WOHNHAUS" → Message-RIC
|
||||
|
||||
Generierte Alarme:
|
||||
→ RIC: 0234567 SubRIC: 4 Message: "B3 WOHNHAUS" (behält SubRIC 4!)
|
||||
→ RIC: 0345678 SubRIC: 3 Message: "B3 WOHNHAUS" (behält SubRIC 3!)
|
||||
```
|
||||
|
||||
**Wichtig:** Jeder Empfänger behält seine ursprüngliche SubRIC, da diese oft unterschiedliche Alarmtypen oder Prioritäten repräsentiert.
|
||||
|
||||
### Logik der hinzugefügten Felder
|
||||
|
||||
Um die Logik der Felder multicastMode, multicastRole, etc. zu verstehen, hilft eine tabellarische Gegenüberstellung:
|
||||
|
||||
**1) Szenario Der "Feldstärke-Alarm" (Netident/Delimiter)**
|
||||
|
||||
|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count|
|
||||
|--------|-------|-------|---------|------|--|--|
|
||||
|Delimiter|0288088|control|delimiter|0288088|1|1|
|
||||
|Netzident|0000001|control|netident|0000001|1|1|
|
||||
|
||||
Hinweis: Diese Pakete dienen der Systemsteuerung. Der Index ist immer 1, da sie "Einzelereignisse" im technischen Ablauf sind.
|
||||
Beide RIC werden unmittelbar nach der Verarbeitung weitergereicht, d.h. es wird nicht auf die Netzident-RIC gewartet, um die Delmitier-RIC weiterzureichen.
|
||||
|
||||
**2) Szenario Echter Multicast-Alarm (Vollständig)**
|
||||
Hier sieht man den Ablauf:
|
||||
- Der Delimiter leert den Speicher und wird mit den ergänzenden Feldern angereichert und sofort weitergegeben.
|
||||
- Zwei Tone-RICs sammeln sich an
|
||||
- Die Text-RIC löst die Verteilung aus
|
||||
|
||||
**Beachte:** Die Text-RIC (0456789) dient als Nachrichtenträger und erscheint nicht als eigenes Paket im Output.
|
||||
|
||||
|Phase|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count|
|
||||
|-----|---------|---|-------------|-------------|---------|-----|-----|
|
||||
|Start|Delimiter|0288088|control|delimiter|0288088|1|1|
|
||||
|Sammler|Tone-RIC 1|0234567|-|(interner Buffer)|-|-|-|
|
||||
|Sammler|Tone-RIC 2|0345678|-|(interner Buffer)|-|-|-|
|
||||
|Auslöser|Text-RIC|0456789|(wird verteilt - keine Ausgabe)|(Nachrichtenträger)|-|-|-|
|
||||
|Output 1|Alarm-Paket|0234567|complete|recipient|0456789|1|2|
|
||||
|Output 2|Alarm-Paket|0345678|complete|recipient|0456789|2|2|
|
||||
|
||||
**3) Szenario Unvollständiger Alarm (Incomplete / Timeout)**
|
||||
In diesem Fall fehlt die Text-RIC. Das System wartet bis zum Timeout und schickt dann die Empfänger mit leerer Nachricht raus (getriggert durch das Active Trigger System).
|
||||
|
||||
|Phase|Paket-Typ|RIC|multicastMode|multicastRole|sourceRic|Index|Count|
|
||||
|-----|---------|---|-------------|-------------|---------|-----|-----|
|
||||
|Start|Delimiter|0288088|control|delimiter|0288088|1|1|
|
||||
|Sammler|Tone-RIC 1|0234567|-|(interner Buffer)|-|-|-|
|
||||
|Sammler|Tone-RIC 2|0345678|-|(interner Buffer)|-|-|-|
|
||||
|Event|Timeout|Kein Text|Auto-Clear nach standardmäßig 10s|-|-|-|-|
|
||||
|Output 1|Incomplete|0234567|incomplete|recipient|0234567|1|2|
|
||||
|Output 2|Incomplete|0345678|incomplete|recipient|0234567|2|2|
|
||||
|
||||
Der Delimiter wird mit den ergänzenden Feldern angereichert und sofort weitergegeben.
|
||||
|
||||
---
|
||||
## Integration in Router-Konfiguration
|
||||
|
||||
Das Multicast-Modul muss **vor** den Plugins platziert werden, damit die generierten Alarme korrekt verarbeitet werden:
|
||||
|
||||
```yaml
|
||||
- name: Router POCSAG
|
||||
route:
|
||||
- type: module
|
||||
res: filter.modeFilter
|
||||
name: Filter POCSAG
|
||||
config:
|
||||
allowed:
|
||||
- pocsag
|
||||
|
||||
# Multicast-Modul hier einfügen
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast Handler
|
||||
config:
|
||||
textRics: '0299001,0310001'
|
||||
delimiterRics: '0288088'
|
||||
autoClearTimeout: 10
|
||||
|
||||
# Weitere Module und Plugins
|
||||
- type: plugin
|
||||
res: mysql
|
||||
config:
|
||||
# ...
|
||||
|
||||
- type: plugin
|
||||
res: telegram
|
||||
config:
|
||||
# ...
|
||||
```
|
||||
|
||||
## Beispielhafte Verwendung in Router-Konfigurationen
|
||||
Das Multicast-Modul gibt für jedes RIC ein eigenes Paket aus UND generiert für konsistente Verarbeitung Listenfelder.
|
||||
Dies ermöglicht es, entweder jede RIC einzeln zu verarbeiten oder die Listenfelder für eine gesammelte Ausgabe zu verwenden. Vor der weiteren Verarbeitung in Plugins empfiehlt sich eventuell eine Filterung mittels [RegEx-Filter](regex_filter.md).
|
||||
Die folgenden Beispiele dienen zur Veranschaulichung der Möglichkeiten des Multicast-Modul in Verbindung mit RegEx-Filter.
|
||||
|
||||
|
||||
### Beispiel (siehe auch "Zusätzliche Wildcards"):
|
||||
```yaml
|
||||
router:
|
||||
- name: Router POCSAG
|
||||
route:
|
||||
- type: module
|
||||
res: filter.modeFilter
|
||||
config:
|
||||
[...]
|
||||
- type: module
|
||||
res: descriptor
|
||||
config:
|
||||
[...]
|
||||
- type: module
|
||||
res: multicast
|
||||
name: Multicast
|
||||
config:
|
||||
autoClearTimeout: 10
|
||||
delimiterRics: '0123456' # Start eines Multicast-Alarms
|
||||
textRics: '9909909' # Text-RIC
|
||||
- type: module
|
||||
res: filter.doubleFilter
|
||||
config:
|
||||
[...]
|
||||
- type: router
|
||||
res: RouterMySQL
|
||||
- type: router
|
||||
res: RouterTelegram
|
||||
|
||||
- name: RouterMySQL
|
||||
route:
|
||||
- type: module
|
||||
res: filter.regexFilter
|
||||
name: Filter MySQL
|
||||
config:
|
||||
- name: "Multicast Mode complete or single"
|
||||
checks:
|
||||
- field: multicastMode
|
||||
regex: ^(complete|single)$
|
||||
- type: plugin
|
||||
res: mysql
|
||||
config:
|
||||
[...]
|
||||
|
||||
- name: RouterTelegram
|
||||
route:
|
||||
- type: module
|
||||
res: filter.regexFilter
|
||||
name: Multicast Recipient Index Filter # 1. Paket, da ist alles drin für einen kombinierten Alarm und ist immer vorhanden
|
||||
config:
|
||||
- name: "Multicast 1 Paket pro Alarm-Paket"
|
||||
checks:
|
||||
- field: multicastRecipientIndex
|
||||
regex: ^1$
|
||||
- type: plugin
|
||||
res: telegram
|
||||
config:
|
||||
message_pocsag: |
|
||||
<b>{CNAME}</b>
|
||||
{MSG}
|
||||
Alarmierte Einheiten [{MCAST_COUNT}]: {DESCRIPTION_LIST}
|
||||
RICs: {RIC_LIST}
|
||||
{TIME}
|
||||
[...]
|
||||
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Das Active Trigger System (Verlustfreie Paketauslieferung)
|
||||
BOSWatch arbeitet **synchron**. Das bedeutet: Der Router "schläft", wenn kein Funk-Paket von außen eingeht. Ein Timeout im Hintergrund-Thread des Moduls kann den schlafenden Router nicht von alleine aufwecken, um die im RAM wartenden Pakete (`incomplete`) herauszuschieben.
|
||||
|
||||
**Lösung:**
|
||||
Das Modul verwendet ein aktives Trigger-System, um sicherzustellen, dass **keine Multicast-Pakete verloren gehen**
|
||||
|
||||
**Ausführung**
|
||||
Das Modul sendet über via TCP (Loopback) ein minimales Trigger-Paket an den eigenen BOSWatch-Server. Dieser empfängt es wie einen normalen Funk-Alarm, weckt den Router auf und das Modul kann die wartenden Alarme (`incomplete`) sicher ausliefern.
|
||||
|
||||
**Trigger-RIC Auswahl** (3-stufige Fallback-Kette):
|
||||
- **Explizit**: Wenn `triggerRic` konfiguriert ist, wird diese RIC verwendet
|
||||
- **Dynamisch**: Wenn nicht konfiguriert, wird die erste Tone-RIC der aktuellen Gruppe verwendet
|
||||
- **Fallback**: Falls keine Tone-RICs vorhanden sind (sollte nicht vorkommen), wird "9999999" verwendet
|
||||
|
||||
**Beispiel-Ablauf bei Auto-Clear:**
|
||||
```
|
||||
10:31:16 - Tone-RIC empfangen (0234567)
|
||||
10:31:16 - Tone-RIC empfangen (0345678)
|
||||
10:31:26 - Auto-Clear-Timeout (10s) erreicht
|
||||
→ Incomplete-Pakete in Queue gespeichert
|
||||
→ Trigger-Paket via Loopback gesendet (RIC: 0234567)
|
||||
10:31:26 - Trigger-Paket durch BOSWatch-Server verarbeitet
|
||||
→ Queue-Flush: Incomplete-Pakete werden ausgegeben
|
||||
```
|
||||
|
||||
**Vorteile:**
|
||||
- Keine Pakete gehen verloren, auch bei hoher Systemlast
|
||||
- Saubere Trennung von Verarbeitung und Ausgabe
|
||||
- Ermöglicht zeitlich versetzte Ausgabe ohne Race Conditions
|
||||
|
||||
### Zeitbasierte Verarbeitung
|
||||
|
||||
1. **Tone-RIC-Sammlung**: Tone-RICs (meist leere Nachrichten) werden empfangen und gespeichert
|
||||
2. **Text-RIC-Verteilung**: Sobald ein Text-RIC empfangen wird, erfolgt die sofortige Verteilung an alle gesammelten Tone-RICs
|
||||
3. **Auto-Clear**: Nach `autoClearTimeout` Sekunden ohne Text-RIC werden die Tone-RICs als incomplete ausgegeben (via Trigger-System)
|
||||
4. **Hard-Timeout-Cleanup**: Nach 3x `autoClearTimeout` (oder max. 120s) werden veraltete Pakete aus dem Speicher gelöscht (Failsafe)
|
||||
|
||||
### Frequenz-Trennung
|
||||
|
||||
Das Modul trennt Multicast-Listen nach Frequenzen. Dies verhindert Vermischung von Alarmen verschiedener Sender.
|
||||
|
||||
**Beispiel:**
|
||||
```
|
||||
Frequenz 173.050 MHz: Tone-RICs [0234567, 0345678]
|
||||
Frequenz 173.075 MHz: Tone-RICs [0456789, 0567890]
|
||||
→ Werden getrennt verarbeitet, keine Vermischung möglich (wichtig für Multi-Client mit Single-Server)
|
||||
```
|
||||
|
||||
## Paketmarkierung statt interner Filterung
|
||||
|
||||
Das Modul filtert keine inhaltlich relevanten Pakete.
|
||||
Alle Pakete werden mit `multicastRole` markiert und weitergereicht. Die Filterung nach Bedarf erfolgt nachgelagert, z.B. mit `filter.regexFilter`.
|
||||
|
||||
Eine Ausnahme bilden **Tone-RICs** (leere Nachrichten): Diese werden zuerst intern im Buffer gesammelt und bei einem complete-Alarm (und incomplete) in die Listenfelder aggregiert. Die Listenfelder werden an alle **Tone-RICs** angehängt und anschließend jede **Tone-RIC** angereichert ausgegeben.
|
||||
|
||||
Pakete und ihre Rollen:
|
||||
|
||||
- **Delimiter-Pakete**: Erhalten `multicastRole: delimiter`
|
||||
- **Netzident-Pakete**: Erhalten `multicastRole: netident`
|
||||
- **Empfänger-Pakete**: Erhalten `multicastRole: recipient`
|
||||
- **Einzelalarme**: Erhalten `multicastRole: single`
|
||||
|
||||
Beispiel-Filter um Delimiter und Netident auszublenden:
|
||||
```yaml
|
||||
- type: module
|
||||
res: filter.regexFilter
|
||||
config:
|
||||
- name: "Nur echte Alarme"
|
||||
checks:
|
||||
- field: multicastRole
|
||||
regex: ^(recipient|single)$multicastIndex
|
||||
```
|
||||
|
||||
|
||||
---
|
||||
|
||||
## Zusammenfassung: Was passiert mit den Daten?
|
||||
|
||||
- Der Delimiter: Wird sofort markiert und als technisches Paket weitergereicht. Er sorgt dafür, dass keine "Leichen" von alten, abgebrochenen Alarmen im Speicher liegen.
|
||||
- Die Tone-RICs: Diese werden vom Modul "geschluckt" (return False). Sie verschwinden kurzzeitig aus dem Datenfluss und warten im RAM.
|
||||
- Die Text-RIC: Wenn sie eintrifft, nimmt das Modul ihren Text (B3 WOHNHAUS) und kopiert ihn in die Tone-RIC-Pakete im RAM. Anschließend erfolgt die Ausgabe der Tone-RICs.
|
||||
- Die Metadaten: Erst beim Erzeugen der Output-Pakete werden Felder wie multicastRecipientIndex berechnet, damit nachfolgende Plugins wissen, dass diese Pakete zusammengehören.
|
||||
|
||||
Technischer Hinweis:
|
||||
Da die Text-RIC im complete-Fall "verbraucht" wird, um die Tone-RICs zu füllen, wird sie nicht als separates zusätzliches Paket ausgegeben. Das verhindert Dopplungen in der Datenbank. Nur wenn gar keine Tone-RICs da sind, wird die Text-RIC als `single` ausgegeben.
|
||||
In diesem Fall (Single) ist es völlig egal, ob die Text-RIC mit Delimiter oder ohne empfangen wird - die Delimiter-RIC wird als Delimiter gekennzeichnet, das Text-RIC als Single (in `multicastMode` sowie in `multicastRole`, `multicastRecipientIndex: 1`, `multicastRecipientCount: 1`).
|
||||
|
||||
### Multi-Instanz-Betrieb
|
||||
Das Modul unterstützt unbegrenzte parallele Instanzen durch vollständige Isolation:
|
||||
|
||||
- **Encapsulated State:** Jede Instanz verwaltet ihren eigenen Tone-RIC-Speicher. Es gibt keine Vermischung zwischen verschiedenen Routen.
|
||||
- **Isolated Cleanup:** Jede Instanz startet einen eigenen, internen Cleanup-Thread für präzises Timeout-Management.
|
||||
- **Instance IDs:** Zur besseren Nachverfolgung im Log erhält jede Instanz eine eindeutige ID (z.B. MCAST_a1b2).
|
||||
|
|
@ -20,10 +20,11 @@ nav:
|
|||
- Changelog: changelog.md
|
||||
- Module:
|
||||
- Descriptor: modul/descriptor.md
|
||||
- Double Filter: modul/double_filter.md
|
||||
- Geocoding: modul/geocoding.md
|
||||
- Mode Filter: modul/mode_filter.md
|
||||
- Multicast: modul/multicast.md
|
||||
- Regex Filter: modul/regex_filter.md
|
||||
- Double Filter: modul/double_filter.md
|
||||
- Plugins:
|
||||
- Http: plugin/http.md
|
||||
- Telegram: plugin/telegram.md
|
||||
|
|
|
|||
|
|
@ -56,6 +56,32 @@ class ModuleBase(ABC):
|
|||
|
||||
@param bwPacket: A BOSWatch packet instance
|
||||
@return bwPacket or False"""
|
||||
|
||||
# --- FIX: Multicast list support for Module ---
|
||||
if isinstance(bwPacket, list):
|
||||
result_packets = []
|
||||
for single_packet in bwPacket:
|
||||
# Recursive call for single packet
|
||||
processed = self._run(single_packet)
|
||||
|
||||
# new logic:
|
||||
if processed is False:
|
||||
# filter called 'False' -> packet discarded
|
||||
continue
|
||||
elif processed is None:
|
||||
# module returned None -> keep packet unchanged
|
||||
result_packets.append(single_packet)
|
||||
elif isinstance(processed, list):
|
||||
# module returned new list -> extend
|
||||
result_packets.extend(processed)
|
||||
else:
|
||||
# module returned modified packet -> add
|
||||
result_packets.append(processed)
|
||||
|
||||
# if list is not empty, return it. else False (filter all).
|
||||
return result_packets if result_packets else False
|
||||
# -----------------------------------------------
|
||||
|
||||
self._runCount += 1
|
||||
logging.debug("[%s] run #%d", self._moduleName, self._runCount)
|
||||
|
||||
|
|
|
|||
573
module/multicast.py
Normal file
573
module/multicast.py
Normal file
|
|
@ -0,0 +1,573 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
r"""!
|
||||
____ ____ ______ __ __ __ _____
|
||||
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
|
||||
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
|
||||
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
|
||||
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
|
||||
German BOS Information Script
|
||||
by Bastian Schroll
|
||||
|
||||
@file: multicast.py
|
||||
@date: 13.04.2026
|
||||
@author: Claus Schichl
|
||||
@description: multicast module
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
import json
|
||||
import datetime
|
||||
from collections import defaultdict
|
||||
from module.moduleBase import ModuleBase
|
||||
from boswatch.packet import Packet
|
||||
from boswatch.network.client import TCPClient
|
||||
|
||||
logging.debug("- %s loaded", __name__)
|
||||
|
||||
|
||||
class BoswatchModule(ModuleBase):
|
||||
r"""!Multicast module with multi-instance support and active trigger mechanism
|
||||
|
||||
This module handles multicast alarm distribution.
|
||||
It manages the correlation between tone-RICs (recipients) and text-RICs (message content),
|
||||
ensuring reliable alarm delivery even in complex multi-frequency scenarios.
|
||||
"""
|
||||
|
||||
# Trigger defaults
|
||||
_TRIGGER_HOST = "127.0.0.1"
|
||||
_TRIGGER_PORT = 8080
|
||||
_MAGIC_WAKEUP_MSG = "###_MULTICAST_WAKEUP_###"
|
||||
_DEFAULT_TRIGGER_RIC = "9999999"
|
||||
|
||||
# ============================================================
|
||||
# LIFECYCLE METHODS
|
||||
# ============================================================
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(__name__, config)
|
||||
|
||||
def onLoad(self):
|
||||
r"""!Initialize module configuration and start the global cleanup thread.
|
||||
|
||||
@param None
|
||||
@return None"""
|
||||
self._my_frequencies = set()
|
||||
self.instance_id = hex(id(self))[-4:]
|
||||
self.name = f"MCAST_{self.instance_id}"
|
||||
|
||||
self._auto_clear_timeout = int(self.config.get("autoClearTimeout", default=10))
|
||||
self._hard_timeout = self._auto_clear_timeout * 3
|
||||
|
||||
def parse_list(key):
|
||||
val = self.config.get(key)
|
||||
if val:
|
||||
return [x.strip() for x in str(val).split(",") if x.strip()]
|
||||
return []
|
||||
|
||||
self._delimiter_rics = parse_list("delimiterRics")
|
||||
self._text_rics = parse_list("textRics")
|
||||
self._netident_rics = parse_list("netIdentRics")
|
||||
|
||||
trigger_ric_cfg = self.config.get("triggerRic")
|
||||
if trigger_ric_cfg:
|
||||
self._trigger_ric = str(trigger_ric_cfg).strip()
|
||||
else:
|
||||
self._trigger_ric = None
|
||||
|
||||
self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
|
||||
self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
|
||||
|
||||
# --- Per-instance state (replaces all former class-variables) ---
|
||||
# Key: frequency string (e.g. "85.125M")
|
||||
self._tone_ric_packets = defaultdict(list) # buffered tone-RICs per frequency
|
||||
self._last_tone_ric_time = defaultdict(float) # last arrival time per frequency
|
||||
self._processing_text_ric = defaultdict(bool) # text-RIC currently being processed?
|
||||
self._processing_text_ric_started = defaultdict(float) # when did processing start?
|
||||
self._wildcards_registered = set() # avoid double-registering wildcards
|
||||
self._packet_queue = [] # deferred packets waiting for trigger
|
||||
|
||||
# --- Locks (only needed within this instance, no cross-instance sharing) ---
|
||||
self._lock = threading.Lock()
|
||||
self._queue_lock = threading.Lock()
|
||||
|
||||
# --- Per-instance cleanup thread ---
|
||||
self._running = True
|
||||
self._cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
|
||||
self._cleanup_thread.start()
|
||||
|
||||
logging.info("[%s] Multicast module loaded", self.name)
|
||||
|
||||
# ============================================================
|
||||
# MAIN PROCESSING
|
||||
# ============================================================
|
||||
|
||||
def doWork(self, bwPacket):
|
||||
r"""!Process an incoming packet and handle multicast logic.
|
||||
|
||||
Enriches packets with multicast metadata (mode, role, source).
|
||||
Does NOT filter - all packets pass through, downstream modules handle filtering.
|
||||
|
||||
@param bwPacket: A BOSWatch packet instance or list of packets
|
||||
@return bwPacket, a list of packets, or None if no processing"""
|
||||
if isinstance(bwPacket, list):
|
||||
result_packets = []
|
||||
for single_packet in bwPacket:
|
||||
processed = self.doWork(single_packet)
|
||||
if processed is not None and processed is not False:
|
||||
if isinstance(processed, list):
|
||||
result_packets.extend(processed)
|
||||
else:
|
||||
result_packets.append(processed)
|
||||
return result_packets if result_packets else None
|
||||
|
||||
packet_dict = self._get_packet_data(bwPacket)
|
||||
msg = packet_dict.get("message")
|
||||
ric = packet_dict.get("ric")
|
||||
freq = packet_dict.get("frequency", "default")
|
||||
mode = packet_dict.get("mode")
|
||||
|
||||
# Handle wakeup triggers
|
||||
if msg == BoswatchModule._MAGIC_WAKEUP_MSG:
|
||||
if self._trigger_ric and ric != self._trigger_ric:
|
||||
return None
|
||||
logging.debug("[%s] Wakeup trigger received (RIC=%s)", self.name, ric)
|
||||
queued = self._get_queued_packets()
|
||||
return queued if queued else None
|
||||
|
||||
# Only process POCSAG
|
||||
if mode != "pocsag":
|
||||
queued = self._get_queued_packets()
|
||||
return queued if queued else None
|
||||
|
||||
self._my_frequencies.add(freq)
|
||||
|
||||
# Determine if this is a text-RIC
|
||||
is_text_ric = False
|
||||
if self._text_rics:
|
||||
is_text_ric = ric in self._text_rics and msg and msg.strip()
|
||||
else:
|
||||
with self._lock:
|
||||
is_text_ric = msg and msg.strip() and len(self._tone_ric_packets[freq]) > 0
|
||||
|
||||
if is_text_ric:
|
||||
with self._lock:
|
||||
self._processing_text_ric[freq] = True
|
||||
self._processing_text_ric_started[freq] = time.time()
|
||||
|
||||
queued_packets = self._get_queued_packets()
|
||||
incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
|
||||
|
||||
# === CONTROL PACKETS (netident, delimiter) ===
|
||||
# Mark and pass through - no filtering!
|
||||
|
||||
if self._netident_rics and ric in self._netident_rics:
|
||||
self._set_mcast_metadata(bwPacket, "control", "netident", ric)
|
||||
return self._combine_results(incomplete_packets, queued_packets, [bwPacket])
|
||||
|
||||
if self._delimiter_rics and ric in self._delimiter_rics:
|
||||
delimiter_incomplete = self._handle_delimiter(freq, ric, bwPacket)
|
||||
return self._combine_results(delimiter_incomplete, incomplete_packets, queued_packets)
|
||||
|
||||
# === TONE-RICs (no message) ===
|
||||
if not msg or not msg.strip():
|
||||
self._add_tone_ric_packet(freq, packet_dict)
|
||||
return self._combine_results(incomplete_packets, queued_packets, False)
|
||||
|
||||
# === TEXT-RICs (with message) ===
|
||||
if is_text_ric and msg:
|
||||
logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
|
||||
alarm_packets = self._distribute_complete(freq, packet_dict)
|
||||
with self._lock:
|
||||
self._processing_text_ric[freq] = False
|
||||
self._processing_text_ric_started.pop(freq, None)
|
||||
|
||||
if not alarm_packets:
|
||||
logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
|
||||
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
|
||||
return self._combine_results(normal, incomplete_packets, queued_packets)
|
||||
else:
|
||||
return self._combine_results(alarm_packets, incomplete_packets, queued_packets)
|
||||
|
||||
# === SINGLE ALARM (message but no text-RICs configured) ===
|
||||
if msg:
|
||||
normal = self._enrich_normal_alarm(bwPacket, packet_dict)
|
||||
return self._combine_results(normal, incomplete_packets, queued_packets)
|
||||
|
||||
return self._combine_results(incomplete_packets, queued_packets)
|
||||
|
||||
# ============================================================
|
||||
# PACKET PROCESSING HELPERS (called by doWork)
|
||||
# ============================================================
|
||||
|
||||
def _get_packet_data(self, bwPacket):
|
||||
r"""!Safely extract all fields from packet as a dictionary.
|
||||
|
||||
Handles both dict objects and Packet instances.
|
||||
Dynamically extracts all fields including those added by other modules.
|
||||
|
||||
@param bwPacket: Packet instance or dict
|
||||
@return dict: Complete dictionary of all packet fields"""
|
||||
# 1. Fall: Es ist bereits ein Dictionary
|
||||
if isinstance(bwPacket, dict):
|
||||
return bwPacket.copy()
|
||||
|
||||
# 2. Fall: Es ist ein Packet-Objekt (Daten liegen in _packet)
|
||||
if hasattr(bwPacket, '_packet'):
|
||||
return bwPacket._packet.copy()
|
||||
|
||||
# 3. Fallback: Falls es ein anderes Objekt ist, versuche __dict__ ohne '_' Filter für 'packet'
|
||||
try:
|
||||
return {k: v for k, v in bwPacket.__dict__.items() if not k.startswith('_')}
|
||||
except Exception as e:
|
||||
logging.warning("[%s] Error: %s", self.name, e)
|
||||
return {}
|
||||
|
||||
def _combine_results(self, *results):
|
||||
r"""!Combine multiple result sources into a single list or status.
|
||||
|
||||
@param results: Multiple packet objects, lists, or booleans
|
||||
@return combined list, False or None"""
|
||||
combined = []
|
||||
has_false = False
|
||||
for result in results:
|
||||
if result is False:
|
||||
has_false = True
|
||||
continue
|
||||
if result is None:
|
||||
continue
|
||||
if isinstance(result, list):
|
||||
combined.extend(result)
|
||||
else:
|
||||
combined.append(result)
|
||||
if combined:
|
||||
return combined
|
||||
return False if has_false else None
|
||||
|
||||
# ============================================================
|
||||
# TONE-RIC BUFFER MANAGEMENT
|
||||
# ============================================================
|
||||
|
||||
def _add_tone_ric_packet(self, freq, packet_dict):
|
||||
r"""!Add a tone-RIC to the shared buffer.
|
||||
|
||||
@param freq: Frequency identifier
|
||||
@param packet_dict: Dictionary containing packet data
|
||||
@return None"""
|
||||
with self._lock:
|
||||
stored_packet = packet_dict.copy()
|
||||
stored_packet['_multicast_timestamp'] = time.time()
|
||||
self._tone_ric_packets[freq].append(stored_packet)
|
||||
self._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
|
||||
logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(self._tone_ric_packets[freq]), freq)
|
||||
|
||||
def _get_queued_packets(self):
|
||||
r"""!Pop and return all packets currently in the static queue.
|
||||
|
||||
@param None
|
||||
@return list: List of packets or None"""
|
||||
with self._queue_lock:
|
||||
if self._packet_queue:
|
||||
packets = self._packet_queue[:]
|
||||
self._packet_queue.clear()
|
||||
return packets
|
||||
return None
|
||||
|
||||
# ============================================================
|
||||
# MULTICAST PACKET CREATION
|
||||
# ============================================================
|
||||
|
||||
def _copy_packet_dict_to_packet(self, recipient_dict, packet, index=1):
|
||||
r"""!Copy dict fields to Packet with timestamp shift for DB uniqueness.
|
||||
|
||||
@param recipient_dict: Source dictionary
|
||||
@param packet: Target Packet object
|
||||
@param index: Packet index (1-based) - shifts timestamp by milliseconds
|
||||
@return None"""
|
||||
for k, v in recipient_dict.items():
|
||||
if k.startswith('_'):
|
||||
continue
|
||||
if k == 'timestamp' and index > 1:
|
||||
try:
|
||||
dt = datetime.datetime.strptime(str(v), '%d.%m.%Y %H:%M:%S')
|
||||
dt_shifted = dt + datetime.timedelta(milliseconds=index - 1)
|
||||
packet.set(k, dt_shifted.strftime('%d.%m.%Y %H:%M:%S'))
|
||||
except (ValueError, TypeError):
|
||||
packet.set(k, str(v))
|
||||
else:
|
||||
packet.set(k, str(v))
|
||||
|
||||
def _distribute_complete(self, freq, text_packet_dict):
|
||||
r"""!Create full multicast packets with message content.
|
||||
|
||||
@param freq: Frequency identifier
|
||||
@param text_packet_dict: Data of the message-carrying packet
|
||||
@return list: List of fully populated Packet instances"""
|
||||
with self._lock:
|
||||
recipient_dicts = self._tone_ric_packets[freq].copy()
|
||||
logging.debug("Text RIC found. Matching against %d stored RICs", len(recipient_dicts))
|
||||
self._tone_ric_packets[freq].clear()
|
||||
self._last_tone_ric_time.pop(freq, None)
|
||||
|
||||
if not recipient_dicts:
|
||||
return []
|
||||
text_ric = text_packet_dict.get("ric")
|
||||
message_text = text_packet_dict.get("message")
|
||||
alarm_packets = []
|
||||
|
||||
for idx, recipient_dict in enumerate(recipient_dicts, 1):
|
||||
p = Packet()
|
||||
self._copy_packet_dict_to_packet(recipient_dict, p, idx)
|
||||
p.set("message", message_text)
|
||||
self._apply_list_tags(p, recipient_dicts)
|
||||
self._set_mcast_metadata(p, "complete", "recipient", text_ric, len(recipient_dicts), idx)
|
||||
alarm_packets.append(p)
|
||||
|
||||
logging.info("[%s] Generated %d complete multicast packets for text-RIC %s", self.name, len(alarm_packets), text_ric)
|
||||
return alarm_packets
|
||||
|
||||
def _create_incomplete_multicast(self, freq, recipient_dicts):
|
||||
r"""!Generate multicast packets for timeouts (no text message).
|
||||
|
||||
@param freq: Frequency identifier
|
||||
@param recipient_dicts: List of recipient data dictionaries
|
||||
@return list: List of incomplete Packet instances"""
|
||||
if not recipient_dicts:
|
||||
return []
|
||||
first_ric = recipient_dicts[0].get("ric", "unknown")
|
||||
incomplete_packets = []
|
||||
for idx, recipient_dict in enumerate(recipient_dicts, 1):
|
||||
p = Packet()
|
||||
self._copy_packet_dict_to_packet(recipient_dict, p, idx)
|
||||
p.set("message", "")
|
||||
self._apply_list_tags(p, recipient_dicts)
|
||||
self._set_mcast_metadata(p, "incomplete", "recipient", first_ric, len(recipient_dicts), idx)
|
||||
incomplete_packets.append(p)
|
||||
return incomplete_packets
|
||||
|
||||
def _enrich_normal_alarm(self, bwPacket, packet_dict):
|
||||
r"""!Enrich a standard single alarm with multicast metadata.
|
||||
|
||||
@param bwPacket: Target Packet object
|
||||
@param packet_dict: Source data dictionary
|
||||
@return list: List containing the enriched packet"""
|
||||
self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
|
||||
self._apply_list_tags(bwPacket, [packet_dict])
|
||||
self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
|
||||
logging.debug("Creating single-alarm for RIC %s", packet_dict.get('ric'))
|
||||
return [bwPacket]
|
||||
|
||||
def _handle_delimiter(self, freq, ric, bwPacket=None):
|
||||
r"""!Handle delimiter packet and clear orphaned tone-RICs.
|
||||
|
||||
@param freq: Frequency identifier
|
||||
@param ric: Delimiter RIC
|
||||
@param bwPacket: Optional delimiter packet instance
|
||||
@return list: Incomplete packets or delimiter control packet"""
|
||||
with self._lock:
|
||||
orphaned = self._tone_ric_packets[freq].copy()
|
||||
self._tone_ric_packets[freq].clear()
|
||||
self._last_tone_ric_time.pop(freq, None)
|
||||
self._processing_text_ric[freq] = False
|
||||
|
||||
if orphaned:
|
||||
age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
|
||||
|
||||
logging.debug("[%s] Delimiter RIC=%s cleared %d orphaned tone-RICs on freq %s: RICs=[%s], age=%.1fs → Creating INCOMPLETE multicast packets for forwarding", self.name, ric, len(orphaned), freq, ', '.join([packet.get('ric', 'unknown') for packet in orphaned]), age_seconds)
|
||||
return self._create_incomplete_multicast(freq, orphaned)
|
||||
if bwPacket is not None:
|
||||
self._set_mcast_metadata(bwPacket, "control", "delimiter", ric, "0", "0")
|
||||
return [bwPacket]
|
||||
return None
|
||||
|
||||
# ============================================================
|
||||
# PACKET METADATA HELPERS
|
||||
# ============================================================
|
||||
|
||||
def _set_mcast_metadata(self, packet, mode, role, source="", count="1", index="1"):
|
||||
r"""!Helper to set standard multicast fields and register wildcards.
|
||||
|
||||
@param packet: The Packet instance to modify
|
||||
@param mode: multicastMode (complete, incomplete, single, control)
|
||||
@param role: multicastRole (recipient, single, delimiter, netident)
|
||||
@param source: The originating RIC
|
||||
@param count: Total number of recipients
|
||||
@param index: Current recipient index
|
||||
@return None"""
|
||||
logging.debug("setting Metadata - Mode: %s, Role: %s, Index: %s of %s for RIC: %s", mode, role, index, count, source)
|
||||
mapping = {
|
||||
"multicastMode": (mode, "{MCAST_MODE}"),
|
||||
"multicastRole": (role, "{MCAST_ROLE}"),
|
||||
"multicastSourceRic": (source, "{MCAST_SOURCE}"),
|
||||
"multicastRecipientCount": (str(count), "{MCAST_COUNT}"),
|
||||
"multicastRecipientIndex": (str(index), "{MCAST_INDEX}")
|
||||
}
|
||||
for key, (val, wildcard) in mapping.items():
|
||||
packet.set(key, val)
|
||||
self._register_wildcard_safe(wildcard, key)
|
||||
|
||||
def _apply_list_tags(self, packet, recipient_dicts):
|
||||
r"""!Helper to aggregate fields from all recipients into comma-separated lists.
|
||||
|
||||
@param packet: The target Packet instance
|
||||
@param recipient_dicts: List of dictionaries of all recipients in this group
|
||||
@return None"""
|
||||
all_fields = set()
|
||||
for r in recipient_dicts:
|
||||
all_fields.update(k for k in r.keys() if not k.startswith('_'))
|
||||
|
||||
for f in sorted(all_fields):
|
||||
list_val = ", ".join([str(r.get(f, "")) for r in recipient_dicts])
|
||||
list_key = f"{f}_list"
|
||||
packet.set(list_key, list_val)
|
||||
self._register_wildcard_safe("{" + f.upper() + "_LIST}", list_key)
|
||||
|
||||
def _register_wildcard_safe(self, wildcard, field):
|
||||
r"""!Register wildcard if not already globally registered.
|
||||
|
||||
@param wildcard: The wildcard string (e.g. {MCAST_MODE})
|
||||
@param field: The packet field name
|
||||
@return None"""
|
||||
if wildcard not in self._wildcards_registered:
|
||||
self.registerWildcard(wildcard, field)
|
||||
self._wildcards_registered.add(wildcard)
|
||||
|
||||
# ============================================================
|
||||
# CLEANUP & TIMEOUT MANAGEMENT
|
||||
# ============================================================
|
||||
|
||||
def _cleanup_worker(self):
|
||||
r"""!Per-instance background thread for timeout management."""
|
||||
logging.info("[%s] Cleanup thread started", self.name)
|
||||
while self._running:
|
||||
time.sleep(1)
|
||||
try:
|
||||
self._check_all_my_frequencies()
|
||||
except Exception as e:
|
||||
logging.error("[%s] Error in cleanup thread: %s", self.name, e)
|
||||
if int(time.time()) % 60 == 0:
|
||||
self._cleanup_hard_timeout()
|
||||
|
||||
def _check_all_my_frequencies(self):
|
||||
r"""!Monitor timeouts for all frequencies assigned to this instance.
|
||||
|
||||
@param None
|
||||
@return None"""
|
||||
incomplete_packets = []
|
||||
trigger_data = []
|
||||
|
||||
with self._lock:
|
||||
current_time = time.time()
|
||||
for freq in list(self._my_frequencies):
|
||||
if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
|
||||
continue
|
||||
|
||||
if self._processing_text_ric.get(freq, False):
|
||||
flag_age = current_time - self._processing_text_ric_started.get(freq, current_time)
|
||||
if flag_age > 2:
|
||||
self._processing_text_ric[freq] = False
|
||||
self._processing_text_ric_started.pop(freq, None)
|
||||
else:
|
||||
continue
|
||||
|
||||
last_time = self._last_tone_ric_time.get(freq, 0)
|
||||
if current_time - last_time > self._auto_clear_timeout:
|
||||
recipient_dicts = self._tone_ric_packets[freq].copy()
|
||||
safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
|
||||
trigger_data.append((freq, safe_ric))
|
||||
self._tone_ric_packets[freq].clear()
|
||||
self._last_tone_ric_time.pop(freq, None)
|
||||
|
||||
logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
|
||||
packets = self._create_incomplete_multicast(freq, recipient_dicts)
|
||||
if packets:
|
||||
incomplete_packets.extend(packets)
|
||||
|
||||
if incomplete_packets:
|
||||
with self._queue_lock:
|
||||
self._packet_queue.extend(incomplete_packets)
|
||||
for freq, safe_ric in trigger_data:
|
||||
self._send_wakeup_trigger(freq, safe_ric)
|
||||
|
||||
def _check_instance_auto_clear(self, freq):
|
||||
r"""!Check if frequency has exceeded timeout (called from doWork).
|
||||
|
||||
@param freq: Frequency identifier
|
||||
@return list: Incomplete packets if timeout exceeded, else None"""
|
||||
with self._lock:
|
||||
if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
|
||||
return None
|
||||
last_time = self._last_tone_ric_time.get(freq, 0)
|
||||
if time.time() - last_time > self._auto_clear_timeout:
|
||||
recipient_dicts = self._tone_ric_packets[freq].copy()
|
||||
self._tone_ric_packets[freq].clear()
|
||||
self._last_tone_ric_time.pop(freq, None)
|
||||
logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
|
||||
return self._create_incomplete_multicast(freq, recipient_dicts)
|
||||
return None
|
||||
|
||||
def _cleanup_hard_timeout(self):
|
||||
r"""!Failsafe for really old packets."""
|
||||
with self._lock:
|
||||
current_time = time.time()
|
||||
for freq in list(self._tone_ric_packets.keys()):
|
||||
self._tone_ric_packets[freq] = [
|
||||
p for p in self._tone_ric_packets[freq]
|
||||
if current_time - p.get('_multicast_timestamp', 0) < self._hard_timeout
|
||||
]
|
||||
# cleaning empty frequencies
|
||||
if not self._tone_ric_packets[freq]:
|
||||
del self._tone_ric_packets[freq]
|
||||
|
||||
# ============================================================
|
||||
# TRIGGER SYSTEM
|
||||
# ============================================================
|
||||
|
||||
def _send_wakeup_trigger(self, freq, fallback_ric):
|
||||
r"""!Send a loopback trigger using the standard TCPClient class."""
|
||||
try:
|
||||
trigger_ric = self._trigger_ric if self._trigger_ric else fallback_ric
|
||||
payload = {
|
||||
"timestamp": time.time(),
|
||||
"mode": "pocsag",
|
||||
"bitrate": "1200",
|
||||
"ric": trigger_ric,
|
||||
"subric": "1",
|
||||
"subricText": "a",
|
||||
"message": self._MAGIC_WAKEUP_MSG,
|
||||
"clientName": "MulticastTrigger",
|
||||
"inputSource": "loopback",
|
||||
"frequency": freq
|
||||
}
|
||||
json_str = json.dumps(payload)
|
||||
|
||||
# using BOSWatch-Architecture
|
||||
client = TCPClient(timeout=2)
|
||||
if client.connect(self._trigger_host, self._trigger_port):
|
||||
# 1. Send
|
||||
client.transmit(json_str)
|
||||
|
||||
# 2. Recieve (getting [ack] and prevents connection reset)
|
||||
client.receive(timeout=1)
|
||||
|
||||
client.disconnect()
|
||||
logging.debug("[%s] Wakeup trigger sent and acknowledged (RIC=%s)", self.name, trigger_ric)
|
||||
else:
|
||||
logging.error("[%s] Could not connect to local server for wakeup", self.name)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("[%s] Failed to send wakeup trigger: %s", self.name, e)
|
||||
|
||||
# ============================================================
|
||||
# LIFECYCLE (End)
|
||||
# ============================================================
|
||||
|
||||
def onUnload(self):
|
||||
r"""!Unregister instance from the global cleanup process.
|
||||
|
||||
@param None
|
||||
@return None"""
|
||||
self._running = False
|
||||
logging.debug("[%s] Multicast instance unloaded", self.name)
|
||||
|
|
@ -65,6 +65,15 @@ class PluginBase(ABC):
|
|||
The alarm() method serves the BOSWatch packet to the plugin.
|
||||
|
||||
@param bwPacket: A BOSWatch packet instance"""
|
||||
|
||||
# --- FIX: Multicast list support ---
|
||||
if isinstance(bwPacket, list):
|
||||
# if we got a list of packets, we have to run each packet through the complete alarm process (Setup -> Alarm -> Teardown)
|
||||
for single_packet in bwPacket:
|
||||
self._run(single_packet)
|
||||
return None
|
||||
# ---------------------------------------------------------------------
|
||||
|
||||
self._runCount += 1
|
||||
logging.debug("[%s] run #%d", self._pluginName, self._runCount)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue