Merge branch 'develop' into enh/descr-csv

This commit is contained in:
Bastian Schroll 2025-11-26 10:47:48 +01:00 committed by GitHub
commit 1a721f4258
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 181 additions and 79 deletions

View file

@ -2,16 +2,16 @@
---
## Beschreibung
Mit diesem Plugin ist es moeglich, Telegram-Nachrichten für POCSAG-Alarmierungen zu senden.
Außerdem werden Locations versendet, wenn die Felder `lat` und `lon` im Paket definiert sind. (beispielsweise durch das [Geocoding](../modul/geocoding.md) Modul)
Dieses Plugin ermöglicht das Versenden von Telegram-Nachrichten für verschiedene Alarmierungsarten.
Wenn im eingehenden Paket die Felder `lat` und `lon` vorhanden sind (z. B. durch das [Geocoding](../modul/geocoding.md) Modul), wird zusätzlich automatisch der Standort als Telegram-Location gesendet.
Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram API, damit keine Nachrichten verloren gehen, diese Funktion kann mit dem ```queue``` Parameter deaktiviert werden.
Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und exponentiellem Backoff, um die Vorgaben der Telegram API einzuhalten und Nachrichtenverluste zu verhindern. Die Retry-Parameter (max_retries, initial_delay, max_delay) können in der Konfiguration angepasst werden.
## Unterstütze Alarmtypen
- Fms
- Pocsag
- Zvei
- Msg
- FMS
- POCSAG
- ZVEI
- MSG
## Resource
`telegram`
@ -20,16 +20,17 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|Feld|Beschreibung|Default|
|----|------------|-------|
|botToken|Der Api-Key des Telegram-Bots||
|chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen||
|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer|
|message_fms|Format der Nachricht für FMS|`{FMS}`|
|message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Format der Nachricht für ZVEI|`{TONE}`|
|message_msg|Format der Nachricht für MSG||
|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5|
|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]|
|max_delay|Maximale Verzögerung|60 [Sek.]|
|botToken|Der Api-Key des Telegram-Bots|-|
|chatIds|Liste mit Chat-Ids der Empfängers / der Empfänger-Gruppen|-|
|startup_message|Nachricht beim erfolgreichen Initialisieren des Plugins|leer|
|message_fms|Formatvorlage für FMS-Alarm|`{FMS}`|
|message_pocsag|Formatvorlage für POCSAG|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Formatvorlage für ZVEI|`{TONE}`|
|message_msg|Formatvorlage für MSG-Nachricht|-|
|max_retries|Anzahl Wiederholungsversuche bei Fehlern|5|
|initial_delay|Initiale Wartezeit bei Wiederholungsversuchen|2 [Sek.]|
|max_delay|Maximale Retry-Verzögerung|300 [Sek.]|
|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), Case-sensitive!|leer|
**Beispiel:**
```yaml
@ -37,20 +38,32 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
name: Telegram Plugin
res: telegram
config:
message_pocsag: "{RIC}({SRIC})\n{MSG}"
message_pocsag: |
<b>POCSAG Alarm:</b>
RIC: <b>{RIC}</b> ({SRIC})
{MSG}
parse_mode: "HTML"
startup_message: "Server up and running!"
botToken: "BOT_TOKEN"
chatIds:
- "CHAT_ID"
```
Hinweis:
Über parse_mode kannst du Telegram-Formatierungen verwenden:
- HTML: `<b>fett</b>`, `<i>kursiv</i>`, `<u>unterstrichen</u>`, `<s>durchgestrichen</s>`, ...
- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw. (Escape-Regeln beachten)
Block-Strings (|) eignen sich perfekt für mehrzeilige Nachrichten und vermeiden Escape-Zeichen wie \n
---
## Modul Abhängigkeiten
Aus dem Modul [Geocoding](../modul/geocoding.md) (optional/nur POCSAG):
OPTIONAL, nur für POCSAG-Locationversand: Aus dem Modul [Geocoding](../modul/geocoding.md):
- `lat`
- `lon`
---
## Externe Abhängigkeiten
- python-telegram-bot
keine

View file

@ -9,8 +9,8 @@ r"""!
German BOS Information Script
by Bastian Schroll
@file: http.py
@date: 21.07.2025
@file: install_service.py
@date: 15.11.2025
@author: Claus Schichl
@description: Install Service File with argparse CLI
"""
@ -21,13 +21,9 @@ import sys
import logging
import argparse
import yaml
from colorama import init as colorama_init, Fore, Style
from pathlib import Path
# === Initialisiere Colorama für Windows/Konsole ===
colorama_init(autoreset=True)
# === Konstanten ===
# === constants for directories and files ===
BASE_DIR = Path(__file__).resolve().parent
BW_DIR = '/opt/boswatch3'
SERVICE_DIR = Path('/etc/systemd/system')
@ -35,7 +31,8 @@ CONFIG_DIR = (BASE_DIR / 'config').resolve()
LOG_FILE = (BASE_DIR / 'log' / 'install' / 'service_install.log').resolve()
os.makedirs(LOG_FILE.parent, exist_ok=True)
# === Sprache (kapselt globale Variable) ===
# === language management (default german)===
_lang = 'de'
@ -48,7 +45,7 @@ def set_lang(lang):
_lang = lang
# === Texte ===
# === text-dictionary ===
TEXT = {
"de": {
"script_title": "🛠️ BOSWatch Service Manager",
@ -88,7 +85,17 @@ TEXT = {
"service_active": "✅ Service {0} läuft erfolgreich.",
"service_inactive": "⚠ Service {0} ist **nicht aktiv** bitte prüfen.",
"dryrun_status_check": "🧪 [Dry-Run] Service-Status von {0} würde jetzt geprüft.",
"max_attempts_exceeded": "❌ Maximale Anzahl an Eingabeversuchen überschritten. Das Menü wird beendet."
"max_attempts_exceeded": "❌ Maximale Anzahl an Eingabeversuchen überschritten. Das Menü wird beendet.",
"user_interrupt": "\nAbbruch durch Benutzer.",
"unhandled_error": "Unbehandelter Fehler: {}",
"max_retries_skip": "Maximale Anzahl Eingabeversuche überschritten. Überspringe Service.",
"max_retries_exit": "Maximale Anzahl Eingabeversuche überschritten. Beende Programm.",
"colorama_missing": "⚠️ Colorama nicht installiert versuche automatische Installation...",
"colorama_install": "➡️ Installiere Colorama...",
"colorama_install_ok": "✅ Colorama erfolgreich installiert.",
"colorama_install_fail": "❌ Colorama konnte nicht automatisch installiert werden.",
"verify_timeout": "⚠ Timeout bei systemd-analyze verify für: {}",
"status_timeout": "⚠ Timeout beim Prüfen des Service-Status: {}"
},
"en": {
@ -129,14 +136,78 @@ TEXT = {
"service_active": "✅ Service {0} is running successfully.",
"service_inactive": "⚠ Service {0} is **not active** please check.",
"dryrun_status_check": "🧪 [Dry-Run] Service status of {0} would be checked now.",
"max_attempts_exceeded": "❌ Maximum number of input attempts exceeded. Exiting menu."
"max_attempts_exceeded": "❌ Maximum number of input attempts exceeded. Exiting menu.",
"user_interrupt": "\nInterrupted by user.",
"unhandled_error": "Unhandled error: {}",
"max_retries_skip": "Maximum input attempts exceeded. Skipping service.",
"max_retries_exit": "Maximum input attempts exceeded. Exiting program.",
"colorama_missing": "⚠️ Colorama not installed attempting automatic installation...",
"colorama_install": "➡️ Installing Colorama...",
"colorama_install_ok": "✅ Colorama installed successfully.",
"colorama_install_fail": "❌ Colorama could not be installed automatically.",
"verify_timeout": "⚠ Timeout during systemd-analyze verify for: {}",
"status_timeout": "⚠ Timeout while checking service status: {}"
}
}
# === Logging Setup ===
def setup_logging(verbose=False, quiet=False):
# === COLORAMA AUTO-INSTALL (dual language) ===
def colorama_auto_install():
r"""
Auto-installs colorama if missing.
Note: Language detection happens before colorama is available.
"""
# recognize language early (before colorama installation)
import argparse
early_parser = argparse.ArgumentParser(add_help=False)
early_parser.add_argument('--lang', '-l', choices=['de', 'en'], default='de')
early_args, _ = early_parser.parse_known_args()
lang = early_args.lang
# use text from global TEXT dictionary
txt = TEXT[lang]
try:
from colorama import init as colorama_init, Fore, Style
colorama_init(autoreset=True)
return True, Fore, Style
except ImportError:
print(txt["colorama_missing"])
# install Colorama
print(txt["colorama_install"])
subprocess.run(["sudo", "apt", "install", "-y", "python3-colorama"], check=False)
# retry importing Colorama
try:
from colorama import init as colorama_init, Fore, Style
colorama_init(autoreset=True)
print(txt["colorama_install_ok"])
return True, Fore, Style
except ImportError:
print(txt["colorama_install_fail"])
return False, None, None
# === import / install colorama ===
colorama_available, Fore, Style = colorama_auto_install()
if not colorama_available:
# provides dummy classes if colorama is not available (no crash)
class DummyStyle:
RESET_ALL = ""
BRIGHT = ""
class DummyFore:
RED = GREEN = YELLOW = BLUE = CYAN = MAGENTA = WHITE = RESET = ""
Fore = DummyFore()
Style = DummyStyle()
# === logging Setup ===
def setup_logging(verbose=False, quiet=False):
r"""
Setup logging to file and console with colorized output.
"""
log_level = logging.INFO
@ -178,7 +249,7 @@ def setup_logging(verbose=False, quiet=False):
def t(key):
"""
r"""
Translation helper: returns the localized string for the given key.
"""
lang = get_lang()
@ -186,7 +257,7 @@ def t(key):
def get_user_input(prompt, valid_inputs, max_attempts=3):
"""
r"""
Prompt user for input until a valid input from valid_inputs is entered or max_attempts exceeded.
Raises RuntimeError on failure.
"""
@ -197,22 +268,22 @@ def get_user_input(prompt, valid_inputs, max_attempts=3):
return value
logging.warning(t("invalid_input").format(", ".join(valid_inputs)))
attempts += 1
raise RuntimeError("Maximale Anzahl Eingabeversuche überschritten.")
raise RuntimeError(t("max_attempts_exceeded"))
def list_yaml_files():
"""
r"""
Returns a list of .yaml or .yml files in the config directory.
"""
return [f.name for f in CONFIG_DIR.glob("*.y*ml")]
return sorted([f.name for f in CONFIG_DIR.glob("*.y*ml")])
def test_yaml_file(file_path):
"""
r"""
Tests if YAML file can be loaded without error.
"""
try:
content = file_path.read_text()
content = file_path.read_text(encoding='utf-8')
yaml.safe_load(content)
return True
except Exception as e:
@ -221,11 +292,11 @@ def test_yaml_file(file_path):
def detect_yaml_type(file_path):
"""
r"""
Detects if YAML config is 'client' or 'server' type.
"""
try:
with open(file_path, 'r') as f:
with open(file_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
if 'client' in data:
return 'client'
@ -240,7 +311,7 @@ def detect_yaml_type(file_path):
def execute(command, dry_run=False):
"""
r"""
Executes shell command unless dry_run is True.
"""
logging.debug(f"{command}")
@ -249,21 +320,28 @@ def execute(command, dry_run=False):
def verify_service(service_path):
"""
r"""
Runs 'systemd-analyze verify' on the service file and logs warnings/errors.
"""
try:
result = subprocess.run(['systemd-analyze', 'verify', service_path], capture_output=True, text=True)
result = subprocess.run(
['systemd-analyze', 'verify', service_path],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0 or result.stderr:
logging.warning(t("verify_warn").format(result.stderr.strip()))
else:
logging.debug(t("verify_ok").format(os.path.basename(service_path)))
except subprocess.TimeoutExpired:
logging.warning(t("verify_timeout").format(os.path.basename(service_path)))
except Exception as e:
logging.error(t("yaml_error").format(service_path, e))
def install_service(yaml_file, dry_run=False):
"""
r"""
Creates and installs systemd service based on YAML config.
"""
yaml_path = CONFIG_DIR / yaml_file
@ -280,7 +358,7 @@ def install_service(yaml_file, dry_run=False):
service_path = SERVICE_DIR / service_name
if is_server:
exec_line = f"/usr/bin/python {BW_DIR}/bw_server.py -c {yaml_file}"
exec_line = f"/usr/bin/python3 {BW_DIR}/bw_server.py -c {yaml_file}"
description = "BOSWatch Server"
after = "network-online.target"
wants = "Wants=network-online.target"
@ -309,49 +387,56 @@ WantedBy=multi-user.target
if not dry_run:
try:
with open(service_path, 'w') as f:
with open(service_path, 'w', encoding='utf-8') as f:
f.write(service_content)
except IOError as e:
logging.error(t("file_write_error").format(service_path, e))
return
verify_service(service_path)
execute("systemctl daemon-reexec", dry_run=dry_run)
execute("systemctl daemon-reload", dry_run=dry_run)
execute(f"systemctl enable {service_name}", dry_run=dry_run)
execute(f"systemctl start {service_name}", dry_run=dry_run)
if not dry_run:
try:
subprocess.run(
["systemctl", "is-active", "--quiet", service_name],
check=True
check=True,
timeout=5
)
logging.info(t("service_active").format(service_name))
except subprocess.CalledProcessError:
logging.warning(t("service_inactive").format(service_name))
except subprocess.TimeoutExpired:
logging.warning(t("status_timeout").format(service_name))
else:
logging.info(t("dryrun_status_check").format(service_name))
def remove_service(service_name, dry_run=False):
"""
r"""
Stops, disables and removes the given systemd service.
"""
logging.warning(t("removing_service").format(service_name))
execute(f"systemctl stop {service_name}", dry_run=dry_run)
execute(f"systemctl disable {service_name}", dry_run=dry_run)
service_path = Path(SERVICE_DIR) / service_name
service_path = SERVICE_DIR / service_name
if not dry_run and service_path.exists():
try:
os.remove(service_path)
service_path.unlink()
logging.info(t("service_deleted").format(service_name))
except Exception as e:
logging.error(t("file_write_error").format(service_path, e))
else:
logging.warning(t("service_not_found").format(service_name))
execute("systemctl daemon-reload", dry_run=dry_run)
def remove_menu(dry_run=False):
"""
r"""
Interactive menu to remove services.
"""
while True:
@ -384,16 +469,16 @@ def remove_menu(dry_run=False):
elif auswahl == 'a':
for s in services:
remove_service(s, dry_run=dry_run)
# Danach direkt weiter zur nächsten Schleife (aktualisierte Liste!)
# directly continue to the next loop (updated list!)
continue
else:
remove_service(services[int(auswahl)], dry_run=dry_run)
# Danach ebenfalls weiter zur nächsten Schleife (aktualisierte Liste!)
# also directly continue to the next loop (updated list!)
continue
def init_language():
"""
r"""
Parses --lang/-l argument early to set language before other parsing.
"""
lang_parser = argparse.ArgumentParser(add_help=False)
@ -410,8 +495,8 @@ def init_language():
def main(dry_run=False):
"""
Hauptprogramm: Service installieren oder entfernen.
r"""
main program: install or remove service.
"""
print(Fore.GREEN + Style.BRIGHT + t("script_title") + Style.RESET_ALL)
print(t('mode_dry') if dry_run else t('mode_live'))
@ -432,7 +517,7 @@ def main(dry_run=False):
try:
action = get_user_input(t("action_prompt"), ['i', 'r', 'e'])
except RuntimeError:
logging.error("Maximale Anzahl Eingabeversuche überschritten. Beende Programm.")
logging.error(t("max_retries_exit"))
sys.exit(1)
if action == 'e':
@ -444,7 +529,7 @@ def main(dry_run=False):
try:
edited = get_user_input(t("edited_prompt"), ['y', 'n'])
except RuntimeError:
logging.error("Maximale Anzahl Eingabeversuche überschritten. Beende Programm.")
logging.error(t("max_retries_exit"))
sys.exit(1)
if edited == 'n':
@ -464,7 +549,7 @@ def main(dry_run=False):
try:
install = get_user_input(t("install_confirm").format(yaml_file), ['y', 'n'])
except RuntimeError:
logging.error("Maximale Anzahl Eingabeversuche überschritten. Überspringe Service.")
logging.error(t("max_retries_skip"))
skipped += 1
continue
@ -501,9 +586,8 @@ if __name__ == "__main__":
try:
main(dry_run=args.dry_run)
except KeyboardInterrupt:
print("\nAbbruch durch Benutzer.")
print(t("user_interrupt"))
sys.exit(1)
except Exception as e:
logging.critical(f"Unbehandelter Fehler: {e}")
logging.critical(t("unhandled_error").format(e))
sys.exit(1)
# === Ende des Skripts ===

View file

@ -10,7 +10,7 @@ r"""!
by Bastian Schroll
@file: telegram.py
@date: 12.07.2025
@date: 17.11.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
"""
@ -31,16 +31,17 @@ logger = logging.getLogger(__name__)
# ===========================
# TelegramSender-Klasse
# TelegramSender-Class
# ===========================
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None):
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.parse_mode = parse_mode
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
@ -75,11 +76,11 @@ class TelegramSender:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
# use the Telegram-provided value (retry_after) if available
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
# increase delay for the next attempt (exponential backoff)
delay = min(delay * 2, self.max_delay)
except Exception as e:
@ -91,8 +92,10 @@ class TelegramSender:
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
'text': content,
}
if self.parse_mode:
payload['parse_mode'] = self.parse_mode
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
@ -101,25 +104,25 @@ class TelegramSender:
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch
return False, True, None # unknown message type = permanent failure
try:
custom_delay = None # Standardwert für Rückgabe, außer bei 429
custom_delay = None # standardvalue for return, except in case of 429
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
return False, False, custom_delay # Telegram gives exact wait time
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft
return False, True, custom_delay # permanent failure
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft
return False, True, custom_delay # permanent failure
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
@ -131,7 +134,7 @@ class TelegramSender:
# ===========================
# BoswatchPlugin-Klasse
# BoswatchPlugin-Class
# ===========================
@ -149,17 +152,19 @@ class BoswatchPlugin(PluginBase):
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return
# Konfigurierbare Parameter mit Fallback-Defaults
# configurable parameters with fallback defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
parse_mode = self.config.get("parse_mode")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
max_delay=max_delay,
parse_mode=parse_mode
)
startup_message = self.config.get("startup_message")