mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
Add support for new statistics event types and commands
This commit is contained in:
parent
a61616297e
commit
f82ed89c02
5 changed files with 123 additions and 0 deletions
72
examples/ble_stats.py
Executable file
72
examples/ble_stats.py
Executable file
|
|
@ -0,0 +1,72 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import asyncio
|
||||
import argparse
|
||||
import json
|
||||
from meshcore import MeshCore, EventType
|
||||
|
||||
DEFAULT_ADDRESS = "MeshCore-123456789" # Default BLE address or name
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Read statistics from MeshCore device via BLE")
|
||||
parser.add_argument("-a", "--address", default=DEFAULT_ADDRESS,
|
||||
help="BLE device address or name (default: %(default)s)")
|
||||
parser.add_argument("-p", "--pin", type=int, default=None,
|
||||
help="PIN for BLE pairing (optional)")
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Connecting to BLE device: {args.address}")
|
||||
if args.pin:
|
||||
print(f"Using PIN pairing: {args.pin}")
|
||||
mc = await MeshCore.create_ble(args.address, pin=str(args.pin))
|
||||
else:
|
||||
mc = await MeshCore.create_ble(args.address)
|
||||
|
||||
print("Connected successfully!\n")
|
||||
|
||||
try:
|
||||
# Get core statistics
|
||||
print("Fetching core statistics...")
|
||||
result = await mc.commands.get_stats_core()
|
||||
if result.type == EventType.ERROR:
|
||||
print(f"❌ Error getting core stats: {result.payload}")
|
||||
else:
|
||||
print("📊 Core Statistics:")
|
||||
print(json.dumps(result.payload, indent=2))
|
||||
print()
|
||||
|
||||
# Get radio statistics
|
||||
print("Fetching radio statistics...")
|
||||
result = await mc.commands.get_stats_radio()
|
||||
if result.type == EventType.ERROR:
|
||||
print(f"❌ Error getting radio stats: {result.payload}")
|
||||
else:
|
||||
print("📡 Radio Statistics:")
|
||||
print(json.dumps(result.payload, indent=2))
|
||||
print()
|
||||
|
||||
# Get packet statistics
|
||||
print("Fetching packet statistics...")
|
||||
result = await mc.commands.get_stats_packets()
|
||||
if result.type == EventType.ERROR:
|
||||
print(f"❌ Error getting packet stats: {result.payload}")
|
||||
else:
|
||||
print("📦 Packet Statistics:")
|
||||
print(json.dumps(result.payload, indent=2))
|
||||
print()
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error: {e}")
|
||||
finally:
|
||||
print("Disconnecting...")
|
||||
await mc.disconnect()
|
||||
print("Disconnected.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
print("\nExited cleanly")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
|
|
@ -205,3 +205,15 @@ class DeviceCommands(CommandHandlerBase):
|
|||
async def export_private_key(self) -> Event:
|
||||
logger.debug("Requesting private key export")
|
||||
return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR])
|
||||
|
||||
async def get_stats_core(self) -> Event:
|
||||
logger.debug("Getting core statistics")
|
||||
return await self.send(b"\x38", [EventType.STATS_CORE, EventType.ERROR])
|
||||
|
||||
async def get_stats_radio(self) -> Event:
|
||||
logger.debug("Getting radio statistics")
|
||||
return await self.send(b"\x39", [EventType.STATS_RADIO, EventType.ERROR])
|
||||
|
||||
async def get_stats_packets(self) -> Event:
|
||||
logger.debug("Getting packet statistics")
|
||||
return await self.send(b"\x3a", [EventType.STATS_PACKETS, EventType.ERROR])
|
||||
|
|
|
|||
|
|
@ -40,6 +40,9 @@ class EventType(Enum):
|
|||
MMA_RESPONSE = "mma_response"
|
||||
ACL_RESPONSE = "acl_response"
|
||||
CUSTOM_VARS = "custom_vars"
|
||||
STATS_CORE = "stats_core"
|
||||
STATS_RADIO = "stats_radio"
|
||||
STATS_PACKETS = "stats_packets"
|
||||
CHANNEL_INFO = "channel_info"
|
||||
PATH_RESPONSE = "path_response"
|
||||
PRIVATE_KEY = "private_key"
|
||||
|
|
|
|||
|
|
@ -35,6 +35,9 @@ class PacketType(Enum):
|
|||
SIGN_START = 19
|
||||
SIGNATURE = 20
|
||||
CUSTOM_VARS = 21
|
||||
STATS_CORE = 24
|
||||
STATS_RADIO = 25
|
||||
STATS_PACKETS = 26
|
||||
BINARY_REQ = 50
|
||||
FACTORY_RESET = 51
|
||||
PATH_DISCOVERY = 52
|
||||
|
|
|
|||
|
|
@ -285,6 +285,39 @@ class MessageReader:
|
|||
logger.debug(f"got custom vars : {res}")
|
||||
await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))
|
||||
|
||||
elif packet_type_value == PacketType.STATS_CORE.value:
|
||||
logger.debug(f"received stats core response: {data.hex()}")
|
||||
rawdata = dbuf.read().decode("utf-8", "ignore")
|
||||
try:
|
||||
res = json.loads(rawdata)
|
||||
logger.debug(f"parsed stats core: {res}")
|
||||
await self.dispatcher.dispatch(Event(EventType.STATS_CORE, res))
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing stats core JSON: {e}, raw data: {rawdata}")
|
||||
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"}))
|
||||
|
||||
elif packet_type_value == PacketType.STATS_RADIO.value:
|
||||
logger.debug(f"received stats radio response: {data.hex()}")
|
||||
rawdata = dbuf.read().decode("utf-8", "ignore")
|
||||
try:
|
||||
res = json.loads(rawdata)
|
||||
logger.debug(f"parsed stats radio: {res}")
|
||||
await self.dispatcher.dispatch(Event(EventType.STATS_RADIO, res))
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing stats radio JSON: {e}, raw data: {rawdata}")
|
||||
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"}))
|
||||
|
||||
elif packet_type_value == PacketType.STATS_PACKETS.value:
|
||||
logger.debug(f"received stats packets response: {data.hex()}")
|
||||
rawdata = dbuf.read().decode("utf-8", "ignore")
|
||||
try:
|
||||
res = json.loads(rawdata)
|
||||
logger.debug(f"parsed stats packets: {res}")
|
||||
await self.dispatcher.dispatch(Event(EventType.STATS_PACKETS, res))
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing stats packets JSON: {e}, raw data: {rawdata}")
|
||||
await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"json_parse_error: {e}"}))
|
||||
|
||||
elif packet_type_value == PacketType.CHANNEL_INFO.value:
|
||||
logger.debug(f"received channel info response: {data.hex()}")
|
||||
res = {}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue