meshcore-packet-capture/packet_capture.py

#!/usr/bin/env python3
"""
MeshCore Packet Capture Tool
Captures packets from MeshCore radios and outputs to console, file, and MQTT.
Compatible with serial, BLE, and TCP connections.
Usage:
    python packet_capture.py [--output output.json] [--verbose] [--debug] [--no-mqtt]
Options:
    --output    Output file for packet data
    --verbose   Show JSON packet data
    --debug     Show detailed debugging info
    --no-mqtt   Disable MQTT publishing
The script captures packet metadata including SNR, RSSI, route type, payload type,
and raw hex data. Configuration is done via environment variables and .env files.
"""
import asyncio
import json
import logging
import hashlib
import time
import re
import os
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import argparse
# Import meshcore from PyPI
import meshcore
from meshcore import EventType
# Import our enums for packet parsing
from enums import AdvertFlags, PayloadType, PayloadVersion, RouteType, DeviceRole
# Import MQTT client
try:
import paho.mqtt.client as mqtt
except ImportError:
print("Error: paho-mqtt not installed. Install with:")
print("pip install paho-mqtt")
exit(1)
# Import auth token module
try:
from auth_token import create_auth_token, create_auth_token_async, read_private_key_file
except ImportError:
print("Warning: auth_token.py not found - auth token authentication will not be available")
create_auth_token = None
create_auth_token_async = None
read_private_key_file = None
# Private key functionality using meshcore_py library
def get_transport(meshcore_instance):
"""Get transport from meshcore instance using the documented API structure.
Based on meshcore library structure:
- MeshCore.cx is a ConnectionManager
- ConnectionManager.connection is the actual connection (TCPConnection, BLEConnection, etc.)
- TCPConnection.transport is the asyncio transport object
Returns the transport object or None if not available.
Note: This function only returns a reference to the existing transport object
owned by the meshcore instance. It does not create new objects or store references.
Transport objects are cleaned up automatically when meshcore.disconnect() is called
or when the meshcore instance is garbage collected.
"""
if not meshcore_instance:
return None
try:
# MeshCore.cx is a ConnectionManager
if hasattr(meshcore_instance, 'cx'):
connection_manager = meshcore_instance.cx
# ConnectionManager.connection is the actual connection object
if hasattr(connection_manager, 'connection'):
connection = connection_manager.connection
# TCPConnection has a transport attribute
if hasattr(connection, 'transport'):
transport = connection.transport
if transport is not None:
return transport
except Exception:
pass
return None
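# Illustrative sketch (commented out, not executed): how get_transport() is typically
# used once a TCP meshcore instance exists. The attribute chain mirrors the docstring
# above; the host/port and the variable name `mc` are placeholders, and the calls must
# run inside an async function.
#
#   mc = await meshcore.MeshCore.create_tcp("192.168.1.50", 5000)
#   transport = get_transport(mc)              # walks mc.cx.connection.transport
#   sock = transport.get_extra_info("socket") if transport else None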
def enable_tcp_keepalive(transport, idle=10, interval=5, count=3):
"""Enable TCP keepalive on the transport's socket.
Supports multiple transport types:
- asyncio transport with get_extra_info('socket')
- Direct socket objects
- Objects with _socket attribute
"""
import socket
sock = None
# Try to get socket from transport using get_extra_info
if hasattr(transport, 'get_extra_info'):
try:
sock = transport.get_extra_info('socket')
except Exception:
pass
# If not found, check if transport is a socket directly
if sock is None:
if isinstance(transport, socket.socket):
sock = transport
elif hasattr(transport, '_socket'):
try:
sock = transport._socket
except Exception:
pass
if sock is None:
return False
try:
# Enable TCP keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Platform-specific keepalive settings
# Linux and some BSD systems
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
# macOS uses different constant names
elif hasattr(socket, 'TCP_KEEPALIVE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, idle)
if hasattr(socket, 'TCP_KEEPINTVL'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
if hasattr(socket, 'TCP_KEEPCNT'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)
return True
except Exception as e:
# Log but don't fail the connection
print(f"Warning: Could not enable TCP keepalive: {e}")
return False
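# Usage sketch (commented out): combining get_transport() with enable_tcp_keepalive(),
# the same pattern PacketCapture.connect() applies further below. The keepalive values
# shown are the defaults used there and are illustrative only.
#
#   transport = get_transport(mc)
#   if transport and enable_tcp_keepalive(transport, idle=10, interval=5, count=3):
#       print("TCP keepalive enabled")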
def load_env_files():
"""Load environment variables from .env and .env.local files"""
script_dir = os.path.dirname(os.path.abspath(__file__))
env_file = os.path.join(script_dir, '.env')
env_local_file = os.path.join(script_dir, '.env.local')
def parse_env_file(filepath):
"""Parse a .env file and return a dictionary"""
env_vars = {}
if not os.path.exists(filepath):
return env_vars
with open(filepath, 'r') as f:
for line in f:
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
# Parse KEY=VALUE
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
# Remove inline comments (everything after #)
if '#' in value:
value = value.split('#')[0].strip()
# Remove quotes if present
if value and value[0] in ('"', "'") and value[-1] == value[0]:
value = value[1:-1]
env_vars[key] = value
return env_vars
# Load .env first (defaults)
env_vars = parse_env_file(env_file)
# Load .env.local (overrides)
local_vars = parse_env_file(env_local_file)
env_vars.update(local_vars)
# Set environment variables
for key, value in env_vars.items():
if key not in os.environ:
os.environ[key] = value
return env_vars
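# Illustrative .env / .env.local content (hypothetical values). Keys carry the
# PACKETCAPTURE_ prefix because PacketCapture.get_env() prepends it when reading
# os.environ; .env.local entries override .env, and variables already present in the
# real environment override both files.
#
#   # .env
#   PACKETCAPTURE_CONNECTION_TYPE=tcp
#   PACKETCAPTURE_TCP_HOST=localhost
#   PACKETCAPTURE_TCP_PORT=5000
#   PACKETCAPTURE_LOG_LEVEL=INFO
#
#   # .env.local (overrides .env)
#   PACKETCAPTURE_IATA=sea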
# Load environment configuration
load_env_files()
class PacketCapture:
"""Standalone packet capture using meshcore package"""
def __init__(self, output_file: Optional[str] = None, verbose: bool = False, debug: bool = False, enable_mqtt: bool = True, shutdown_event=None):
self.output_file = output_file
self.verbose = verbose
self.debug = debug
self.enable_mqtt = enable_mqtt
self.shutdown_event = shutdown_event
# Setup logging
self.setup_logging()
# Global IATA for template resolution
self.global_iata = os.getenv('PACKETCAPTURE_IATA', 'LOC').lower()
# Connection
self.meshcore = None
self.connected = False
self.connection_type = None # Track connection type for health checks
self.connection_retry_count = 0
self.max_connection_retries = self.get_env_int('MAX_CONNECTION_RETRIES', 5)
self.connection_retry_delay = self.get_env_int('CONNECTION_RETRY_DELAY', 5)
self.connection_retry_delay_max = self.get_env_int('CONNECTION_RETRY_DELAY_MAX', 300) # 5 minutes max
self.connection_retry_backoff_multiplier = self.get_env_float('CONNECTION_RETRY_BACKOFF_MULTIPLIER', 2.0)
self.connection_retry_jitter = self.get_env_bool('CONNECTION_RETRY_JITTER', True)
self.health_check_interval = self.get_env_int('HEALTH_CHECK_INTERVAL', 30)
# Health check grace period for BLE connections
self.health_check_grace_period = self.get_env_int('HEALTH_CHECK_GRACE_PERIOD', 2) # Allow 2 consecutive failures
self.health_check_failure_count = 0 # Track consecutive health check failures
# Retry configuration
self.default_retry_limit = self.get_env_int('DEVICE_COMMAND_RETRY_LIMIT', 3) # Default retries for device commands
self.ble_retry_limit = self.get_env_int('BLE_COMMAND_RETRY_LIMIT', 3) # Retries for BLE connections
self.tcp_retry_limit = self.get_env_int('TCP_COMMAND_RETRY_LIMIT', 2) # Retries for TCP connections
self.health_check_retry_limit = self.get_env_int('HEALTH_CHECK_RETRY_LIMIT', None) # Override for health checks (None = use connection-specific)
self.stats_retry_limit = self.get_env_int('STATS_RETRY_LIMIT', 2) # Retries for stats queries (non-critical)
self.device_info_retry_limit = self.get_env_int('DEVICE_INFO_RETRY_LIMIT', 2) # Retries for device info queries
# MQTT connection
self.mqtt_clients = [] # List of MQTT client info dictionaries
self.mqtt_connected = False
self.should_exit = False # Flag to exit when reconnection attempts fail
# Stats/status publishing
self.stats_status_enabled = self.get_env_bool('STATS_IN_STATUS_ENABLED', True)
self.stats_refresh_interval = self.get_env_int('STATS_REFRESH_INTERVAL', 300) # seconds
self.latest_stats = None
self.last_stats_fetch = 0
self.stats_supported = False
self.stats_capability_state = None
self.stats_update_task = None
self.stats_fetch_lock = asyncio.Lock()
# Service-level failure tracking for systemd restart
self.service_failure_count = 0
self.max_service_failures = self.get_env_int('MAX_SERVICE_FAILURES', 3)
self.service_failure_window = self.get_env_int('SERVICE_FAILURE_WINDOW', 300) # 5 minutes
self.last_service_failure = 0
self.critical_failure_threshold = self.get_env_int('CRITICAL_FAILURE_THRESHOLD', 5)
# Track consecutive failures for more intelligent failure detection
self.consecutive_connection_failures = 0
self.consecutive_mqtt_failures = 0
self.max_consecutive_failures = self.get_env_int('MAX_CONSECUTIVE_FAILURES', 3)
# MQTT failure tracking with grace period
self.mqtt_health_check_interval = self.get_env_int('MQTT_HEALTH_CHECK_INTERVAL', 60) # Check every minute
self.mqtt_grace_period = self.get_env_int('MQTT_GRACE_PERIOD', 180) # 3 minutes grace before counting failures
self.mqtt_disconnect_timestamps = {} # Track when brokers disconnected: {broker_num: timestamp}
# Packet correlation cache
self.rf_data_cache = {}
self.recent_rf_packets = {}
self.raw_duplicate_window = self.get_env_float('RAW_DUPLICATE_WINDOW', 2.0)
# When True (default), call get_msg() on MESSAGES_WAITING to drain the device message queue.
# Set PACKETCAPTURE_DRAIN_MESSAGES=false to capture RF packets only without pulling stored mesh messages.
self.drain_messages = self.get_env_bool('DRAIN_MESSAGES', True)
self.packet_count = 0
# Device information
self.device_name = None
self.device_public_key = None
self.device_private_key = None
self.radio_info = None
self.cached_firmware_info = None # Cache firmware info to avoid queries during shutdown
# Private key export capability
self.private_key_export_available = False
# JWT token management
self.jwt_tokens = {} # Store tokens per broker: {broker_num: {'token': str, 'expires_at': float}}
self.jwt_renewal_interval = self.get_env_int('JWT_RENEWAL_INTERVAL', 3600) # Check every hour
self.jwt_renewal_threshold = self.get_env_int('JWT_RENEWAL_THRESHOLD', 300) # Renew 5 minutes before expiry
# Advert settings
self.advert_interval_hours = self.get_env_int('ADVERT_INTERVAL_HOURS', 47)
self.last_advert_time = 0
self.advert_task = None
# Load persisted advert state
self.last_advert_time = self._load_advert_state()
# Packet type filtering for uploads
upload_types_str = self.get_env('UPLOAD_PACKET_TYPES', '').strip()
if upload_types_str:
self.allowed_upload_types = set(t.strip() for t in upload_types_str.split(','))
self.logger.info(f"Packet type upload filter enabled: {sorted(self.allowed_upload_types)}")
else:
self.allowed_upload_types = None # None means upload all (default)
# JWT renewal task
self.jwt_renewal_task = None
# Task tracking to prevent duplicate tasks
self.active_tasks = set()
self.jwt_renewal_in_progress = False
# TCP keepalive settings
self.tcp_keepalive_enabled = self.get_env_bool('TCP_KEEPALIVE_ENABLED', True)
self.tcp_keepalive_idle = self.get_env_int('TCP_KEEPALIVE_IDLE', 10)
self.tcp_keepalive_interval = self.get_env_int('TCP_KEEPALIVE_INTERVAL', 5)
self.tcp_keepalive_count = self.get_env_int('TCP_KEEPALIVE_COUNT', 3)
# SDK auto-reconnect settings for TCP
self.tcp_sdk_auto_reconnect_enabled = self.get_env_bool('TCP_SDK_AUTO_RECONNECT_ENABLED', True)
self.tcp_sdk_max_reconnect_attempts = self.get_env_int('TCP_SDK_MAX_RECONNECT_ATTEMPTS', 100)
self.sdk_reconnect_exhausted = False # Track if SDK auto-reconnect has given up (TCP only)
# Circuit breaker for JWT failures
self.jwt_failure_count = 0
self.max_jwt_failures = 5
self.jwt_circuit_breaker_timeout = 300 # 5 minutes
self.jwt_circuit_breaker_reset_time = 0
# Resource monitoring
self.max_active_tasks = 100 # Prevent task explosion
self.task_monitoring_interval = 60 # Check every minute
self.last_task_check = 0
# Output file handle
self.output_handle = None
if self.output_file:
self.output_handle = open(self.output_file, 'w')
self.logger.info(f"Output will be written to: {self.output_file}")
def setup_logging(self):
"""Setup logging configuration"""
# Clear any existing handlers to avoid conflicts
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
# Get log level from environment variable
log_level_str = self.get_env('LOG_LEVEL', 'INFO').upper()
log_level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
log_level = log_level_map.get(log_level_str, logging.INFO)
# Create a custom formatter with timestamp
formatter = logging.Formatter(
fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Create console handler with the formatter
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# Configure root logger
logging.basicConfig(
level=log_level,
handlers=[console_handler],
force=True
)
self.logger = logging.getLogger('PacketCapture')
# Test the logging format
self.logger.info(f"Logging initialized with level: {log_level_str}")
def get_env(self, key, fallback=''):
"""Get environment variable with fallback (all vars are PACKETCAPTURE_ prefixed)"""
full_key = f"PACKETCAPTURE_{key}"
return os.getenv(full_key, fallback)
def get_env_bool(self, key, fallback=False):
"""Get boolean environment variable"""
value = self.get_env(key, str(fallback)).lower()
return value in ('true', '1', 'yes', 'on')
def get_env_int(self, key, fallback=0):
"""Get integer environment variable"""
try:
return int(self.get_env(key, str(fallback)))
except ValueError:
return fallback
def get_env_float(self, key, fallback=0.0):
"""Get float environment variable"""
try:
return float(self.get_env(key, str(fallback)))
except ValueError:
return fallback
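    # Example of the prefix convention (values are illustrative): with
    # PACKETCAPTURE_HEALTH_CHECK_INTERVAL=60 exported,
    #   self.get_env_int('HEALTH_CHECK_INTERVAL', 30)   -> 60
    # and with PACKETCAPTURE_TCP_KEEPALIVE_ENABLED unset the fallback is returned:
    #   self.get_env_bool('TCP_KEEPALIVE_ENABLED', True) -> True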
def _get_state_file_path(self):
"""Get the path to the state file for persisting last_advert_time.
Works across all installation methods:
- Docker: Uses /app/data/ (mounted volume)
- NixOS: Uses cfg.dataDir (working directory)
- Systemd: Uses script directory or data subdirectory
"""
script_dir = os.path.dirname(os.path.abspath(__file__))
# Try data subdirectory first (works for Docker and if created)
data_dir = os.path.join(script_dir, 'data')
if os.path.exists(data_dir) and os.path.isdir(data_dir):
return os.path.join(data_dir, 'advert_state.json')
# Fall back to script directory (works for all installation methods)
return os.path.join(script_dir, 'advert_state.json')
def _load_advert_state(self):
"""Load last_advert_time from persistent state file.
Returns the timestamp if found, otherwise returns 0.
"""
state_file = self._get_state_file_path()
if not os.path.exists(state_file):
if self.debug:
self.logger.debug(f"Advert state file not found: {state_file}")
return 0
try:
with open(state_file, 'r') as f:
state = json.load(f)
last_time = state.get('last_advert_time', 0)
# Validate the timestamp is reasonable (not in the future, not too old)
current_time = time.time()
if last_time > current_time:
# Timestamp is in the future, ignore it
if self.debug:
self.logger.debug(f"Advert state timestamp is in the future, ignoring: {last_time}")
return 0
# If timestamp is more than 1 year old, treat as invalid
if current_time - last_time > 31536000: # 1 year in seconds
if self.debug:
self.logger.debug(f"Advert state timestamp is too old, ignoring: {last_time}")
return 0
if self.debug:
self.logger.debug(f"Loaded last_advert_time from state file: {last_time} ({datetime.fromtimestamp(last_time).isoformat()})")
return last_time
except (json.JSONDecodeError, IOError, OSError) as e:
self.logger.warning(f"Failed to load advert state from {state_file}: {e}")
return 0
def _save_advert_state(self):
"""Save last_advert_time to persistent state file."""
state_file = self._get_state_file_path()
state_dir = os.path.dirname(state_file)
try:
# Create directory if it doesn't exist (for data subdirectory case)
if state_dir and not os.path.exists(state_dir):
os.makedirs(state_dir, mode=0o755, exist_ok=True)
state = {
'last_advert_time': self.last_advert_time,
'updated_at': time.time()
}
# Write atomically using a temporary file
temp_file = state_file + '.tmp'
with open(temp_file, 'w') as f:
json.dump(state, f, indent=2)
# Atomic rename
os.replace(temp_file, state_file)
if self.debug:
self.logger.debug(f"Saved last_advert_time to state file: {self.last_advert_time} ({datetime.fromtimestamp(self.last_advert_time).isoformat()})")
except (IOError, OSError) as e:
self.logger.warning(f"Failed to save advert state to {state_file}: {e}")
def calculate_connection_retry_delay(self, attempt: int) -> float:
"""Calculate exponential backoff delay with jitter for connection retries"""
import random
# Calculate exponential backoff: base_delay * (multiplier ^ (attempt - 1))
delay = self.connection_retry_delay * (self.connection_retry_backoff_multiplier ** (attempt - 1))
# Cap at maximum delay
delay = min(delay, self.connection_retry_delay_max)
# Add jitter to prevent thundering herd (random factor between 0.5 and 1.5)
if self.connection_retry_jitter:
jitter_factor = random.uniform(0.5, 1.5)
delay *= jitter_factor
return max(1.0, delay) # Minimum 1 second delay
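    # Worked example with the defaults above (CONNECTION_RETRY_DELAY=5,
    # CONNECTION_RETRY_BACKOFF_MULTIPLIER=2.0, CONNECTION_RETRY_DELAY_MAX=300):
    # attempts 1..7 yield base delays of 5, 10, 20, 40, 80, 160 and 300 (capped) seconds,
    # each then scaled by a random jitter factor between 0.5 and 1.5 when jitter is enabled.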
def track_service_failure(self, failure_type: str, details: str = ""):
"""Track service-level failures and determine if we should exit for systemd restart"""
import time
current_time = time.time()
# Reset failure count if outside the failure window
if current_time - self.last_service_failure > self.service_failure_window:
self.service_failure_count = 0
self.service_failure_count += 1
self.last_service_failure = current_time
self.logger.error(f"Service failure #{self.service_failure_count}: {failure_type}")
if details:
self.logger.error(f"Failure details: {details}")
# Check if we should exit for systemd restart
if self.service_failure_count >= self.max_service_failures:
self.logger.critical(f"Maximum service failures ({self.max_service_failures}) reached within {self.service_failure_window}s window")
self.logger.critical("Exiting to allow systemd to restart the service with fresh state")
self.should_exit = True
return True
return False
def track_consecutive_failure(self, failure_type: str) -> bool:
"""Track consecutive failures and determine if they warrant a service failure"""
if failure_type == "connection":
self.consecutive_connection_failures += 1
self.consecutive_mqtt_failures = 0 # Reset other type
elif failure_type == "mqtt":
self.consecutive_mqtt_failures += 1
self.consecutive_connection_failures = 0 # Reset other type
# Check if consecutive failures warrant a service failure
if (self.consecutive_connection_failures >= self.max_consecutive_failures or
self.consecutive_mqtt_failures >= self.max_consecutive_failures):
failure_details = f"Consecutive {failure_type} failures: {self.consecutive_connection_failures if failure_type == 'connection' else self.consecutive_mqtt_failures}"
return self.track_service_failure(f"Consecutive {failure_type} failures", failure_details)
return False
def reset_consecutive_failures(self, failure_type: str):
"""Reset consecutive failure count when connection is restored"""
if failure_type == "connection":
self.consecutive_connection_failures = 0
elif failure_type == "mqtt":
self.consecutive_mqtt_failures = 0
async def wait_with_shutdown(self, timeout: float) -> bool:
"""Wait for specified time but return immediately if shutdown is requested"""
if self.shutdown_event:
try:
await asyncio.wait_for(self.shutdown_event.wait(), timeout=timeout)
return True # Shutdown was requested
except asyncio.TimeoutError:
return False # Timeout reached, no shutdown
else:
await asyncio.sleep(timeout)
return False
async def retryable_device_command(self, command_func, command_name: str,
timeout: float = 10.0, max_retries: int = None,
retry_delay: float = 0.2, backoff_multiplier: float = 1.5):
"""
Execute a device command with timeout and retry logic.
Args:
command_func: Async function that returns a meshcore Event
command_name: Name of the command for logging
timeout: Timeout in seconds for each attempt
max_retries: Maximum number of retry attempts (including initial attempt)
If None, uses connection-specific default from environment variables
retry_delay: Initial delay between retries in seconds
backoff_multiplier: Multiplier for exponential backoff
Returns:
Event object from the command, or None if all retries failed
"""
if not self._ensure_connected(command_name, "debug"):
return None
# Use connection-specific default if max_retries not specified
if max_retries is None:
if self.connection_type == 'ble':
max_retries = self.ble_retry_limit
elif self.connection_type == 'tcp':
max_retries = self.tcp_retry_limit
else:
max_retries = self.default_retry_limit
last_error = None
current_delay = retry_delay
for attempt in range(max_retries):
try:
# Add small delay between retries (except first attempt)
if attempt > 0:
await asyncio.sleep(current_delay)
current_delay *= backoff_multiplier # Exponential backoff
# Execute command with timeout
result = await asyncio.wait_for(
command_func(),
timeout=timeout
)
# Check if result is an error
if result and hasattr(result, 'type'):
if result.type == EventType.ERROR:
error_payload = result.payload if hasattr(result, 'payload') else {}
error_reason = error_payload.get('reason', 'unknown')
# Check if it's a transient error that we should retry
if error_reason == 'no_event_received' and attempt < max_retries - 1:
last_error = f"{command_name} failed: {error_reason}"
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
# Permanent error or last attempt
self.logger.debug(f"{command_name} failed: {error_payload}")
return result
else:
# Success - return the result
if attempt > 0:
self.logger.debug(f"{command_name} succeeded on attempt {attempt + 1}")
return result
else:
# Unexpected result format
self.logger.debug(f"{command_name} returned unexpected result format")
return result
except asyncio.TimeoutError:
last_error = f"{command_name} timed out after {timeout}s"
if attempt < max_retries - 1:
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)")
return None
except Exception as e:
last_error = f"{command_name} raised exception: {e}"
if attempt < max_retries - 1:
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)")
return None
# All retries failed
if last_error:
self.logger.debug(f"{command_name} failed after {max_retries} attempts: {last_error}")
return None
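    # Usage sketch (commented out) showing the pattern used by get_firmware_info() and
    # the connection health check below; the command and timeout are illustrative:
    #
    #   result = await self.retryable_device_command(
    #       lambda: self.meshcore.commands.send_device_query(),
    #       "send_device_query",
    #       timeout=10.0,
    #       max_retries=None,   # None -> connection-specific default (BLE/TCP) from env
    #   )
    #   if result is not None and result.type != EventType.ERROR:
    #       payload = result.payload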
def should_exit_for_systemd_restart(self) -> bool:
"""Determine if we should exit to allow systemd restart"""
import time
# Check for critical failure threshold
if self.service_failure_count >= self.critical_failure_threshold:
self.logger.critical(f"Critical failure threshold ({self.critical_failure_threshold}) reached")
return True
# Check for recent failure pattern
current_time = time.time()
if (current_time - self.last_service_failure) < self.service_failure_window:
if self.service_failure_count >= self.max_service_failures:
self.logger.critical(f"Too many failures ({self.service_failure_count}) in {self.service_failure_window}s")
return True
return False
def _load_client_version(self):
"""Load client version from .version_info file or git"""
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
version_file = os.path.join(script_dir, '.version_info')
# First try to load from .version_info file (created by installer)
if os.path.exists(version_file):
with open(version_file, 'r') as f:
version_data = json.load(f)
installer_ver = version_data.get('installer_version', 'unknown')
git_hash = version_data.get('git_hash', 'unknown')
return f"meshcore-packet-capture/{installer_ver}-{git_hash}"
# Fallback: try to get git information directly
try:
import subprocess
result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'],
cwd=script_dir, capture_output=True, text=True, timeout=5)
if result.returncode == 0:
git_hash = result.stdout.strip()
return f"meshcore-packet-capture/dev-{git_hash}"
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
except Exception as e:
self.logger.debug(f"Could not load version info: {e}")
# Final fallback
return "meshcore-packet-capture/unknown"
async def get_firmware_info(self):
"""Get firmware information from meshcore device using send_device_query()"""
try:
# During shutdown, always use cached info - don't query the device
if self.should_exit:
if self.cached_firmware_info:
self.logger.debug("Using cached firmware info (shutdown in progress)")
return self.cached_firmware_info
else:
self.logger.debug("No cached firmware info available during shutdown")
return {"model": "unknown", "version": "unknown"}
# Return cached info if available and device is not connected
if self.cached_firmware_info and (not self.meshcore or not self.meshcore.is_connected):
self.logger.debug("Using cached firmware info")
return self.cached_firmware_info
if not self._ensure_connected("get_firmware_info", "debug"):
return {"model": "unknown", "version": "unknown"}
self.logger.debug("Querying device for firmware info...")
# Use send_device_query() to get firmware version with retry logic
# Use connection-specific retry limit
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query",
timeout=10.0,
max_retries=None # Use connection-specific default
)
if result is None:
self.logger.debug("Device query failed after retries")
return {"model": "unknown", "version": "unknown"}
self.logger.debug(f"Device query result type: {result.type}")
self.logger.debug(f"Device query result: {result}")
if result.type == EventType.ERROR:
self.logger.debug(f"Device query failed: {result}")
return {"model": "unknown", "version": "unknown"}
if result.payload:
payload = result.payload
self.logger.debug(f"Device query payload: {payload}")
# Check firmware version format
fw_ver = payload.get('fw ver', 0)
self.logger.debug(f"Firmware version number: {fw_ver}")
if fw_ver >= 3:
# For newer firmware versions (v3+)
model = payload.get('model', 'Unknown')
version = payload.get('ver', 'Unknown')
build_date = payload.get('fw_build', 'Unknown')
# Remove 'v' prefix from version if it already has one
if version.startswith('v'):
version = version[1:]
version_str = f"v{version} (Build: {build_date})"
self.logger.debug(f"New firmware format - Model: {model}, Version: {version_str}")
firmware_info = {"model": model, "version": version_str}
self.cached_firmware_info = firmware_info # Cache the result
return firmware_info
else:
# For older firmware versions
version_str = f"v{fw_ver}"
self.logger.debug(f"Old firmware format - Model: unknown, Version: {version_str}")
firmware_info = {"model": "unknown", "version": version_str}
self.cached_firmware_info = firmware_info # Cache the result
return firmware_info
self.logger.debug("No payload in device query result")
return {"model": "unknown", "version": "unknown"}
except Exception as e:
self.logger.debug(f"Error getting firmware info: {e}")
return {"model": "unknown", "version": "unknown"}
def resolve_topic_template(self, template, broker_num=None):
"""Resolve topic template with {IATA}, {IATA_lower}, and {PUBLIC_KEY} placeholders"""
if not template:
return template
# Get IATA - broker-specific or global
iata = self.global_iata
if broker_num:
broker_iata = self.get_env(f'MQTT{broker_num}_IATA', '')
if broker_iata:
iata = broker_iata.lower()
# Replace template variables
resolved = template.replace('{IATA}', iata.upper()) # Uppercase variant
resolved = resolved.replace('{IATA_lower}', iata.lower()) # Lowercase variant
resolved = resolved.replace('{PUBLIC_KEY}', self.device_public_key if self.device_public_key and self.device_public_key != 'Unknown' else 'DEVICE')
return resolved
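    # Example resolution (the 'sea' IATA and the public key are hypothetical):
    #   resolve_topic_template('meshcore/{IATA}/{PUBLIC_KEY}/packets')
    #     -> 'meshcore/SEA/<device public key>/packets'
    #   resolve_topic_template('meshcore/{IATA_lower}/status')
    #     -> 'meshcore/sea/status'
    # When the device public key is unknown, '{PUBLIC_KEY}' resolves to 'DEVICE'.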
def is_letsmesh_broker(self, broker_num=None) -> bool:
"""Detect if the given broker is a Let's Mesh Analyzer broker by hostname or token audience."""
server = None
audience = None
if broker_num:
server = self.get_env(f'MQTT{broker_num}_SERVER', '')
audience = self.get_env(f'MQTT{broker_num}_TOKEN_AUDIENCE', '')
if not server:
server = self.get_env('MQTT1_SERVER', '')
if not audience:
audience = self.get_env('MQTT1_TOKEN_AUDIENCE', '')
host = (server or '').lower()
aud = (audience or '').lower()
return ('letsmesh.net' in host) or ('letsmesh.net' in aud)
def has_configured_iata(self, broker_num=None) -> bool:
"""Return True if a non-default IATA code is configured (not 'LOC')."""
iata = self.global_iata or ''
if broker_num:
broker_iata = self.get_env(f'MQTT{broker_num}_IATA', '')
if broker_iata:
iata = broker_iata.lower()
return bool(iata) and iata.lower() != 'loc'
def broker_requires_iata(self, broker_num) -> bool:
"""Check if a broker requires IATA configuration.
Returns True if:
- It's a Let's Mesh Analyzer broker, OR
- It has explicitly configured topics that use IATA placeholders"""
# Check if it's a Let's Mesh broker
if self.is_letsmesh_broker(broker_num):
return True
# Check if any configured topics use IATA placeholders
topic_types = ['STATUS', 'PACKETS', 'DECODED', 'DEBUG', 'RAW']
for topic_type in topic_types:
# Check broker-specific topic
broker_topic = self.get_env(f'MQTT{broker_num}_TOPIC_{topic_type}', '')
if broker_topic and ('{IATA}' in broker_topic or '{IATA_lower}' in broker_topic):
return True
# Check global topic (only if no broker-specific topic)
if not broker_topic:
global_topic = self.get_env(f'TOPIC_{topic_type}', '')
if global_topic and ('{IATA}' in global_topic or '{IATA_lower}' in global_topic):
return True
return False
def get_topic(self, topic_type, broker_num=None):
"""Get topic with template resolution, checking broker-specific override first"""
topic_type_upper = topic_type.upper()
# Check broker-specific topic override
if broker_num:
broker_topic = self.get_env(f'MQTT{broker_num}_TOPIC_{topic_type_upper}', '')
if broker_topic:
return self.resolve_topic_template(broker_topic, broker_num)
# Fall back to global topic
global_topic = self.get_env(f'TOPIC_{topic_type_upper}', '')
if global_topic:
return self.resolve_topic_template(global_topic, broker_num)
# For RAW topic, don't provide a default - only publish if explicitly configured
if topic_type_upper == 'RAW':
if self.debug:
self.logger.debug(f"No RAW topic configured for broker {broker_num}, skipping RAW publish")
return None
# Defaulting policy adjustment:
# - Never use classic defaults (meshcore/status, meshcore/packets, etc.) for Let's Mesh Analyzer brokers
# - Prefer IATA-based defaults when IATA is configured
# - Only on custom brokers without IATA configured, fall back to classic defaults
is_letsmesh = self.is_letsmesh_broker(broker_num)
iata_configured = self.has_configured_iata(broker_num)
iata_defaults = {
'STATUS': 'meshcore/{IATA}/{PUBLIC_KEY}/status',
'PACKETS': 'meshcore/{IATA}/{PUBLIC_KEY}/packets',
'DECODED': 'meshcore/{IATA}/{PUBLIC_KEY}/decoded',
'DEBUG': 'meshcore/{IATA}/{PUBLIC_KEY}/debug'
}
classic_defaults = {
'STATUS': 'meshcore/status',
'PACKETS': 'meshcore/packets',
'DECODED': 'meshcore/decoded',
'DEBUG': 'meshcore/debug'
}
if iata_configured:
chosen_default = iata_defaults.get(topic_type_upper, f"meshcore/{{IATA}}/{{PUBLIC_KEY}}/{topic_type.lower()}")
else:
if is_letsmesh:
if self.debug:
self.logger.debug(f"Skipping default '{topic_type}' topic for Let's Mesh broker {broker_num} because IATA is not configured")
return None
chosen_default = classic_defaults.get(topic_type_upper, f'meshcore/{topic_type.lower()}')
resolved = self.resolve_topic_template(chosen_default, broker_num)
if self.debug:
self.logger.debug(f"Using default topic for {topic_type}: {resolved}")
return resolved
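    # Resolution order sketch for, e.g., the PACKETS topic on broker 1 (variable names
    # follow the PACKETCAPTURE_ prefix convention handled by get_env()):
    #   1. PACKETCAPTURE_MQTT1_TOPIC_PACKETS      broker-specific override
    #   2. PACKETCAPTURE_TOPIC_PACKETS            global template
    #   3. 'meshcore/{IATA}/{PUBLIC_KEY}/packets' default when an IATA is configured
    #   4. 'meshcore/packets'                     classic default on non-Let's-Mesh
    #                                             brokers without an IATA
    # RAW has no default and is published only when explicitly configured.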
async def set_radio_clock(self) -> bool:
"""Set radio clock only if device time is earlier than current system time"""
try:
if not self._ensure_connected("set_radio_clock", "warning"):
return False
# Get current device time with retry logic
self.logger.info("Checking device time...")
time_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_time(),
"get_time",
timeout=8.0,
max_retries=self.device_info_retry_limit, # Use device info retry limit
retry_delay=0.2
)
if time_result is None or time_result.type == EventType.ERROR:
self.logger.warning("Device does not support time commands")
return False
device_time = time_result.payload.get('time', 0)
current_time = int(time.time())
self.logger.info(f"Device time: {device_time}, System time: {current_time}")
# Only set time if device time is earlier than current time
if device_time < current_time:
time_diff = current_time - device_time
self.logger.info(f"Device time is {time_diff} seconds behind, updating...")
result = await self.retryable_device_command(
lambda: self.meshcore.commands.set_time(current_time),
"set_time",
timeout=8.0,
max_retries=self.device_info_retry_limit, # Use device info retry limit
retry_delay=0.2
)
if result and result.type == EventType.OK:
self.logger.info(f"✓ Radio clock updated to: {current_time}")
self.last_clock_sync_time = current_time
return True
else:
self.logger.warning(f"Failed to update radio clock: {result}")
return False
else:
self.logger.info("Device time is current or ahead - no update needed")
return True
except Exception as e:
self.logger.warning(f"Error checking/setting radio clock: {e}")
return False
async def fetch_private_key_from_device(self) -> bool:
"""Fetch private key from device using meshcore library"""
try:
self.logger.info("Fetching private key from device...")
if not self._ensure_connected("fetch_private_key_from_device", "error"):
return False
# Use meshcore library to export private key with retry logic
# Use connection-specific retry limit (defaults to 3 for BLE, 2 for TCP)
result = await self.retryable_device_command(
lambda: self.meshcore.commands.export_private_key(),
"export_private_key",
timeout=10.0,
max_retries=None, # Use connection-specific default
retry_delay=0.3 # Slightly longer delay for private key operations
)
if result is None:
self.logger.error("Error fetching private key: command failed after retries")
self.private_key_export_available = False
return False
if result.type == EventType.PRIVATE_KEY:
self.device_private_key = result.payload["private_key"]
self.logger.info("✓ Private key fetched successfully from device")
self.private_key_export_available = True
return True
elif result.type == EventType.DISABLED:
self.logger.warning("Private key export is disabled on this device")
self.logger.info("This feature requires:")
self.logger.info(" - Companion radio firmware")
self.logger.info(" - ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag")
self.private_key_export_available = False
return False
elif result.type == EventType.ERROR:
self.logger.error(f"Error fetching private key: {result.payload}")
self.private_key_export_available = False
return False
else:
self.logger.error(f"Unexpected response when fetching private key: {result.type}")
self.private_key_export_available = False
return False
except Exception as e:
self.logger.error(f"Error fetching private key from device: {e}")
self.private_key_export_available = False
return False
async def create_jwt_with_private_key(self, audience: str = None) -> Optional[str]:
"""Create JWT using on-device signing (preferred) or private key from device"""
try:
if not create_auth_token_async and not create_auth_token:
return None
# Build claims
claims = {}
if audience:
claims['aud'] = audience
# Add optional owner public key if configured
owner_public_key = os.getenv('PACKETCAPTURE_OWNER_PUBLIC_KEY', '').strip()
if owner_public_key:
# Validate it's a valid hex string of correct length (64 hex chars = 32 bytes)
if len(owner_public_key) == 64 and all(c in '0123456789ABCDEFabcdef' for c in owner_public_key):
claims['owner'] = owner_public_key.upper()
else:
self.logger.warning(f"Invalid owner public key format (expected 64 hex characters): {owner_public_key[:16]}...")
# Add optional email if configured
email = os.getenv('PACKETCAPTURE_OWNER_EMAIL', '').strip()
if email:
# Normalize to lowercase
email = email.lower()
# Validate email format using a simple regex
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if re.match(email_pattern, email):
claims['email'] = email
else:
self.logger.warning(f"Invalid email format: {email}")
# Add optional client agent/version if configured, otherwise use default from status message
client_agent = os.getenv('PACKETCAPTURE_CLIENT_AGENT', '').strip()
if not client_agent:
# Default to the same value used in status messages
client_agent = self._load_client_version()
if client_agent:
claims['client'] = client_agent
# Prefer on-device signing if meshcore instance is available and connected
if (create_auth_token_async and
self.meshcore and
self.meshcore.is_connected and
os.getenv('AUTH_TOKEN_METHOD', '').lower().strip() not in ('python', 'meshcore-decoder')):
try:
# Use on-device signing (no private key needed)
# Don't pass private_key_hex so auth_token.py will fail fast if device signing fails
jwt_token = await create_auth_token_async(
self.device_public_key,
meshcore_instance=self.meshcore,
**claims
)
self.logger.info("✓ JWT created using on-device signing")
return jwt_token
except Exception as e:
# Device signing failed - fall back to private key if available
self.logger.debug(f"On-device signing failed: {e}, attempting private key fallback...")
# Fallback to private key signing (skip if device-only mode is enabled)
device_only = os.getenv('AUTH_TOKEN_DEVICE_ONLY', '').lower().strip() == 'true'
if device_only:
self.logger.error("Device-only signing mode enabled but device signing failed or not available")
return None
# Fallback to private key signing - load from env/file first, then try device if needed
if not self.device_private_key:
# Try to load from environment variable first
env_private_key = self.get_env('PRIVATE_KEY', '')
if env_private_key:
self.device_private_key = env_private_key
self.logger.info("Device signing failed, using private key from environment")
# Try to read from private key file
elif read_private_key_file:
private_key_file = self.get_env('PRIVATE_KEY_FILE', '')
if private_key_file and Path(private_key_file).exists():
try:
self.device_private_key = read_private_key_file(private_key_file)
self.logger.info(f"Device signing failed, using private key from file: {private_key_file}")
except Exception as e:
self.logger.warning(f"Failed to read private key from file {private_key_file}: {e}")
# If still no private key, try fetching from device
if not self.device_private_key:
self.logger.info("Device signing not available, fetching private key from device for fallback...")
private_key_fetch_success = await self.fetch_private_key_from_device()
if not private_key_fetch_success:
self.logger.warning("Cannot create JWT: device signing failed and private key not available from device or environment")
return None
# Convert bytearray to hex string if needed
private_key = self.device_private_key
if isinstance(private_key, (bytes, bytearray)):
private_key = private_key.hex()
# Use async version if available (for consistency), otherwise sync version
if create_auth_token_async:
jwt_token = await create_auth_token_async(
self.device_public_key,
private_key_hex=private_key,
**claims
)
else:
jwt_token = create_auth_token(self.device_public_key, private_key, **claims)
self.logger.info("✓ JWT created using private key from device")
return jwt_token
except Exception as e:
device_only = os.getenv('AUTH_TOKEN_DEVICE_ONLY', '').lower().strip() == 'true'
if device_only:
self.logger.error(f"Device-only signing mode: JWT creation failed: {e}")
else:
self.logger.error(f"Error creating JWT: {e}", exc_info=True)
return None
async def create_auth_token_jwt(self, audience: str = None, broker_num: int = None) -> Optional[str]:
"""Create JWT token using on-device signing or private key from device"""
# Use on-device signing (preferred) or private key method (fallback)
# The create_jwt_with_private_key() method already logs which method was used
jwt_token = await self.create_jwt_with_private_key(audience)
if jwt_token:
# Store token with expiry time if broker_num is provided
if broker_num is not None:
import time
import json
import base64
# Parse token to get expiry time
try:
parts = jwt_token.split('.')
if len(parts) == 3:
# Decode payload to get expiry
payload_data = base64.urlsafe_b64decode(parts[1] + '==')
payload = json.loads(payload_data)
expires_at = payload.get('exp', time.time() + 86400) # Default 24h if not found
self.jwt_tokens[broker_num] = {
'token': jwt_token,
'expires_at': expires_at,
'audience': audience
}
if self.debug:
self.logger.debug(f"JWT token stored for broker {broker_num}, expires at {expires_at}")
except Exception as e:
self.logger.warning(f"Could not parse JWT expiry: {e}")
return jwt_token
self.logger.error("Failed to create JWT with private key from device")
return None
def is_jwt_token_expired(self, broker_num: int) -> bool:
"""Check if JWT token for broker is expired or near expiry"""
if broker_num not in self.jwt_tokens:
return True
import time
current_time = time.time()
token_info = self.jwt_tokens[broker_num]
expires_at = token_info['expires_at']
# Check if token is expired or within renewal threshold
return current_time >= (expires_at - self.jwt_renewal_threshold)
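    # Example with the default JWT_RENEWAL_THRESHOLD of 300s: a token whose 'exp' falls
    # at 12:00:00 is reported as needing renewal from 11:55:00 onward.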
async def renew_jwt_token(self, broker_num: int) -> bool:
"""Renew JWT token for a specific broker"""
try:
if broker_num not in self.jwt_tokens:
self.logger.warning(f"No existing JWT token for broker {broker_num}")
return False
token_info = self.jwt_tokens[broker_num]
audience = token_info.get('audience')
self.logger.info(f"Renewing JWT token for broker {broker_num}...")
# Create new token
new_token = await self.create_auth_token_jwt(audience, broker_num)
if new_token:
self.logger.info(f"✓ JWT token renewed for broker {broker_num}")
# Reset failure count on success
self.jwt_failure_count = 0
return True
else:
self.logger.error(f"Failed to renew JWT token for broker {broker_num}")
# Increment failure count
self.jwt_failure_count += 1
self.jwt_circuit_breaker_reset_time = time.time()
return False
except Exception as e:
self.logger.error(f"Error renewing JWT token for broker {broker_num}: {e}")
# Increment failure count
self.jwt_failure_count += 1
self.jwt_circuit_breaker_reset_time = time.time()
return False
async def check_jwt_renewal_for_broker(self, broker_num: int):
"""Check and renew JWT token for a specific broker if needed"""
try:
if broker_num not in self.jwt_tokens:
return
if self.is_jwt_token_expired(broker_num):
self.logger.info(f"JWT token for broker {broker_num} needs renewal")
# Renew the token
renewal_success = await self.renew_jwt_token(broker_num)
if renewal_success:
# Find the broker client and update credentials
for client_info in self.mqtt_clients:
if client_info['broker_num'] == broker_num:
mqtt_client = client_info['client']
new_token = self.jwt_tokens[broker_num]['token']
username = f"v1_{self.device_public_key.upper()}"
# Update credentials and reconnect
mqtt_client.username_pw_set(username, new_token)
mqtt_client.reconnect()
self.logger.info(f"✓ Updated credentials for MQTT broker {broker_num}")
break
else:
self.logger.error(f"Failed to renew JWT token for broker {broker_num}")
except Exception as e:
self.logger.error(f"Error checking JWT renewal for broker {broker_num}: {e}")
async def check_and_renew_jwt_tokens(self):
"""Check all JWT tokens and renew if needed"""
try:
for broker_num in list(self.jwt_tokens.keys()):
await self.check_jwt_renewal_for_broker(broker_num)
except Exception as e:
self.logger.error(f"Error checking JWT token renewals: {e}")
def _is_tcp_sdk_auto_reconnect_active(self) -> bool:
"""
Check if TCP SDK auto-reconnect is active and handling reconnection.
Returns:
True if TCP connection with SDK auto-reconnect enabled and not exhausted
"""
return (self.connection_type == 'tcp' and
self.tcp_sdk_auto_reconnect_enabled and
not self.sdk_reconnect_exhausted)
def _get_connection_timeout_config(self, default_timeout: float = 5.0, default_retries: int = None):
"""
Get timeout and retry configuration based on connection type.
Args:
default_timeout: Default timeout for connections without special handling
default_retries: Default number of retries (None = use connection-specific default)
Returns:
Tuple of (timeout, retries) appropriate for the current connection type
"""
if self.connection_type == 'ble':
retries = self.health_check_retry_limit if self.health_check_retry_limit is not None else self.ble_retry_limit
return (12.0, retries) # Longer timeout and more retries for BLE on Linux
elif self._is_tcp_sdk_auto_reconnect_active():
retries = self.health_check_retry_limit if self.health_check_retry_limit is not None else self.tcp_retry_limit
return (8.0, retries) # Longer timeout for TCP with SDK auto-reconnect
else:
retries = self.health_check_retry_limit if self.health_check_retry_limit is not None else (default_retries or self.default_retry_limit)
return (default_timeout, retries)
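    # Resulting (timeout, retries) pairs, for reference:
    #   BLE                         -> (12.0s, health_check_retry_limit or ble_retry_limit)
    #   TCP with SDK auto-reconnect -> (8.0s,  health_check_retry_limit or tcp_retry_limit)
    #   otherwise                   -> (default_timeout, first of health_check_retry_limit,
    #                                   default_retries, default_retry_limit that is set)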
def _ensure_connected(self, command_name: str = "command", log_level: str = "debug") -> bool:
"""
Check if device is connected, logging appropriately if not.
Args:
command_name: Name of the command being executed (for logging)
log_level: Log level to use ("debug", "warning", "error")
Returns:
True if connected, False otherwise
"""
if not self.meshcore or not self.meshcore.is_connected:
message = f"Cannot execute {command_name} - not connected to device"
if log_level == "error":
self.logger.error(message)
elif log_level == "warning":
self.logger.warning(message)
else:
self.logger.debug(message)
return False
return True
def _reset_connection_state(self):
"""
Reset all connection-related state variables after successful connection/reconnection.
This includes health check counters, SDK reconnect flags, and consecutive failure counts.
"""
self.connected = True
self.health_check_failure_count = 0
if self.connection_type == 'tcp':
self.sdk_reconnect_exhausted = False
self.reset_consecutive_failures("connection")
async def _start_auto_message_fetching_if_enabled(self):
"""Start meshcore auto-fetch loop when PACKETCAPTURE_DRAIN_MESSAGES is enabled (default)."""
if not self.drain_messages:
self.logger.info(
"PACKETCAPTURE_DRAIN_MESSAGES is false: skipping auto message fetch (device message queue will not be drained)"
)
return
await self.meshcore.start_auto_message_fetching()
async def _setup_after_reconnection(self):
"""
Perform all setup tasks required after a successful reconnection.
This includes cleaning up old subscriptions, setting up event handlers,
and starting auto message fetching.
"""
# Clean up old subscriptions before re-setting up handlers
# (SDK may have recreated the instance, leaving old subscriptions orphaned)
self.cleanup_event_subscriptions()
# Re-setup event handlers after reconnection
await self.setup_event_handlers()
await self._start_auto_message_fetching_if_enabled()
def _check_ble_grace_period(self, failure_reason: str = "failed") -> bool:
"""
Check if BLE health check failure should be allowed under grace period.
Args:
failure_reason: Description of why the health check failed (for logging)
Returns:
True if failure is within grace period and should be allowed, False otherwise
"""
if self.connection_type == 'ble' and self.meshcore and self.meshcore.is_connected:
self.health_check_failure_count += 1
if self.health_check_failure_count <= self.health_check_grace_period:
if self.debug:
self.logger.debug(
f"Health check {failure_reason} but BLE connection appears active "
f"(grace period: {self.health_check_failure_count}/{self.health_check_grace_period})"
)
return True # Allow grace period for BLE
else:
self.logger.warning(
f"Health check {failure_reason} {self.health_check_failure_count} times consecutively - "
"connection may be degraded"
)
return False
return False
async def check_connection_health(self) -> bool:
"""Enhanced health check with network validation"""
try:
# 1. Check if meshcore object exists and reports connected
if not self.meshcore or not self.meshcore.is_connected:
# For TCP with SDK auto-reconnect, don't log warning if SDK is still trying
if self._is_tcp_sdk_auto_reconnect_active():
if self.debug:
self.logger.debug("MeshCore reports not connected, but SDK auto-reconnect is active")
return False
self.logger.warning("MeshCore reports not connected")
return False
# 2. For TCP connections, verify socket state
if self.connection_type == 'tcp':
transport = get_transport(self.meshcore)
if transport:
if transport.is_closing():
# For TCP with SDK auto-reconnect, SDK will handle reconnection
if self.tcp_sdk_auto_reconnect_enabled and not self.sdk_reconnect_exhausted:
if self.debug:
self.logger.debug("TCP transport is closing, but SDK auto-reconnect is active")
return False
self.logger.warning("TCP transport is closed or closing")
return False
# 3. Try a lightweight command with timeout and retry
# Use longer timeout for BLE connections (Linux BLE can be slow) and TCP with SDK auto-reconnect
health_check_timeout, health_check_retries = self._get_connection_timeout_config()
try:
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query (health check)",
timeout=health_check_timeout,
max_retries=health_check_retries, # Uses connection-specific or health_check_retry_limit override
retry_delay=0.3 # Slightly longer delay for health checks
)
if result and hasattr(result, 'type') and result.type != EventType.ERROR:
# Success - reset failure count
self.health_check_failure_count = 0
return True
else:
if self.debug:
self.logger.debug(f"Health check device query failed: {result}")
# For BLE, if is_connected is True, we might still consider it healthy
# (BLE can have slow responses but connection might still be valid)
if self._check_ble_grace_period("query failed"):
return True
return False
except asyncio.TimeoutError:
# For TCP with SDK auto-reconnect, timeout might just mean device is busy
# SDK will handle reconnection if needed, so don't log as warning
if self._is_tcp_sdk_auto_reconnect_active():
if self.debug:
self.logger.debug("Health check timed out, but SDK auto-reconnect is active")
return False
# For BLE, allow grace period even on timeout if connection appears active
if self._check_ble_grace_period("timed out"):
return True
self.logger.warning("Health check timed out")
return False
except Exception as e:
# For TCP with SDK auto-reconnect, errors might be temporary
if self._is_tcp_sdk_auto_reconnect_active():
if self.debug:
error_type = type(e).__name__
self.logger.debug(f"Health check command failed ({error_type}), but SDK auto-reconnect is active")
return False
# Log detailed error information for debugging
error_type = type(e).__name__
error_msg = str(e)
# Check if it's an errno error (common on macOS/Linux)
errno_value = getattr(e, 'errno', None)
if errno_value is not None:
import errno
try:
errno_name = errno.errorcode.get(errno_value, f"UNKNOWN({errno_value})")
self.logger.warning(f"Health check command failed: {error_type} [{errno_name}]: {error_msg}")
except (AttributeError, KeyError):
self.logger.warning(f"Health check command failed: {error_type} [errno={errno_value}]: {error_msg}")
else:
self.logger.warning(f"Health check command failed: {error_type}: {error_msg}")
return False
except Exception as e:
# For TCP with SDK auto-reconnect, don't log as warning if SDK is handling it
if self._is_tcp_sdk_auto_reconnect_active():
if self.debug:
self.logger.debug(f"Connection health check failed ({type(e).__name__}), but SDK auto-reconnect is active")
return False
self.logger.warning(f"Connection health check failed: {e}")
return False
def check_mqtt_health(self) -> bool:
"""Check MQTT broker health with grace period before counting failures"""
import time
if not self.enable_mqtt or not self.mqtt_clients:
return True # MQTT not enabled or no brokers configured
current_time = time.time()
connected_brokers = 0
failed_brokers = 0
total_brokers = len(self.mqtt_clients)
# Check each broker's connection status
for client_info in self.mqtt_clients:
broker_num = client_info['broker_num']
mqtt_client = client_info['client']
if mqtt_client.is_connected():
# Broker is connected - clear any disconnect timestamp
if broker_num in self.mqtt_disconnect_timestamps:
disconnect_duration = current_time - self.mqtt_disconnect_timestamps[broker_num]
self.logger.info(f"MQTT{broker_num} reconnected after {disconnect_duration:.1f} seconds")
del self.mqtt_disconnect_timestamps[broker_num]
# Reset consecutive failures on successful reconnection
self.reset_consecutive_failures("mqtt")
connected_brokers += 1
else:
# Broker is disconnected
# Record disconnect timestamp if not already recorded
if broker_num not in self.mqtt_disconnect_timestamps:
self.mqtt_disconnect_timestamps[broker_num] = current_time
self.logger.debug(f"MQTT{broker_num} disconnected - grace period started")
# Check if grace period has elapsed
disconnect_time = self.mqtt_disconnect_timestamps[broker_num]
time_disconnected = current_time - disconnect_time
if time_disconnected >= self.mqtt_grace_period:
# Grace period elapsed - this broker has persistently failed
failed_brokers += 1
if self.debug:
self.logger.debug(f"MQTT{broker_num} disconnected for {time_disconnected:.1f}s (grace period: {self.mqtt_grace_period}s) - persistent failure")
# If all enabled brokers have been disconnected past grace period, this is a failure
# We require ALL brokers to be failed, not just one, to avoid false positives with multiple brokers
all_brokers_failed = (failed_brokers == total_brokers and total_brokers > 0)
if all_brokers_failed:
if self.debug:
self.logger.debug(f"All {total_brokers} MQTT broker(s) have persistent failures")
return not all_brokers_failed
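    # Example with the default MQTT_GRACE_PERIOD of 180s and two configured brokers:
    # if broker 1 drops at t=0 while broker 2 stays connected, this check keeps returning
    # True; only once every broker has been disconnected for at least 180s does it return
    # False and count toward a persistent MQTT failure.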
async def connect(self) -> bool:
"""Connect to MeshCore node using official package"""
try:
self.logger.info("Connecting to MeshCore node...")
# Clean up any existing connection before attempting new one
# This prevents pending tasks from interfering with new connections
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up existing connection before reconnect: {cleanup_error}")
self.meshcore = None
# Brief delay to ensure cleanup completes
await asyncio.sleep(0.2)
# Get connection type from environment
connection_type = self.get_env('CONNECTION_TYPE', 'ble').lower()
self.connection_type = connection_type # Store for health checks
self.logger.info(f"Using connection type: {connection_type}")
if connection_type == 'serial':
# Create serial connection
serial_port = self.get_env('SERIAL_PORTS', '/dev/ttyUSB0')
# Handle comma-separated ports (take first one for now)
if ',' in serial_port:
serial_port = serial_port.split(',')[0].strip()
self.logger.info(f"Connecting via serial port: {serial_port}")
self.meshcore = await meshcore.MeshCore.create_serial(serial_port, debug=False)
elif connection_type == 'tcp':
# Create TCP connection with SDK auto-reconnect if enabled
tcp_host = self.get_env('TCP_HOST', 'localhost')
tcp_port = self.get_env_int('TCP_PORT', 5000)
self.logger.info(f"Connecting via TCP to {tcp_host}:{tcp_port}")
# Enable SDK auto-reconnect for TCP connections
create_kwargs = {'debug': False}
if self.tcp_sdk_auto_reconnect_enabled:
create_kwargs['auto_reconnect'] = True
create_kwargs['max_reconnect_attempts'] = self.tcp_sdk_max_reconnect_attempts
self.logger.info(f"SDK auto-reconnect enabled with max {self.tcp_sdk_max_reconnect_attempts} attempts")
else:
self.logger.info("SDK auto-reconnect disabled - using custom reconnect logic")
self.meshcore = await meshcore.MeshCore.create_tcp(tcp_host, tcp_port, **create_kwargs)
# Reset SDK reconnect exhaustion flag on new connection
self.sdk_reconnect_exhausted = False
# Enable TCP keepalive if configured
# Access transport via: meshcore.cx.connection.transport
# (MeshCore.cx is ConnectionManager, connection is TCPConnection)
if self.tcp_keepalive_enabled:
transport = get_transport(self.meshcore)
if transport:
try:
if enable_tcp_keepalive(
transport,
idle=self.tcp_keepalive_idle,
interval=self.tcp_keepalive_interval,
count=self.tcp_keepalive_count
):
self.logger.info(f"TCP keepalive enabled (idle={self.tcp_keepalive_idle}s, interval={self.tcp_keepalive_interval}s, count={self.tcp_keepalive_count})")
else:
self.logger.warning("Failed to enable TCP keepalive")
except Exception as e:
self.logger.warning(f"Could not enable TCP keepalive: {e}")
else:
if self.debug:
# Only log as debug to avoid noise if transport is genuinely not accessible
self.logger.debug("Could not access transport for TCP keepalive configuration (transport may not be exposed by meshcore library)")
else:
# Log as info since this is a known limitation, not a critical error
self.logger.info("TCP keepalive configuration skipped (transport not accessible)")
elif not self.tcp_keepalive_enabled:
self.logger.debug("TCP keepalive disabled by configuration")
else:
# Create BLE connection (default)
# Support both BLE_ADDRESS and BLE_DEVICE for MAC address
ble_address = self.get_env('BLE_ADDRESS', None) or self.get_env('BLE_DEVICE', None)
# Support both BLE_DEVICE_NAME and BLE_NAME for device name
ble_device_name = self.get_env('BLE_DEVICE_NAME', None) or self.get_env('BLE_NAME', None)
if self.debug:
self.logger.debug(f"BLE connection config - Address: {ble_address}, Name: {ble_device_name}")
self.logger.debug(f"Environment check - BLE_ADDRESS: {self.get_env('BLE_ADDRESS', None)}, BLE_DEVICE: {self.get_env('BLE_DEVICE', None)}")
self.logger.debug(f"Environment check - BLE_DEVICE_NAME: {self.get_env('BLE_DEVICE_NAME', None)}, BLE_NAME: {self.get_env('BLE_NAME', None)}")
if ble_address:
# Direct address connection
self.logger.info(f"Connecting via BLE to address: {ble_address}")
if self.debug:
self.logger.debug(f"Using BLE address from environment: {ble_address}")
self.meshcore = await meshcore.MeshCore.create_ble(ble_address, debug=False)
elif ble_device_name:
# Try to find device by name - the meshcore library handles name matching internally
self.logger.info(f"Scanning for BLE device with name: {ble_device_name}")
try:
# The meshcore library will automatically find devices by name during scanning
self.meshcore = await meshcore.MeshCore.create_ble(ble_device_name, debug=False)
except Exception as e:
self.logger.error(f"Error connecting to device '{ble_device_name}': {e}")
# Clean up any partial connection
if self.meshcore:
try:
self.meshcore.stop()
await self.meshcore.disconnect()
except:
pass
self.meshcore = None
# Fallback to general scan
self.logger.info("Falling back to general BLE scan...")
self.meshcore = await meshcore.MeshCore.create_ble(debug=False)
else:
# No specific device, just scan and connect to first available
self.logger.info("Scanning for available BLE devices...")
self.meshcore = await meshcore.MeshCore.create_ble(debug=False)
# Wait a brief moment for connection to fully establish (especially for BLE)
if self.meshcore and self.connection_type == 'ble':
await asyncio.sleep(0.5)
# Retry connection check a few times in case it's still establishing
for attempt in range(3):
if self.meshcore.is_connected:
break
if attempt < 2:
await asyncio.sleep(0.5)
if self.meshcore and self.meshcore.is_connected:
self._reset_connection_state()
self.logger.info(f"Connected to: {self.meshcore.self_info}")
# Wait for self_info to be populated (it may be empty initially, especially for serial)
# Check if self_info has actual content (not just empty dict)
max_wait_attempts = 10
wait_interval = 0.5
self_info_populated = False
for attempt in range(max_wait_attempts):
if self.meshcore.self_info and (
self.meshcore.self_info.get('name') or
self.meshcore.self_info.get('public_key')
):
self_info_populated = True
break
if attempt < max_wait_attempts - 1:
self.logger.debug(f"Waiting for device info to populate (attempt {attempt + 1}/{max_wait_attempts})...")
await asyncio.sleep(wait_interval)
# Try to trigger device info by sending a query (for serial connections especially)
if not self_info_populated and hasattr(self.meshcore, 'commands'):
try:
self.logger.debug("Attempting to query device info...")
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query (device info)",
timeout=3.0,
max_retries=self.device_info_retry_limit, # Use device info retry limit
retry_delay=0.2
)
# Wait a bit more after query
await asyncio.sleep(0.5)
if self.meshcore.self_info and (
self.meshcore.self_info.get('name') or
self.meshcore.self_info.get('public_key')
):
self_info_populated = True
except Exception as e:
self.logger.debug(f"Device query failed (non-critical): {e}")
# Store device information for origin field
if self_info_populated and self.meshcore.self_info:
self.device_name = self.meshcore.self_info.get('name', 'Unknown')
self.device_public_key = self.meshcore.self_info.get('public_key', 'Unknown')
# Normalize public key to uppercase
if self.device_public_key != 'Unknown':
self.device_public_key = self.device_public_key.upper()
# Extract radio information
radio_freq = self.meshcore.self_info.get('radio_freq', 0)
radio_bw = self.meshcore.self_info.get('radio_bw', 0)
radio_sf = self.meshcore.self_info.get('radio_sf', 0)
radio_cr = self.meshcore.self_info.get('radio_cr', 0)
self.radio_info = f"{radio_freq},{radio_bw},{radio_sf},{radio_cr}"
self.logger.info(f"Device name: {self.device_name}")
self.logger.info(f"Device public key: {self.device_public_key}")
self.logger.info(f"Radio info: {self.radio_info}")
else:
# Fallback: Use configured origin or default
self.logger.warning("Device info not available from connection, using fallback")
self.device_name = self.get_env('ORIGIN', 'MeshCore Device')
self.device_public_key = 'Unknown'
self.radio_info = "0,0,0,0"
self.logger.info(f"Using fallback device name: {self.device_name}")
self.logger.info("You can set PACKETCAPTURE_ORIGIN in .env.local to customize the device name")
# Set radio clock to current system time
await self.set_radio_clock()
# Don't publish status here - wait for MQTT connections
# Status will be published after MQTT connections are established
# Setup JWT authentication - will use on-device signing (preferred)
# Private key fallback will be loaded lazily only if device signing fails
self.logger.info("Setting up JWT authentication...")
self.logger.info("✓ JWT authentication: Will use on-device signing")
return True
else:
self.logger.error("Failed to connect to MeshCore node")
# Clean up failed connection attempt to prevent pending tasks
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}")
self.meshcore = None
return False
except Exception as e:
self.logger.error(f"Connection failed: {e}")
# Clean up any partial connection on exception
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}")
self.meshcore = None
return False
def cleanup_event_subscriptions(self):
"""Clean up all event subscriptions before disconnecting to prevent pending tasks"""
if not self.meshcore:
return
try:
# Use meshcore.unsubscribe() method which is the proper API
if hasattr(self.meshcore, "dispatcher") and hasattr(self.meshcore.dispatcher, "subscriptions"):
subscription_count = len(self.meshcore.dispatcher.subscriptions)
if subscription_count > 0:
self.logger.debug(f"Cleaning up {subscription_count} event subscriptions")
# Create a copy of the list to avoid modification during iteration
for subscription in list(self.meshcore.dispatcher.subscriptions):
try:
# Use meshcore.unsubscribe() - the proper API method
self.meshcore.unsubscribe(subscription)
except Exception as e:
self.logger.debug(f"Error unsubscribing: {e}")
self.logger.debug(f"Cleared {subscription_count} event subscriptions")
except Exception as e:
self.logger.debug(f"Error cleaning up subscriptions: {e}")
async def reconnect_meshcore(self) -> bool:
"""Attempt to reconnect to MeshCore device with exponential backoff retry logic"""
if self.max_connection_retries > 0 and self.connection_retry_count >= self.max_connection_retries:
self.logger.error(f"Maximum connection retry attempts ({self.max_connection_retries}) reached")
            # Track the service failure for the systemd restart decision; we return False either way
            self.track_service_failure("MeshCore connection exhausted",
                                       f"Failed {self.connection_retry_count} reconnection attempts")
            return False
self.connection_retry_count += 1
# Calculate exponential backoff delay
delay = self.calculate_connection_retry_delay(self.connection_retry_count)
self.logger.info(f"Attempting MeshCore reconnection (attempt {self.connection_retry_count}/{self.max_connection_retries if self.max_connection_retries > 0 else ''}) with {delay:.1f}s delay...")
# Clean up existing connection
# Capture BLE address before disconnecting (needed for bluetoothctl cleanup)
ble_device = None
if self.meshcore and self.connection_type == 'ble':
# Try to get BLE address from meshcore object before disconnecting
try:
# Check if meshcore has address attribute (BLE connections often do)
if hasattr(self.meshcore, 'address') and self.meshcore.address:
ble_device = self.meshcore.address
except Exception:
pass
# Fallback to environment variables
if not ble_device:
ble_device = self.get_env('BLE_DEVICE', '') or self.get_env('BLE_ADDRESS', '')
if self.meshcore:
try:
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
# Disconnect the connection
await self.meshcore.disconnect()
except Exception as e:
self.logger.debug(f"Error disconnecting during reconnect: {e}")
self.meshcore = None
# For BLE connections, ensure full cleanup including OS-level disconnect
if self.connection_type == 'ble':
# On Linux, force disconnect via bluetoothctl to ensure clean state
import platform
if platform.system() == 'Linux':
try:
import subprocess
if ble_device and ble_device != 'Unknown':
self.logger.debug(f"Force disconnecting BLE device {ble_device} via bluetoothctl...")
subprocess.run(['bluetoothctl', 'disconnect', ble_device],
capture_output=True, timeout=10)
await asyncio.sleep(1) # Give time for disconnection
except Exception as e:
self.logger.debug(f"Could not force BLE disconnect via bluetoothctl: {e}")
else:
# On non-Linux systems, add a short delay to ensure BLE cleanup completes
await asyncio.sleep(0.5)
# Wait before retrying with exponential backoff
if delay > 0:
self.logger.info(f"Waiting {delay:.1f} seconds before retry (exponential backoff)...")
if await self.wait_with_shutdown(delay):
return False # Shutdown was requested during delay
# Attempt to reconnect
success = await self.connect()
if success:
self.connection_retry_count = 0 # Reset counter on successful connection
self.logger.info("MeshCore reconnection successful")
else:
self.logger.warning(f"MeshCore reconnection attempt {self.connection_retry_count} failed")
return success
async def connection_monitor(self):
"""Monitor connection health and attempt reconnection if needed"""
if self.health_check_interval <= 0:
if self.debug:
self.logger.debug("Connection monitoring disabled (health_check_interval <= 0)")
return
if self.debug:
self.logger.debug(f"Starting connection monitoring (health check every {self.health_check_interval} seconds)")
# Track last MQTT health check time separately
last_mqtt_check = 0
while not self.should_exit:
try:
if await self.wait_with_shutdown(self.health_check_interval):
break # Shutdown was requested
# Check if we need to reconnect (either disconnected or health check failed)
# For TCP with SDK auto-reconnect, only check health if SDK has exhausted
if self._is_tcp_sdk_auto_reconnect_active():
# SDK is handling reconnection - just check if it succeeded
if self.meshcore and self.meshcore.is_connected:
if not self.connected:
# SDK reconnected - update our state
self._reset_connection_state()
self.logger.info("SDK auto-reconnect succeeded - connection restored")
await self._setup_after_reconnection()
# Skip health check and reconnect logic - let SDK handle it
continue
# For other connection types or after SDK has exhausted, do normal health check
health_check_passed = await self.check_connection_health()
needs_reconnection = not self.connected or not health_check_passed
if needs_reconnection:
# For non-TCP connections, or TCP after SDK has exhausted, use custom reconnect
if not self.connected:
self.logger.info("Connection is disconnected, attempting reconnection...")
else:
self.logger.warning("MeshCore connection health check failed, attempting reconnection...")
# Attempt to reconnect
if await self.reconnect_meshcore():
self.logger.info("MeshCore reconnection successful, resuming packet capture")
self._reset_connection_state()
await self._setup_after_reconnection()
else:
self.logger.error("MeshCore reconnection failed, will retry on next health check")
# Track consecutive failures for more intelligent failure detection
if self.track_consecutive_failure("connection"):
return # Exit if service failure threshold reached
# Check MQTT health periodically (separate interval to avoid being too aggressive)
                current_time = time.time()
if self.enable_mqtt and (current_time - last_mqtt_check) >= self.mqtt_health_check_interval:
last_mqtt_check = current_time
mqtt_healthy = self.check_mqtt_health()
if not mqtt_healthy:
# All brokers have been disconnected past grace period - this is a persistent failure
self.logger.warning("MQTT health check failed - all brokers disconnected past grace period")
# Track consecutive failures for more intelligent failure detection
if self.track_consecutive_failure("mqtt"):
return # Exit if service failure threshold reached
elif self.debug:
self.logger.debug("MQTT health check passed")
# JWT token renewal is now handled proactively in safe_publish()
# and by the dedicated jwt_renewal_scheduler task
except asyncio.CancelledError:
if self.debug:
self.logger.debug("Connection monitoring cancelled")
break
except Exception as e:
self.logger.error(f"Error in connection monitoring: {e}")
if await self.wait_with_shutdown(5):
break # Shutdown was requested
def sanitize_client_id(self, name):
"""Convert device name to valid MQTT client ID"""
client_id = self.get_env("CLIENT_ID_PREFIX", "meshcore_client_") + name.replace(" ", "_")
client_id = re.sub(r"[^a-zA-Z0-9_-]", "", client_id)
return client_id[:23]
def on_mqtt_connect(self, client, userdata, flags, rc, properties=None):
broker_name = userdata.get('name', 'unknown') if userdata else 'unknown'
broker_num = userdata.get('broker_num', None) if userdata else None
if rc == 0:
self.mqtt_connected = True
self.logger.info(f"Connected to MQTT broker: {broker_name}")
# Clear disconnect timestamp if this was a reconnection
if broker_num and broker_num in self.mqtt_disconnect_timestamps:
                disconnect_duration = time.time() - self.mqtt_disconnect_timestamps[broker_num]
self.logger.info(f"MQTT{broker_num} reconnected after {disconnect_duration:.1f} seconds")
del self.mqtt_disconnect_timestamps[broker_num]
# Reset consecutive failures on successful reconnection
self.reset_consecutive_failures("mqtt")
# JWT renewal is handled by the dedicated JWT renewal scheduler
# No need to check here as it will be handled proactively
# Don't publish status here - it will be published after device connection
# This callback fires when MQTT connects, but device might not be ready yet
self.logger.debug(f"MQTT broker {broker_name} connected, waiting for device connection...")
else:
self.logger.error(f"MQTT connection failed for {broker_name} with code {rc}")
def on_mqtt_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
broker_name = userdata.get('name', 'unknown') if userdata else 'unknown'
# Handle both integer and ReasonCode object types
if hasattr(reason_code, 'value'):
# ReasonCode object - get the integer value
reason_code_int = reason_code.value
else:
# Integer or other type
reason_code_int = int(reason_code) if reason_code is not None else 0
# Provide more specific logging for different disconnect reasons
if reason_code_int == mqtt.MQTT_ERR_KEEPALIVE:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: Keep alive timeout)")
self.logger.info("This may be due to network latency or firewall timeouts. Connection will be retried.")
elif reason_code_int == mqtt.MQTT_ERR_CONN_LOST:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: Connection lost)")
self.logger.info("Network connection was lost. Connection will be retried.")
elif reason_code_int == mqtt.MQTT_ERR_CONN_REFUSED:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: Connection refused)")
self.logger.info("Server refused the connection. Check credentials and server configuration.")
elif reason_code_int == mqtt.MQTT_ERR_AUTH:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: Authentication failed)")
self.logger.info("Authentication failed. Check username/password or auth token.")
elif reason_code_int == mqtt.MQTT_ERR_ACL_DENIED:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: ACL denied)")
self.logger.info("Access denied. Check topic permissions and broker ACL settings.")
elif reason_code_int == mqtt.MQTT_ERR_TLS:
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: TLS error)")
self.logger.info("TLS/SSL error occurred. Check certificate configuration.")
else:
# Map numeric codes to human-readable names
error_names = {
0: "Success",
1: "Out of memory",
2: "Protocol error",
3: "Invalid arguments",
4: "Not connected",
5: "Connection refused",
6: "Not found",
7: "Connection lost",
8: "TLS error",
9: "Payload too large",
10: "Not supported",
11: "Authentication failed",
12: "ACL denied",
13: "Unknown error",
14: "System error",
15: "Queue size exceeded",
16: "Keepalive timeout"
}
error_name = error_names.get(reason_code_int, f"Unknown error code {reason_code_int}")
self.logger.warning(f"Disconnected from MQTT broker {broker_name} (code: {reason_code_int} - {error_name})")
# Check if any brokers are still connected (excluding the one that just disconnected)
connected_brokers = []
for info in self.mqtt_clients:
if info['client'] != client and info['client'].is_connected():
connected_brokers.append(info)
if not connected_brokers:
self.mqtt_connected = False
# Record disconnect timestamp for each disconnected broker (will be tracked in health check)
            for info in self.mqtt_clients:
if info['client'] == client and info['broker_num'] not in self.mqtt_disconnect_timestamps:
self.mqtt_disconnect_timestamps[info['broker_num']] = time.time()
self.logger.debug(f"MQTT{info['broker_num']} disconnect recorded - grace period started")
# Only attempt reconnection if we're not shutting down
if not self.should_exit:
self.logger.warning("All MQTT brokers disconnected. paho-mqtt will attempt reconnection automatically...")
self.logger.info(f"Grace period: {self.mqtt_grace_period}s before counting as persistent failure")
# Don't exit immediately - let reconnection logic and health check handle it
else:
self.logger.info("All MQTT brokers disconnected during shutdown")
else:
self.logger.info(f"Still connected to {len(connected_brokers)} broker(s)")
async def connect_mqtt_broker(self, broker_num):
"""Connect to a single MQTT broker"""
if not self.device_name:
self.logger.error("Cannot connect to MQTT without device name")
return None
# Check if broker is enabled
if not self.get_env_bool(f'MQTT{broker_num}_ENABLED', False):
self.logger.debug(f"MQTT broker {broker_num} is disabled, skipping")
return None
# Validate IATA configuration for brokers that require it
if self.broker_requires_iata(broker_num) and not self.has_configured_iata(broker_num):
server = self.get_env(f'MQTT{broker_num}_SERVER', 'unknown')
self.logger.warning(
f"WARNING: MQTT broker {broker_num} ({server}) requires IATA configuration but IATA code is not set.\n"
f" This broker will be DISABLED during startup.\n"
f" To fix this issue:\n"
f" 1. Set a global IATA code: PACKETCAPTURE_IATA=<airport_code>\n"
f" 2. Or set a broker-specific IATA: PACKETCAPTURE_MQTT{broker_num}_IATA=<airport_code>\n"
f" 3. Valid IATA codes are 3-letter airport identifiers (e.g., JFK, LAX, SFO)\n"
f" 4. Restart the packet capture service after setting the IATA code"
)
return None
try:
# Create client ID
client_id = self.sanitize_client_id(self.device_public_key or self.device_name)
if broker_num > 1:
client_id += f"_{broker_num}"
self.logger.info(f"Connecting to MQTT{broker_num} with client ID: {client_id}")
# Get transport type
transport = self.get_env(f'MQTT{broker_num}_TRANSPORT', 'tcp')
mqtt_client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
client_id=client_id,
clean_session=True,
transport=transport
)
# Enable paho-mqtt's built-in reconnection
mqtt_client.enable_logger(self.logger)
mqtt_client.reconnect_delay_set(min_delay=1, max_delay=120)
# Set user data for callbacks
mqtt_client.user_data_set({
'name': f"MQTT{broker_num}",
'broker_num': broker_num
})
# Handle authentication
use_auth_token = self.get_env_bool(f'MQTT{broker_num}_USE_AUTH_TOKEN', False)
if use_auth_token:
try:
username = f"v1_{self.device_public_key.upper()}"
audience = self.get_env(f'MQTT{broker_num}_TOKEN_AUDIENCE', "")
if audience:
self.logger.info(f"MQTT{broker_num}: Using JWT authentication [aud: {audience}]")
else:
self.logger.info(f"MQTT{broker_num}: Using JWT authentication")
# Use the JWT creation method with private key from device
password = await self.create_auth_token_jwt(audience, broker_num)
if not password:
self.logger.error(f"MQTT{broker_num}: Failed to generate JWT token")
return None
# Log JWT details for debugging if debug mode is enabled
if self.debug:
self.logger.debug(f"MQTT{broker_num}: Generated JWT: {password}")
try:
import base64
parts = password.split('.')
if len(parts) == 3:
header = base64.urlsafe_b64decode(parts[0] + '==').decode('utf-8')
payload = base64.urlsafe_b64decode(parts[1] + '==').decode('utf-8')
self.logger.debug(f"MQTT{broker_num}: JWT Header: {header}")
self.logger.debug(f"MQTT{broker_num}: JWT Payload: {payload}")
self.logger.debug(f"MQTT{broker_num}: JWT Signature length: {len(base64.urlsafe_b64decode(parts[2] + '=='))} bytes")
except Exception as e:
self.logger.debug(f"Could not decode JWT for inspection: {e}")
mqtt_client.username_pw_set(username, password)
except Exception as e:
self.logger.error(f"MQTT{broker_num}: Failed to generate auth token: {e}")
return None
else:
# Username/password authentication
username = self.get_env(f'MQTT{broker_num}_USERNAME', "")
password = self.get_env(f'MQTT{broker_num}_PASSWORD', "")
if username:
mqtt_client.username_pw_set(username, password)
# Set Last Will and Testament
lwt_topic = self.get_topic("status", broker_num)
lwt_payload = json.dumps({
"status": "offline",
"timestamp": datetime.now().isoformat(),
"origin": self.device_name,
"origin_id": self.device_public_key.upper() if self.device_public_key and self.device_public_key != 'Unknown' else 'DEVICE'
})
lwt_qos = self.get_env_int(f'MQTT{broker_num}_QOS', 0)
lwt_retain = self.get_env_bool(f'MQTT{broker_num}_RETAIN', True)
mqtt_client.will_set(lwt_topic, lwt_payload, qos=lwt_qos, retain=lwt_retain)
# Set callbacks
mqtt_client.on_connect = self.on_mqtt_connect
mqtt_client.on_disconnect = self.on_mqtt_disconnect
# Get connection parameters
server = self.get_env(f'MQTT{broker_num}_SERVER', "")
if not server:
self.logger.error(f"MQTT{broker_num}: Server not configured")
return None
port = self.get_env_int(f'MQTT{broker_num}_PORT', 1883)
# Handle TLS/SSL
use_tls = self.get_env_bool(f'MQTT{broker_num}_USE_TLS', False)
if use_tls:
import ssl
tls_verify = self.get_env_bool(f'MQTT{broker_num}_TLS_VERIFY', True)
if tls_verify:
mqtt_client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
mqtt_client.tls_insecure_set(False)
else:
mqtt_client.tls_set(cert_reqs=ssl.CERT_NONE)
mqtt_client.tls_insecure_set(True)
self.logger.warning(f"MQTT{broker_num}: TLS certificate verification disabled (insecure)")
# Handle WebSocket transport
if transport == "websockets":
mqtt_client.ws_set_options(
path="/",
headers=None
)
# Connect with adaptive keep-alive based on transport type
if transport == "websockets":
# WebSocket connections need longer keep-alive to handle network latency
keepalive = self.get_env_int(f'MQTT{broker_num}_KEEPALIVE', 120)
else:
# TCP connections can use shorter keep-alive
keepalive = self.get_env_int(f'MQTT{broker_num}_KEEPALIVE', 60)
mqtt_client.connect(server, port, keepalive=keepalive)
mqtt_client.loop_start()
self.logger.info(f"Connected to MQTT{broker_num} at {server}:{port} (transport={transport}, tls={use_tls})")
return {
'client': mqtt_client,
'broker_num': broker_num
}
except Exception as e:
self.logger.error(f"MQTT connection error for MQTT{broker_num}: {str(e)}")
return None
async def connect_mqtt(self):
"""Connect to all configured MQTT brokers"""
# Try to connect to MQTT1, MQTT2, MQTT3, MQTT4, MQTT5, MQTT6 (can expand if needed)
for broker_num in range(1, 7):
client_info = await self.connect_mqtt_broker(broker_num)
if client_info:
self.mqtt_clients.append(client_info)
if len(self.mqtt_clients) == 0:
self.logger.error("Failed to connect to any MQTT broker")
return False
self.logger.info(f"Connected to {len(self.mqtt_clients)} MQTT broker(s)")
# Publish initial status with firmware version now that MQTT is connected
if self.enable_mqtt:
await asyncio.sleep(1) # Give MQTT connections a moment to stabilize
await self.publish_status("online")
return True
def disconnect_mqtt(self):
"""Disconnect from all MQTT brokers and clean up connections"""
if self.mqtt_clients:
self.logger.info(f"Disconnecting from {len(self.mqtt_clients)} MQTT broker(s)...")
            for client_info in self.mqtt_clients:
                broker_num = client_info['broker_num']
                try:
                    mqtt_client = client_info['client']
                    # Stop the network loop even if the client is no longer connected,
                    # otherwise its background thread keeps running and retrying
                    mqtt_client.loop_stop()
                    if mqtt_client.is_connected():
                        mqtt_client.disconnect()
                    self.logger.debug(f"Disconnected from MQTT{broker_num}")
                except Exception as e:
                    self.logger.warning(f"Error disconnecting from MQTT{broker_num}: {e}")
# Clear the clients list
self.mqtt_clients.clear()
self.mqtt_connected = False
async def publish_status(self, status, client=None, broker_num=None, refresh_stats=True):
"""Publish status with additional information"""
firmware_info = await self.get_firmware_info()
status_msg = {
"status": status,
"timestamp": datetime.now().isoformat(),
"origin": self.device_name,
"origin_id": self.device_public_key.upper() if self.device_public_key and self.device_public_key != 'Unknown' else 'DEVICE',
"model": firmware_info.get('model', 'unknown'),
"firmware_version": firmware_info.get('version', 'unknown'),
"radio": self.radio_info or "unknown",
"client_version": self._load_client_version()
}
# Attach stats (online status only) if supported and enabled
if (
status.lower() == "online"
and self.stats_status_enabled
):
stats_payload = None
if refresh_stats:
# Always force refresh stats right before publishing to ensure fresh data
stats_payload = await self.refresh_stats(force=True)
if not stats_payload:
self.logger.debug("Stats refresh returned no data - stats will not be included in status message")
elif self.latest_stats:
stats_payload = dict(self.latest_stats)
if stats_payload:
status_msg["stats"] = stats_payload
elif self.debug:
self.logger.debug("No stats payload available - status message will not include stats")
if client:
self.safe_publish(None, json.dumps(status_msg), retain=True, client=client, broker_num=broker_num, topic_type="status")
else:
self.safe_publish(None, json.dumps(status_msg), retain=True, topic_type="status")
if self.debug:
self.logger.debug(f"Published status: {status}")
def stats_commands_available(self) -> bool:
"""Detect whether the connected meshcore build exposes stats commands."""
if not self.meshcore or not hasattr(self.meshcore, "commands"):
return False
commands = self.meshcore.commands
required = ["get_stats_core", "get_stats_radio"]
available = all(callable(getattr(commands, attr, None)) for attr in required)
state = "available" if available else "missing"
if state != self.stats_capability_state:
if available:
self.logger.info("MeshCore stats commands detected - status messages will include device stats")
else:
self.logger.info("MeshCore stats commands not available - skipping stats in status messages")
self.stats_capability_state = state
self.stats_supported = available
return available
async def refresh_stats(self, force: bool = False):
"""Fetch stats from the radio and cache them for status publishing."""
if not self.stats_status_enabled:
if self.debug:
self.logger.debug("Stats refresh skipped: stats_status_enabled is False")
return None
if not self._ensure_connected("refresh_stats", "debug"):
return None
if self.stats_refresh_interval <= 0:
if self.debug:
self.logger.debug("Stats refresh skipped: stats_refresh_interval is 0 or negative")
return None
if not self.stats_commands_available():
if self.debug:
self.logger.debug("Stats refresh skipped: stats commands not available")
return None
now = time.time()
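        # Reuse the cached stats if the last fetch is newer than max(60s, half the refresh interval)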
if (
not force
and self.latest_stats
and (now - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
):
return dict(self.latest_stats)
async with self.stats_fetch_lock:
# Another coroutine may have completed the refresh while we waited
if (
not force
and self.latest_stats
and (time.time() - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
):
return dict(self.latest_stats)
stats_payload = {}
try:
core_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_stats_core(),
"get_stats_core",
timeout=8.0,
max_retries=self.stats_retry_limit, # Use stats retry limit
retry_delay=0.2
)
if core_result and core_result.type == EventType.STATS_CORE and core_result.payload:
stats_payload.update(core_result.payload)
elif core_result and core_result.type == EventType.ERROR:
self.logger.debug(f"Core stats unavailable: {core_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching core stats: {exc}")
try:
radio_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_stats_radio(),
"get_stats_radio",
timeout=8.0,
max_retries=self.stats_retry_limit, # Use stats retry limit
retry_delay=0.2
)
if radio_result and radio_result.type == EventType.STATS_RADIO and radio_result.payload:
stats_payload.update(radio_result.payload)
elif radio_result and radio_result.type == EventType.ERROR:
self.logger.debug(f"Radio stats unavailable: {radio_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching radio stats: {exc}")
if stats_payload:
self.latest_stats = stats_payload
self.last_stats_fetch = time.time()
if self.debug:
self.logger.debug(f"Updated stats cache: {self.latest_stats}")
elif self.debug:
self.logger.debug("Stats refresh completed but returned no data")
return dict(self.latest_stats) if self.latest_stats else None
async def stats_refresh_scheduler(self):
"""Periodically refresh stats and publish them via MQTT."""
if self.stats_refresh_interval <= 0 or not self.stats_status_enabled:
return
while not self.should_exit:
try:
# Only fetch stats when we're about to publish status
if self.enable_mqtt and self.mqtt_connected:
await self.publish_status("online", refresh_stats=True)
except asyncio.CancelledError:
break
except Exception as exc:
self.logger.debug(f"Stats refresh error: {exc}")
if await self.wait_with_shutdown(self.stats_refresh_interval):
break
def safe_publish(self, topic, payload, retain=False, client=None, broker_num=None, topic_type=None):
"""Publish to one or all MQTT brokers and return publish metrics."""
metrics = {"attempted": 0, "succeeded": 0}
if not self.mqtt_connected:
self.logger.warning(f"Not connected - skipping publish to {topic or topic_type}")
return metrics
# Proactively check for expired tokens before publishing
if self.enable_mqtt:
try:
# Check if any tokens are expired and need renewal
expired_brokers = []
                # Use a distinct loop variable to avoid shadowing the broker_num parameter
                for token_broker_num in list(self.jwt_tokens.keys()):
                    if self.is_jwt_token_expired(token_broker_num):
                        expired_brokers.append(token_broker_num)
if expired_brokers:
self.logger.warning(f"Detected expired JWT tokens for brokers: {expired_brokers}")
# Check circuit breaker before attempting JWT renewal
current_time = time.time()
if (current_time - self.jwt_circuit_breaker_reset_time) > self.jwt_circuit_breaker_timeout:
self.jwt_failure_count = 0 # Reset circuit breaker
if self.jwt_failure_count >= self.max_jwt_failures:
self.logger.warning(f"JWT circuit breaker open - too many failures ({self.jwt_failure_count}). Skipping JWT renewal.")
return metrics
# Schedule renewal only if not already in progress (prevent task explosion)
if not self.jwt_renewal_in_progress:
self.jwt_renewal_in_progress = True
task = asyncio.create_task(self.check_and_renew_jwt_tokens())
self.active_tasks.add(task)
task.add_done_callback(lambda t: (self.active_tasks.discard(t), setattr(self, 'jwt_renewal_in_progress', False)))
except Exception as e:
self.logger.debug(f"Error checking token expiry before publish: {e}")
if client:
clients_to_publish = [info for info in self.mqtt_clients if info['client'] == client]
else:
clients_to_publish = self.mqtt_clients
for mqtt_client_info in clients_to_publish:
current_broker_num = mqtt_client_info['broker_num']
try:
mqtt_client = mqtt_client_info['client']
# Check individual client connection status
if not mqtt_client.is_connected():
self.logger.warning(f"MQTT{current_broker_num} client not connected - skipping publish")
continue
# CRITICAL FIX: Resolve topic properly
if topic_type:
resolved_topic = self.get_topic(topic_type, current_broker_num)
if self.debug:
self.logger.debug(f"Resolved topic for MQTT{current_broker_num} {topic_type}: {resolved_topic}")
elif topic:
resolved_topic = topic
else:
self.logger.error("Neither topic nor topic_type provided to safe_publish")
continue
# Skip publishing if topic is None (e.g., RAW topic not configured)
if resolved_topic is None:
if self.debug:
self.logger.debug(f"Skipping publish to MQTT{current_broker_num} - topic not configured for {topic_type}")
continue
# Validate topic before publishing
if not resolved_topic:
self.logger.error(f"Failed to resolve topic (type={topic_type}, topic={topic})")
continue
qos = self.get_env_int(f'MQTT{current_broker_num}_QOS', 0)
# Force QoS 1 to 0 to prevent retry storms (like mctomqtt.py)
if qos == 1:
qos = 0
# Only count as attempted if we actually try to publish
metrics["attempted"] += 1
result = mqtt_client.publish(resolved_topic, payload, qos=qos, retain=retain)
if result.rc != mqtt.MQTT_ERR_SUCCESS:
self.logger.error(f"Publish failed to {resolved_topic} on MQTT{current_broker_num}: {mqtt.error_string(result.rc)}")
else:
if self.verbose:
self.logger.info(f"✓ Published to {resolved_topic} on MQTT{current_broker_num} (len={len(payload)})")
metrics["succeeded"] += 1
except Exception as e:
self.logger.error(f"Publish error on MQTT{current_broker_num}: {str(e)}", exc_info=True)
return metrics
def parse_advert(self, payload):
"""Parse advert payload - matches C++ AdvertDataHelpers.h implementation"""
try:
# The advert header is fixed-width: pubkey (32) + timestamp (4) + signature (64).
if len(payload) < 100:
self.logger.error(f"ADVERT payload too short for header: {len(payload)} bytes")
return {
"advert_parse_ok": False,
"advert_error": "payload_too_short_header",
"advert_payload_len": len(payload),
}
# advert header
pub_key = payload[0:32]
timestamp = int.from_bytes(payload[32:32+4], "little")
signature = payload[36:36+64]
advert = {
"advert_parse_ok": True,
"public_key": pub_key.hex(),
"advert_time": timestamp,
"signature": signature.hex(),
}
# appdata - parse according to C++ AdvertDataParser
app_data = payload[100:]
if len(app_data) == 0:
self.logger.error("ADVERT has no app data")
return advert
flags_byte = app_data[0]
# Log the full flag byte for debugging
if self.debug:
self.logger.debug(f"ADVERT flags: 0x{flags_byte:02X} (binary: {flags_byte:08b})")
# Create flags object with the full byte value
flags = AdvertFlags(flags_byte)
# Extract type from lower 4 bits (matches C++ getType())
adv_type = flags_byte & 0x0F
if adv_type == AdvertFlags.ADV_TYPE_CHAT:
advert.update({"mode": DeviceRole.Companion.name})
elif adv_type == AdvertFlags.ADV_TYPE_REPEATER:
advert.update({"mode": DeviceRole.Repeater.name})
elif adv_type == AdvertFlags.ADV_TYPE_ROOM:
advert.update({"mode": DeviceRole.RoomServer.name})
elif adv_type == AdvertFlags.ADV_TYPE_SENSOR:
advert.update({"mode": "Sensor"})
else:
advert.update({"mode": f"Type{adv_type}"})
# Parse data according to C++ AdvertDataParser logic
i = 1 # Start after flags byte
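            # Optional fields follow the flags byte in this order, each present only when its flag bit is set:
            #   lat/lon (4+4 bytes, signed little-endian, degrees * 1e6), feat1 (2 bytes), feat2 (2 bytes), name (remaining bytes)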
# Parse location data if present (matches C++ hasLatLon())
if AdvertFlags.ADV_LATLON_MASK in flags:
if len(app_data) < i + 8:
self.logger.error(f"ADVERT with location flag too short: {len(app_data)} bytes")
return advert
lat = int.from_bytes(app_data[i:i+4], 'little', signed=True)
lon = int.from_bytes(app_data[i+4:i+8], 'little', signed=True)
advert.update({"lat": round(lat / 1000000.0, 6), "lon": round(lon / 1000000.0, 6)})
i += 8
# Parse feat1 data if present
if AdvertFlags.ADV_FEAT1_MASK in flags:
if len(app_data) < i + 2:
self.logger.error(f"ADVERT with feat1 flag too short: {len(app_data)} bytes")
return advert
feat1 = int.from_bytes(app_data[i:i+2], 'little')
advert.update({"feat1": feat1})
i += 2
# Parse feat2 data if present
if AdvertFlags.ADV_FEAT2_MASK in flags:
if len(app_data) < i + 2:
self.logger.error(f"ADVERT with feat2 flag too short: {len(app_data)} bytes")
return advert
feat2 = int.from_bytes(app_data[i:i+2], 'little')
advert.update({"feat2": feat2})
i += 2
# Parse name data if present (matches C++ hasName())
if AdvertFlags.ADV_NAME_MASK in flags:
if len(app_data) >= i:
name_len = len(app_data) - i
if name_len > 0:
try:
# Decode name and handle potential null terminators
name = app_data[i:].decode('utf-8', errors='ignore').rstrip('\x00')
advert.update({"name": name})
except Exception as e:
self.logger.warning(f"Failed to decode ADVERT name: {e}")
return advert
except Exception as e:
self.logger.error(f"Error parsing ADVERT payload: {e}", exc_info=True)
return {
"advert_parse_ok": False,
"advert_error": "exception",
"advert_error_detail": str(e),
"advert_payload_len": len(payload) if payload is not None else 0,
}
def decode_and_publish_message(self, raw_data):
"""Decode message - matches Packet.cpp exactly"""
byte_data = bytes.fromhex(raw_data)
try:
# Validate minimum packet size
if len(byte_data) < 2:
self.logger.error(f"Packet too short: {len(byte_data)} bytes")
return None
header = byte_data[0]
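            # Header byte layout (matches the bit masks used below):
            #   bits 0-1: route type, bits 2-5: payload type, bits 6-7: payload version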
# Extract route type
route_type = RouteType(header & 0x03)
has_transport = route_type in [RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT]
# Calculate path length offset based on presence of transport codes
offset = 1
if has_transport:
offset += 4
# Check if we have enough data for path_len
if len(byte_data) <= offset:
self.logger.error(f"Packet too short for path_len at offset {offset}: {len(byte_data)} bytes")
return None
path_len_byte = byte_data[offset]
offset += 1
# MeshCore packs path_len byte: low 6 bits hop count, high 2 bits hash-size mode.
path_byte_len, path_hash_bytes = self._decode_packed_path_length(path_len_byte)
if self.debug:
self.logger.debug(
"Decoded path length: "
f"path_len_byte=0x{path_len_byte:02X}, path_byte_len={path_byte_len}, "
f"path_hash_bytes={path_hash_bytes}, offset_after_path_len={offset}"
)
# Check if we have enough data for the full path
if len(byte_data) < offset + path_byte_len:
self.logger.error(f"Packet too short for path (need {offset + path_byte_len}, have {len(byte_data)})")
return None
# Extract path
path_bytes = byte_data[offset:offset + path_byte_len]
offset += path_byte_len
# Remaining data is payload
payload = byte_data[offset:]
if self.debug:
self.logger.debug(
f"Packet layout: packet_len={len(byte_data)}, payload_offset={offset}, payload_len={len(payload)}"
)
# Extract payload version (bits 6-7)
payload_version = PayloadVersion((header >> 6) & 0x03)
# Only accept VER_1 (version 0)
if payload_version != PayloadVersion.VER_1:
self.logger.warning(f"Encountered an unknown packet version. Version: {payload_version.value} RAW: {raw_data}")
return None
# Extract payload type (bits 2-5)
payload_type = PayloadType((header >> 2) & 0x0F)
# Convert path bytes to hop tokens using decoded hash width (1/2/3 bytes)
path_values = self._split_path_hops(path_bytes, path_hash_bytes)
message = {
"payload_type": payload_type.name,
"payload_type_value": payload_type.value,
"payload_version": payload_version.name,
"route_type": route_type.name,
"path": path_values,
"path_len_byte": path_len_byte,
"path_byte_len": path_byte_len,
"path_hash_bytes": path_hash_bytes,
}
payload_value = {}
if payload_type is PayloadType.ADVERT:
payload_value = self.parse_advert(payload)
if not payload_value.get("advert_parse_ok", False):
self.logger.warning(
"Dropping malformed ADVERT packet: "
f"{payload_value.get('advert_error', 'unknown_error')} "
f"(payload_len={payload_value.get('advert_payload_len', len(payload))})"
)
return None
            # Merge parsed payload fields (empty for non-ADVERT types)
            message.update(payload_value)
if self.debug:
self.logger.debug(f"Successfully decoded: route={message['route_type']}, type={message['payload_type']}")
return message
except Exception as e:
# Log as ERROR not DEBUG so we can see what's failing
self.logger.error(f"Error decoding packet (len={len(byte_data)}): {e}", exc_info=True)
self.logger.error(f"Failed packet hex: {raw_data}")
return None
def _decode_packed_path_length(self, path_len_byte: int, max_path_size: int = 64) -> tuple[int, int]:
"""Decode packed path length byte per MeshCore firmware.
path_len layout:
- low 6 bits: hop count
- high 2 bits: bytes-per-hop minus 1
"""
hop_count = path_len_byte & 0x3F
bytes_per_hop = (path_len_byte >> 6) + 1
# Mode 3 => 4 bytes/hop is reserved in firmware; fallback to legacy interpretation.
if bytes_per_hop == 4:
if self.debug:
self.logger.debug(
"Path decode fallback to legacy length due to reserved hash-size mode: "
f"path_len_byte=0x{path_len_byte:02X}"
)
return path_len_byte, 1
path_byte_len = hop_count * bytes_per_hop
if path_byte_len > max_path_size:
# Invalid packed value; fallback keeps compatibility with legacy one-byte parsing.
if self.debug:
self.logger.debug(
"Path decode fallback to legacy length due to oversized packed path: "
f"path_len_byte=0x{path_len_byte:02X}, hop_count={hop_count}, "
f"bytes_per_hop={bytes_per_hop}, computed_path_byte_len={path_byte_len}, "
f"max_path_size={max_path_size}"
)
return path_len_byte, 1
if self.debug:
self.logger.debug(
"Path decode packed mode: "
f"path_len_byte=0x{path_len_byte:02X}, hop_count={hop_count}, "
f"bytes_per_hop={bytes_per_hop}, path_byte_len={path_byte_len}"
)
return path_byte_len, bytes_per_hop
def _split_path_hops(self, path_bytes: bytes, bytes_per_hop: int) -> list[str]:
"""Split path bytes into per-hop hex tokens."""
path_hex = path_bytes.hex()
        # Each hop occupies bytes_per_hop bytes (2 * bytes_per_hop hex chars); clamp to at least one byte per hop
        hop_hex_chars = max(bytes_per_hop, 1) * 2
nodes = [path_hex[i:i + hop_hex_chars] for i in range(0, len(path_hex), hop_hex_chars)]
if (len(path_hex) % hop_hex_chars) != 0:
nodes = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)]
return nodes
def calculate_packet_hash(self, raw_hex: str, payload_type: int = None) -> str:
"""Calculate hash for packet identification - based on packet.cpp"""
try:
# Parse the packet to extract payload type and payload data
byte_data = bytes.fromhex(raw_hex)
header = byte_data[0]
# Get payload type from header (bits 2-5)
if payload_type is None:
payload_type = (header >> 2) & 0x0F
# Check if transport codes are present
route_type = header & 0x03
has_transport = route_type in [0x00, 0x03] # TRANSPORT_FLOOD or TRANSPORT_DIRECT
# Calculate path length offset dynamically based on transport codes
offset = 1 # After header
if has_transport:
offset += 4 # Skip 4 bytes of transport codes
if len(byte_data) <= offset:
self.logger.debug(f"Packet too short for path_len while hashing: len={len(byte_data)}, offset={offset}")
return "0000000000000000"
# Read packed path_len byte from wire
path_len_byte = byte_data[offset]
offset += 1
# Skip past the path to get to payload
path_byte_len, _ = self._decode_packed_path_length(path_len_byte)
payload_start = offset + path_byte_len
if payload_start > len(byte_data):
self.logger.debug(
f"Packet too short for decoded path while hashing: need {payload_start}, have {len(byte_data)}"
)
return "0000000000000000"
payload_data = byte_data[payload_start:]
# Calculate hash exactly like MeshCore Packet::calculatePacketHash():
# 1. Payload type (1 byte)
# 2. Path length (2 bytes as uint16_t, little-endian) - ONLY for TRACE packets (type 9)
# 3. Payload data
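            # Sketch of the recipe implemented below:
            #   sha256( type_byte + (path_len as 2-byte LE, TRACE only) + payload_data ),
            #   then take the first 16 hex chars of the digest, upper-cased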
hash_obj = hashlib.sha256()
hash_obj.update(bytes([payload_type]))
if payload_type == 9: # PAYLOAD_TYPE_TRACE
# C++ does: sha.update(&path_len, sizeof(path_len))
# path_len is uint16_t, so sizeof(path_len) = 2 bytes
# Convert wire path_len byte to 2-byte little-endian uint16_t
hash_obj.update(path_len_byte.to_bytes(2, byteorder='little'))
hash_obj.update(payload_data)
# Return first 16 hex characters (8 bytes) in uppercase
return hash_obj.hexdigest()[:16].upper()
except Exception as e:
self.logger.debug(f"Error calculating hash: {e}")
return "0000000000000000"
def format_packet_data(self, raw_hex: str, rf_data: Optional[Dict] = None) -> Dict[str, Any]:
"""Format packet data to match mctomqtt.py exactly"""
current_time = datetime.now()
timestamp = current_time.isoformat()
# Decode packet using the same logic as mctomqtt.py
decoded_message = self.decode_and_publish_message(raw_hex)
# Extract basic info
packet_len = len(raw_hex) // 2 # Convert hex string to byte count
# Get route type from decoded message
route = "U" # Default
packet_type = "0" # Default
payload_len = "0" # Default
# Initialize firmware payload length early
firmware_payload_len = None
if rf_data:
firmware_payload_len = rf_data.get('payload_length')
if decoded_message:
# Map route type names to single letters like mctomqtt.py
route_map = {
"TRANSPORT_FLOOD": "F",
"FLOOD": "F",
"DIRECT": "D",
"TRANSPORT_DIRECT": "T"
}
route = route_map.get(decoded_message.get('route_type', ''), "U")
# Get payload type as string - now matches C++ definitions exactly
payload_type_map = {
"REQ": "0",
"RESPONSE": "1",
"TXT_MSG": "2",
"ACK": "3",
"ADVERT": "4",
"GRP_TXT": "5",
"GRP_DATA": "6",
"ANON_REQ": "7",
"PATH": "8",
"TRACE": "9",
"MULTIPART": "10",
"CONTROL": "11",
"Type12": "12",
"Type13": "13",
"Type14": "14",
"RAW_CUSTOM": "15"
}
packet_type = payload_type_map.get(decoded_message.get('payload_type', ''), "0")
# Use firmware-provided payload length if available, otherwise calculate
if firmware_payload_len is not None:
payload_len = str(firmware_payload_len)
else:
# Fallback calculation if firmware doesn't provide it
if decoded_message and 'path' in decoded_message:
# Calculate actual payload length from the raw data
# Total bytes - header(1) - transport(4 if present) - path_length(1) - path_bytes
path_len_bytes = decoded_message.get('path_byte_len')
if path_len_bytes is None:
path_len_bytes = len(decoded_message.get('path', []))
has_transport = decoded_message.get('route_type') in ['TRANSPORT_FLOOD', 'TRANSPORT_DIRECT']
transport_bytes = 4 if has_transport else 0
payload_len = str(max(0, packet_len - 1 - transport_bytes - 1 - path_len_bytes))
else:
# Fallback calculation
payload_len = str(max(0, packet_len - 1))
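        # Worked example of the path-aware fallback above (hypothetical 20-byte FLOOD packet, 2-byte path,
        # no transport codes): payload_len = 20 - 1 (header) - 0 (transport) - 1 (path_len byte) - 2 (path) = 16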
# Get origin_id (use device info if available, otherwise use config or generate)
origin_id = None
if self.device_public_key and self.device_public_key != 'Unknown':
origin_id = self.device_public_key
else:
# Try to get from environment as fallback
origin_id = self.get_env('ORIGIN_ID', None)
if not origin_id:
# Generate a hash from device name as last resort
device_name = self.device_name or 'Unknown'
origin_id = hashlib.sha256(device_name.encode()).hexdigest()
self.logger.warning(f"Using generated origin_id from device name: {origin_id}")
# Normalize origin_id to uppercase
if origin_id and origin_id != 'Unknown':
origin_id = origin_id.upper()
# Extract RF data if available
snr = "Unknown"
rssi = "Unknown"
if rf_data:
snr = str(rf_data.get('snr', 'Unknown'))
rssi = str(rf_data.get('rssi', 'Unknown'))
# Build the packet data structure to match mctomqtt.py exactly
packet_data = {
"origin": self.device_name or self.get_env('ORIGIN', 'MeshCore Device'),
"origin_id": origin_id,
"timestamp": timestamp,
"type": "PACKET",
"direction": "rx",
"time": current_time.strftime("%H:%M:%S"),
"date": current_time.strftime("%d/%m/%Y"),
"len": str(packet_len),
"packet_type": packet_type,
"route": route,
"payload_len": payload_len,
"raw": raw_hex.upper(),
"SNR": snr,
"RSSI": rssi,
"hash": self.calculate_packet_hash(raw_hex, decoded_message.get('payload_type_value') if decoded_message else None)
}
# Add path for route=D like mctomqtt.py
if route == "D" and decoded_message and 'path' in decoded_message:
packet_data["path"] = ",".join(decoded_message['path'])
return packet_data
async def handle_rf_log_data(self, event):
"""Handle RF log data events to cache SNR/RSSI information and process packets"""
try:
payload = event.payload
if 'snr' in payload:
# Try to get packet data - prefer 'payload' field, fallback to 'raw_hex'
raw_hex = None
# First, try the 'payload' field (already stripped of framing bytes)
if 'payload' in payload and payload['payload']:
raw_hex = payload['payload']
# Fallback to raw_hex with first 2 bytes stripped
elif 'raw_hex' in payload and payload['raw_hex']:
raw_hex = payload['raw_hex'][4:] # Skip first 2 bytes (4 hex chars)
if raw_hex:
packet_prefix = raw_hex[:32]
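                    # The first 16 bytes (32 hex chars) serve as the correlation key between
                    # RX_LOG_DATA entries and later RAW_DATA events for the same packet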
rf_data = {
'snr': payload.get('snr'),
'rssi': payload.get('rssi'),
'timestamp': time.time(),
'raw_hex': raw_hex,
'payload_length': payload.get('payload_length')
}
self.rf_data_cache[packet_prefix] = rf_data
# Clean up old cache entries
current_time = time.time()
timeout = self.get_env_float('RF_DATA_TIMEOUT', 15.0)
self.rf_data_cache = {
k: v for k, v in self.rf_data_cache.items()
if current_time - v['timestamp'] < timeout
}
# Remember RF-originated packets so RAW_DATA for the same reception doesn't double-publish.
self.recent_rf_packets[raw_hex.upper()] = current_time
self.recent_rf_packets = {
k: v for k, v in self.recent_rf_packets.items()
if current_time - v < self.raw_duplicate_window
}
# Process the packet
await self.process_packet_from_rf_data(raw_hex, rf_data)
else:
self.logger.warning(f"RF log data missing both 'payload' and 'raw_hex' fields: {payload.keys()}")
except Exception as e:
self.logger.error(f"Error handling RF log data: {e}", exc_info=True)
async def process_packet_from_rf_data(self, raw_hex: str, rf_data: dict):
"""Process packet data from RF log data"""
try:
# Format packet data
packet_data = self.format_packet_data(raw_hex, rf_data)
# Output the packet data
publish_metrics = self.output_packet(packet_data)
self.packet_count += 1
# Standard log line format for both modes
self.logger.info(f"📦 Captured packet #{self.packet_count}: {packet_data['route']} type {packet_data['packet_type']}, {packet_data['len']} bytes, SNR: {packet_data['SNR']}, RSSI: {packet_data['RSSI']}, hash: {packet_data['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")
# Output full packet data structure in debug mode only
if self.debug:
self.logger.debug("📋 Full packet data structure:")
                self.logger.debug(json.dumps(packet_data, indent=2))
except Exception as e:
self.logger.error(f"Error processing packet from RF data: {e}")
async def handle_raw_data(self, event):
"""Handle raw data events (full packet data)"""
try:
payload = event.payload
self.logger.info(f"📦 RAW_DATA EVENT RECEIVED")
# Extract raw hex data
raw_hex = None
if hasattr(payload, 'data'):
raw_hex = payload.data
elif 'data' in payload:
raw_hex = payload['data']
elif 'raw_hex' in payload:
raw_hex = payload['raw_hex']
if raw_hex:
# Remove 0x prefix if present
if raw_hex.startswith('0x'):
raw_hex = raw_hex[2:]
raw_hex = raw_hex.upper()
current_time = time.time()
recent_rf_time = self.recent_rf_packets.get(raw_hex)
if recent_rf_time is not None and (current_time - recent_rf_time) < self.raw_duplicate_window:
if self.debug:
self.logger.debug("Skipping RAW_DATA packet already processed from RX_LOG_DATA")
return
self.recent_rf_packets = {
k: v for k, v in self.recent_rf_packets.items()
if current_time - v < self.raw_duplicate_window
}
# Find corresponding RF data
packet_prefix = raw_hex[:32]
rf_data = self.rf_data_cache.get(packet_prefix)
# Format packet data
packet_data = self.format_packet_data(raw_hex, rf_data)
# Output the packet data
publish_metrics = self.output_packet(packet_data)
self.packet_count += 1
self.logger.info(f"📦 Captured packet #{self.packet_count}: {packet_data['route']} type {packet_data['packet_type']}, {packet_data['len']} bytes, SNR: {packet_data['SNR']}, RSSI: {packet_data['RSSI']}, hash: {packet_data['hash']} (MQTT: {publish_metrics['succeeded']}/{publish_metrics['attempted']})")
except Exception as e:
self.logger.error(f"Error handling raw data event: {e}")
def output_packet(self, packet_data: Dict[str, Any]):
"""Output packet data to console, file, and MQTT"""
# Convert to JSON
json_data = json.dumps(packet_data, indent=2)
# Output JSON packet data to console only in verbose mode
if self.verbose:
self.logger.info("=" * 80)
self.logger.info(json_data)
self.logger.info("=" * 80)
# Output to file if specified
if self.output_handle:
self.output_handle.write(json_data + "\n")
self.output_handle.flush()
# Filter by packet type if configured (only affects MQTT upload, not file/console output)
if self.allowed_upload_types is not None:
packet_type = packet_data.get('packet_type')
if packet_type not in self.allowed_upload_types:
# Skip MQTT upload but already wrote to file/console above
if self.debug:
self.logger.debug(f"Filtered out packet type {packet_type} from upload (not in allowed types: {sorted(self.allowed_upload_types)})")
# Return zero metrics since we didn't upload
return {"attempted": 0, "succeeded": 0}
# Publish to MQTT if enabled
publish_metrics = {"attempted": 0, "succeeded": 0}
if self.enable_mqtt:
# Publish full packet data
packet_metrics = self.safe_publish(None, json.dumps(packet_data), topic_type="packets")
# Publish raw data only to brokers that have RAW topic explicitly configured
raw_data = {
"origin": packet_data["origin"],
"origin_id": packet_data["origin_id"],
"timestamp": packet_data["timestamp"],
"type": "RAW",
"data": packet_data["raw"]
}
raw_metrics = self.safe_publish(None, json.dumps(raw_data), topic_type="raw")
# Combine metrics: sum up all successful publishes across all brokers
# Each broker publishes to its configured topics independently
publish_metrics["attempted"] = packet_metrics["attempted"] + raw_metrics["attempted"]
publish_metrics["succeeded"] = packet_metrics["succeeded"] + raw_metrics["succeeded"]
return publish_metrics
async def setup_disconnect_handler(self):
"""Set up handler for disconnect events from meshcore"""
async def on_disconnect(event):
reason = event.payload.get('reason', 'unknown')
self.logger.warning(f"Disconnect event received: {reason}")
if reason == 'tcp_no_response':
self.logger.error("Disconnected due to no TCP responses - possible WiFi issue")
elif reason == 'tcp_disconnect':
self.logger.error("TCP connection closed by remote - possible radio reset")
elif reason == 'ble_disconnect':
self.logger.error("BLE connection lost - device may have moved out of range")
elif reason == 'serial_disconnect':
self.logger.error("Serial connection lost - cable may be disconnected")
else:
self.logger.warning(f"Disconnected for unknown reason: {reason}")
# For TCP connections with SDK auto-reconnect, this event means SDK has exhausted its attempts
if self.connection_type == 'tcp' and self.tcp_sdk_auto_reconnect_enabled:
self.sdk_reconnect_exhausted = True
self.logger.info("SDK auto-reconnect has exhausted - custom reconnect logic will take over")
# Update connection status - connection monitor will handle reconnection
self.connected = False
self.logger.info("Connection status updated - connection monitor will handle reconnection")
self.meshcore.subscribe(EventType.DISCONNECTED, on_disconnect)
self.logger.debug("Disconnect event handler registered")
async def setup_event_handlers(self):
"""Setup event handlers for packet capture"""
# Clean up any existing subscriptions before setting up new ones
# This prevents orphaned EventDispatcher tasks when reconnecting
self.cleanup_event_subscriptions()
# Handle RF log data for SNR/RSSI information
async def on_rf_data(event):
if self.debug:
self.logger.debug(f"RF_DATA event received: {event}")
await self.handle_rf_log_data(event)
# Handle raw data events (full packet data)
async def on_raw_data(event):
if self.debug:
self.logger.debug(f"RAW_DATA event received: {event}")
await self.handle_raw_data(event)
# Handle status response events
async def on_status_response(event):
if self.debug:
self.logger.debug(f"STATUS_RESPONSE event received: {event}")
# Log the status data to see what's available
if hasattr(event, 'payload') and event.payload:
self.logger.debug(f"Status data: {event.payload}")
# Subscribe to events
self.meshcore.subscribe(EventType.RX_LOG_DATA, on_rf_data)
self.meshcore.subscribe(EventType.RAW_DATA, on_raw_data)
self.meshcore.subscribe(EventType.STATUS_RESPONSE, on_status_response)
# Setup disconnect handler
await self.setup_disconnect_handler()
self.logger.info("Event handlers setup complete")
# Note: Packet capture mode is automatically enabled when subscribing to events
self.logger.info("Packet capture mode enabled via event subscriptions")
async def start(self):
"""Start packet capture"""
self.logger.info("Starting MeshCore Packet Capture...")
# Connect to MeshCore node
if not await self.connect():
self.logger.error("Failed to connect to MeshCore node")
return
# Connect to MQTT broker if enabled
if self.enable_mqtt:
if not await self.connect_mqtt():
self.logger.warning("Failed to connect to MQTT broker, continuing without MQTT...")
else:
self.logger.info("MQTT disabled, skipping MQTT connection")
# Setup event handlers
await self.setup_event_handlers()
# Start auto message fetching (optional; see PACKETCAPTURE_DRAIN_MESSAGES)
await self._start_auto_message_fetching_if_enabled()
self.logger.info("Packet capture is running. Press Ctrl+C to stop.")
self.logger.info("Waiting for packets...")
# Start connection monitoring task (delay to allow MQTT connections to stabilize)
await asyncio.sleep(5) # Give MQTT connections time to fully establish
monitoring_task = asyncio.create_task(self.connection_monitor())
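        # The connection monitor keeps running for the rest of the capture and handles reconnection attempts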
# Start advert scheduler task
if self.advert_interval_hours > 0:
self.advert_task = asyncio.create_task(self.advert_scheduler())
# Start JWT renewal scheduler task
if self.jwt_renewal_interval > 0:
self.jwt_renewal_task = asyncio.create_task(self.jwt_renewal_scheduler())
# Start stats refresh scheduler
if self.stats_status_enabled and self.stats_refresh_interval > 0:
self.stats_update_task = asyncio.create_task(self.stats_refresh_scheduler())
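        # Main supervision loop: watch for shutdown/systemd-restart conditions and prune runaway background tasks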
try:
while not self.should_exit:
current_time = time.time()
# Check if we should exit for systemd restart
if self.should_exit_for_systemd_restart():
self.logger.critical("Service failure threshold reached - exiting for systemd restart")
self.should_exit = True
# Monitor active tasks to prevent explosion
if current_time - self.last_task_check >= self.task_monitoring_interval:
active_count = len(self.active_tasks)
if active_count > self.max_active_tasks:
self.logger.warning(f"Too many active tasks ({active_count}), cleaning up...")
# Cancel excess tasks
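                        # Note: active_tasks is an unordered set, so which tasks count as "excess" here is arbitrary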
tasks_to_cancel = list(self.active_tasks)[self.max_active_tasks:]
for task in tasks_to_cancel:
task.cancel()
self.active_tasks.discard(task)
self.last_task_check = current_time
# Use shutdown-aware waiting
if await self.wait_with_shutdown(5):
break # Shutdown was requested
except KeyboardInterrupt:
self.logger.info("Received interrupt signal")
finally:
# Cancel all active tasks
monitoring_task.cancel()
if self.advert_task:
self.advert_task.cancel()
if self.jwt_renewal_task:
self.jwt_renewal_task.cancel()
if self.stats_update_task:
self.stats_update_task.cancel()
# Cancel all tracked active tasks
for task in self.active_tasks.copy():
task.cancel()
# Wait for all tasks to complete
try:
await monitoring_task
except asyncio.CancelledError:
pass
if self.advert_task:
try:
await self.advert_task
except asyncio.CancelledError:
pass
if self.jwt_renewal_task:
try:
await self.jwt_renewal_task
except asyncio.CancelledError:
pass
if self.stats_update_task:
try:
await self.stats_update_task
except asyncio.CancelledError:
pass
# Wait for all active tasks to complete
if self.active_tasks:
await asyncio.gather(*self.active_tasks, return_exceptions=True)
await self.stop()
async def stop(self):
"""Stop packet capture with timeout"""
self.logger.info("Stopping packet capture...")
self.connected = False
try:
# Publish offline status with timeout
if self.enable_mqtt and self.mqtt_connected:
await asyncio.wait_for(self.publish_status("offline", refresh_stats=False), timeout=5.0)
except asyncio.TimeoutError:
self.logger.warning("Timeout publishing offline status")
except Exception as e:
self.logger.warning(f"Error publishing offline status: {e}")
# Handle BLE disconnection if using BLE connection
if self.meshcore and self.get_env('CONNECTION_TYPE', 'ble').lower() == 'ble':
try:
self.logger.info("Disconnecting BLE device...")
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
await asyncio.wait_for(self.meshcore.disconnect(), timeout=10.0)
# Additional BLE disconnection using bluetoothctl on Linux
import platform
if platform.system() == 'Linux':
try:
import subprocess
ble_device = self.get_env('BLE_DEVICE', '') or self.get_env('BLE_ADDRESS', '')
if ble_device and ble_device != 'Unknown':
self.logger.info(f"Force disconnecting BLE device {ble_device}...")
subprocess.run(['bluetoothctl', 'disconnect', ble_device],
capture_output=True, timeout=10)
await asyncio.sleep(1) # Give time for disconnection
except Exception as e:
self.logger.debug(f"Could not force BLE disconnect via bluetoothctl: {e}")
else:
# On non-Linux systems, add a short delay to ensure BLE cleanup completes
await asyncio.sleep(0.5)
except asyncio.TimeoutError:
self.logger.warning("Timeout disconnecting BLE device")
except Exception as e:
self.logger.warning(f"Error during BLE disconnection: {e}")
elif self.meshcore:
try:
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
await asyncio.wait_for(self.meshcore.disconnect(), timeout=5.0)
except asyncio.TimeoutError:
self.logger.warning("Timeout disconnecting MeshCore device")
except Exception as e:
self.logger.warning(f"Error disconnecting MeshCore device: {e}")
for mqtt_client_info in self.mqtt_clients:
try:
mqtt_client_info['client'].disconnect()
mqtt_client_info['client'].loop_stop()
            except Exception:
pass
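        # Close the output file if one was opened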
if self.output_handle:
self.output_handle.close()
self.logger.info(f"Packet capture stopped. Total packets captured: {self.packet_count}")
async def send_advert(self):
"""Send a flood advert using meshcore commands"""
try:
if not self._ensure_connected("send_advert", "warning"):
return False
self.logger.info("Sending flood advert...")
await self.meshcore.commands.send_advert(flood=True)
self.last_advert_time = time.time()
self._save_advert_state() # Persist the timestamp
self.logger.info("Flood advert sent successfully!")
return True
except Exception as e:
self.logger.error(f"Error sending flood advert: {e}")
return False
async def advert_scheduler(self):
"""Background task to send adverts at configured intervals"""
if self.advert_interval_hours <= 0:
if self.debug:
self.logger.debug("Advert scheduling disabled (interval = 0)")
return
if self.debug:
self.logger.debug(f"Starting advert scheduler with {self.advert_interval_hours} hour interval")
while not self.should_exit:
try:
# Calculate seconds until next advert
current_time = time.time()
time_since_last = current_time - self.last_advert_time
interval_seconds = self.advert_interval_hours * 3600
if time_since_last >= interval_seconds:
# Time to send an advert
await self.send_advert()
# Sleep for the full interval to avoid rapid-fire adverts
if await self.wait_with_shutdown(interval_seconds):
break # Shutdown was requested
else:
# Sleep until it's time for the next advert
sleep_time = interval_seconds - time_since_last
if self.debug:
self.logger.debug(f"Next advert in {sleep_time/3600:.1f} hours")
if await self.wait_with_shutdown(sleep_time):
break # Shutdown was requested
except asyncio.CancelledError:
if self.debug:
self.logger.debug("Advert scheduler cancelled")
break
except Exception as e:
self.logger.error(f"Error in advert scheduler: {e}")
if await self.wait_with_shutdown(60):
break # Shutdown was requested
async def jwt_renewal_scheduler(self):
"""Background task to check and renew JWT tokens"""
if self.jwt_renewal_interval <= 0:
if self.debug:
self.logger.debug("JWT renewal scheduling disabled (interval = 0)")
return
if self.debug:
self.logger.debug(f"Starting JWT renewal scheduler with {self.jwt_renewal_interval} second interval")
while not self.should_exit:
try:
if await self.wait_with_shutdown(self.jwt_renewal_interval):
break # Shutdown was requested
# Check and renew JWT tokens
await self.check_and_renew_jwt_tokens()
except asyncio.CancelledError:
if self.debug:
self.logger.debug("JWT renewal scheduler cancelled")
break
except Exception as e:
self.logger.error(f"Error in JWT renewal scheduler: {e}")
if await self.wait_with_shutdown(60):
break # Shutdown was requested
async def main():
"""Main entry point"""
parser = argparse.ArgumentParser(description='MeshCore Packet Capture Script')
parser.add_argument('--output', help='Output file path (optional)')
parser.add_argument('--verbose', action='store_true', help='Enable verbose output (shows JSON packet data)')
parser.add_argument('--debug', action='store_true', help='Enable debug output (shows all detailed debugging info)')
parser.add_argument('--no-mqtt', action='store_true', help='Disable MQTT publishing')
args = parser.parse_args()
    # Note: --debug/--verbose from the command line are applied after the PacketCapture instance is created
    # Setup signal handlers for graceful shutdown
    import signal
    # Global shutdown event for immediate response
    shutdown_event = asyncio.Event()
    # Create packet capture instance with shutdown event
    capture = PacketCapture(
        output_file=args.output,
        verbose=args.verbose,
        debug=args.debug,
        enable_mqtt=not args.no_mqtt,
        shutdown_event=shutdown_event
    )
    def signal_handler(signum, frame):
        capture.logger.info(f"Received signal {signum}, initiating immediate shutdown...")
        capture.should_exit = True
        shutdown_event.set()  # Wake up all waiting tasks immediately
    # Register signal handlers after the capture instance exists so the handler can safely reference it
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
# Command line arguments override environment variable
if args.debug:
capture.logger.setLevel(logging.DEBUG)
elif args.verbose:
capture.logger.setLevel(logging.INFO)
# If neither debug nor verbose specified, use environment variable (already set in setup_logging)
try:
# Start the capture in a task so we can wait on shutdown event
capture_task = asyncio.create_task(capture.start())
# Wait for either completion or shutdown signal
done, pending = await asyncio.wait(
[capture_task, asyncio.create_task(shutdown_event.wait())],
return_when=asyncio.FIRST_COMPLETED
)
# Cancel any pending tasks
for task in pending:
task.cancel()
# If shutdown was triggered, stop the capture
if shutdown_event.is_set():
capture.logger.info("Shutdown signal received, stopping capture...")
await capture.stop()
except KeyboardInterrupt:
print("\nShutting down...")
await capture.stop()
except Exception as e:
print(f"Error: {e}")
await capture.stop()
if __name__ == "__main__":
asyncio.run(main())