Compare commits

...

6 commits

Author SHA1 Message Date
KoenigMjr e1ecf4ef1c
Merge 23d1b1a328 into 017e882363 2025-10-22 08:00:01 +00:00
KoenigMjr 23d1b1a328 Fix POCSAG decoding crash caused by invalid subric parsing
Errorcode führte zu Programmexit:

> 12.10.2025 02:20:39,918 - inputThread sdrInput _runThread [ERROR] error in sdr input routine
Traceback (most recent call last):
>   File "/opt/boswatch3/boswatch/inputSource/sdrInput.py", line 65, in _runThread
>     self.addToQueue(line)
>   ...
> ValueError: invalid literal for int() with base 10: ' '

Ursache:
Die Funktion `_getBitrateRicSubric()` in `pocsagDecoder.py` griff fest auf `data[40]` zu, um den SubRIC-Wert zu ermitteln. Bei Fehlerhaften Datensätzen von multimon-ng kann sich die Position jedoch verschieben, wodurch an dieser Stelle ein Leerzeichen (' ') statt einer Ziffer stand. Dies führte zu einem ValueError und damit zum Abbruch des gesamten SDR-Threads.

Änderung:
Die Funktion wurde auf robuste Regex-Analyse umgestellt (analog fmsDecoder.py und pocsagDecoder.py):
- Bitrate, Address (RIC) und Function (SubRIC) werden nun mit regulären Ausdrücken extrahiert.
- Die ursprüngliche Logik (`subric = int(Function) + 1`) bleibt vollständig erhalten.
- Enthält die Zeile keine gültige Function, wird eine Warnung geloggt ("Invalid POCSAG function (not 0–3)")
- Zusätzliche Fehlerabsicherung durch try/except.

Ergebnis:
Der Decoder ist nun tolerant gegenüber Formatabweichungen und verhindert Abstürze bei fehlerhaften oder unvollständigen multimon-ng-Zeilen.
2025-10-22 09:59:59 +02:00
Bastian Schroll 017e882363
Merge pull request #134 from KoenigMjr/feature/telegram-neu
Some checks failed
build_docs / Build documentation (push) Has been cancelled
CodeQL / CodeQL-Build (push) Has been cancelled
pytest / build (ubuntu-latest, 3.10) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.11) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.12) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.13) (push) Has been cancelled
pytest / build (ubuntu-latest, 3.9) (push) Has been cancelled
build_docs / deploy (push) Has been cancelled
Telegram-Plugin Refactor
2025-10-22 08:53:40 +02:00
Bastian Schroll 2e5479cde2
Merge branch 'develop' into feature/telegram-neu 2025-10-21 13:39:04 +02:00
KoenigMjr 523329a9bb Doku-Ergänzung
update zur neuen Telegram Version

*in Konfiguration hinzugefügt:*
Startup_message
max_retries
initial_delay
max_delay

*gelöscht:*
queue

*im Beispiel:*
Startup_message hinzugefügt
2025-08-08 21:11:48 +02:00
KoenigMjr 6a0a59c3ac Telegram mit Warteschlange
Durch Einbau einer Warteschlange kein Datenverlust bei belegter API (Sendelimit ca. 30 Nachrichten/min, gibt aber Soft-Limit)

Exponentielles Backoff mit Maximalgrenze
Retry-Zähler mit Abbruch bei zu vielen Fehlversuchen
Kein Wiederholen bei permanenten Fehlern (400/401)
dynamische Zeitanpassung bei 429 Fehlern

Fehlerrobustheit verbessert hinsichtlich Connection Error

neues Plugin ohne telegram-bot

* Timeout (timeout=10),
* HTTP-Fehlerprüfung (raise_for_status()),
* Retry-Logik (3 Versuche mit wachsender Wartezeit),
* Sauberem Logging mit logger statt print).

send_location aus altem Skript übernommen und angepasst
2025-08-08 21:11:48 +02:00
3 changed files with 185 additions and 100 deletions

View file

@ -9,8 +9,8 @@ r"""!
German BOS Information Script
by Bastian Schroll
@file: pocsag.py
@date: 06.01.2018
@file: pocsagDecoder.py
@date: 15.10.2025
@author: Bastian Schroll
@description: Decoder class for pocsag
"""
@ -38,10 +38,15 @@ class PocsagDecoder:
@return BOSWatch POCSAG packet or None"""
bitrate, ric, subric = PocsagDecoder._getBitrateRicSubric(data)
if re.search("[0-9]{7}", ric) and re.search("[1-4]", subric):
# If no valid SubRIC (Function 03) detected → abort
if subric is None:
logging.warning("Invalid POCSAG function (not 03)")
return None
if ric and len(ric) == 7:
if "Alpha:" in data:
message = data.split('Alpha:')[1].strip()
message = message.replace('<NUL>', '').replace('<NUL', '').replace('< NUL>', '').replace('<EOT>', '').strip()
message = re.sub(r'<\s*(?:NUL|EOT)\s*>?', '', message).strip()
else:
message = ""
subricText = subric.replace("1", "a").replace("2", "b").replace("3", "c").replace("4", "d")
@ -63,27 +68,27 @@ class PocsagDecoder:
@staticmethod
def _getBitrateRicSubric(data):
r"""!Gets the Bitrate, Ric and Subric from data
@param data: POCSAG data string
@return bitrate
@return ric
@return subric"""
bitrate, ric, subric = "0", "0", "0"
"""Gets the Bitrate, Ric and Subric from data using robust regex parsing."""
bitrate = "0"
ric = None
subric = None
# determine bitrate
if "POCSAG512:" in data:
bitrate = "512"
ric = data[20:27].replace(" ", "").zfill(7)
subric = str(int(data[39]) + 1)
elif "POCSAG1200:" in data:
bitrate = "1200"
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)
elif "POCSAG2400:" in data:
bitrate = "2400"
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)
# extract RIC (address)
m_ric = re.search(r'Address:\s*(\d{1,7})(?=\s|$)', data)
if m_ric:
ric = m_ric.group(1).zfill(7)
# extract SubRIC (function)
m_sub = re.search(r'Function:\s*([0-3])', data)
if m_sub:
subric = str(int(m_sub.group(1)) + 1)
return bitrate, ric, subric

View file

@ -22,11 +22,14 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
|----|------------|-------|
|botToken|Der Api-Key des Telegram-Bots||
|chatIds|Liste mit Chat-Ids der Empfängers / der Emfänger-Gruppen||
|startup_message|Nachricht, dass das Telegram-Plugin erfolgreich geladen wurde|leer|
|message_fms|Format der Nachricht für FMS|`{FMS}`|
|message_pocsag|Format der Nachricht für Pocsag|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Format der Nachricht für ZVEI|`{TONE}`|
|message_msg|Format der Nachricht für MSG||
|queue|Aktivieren/Deaktivieren der MessageQueue|true|
|max_retries|Anzahl der Versuche, bis das Senden abgebrochen wird|5|
|initial_delay|Verzögerung des zweiten Sendeversuchs|2 [Sek.]|
|max_delay|Maximale Verzögerung|60 [Sek.]|
**Beispiel:**
```yaml
@ -35,6 +38,7 @@ Die abarbeitung der Alarmierungen erfolgt per Queue nach den Limits der Telegram
res: telegram
config:
message_pocsag: "{RIC}({SRIC})\n{MSG}"
startup_message: "Server up and running!"
botToken: "BOT_TOKEN"
chatIds:
- "CHAT_ID"

View file

@ -10,126 +10,202 @@ r"""!
by Bastian Schroll
@file: telegram.py
@date: 20.02.2020
@author: Jan Speller
@description: Telegram Plugin
@date: 12.07.2025
@author: Claus Schichl nach der Idee von Jan Speller
@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
"""
import logging
import time
import threading
import queue
import requests
from plugin.pluginBase import PluginBase
# ###################### #
# Custom plugin includes #
from telegram.error import (TelegramError, Unauthorized, BadRequest, TimedOut, NetworkError)
from telegram.ext import messagequeue as mq
from telegram.utils.request import Request
import telegram.bot
# ###################### #
logging.debug("- %s loaded", __name__)
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
class MQBot(telegram.bot.Bot):
'''A subclass of Bot which delegates send method handling to MQ'''
# ===========================
# TelegramSender-Klasse
# ===========================
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
super(MQBot, self).__init__(*args, **kwargs)
# below 2 attributes should be provided for decorator usage
self._is_messages_queued_default = is_queued_def
self._msg_queue = mqueue or mq.MessageQueue()
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None):
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
self.msg_queue = queue.Queue()
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
for chat_id in self.chat_ids:
self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
def send_location(self, latitude, longitude):
for chat_id in self.chat_ids:
self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
def _worker_loop(self):
delay = self.initial_delay
while True:
try:
msg_type, chat_id, content, retry_count = self.msg_queue.get()
success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
elif permanent_failure:
logger.error("Permanenter Fehler Nachricht wird verworfen.")
elif retry_count >= self.max_retries:
logger.error("Maximale Wiederholungsanzahl erreicht Nachricht wird verworfen.")
else:
logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
# Nutze den von Telegram gelieferten Wert (retry_after), falls vorhanden
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
# Erhöhe Delay für den nächsten Versuch (exponentielles Backoff)
delay = min(delay * 2, self.max_delay)
except Exception as e:
logger.exception(f"Fehler im Telegram-Worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
if msg_type == "text":
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {
'chat_id': chat_id,
'text': content
}
elif msg_type == "location":
url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
payload = {
'chat_id': chat_id,
**content
}
else:
logger.error("Unbekannter Nachrichtentyp.")
return False, True, None # Unbekannter Typ = permanent falsch
def __del__(self):
try:
self._msg_queue.stop()
except:
pass
custom_delay = None # Standardwert für Rückgabe, außer bei 429
@mq.queuedmessage
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup`
OPTIONAL arguments'''
return super(MQBot, self).send_message(*args, **kwargs)
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 429:
custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
logger.warning(f"Rate Limit erreicht warte {custom_delay} Sekunden.")
return False, False, custom_delay # Telegram gibt genaue Wartezeit vor
if response.status_code == 400:
logger.error("Ungültige Parameter Nachricht wird nicht erneut gesendet.")
return False, True, custom_delay # Permanent fehlerhaft
if response.status_code == 401:
logger.critical("Ungültiger Bot-Token bitte prüfen!")
return False, True, custom_delay # Permanent fehlerhaft
response.raise_for_status()
logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
return True, False, custom_delay
except requests.RequestException as e:
logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
return False, False, custom_delay
# ===========================
# BoswatchPlugin-Klasse
# ===========================
class BoswatchPlugin(PluginBase):
r"""!Description of the Plugin"""
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
def onLoad(self):
r"""!Called by import of the plugin"""
if self.config.get("queue", default=True):
q = mq.MessageQueue()
request = Request(con_pool_size=8)
self.bot = MQBot(token=self.config.get("botToken", default=""), request=request, mqueue=q)
print('queue')
else:
self.bot = telegram.Bot(token=self.config.get("botToken"))
print('normal')
bot_token = self.config.get("botToken")
chat_ids = self.config.get("chatIds", default=[])
if not bot_token or not chat_ids:
logger.error("botToken oder chatIds fehlen in der Konfiguration!")
return
# Konfigurierbare Parameter mit Fallback-Defaults
max_retries = self.config.get("max_retries")
initial_delay = self.config.get("initial_delay")
max_delay = self.config.get("max_delay")
self.sender = TelegramSender(
bot_token=bot_token,
chat_ids=chat_ids,
max_retries=max_retries,
initial_delay=initial_delay,
max_delay=max_delay
)
startup_message = self.config.get("startup_message")
if startup_message and startup_message.strip():
self.sender.send_message(startup_message)
def setup(self):
r"""!Called before alarm
Remove if not implemented"""
pass
def fms(self, bwPacket):
r"""!Called on FMS alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
self._sendMessage(msg)
self.sender.send_message(msg)
def pocsag(self, bwPacket):
r"""!Called on POCSAG alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
self._sendMessage(msg)
self.sender.send_message(msg)
if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
logging.debug("Found coordinates in packet")
(lat, lon) = (bwPacket.get("lat"), bwPacket.get("lon"))
self._sendLocation(lat, lon)
lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
logger.debug("Koordinaten gefunden sende Standort.")
self.sender.send_location(lat, lon)
def zvei(self, bwPacket):
r"""!Called on ZVEI alarm
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
self._sendMessage(msg)
self.sender.send_message(msg)
def msg(self, bwPacket):
r"""!Called on MSG packet
@param bwPacket: bwPacket instance"""
msg = self.parseWildcards(self.config.get("message_msg"))
self._sendMessage(msg)
self.sender.send_message(msg)
def _sendMessage(self, message):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Message via Telegram
logging.info("Sending message to " + chatId)
self.bot.send_message(chat_id=chatId, text=message)
def teardown(self):
r"""!Called after alarm
Remove if not implemented"""
pass
except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def _sendLocation(self, lat, lon):
for chatId in self.config.get("chatIds", default=[]):
try:
# Send Location via Telegram
if lat is not None and lon is not None:
logging.info("Sending location to " + chatId)
self.bot.sendLocation(chat_id=chatId, latitude=lat, longitude=lon)
except Unauthorized:
logging.exception("Error while sending Telegram Message, please Check your api-key")
except (TimedOut, NetworkError):
logging.exception("Error while sending Telegram Message, please Check your connectivity")
except (BadRequest, TelegramError):
logging.exception("Error while sending Telegram Message")
except Exception as e:
logging.exception("Unknown Error while sending Telegram Message: " + str(type(e).__name__) + ": " + str(e))
def onUnload(self):
r"""!Called by destruction of the plugin
Remove if not implemented"""
pass