mirror of
https://github.com/BOSWatch/BW3-Core.git
synced 2025-12-06 07:12:04 +01:00
Compare commits
3 commits
dc0f4ad2bd
...
1bd192b0d9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1bd192b0d9 | ||
|
|
524efbb0aa | ||
|
|
a6c3395f39 |
2
.github/workflows/run_pytest.yml
vendored
2
.github/workflows/run_pytest.yml
vendored
|
|
@ -1,4 +1,6 @@
|
|||
name: pytest
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
push:
|
||||
|
|
|
|||
|
|
@ -24,12 +24,14 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
|
|||
|descrField|Name des Feldes im BW Paket in welchem die Beschreibung gespeichert werden soll||
|
||||
|wildcard|Optional: Es kann für das angelegte `descrField` automatisch ein Wildcard registriert werden|None|
|
||||
|descriptions|Liste der Beschreibungen||
|
||||
|csvPath|Pfad der CSV-Datei (relativ zum Projektverzeichnis)||
|
||||
|
||||
#### `descriptions:`
|
||||
|Feld|Beschreibung|Default|
|
||||
|----|------------|-------|
|
||||
|for|Inhalt im `scanField` auf welchem geprüft werden soll||
|
||||
|add|Beschreibungstext welcher im `descrField` hinterlegt werden soll||
|
||||
|isRegex|Muss explizit auf `true` gesetzt werden, falls RegEx verwendet wird|false|
|
||||
|
||||
**Beispiel:**
|
||||
```yaml
|
||||
|
|
@ -44,6 +46,9 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
|
|||
add: FF DescriptorTest
|
||||
- for: '05678' # führende Nullen in '' !
|
||||
add: FF TestDescription
|
||||
- for: '890(1[1-9]|2[0-9])' # Regex-Pattern in '' !
|
||||
add: Feuerwehr Wache \\1 (BF)
|
||||
isRegex: true
|
||||
- scanField: status
|
||||
descrField: fmsStatDescr
|
||||
wildcard: "{STATUSTEXT}"
|
||||
|
|
@ -55,6 +60,62 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
|
|||
- ...
|
||||
```
|
||||
|
||||
**Wichtige Punkte für YAML-Regex:**
|
||||
- Apostroph: Regex-Pattern sollten in `'` stehen, um YAML-Parsing-Probleme zu vermeiden
|
||||
- isRegex-Flag: Muss explizit auf `true` gesetzt werden
|
||||
- Escaping: Backslashes müssen in YAML doppelt escaped werden (`\\1` statt `\1`)
|
||||
- Regex-Gruppen: Mit `\\1`, `\\2` etc. können Teile des Matches in der Beschreibung verwendet werden
|
||||
|
||||
#### `csvPath:`
|
||||
|
||||
**Beispiel:**
|
||||
```
|
||||
- type: module
|
||||
res: descriptor
|
||||
config:
|
||||
- scanField: tone
|
||||
descrField: description
|
||||
wildcard: "{DESCR}"
|
||||
csvPath: "config/descriptions_tone.csv"
|
||||
```
|
||||
|
||||
`csvPath` gibt den Pfad zu einer CSV-Datei an, relativ zum Projektverzeichnis (z. B. `"config/descriptions_tone.csv"`).
|
||||
|
||||
Eine neue CSV-Datei (z. B. `descriptions_tone.csv`) hat folgendes Format:
|
||||
|
||||
**Beispiel**
|
||||
```
|
||||
for,add,isRegex
|
||||
11111,KBI Landkreis Z,false
|
||||
12345,FF A-Dorf,false
|
||||
23456,FF B-Dorf,false
|
||||
^3456[0-9]$,FF Grossdorf, true
|
||||
```
|
||||
|
||||
In der Spalte isRegex kann **zusätzlich** angegeben werden, ob der Wert in for als regulärer Ausdruck interpretiert werden soll (true/false). Standardmäßig ist `false`.
|
||||
Wenn `isRegex` auf `true` gesetzt ist, wird der Wert aus `for` als regulärer Ausdruck ausgewertet.
|
||||
|
||||
### Kombination von YAML- und CSV-Konfiguration
|
||||
|
||||
Beide Varianten können parallel genutzt werden. In diesem Fall werden zuerst die Beschreibungen aus der YAML-Konfiguration und zusätzlich die Beschreibungen aus der angegebenen CSV-Datei geladen.
|
||||
|
||||
**Beispiel**
|
||||
|
||||
```
|
||||
- type: module
|
||||
res: descriptor
|
||||
config:
|
||||
- scanField: tone
|
||||
descrField: description
|
||||
wildcard: "{DESCR}"
|
||||
descriptions:
|
||||
- for: 12345
|
||||
add: FF YAML-Test
|
||||
- for: '05678' # führende Nullen in '' !
|
||||
add: FF YAML-Nullen
|
||||
csvPath: "config/descriptions_tone.csv"
|
||||
```
|
||||
|
||||
---
|
||||
## Modul Abhängigkeiten
|
||||
- keine
|
||||
|
|
@ -70,4 +131,4 @@ Informationen zum Aufbau eines [BOSWatch Pakets](../develop/packet.md)
|
|||
|
||||
---
|
||||
## Zusätzliche Wildcards
|
||||
- Von der Konfiguration abhängig
|
||||
- Von der Konfiguration abhängig
|
||||
|
|
@ -10,11 +10,14 @@ r"""!
|
|||
by Bastian Schroll
|
||||
|
||||
@file: descriptor.py
|
||||
@date: 27.10.2019
|
||||
@date: 04.08.2025
|
||||
@author: Bastian Schroll
|
||||
@description: Module to add descriptions to bwPackets
|
||||
@description: Module to add descriptions to bwPackets with CSV and Regex support
|
||||
"""
|
||||
import logging
|
||||
import csv
|
||||
import re
|
||||
import os
|
||||
from module.moduleBase import ModuleBase
|
||||
|
||||
# ###################### #
|
||||
|
|
@ -26,31 +29,172 @@ logging.debug("- %s loaded", __name__)
|
|||
|
||||
|
||||
class BoswatchModule(ModuleBase):
|
||||
r"""!Adds descriptions to bwPackets"""
|
||||
r"""!Adds descriptions to bwPackets with CSV and Regex support"""
|
||||
def __init__(self, config):
|
||||
r"""!Do not change anything here!"""
|
||||
super().__init__(__name__, config) # you can access the config class on 'self.config'
|
||||
|
||||
def onLoad(self):
|
||||
r"""!Called by import of the plugin"""
|
||||
for descriptor in self.config:
|
||||
if descriptor.get("wildcard", default=None):
|
||||
self.registerWildcard(descriptor.get("wildcard"), descriptor.get("descrField"))
|
||||
# Initialize unified cache for all descriptors
|
||||
self.unified_cache = {}
|
||||
|
||||
# Process each descriptor configuration
|
||||
for descriptor_config in self.config:
|
||||
scan_field = descriptor_config.get("scanField")
|
||||
descr_field = descriptor_config.get("descrField")
|
||||
descriptor_key = f"{scan_field}_{descr_field}"
|
||||
|
||||
# Register wildcard if specified
|
||||
if descriptor_config.get("wildcard", default=None):
|
||||
self.registerWildcard(descriptor_config.get("wildcard"), descr_field)
|
||||
|
||||
# Initialize cache for this descriptor
|
||||
self.unified_cache[descriptor_key] = []
|
||||
|
||||
# Load YAML descriptions first (for backward compatibility)
|
||||
yaml_descriptions = descriptor_config.get("descriptions", default=None)
|
||||
if yaml_descriptions:
|
||||
# yaml_descriptions is a Config object, we need to iterate properly
|
||||
for desc in yaml_descriptions:
|
||||
entry = {
|
||||
'for': str(desc.get("for", default="")),
|
||||
'add': desc.get("add", default=""),
|
||||
'isRegex': desc.get("isRegex", default=False) # Default: False
|
||||
}
|
||||
# Handle string 'true'/'false' values
|
||||
if isinstance(entry['isRegex'], str):
|
||||
entry['isRegex'] = entry['isRegex'].lower() == 'true'
|
||||
|
||||
self.unified_cache[descriptor_key].append(entry)
|
||||
logging.debug("Added YAML entry: %s -> %s", entry['for'], entry['add'])
|
||||
logging.info("Loaded %d YAML descriptions for %s", len(yaml_descriptions), descriptor_key)
|
||||
|
||||
# Load CSV descriptions if csvPath is specified
|
||||
csv_path = descriptor_config.get("csvPath", default=None)
|
||||
if csv_path:
|
||||
self._load_csv_data(csv_path, descriptor_key)
|
||||
|
||||
logging.info("Total entries for %s: %d", descriptor_key, len(self.unified_cache[descriptor_key]))
|
||||
|
||||
def _load_csv_data(self, csv_path, descriptor_key):
|
||||
r"""!Load CSV data for a descriptor and add to unified cache"""
|
||||
try:
|
||||
if not os.path.isfile(csv_path):
|
||||
logging.error("CSV file not found: %s", csv_path)
|
||||
return
|
||||
|
||||
csv_count = 0
|
||||
with open(csv_path, 'r', encoding='utf-8') as csvfile:
|
||||
reader = csv.DictReader(csvfile)
|
||||
for row in reader:
|
||||
# Set default values if columns are missing
|
||||
entry = {
|
||||
'for': str(row.get('for', '')),
|
||||
'add': row.get('add', ''),
|
||||
'isRegex': row.get('isRegex', 'false').lower() == 'true' # Default: False
|
||||
}
|
||||
self.unified_cache[descriptor_key].append(entry)
|
||||
csv_count += 1
|
||||
|
||||
logging.info("Loaded %d entries from CSV: %s for %s", csv_count, csv_path, descriptor_key)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Error loading CSV file %s: %s", csv_path, str(e))
|
||||
|
||||
def _find_description(self, descriptor_key, scan_value, bw_packet):
|
||||
r"""!Find matching description for a scan value with Regex group support."""
|
||||
descriptions = self.unified_cache.get(descriptor_key, [])
|
||||
scan_value_str = str(scan_value)
|
||||
|
||||
# Search for matching description
|
||||
for desc in descriptions:
|
||||
description_text = desc.get('add', '')
|
||||
match_pattern = desc.get('for', '')
|
||||
is_regex = desc.get('isRegex', False)
|
||||
|
||||
if is_regex:
|
||||
# Regex matching
|
||||
try:
|
||||
match = re.search(match_pattern, scan_value_str)
|
||||
if match:
|
||||
# Expand regex groups (\1, \2) in the description
|
||||
expanded_description = match.expand(description_text)
|
||||
|
||||
# Replace standard wildcards like {TONE}
|
||||
final_description = self._replace_wildcards(expanded_description, bw_packet)
|
||||
|
||||
logging.debug("Regex match '%s' -> '%s' for descriptor '%s'",
|
||||
match_pattern, final_description, descriptor_key)
|
||||
return final_description
|
||||
except re.error as e:
|
||||
logging.error("Invalid regex pattern '%s': %s", match_pattern, str(e))
|
||||
continue
|
||||
else:
|
||||
# Exact match
|
||||
if match_pattern == scan_value_str:
|
||||
# Replace standard wildcards like {TONE}
|
||||
final_description = self._replace_wildcards(description_text, bw_packet)
|
||||
logging.debug("Exact match '%s' -> '%s' for descriptor '%s'",
|
||||
match_pattern, final_description, descriptor_key)
|
||||
return final_description
|
||||
|
||||
return None
|
||||
|
||||
def _replace_wildcards(self, text, bw_packet):
|
||||
r"""!Replace all available wildcards in description text dynamically."""
|
||||
if not text or '{' not in text:
|
||||
return text
|
||||
|
||||
result = text
|
||||
|
||||
# Search for wildcards in the format {KEY} and replace them with values from the bw_packet
|
||||
found_wildcards = re.findall(r"\{([A-Z0-9_]+)\}", result)
|
||||
|
||||
for key in found_wildcards:
|
||||
key_lower = key.lower()
|
||||
value = bw_packet.get(key_lower)
|
||||
|
||||
if value is not None:
|
||||
result = result.replace(f"{{{key}}}", str(value))
|
||||
logging.debug("Replaced wildcard {%s} with value '%s'", key, value)
|
||||
|
||||
return result
|
||||
|
||||
def doWork(self, bwPacket):
|
||||
r"""!start an run of the module.
|
||||
|
||||
@param bwPacket: A BOSWatch packet instance"""
|
||||
for descriptor in self.config:
|
||||
if not bwPacket.get(descriptor.get("scanField")):
|
||||
break # scanField is not available in this packet
|
||||
bwPacket.set(descriptor.get("descrField"), bwPacket.get(descriptor.get("scanField")))
|
||||
for description in descriptor.get("descriptions"):
|
||||
if str(description.get("for")) == bwPacket.get(descriptor.get("scanField")):
|
||||
logging.debug("Description '%s' added in packet field '%s'",
|
||||
description.get("add"), descriptor.get("descrField"))
|
||||
bwPacket.set(descriptor.get("descrField"), description.get("add"))
|
||||
break # this descriptor has found a description - run next descriptor
|
||||
logging.debug("Processing packet with mode: %s", bwPacket.get("mode"))
|
||||
|
||||
# Process each descriptor configuration
|
||||
for descriptor_config in self.config:
|
||||
scan_field = descriptor_config.get("scanField")
|
||||
descr_field = descriptor_config.get("descrField")
|
||||
descriptor_key = f"{scan_field}_{descr_field}"
|
||||
|
||||
logging.debug("Processing descriptor: scanField='%s', descrField='%s'", scan_field, descr_field)
|
||||
|
||||
# Check if scanField is present in packet
|
||||
scan_value = bwPacket.get(scan_field)
|
||||
if scan_value is None:
|
||||
logging.debug("scanField '%s' not found in packet, skipping", scan_field)
|
||||
continue # scanField not available in this packet - try next descriptor
|
||||
|
||||
# Set default value (content of scanField)
|
||||
bwPacket.set(descr_field, str(scan_value))
|
||||
logging.debug("Set default value '%s' for field '%s'", scan_value, descr_field)
|
||||
|
||||
# Search for matching description in unified cache
|
||||
description = self._find_description(descriptor_key, scan_value, bwPacket)
|
||||
|
||||
if description:
|
||||
bwPacket.set(descr_field, description)
|
||||
logging.info("Description set: '%s' -> '%s'", scan_value, description)
|
||||
else:
|
||||
logging.debug("No description found for value '%s' in field '%s'", scan_value, scan_field)
|
||||
|
||||
logging.debug("Returning modified packet")
|
||||
return bwPacket
|
||||
|
||||
def onUnload(self):
|
||||
|
|
|
|||
Loading…
Reference in a new issue