Add status telemetry stats support to PacketCapture.

This commit is contained in:
agessaman 2025-11-24 09:49:03 -08:00
parent 83904e9a47
commit 8407eeec70
3 changed files with 157 additions and 5 deletions

View file

@ -39,6 +39,7 @@ bash <(curl -fsSL https://raw.githubusercontent.com/agessaman/meshcore-packet-ca
- **Connection Types**: Supports BLE, serial, and TCP connections to Companion radios
- **Packet Analysis**: Parses packet headers, routes, payloads, and metadata
- **RF Data**: Captures signal quality metrics (SNR, RSSI)
- **Status Telemetry Stats**: MQTT status messages optionally contain battery/uptime/radio metrics
- **Multi-Broker MQTT**: Supports up to 4 MQTT brokers simultaneously
- **Auth Token Authentication**: JWT-based authentication using device private key
- **TLS/WebSocket Support**: Secure connections with TLS/SSL and WebSocket transport
@ -48,7 +49,7 @@ bash <(curl -fsSL https://raw.githubusercontent.com/agessaman/meshcore-packet-ca
## Requirements
- Python 3.7+
- `meshcore` package (official MeshCore Python library)
- `meshcore` package (official MeshCore Python library). **Note:** Until the next PyPI release ships, this project pins to commit `3220c419` from [`meshcore_py`](https://github.com/meshcore-dev/meshcore_py) so we can consume the new stats APIs.
- `paho-mqtt` package (for MQTT functionality)
**Note**: For Docker deployment, this application is best deployed on Linux systems due to Bluetooth Low Energy (BLE) and serial device access requirements. While Docker containers can run on macOS and Windows, BLE functionality may be limited or require additional configuration.
@ -116,6 +117,12 @@ Configuration is handled via environment variables and `.env` files. The install
- `PACKETCAPTURE_LOG_LEVEL`: Log level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`) - default: `INFO`
- Command line arguments (`--debug`, `--verbose`) override this setting
#### Status Telemetry / Stats
- `PACKETCAPTURE_STATS_IN_STATUS_ENABLED`: Toggle stat collection in status payloads (default: `true`)
- `PACKETCAPTURE_STATS_REFRESH_INTERVAL`: Seconds between stat refreshes/status republishes (default: `300`, i.e. 5 minutes)
When enabled, status messages published to MQTT include a `stats` object with battery, uptime, queue depth, and radio runtime metrics refreshed at the configured cadence.
#### MQTT Settings
The script supports up to 4 MQTT brokers (MQTT1, MQTT2, MQTT3, MQTT4). Each broker can be configured independently:

View file

@ -232,6 +232,16 @@ class PacketCapture:
self.mqtt_connected = False
self.should_exit = False # Flag to exit when reconnection attempts fail
# Stats/status publishing
self.stats_status_enabled = self.get_env_bool('STATS_IN_STATUS_ENABLED', True)
self.stats_refresh_interval = self.get_env_int('STATS_REFRESH_INTERVAL', 300) # seconds
self.latest_stats = None
self.last_stats_fetch = 0
self.stats_supported = False
self.stats_capability_state = None
self.stats_update_task = None
self.stats_fetch_lock = asyncio.Lock()
# Service-level failure tracking for systemd restart
self.service_failure_count = 0
self.max_service_failures = self.get_env_int('MAX_SERVICE_FAILURES', 3)
@ -1788,7 +1798,7 @@ class PacketCapture:
async def publish_status(self, status, client=None, broker_num=None):
async def publish_status(self, status, client=None, broker_num=None, refresh_stats=True):
"""Publish status with additional information"""
firmware_info = await self.get_firmware_info()
status_msg = {
@ -1801,6 +1811,26 @@ class PacketCapture:
"radio": self.radio_info or "unknown",
"client_version": self._load_client_version()
}
# Attach stats (online status only) if supported and enabled
if (
status.lower() == "online"
and self.stats_status_enabled
):
stats_payload = None
if refresh_stats:
# Always force refresh stats right before publishing to ensure fresh data
stats_payload = await self.refresh_stats(force=True)
if not stats_payload:
self.logger.debug("Stats refresh returned no data - stats will not be included in status message")
elif self.latest_stats:
stats_payload = dict(self.latest_stats)
if stats_payload:
status_msg["stats"] = stats_payload
elif self.debug:
self.logger.debug("No stats payload available - status message will not include stats")
if client:
self.safe_publish(None, json.dumps(status_msg), retain=True, client=client, broker_num=broker_num, topic_type="status")
else:
@ -1808,6 +1838,110 @@ class PacketCapture:
if self.debug:
self.logger.debug(f"Published status: {status}")
def stats_commands_available(self) -> bool:
"""Detect whether the connected meshcore build exposes stats commands."""
if not self.meshcore or not hasattr(self.meshcore, "commands"):
return False
commands = self.meshcore.commands
required = ["get_stats_core", "get_stats_radio"]
available = all(callable(getattr(commands, attr, None)) for attr in required)
state = "available" if available else "missing"
if state != self.stats_capability_state:
if available:
self.logger.info("MeshCore stats commands detected - status messages will include device stats")
else:
self.logger.info("MeshCore stats commands not available - skipping stats in status messages")
self.stats_capability_state = state
self.stats_supported = available
return available
async def refresh_stats(self, force: bool = False):
"""Fetch stats from the radio and cache them for status publishing."""
if not self.stats_status_enabled:
if self.debug:
self.logger.debug("Stats refresh skipped: stats_status_enabled is False")
return None
if not self.meshcore or not self.meshcore.is_connected:
if self.debug:
self.logger.debug("Stats refresh skipped: meshcore not connected")
return None
if self.stats_refresh_interval <= 0:
if self.debug:
self.logger.debug("Stats refresh skipped: stats_refresh_interval is 0 or negative")
return None
if not self.stats_commands_available():
if self.debug:
self.logger.debug("Stats refresh skipped: stats commands not available")
return None
now = time.time()
if (
not force
and self.latest_stats
and (now - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
):
return dict(self.latest_stats)
async with self.stats_fetch_lock:
# Another coroutine may have completed the refresh while we waited
if (
not force
and self.latest_stats
and (time.time() - self.last_stats_fetch) < max(60, self.stats_refresh_interval // 2)
):
return dict(self.latest_stats)
stats_payload = {}
try:
core_result = await self.meshcore.commands.get_stats_core()
if core_result.type == EventType.STATS_CORE and core_result.payload:
stats_payload.update(core_result.payload)
elif core_result.type == EventType.ERROR:
self.logger.debug(f"Core stats unavailable: {core_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching core stats: {exc}")
try:
radio_result = await self.meshcore.commands.get_stats_radio()
if radio_result.type == EventType.STATS_RADIO and radio_result.payload:
stats_payload.update(radio_result.payload)
elif radio_result.type == EventType.ERROR:
self.logger.debug(f"Radio stats unavailable: {radio_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching radio stats: {exc}")
if stats_payload:
self.latest_stats = stats_payload
self.last_stats_fetch = time.time()
if self.debug:
self.logger.debug(f"Updated stats cache: {self.latest_stats}")
elif self.debug:
self.logger.debug("Stats refresh completed but returned no data")
return dict(self.latest_stats) if self.latest_stats else None
async def stats_refresh_scheduler(self):
"""Periodically refresh stats and publish them via MQTT."""
if self.stats_refresh_interval <= 0 or not self.stats_status_enabled:
return
while not self.should_exit:
try:
# Only fetch stats when we're about to publish status
if self.enable_mqtt and self.mqtt_connected:
await self.publish_status("online", refresh_stats=True)
except asyncio.CancelledError:
break
except Exception as exc:
self.logger.debug(f"Stats refresh error: {exc}")
if await self.wait_with_shutdown(self.stats_refresh_interval):
break
def safe_publish(self, topic, payload, retain=False, client=None, broker_num=None, topic_type=None):
"""Publish to one or all MQTT brokers and return publish metrics."""
metrics = {"attempted": 0, "succeeded": 0}
@ -2511,6 +2645,10 @@ class PacketCapture:
if self.jwt_renewal_interval > 0:
self.jwt_renewal_task = asyncio.create_task(self.jwt_renewal_scheduler())
# Start stats refresh scheduler
if self.stats_status_enabled and self.stats_refresh_interval > 0:
self.stats_update_task = asyncio.create_task(self.stats_refresh_scheduler())
try:
while not self.should_exit:
@ -2545,6 +2683,8 @@ class PacketCapture:
self.advert_task.cancel()
if self.jwt_renewal_task:
self.jwt_renewal_task.cancel()
if self.stats_update_task:
self.stats_update_task.cancel()
# Cancel all tracked active tasks
for task in self.active_tasks.copy():
@ -2565,6 +2705,11 @@ class PacketCapture:
await self.jwt_renewal_task
except asyncio.CancelledError:
pass
if self.stats_update_task:
try:
await self.stats_update_task
except asyncio.CancelledError:
pass
# Wait for all active tasks to complete
if self.active_tasks:
@ -2580,7 +2725,7 @@ class PacketCapture:
try:
# Publish offline status with timeout
if self.enable_mqtt and self.mqtt_connected:
await asyncio.wait_for(self.publish_status("offline"), timeout=5.0)
await asyncio.wait_for(self.publish_status("offline", refresh_stats=False), timeout=5.0)
except asyncio.TimeoutError:
self.logger.warning("Timeout publishing offline status")
except Exception as e:

View file

@ -1,8 +1,8 @@
# Core dependencies for MeshCore packet capture
paho-mqtt>=1.6.0
# MeshCore package - now using PyPI version
meshcore>=2.1.10
# MeshCore package - temporarily pinned to unreleased commit with stats APIs
meshcore @ git+https://github.com/meshcore-dev/meshcore_py@3220c4196df8f5ddf179e9a852b27d3a6fafc769
# MeshCore package dependencies (installed automatically with meshcore)
bleak>=0.21.0