Merge pull request #34 from agessaman/main

Companion Nodes Stats
This commit is contained in:
fdlamotte 2025-11-24 10:38:04 +01:00 committed by GitHub
commit 3220c4196d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 175 additions and 0 deletions

72
examples/ble_stats.py Executable file
View file

@ -0,0 +1,72 @@
#!/usr/bin/python
import asyncio
import argparse
import json
from meshcore import MeshCore, EventType
DEFAULT_ADDRESS = "MeshCore-123456789" # Default BLE address or name
async def main():
parser = argparse.ArgumentParser(description="Read statistics from MeshCore device via BLE")
parser.add_argument("-a", "--address", default=DEFAULT_ADDRESS,
help="BLE device address or name (default: %(default)s)")
parser.add_argument("-p", "--pin", type=int, default=None,
help="PIN for BLE pairing (optional)")
args = parser.parse_args()
print(f"Connecting to BLE device: {args.address}")
if args.pin:
print(f"Using PIN pairing: {args.pin}")
mc = await MeshCore.create_ble(args.address, pin=str(args.pin))
else:
mc = await MeshCore.create_ble(args.address)
print("Connected successfully!\n")
try:
# Get core statistics
print("Fetching core statistics...")
result = await mc.commands.get_stats_core()
if result.type == EventType.ERROR:
print(f"❌ Error getting core stats: {result.payload}")
else:
print("📊 Core Statistics:")
print(json.dumps(result.payload, indent=2))
print()
# Get radio statistics
print("Fetching radio statistics...")
result = await mc.commands.get_stats_radio()
if result.type == EventType.ERROR:
print(f"❌ Error getting radio stats: {result.payload}")
else:
print("📡 Radio Statistics:")
print(json.dumps(result.payload, indent=2))
print()
# Get packet statistics
print("Fetching packet statistics...")
result = await mc.commands.get_stats_packets()
if result.type == EventType.ERROR:
print(f"❌ Error getting packet stats: {result.payload}")
else:
print("📦 Packet Statistics:")
print(json.dumps(result.payload, indent=2))
print()
except Exception as e:
print(f"❌ Error: {e}")
finally:
print("Disconnecting...")
await mc.disconnect()
print("Disconnected.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nExited cleanly")
except Exception as e:
print(f"Error: {e}")

View file

@ -205,3 +205,18 @@ class DeviceCommands(CommandHandlerBase):
async def export_private_key(self) -> Event:
logger.debug("Requesting private key export")
return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR])
async def get_stats_core(self) -> Event:
logger.debug("Getting core statistics")
# CMD_GET_STATS (56) + STATS_TYPE_CORE (0)
return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR])
async def get_stats_radio(self) -> Event:
logger.debug("Getting radio statistics")
# CMD_GET_STATS (56) + STATS_TYPE_RADIO (1)
return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR])
async def get_stats_packets(self) -> Event:
logger.debug("Getting packet statistics")
# CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2)
return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR])

View file

@ -40,6 +40,9 @@ class EventType(Enum):
MMA_RESPONSE = "mma_response"
ACL_RESPONSE = "acl_response"
CUSTOM_VARS = "custom_vars"
STATS_CORE = "stats_core"
STATS_RADIO = "stats_radio"
STATS_PACKETS = "stats_packets"
CHANNEL_INFO = "channel_info"
PATH_RESPONSE = "path_response"
PRIVATE_KEY = "private_key"

View file

@ -36,6 +36,9 @@ class PacketType(Enum):
SIGN_START = 19
SIGNATURE = 20
CUSTOM_VARS = 21
STATS_CORE = 24
STATS_RADIO = 25
STATS_PACKETS = 26
BINARY_REQ = 50
FACTORY_RESET = 51
PATH_DISCOVERY = 52

View file

@ -1,5 +1,6 @@
import logging
import json
import struct
import time
import io
from typing import Any, Dict
@ -290,6 +291,87 @@ class MessageReader:
logger.debug(f"got custom vars : {res}")
await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))
elif packet_type_value == PacketType.STATS_CORE.value: # RESP_CODE_STATS (24)
logger.debug(f"received stats response: {data.hex()}")
# RESP_CODE_STATS: All stats responses use code 24 with sub-type byte
# Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets)
if len(data) < 2:
logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
return
stats_type = data[1]
if stats_type == 0: # STATS_TYPE_CORE
# RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total
# Format: <B B H I H B (response_code, stats_type, battery_mv, uptime_secs, errors, queue_len)
if len(data) < 11:
logger.error(f"Stats core response too short: {len(data)} bytes, expected 11")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
battery_mv, uptime_secs, errors, queue_len = struct.unpack('<H I H B', data[2:11])
res = {
'battery_mv': battery_mv,
'uptime_secs': uptime_secs,
'errors': errors,
'queue_len': queue_len
}
logger.debug(f"parsed stats core: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_CORE, res))
except struct.error as e:
logger.error(f"Error parsing stats core binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
elif stats_type == 1: # STATS_TYPE_RADIO
# RESP_CODE_STATS + STATS_TYPE_RADIO: 14 bytes total
# Format: <B B h b b I I (response_code, stats_type, noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
if len(data) < 14:
logger.error(f"Stats radio response too short: {len(data)} bytes, expected 14")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
noise_floor, last_rssi, last_snr_scaled, tx_air_secs, rx_air_secs = struct.unpack('<h b b I I', data[2:14])
res = {
'noise_floor': noise_floor,
'last_rssi': last_rssi,
'last_snr': last_snr_scaled / 4.0, # Unscale SNR (was multiplied by 4)
'tx_air_secs': tx_air_secs,
'rx_air_secs': rx_air_secs
}
logger.debug(f"parsed stats radio: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_RADIO, res))
except struct.error as e:
logger.error(f"Error parsing stats radio binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
elif stats_type == 2: # STATS_TYPE_PACKETS
# RESP_CODE_STATS + STATS_TYPE_PACKETS: 26 bytes total
# Format: <B B I I I I I I (response_code, stats_type, recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
if len(data) < 26:
logger.error(f"Stats packets response too short: {len(data)} bytes, expected 26")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
else:
try:
recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('<I I I I I I', data[2:26])
res = {
'recv': recv,
'sent': sent,
'flood_tx': flood_tx,
'direct_tx': direct_tx,
'flood_rx': flood_rx,
'direct_rx': direct_rx
}
logger.debug(f"parsed stats packets: {res}")
await self.dispatcher.dispatch(Event(EventType.STATS_PACKETS, res))
except struct.error as e:
logger.error(f"Error parsing stats packets binary frame: {e}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))
else:
logger.error(f"Unknown stats type: {stats_type}, data: {data.hex()}")
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"unknown_stats_type: {stats_type}"}))
elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}")
res = {}