From f82ed89c022302b3fc62769e1f7796f67a6567c6 Mon Sep 17 00:00:00 2001 From: agessaman Date: Fri, 7 Nov 2025 22:44:43 -0800 Subject: [PATCH] Add support for new statistics event types and commands --- examples/ble_stats.py | 72 +++++++++++++++++++++++++++++++++ src/meshcore/commands/device.py | 12 ++++++ src/meshcore/events.py | 3 ++ src/meshcore/packets.py | 3 ++ src/meshcore/reader.py | 33 +++++++++++++++ 5 files changed, 123 insertions(+) create mode 100755 examples/ble_stats.py diff --git a/examples/ble_stats.py b/examples/ble_stats.py new file mode 100755 index 0000000..91bff5e --- /dev/null +++ b/examples/ble_stats.py @@ -0,0 +1,72 @@ +#!/usr/bin/python + +import asyncio +import argparse +import json +from meshcore import MeshCore, EventType + +DEFAULT_ADDRESS = "MeshCore-123456789" # Default BLE address or name + +async def main(): + parser = argparse.ArgumentParser(description="Read statistics from MeshCore device via BLE") + parser.add_argument("-a", "--address", default=DEFAULT_ADDRESS, + help="BLE device address or name (default: %(default)s)") + parser.add_argument("-p", "--pin", type=int, default=None, + help="PIN for BLE pairing (optional)") + args = parser.parse_args() + + print(f"Connecting to BLE device: {args.address}") + if args.pin: + print(f"Using PIN pairing: {args.pin}") + mc = await MeshCore.create_ble(args.address, pin=str(args.pin)) + else: + mc = await MeshCore.create_ble(args.address) + + print("Connected successfully!\n") + + try: + # Get core statistics + print("Fetching core statistics...") + result = await mc.commands.get_stats_core() + if result.type == EventType.ERROR: + print(f"❌ Error getting core stats: {result.payload}") + else: + print("📊 Core Statistics:") + print(json.dumps(result.payload, indent=2)) + print() + + # Get radio statistics + print("Fetching radio statistics...") + result = await mc.commands.get_stats_radio() + if result.type == EventType.ERROR: + print(f"❌ Error getting radio stats: {result.payload}") + else: + print("📡 Radio Statistics:") + print(json.dumps(result.payload, indent=2)) + print() + + # Get packet statistics + print("Fetching packet statistics...") + result = await mc.commands.get_stats_packets() + if result.type == EventType.ERROR: + print(f"❌ Error getting packet stats: {result.payload}") + else: + print("📦 Packet Statistics:") + print(json.dumps(result.payload, indent=2)) + print() + + except Exception as e: + print(f"❌ Error: {e}") + finally: + print("Disconnecting...") + await mc.disconnect() + print("Disconnected.") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nExited cleanly") + except Exception as e: + print(f"Error: {e}") + diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index 06be335..f04a6c1 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -205,3 +205,15 @@ class DeviceCommands(CommandHandlerBase): async def export_private_key(self) -> Event: logger.debug("Requesting private key export") return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR]) + + async def get_stats_core(self) -> Event: + logger.debug("Getting core statistics") + return await self.send(b"\x38", [EventType.STATS_CORE, EventType.ERROR]) + + async def get_stats_radio(self) -> Event: + logger.debug("Getting radio statistics") + return await self.send(b"\x39", [EventType.STATS_RADIO, EventType.ERROR]) + + async def get_stats_packets(self) -> Event: + logger.debug("Getting packet statistics") + return await self.send(b"\x3a", [EventType.STATS_PACKETS, EventType.ERROR]) diff --git a/src/meshcore/events.py b/src/meshcore/events.py index fb889c0..30bd7b7 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -40,6 +40,9 @@ class EventType(Enum): MMA_RESPONSE = "mma_response" ACL_RESPONSE = "acl_response" CUSTOM_VARS = "custom_vars" + STATS_CORE = "stats_core" + STATS_RADIO = "stats_radio" + STATS_PACKETS = "stats_packets" CHANNEL_INFO = "channel_info" PATH_RESPONSE = "path_response" PRIVATE_KEY = "private_key" diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index 62f45ee..e14ceab 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -35,6 +35,9 @@ class PacketType(Enum): SIGN_START = 19 SIGNATURE = 20 CUSTOM_VARS = 21 + STATS_CORE = 24 + STATS_RADIO = 25 + STATS_PACKETS = 26 BINARY_REQ = 50 FACTORY_RESET = 51 PATH_DISCOVERY = 52 diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index fdd878a..cfabcdb 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -285,6 +285,39 @@ class MessageReader: logger.debug(f"got custom vars : {res}") await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + elif packet_type_value == PacketType.STATS_CORE.value: + logger.debug(f"received stats core response: {data.hex()}") + rawdata = dbuf.read().decode("utf-8", "ignore") + try: + res = json.loads(rawdata) + logger.debug(f"parsed stats core: {res}") + await self.dispatcher.dispatch(Event(EventType.STATS_CORE, res)) + except json.JSONDecodeError as e: + logger.error(f"Error parsing stats core JSON: {e}, raw data: {rawdata}") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"})) + + elif packet_type_value == PacketType.STATS_RADIO.value: + logger.debug(f"received stats radio response: {data.hex()}") + rawdata = dbuf.read().decode("utf-8", "ignore") + try: + res = json.loads(rawdata) + logger.debug(f"parsed stats radio: {res}") + await self.dispatcher.dispatch(Event(EventType.STATS_RADIO, res)) + except json.JSONDecodeError as e: + logger.error(f"Error parsing stats radio JSON: {e}, raw data: {rawdata}") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"})) + + elif packet_type_value == PacketType.STATS_PACKETS.value: + logger.debug(f"received stats packets response: {data.hex()}") + rawdata = dbuf.read().decode("utf-8", "ignore") + try: + res = json.loads(rawdata) + logger.debug(f"parsed stats packets: {res}") + await self.dispatcher.dispatch(Event(EventType.STATS_PACKETS, res)) + except json.JSONDecodeError as e: + logger.error(f"Error parsing stats packets JSON: {e}, raw data: {rawdata}") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"})) + elif packet_type_value == PacketType.CHANNEL_INFO.value: logger.debug(f"received channel info response: {data.hex()}") res = {}