This commit is contained in:
KoenigMjr 2025-12-03 21:29:08 +00:00 committed by GitHub
commit 3e951c86ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 256 additions and 24 deletions

View file

@ -2,9 +2,9 @@
--- ---
## Beschreibung ## Beschreibung
Mit diesem Modul können einem Alarmpaket beliebige Beschreibungen in Abhänigkeit der enthaltenen Informationen hinzugefügt werden. Mit diesem Modul können einem Alarmpaket beliebige Beschreibungen in Abhängigkeit der enthaltenen Informationen hinzugefügt werden.
## Unterstütze Alarmtypen ## Unterstützte Alarmtypen
- Fms - Fms
- Pocsag - Pocsag
- Zvei - Zvei
@ -16,20 +16,22 @@ Mit diesem Modul können einem Alarmpaket beliebige Beschreibungen in Abhänigke
## Konfiguration ## Konfiguration
Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md) Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
**Achtung:** Zahlen welche führende Nullen entahlten müssen in Anführungszeichen gesetzt werden Bsp. `'0012345'` **Achtung:** Zahlen, die führende Nullen enthalten, müssen in der YAML-Konfiguration in Anführungszeichen gesetzt werden, z.B. `'0012345'`. In CSV-Dateien ist dies nicht zwingend erforderlich.
|Feld|Beschreibung|Default| |Feld|Beschreibung|Default|
|----|------------|-------| |----|------------|-------|
|scanField|Feld des BW Pakets welches geprüft werden soll|| |scanField|Feld des BW Pakets, welches geprüft werden soll||
|descrField|Name des Feldes im BW Paket in welchem die Beschreibung gespeichert werden soll|| |descrField|Name des Feldes im BW Paket, in welchem die Beschreibung gespeichert werden soll||
|wildcard|Optional: Es kann für das angelegte `descrField` automatisch ein Wildcard registriert werden|None| |wildcard|Optional: Es kann für das angelegte `descrField` automatisch ein Wildcard registriert werden|None|
|descriptions|Liste der Beschreibungen|| |descriptions|Liste der Beschreibungen||
|csvPath|Pfad der CSV-Datei (relativ zum Projektverzeichnis)||
#### `descriptions:` #### `descriptions:`
|Feld|Beschreibung|Default| |Feld|Beschreibung|Default|
|----|------------|-------| |----|------------|-------|
|for|Inhalt im `scanField` auf welchem geprüft werden soll|| |for|Wert im `scanField`, der geprüft werden soll||
|add|Beschreibungstext welcher im `descrField` hinterlegt werden soll|| |add|Beschreibungstext, der im `descrField` hinterlegt werden soll||
|isRegex|Muss explizit auf `true` gesetzt werden, wenn RegEx verwendet wird|false|
**Beispiel:** **Beispiel:**
```yaml ```yaml
@ -44,6 +46,9 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
add: FF DescriptorTest add: FF DescriptorTest
- for: '05678' # führende Nullen in '' ! - for: '05678' # führende Nullen in '' !
add: FF TestDescription add: FF TestDescription
- for: '890(1[1-9]|2[0-9])' # Regex-Pattern in '' !
add: Feuerwehr Wache \\1 (BF)
isRegex: true
- scanField: status - scanField: status
descrField: fmsStatDescr descrField: fmsStatDescr
wildcard: "{STATUSTEXT}" wildcard: "{STATUSTEXT}"
@ -55,6 +60,70 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
- ... - ...
``` ```
**Wichtige Punkte für YAML-Regex:**
- Apostroph: Regex-Pattern sollten in `'` stehen, um YAML-Parsing-Probleme zu vermeiden
- isRegex-Flag: Muss explizit auf `true` gesetzt werden
- Escaping: Backslashes müssen in YAML doppelt escaped werden (`\\1` statt `\1`)
- Regex-Gruppen: Mit `\\1`, `\\2` etc. können Teile des Matches in der Beschreibung verwendet werden
#### `csvPath:`
**Beispiel:**
```
- type: module
res: descriptor
config:
- scanField: tone
descrField: description
wildcard: "{DESCR}"
csvPath: "config/descriptions_tone.csv"
```
`csvPath` gibt den Pfad zu einer CSV-Datei an, relativ zum Projektverzeichnis (z. B. `"config/descriptions_tone.csv"`).
Eine neue CSV-Datei (z. B. `descriptions_tone.csv`) hat folgendes Format:
**Beispiel**
```
for,add,isRegex
11111,KBI Landkreis Z,false
12345,FF A-Dorf,false
23456,FF B-Dorf
^3456[0-9]$,FF Grossdorf,true
```
**Hinweis:** In CSV-Dateien müssen Werte mit führenden Nullen **nicht** in Anführungszeichen gesetzt werden (können aber, falls gewünscht). Die Spalte `isRegex` gibt an, ob der Wert in `for` als regulärer Ausdruck interpretiert werden soll (true/false). Falls kein Wert angegeben ist, ist die Paarung standardmäßig `false`, wie z.B. beim Eintrag der FF B-Dorf im obigen Beispiel.
### Kombination von YAML- und CSV-Konfiguration
Beide Varianten können parallel genutzt werden. In diesem Fall werden die Beschreibungen aus der YAML-Konfiguration und aus der angegebenen CSV-Datei in einer gemeinsamen Datenbank zusammengeführt.
#### Matching-Reihenfolge und Priorität
Das Modul wendet folgende Prioritäten beim Matching an:
1. **Exakte Matches** (aus YAML und CSV) werden zuerst geprüft
2. **Regex-Matches** (aus YAML und CSV) werden nur geprüft, wenn kein exakter Match gefunden wurde
**Beispiel für Kombination:**
```yaml
- type: module
res: descriptor
config:
- scanField: tone
descrField: description
wildcard: "{DESCR}"
descriptions:
- for: 12345
add: FF YAML-Test (exakt)
- for: '123.*'
add: FF YAML-Regex
isRegex: true
csvPath: "config/descriptions_tone.csv"
```
Bei einem `scanField`-Wert von `12345` wird **immer** "FF YAML-Test (exakt)" verwendet, auch wenn der Regex ebenfalls zutreffen würde. Regex-Matches werden nur verwendet, wenn kein exakter Match gefunden wurde - unabhängig davon, ob die Einträge aus YAML oder CSV stammen.
--- ---
## Modul Abhängigkeiten ## Modul Abhängigkeiten
- keine - keine

View file

@ -10,11 +10,14 @@ r"""!
by Bastian Schroll by Bastian Schroll
@file: descriptor.py @file: descriptor.py
@date: 27.10.2019 @date: 03.12.2025
@author: Bastian Schroll @author: Bastian Schroll
@description: Module to add descriptions to bwPackets @description: Module to add descriptions to bwPackets with CSV and Regex support
""" """
import logging import logging
import csv
import re
import os
from module.moduleBase import ModuleBase from module.moduleBase import ModuleBase
# ###################### # # ###################### #
@ -26,31 +29,191 @@ logging.debug("- %s loaded", __name__)
class BoswatchModule(ModuleBase): class BoswatchModule(ModuleBase):
r"""!Adds descriptions to bwPackets""" r"""!Adds descriptions to bwPackets with CSV and Regex support"""
def __init__(self, config): def __init__(self, config):
r"""!Do not change anything here!""" r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config' super().__init__(__name__, config) # you can access the config class on 'self.config'
def onLoad(self): def onLoad(self):
r"""!Called by import of the plugin""" r"""!Called by import of the plugin"""
for descriptor in self.config: # Initialize unified cache for all descriptors
if descriptor.get("wildcard", default=None): self.unified_cache = {}
self.registerWildcard(descriptor.get("wildcard"), descriptor.get("descrField"))
# Process each descriptor configuration
for descriptor_config in self.config:
scan_field = descriptor_config.get("scanField")
descr_field = descriptor_config.get("descrField")
descriptor_key = f"{scan_field}_{descr_field}"
# Register wildcard if specified
if descriptor_config.get("wildcard", default=None):
self.registerWildcard(descriptor_config.get("wildcard"), descr_field)
# Initialize cache for this descriptor
self.unified_cache[descriptor_key] = []
# Load YAML descriptions first (for backward compatibility)
yaml_descriptions = descriptor_config.get("descriptions", default=None)
if yaml_descriptions:
# yaml_descriptions is a Config object, we need to iterate properly
for desc in yaml_descriptions:
entry = {
'for': str(desc.get("for", default="")),
'add': desc.get("add", default=""),
'isRegex': desc.get("isRegex", default=False) # Default: False
}
# Handle string 'true'/'false' values
if isinstance(entry['isRegex'], str):
entry['isRegex'] = entry['isRegex'].lower() == 'true'
self.unified_cache[descriptor_key].append(entry)
logging.debug("Added YAML entry: %s -> %s", entry['for'], entry['add'])
logging.info("Loaded %d YAML descriptions for %s", len(yaml_descriptions), descriptor_key)
# Load CSV descriptions if csvPath is specified
csv_path = descriptor_config.get("csvPath", default=None)
if csv_path:
self._load_csv_data(csv_path, descriptor_key)
logging.info("Total entries for %s: %d", descriptor_key, len(self.unified_cache[descriptor_key]))
def _load_csv_data(self, csv_path, descriptor_key):
r"""!Load CSV data for a descriptor and add to unified cache"""
try:
if not os.path.isfile(csv_path):
logging.error("CSV file not found: %s", csv_path)
return
csv_count = 0
with open(csv_path, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
# Set default values if columns are missing
raw_for = str(row.get('for', '')).strip()
# Remove enclosing quotes
clean_for = raw_for.strip().strip('"').strip("'")
entry = {
'for': clean_for,
'add': row.get('add', '').strip(),
'isRegex': row.get('isRegex', 'false').lower() == 'true' # Default: False
}
logging.debug("CSV row read: %s", row)
self.unified_cache[descriptor_key].append(entry)
csv_count += 1
logging.info("Loaded %d entries from CSV: %s for %s", csv_count, csv_path, descriptor_key)
except Exception as e:
logging.error("Error loading CSV file %s: %s", csv_path, str(e))
def _find_description(self, descriptor_key, scan_value, bw_packet):
r"""!Find matching description for a scan value with Regex group support.
The search is performed in two passes for performance optimization:
1. First pass: Check for exact string matches (fast, no regex compilation)
2. Second pass: Check regex patterns only if no exact match was found
Regex patterns support capture groups that can be referenced in the description
using standard regex backreferences (\1, \2, etc.) via match.expand().
Example:
Pattern: r"(\d{7})"
Input: "1234567"
Description template: "RIC: \1"
Result: "RIC: 1234567"
@param descriptor_key: Cache key identifying the descriptor configuration
@param scan_value: Value to search for in the descriptor cache
@param bw_packet: BOSWatch packet for wildcard replacement
@return: Matched description string or None if no match found
"""
descriptions = self.unified_cache.get(descriptor_key, [])
scan_value_str = str(scan_value).strip()
# First pass: Search for exact matches (performance optimization)
# Exact matches are checked first because they don't require regex compilation
for desc in descriptions:
if not desc.get('isRegex', False):
if desc['for'] == scan_value_str:
description_text = desc.get('add', '')
final_description = self._replace_wildcards(description_text, bw_packet)
return final_description
# Second pass: Search for regex matches
# Only executed if no exact match was found in the first pass
for desc in descriptions:
if desc.get('isRegex', False):
match_pattern = desc.get('for', '')
try:
match = re.search(match_pattern, scan_value_str)
if match:
description_text = desc.get('add', '')
# match.expand() replaces backreferences (\1, \2, etc.) with captured groups
# Example: pattern="(\d+)-(\d+)", input="123-456", template="First: \1, Second: \2"
# result="First: 123, Second: 456"
expanded_description = match.expand(description_text)
final_description = self._replace_wildcards(expanded_description, bw_packet)
return final_description
except re.error as e:
logging.error("Invalid regex pattern '%s': %s", match_pattern, e)
return None
def _replace_wildcards(self, text, bw_packet):
r"""!Replace all available wildcards in description text dynamically."""
if not text or '{' not in text:
return text
result = text
# Search for wildcards in the format {KEY} and replace them with values from the bw_packet
found_wildcards = re.findall(r"\{([A-Z0-9_]+)\}", result)
for key in found_wildcards:
key_lower = key.lower()
value = bw_packet.get(key_lower)
if value is not None:
result = result.replace(f"{{{key}}}", str(value))
logging.debug("Replaced wildcard {%s} with value '%s'", key, value)
return result
def doWork(self, bwPacket): def doWork(self, bwPacket):
r"""!start an run of the module. r"""!start an run of the module.
@param bwPacket: A BOSWatch packet instance""" @param bwPacket: A BOSWatch packet instance"""
for descriptor in self.config: logging.debug("Processing packet with mode: %s", bwPacket.get("mode"))
if not bwPacket.get(descriptor.get("scanField")):
break # scanField is not available in this packet # Process each descriptor configuration
bwPacket.set(descriptor.get("descrField"), bwPacket.get(descriptor.get("scanField"))) for descriptor_config in self.config:
for description in descriptor.get("descriptions"): scan_field = descriptor_config.get("scanField")
if str(description.get("for")) == bwPacket.get(descriptor.get("scanField")): descr_field = descriptor_config.get("descrField")
logging.debug("Description '%s' added in packet field '%s'", descriptor_key = f"{scan_field}_{descr_field}"
description.get("add"), descriptor.get("descrField"))
bwPacket.set(descriptor.get("descrField"), description.get("add")) logging.debug("Processing descriptor: scanField='%s', descrField='%s'", scan_field, descr_field)
break # this descriptor has found a description - run next descriptor
# Check if scanField is present in packet
scan_value = bwPacket.get(scan_field)
if scan_value is None:
logging.debug("scanField '%s' not found in packet, skipping", scan_field)
continue # scanField not available in this packet - try next descriptor
# Set default value (content of scanField)
bwPacket.set(descr_field, str(scan_value))
logging.debug("Set default value '%s' for field '%s'", scan_value, descr_field)
# Search for matching description in unified cache
description = self._find_description(descriptor_key, scan_value, bwPacket)
if description:
bwPacket.set(descr_field, description)
logging.info("Description set: '%s' -> '%s'", scan_value, description)
else:
logging.debug("No description found for value '%s' in field '%s'", scan_value, scan_field)
logging.debug("Returning modified packet")
return bwPacket return bwPacket
def onUnload(self): def onUnload(self):