[feat/multicast]: refactor: isolate module state by moving class variables to instance scope

This commit refactors the internal state management to ensure true
multi-instance capability. While previous commits shared state via class
variables, this change encapsulates all buffers and flags within the
individual instance.

Key changes:
- Migration of state: Moved `_tone_ric_packets`, `_last_tone_ric_time`,
  and processing flags from class variables to instance variables (`self`).
- Thread Isolation: Shifted the cleanup logic to a per-instance
  `_cleanup_worker` thread, ensuring that timeouts are managed
  independently for each route/configuration.
- Wildcard Safety: Isolated `_wildcards_registered` to prevent
  registration conflicts between multiple multicast instances.
- Robust Hard-Timeout: Simplified `_cleanup_hard_timeout` to act
  strictly on the instance's own state.

This refactoring resolves the "architectural dinosaur" of shared class
state, making the module fully thread-safe and reliable for complex
multi-route and multi-frequency deployments.
This commit is contained in:
KoenigMjr 2026-04-12 15:27:41 +02:00
parent 08d09b4f50
commit dada0d635b
2 changed files with 296 additions and 298 deletions

View file

@ -10,7 +10,7 @@ r"""!
by Bastian Schroll
@file: multicast.py
@date: 28.03.2026
@date: 13.04.2026
@author: Claus Schichl
@description: multicast module
"""
@ -36,21 +36,6 @@ class BoswatchModule(ModuleBase):
ensuring reliable alarm delivery even in complex multi-frequency scenarios.
"""
# CLASS VARIABLES - SHARED STATE
_tone_ric_packets = defaultdict(list)
_last_tone_ric_time = defaultdict(float)
_processing_text_ric = defaultdict(bool)
_processing_text_ric_started = defaultdict(float)
# SYSTEM VARIABLES
_lock = threading.Lock()
_cleanup_thread = None
_running = False
_wildcards_registered = set()
_packet_queue = []
_queue_lock = threading.Lock()
_instances = []
# Trigger defaults
_TRIGGER_HOST = "127.0.0.1"
_TRIGGER_PORT = 8080
@ -95,20 +80,26 @@ class BoswatchModule(ModuleBase):
self._trigger_host = self.config.get("triggerHost", default=self._TRIGGER_HOST)
self._trigger_port = int(self.config.get("triggerPort", default=self._TRIGGER_PORT))
# --- Per-instance state (replaces all former class-variables) ---
# Key: frequency string (e.g. "85.125M")
self._tone_ric_packets = defaultdict(list) # buffered tone-RICs per frequency
self._last_tone_ric_time = defaultdict(float) # last arrival time per frequency
self._processing_text_ric = defaultdict(bool) # text-RIC currently being processed?
self._processing_text_ric_started = defaultdict(float) # when did processing start?
self._wildcards_registered = set() # avoid double-registering wildcards
self._packet_queue = [] # deferred packets waiting for trigger
# --- Locks (only needed within this instance, no cross-instance sharing) ---
self._lock = threading.Lock()
self._queue_lock = threading.Lock()
# --- Per-instance cleanup thread ---
self._running = True
self._cleanup_thread = threading.Thread(target=self._cleanup_worker, daemon=True)
self._cleanup_thread.start()
logging.info("[%s] Multicast module loaded", self.name)
with BoswatchModule._lock:
if self not in BoswatchModule._instances:
BoswatchModule._instances.append(self)
if not BoswatchModule._running:
BoswatchModule._running = True
BoswatchModule._cleanup_thread = threading.Thread(
target=BoswatchModule._global_cleanup_worker, daemon=True
)
BoswatchModule._cleanup_thread.start()
logging.info("Global multicast cleanup thread started")
# ============================================================
# MAIN PROCESSING
# ============================================================
@ -158,13 +149,13 @@ class BoswatchModule(ModuleBase):
if self._text_rics:
is_text_ric = ric in self._text_rics and msg and msg.strip()
else:
with BoswatchModule._lock:
is_text_ric = msg and msg.strip() and len(BoswatchModule._tone_ric_packets[freq]) > 0
with self._lock:
is_text_ric = msg and msg.strip() and len(self._tone_ric_packets[freq]) > 0
if is_text_ric:
with BoswatchModule._lock:
BoswatchModule._processing_text_ric[freq] = True
BoswatchModule._processing_text_ric_started[freq] = time.time()
with self._lock:
self._processing_text_ric[freq] = True
self._processing_text_ric_started[freq] = time.time()
queued_packets = self._get_queued_packets()
incomplete_packets = None if is_text_ric else self._check_instance_auto_clear(freq)
@ -189,9 +180,9 @@ class BoswatchModule(ModuleBase):
if is_text_ric and msg:
logging.info("[%s] Text-RIC received: RIC=%s", self.name, ric)
alarm_packets = self._distribute_complete(freq, packet_dict)
with BoswatchModule._lock:
BoswatchModule._processing_text_ric[freq] = False
BoswatchModule._processing_text_ric_started.pop(freq, None)
with self._lock:
self._processing_text_ric[freq] = False
self._processing_text_ric_started.pop(freq, None)
if not alarm_packets:
logging.warning("[%s] No tone-RICs for text-RIC=%s", self.name, ric)
@ -265,22 +256,22 @@ class BoswatchModule(ModuleBase):
@param freq: Frequency identifier
@param packet_dict: Dictionary containing packet data
@return None"""
with BoswatchModule._lock:
with self._lock:
stored_packet = packet_dict.copy()
stored_packet['_multicast_timestamp'] = time.time()
BoswatchModule._tone_ric_packets[freq].append(stored_packet)
BoswatchModule._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(BoswatchModule._tone_ric_packets[freq]), freq)
self._tone_ric_packets[freq].append(stored_packet)
self._last_tone_ric_time[freq] = stored_packet['_multicast_timestamp']
logging.debug("[%s] Tone-RIC added: RIC=%s (total: %d on %s)", self.name, stored_packet.get('ric'), len(self._tone_ric_packets[freq]), freq)
def _get_queued_packets(self):
r"""!Pop and return all packets currently in the static queue.
@param None
@return list: List of packets or None"""
with BoswatchModule._queue_lock:
if BoswatchModule._packet_queue:
packets = BoswatchModule._packet_queue[:]
BoswatchModule._packet_queue.clear()
with self._queue_lock:
if self._packet_queue:
packets = self._packet_queue[:]
self._packet_queue.clear()
return packets
return None
@ -314,11 +305,11 @@ class BoswatchModule(ModuleBase):
@param freq: Frequency identifier
@param text_packet_dict: Data of the message-carrying packet
@return list: List of fully populated Packet instances"""
with BoswatchModule._lock:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
logging.debug("Text-RIC gefunden. Matche gegen %d gespeicherte RICs", len(recipient_dicts))
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
with self._lock:
recipient_dicts = self._tone_ric_packets[freq].copy()
logging.debug("Text RIC found. Matching against %d stored RICs", len(recipient_dicts))
self._tone_ric_packets[freq].clear()
self._last_tone_ric_time.pop(freq, None)
if not recipient_dicts:
return []
@ -365,7 +356,7 @@ class BoswatchModule(ModuleBase):
self._copy_packet_dict_to_packet(packet_dict, bwPacket, index=1)
self._apply_list_tags(bwPacket, [packet_dict])
self._set_mcast_metadata(bwPacket, "single", "single", packet_dict.get("ric", ""), "1", "1")
logging.debug("Erstelle Single-Alarm für RIC %s", packet_dict.get('ric'))
logging.debug("Creating single-alarm for RIC %s", packet_dict.get('ric'))
return [bwPacket]
def _handle_delimiter(self, freq, ric, bwPacket=None):
@ -375,11 +366,11 @@ class BoswatchModule(ModuleBase):
@param ric: Delimiter RIC
@param bwPacket: Optional delimiter packet instance
@return list: Incomplete packets or delimiter control packet"""
with BoswatchModule._lock:
orphaned = BoswatchModule._tone_ric_packets[freq].copy()
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
BoswatchModule._processing_text_ric[freq] = False
with self._lock:
orphaned = self._tone_ric_packets[freq].copy()
self._tone_ric_packets[freq].clear()
self._last_tone_ric_time.pop(freq, None)
self._processing_text_ric[freq] = False
if orphaned:
age_seconds = time.time() - orphaned[0].get('_multicast_timestamp', time.time())
@ -405,7 +396,7 @@ class BoswatchModule(ModuleBase):
@param count: Total number of recipients
@param index: Current recipient index
@return None"""
logging.debug("Setze Metadata - Mode: %s, Role: %sr RIC: %s", mode, role, source)
logging.debug("setting Metadata - Mode: %s, Role: %s, Index: %s of %s for RIC: %s", mode, role, index, count, source)
mapping = {
"multicastMode": (mode, "{MCAST_MODE}"),
"multicastRole": (role, "{MCAST_ROLE}"),
@ -439,42 +430,25 @@ class BoswatchModule(ModuleBase):
@param wildcard: The wildcard string (e.g. {MCAST_MODE})
@param field: The packet field name
@return None"""
if wildcard not in BoswatchModule._wildcards_registered:
if wildcard not in self._wildcards_registered:
self.registerWildcard(wildcard, field)
BoswatchModule._wildcards_registered.add(wildcard)
self._wildcards_registered.add(wildcard)
# ============================================================
# CLEANUP & TIMEOUT MANAGEMENT
# ============================================================
@staticmethod
def _global_cleanup_worker():
r"""!Static background thread that ticks all active module instances.
@param None
@return None"""
logging.info("Global multicast cleanup ticker active")
while BoswatchModule._running:
def _cleanup_worker(self):
r"""!Per-instance background thread for timeout management."""
logging.info("[%s] Cleanup thread started", self.name)
while self._running:
time.sleep(1)
with BoswatchModule._lock:
active_instances = BoswatchModule._instances[:]
for instance in active_instances:
try:
instance._perform_instance_tick()
except Exception as e:
logging.error("Error in instance cleanup: %s", e)
try:
self._check_all_my_frequencies()
except Exception as e:
logging.error("[%s] Error in cleanup thread: %s", self.name, e)
if int(time.time()) % 60 == 0:
BoswatchModule._cleanup_hard_timeout_global()
def _perform_instance_tick(self):
r"""!Tick-entry point for this specific instance.
Acts as an extension hook for future per-instance tick logic
(e.g. statistics, heartbeat, watchdog). Do not call directly.
@param None
@return None"""
self._check_all_my_frequencies()
self._cleanup_hard_timeout()
def _check_all_my_frequencies(self):
r"""!Monitor timeouts for all frequencies assigned to this instance.
@ -484,27 +458,27 @@ class BoswatchModule(ModuleBase):
incomplete_packets = []
trigger_data = []
with BoswatchModule._lock:
with self._lock:
current_time = time.time()
for freq in list(self._my_frequencies):
if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
continue
if BoswatchModule._processing_text_ric.get(freq, False):
flag_age = current_time - BoswatchModule._processing_text_ric_started.get(freq, current_time)
if self._processing_text_ric.get(freq, False):
flag_age = current_time - self._processing_text_ric_started.get(freq, current_time)
if flag_age > 2:
BoswatchModule._processing_text_ric[freq] = False
BoswatchModule._processing_text_ric_started.pop(freq, None)
self._processing_text_ric[freq] = False
self._processing_text_ric_started.pop(freq, None)
else:
continue
last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
last_time = self._last_tone_ric_time.get(freq, 0)
if current_time - last_time > self._auto_clear_timeout:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
recipient_dicts = self._tone_ric_packets[freq].copy()
safe_ric = recipient_dicts[0].get('ric', self._DEFAULT_TRIGGER_RIC)
trigger_data.append((freq, safe_ric))
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
self._tone_ric_packets[freq].clear()
self._last_tone_ric_time.pop(freq, None)
logging.info("[%s] Auto-clear: %d tone-RICs on %s (Timeout %ds)", self.name, len(recipient_dicts), freq, self._auto_clear_timeout)
packets = self._create_incomplete_multicast(freq, recipient_dicts)
@ -512,8 +486,8 @@ class BoswatchModule(ModuleBase):
incomplete_packets.extend(packets)
if incomplete_packets:
with BoswatchModule._queue_lock:
BoswatchModule._packet_queue.extend(incomplete_packets)
with self._queue_lock:
self._packet_queue.extend(incomplete_packets)
for freq, safe_ric in trigger_data:
self._send_wakeup_trigger(freq, safe_ric)
@ -522,36 +496,30 @@ class BoswatchModule(ModuleBase):
@param freq: Frequency identifier
@return list: Incomplete packets if timeout exceeded, else None"""
with BoswatchModule._lock:
if freq not in BoswatchModule._tone_ric_packets or not BoswatchModule._tone_ric_packets[freq]:
with self._lock:
if freq not in self._tone_ric_packets or not self._tone_ric_packets[freq]:
return None
last_time = BoswatchModule._last_tone_ric_time.get(freq, 0)
last_time = self._last_tone_ric_time.get(freq, 0)
if time.time() - last_time > self._auto_clear_timeout:
recipient_dicts = BoswatchModule._tone_ric_packets[freq].copy()
BoswatchModule._tone_ric_packets[freq].clear()
BoswatchModule._last_tone_ric_time.pop(freq, None)
recipient_dicts = self._tone_ric_packets[freq].copy()
self._tone_ric_packets[freq].clear()
self._last_tone_ric_time.pop(freq, None)
logging.warning("[%s] Auto-clear (doWork): %d packets", self.name, len(recipient_dicts))
return self._create_incomplete_multicast(freq, recipient_dicts)
return None
@staticmethod
def _cleanup_hard_timeout_global():
r"""!Global failsafe for really old packets (ignores instance config).
@param None
@return None"""
with BoswatchModule._lock:
def _cleanup_hard_timeout(self):
r"""!Failsafe for really old packets."""
with self._lock:
current_time = time.time()
max_hard_timeout = 120
if BoswatchModule._instances:
max_hard_timeout = max(inst._hard_timeout for inst in BoswatchModule._instances)
for freq in list(BoswatchModule._tone_ric_packets.keys()):
BoswatchModule._tone_ric_packets[freq] = [
p for p in BoswatchModule._tone_ric_packets[freq]
if current_time - p.get('_multicast_timestamp', 0) < max_hard_timeout
for freq in list(self._tone_ric_packets.keys()):
self._tone_ric_packets[freq] = [
p for p in self._tone_ric_packets[freq]
if current_time - p.get('_multicast_timestamp', 0) < self._hard_timeout
]
if not BoswatchModule._tone_ric_packets[freq]:
del BoswatchModule._tone_ric_packets[freq]
# cleaning empty frequencies
if not self._tone_ric_packets[freq]:
del self._tone_ric_packets[freq]
# ============================================================
# TRIGGER SYSTEM
@ -568,7 +536,7 @@ class BoswatchModule(ModuleBase):
"ric": trigger_ric,
"subric": "1",
"subricText": "a",
"message": BoswatchModule._MAGIC_WAKEUP_MSG,
"message": self._MAGIC_WAKEUP_MSG,
"clientName": "MulticastTrigger",
"inputSource": "loopback",
"frequency": freq
@ -601,7 +569,5 @@ class BoswatchModule(ModuleBase):
@param None
@return None"""
with BoswatchModule._lock:
if self in BoswatchModule._instances:
BoswatchModule._instances.remove(self)
self._running = False
logging.debug("[%s] Multicast instance unloaded", self.name)