From 8f0ecd7d75294f6389448f777d5ea7142f509e38 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Tue, 8 Apr 2025 22:56:16 -0700 Subject: [PATCH 1/8] Refactor to event system --- .gitignore | 2 + README.md | 239 +++++++++- examples/ble_t1000_infos.py | 12 +- examples/ble_t1000_msg.py | 5 +- examples/pubsub_example.py | 117 +++++ examples/serial_battery_monitor.py | 53 +++ examples/serial_contacts.py | 3 +- examples/serial_infos.py | 16 +- examples/serial_msg.py | 12 +- examples/serial_repeater_status.py | 34 +- examples/tcp_mchome_contacts.py | 2 +- examples/tcp_mchome_infos.py | 12 +- examples/tcp_mchome_msg.py | 2 +- examples/tcp_mchome_readmsgs.py | 2 +- src/meshcore/__init__.py | 9 +- src/meshcore/ble_cx.py | 20 +- src/meshcore/commands.py | 216 +++++++++ src/meshcore/events.py | 123 ++++++ src/meshcore/meshcore.py | 684 ++++++++++++----------------- src/meshcore/meshcore_new.py | 249 +++++++++++ src/meshcore/packets.py | 30 ++ src/meshcore/reader.py | 235 ++++++++++ src/meshcore/serial_cx.py | 27 +- src/meshcore/tcp_cx.py | 21 +- 24 files changed, 1625 insertions(+), 500 deletions(-) create mode 100644 .gitignore create mode 100644 examples/pubsub_example.py create mode 100644 examples/serial_battery_monitor.py create mode 100644 src/meshcore/commands.py create mode 100644 src/meshcore/events.py create mode 100644 src/meshcore/meshcore_new.py create mode 100644 src/meshcore/packets.py create mode 100644 src/meshcore/reader.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..01d7f95 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +venv \ No newline at end of file diff --git a/README.md b/README.md index 76bb37e..b4f19cb 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,239 @@ -# Python meshcore +# Python MeshCore -Bindings to access your [MeshCore](https://meshcore.co.uk) companion radio nodes in python. +Python library for interacting with [MeshCore](https://meshcore.co.uk) companion radio nodes. -Can be installed via `pip` using `pip install meshcore` +## Installation -Used by [mccli](https://github.com/fdlamotte/mccli). +```bash +pip install meshcore +``` + +## Quick Start + +Connect to your device and send a message: + +```python +import asyncio +from meshcore import MeshCore, EventType + +async def main(): + # Connect to your device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Get your contacts + contacts = await meshcore.commands.get_contacts() + print(f"Found {len(contacts)} contacts") + + # Send a message to the first contact + if contacts: + contact_key = next(iter(contacts.items()))[1]['public_key'] + await meshcore.commands.send_msg(bytes.fromhex(contact_key), "Hello from Python!") + + await meshcore.disconnect() + +asyncio.run(main()) +``` + +## Development Setup + +To set up for development: + +```bash +# Create and activate virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install in development mode +pip install -e . + +# Run examples +python examples/pubsub_example.py -p /dev/ttyUSB0 +``` + +## Usage Guide + +### Connecting to Your Device + +Connect via Serial, BLE, or TCP: + +```python +# Serial connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True) + +# BLE connection (scans for devices if address not provided) +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB") + +# TCP connection +meshcore = await MeshCore.create_tcp("192.168.1.100", 4000) +``` + +### Using Commands (Synchronous Style) + +Send commands and wait for responses: + +```python +# Get device information +device_info = await meshcore.commands.send_device_query() +print(f"Device model: {device_info['model']}") + +# Get list of contacts +contacts = await meshcore.commands.get_contacts() +for contact_id, contact in contacts.items(): + print(f"Contact: {contact['adv_name']} ({contact_id})") + +# Send a message (destination key in bytes) +await meshcore.commands.send_msg(dst_key, "Hello!") + +# Setting device parameters +await meshcore.commands.set_name("My Device") +await meshcore.commands.set_tx_power(20) # Set transmit power +``` + +### Finding Contacts + +Easily find contacts by name or key: + +```python +# Find a contact by name +contact = meshcore.get_contact_by_name("Bob's Radio") +if contact: + print(f"Found Bob at: {contact['adv_lat']}, {contact['adv_lon']}") + +# Find by partial key prefix +contact = meshcore.get_contact_by_key_prefix("a1b2c3") +``` + +### Event-Based Programming (Asynchronous Style) + +Subscribe to events to handle them asynchronously: + +```python +# Subscribe to incoming messages +async def handle_message(event): + data = event.payload + print(f"Message from {data['pubkey_prefix']}: {data['text']}") + +subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, handle_message) + +# Subscribe to advertisements +async def handle_advert(event): + print("Advertisement detected!") + +meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert) + +# When done, unsubscribe +meshcore.unsubscribe(subscription) +``` + +### Hybrid Approach (Commands + Events) + +Combine command-based and event-based styles: + +```python +import asyncio + +async def main(): + # Connect to device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Set up event handlers + async def handle_ack(event): + print("Message acknowledged!") + + async def handle_battery(event): + print(f"Battery level: {event.payload}%") + + # Subscribe to events + meshcore.subscribe(EventType.ACK, handle_ack) + meshcore.subscribe(EventType.BATTERY, handle_battery) + + # Create background task for battery checking + async def check_battery_periodically(): + while True: + # Send command (returns battery level) + result = await meshcore.commands.get_bat() + print(f"Battery check initiated, response: {result}") + await asyncio.sleep(60) # Wait 60 seconds between checks + + # Start the background task + battery_task = asyncio.create_task(check_battery_periodically()) + + # Send manual command and wait for response + await meshcore.commands.send_advert(flood=True) + + try: + # Keep the main program running + await asyncio.sleep(float('inf')) + except asyncio.CancelledError: + # Clean up when program ends + battery_task.cancel() + await meshcore.disconnect() + +# Run the program +asyncio.run(main()) +``` + +### Auto-Fetching Messages + +Let the library automatically fetch incoming messages: + +```python +# Start auto-fetching messages +await meshcore.start_auto_message_fetching() + +# Just subscribe to message events - the library handles fetching +async def on_message(event): + print(f"New message: {event.payload['text']}") + +meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_message) + +# When done +await meshcore.stop_auto_message_fetching() +``` + +### Debug Mode + +Enable debug logging for troubleshooting: + +```python +# Enable debug mode when creating the connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", debug=True) +``` + +This logs detailed information about commands sent and events received. + +## Common Examples + +### Sending Messages to Contacts + +```python +# Get contacts and send to a specific one +contacts = await meshcore.commands.get_contacts() +for key, contact in contacts.items(): + if contact["adv_name"] == "Alice": + # Convert the hex key to bytes + dst_key = bytes.fromhex(contact["public_key"]) + await meshcore.commands.send_msg(dst_key, "Hello Alice!") + break +``` + +### Monitoring Channel Messages + +```python +# Subscribe to channel messages +async def channel_handler(event): + msg = event.payload + print(f"Channel {msg['channel_idx']}: {msg['text']}") + +meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler) +``` + +## Examples in the Repo + +Check the `examples/` directory for more: + +- `pubsub_example.py`: Event subscription system with auto-fetching +- `serial_infos.py`: Quick device info retrieval +- `serial_msg.py`: Message sending and receiving +- `ble_t1000_infos.py`: BLE connections diff --git a/examples/ble_t1000_infos.py b/examples/ble_t1000_infos.py index 132da32..a48d591 100755 --- a/examples/ble_t1000_infos.py +++ b/examples/ble_t1000_infos.py @@ -3,16 +3,14 @@ import asyncio from meshcore import MeshCore -from meshcore import BLEConnection ADDRESS = "t1000" -async def main () : - con = BLEConnection(ADDRESS) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_ble(ADDRESS) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/ble_t1000_msg.py b/examples/ble_t1000_msg.py index b2a5dbd..c9ec543 100755 --- a/examples/ble_t1000_msg.py +++ b/examples/ble_t1000_msg.py @@ -1,7 +1,6 @@ #!/usr/bin/python import asyncio -import json from meshcore import MeshCore from meshcore import BLEConnection @@ -15,7 +14,7 @@ async def main () : mc = MeshCore(con) await mc.connect() - await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.get_contacts() + await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/pubsub_example.py b/examples/pubsub_example.py new file mode 100644 index 0000000..e72d175 --- /dev/null +++ b/examples/pubsub_example.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Example demonstrating the new pub-sub system in MeshCore +Shows how to subscribe to events and use the event system +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def message_callback(event): + print(f"\nReceived message: {event.payload['text']}") + print(f"From: {event.payload.get('pubkey_prefix', 'channel')}") + print(f"Type: {event.payload['type']}") + print(f"Timestamp: {event.payload['sender_timestamp']}") + + +async def advertisement_callback(event): + print("\nDetected advertisement") + + +async def main(): + parser = argparse.ArgumentParser(description="MeshCore Pub-Sub Example") + parser.add_argument( + "--port", "-p", help="Serial port", required=True + ) + parser.add_argument( + "--baud", "-b", type=int, help="Baud rate", default=115200 + ) + args = parser.parse_args() + + print(f"Connecting to {args.port} at {args.baud} baud...") + + # Create MeshCore instance with serial connection + meshcore = await MeshCore.create_serial(args.port, args.baud, debug=True) + + # Connection is already established + success = True + if not success: + print("Failed to connect to MeshCore device") + return + + print("Connected to MeshCore device") + + # Get contacts + contacts = await meshcore.commands.get_contacts() + if contacts: + print(f"\nFound {len(contacts)} contacts:") + for name, contact in contacts.items(): + print(f"- {name}") + + + await meshcore.commands.send_advert(flood=True) + + # Subscribe to private messages + private_subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, message_callback) + + # Subscribe to channel messages + channel_subscription = meshcore.subscribe(EventType.CHANNEL_MSG_RECV, message_callback) + + # Subscribe to advertisements + advert_subscription = meshcore.subscribe(EventType.ADVERTISEMENT, advertisement_callback) + + await meshcore.start_auto_message_fetching() + + print("\nSubscribed to events:") + print("- Private messages") + print("- Channel messages") + print("- Advertisements") + + print("\nWaiting for events. Press Ctrl+C to exit...\n") + + # Get device info + device_info = await meshcore.commands.send_device_query() + if device_info: + print(f"Device info: {device_info}") + + # Get time from the device + device_time = await meshcore.commands.get_time() + print(f"Device time: {device_time}") + + # Access current time through the property + print(f"Current time (property): {meshcore.time}") + + try: + while True: + # Wait for messages + await asyncio.sleep(1) + except KeyboardInterrupt: + meshcore.stop() + print("\nExiting...") + except asyncio.CancelledError: + # Handle task cancellation from KeyboardInterrupt in asyncio.run() + print("\nTask cancelled - cleaning up...") + finally: + # Clean up subscriptions + meshcore.unsubscribe(private_subscription) + meshcore.unsubscribe(channel_subscription) + meshcore.unsubscribe(advert_subscription) + + # Stop auto-message fetching + await meshcore.stop_auto_message_fetching() + + # Disconnect + await meshcore.disconnect() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + # This prevents the KeyboardInterrupt traceback from being shown + print("\nExited cleanly") + except Exception as e: + print(f"Error: {e}") \ No newline at end of file diff --git a/examples/serial_battery_monitor.py b/examples/serial_battery_monitor.py new file mode 100644 index 0000000..6343f4c --- /dev/null +++ b/examples/serial_battery_monitor.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +""" +Simple example of eventing approach with battery monitoring +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def main(): + parser = argparse.ArgumentParser(description="Battery Monitor Example") + parser.add_argument("--port", "-p", help="Serial port", required=True) + parser.add_argument("--baud", "-b", type=int, help="Baud rate", default=115200) + args = parser.parse_args() + + print(f"Connecting to {args.port}...") + + # Connect to device + meshcore = await MeshCore.create_serial(args.port, args.baud) + + # Event handler for battery updates + async def on_battery(event): + print(f"Battery event: {event.payload}mV") + + # Subscribe to battery events + meshcore.subscribe(EventType.BATTERY, on_battery) + + # Background task that checks battery every 10 seconds + async def check_battery(): + while True: + print("Requesting battery level...") + await meshcore.commands.get_bat() # This command will trigger a battery event + await asyncio.sleep(10) + + # Start the battery check task + task = asyncio.create_task(check_battery()) + + try: + # Keep program running + print("Monitoring battery (press Ctrl+C to exit)...") + await asyncio.sleep(float('inf')) + except KeyboardInterrupt: + print("\nExiting...") + finally: + # Clean up + task.cancel() + await meshcore.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/serial_contacts.py b/examples/serial_contacts.py index 2ab5c76..9b07a02 100755 --- a/examples/serial_contacts.py +++ b/examples/serial_contacts.py @@ -2,6 +2,7 @@ import asyncio import json + from meshcore import MeshCore from meshcore import SerialConnection @@ -15,6 +16,6 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/serial_infos.py b/examples/serial_infos.py index cb41be5..7e8000d 100755 --- a/examples/serial_infos.py +++ b/examples/serial_infos.py @@ -3,19 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import SerialConnection -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish - - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_serial(PORT, BAUDRATE) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 6a093ec..f532c38 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -1,9 +1,7 @@ #!/usr/bin/python import asyncio -import json from meshcore import MeshCore -from meshcore import SerialConnection PORT = "/dev/ttyUSB0" BAUDRATE = 115200 @@ -11,14 +9,10 @@ DEST = "mchome" MSG = "hello from serial" async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish - - mc = MeshCore(con) - await mc.connect() + mc = await MeshCore.create_serial(PORT, BAUDRATE) await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6], MSG) asyncio.run(main()) + diff --git a/examples/serial_repeater_status.py b/examples/serial_repeater_status.py index b0ebfdd..2a77336 100755 --- a/examples/serial_repeater_status.py +++ b/examples/serial_repeater_status.py @@ -3,31 +3,27 @@ import asyncio from meshcore import MeshCore -from meshcore import printerr -from meshcore import SerialConnection +from meshcore.events import EventType -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -REPEATER="FdlRoom" -PASSWORD="password" +REPEATER="Orion" +PASSWORD="floopyboopy" async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish + mc = await MeshCore.create_serial(PORT, BAUDRATE) + await mc.commands.get_contacts() + repeater = mc.get_contact_by_name(REPEATER) + + await mc.commands.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - mc = MeshCore(con) - await mc.connect() + print("Login sent ... awaiting") - contacts = await mc.get_contacts() - repeater = contacts[REPEATER] - await mc.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - - printerr("Login sent ... awaiting") - - if await mc.wait_login() : - await mc.send_statusreq(bytes.fromhex(repeater["public_key"])) - print(await mc.wait_status()) + if await mc.wait_for_event(EventType.LOGIN_SUCCESS): + print("Logged in success") + await mc.commands.send_statusreq(bytes.fromhex(repeater["public_key"])) + print("Status request sent ... awaiting") + print(await mc.wait_for_event(EventType.STATUS_RESPONSE)) asyncio.run(main()) diff --git a/examples/tcp_mchome_contacts.py b/examples/tcp_mchome_contacts.py index 3e730d8..a4f71f2 100755 --- a/examples/tcp_mchome_contacts.py +++ b/examples/tcp_mchome_contacts.py @@ -14,5 +14,5 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/tcp_mchome_infos.py b/examples/tcp_mchome_infos.py index d1ec958..b5c24d2 100755 --- a/examples/tcp_mchome_infos.py +++ b/examples/tcp_mchome_infos.py @@ -3,17 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import TCPConnection HOSTNAME = "mchome" PORT = 5000 -async def main () : - con = TCPConnection(HOSTNAME, PORT) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_tcp(HOSTNAME, PORT) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/tcp_mchome_msg.py b/examples/tcp_mchome_msg.py index d3f751e..45f36af 100755 --- a/examples/tcp_mchome_msg.py +++ b/examples/tcp_mchome_msg.py @@ -17,6 +17,6 @@ async def main () : await mc.connect() await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/tcp_mchome_readmsgs.py b/examples/tcp_mchome_readmsgs.py index e575000..4cafca9 100755 --- a/examples/tcp_mchome_readmsgs.py +++ b/examples/tcp_mchome_readmsgs.py @@ -18,7 +18,7 @@ async def main () : res = True while res: - res = await mc.get_msg() + res = await mc.commands.get_msg() if res : print (res) diff --git a/src/meshcore/__init__.py b/src/meshcore/__init__.py index 415d909..0171409 100644 --- a/src/meshcore/__init__.py +++ b/src/meshcore/__init__.py @@ -1,5 +1,10 @@ -from meshcore.meshcore import printerr -from meshcore.meshcore import MeshCore +import logging + +# Setup default logger +logging.basicConfig(level=logging.INFO) + +from meshcore.events import EventType +from meshcore.meshcore import MeshCore, logger from meshcore.tcp_cx import TCPConnection from meshcore.ble_cx import BLEConnection from meshcore.serial_cx import SerialConnection diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index 6280e8c..8f49a1c 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -3,8 +3,10 @@ """ import asyncio import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic @@ -40,11 +42,11 @@ class BLEConnection: if self.address is None or self.address == "" or len(self.address.split(":")) != 6 : scanner = BleakScanner() - printerr("Scanning for devices") + logger.info("Scanning for devices") device = await scanner.find_device_by_filter(match_meshcore_device) if device is None : return None - printerr(f"Found device : {device}") + logger.info(f"Found device : {device}") self.client = BleakClient(device) self.address = self.client.address else: @@ -62,22 +64,22 @@ class BLEConnection: nus = self.client.services.get_service(UART_SERVICE_UUID) self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID) - printerr("BLE Connexion started") + logger.info("BLE Connection started") return self.address def handle_disconnect(self, _: BleakClient): """ Callback to handle disconnection """ - printerr ("Device was disconnected, goodbye.") + logger.info("Device was disconnected, goodbye.") # cancelling all tasks effectively ends the program for task in asyncio.all_tasks(): task.cancel() - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): - if not self.mc is None: - self.mc.handle_rx(data) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(data)) async def send(self, data): await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py new file mode 100644 index 0000000..856a82a --- /dev/null +++ b/src/meshcore/commands.py @@ -0,0 +1,216 @@ +import functools +import asyncio +import logging +import warnings +import time +from typing import Any, Callable, Awaitable, Optional, Union +from .events import EventType + +logger = logging.getLogger("meshcore") + +class CommandError(Exception): + def __init__(self, details=None): + self.details = details + super().__init__(f"Command error: {details}") + +def deprecated(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + warnings.warn( + f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", + DeprecationWarning, + stacklevel=2 + ) + return await func(*args, **kwargs) + return wrapper + + +class CommandHandler: + def __init__(self): + self._sender_func = None + self._reader = None + self.dispatcher = None + + def set_connection(self, connection): + async def sender(data): + await connection.send(data) + self._sender_func = sender + + def set_reader(self, reader): + self._reader = reader + + def set_dispatcher(self, dispatcher): + self.dispatcher = dispatcher + + async def send(self, data, expected_events=None, timeout=5.0): + if not self.dispatcher: + raise RuntimeError("Dispatcher not set, cannot send commands") + + if self._sender_func: + logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}") + await self._sender_func(data) + + if expected_events: + try: + # Convert single event to list if needed + if not isinstance(expected_events, list): + expected_events = [expected_events] + + logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + for event_type in expected_events: + event = await self.dispatcher.wait_for_event(event_type, timeout) + if event: + return event.payload + return False + except asyncio.TimeoutError: + logger.debug(f"Command timed out {data}") + return False + except Exception as e: + logger.debug(f"Command error: {e}") + return {"error": str(e)} + return True + + + async def send_appstart(self): + logger.debug("Sending appstart command") + b1 = bytearray(b'\x01\x03 mccli') + return await self.send(b1, [EventType.SELF_INFO]) + + async def send_device_query(self): + logger.debug("Sending device query command") + return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR]) + + async def send_advert(self, flood=False): + logger.debug(f"Sending advertisement command (flood={flood})") + if flood: + return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR]) + else: + return await self.send(b"\x07", [EventType.OK, EventType.ERROR]) + + async def set_name(self, name): + logger.debug(f"Setting device name to: {name}") + return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR]) + + async def set_coords(self, lat, lon): + logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}") + return await self.send(b'\x0e'\ + + int(lat*1e6).to_bytes(4, 'little', signed=True)\ + + int(lon*1e6).to_bytes(4, 'little', signed=True)\ + + int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def reboot(self): + logger.debug("Sending reboot command") + return await self.send(b'\x13reboot') + + async def get_bat(self): + logger.debug("Getting battery information") + return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR]) + + async def get_time(self): + logger.debug("Getting device time") + return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR]) + + async def set_time(self, val): + logger.debug(f"Setting device time to: {val}") + return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tx_power(self, val): + logger.debug(f"Setting TX power to: {val}") + return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_radio(self, freq, bw, sf, cr): + logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}") + return await self.send(b"\x0b" \ + + int(float(freq)*1000).to_bytes(4, 'little')\ + + int(float(bw)*1000).to_bytes(4, 'little')\ + + int(sf).to_bytes(1, 'little')\ + + int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tuning(self, rx_dly, af): + logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}") + return await self.send(b"\x15" \ + + int(rx_dly).to_bytes(4, 'little')\ + + int(af).to_bytes(4, 'little')\ + + int(0).to_bytes(1, 'little')\ + + int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_devicepin(self, pin): + logger.debug(f"Setting device PIN to: {pin}") + return await self.send(b"\x25" \ + + int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def get_contacts(self): + logger.debug("Getting contacts") + return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR]) + + async def reset_path(self, key): + logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0D" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def share_contact(self, key): + logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x10" + key + return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR]) + + async def export_contact(self, key=b""): + logger.debug(f"Exporting contact: {key.hex() if key else 'all'}") + data = b"\x11" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def remove_contact(self, key): + logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0f" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def get_msg(self): + logger.debug("Requesting pending messages") + return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], 1) + + async def send_login(self, dst, pwd): + logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1a" + dst + pwd.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_statusreq(self, dst): + logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1b" + dst + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cmd(self, dst, cmd, timestamp=None): + logger.debug(f"Sending command to {dst.hex() if isinstance(dst, bytes) else dst}: {cmd}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def send_msg(self, dst, msg, timestamp=None): + logger.debug(f"Sending message to {dst.hex() if isinstance(dst, bytes) else dst}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_chan_msg(self, chan, msg, timestamp=None): + logger.debug(f"Sending channel message to channel {chan}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cli(self, cmd): + logger.debug(f"Sending CLI command: {cmd}") + data = b"\x32" + cmd.encode('ascii') + return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py new file mode 100644 index 0000000..0f3ddf5 --- /dev/null +++ b/src/meshcore/events.py @@ -0,0 +1,123 @@ +from enum import Enum +import logging +from typing import Any, Dict, Optional, Callable, List, Union +import asyncio +from dataclasses import dataclass + +logger = logging.getLogger("meshcore") + +# Public event types for users to subscribe to +class EventType(Enum): + CONTACTS = "contacts" + SELF_INFO = "self_info" + CONTACT_MSG_RECV = "contact_message" + CHANNEL_MSG_RECV = "channel_message" + CURRENT_TIME = "time_update" + NO_MORE_MSGS = "no_more_messages" + CONTACT_SHARE = "contact_share" + BATTERY = "battery_info" + DEVICE_INFO = "device_info" + CLI_RESPONSE = "cli_response" + MSG_SENT = "message_sent" + + # Push notifications + ADVERTISEMENT = "advertisement" + PATH_UPDATE = "path_update" + ACK = "acknowledgement" + MESSAGES_WAITING = "messages_waiting" + RAW_DATA = "raw_data" + LOGIN_SUCCESS = "login_success" + LOGIN_FAILED = "login_failed" + STATUS_RESPONSE = "status_response" + LOG_DATA = "log_data" + + # Command response types + OK = "command_ok" + ERROR = "command_error" + + +@dataclass +class Event: + type: EventType + payload: Any + attributes: Dict[str, Any] = None + + def __post_init__(self): + if self.attributes is None: + self.attributes = {} + + +class Subscription: + def __init__(self, dispatcher, event_type, callback): + self.dispatcher = dispatcher + self.event_type = event_type + self.callback = callback + + def unsubscribe(self): + self.dispatcher._remove_subscription(self) + + +class EventDispatcher: + def __init__(self): + self.queue = asyncio.Queue() + self.subscriptions: List[Subscription] = [] + self.running = False + self._task = None + + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], None]) -> Subscription: + subscription = Subscription(self, event_type, callback) + self.subscriptions.append(subscription) + return subscription + + def _remove_subscription(self, subscription: Subscription): + if subscription in self.subscriptions: + self.subscriptions.remove(subscription) + + async def dispatch(self, event: Event): + await self.queue.put(event) + + async def _process_events(self): + while self.running: + event = await self.queue.get() + logger.debug(f"Dispatching event: {event.type}, {event.payload}") + for subscription in self.subscriptions.copy(): + if subscription.event_type is None or subscription.event_type == event.type: + try: + await subscription.callback(event) + except Exception as e: + print(f"Error in event handler: {e}") + + self.queue.task_done() + + async def start(self): + if not self.running: + self.running = True + self._task = asyncio.create_task(self._process_events()) + + async def stop(self): + if self.running: + self.running = False + if self._task: + await self.queue.join() + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def wait_for_event(self, event_type: EventType, timeout: float = None) -> Optional[Event]: + future = asyncio.Future() + + async def event_handler(event: Event): + if not future.done(): + future.set_result(event) + + subscription = self.subscribe(event_type, event_handler) + + try: + return await asyncio.wait_for(future, timeout) + except asyncio.TimeoutError: + return None + finally: + subscription.unsubscribe() \ No newline at end of file diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index c70c4a9..4ac579c 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,414 +1,288 @@ -""" - mccli.py : CLI interface to MeschCore BLE companion app -""" import asyncio -import sys +import functools +import warnings +import logging +from typing import Optional -def printerr (s) : - sys.stderr.write(str(s)) - sys.stderr.write("\n") - sys.stderr.flush() +from .events import EventDispatcher, EventType +from .reader import MessageReader +from .commands import CommandHandler + + +# Setup default logger +logger = logging.getLogger("meshcore") + + +def deprecated(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + warnings.warn( + f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", + DeprecationWarning, + stacklevel=2 + ) + return await func(*args, **kwargs) + return wrapper class MeshCore: """ - Interface to a BLE MeshCore device + Interface to a MeshCore device """ - self_info={} - contacts={} - - def __init__(self, cx): - """ Constructor : specify address """ - self.time = 0 - self.result = asyncio.Future() - self.contact_nb = 0 - self.rx_sem = asyncio.Semaphore(0) - self.ack_ev = asyncio.Event() - self.login_resp = asyncio.Future() - self.status_resp = asyncio.Future() - + def __init__(self, cx, debug=False): self.cx = cx - cx.set_mc(self) - - async def connect(self) : - await self.send_appstart() - - def handle_rx(self, data: bytearray): - """ Callback to handle received data """ - match data[0]: - case 0: # ok - if len(data) == 5 : # an integer - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - else: - self.result.set_result(True) - case 1: # error - if len(data) > 1: - res = {} - res["error_code"] = data[1] - self.result.set_result(res) # error code if fw > 1.4 - else: - self.result.set_result(False) - case 2: # contact start - self.contact_nb = int.from_bytes(data[1:5], byteorder='little') - self.contacts={} - case 3: # contact - c = {} - c["public_key"] = data[1:33].hex() - c["type"] = data[33] - c["flags"] = data[34] - c["out_path_len"] = int.from_bytes(data[35:36], signed=True) - plen = int.from_bytes(data[35:36], signed=True) - if plen == -1 : - plen = 0 - c["out_path"] = data[36:36+plen].hex() - c["adv_name"] = data[100:132].decode().replace("\0","") - c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') - c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 - c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 - c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') - self.contacts[c["adv_name"]]=c - case 4: # end of contacts - self.result.set_result(self.contacts) - case 5: # self info - self.self_info["adv_type"] = data[1] - self.self_info["tx_power"] = data[2] - self.self_info["max_tx_power"] = data[3] - self.self_info["public_key"] = data[4:36].hex() - self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 - self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 - #self.self_info["reserved_44:48"] = data[44:48].hex() - self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 - self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 - self.self_info["radio_sf"] = data[56] - self.self_info["radio_cr"] = data[57] - self.self_info["name"] = data[58:].decode() - self.result.set_result(True) - case 6: # msg sent - res = {} - res["type"] = data[1] - res["expected_ack"] = bytes(data[2:6]) - res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') - self.result.set_result(res) - case 7: # contact msg recv - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = data[1:7].hex() - res["path_len"] = data[7] - res["txt_type"] = data[8] - res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') - if data[8] == 2 : # signed packet - res["signature"] = data[13:17].hex() - res["text"] = data[17:].decode() - else : - res["text"] = data[13:].decode() - self.result.set_result(res) - case 16: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["pubkey_prefix"] = data[4:10].hex() - res["path_len"] = data[10] - res["txt_type"] = data[11] - res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') - if data[11] == 2 : # signed packet - res["signature"] = data[16:20].hex() - res["text"] = data[20:].decode() - else : - res["text"] = data[16:].decode() - self.result.set_result(res) - case 8 : # chanel msg recv - res = {} - res["type"] = "CHAN" - res["channel_idx"] = data[1] - res["path_len"] = data[2] - res["txt_type"] = data[3] - res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') - res["text"] = data[8:].decode() - self.result.set_result(res) - case 17: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["channel_idx"] = data[4] - res["path_len"] = data[5] - res["txt_type"] = data[6] - res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') - res["text"] = data[11:].decode() - self.result.set_result(res) - case 9: # current time - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - case 10: # no more msgs - self.result.set_result(False) - case 11: # contact - self.result.set_result("meshcore://" + data[1:].hex()) - case 12: # battery voltage - self.result.set_result(int.from_bytes(data[1:3], byteorder='little')) - case 13: # device info - res = {} - res["fw ver"] = data[1] - if data[1] >= 3: - res["max_contacts"] = data[2] * 2 - res["max_channels"] = data[3] - res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') - res["fw_build"] = data[8:20].decode().replace("\0","") - res["model"] = data[20:60].decode().replace("\0","") - res["ver"] = data[60:80].decode().replace("\0","") - self.result.set_result(res) - case 50: # cli response - res = {} - res["response"] = data[1:].decode() - self.result.set_result(res) - # push notifications - case 0x80: - printerr ("Advertisment received") - case 0x81: - printerr ("Code path update") - case 0x82: - self.ack_ev.set() - printerr ("Received ACK") - case 0x83: - self.rx_sem.release() - printerr ("Msgs are waiting") - case 0x84: - printerr ("Received raw data") - res = {} - res["SNR"] = data[1] / 4 - res["RSSI"] = data[2] - res["payload"] = data[4:].hex() - print(res) - case 0x85: - self.login_resp.set_result(True) - - printerr ("Login success") - case 0x86: - self.login_resp.set_result(False) - printerr ("Login failed") - case 0x87: - res = {} - res["pubkey_pre"] = data[2:8].hex() - res["bat"] = int.from_bytes(data[8:10], byteorder='little') - res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') - res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') - res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) - res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) - res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) - res["airtime"] = int.from_bytes(data[24:28], byteorder='little') - res["uptime"] = int.from_bytes(data[28:32], byteorder='little') - res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') - res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') - res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') - res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') - res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') - res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 - res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') - res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') - self.status_resp.set_result(res) - data_hex = data[8:].hex() - printerr (f"Status response: {data_hex}") - #printerr(res) - case 0x88: - printerr ("Received log data") - # unhandled - case _: - printerr (f"Unhandled data received {data}") - - async def send(self, data, timeout = 5): - """ Helper function to synchronously send (and receive) data to the node """ - self.result = asyncio.Future() - try: - await self.cx.send(data) - res = await asyncio.wait_for(self.result, timeout) - return res - except TimeoutError : - printerr ("Timeout while sending message ...") - return False - - async def send_only(self, data): # don't wait reply - await self.cx.send(data) - - async def send_appstart(self): - """ Send APPSTART to the node """ - b1 = bytearray(b'\x01\x03 mccli') - return await self.send(b1) - - async def send_device_qeury(self): - return await self.send(b"\x16\x03"); - - async def send_advert(self, flood=False): - """ Make the node send an advertisement """ - if flood : - return await self.send(b"\x07\x01") - else : - return await self.send(b"\x07") - - async def set_name(self, name): - """ Changes the name of the node """ - return await self.send(b'\x08' + name.encode("ascii")) - - async def set_coords(self, lat, lon): - return await self.send(b'\x0e'\ - + int(lat*1e6).to_bytes(4, 'little', signed=True)\ - + int(lon*1e6).to_bytes(4, 'little', signed=True)\ - + int(0).to_bytes(4, 'little')) - - async def reboot(self): - await self.send_only(b'\x13reboot') - return True - - async def get_bat(self): - return await self.send(b'\x14') - - async def get_time(self): - """ Get the time (epoch) of the node """ - self.time = await self.send(b"\x05") - return self.time - - async def set_time(self, val): - """ Sets a new epoch """ - return await self.send(b"\x06" + int(val).to_bytes(4, 'little')) - - async def set_tx_power(self, val): - """ Sets tx power """ - return await self.send(b"\x0c" + int(val).to_bytes(4, 'little')) - - async def set_radio (self, freq, bw, sf, cr): - """ Sets radio params """ - return await self.send(b"\x0b" \ - + int(float(freq)*1000).to_bytes(4, 'little')\ - + int(float(bw)*1000).to_bytes(4, 'little')\ - + int(sf).to_bytes(1, 'little')\ - + int(cr).to_bytes(1, 'little')) - - async def set_tuning (self, rx_dly, af): - """ Sets radio params """ - return await self.send(b"\x15" \ - + int(rx_dly).to_bytes(4, 'little')\ - + int(af).to_bytes(4, 'little')\ - + int(0).to_bytes(1, 'little')\ - + int(0).to_bytes(1, 'little')) - - async def set_devicepin (self, pin): - return await self.send(b"\x25" \ - + int(pin).to_bytes(4, 'little')) - - async def get_contacts(self): - """ Starts retreiving contacts """ - return await self.send(b"\x04") - + self.dispatcher = EventDispatcher() + self._reader = MessageReader(self.dispatcher) + self.commands = CommandHandler() + + # Set up logger + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # Set up connections + self.commands.set_connection(cx) + + # Set the dispatcher in the command handler + self.commands.set_dispatcher(self.dispatcher) + self.commands.set_reader(self._reader) + + # Initialize state (private) + self._contacts = {} + self._self_info = {} + self._time = 0 + + # Set up event subscriptions to track data + self._setup_data_tracking() + + cx.set_reader(self._reader) + + @classmethod + async def create_tcp(cls, host: str, port: int, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using TCP connection""" + from .tcp_cx import TCPConnection + + connection = TCPConnection(host, port) + await connection.connect() + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + @classmethod + async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using serial connection""" + from .serial_cx import SerialConnection + import asyncio + + connection = SerialConnection(port, baudrate) + await connection.connect() + await asyncio.sleep(0.1) # Time for transport to establish + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + @classmethod + async def create_ble(cls, address: Optional[str] = None, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using BLE connection + + If address is None, it will scan for and connect to the first available MeshCore device. + """ + from .ble_cx import BLEConnection + + connection = BLEConnection(address) + result = await connection.connect() + if result is None: + raise ConnectionError("Failed to connect to BLE device") + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + async def connect(self): + await self.dispatcher.start() + return await self.commands.send_appstart() + + async def disconnect(self): + await self.dispatcher.stop() + + def stop(self): + """Synchronously stop the event dispatcher task""" + if self.dispatcher._task and not self.dispatcher._task.done(): + self.dispatcher.running = False + self.dispatcher._task.cancel() + + def subscribe(self, event_type: EventType, callback): + """ + Subscribe to events using EventType enum + + Args: + event_type: Type of event to subscribe to, from EventType enum + callback: Async function to call when event occurs + + Returns: + Subscription object that can be used to unsubscribe + """ + return self.dispatcher.subscribe(event_type, callback) + + def unsubscribe(self, subscription): + """ + Unsubscribe from events using a subscription object + + Args: + subscription: Subscription object returned from subscribe() + """ + if subscription: + subscription.unsubscribe() + + async def wait_for_event(self, event_type: EventType, timeout=None): + """ + Wait for an event using EventType enum + + Args: + event_type: Type of event to wait for, from EventType enum + timeout: Maximum time to wait in seconds, or None for no timeout + + Returns: + Event object or None if timeout + """ + return await self.dispatcher.wait_for_event(event_type, timeout) + + def _setup_data_tracking(self): + """Set up event subscriptions to track data internally""" + async def _update_contacts(event): + self._contacts = event.payload + + async def _update_self_info(event): + self._self_info = event.payload + + async def _update_time(event): + self._time = event.payload + + # Subscribe to events to update internal state + self.subscribe(EventType.CONTACTS, _update_contacts) + self.subscribe(EventType.SELF_INFO, _update_self_info) + self.subscribe(EventType.CURRENT_TIME, _update_time) + + # Getter methods for state + @property + def contacts(self): + """Get the current contacts""" + return self._contacts + + @property + def self_info(self): + """Get device self info""" + return self._self_info + + @property + def time(self): + """Get the current device time""" + return self._time + + def get_contact_by_name(self, name): + """ + Find a contact by its name (adv_name field) + + Args: + name: The name to search for + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts: + return None + + for contact_id, contact in self._contacts.items(): + if contact.get("adv_name", "").lower() == name.lower(): + return contact + + return None + + def get_contact_by_key_prefix(self, prefix): + """ + Find a contact by its public key prefix + + Args: + prefix: The public key prefix to search for (can be a partial prefix) + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts or not prefix: + return None + + # Convert the prefix to lowercase for case-insensitive matching + prefix = prefix.lower() + + for contact_id, contact in self._contacts.items(): + public_key = contact.get("public_key", "").lower() + if public_key.startswith(prefix): + return contact + + return None + + async def start_auto_message_fetching(self): + """ + Start automatically fetching messages when messages_waiting events are received. + This will continuously check for new messages when the device indicates + messages are waiting. + """ + self._auto_fetch_task = None + self._auto_fetch_running = True + + async def _handle_messages_waiting(event): + # Only start a new fetch task if one isn't already running + if not self._auto_fetch_task or self._auto_fetch_task.done(): + self._auto_fetch_task = asyncio.create_task(_fetch_messages_loop()) + + async def _fetch_messages_loop(): + while self._auto_fetch_running: + try: + # Request the next message + result = await self.commands.get_msg() + + # If we got a NO_MORE_MSGS event or an error, stop fetching + if not result or isinstance(result, dict) and "error" in result: + break + + # Small delay to prevent overwhelming the device + await asyncio.sleep(0.1) + except Exception as e: + logger.error(f"Error fetching messages: {e}") + break + + # Subscribe to MESSAGES_WAITING events + self._auto_fetch_subscription = self.subscribe(EventType.MESSAGES_WAITING, _handle_messages_waiting) + + # Check for any pending messages immediately + await self.commands.get_msg() + + return self._auto_fetch_subscription + + async def stop_auto_message_fetching(self): + """ + Stop automatically fetching messages when messages_waiting events are received. + """ + if hasattr(self, '_auto_fetch_subscription') and self._auto_fetch_subscription: + self.unsubscribe(self._auto_fetch_subscription) + self._auto_fetch_subscription = None + + if hasattr(self, '_auto_fetch_running'): + self._auto_fetch_running = False + + if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done(): + self._auto_fetch_task.cancel() + try: + await self._auto_fetch_task + except asyncio.CancelledError: + pass + self._auto_fetch_task = None + async def ensure_contacts(self): - if len(self.contacts) == 0 : - await self.get_contacts() - - async def reset_path(self, key): - data = b"\x0D" + key - return await self.send(data) - - async def share_contact(self, key): - data = b"\x10" + key - return await self.send(data) - - async def export_contact(self, key=b""): - data = b"\x11" + key - return await self.send(data) - - async def remove_contact(self, key): - data = b"\x0f" + key - return await self.send(data) - - async def set_out_path(self, contact, path): - contact["out_path"] = path - contact["out_path_len"] = -1 - contact["out_path_len"] = int(len(path) / 2) - - async def update_contact(self, contact): - out_path_hex = contact["out_path"] - out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" - adv_name_hex = contact["adv_name"].encode().hex() - adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" - data = b"\x09" \ - + bytes.fromhex(contact["public_key"])\ - + contact["type"].to_bytes(1)\ - + contact["flags"].to_bytes(1)\ - + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ - + bytes.fromhex(out_path_hex)\ - + bytes.fromhex(adv_name_hex)\ - + contact["last_advert"].to_bytes(4, 'little')\ - + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ - + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) - return await self.send(data) - - async def send_login(self, dst, pwd): - self.login_resp = asyncio.Future() - data = b"\x1a" + dst + pwd.encode("ascii") - return await self.send(data) - - async def wait_login(self, timeout = 5): - try : - return await asyncio.wait_for(self.login_resp, timeout) - except TimeoutError : - printerr ("Timeout ...") - return False - - async def send_statusreq(self, dst): - self.status_resp = asyncio.Future() - data = b"\x1b" + dst - return await self.send(data) - - async def wait_status(self, timeout = 5): - try : - return await asyncio.wait_for(self.status_resp, timeout) - except TimeoutError : - printerr ("Timeout...") - return False - - async def send_cmd(self, dst, cmd): - """ Send a cmd to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") - #self.ack_ev.clear() # no ack ? - return await self.send(data) - - async def send_msg(self, dst, msg): - """ Send a message to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") - self.ack_ev.clear() - return await self.send(data) - - async def send_chan_msg(self, chan, msg): - """ Send a message to a public channel """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") - return await self.send(data) - - async def get_msg(self): - """ Get message from the node (stored in queue) """ - res = await self.send(b"\x0A", 1) - if res is False : - self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue - return res - - async def wait_msg(self, timeout=-1): - """ Wait for a message """ - if timeout == -1 : - await self.rx_sem.acquire() + """Ensure contacts are fetched""" + if not self._contacts: + await self.commands.get_contacts() return True - - try: - await asyncio.wait_for(self.rx_sem.acquire(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting msg") - return False - - async def wait_ack(self, timeout=6): - """ Wait ack """ - try: - await asyncio.wait_for(self.ack_ev.wait(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting ack") - return False - - async def send_cli(self, cmd): - data = b"\x32" + cmd.encode('ascii') - return await self.send(data) + return False \ No newline at end of file diff --git a/src/meshcore/meshcore_new.py b/src/meshcore/meshcore_new.py new file mode 100644 index 0000000..e48caca --- /dev/null +++ b/src/meshcore/meshcore_new.py @@ -0,0 +1,249 @@ +import asyncio +from typing import Dict, Any, Optional, Callable + +from .events import EventDispatcher, MessageType, Event +from .reader import MessageReader +from .commands import CommandHandler, deprecated + + +class MeshCore: + def __init__(self, cx): + self.cx = cx + self.dispatcher = EventDispatcher() + self._reader = MessageReader(self.dispatcher) + self.commands = CommandHandler() + + # Set up connections + self.commands.set_connection(cx) + + # Initialize state + self.contacts = {} + self.self_info = {} + self.time = 0 + + # Set the message handler in the connection + cx.set_mc(self) + + async def connect(self): + # Start the event dispatcher + await self.dispatcher.start() + + # Start the command handler + await self.commands.start() + + # Send the initial app start + return await self.commands.send_appstart() + + async def disconnect(self): + # Stop the event dispatcher + await self.dispatcher.stop() + + # Stop the command handler + await self.commands.stop() + + # Internal method - called by the connection + def handle_rx(self, data: bytearray): + asyncio.create_task(self._reader.handle_rx(data)) + + # Expose subscribe/wait capabilities from the event system + def subscribe(self, message_type, callback): + return self.dispatcher.subscribe(message_type, callback) + + async def wait_for_event(self, message_type, timeout=None): + return await self.dispatcher.wait_for_event(message_type, timeout) + + # Legacy method implementations that delegate to the command handler + # using the deprecated decorator from commands.py + + @deprecated + async def send(self, data, timeout=5): + return await self.commands.send(data, timeout) + + @deprecated + async def send_only(self, data): + await self.commands.send_only(data) + + @deprecated + async def send_appstart(self): + return await self.commands.send_appstart() + + @deprecated + async def send_device_query(self): + return await self.commands.send_device_query() + + @deprecated + async def send_advert(self, flood=False): + return await self.commands.send_advert(flood) + + @deprecated + async def set_name(self, name): + return await self.commands.set_name(name) + + @deprecated + async def set_coords(self, lat, lon): + return await self.commands.set_coords(lat, lon) + + @deprecated + async def reboot(self): + return await self.commands.reboot() + + @deprecated + async def get_bat(self): + return await self.commands.get_bat() + + @deprecated + async def get_time(self): + time_result = await self.commands.get_time() + if isinstance(time_result, int): + self.time = time_result + return self.time + + @deprecated + async def set_time(self, val): + return await self.commands.set_time(val) + + @deprecated + async def set_tx_power(self, val): + return await self.commands.set_tx_power(val) + + @deprecated + async def set_radio(self, freq, bw, sf, cr): + return await self.commands.set_radio(freq, bw, sf, cr) + + @deprecated + async def set_tuning(self, rx_dly, af): + return await self.commands.set_tuning(rx_dly, af) + + @deprecated + async def set_devicepin(self, pin): + return await self.commands.set_devicepin(pin) + + @deprecated + async def get_contacts(self): + await self.commands.get_contacts() + contact_end = await self.dispatcher.wait_for_event(MessageType.CONTACT_END) + if contact_end: + self.contacts = contact_end.payload + return self.contacts + + @deprecated + async def ensure_contacts(self): + if not self.contacts: + await self.get_contacts() + + @deprecated + async def reset_path(self, key): + return await self.commands.reset_path(key) + + @deprecated + async def share_contact(self, key): + return await self.commands.share_contact(key) + + @deprecated + async def export_contact(self, key=b""): + return await self.commands.export_contact(key) + + @deprecated + async def remove_contact(self, key): + return await self.commands.remove_contact(key) + + @deprecated + async def set_out_path(self, contact, path): + contact["out_path"] = path + contact["out_path_len"] = -1 + contact["out_path_len"] = int(len(path) / 2) + + @deprecated + async def update_contact(self, contact): + out_path_hex = contact["out_path"] + out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" + adv_name_hex = contact["adv_name"].encode().hex() + adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" + data = b"\x09" \ + + bytes.fromhex(contact["public_key"])\ + + contact["type"].to_bytes(1)\ + + contact["flags"].to_bytes(1)\ + + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ + + bytes.fromhex(out_path_hex)\ + + bytes.fromhex(adv_name_hex)\ + + contact["last_advert"].to_bytes(4, 'little')\ + + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ + + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) + return await self.send(data) + + @deprecated + async def send_login(self, dst, pwd): + await self.commands.send_login(dst, pwd) + login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, 0.1) + if login_event: + return True + return await self.commands.send_login(dst, pwd) + + @deprecated + async def wait_login(self, timeout=5): + login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, timeout) + if login_event: + return True + login_failed = await self.dispatcher.wait_for_event(MessageType.LOGIN_FAILED, 0) + if login_failed: + return False + return False + + @deprecated + async def send_statusreq(self, dst): + await self.commands.send_statusreq(dst) + + @deprecated + async def wait_status(self, timeout=5): + status_event = await self.dispatcher.wait_for_event(MessageType.STATUS_RESPONSE, timeout) + if status_event: + return status_event.payload + return False + + @deprecated + async def send_cmd(self, dst, cmd): + timestamp = await self.get_time() + return await self.commands.send_cmd(dst, cmd, timestamp.to_bytes(4, 'little')) + + @deprecated + async def send_msg(self, dst, msg): + timestamp = await self.get_time() + result = await self.commands.send_msg(dst, msg, timestamp.to_bytes(4, 'little')) + return result + + @deprecated + async def send_chan_msg(self, chan, msg): + timestamp = await self.get_time() + return await self.commands.send_chan_msg(chan, msg, timestamp.to_bytes(4, 'little')) + + @deprecated + async def get_msg(self): + await self.commands.get_msg() + + # Wait for any message type that could be received + message_types = [ + MessageType.CONTACT_MSG_RECV, + MessageType.CHANNEL_MSG_RECV, + MessageType.NO_MORE_MSGS + ] + + for msg_type in message_types: + event = await self.dispatcher.wait_for_event(msg_type, 0) + if event: + return event.payload + + return False + + @deprecated + async def wait_msg(self, timeout=-1): + msg_event = await self.dispatcher.wait_for_event(MessageType.MESSAGES_WAITING, timeout) + return msg_event is not None + + @deprecated + async def wait_ack(self, timeout=6): + ack_event = await self.dispatcher.wait_for_event(MessageType.ACK, timeout) + return ack_event is not None + + @deprecated + async def send_cli(self, cmd): + return await self.commands.send_cli(cmd) \ No newline at end of file diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py new file mode 100644 index 0000000..2539982 --- /dev/null +++ b/src/meshcore/packets.py @@ -0,0 +1,30 @@ +from enum import Enum + +# Packet prefixes for the protocol +class PacketType(Enum): + OK = 0 + ERROR = 1 + CONTACT_START = 2 + CONTACT = 3 + CONTACT_END = 4 + SELF_INFO = 5 + MSG_SENT = 6 + CONTACT_MSG_RECV = 7 + CHANNEL_MSG_RECV = 8 + CURRENT_TIME = 9 + NO_MORE_MSGS = 10 + CONTACT_SHARE = 11 + BATTERY = 12 + DEVICE_INFO = 13 + CLI_RESPONSE = 50 + + # Push notifications + ADVERTISEMENT = 0x80 + PATH_UPDATE = 0x81 + ACK = 0x82 + MESSAGES_WAITING = 0x83 + RAW_DATA = 0x84 + LOGIN_SUCCESS = 0x85 + LOGIN_FAILED = 0x86 + STATUS_RESPONSE = 0x87 + LOG_DATA = 0x88 \ No newline at end of file diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py new file mode 100644 index 0000000..761bdbb --- /dev/null +++ b/src/meshcore/reader.py @@ -0,0 +1,235 @@ +import sys +import logging +import asyncio +from typing import Any, Optional, Dict +from .events import Event, EventType, EventDispatcher +from .packets import PacketType + +logger = logging.getLogger("meshcore") + + +class MessageReader: + def __init__(self, dispatcher: EventDispatcher): + self.dispatcher = dispatcher + # We're only keeping state here that's needed for processing + # before events are dispatched + self.contacts = {} # Temporary storage during contact list building + self.contact_nb = 0 # Used for contact processing + + async def handle_rx(self, data: bytearray): + packet_type_value = data[0] + logger.debug(f"Received data: {data.hex()}") + + # Handle command responses + if packet_type_value == PacketType.OK.value: + result = None + if len(data) == 5: + result = int.from_bytes(data[1:5], byteorder='little') + else: + result = True + + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) + + elif packet_type_value == PacketType.ERROR.value: + result = False + if len(data) > 1: + result = {"error_code": data[1]} + + # Dispatch event for the ERROR response + await self.dispatcher.dispatch(Event(EventType.ERROR, result)) + + elif packet_type_value == PacketType.CONTACT_START.value: + self.contact_nb = int.from_bytes(data[1:5], byteorder='little') + self.contacts = {} + + elif packet_type_value == PacketType.CONTACT.value: + c = {} + c["public_key"] = data[1:33].hex() + c["type"] = data[33] + c["flags"] = data[34] + c["out_path_len"] = int.from_bytes(data[35:36], signed=True) + plen = int.from_bytes(data[35:36], signed=True) + if plen == -1: + plen = 0 + c["out_path"] = data[36:36+plen].hex() + c["adv_name"] = data[100:132].decode().replace("\0","") + c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') + c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 + c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 + c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.CONTACT_END.value: + await self.dispatcher.dispatch(Event(EventType.CONTACTS, self.contacts)) + + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = data[1] + self_info["tx_power"] = data[2] + self_info["max_tx_power"] = data[3] + self_info["public_key"] = data[4:36].hex() + self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 + self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 + self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 + self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 + self_info["radio_sf"] = data[56] + self_info["radio_cr"] = data[57] + self_info["name"] = data[58:].decode() + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = data[1] + res["expected_ack"] = bytes(data[2:6]) + res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = data[1:7].hex() + res["path_len"] = data[7] + res["txt_type"] = data[8] + res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') + if data[8] == 2: + res["signature"] = data[13:17].hex() + res["text"] = data[17:].decode() + else: + res["text"] = data[13:].decode() + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["pubkey_prefix"] = data[4:10].hex() + res["path_len"] = data[10] + res["txt_type"] = data[11] + res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') + if data[11] == 2: + res["signature"] = data[16:20].hex() + res["text"] = data[20:].decode() + else: + res["text"] = data[16:].decode() + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, {"extended": True})) + + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = data[1] + res["path_len"] = data[2] + res["txt_type"] = data[3] + res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') + res["text"] = data[8:].decode() + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res)) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["channel_idx"] = data[4] + res["path_len"] = data[5] + res["txt_type"] = data[6] + res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') + res["text"] = data[11:].decode() + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, {"extended": True})) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + result = int.from_bytes(data[1:5], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, False)) + + elif packet_type_value == PacketType.CONTACT_SHARE.value: + result = "meshcore://" + data[1:].hex() + await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result)) + + elif packet_type_value == PacketType.BATTERY.value: + result = int.from_bytes(data[1:3], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + res["fw ver"] = data[1] + if data[1] >= 3: + res["max_contacts"] = data[2] * 2 + res["max_channels"] = data[3] + res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') + res["fw_build"] = data[8:20].decode().replace("\0","") + res["model"] = data[20:60].decode().replace("\0","") + res["ver"] = data[60:80].decode().replace("\0","") + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CLI_RESPONSE.value: + res = {} + res["response"] = data[1:].decode() + await self.dispatcher.dispatch(Event(EventType.CLI_RESPONSE, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + # todo: Read advertisement? + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, None)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, None)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + await self.dispatcher.dispatch(Event(EventType.ACK, None)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, None)) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = data[1] / 4 + res["RSSI"] = data[2] + res["payload"] = data[4:].hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + logger.debug("Login success") + await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, None)) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + logger.debug("Login failed") + await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, None)) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = {} + res["pubkey_pre"] = data[2:8].hex() + res["bat"] = int.from_bytes(data[8:10], byteorder='little') + res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') + res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') + res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) + res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) + res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) + res["airtime"] = int.from_bytes(data[24:28], byteorder='little') + res["uptime"] = int.from_bytes(data[28:32], byteorder='little') + res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') + res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') + res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') + res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') + res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') + res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 + res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') + res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res)) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug("Received log data") + await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace'))) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") \ No newline at end of file diff --git a/src/meshcore/serial_cx.py b/src/meshcore/serial_cx.py index f7e9b7b..9258a17 100644 --- a/src/meshcore/serial_cx.py +++ b/src/meshcore/serial_cx.py @@ -3,9 +3,11 @@ """ import asyncio import sys +import logging import serial_asyncio -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class SerialConnection: def __init__(self, port, baudrate): @@ -22,21 +24,20 @@ class SerialConnection: def connection_made(self, transport): self.cx.transport = transport -# printerr('port opened') + logger.debug('port opened') transport.serial.rts = False # You can manipulate Serial object via transport def data_received(self, data): -# printerr('data received') self.cx.handle_rx(data) def connection_lost(self, exc): - printerr('port closed') + logger.info('port closed') def pause_writing(self): - printerr('pause writing') + logger.debug('pause writing') def resume_writing(self): - printerr('resume writing') + logger.debug('resume writing') async def connect(self): """ @@ -47,11 +48,11 @@ class SerialConnection: loop, lambda: self.MCSerialClientProtocol(self), self.port, baudrate=self.baudrate) - printerr("Serial Connexion started") + logger.info("Serial Connection started") return self.port - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -69,8 +70,8 @@ class SerialConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -80,5 +81,5 @@ class SerialConnection: async def send(self, data): size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data -# printerr(f"sending pkt : {pkt}") - self.transport.write(pkt) + logger.debug(f"sending pkt : {pkt}") + self.transport.write(pkt) \ No newline at end of file diff --git a/src/meshcore/tcp_cx.py b/src/meshcore/tcp_cx.py index e998a18..60aafc6 100644 --- a/src/meshcore/tcp_cx.py +++ b/src/meshcore/tcp_cx.py @@ -3,8 +3,10 @@ """ import asyncio import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class TCPConnection: def __init__(self, host, port): @@ -22,15 +24,17 @@ class TCPConnection: def connection_made(self, transport): self.cx.transport = transport + logger.debug('connection established') def data_received(self, data): + logger.debug('data received') self.cx.handle_rx(data) def error_received(self, exc): - printerr(f'Error received: {exc}') + logger.error(f'Error received: {exc}') def connection_lost(self, exc): - printerr('The server closed the connection') + logger.info('The server closed the connection') async def connect(self): """ @@ -41,11 +45,11 @@ class TCPConnection: lambda: self.MCClientProtocol(self), self.host, self.port) - printerr("TCP Connexion started") + logger.info("TCP Connection started") return self.host - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -63,8 +67,8 @@ class TCPConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -74,4 +78,5 @@ class TCPConnection: async def send(self, data): size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data + logger.debug(f"sending pkt : {pkt}") self.transport.write(pkt) From a5f1ec5c2634104cdcc285f86ab37e5477fdce06 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Tue, 8 Apr 2025 22:56:16 -0700 Subject: [PATCH 2/8] Refactor to event system --- src/meshcore/ble_cx.py | 9 ++ src/meshcore/commands.py | 19 ++- src/meshcore/events.py | 14 +- src/meshcore/meshcore.py | 34 +++-- src/meshcore/meshcore_new.py | 249 ----------------------------------- src/meshcore/serial_cx.py | 7 +- src/meshcore/tcp_cx.py | 5 +- 7 files changed, 66 insertions(+), 271 deletions(-) delete mode 100644 src/meshcore/meshcore_new.py diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index 8f49a1c..d8f4875 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -62,6 +62,9 @@ class BLEConnection: await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx) nus = self.client.services.get_service(UART_SERVICE_UUID) + if nus is None: + logger.error("Could not find UART service") + return None self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID) logger.info("BLE Connection started") @@ -82,4 +85,10 @@ class BLEConnection: asyncio.create_task(self.reader.handle_rx(data)) async def send(self, data): + if not self.client: + logger.error("Client is not connected") + return False + if not self.rx_char: + logger.error("RX characteristic not found") + return False await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 856a82a..67f5f0b 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -26,10 +26,13 @@ def deprecated(func): class CommandHandler: - def __init__(self): + DEFAULT_TIMEOUT = 5.0 + + def __init__(self, default_timeout=None): self._sender_func = None self._reader = None self.dispatcher = None + self.default_timeout = default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT def set_connection(self, connection): async def sender(data): @@ -42,10 +45,13 @@ class CommandHandler: def set_dispatcher(self, dispatcher): self.dispatcher = dispatcher - async def send(self, data, expected_events=None, timeout=5.0): + async def send(self, data, expected_events=None, timeout=None): if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") + # Use the provided timeout or fall back to default_timeout + timeout = timeout if timeout is not None else self.default_timeout + if self._sender_func: logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}") await self._sender_func(data) @@ -163,15 +169,20 @@ class CommandHandler: data = b"\x0f" + key return await self.send(data, [EventType.OK, EventType.ERROR]) - async def get_msg(self): + async def get_msg(self, timeout=1): logger.debug("Requesting pending messages") - return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], 1) + return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], timeout) async def send_login(self, dst, pwd): logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}") data = b"\x1a" + dst + pwd.encode("ascii") return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_logout(self, dst): + self.login_resp = asyncio.Future() + data = b"\x1d" + dst + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_statusreq(self, dst): logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}") data = b"\x1b" + dst diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 0f3ddf5..8688a0e 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -40,7 +40,7 @@ class EventType(Enum): class Event: type: EventType payload: Any - attributes: Dict[str, Any] = None + attributes: Dict[str, Any] = {} def __post_init__(self): if self.attributes is None: @@ -64,7 +64,7 @@ class EventDispatcher: self.running = False self._task = None - def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], None]) -> Subscription: + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]]) -> Subscription: subscription = Subscription(self, event_type, callback) self.subscriptions.append(subscription) return subscription @@ -83,7 +83,9 @@ class EventDispatcher: for subscription in self.subscriptions.copy(): if subscription.event_type is None or subscription.event_type == event.type: try: - await subscription.callback(event) + result = subscription.callback(event) + if asyncio.iscoroutine(result): + await result except Exception as e: print(f"Error in event handler: {e}") @@ -106,13 +108,13 @@ class EventDispatcher: pass self._task = None - async def wait_for_event(self, event_type: EventType, timeout: float = None) -> Optional[Event]: + async def wait_for_event(self, event_type: EventType, timeout: float | None = None) -> Optional[Event]: future = asyncio.Future() - async def event_handler(event: Event): + def event_handler(event: Event): if not future.done(): future.set_result(event) - + subscription = self.subscribe(event_type, event_handler) try: diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 4ac579c..e498333 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -28,11 +28,11 @@ class MeshCore: """ Interface to a MeshCore device """ - def __init__(self, cx, debug=False): + def __init__(self, cx, debug=False, default_timeout=None): self.cx = cx self.dispatcher = EventDispatcher() self._reader = MessageReader(self.dispatcher) - self.commands = CommandHandler() + self.commands = CommandHandler(default_timeout=default_timeout) # Set up logger if debug: @@ -58,19 +58,19 @@ class MeshCore: cx.set_reader(self._reader) @classmethod - async def create_tcp(cls, host: str, port: int, debug: bool = False) -> 'MeshCore': + async def create_tcp(cls, host: str, port: int, debug: bool = False, default_timeout=None) -> 'MeshCore': """Create and connect a MeshCore instance using TCP connection""" from .tcp_cx import TCPConnection connection = TCPConnection(host, port) await connection.connect() - mc = cls(connection, debug=debug) + mc = cls(connection, debug=debug, default_timeout=default_timeout) await mc.connect() return mc @classmethod - async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False) -> 'MeshCore': + async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False, default_timeout=None) -> 'MeshCore': """Create and connect a MeshCore instance using serial connection""" from .serial_cx import SerialConnection import asyncio @@ -79,12 +79,12 @@ class MeshCore: await connection.connect() await asyncio.sleep(0.1) # Time for transport to establish - mc = cls(connection, debug=debug) + mc = cls(connection, debug=debug, default_timeout=default_timeout) await mc.connect() return mc @classmethod - async def create_ble(cls, address: Optional[str] = None, debug: bool = False) -> 'MeshCore': + async def create_ble(cls, address: Optional[str] = None, debug: bool = False, default_timeout=None) -> 'MeshCore': """Create and connect a MeshCore instance using BLE connection If address is None, it will scan for and connect to the first available MeshCore device. @@ -96,7 +96,7 @@ class MeshCore: if result is None: raise ConnectionError("Failed to connect to BLE device") - mc = cls(connection, debug=debug) + mc = cls(connection, debug=debug, default_timeout=default_timeout) await mc.connect() return mc @@ -142,11 +142,15 @@ class MeshCore: Args: event_type: Type of event to wait for, from EventType enum - timeout: Maximum time to wait in seconds, or None for no timeout + timeout: Maximum time to wait in seconds, or None to use default_timeout Returns: Event object or None if timeout """ + # Use the provided timeout or fall back to default_timeout + if timeout is None: + timeout = self.default_timeout + return await self.dispatcher.wait_for_event(event_type, timeout) def _setup_data_tracking(self): @@ -181,6 +185,16 @@ class MeshCore: """Get the current device time""" return self._time + @property + def default_timeout(self): + """Get the default timeout for commands""" + return self.commands.default_timeout + + @default_timeout.setter + def default_timeout(self, value): + """Set the default timeout for commands""" + self.commands.default_timeout = value + def get_contact_by_name(self, name): """ Find a contact by its name (adv_name field) @@ -275,7 +289,7 @@ class MeshCore: if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done(): self._auto_fetch_task.cancel() try: - await self._auto_fetch_task + await self._auto_fetch_task # type: ignore except asyncio.CancelledError: pass self._auto_fetch_task = None diff --git a/src/meshcore/meshcore_new.py b/src/meshcore/meshcore_new.py deleted file mode 100644 index e48caca..0000000 --- a/src/meshcore/meshcore_new.py +++ /dev/null @@ -1,249 +0,0 @@ -import asyncio -from typing import Dict, Any, Optional, Callable - -from .events import EventDispatcher, MessageType, Event -from .reader import MessageReader -from .commands import CommandHandler, deprecated - - -class MeshCore: - def __init__(self, cx): - self.cx = cx - self.dispatcher = EventDispatcher() - self._reader = MessageReader(self.dispatcher) - self.commands = CommandHandler() - - # Set up connections - self.commands.set_connection(cx) - - # Initialize state - self.contacts = {} - self.self_info = {} - self.time = 0 - - # Set the message handler in the connection - cx.set_mc(self) - - async def connect(self): - # Start the event dispatcher - await self.dispatcher.start() - - # Start the command handler - await self.commands.start() - - # Send the initial app start - return await self.commands.send_appstart() - - async def disconnect(self): - # Stop the event dispatcher - await self.dispatcher.stop() - - # Stop the command handler - await self.commands.stop() - - # Internal method - called by the connection - def handle_rx(self, data: bytearray): - asyncio.create_task(self._reader.handle_rx(data)) - - # Expose subscribe/wait capabilities from the event system - def subscribe(self, message_type, callback): - return self.dispatcher.subscribe(message_type, callback) - - async def wait_for_event(self, message_type, timeout=None): - return await self.dispatcher.wait_for_event(message_type, timeout) - - # Legacy method implementations that delegate to the command handler - # using the deprecated decorator from commands.py - - @deprecated - async def send(self, data, timeout=5): - return await self.commands.send(data, timeout) - - @deprecated - async def send_only(self, data): - await self.commands.send_only(data) - - @deprecated - async def send_appstart(self): - return await self.commands.send_appstart() - - @deprecated - async def send_device_query(self): - return await self.commands.send_device_query() - - @deprecated - async def send_advert(self, flood=False): - return await self.commands.send_advert(flood) - - @deprecated - async def set_name(self, name): - return await self.commands.set_name(name) - - @deprecated - async def set_coords(self, lat, lon): - return await self.commands.set_coords(lat, lon) - - @deprecated - async def reboot(self): - return await self.commands.reboot() - - @deprecated - async def get_bat(self): - return await self.commands.get_bat() - - @deprecated - async def get_time(self): - time_result = await self.commands.get_time() - if isinstance(time_result, int): - self.time = time_result - return self.time - - @deprecated - async def set_time(self, val): - return await self.commands.set_time(val) - - @deprecated - async def set_tx_power(self, val): - return await self.commands.set_tx_power(val) - - @deprecated - async def set_radio(self, freq, bw, sf, cr): - return await self.commands.set_radio(freq, bw, sf, cr) - - @deprecated - async def set_tuning(self, rx_dly, af): - return await self.commands.set_tuning(rx_dly, af) - - @deprecated - async def set_devicepin(self, pin): - return await self.commands.set_devicepin(pin) - - @deprecated - async def get_contacts(self): - await self.commands.get_contacts() - contact_end = await self.dispatcher.wait_for_event(MessageType.CONTACT_END) - if contact_end: - self.contacts = contact_end.payload - return self.contacts - - @deprecated - async def ensure_contacts(self): - if not self.contacts: - await self.get_contacts() - - @deprecated - async def reset_path(self, key): - return await self.commands.reset_path(key) - - @deprecated - async def share_contact(self, key): - return await self.commands.share_contact(key) - - @deprecated - async def export_contact(self, key=b""): - return await self.commands.export_contact(key) - - @deprecated - async def remove_contact(self, key): - return await self.commands.remove_contact(key) - - @deprecated - async def set_out_path(self, contact, path): - contact["out_path"] = path - contact["out_path_len"] = -1 - contact["out_path_len"] = int(len(path) / 2) - - @deprecated - async def update_contact(self, contact): - out_path_hex = contact["out_path"] - out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" - adv_name_hex = contact["adv_name"].encode().hex() - adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" - data = b"\x09" \ - + bytes.fromhex(contact["public_key"])\ - + contact["type"].to_bytes(1)\ - + contact["flags"].to_bytes(1)\ - + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ - + bytes.fromhex(out_path_hex)\ - + bytes.fromhex(adv_name_hex)\ - + contact["last_advert"].to_bytes(4, 'little')\ - + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ - + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) - return await self.send(data) - - @deprecated - async def send_login(self, dst, pwd): - await self.commands.send_login(dst, pwd) - login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, 0.1) - if login_event: - return True - return await self.commands.send_login(dst, pwd) - - @deprecated - async def wait_login(self, timeout=5): - login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, timeout) - if login_event: - return True - login_failed = await self.dispatcher.wait_for_event(MessageType.LOGIN_FAILED, 0) - if login_failed: - return False - return False - - @deprecated - async def send_statusreq(self, dst): - await self.commands.send_statusreq(dst) - - @deprecated - async def wait_status(self, timeout=5): - status_event = await self.dispatcher.wait_for_event(MessageType.STATUS_RESPONSE, timeout) - if status_event: - return status_event.payload - return False - - @deprecated - async def send_cmd(self, dst, cmd): - timestamp = await self.get_time() - return await self.commands.send_cmd(dst, cmd, timestamp.to_bytes(4, 'little')) - - @deprecated - async def send_msg(self, dst, msg): - timestamp = await self.get_time() - result = await self.commands.send_msg(dst, msg, timestamp.to_bytes(4, 'little')) - return result - - @deprecated - async def send_chan_msg(self, chan, msg): - timestamp = await self.get_time() - return await self.commands.send_chan_msg(chan, msg, timestamp.to_bytes(4, 'little')) - - @deprecated - async def get_msg(self): - await self.commands.get_msg() - - # Wait for any message type that could be received - message_types = [ - MessageType.CONTACT_MSG_RECV, - MessageType.CHANNEL_MSG_RECV, - MessageType.NO_MORE_MSGS - ] - - for msg_type in message_types: - event = await self.dispatcher.wait_for_event(msg_type, 0) - if event: - return event.payload - - return False - - @deprecated - async def wait_msg(self, timeout=-1): - msg_event = await self.dispatcher.wait_for_event(MessageType.MESSAGES_WAITING, timeout) - return msg_event is not None - - @deprecated - async def wait_ack(self, timeout=6): - ack_event = await self.dispatcher.wait_for_event(MessageType.ACK, timeout) - return ack_event is not None - - @deprecated - async def send_cli(self, cmd): - return await self.commands.send_cli(cmd) \ No newline at end of file diff --git a/src/meshcore/serial_cx.py b/src/meshcore/serial_cx.py index 9258a17..93c469a 100644 --- a/src/meshcore/serial_cx.py +++ b/src/meshcore/serial_cx.py @@ -15,6 +15,7 @@ class SerialConnection: self.baudrate = baudrate self.frame_started = False self.frame_size = 0 + self.transport = None self.header = b"" self.inframe = b"" @@ -25,7 +26,8 @@ class SerialConnection: def connection_made(self, transport): self.cx.transport = transport logger.debug('port opened') - transport.serial.rts = False # You can manipulate Serial object via transport + if isinstance(transport, serial_asyncio.SerialTransport) and transport.serial: + transport.serial.rts = False # You can manipulate Serial object via transport def data_received(self, data): self.cx.handle_rx(data) @@ -79,6 +81,9 @@ class SerialConnection: self.handle_rx(data[self.frame_size-framelen:]) async def send(self, data): + if not self.transport: + logger.error("Transport not connected, cannot send data") + return size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data logger.debug(f"sending pkt : {pkt}") diff --git a/src/meshcore/tcp_cx.py b/src/meshcore/tcp_cx.py index 60aafc6..c14d4a5 100644 --- a/src/meshcore/tcp_cx.py +++ b/src/meshcore/tcp_cx.py @@ -18,7 +18,7 @@ class TCPConnection: self.header = b"" self.inframe = b"" - class MCClientProtocol: + class MCClientProtocol(asyncio.Protocol): def __init__(self, cx): self.cx = cx @@ -76,6 +76,9 @@ class TCPConnection: self.handle_rx(data[self.frame_size-framelen:]) async def send(self, data): + if not self.transport: + logger.error("Transport not connected, cannot send data") + return size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data logger.debug(f"sending pkt : {pkt}") From ea2f17025f9315b2a0103d8dfe4eba16f35fc8f4 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sat, 12 Apr 2025 13:02:01 -0700 Subject: [PATCH 3/8] Add trace packet type --- examples/serial_trace.py | 70 ++++++++++++++++++++++++++++++++++++++++ src/meshcore/commands.py | 50 +++++++++++++++++++++++++++- src/meshcore/events.py | 5 +-- src/meshcore/packets.py | 3 +- src/meshcore/reader.py | 44 +++++++++++++++++++++++++ 5 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 examples/serial_trace.py diff --git a/examples/serial_trace.py b/examples/serial_trace.py new file mode 100644 index 0000000..cd609e9 --- /dev/null +++ b/examples/serial_trace.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +import asyncio +import argparse +from meshcore import MeshCore +from meshcore.events import EventType + +async def get_repeater_name(mc, hash_prefix): + """Find a contact by its 2-character hash prefix and return its name""" + # Ensure contacts are available + await mc.ensure_contacts() + + # Find contact with matching hash prefix + contact = mc.get_contact_by_key_prefix(hash_prefix) + if contact: + return contact.get("adv_name", f"Unknown ({hash_prefix})") + else: + return f"Unknown ({hash_prefix})" + +async def main(): + parser = argparse.ArgumentParser(description='MeshCore Serial Trace Example') + parser.add_argument('-p', '--port', required=True, help='Serial port path') + parser.add_argument('--path', type=str, help='Trace path (comma-separated hex values)') + args = parser.parse_args() + + try: + # Connect to device + print(f"Connecting to {args.port}...") + mc = await MeshCore.create_serial(args.port, 115200, debug=True) + + # Send trace packet + print(f"Sending trace packet...") + result = await mc.commands.send_trace(path=args.path) + + if result: + print("Trace packet sent successfully") + print("Waiting for trace response...") + + # Wait for a trace response with 15-second timeout + event = await mc.wait_for_event(EventType.TRACE_DATA, timeout=15) + + if event: + trace = event.payload + print(f"Trace data received:") + print(f" Tag: {trace['tag']}") + print(f" Flags: {trace.get('flags', 0)}") + print(f" Path Length: {trace.get('path_len', 0)}") + + if trace.get('path'): + print(f" Path ({len(trace['path'])} nodes):") + + # Process nodes with hash (repeaters) + for i, node in enumerate(trace['path']): + if 'hash' in node: + # Look up repeater name + repeater_name = await get_repeater_name(mc, node['hash']) + print(f" Node {i+1}: {repeater_name}, SNR={node['snr']:.1f} dB") + else: + print(f" Node {i+1}: SNR={node['snr']:.1f} dB (final node)") + else: + print("No trace response received within timeout") + else: + print("Failed to send trace packet") + + await mc.disconnect() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 67f5f0b..293c0e6 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -224,4 +224,52 @@ class CommandHandler: async def send_cli(self, cmd): logger.debug(f"Sending CLI command: {cmd}") data = b"\x32" + cmd.encode('ascii') - return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) \ No newline at end of file + return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) + + async def send_trace(self, auth_code=0, tag=None, flags=0, path=None): + """ + Send a trace packet to test routing through specific repeaters + + Args: + auth_code: 32-bit authentication code (default: 0) + tag: 32-bit integer to identify this trace (default: random) + flags: 8-bit flags field (default: 0) + path: Optional string with comma-separated hex values representing repeater pubkeys (e.g. "23,5f,3a") + or a bytes/bytearray object with the raw path data + + Returns: + Dictionary with sent status, tag, and estimated timeout in milliseconds, or False if command failed + """ + # Generate random tag if not provided + if tag is None: + import random + tag = random.randint(1, 0xFFFFFFFF) + + logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}") + + # Prepare the command packet: CMD(1) + tag(4) + auth_code(4) + flags(1) + [path] + cmd_data = bytearray([36]) # CMD_SEND_TRACE_PATH + cmd_data.extend(tag.to_bytes(4, 'little')) + cmd_data.extend(auth_code.to_bytes(4, 'little')) + cmd_data.append(flags) + + # Process path if provided + if path: + if isinstance(path, str): + # Convert comma-separated hex values to bytes + try: + path_bytes = bytearray() + for hex_val in path.split(','): + hex_val = hex_val.strip() + path_bytes.append(int(hex_val, 16)) + cmd_data.extend(path_bytes) + except ValueError as e: + logger.error(f"Invalid path format: {e}") + return False + elif isinstance(path, (bytes, bytearray)): + cmd_data.extend(path) + else: + logger.error(f"Unsupported path type: {type(path)}") + return False + + return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 8688a0e..d24991c 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -2,7 +2,7 @@ from enum import Enum import logging from typing import Any, Dict, Optional, Callable, List, Union import asyncio -from dataclasses import dataclass +from dataclasses import dataclass, field logger = logging.getLogger("meshcore") @@ -30,6 +30,7 @@ class EventType(Enum): LOGIN_FAILED = "login_failed" STATUS_RESPONSE = "status_response" LOG_DATA = "log_data" + TRACE_DATA = "trace_data" # Command response types OK = "command_ok" @@ -40,7 +41,7 @@ class EventType(Enum): class Event: type: EventType payload: Any - attributes: Dict[str, Any] = {} + attributes: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): if self.attributes is None: diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index 2539982..6797489 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -27,4 +27,5 @@ class PacketType(Enum): LOGIN_SUCCESS = 0x85 LOGIN_FAILED = 0x86 STATUS_RESPONSE = 0x87 - LOG_DATA = 0x88 \ No newline at end of file + LOG_DATA = 0x88 + TRACE_DATA = 0x89 \ No newline at end of file diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 761bdbb..eb795e9 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -230,6 +230,50 @@ class MessageReader: logger.debug("Received log data") await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace'))) + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = data[1] + path_len = data[2] + flags = data[3] + tag = int.from_bytes(data[4:8], byteorder='little') + auth_code = int.from_bytes(data[8:12], byteorder='little') + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len*2 + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": f"{data[12+i]:02x}", + # SNR is stored as a signed byte representing SNR * 4 + "snr": (data[12+path_len+i] if data[12+path_len+i] < 128 else data[12+path_len+i] - 256) / 4.0 + } + path_nodes.append(node) + + # Add the final node (our device) with its SNR + final_snr_byte = data[12+path_len*2] + final_snr = (final_snr_byte if final_snr_byte < 128 else final_snr_byte - 256) / 4.0 + path_nodes.append({ + "snr": final_snr + }) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res)) + else: logger.debug(f"Unhandled data received {data}") logger.debug(f"Unhandled packet type: {packet_type_value}") \ No newline at end of file From 55af0b2e61f1d3680b3619aeeacacded6c43a101 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sat, 12 Apr 2025 13:35:56 -0700 Subject: [PATCH 4/8] Improve log tracking event --- examples/rf_packet_monitor.py | 51 +++++++++++++++++++++++++++++++++++ src/meshcore/events.py | 1 + src/meshcore/reader.py | 30 +++++++++++++++++++-- 3 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 examples/rf_packet_monitor.py diff --git a/examples/rf_packet_monitor.py b/examples/rf_packet_monitor.py new file mode 100644 index 0000000..47b6408 --- /dev/null +++ b/examples/rf_packet_monitor.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import asyncio +import argparse +from meshcore import MeshCore +from meshcore.events import EventType + +async def main(): + parser = argparse.ArgumentParser(description='MeshCore RF Packet Monitor') + parser.add_argument('-p', '--port', required=True, help='Serial port path') + args = parser.parse_args() + + try: + # Connect to device + print(f"Connecting to {args.port}...") + mc = await MeshCore.create_serial(args.port, 115200) + + def handle_rf_packet(event): + packet = event.payload + if isinstance(packet, dict): + print(f"Raw RF packet received:") + if 'snr' in packet: + print(f" SNR: {packet['snr']:.1f} dB") + if 'rssi' in packet: + print(f" RSSI: {packet['rssi']} dBm") + if 'payload_length' in packet: + print(f" Payload length: {packet['payload_length']} bytes") + if 'payload' in packet: + print(f" Payload (hex): {packet['payload']}") + else: + print(f"RF packet received: {packet}") + + # Subscribe to RF log data + subscription = mc.subscribe(EventType.RX_LOG_DATA, handle_rf_packet) + + print("Waiting for log data (press Ctrl+C to exit)...") + try: + # Keep the script running to receive logs + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + print("\nExiting...") + + # Clean up + mc.unsubscribe(subscription) + await mc.disconnect() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py index d24991c..cd28d63 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -31,6 +31,7 @@ class EventType(Enum): STATUS_RESPONSE = "status_response" LOG_DATA = "log_data" TRACE_DATA = "trace_data" + RX_LOG_DATA = "rx_log_data" # Command response types OK = "command_ok" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index eb795e9..c779500 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -227,8 +227,34 @@ class MessageReader: await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res)) elif packet_type_value == PacketType.LOG_DATA.value: - logger.debug("Received log data") - await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace'))) + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data = { + "raw_hex": data[1:].hex() + } + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = data[1] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = data[2] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + if len(data) > 3: + log_data["payload"] = data[3:].hex() + log_data["payload_length"] = len(data) - 3 + + # Dispatch as RF log data + await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data)) elif packet_type_value == PacketType.TRACE_DATA.value: logger.debug(f"Received trace data: {data.hex()}") From b700ae75e96a3bd98c71498783e4439bd47edb7f Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sat, 12 Apr 2025 13:53:37 -0700 Subject: [PATCH 5/8] Fix example --- examples/serial_msg.py | 8 +++++--- src/meshcore/meshcore.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/serial_msg.py b/examples/serial_msg.py index f532c38..71f4d5d 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -3,16 +3,18 @@ import asyncio from meshcore import MeshCore -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -DEST = "mchome" +DEST = "🦄" MSG = "hello from serial" async def main () : mc = await MeshCore.create_serial(PORT, BAUDRATE) await mc.ensure_contacts() - await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6], MSG) + contact = mc.get_contact_by_name(DEST) + await mc.commands.send_msg(bytes.fromhex(contact["public_key"])[0:6], MSG) + print ("Message sent ... awaiting") asyncio.run(main()) diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index e498333..2f0a030 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -208,7 +208,7 @@ class MeshCore: if not self._contacts: return None - for contact_id, contact in self._contacts.items(): + for _, contact in self._contacts.items(): if contact.get("adv_name", "").lower() == name.lower(): return contact From cbfc940de6554d75d548a0c8ddfc51600aec9cd2 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sat, 12 Apr 2025 13:53:37 -0700 Subject: [PATCH 6/8] Fix example --- examples/serial_msg.py | 3 +++ src/meshcore/meshcore.py | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 71f4d5d..522a68f 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -13,6 +13,9 @@ async def main () : await mc.ensure_contacts() contact = mc.get_contact_by_name(DEST) + if not contact: + print(f"Contact {DEST} not found") + return await mc.commands.send_msg(bytes.fromhex(contact["public_key"])[0:6], MSG) print ("Message sent ... awaiting") diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 2f0a030..eedcb12 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -2,7 +2,7 @@ import asyncio import functools import warnings import logging -from typing import Optional +from typing import Optional, Dict, Any from .events import EventDispatcher, EventType from .reader import MessageReader @@ -195,7 +195,7 @@ class MeshCore: """Set the default timeout for commands""" self.commands.default_timeout = value - def get_contact_by_name(self, name): + def get_contact_by_name(self, name) -> Optional[Dict[str, Any]]: """ Find a contact by its name (adv_name field) @@ -214,7 +214,7 @@ class MeshCore: return None - def get_contact_by_key_prefix(self, prefix): + def get_contact_by_key_prefix(self, prefix) -> Optional[Dict[str, Any]]: """ Find a contact by its public key prefix From 478bcd92c1b6c31d5c3800a47800a4e3cc987bdf Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sat, 12 Apr 2025 14:00:51 -0700 Subject: [PATCH 7/8] remove some dead code --- src/meshcore/ble_cx.py | 1 - src/meshcore/commands.py | 22 +--------------------- src/meshcore/meshcore.py | 14 -------------- src/meshcore/serial_cx.py | 1 - src/meshcore/tcp_cx.py | 1 - 5 files changed, 1 insertion(+), 38 deletions(-) diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index d8f4875..fbdca89 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -2,7 +2,6 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys import logging # Get logger diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 293c0e6..018cdbf 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -1,30 +1,10 @@ -import functools import asyncio import logging -import warnings -import time -from typing import Any, Callable, Awaitable, Optional, Union +from typing import Any from .events import EventType logger = logging.getLogger("meshcore") -class CommandError(Exception): - def __init__(self, details=None): - self.details = details - super().__init__(f"Command error: {details}") - -def deprecated(func): - @functools.wraps(func) - async def wrapper(*args, **kwargs): - warnings.warn( - f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", - DeprecationWarning, - stacklevel=2 - ) - return await func(*args, **kwargs) - return wrapper - - class CommandHandler: DEFAULT_TIMEOUT = 5.0 diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index eedcb12..bd26b53 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,6 +1,4 @@ import asyncio -import functools -import warnings import logging from typing import Optional, Dict, Any @@ -12,18 +10,6 @@ from .commands import CommandHandler # Setup default logger logger = logging.getLogger("meshcore") - -def deprecated(func): - @functools.wraps(func) - async def wrapper(*args, **kwargs): - warnings.warn( - f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", - DeprecationWarning, - stacklevel=2 - ) - return await func(*args, **kwargs) - return wrapper - class MeshCore: """ Interface to a MeshCore device diff --git a/src/meshcore/serial_cx.py b/src/meshcore/serial_cx.py index 93c469a..b7b07c6 100644 --- a/src/meshcore/serial_cx.py +++ b/src/meshcore/serial_cx.py @@ -2,7 +2,6 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys import logging import serial_asyncio diff --git a/src/meshcore/tcp_cx.py b/src/meshcore/tcp_cx.py index c14d4a5..25c38c8 100644 --- a/src/meshcore/tcp_cx.py +++ b/src/meshcore/tcp_cx.py @@ -2,7 +2,6 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys import logging # Get logger From 6dc87bafbb688975afb9400c79fe93aba9ff57a6 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sun, 13 Apr 2025 12:03:47 -0700 Subject: [PATCH 8/8] Add event filtering to support ACK tracking --- README.md | 41 ++++++++++++ examples/serial_msg.py | 74 ++++++++++++++++----- examples/serial_trace.py | 22 +++++-- examples/tcp_mchome_msg.py | 2 +- examples/tcp_mchome_readmsgs.py | 8 ++- src/meshcore/commands.py | 34 +++++++--- src/meshcore/events.py | 75 ++++++++++++++++++--- src/meshcore/meshcore.py | 36 +++++++--- src/meshcore/reader.py | 113 ++++++++++++++++++++++++-------- 9 files changed, 325 insertions(+), 80 deletions(-) diff --git a/README.md b/README.md index b4f19cb..bea2c49 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,47 @@ meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert) meshcore.unsubscribe(subscription) ``` +#### Filtering Events by Attributes + +Filter events based on their attributes to handle only specific ones: + +```python +# Subscribe only to messages from a specific contact +async def handle_specific_contact_messages(event): + print(f"Message from Alice: {event.payload['text']}") + +contact = meshcore.get_contact_by_name("Alice") +if contact: + alice_subscription = meshcore.subscribe( + EventType.CONTACT_MSG_RECV, + handle_specific_contact_messages, + attribute_filters={"pubkey_prefix": contact["public_key"][:12]} + ) + +# Send a message and wait for its specific acknowledgment +async def send_and_confirm_message(meshcore, dst_key, message): + # Send the message and get information about the sent message + sent_result = await meshcore.commands.send_msg(dst_key, message) + + # Extract the expected acknowledgment code from the message sent event + expected_ack = sent_result["expected_ack"].hex() + print(f"Message sent, waiting for ack with code: {expected_ack}") + + # Wait specifically for this acknowledgment + result = await meshcore.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=10.0 + ) + + if result: + print("Message confirmed delivered!") + return True + else: + print("Message delivery confirmation timed out") + return False +``` + ### Hybrid Approach (Commands + Events) Combine command-based and event-based styles: diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 522a68f..cb21a10 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -1,23 +1,67 @@ #!/usr/bin/python +""" +Example of sending a message and waiting for its specific acknowledgment +using event attribute filtering. +""" import asyncio -from meshcore import MeshCore +import argparse +from meshcore import MeshCore, EventType -PORT = "/dev/tty.usbserial-583A0069501" -BAUDRATE = 115200 -DEST = "🦄" -MSG = "hello from serial" +async def main(): + # Parse command line arguments + parser = argparse.ArgumentParser(description="Send a message and wait for ACK") + parser.add_argument("-p", "--port", required=True, help="Serial port") + parser.add_argument("-b", "--baudrate", type=int, default=115200, help="Baud rate") + parser.add_argument("-d", "--dest", default="🦄", help="Destination contact name") + parser.add_argument("-m", "--message", default="hello from serial", help="Message to send") + parser.add_argument("--timeout", type=float, default=30.0, help="ACK timeout in seconds") + args = parser.parse_args() -async def main () : - mc = await MeshCore.create_serial(PORT, BAUDRATE) + # Connect to the device + mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) - await mc.ensure_contacts() - contact = mc.get_contact_by_name(DEST) - if not contact: - print(f"Contact {DEST} not found") - return - await mc.commands.send_msg(bytes.fromhex(contact["public_key"])[0:6], MSG) - print ("Message sent ... awaiting") + try: + # Make sure we have contacts loaded + await mc.ensure_contacts() + + # Find the contact by name + contact = mc.get_contact_by_name(args.dest) + if not contact: + print(f"Contact '{args.dest}' not found. Available contacts:") + for name, c in mc.contacts.items(): + print(f"- {c.get('adv_name', 'Unknown')}") + return + + print(f"Found contact: {contact.get('adv_name')} ({contact['public_key'][:12]}...)") + + # Send the message and get the MSG_SENT event + print(f"Sending message: '{args.message}'") + send_result = await mc.commands.send_msg( + bytes.fromhex(contact["public_key"])[0:6], + args.message + ) + + # Extract the expected ACK code + expected_ack = send_result["expected_ack"].hex() + print(f"Message sent, waiting for ACK with code: {expected_ack}") + + # Wait for the specific ACK that matches our message + ack_event = await mc.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=args.timeout + ) + + if ack_event: + print(f"✅ Message confirmed delivered! (ACK received)") + else: + print(f"⚠️ Timed out waiting for ACK after {args.timeout} seconds") + + finally: + # Always disconnect + await mc.disconnect() -asyncio.run(main()) +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/serial_trace.py b/examples/serial_trace.py index cd609e9..b169edd 100644 --- a/examples/serial_trace.py +++ b/examples/serial_trace.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio import argparse +import random from meshcore import MeshCore from meshcore.events import EventType @@ -29,14 +30,23 @@ async def main(): # Send trace packet print(f"Sending trace packet...") - result = await mc.commands.send_trace(path=args.path) + # Send trace with a path if provided + tag = random.randint(1, 0xFFFFFFFF) + result = await mc.commands.send_trace(path=args.path, tag=tag) - if result: - print("Trace packet sent successfully") - print("Waiting for trace response...") + # Check if the result has a success indicator + if result.get("success") == False: + print(f"Failed to send trace packet: {result.get('reason', 'unknown error')}") + elif result: + print(f"Trace packet sent successfully with tag={tag}") + print("Waiting for trace response matching our tag...") - # Wait for a trace response with 15-second timeout - event = await mc.wait_for_event(EventType.TRACE_DATA, timeout=15) + # Wait for a trace response with our specific tag + event = await mc.wait_for_event( + EventType.TRACE_DATA, + attribute_filters={"tag": tag}, + timeout=15 + ) if event: trace = event.payload diff --git a/examples/tcp_mchome_msg.py b/examples/tcp_mchome_msg.py index 45f36af..ec44d38 100755 --- a/examples/tcp_mchome_msg.py +++ b/examples/tcp_mchome_msg.py @@ -17,6 +17,6 @@ async def main () : await mc.connect() await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) + await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/tcp_mchome_readmsgs.py b/examples/tcp_mchome_readmsgs.py index 4cafca9..59b2cde 100755 --- a/examples/tcp_mchome_readmsgs.py +++ b/examples/tcp_mchome_readmsgs.py @@ -18,8 +18,10 @@ async def main () : res = True while res: - res = await mc.commands.get_msg() - if res : - print (res) + result = await mc.commands.get_msg() + if result.get("success") == False: + res = False + print("No more messages") + print (result) asyncio.run(main()) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 018cdbf..56e0f4b 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -1,8 +1,9 @@ import asyncio import logging -from typing import Any +from typing import Any, Dict from .events import EventType - +import random + logger = logging.getLogger("meshcore") class CommandHandler: @@ -25,7 +26,18 @@ class CommandHandler: def set_dispatcher(self, dispatcher): self.dispatcher = dispatcher - async def send(self, data, expected_events=None, timeout=None): + async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]: + """ + Send a command and wait for expected event responses. + + Args: + data: The data to send + expected_events: EventType or list of EventTypes to wait for + timeout: Timeout in seconds, or None to use default_timeout + + Returns: + Dict[str, Any]: Dictionary containing the response data or status + """ if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") @@ -44,17 +56,18 @@ class CommandHandler: logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") for event_type in expected_events: - event = await self.dispatcher.wait_for_event(event_type, timeout) + # don't apply any filters for now, might change later + event = await self.dispatcher.wait_for_event(event_type, {}, timeout) if event: return event.payload - return False + return {"success": False, "reason": "no_event_received"} except asyncio.TimeoutError: logger.debug(f"Command timed out {data}") - return False + return {"success": False, "reason": "timeout"} except Exception as e: logger.debug(f"Command error: {e}") return {"error": str(e)} - return True + return {"success": True} async def send_appstart(self): @@ -222,8 +235,9 @@ class CommandHandler: """ # Generate random tag if not provided if tag is None: - import random tag = random.randint(1, 0xFFFFFFFF) + if auth_code is None: + auth_code = random.randint(1, 0xFFFFFFFF) logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}") @@ -245,11 +259,11 @@ class CommandHandler: cmd_data.extend(path_bytes) except ValueError as e: logger.error(f"Invalid path format: {e}") - return False + return { "success": False, "reason": "invalid_path_format" } elif isinstance(path, (bytes, bytearray)): cmd_data.extend(path) else: logger.error(f"Unsupported path type: {type(path)}") - return False + return { "success": False, "reason": "unsupported_path_type" } return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py index cd28d63..8a72664 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -1,5 +1,6 @@ from enum import Enum import logging +from math import log from typing import Any, Dict, Optional, Callable, List, Union import asyncio from dataclasses import dataclass, field @@ -44,16 +45,31 @@ class Event: payload: Any attributes: Dict[str, Any] = field(default_factory=dict) - def __post_init__(self): - if self.attributes is None: - self.attributes = {} + def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs): + """ + Initialize an Event + + Args: + type: The event type + payload: The event payload + attributes: Dictionary of event attributes for filtering + **kwargs: Additional attributes to add to the attributes dictionary + """ + self.type = type + self.payload = payload + self.attributes = attributes or {} + + # Add any keyword arguments to the attributes dictionary + if kwargs: + self.attributes.update(kwargs) class Subscription: - def __init__(self, dispatcher, event_type, callback): + def __init__(self, dispatcher, event_type, callback, attribute_filters=None): self.dispatcher = dispatcher self.event_type = event_type self.callback = callback + self.attribute_filters = attribute_filters or {} def unsubscribe(self): self.dispatcher._remove_subscription(self) @@ -66,8 +82,25 @@ class EventDispatcher: self.running = False self._task = None - def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]]) -> Subscription: - subscription = Subscription(self, event_type, callback) + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]], + attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription: + """ + Subscribe to events with optional attribute filtering. + + Parameters: + ----------- + event_type : EventType or None + The type of event to subscribe to, or None to subscribe to all events. + callback : Callable + Function to call when a matching event is received. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to trigger the callback. + + Returns: + -------- + Subscription object that can be used to unsubscribe. + """ + subscription = Subscription(self, event_type, callback, attribute_filters) self.subscriptions.append(subscription) return subscription @@ -81,9 +114,16 @@ class EventDispatcher: async def _process_events(self): while self.running: event = await self.queue.get() - logger.debug(f"Dispatching event: {event.type}, {event.payload}") + logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}") for subscription in self.subscriptions.copy(): + # Check if event type matches if subscription.event_type is None or subscription.event_type == event.type: + # Check if all attribute filters match + if subscription.attribute_filters: + # Skip if any filter doesn't match the corresponding event attribute + if not all(event.attributes.get(key) == value + for key, value in subscription.attribute_filters.items()): + continue try: result = subscription.callback(event) if asyncio.iscoroutine(result): @@ -110,14 +150,31 @@ class EventDispatcher: pass self._task = None - async def wait_for_event(self, event_type: EventType, timeout: float | None = None) -> Optional[Event]: + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, + timeout: float | None = None) -> Optional[Event]: + """ + Wait for an event of the specified type that matches all attribute filters. + + Parameters: + ----------- + event_type : EventType + The type of event to wait for. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to be returned. + timeout : float | None, optional + Maximum time to wait for the event, in seconds. + + Returns: + -------- + The matched event, or None if timeout occurred before a matching event. + """ future = asyncio.Future() def event_handler(event: Event): if not future.done(): future.set_result(event) - subscription = self.subscribe(event_type, event_handler) + subscription = self.subscribe(event_type, event_handler, attribute_filters) try: return await asyncio.wait_for(future, timeout) diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index bd26b53..9d065f0 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Union from .events import EventDispatcher, EventType from .reader import MessageReader @@ -99,18 +99,27 @@ class MeshCore: self.dispatcher.running = False self.dispatcher._task.cancel() - def subscribe(self, event_type: EventType, callback): + def subscribe(self, event_type: EventType, callback, attribute_filters: Optional[Dict[str, Any]] = None): """ - Subscribe to events using EventType enum + Subscribe to events using EventType enum with optional attribute filtering Args: event_type: Type of event to subscribe to, from EventType enum callback: Async function to call when event occurs + attribute_filters: Dictionary of attribute key-value pairs that must match for the event to trigger the callback Returns: Subscription object that can be used to unsubscribe + + Example: + # Subscribe to ACK events where the 'code' attribute has a specific value + mc.subscribe( + EventType.ACK, + my_callback_function, + attribute_filters={'code': 'SUCCESS'} + ) """ - return self.dispatcher.subscribe(event_type, callback) + return self.dispatcher.subscribe(event_type, callback, attribute_filters) def unsubscribe(self, subscription): """ @@ -122,22 +131,31 @@ class MeshCore: if subscription: subscription.unsubscribe() - async def wait_for_event(self, event_type: EventType, timeout=None): + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, timeout=None): """ - Wait for an event using EventType enum + Wait for an event using EventType enum with optional attribute filtering Args: event_type: Type of event to wait for, from EventType enum + attribute_filters: Dictionary of attribute key-value pairs to match against the event timeout: Maximum time to wait in seconds, or None to use default_timeout Returns: Event object or None if timeout + + Example: + # Wait for an ACK event where the 'code' attribute has a specific value + await mc.wait_for_event( + EventType.ACK, + attribute_filters={'code': 'SUCCESS'}, + timeout=30.0 + ) """ # Use the provided timeout or fall back to default_timeout if timeout is None: timeout = self.default_timeout - return await self.dispatcher.wait_for_event(event_type, timeout) + return await self.dispatcher.wait_for_event(event_type, attribute_filters, timeout) def _setup_data_tracking(self): """Set up event subscriptions to track data internally""" @@ -148,7 +166,7 @@ class MeshCore: self._self_info = event.payload async def _update_time(event): - self._time = event.payload + self._time = event.payload.get("time", 0) # Subscribe to events to update internal state self.subscribe(EventType.CONTACTS, _update_contacts) @@ -244,7 +262,7 @@ class MeshCore: result = await self.commands.get_msg() # If we got a NO_MORE_MSGS event or an error, stop fetching - if not result or isinstance(result, dict) and "error" in result: + if not result.get("success") or isinstance(result, dict) and "error" in result: break # Small delay to prevent overwhelming the device diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index c779500..ebcdc03 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -22,19 +22,18 @@ class MessageReader: # Handle command responses if packet_type_value == PacketType.OK.value: - result = None + result: Dict[str, Any] = {"success": True} if len(data) == 5: - result = int.from_bytes(data[1:5], byteorder='little') - else: - result = True + result["value"] = int.from_bytes(data[1:5], byteorder='little') # Dispatch event for the OK response await self.dispatcher.dispatch(Event(EventType.OK, result)) elif packet_type_value == PacketType.ERROR.value: - result = False if len(data) > 1: - result = {"error_code": data[1]} + result = {"success": False, "error_code": data[1]} + else: + result = {"success": False} # Dispatch event for the ERROR response await self.dispatcher.dispatch(Event(EventType.ERROR, result)) @@ -84,7 +83,13 @@ class MessageReader: res["type"] = data[1] res["expected_ack"] = bytes(data[2:6]) res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res)) + + attributes = { + "type": res["type"], + "expected_ack": res["expected_ack"].hex() + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: res = {} @@ -98,7 +103,13 @@ class MessageReader: res["text"] = data[17:].decode() else: res["text"] = data[13:].decode() - await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res)) + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} @@ -113,7 +124,13 @@ class MessageReader: res["text"] = data[20:].decode() else: res["text"] = data[16:].decode() - await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, {"extended": True})) + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: res = {} @@ -123,7 +140,13 @@ class MessageReader: res["txt_type"] = data[3] res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') res["text"] = data[8:].decode() - await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res)) + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} @@ -134,21 +157,31 @@ class MessageReader: res["txt_type"] = data[6] res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') res["text"] = data[11:].decode() - await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, {"extended": True})) + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) elif packet_type_value == PacketType.CURRENT_TIME.value: - result = int.from_bytes(data[1:5], byteorder='little') + time_value = int.from_bytes(data[1:5], byteorder='little') + result = {"time": time_value} await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) elif packet_type_value == PacketType.NO_MORE_MSGS.value: - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, False)) + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) elif packet_type_value == PacketType.CONTACT_SHARE.value: - result = "meshcore://" + data[1:].hex() + contact_uri = "meshcore://" + data[1:].hex() + result = {"uri": contact_uri} await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result)) elif packet_type_value == PacketType.BATTERY.value: - result = int.from_bytes(data[1:3], byteorder='little') + battery_level = int.from_bytes(data[1:3], byteorder='little') + result = {"level": battery_level} await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) elif packet_type_value == PacketType.DEVICE_INFO.value: @@ -171,20 +204,30 @@ class MessageReader: # Push notifications elif packet_type_value == PacketType.ADVERTISEMENT.value: logger.debug("Advertisement received") - # todo: Read advertisement? - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, None)) + # TODO: Read advertisement attributes + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, {})) elif packet_type_value == PacketType.PATH_UPDATE.value: logger.debug("Code path update") - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, None)) + # TODO: Read path update attributes + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, {})) elif packet_type_value == PacketType.ACK.value: logger.debug("Received ACK") - await self.dispatcher.dispatch(Event(EventType.ACK, None)) + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = bytes(data[1:5]).hex() + + attributes = { + "code": ack_data.get("code", "") + } + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) elif packet_type_value == PacketType.MESSAGES_WAITING.value: logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, None)) + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) elif packet_type_value == PacketType.RAW_DATA.value: res = {} @@ -197,11 +240,13 @@ class MessageReader: elif packet_type_value == PacketType.LOGIN_SUCCESS.value: logger.debug("Login success") - await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, None)) + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, {})) elif packet_type_value == PacketType.LOGIN_FAILED.value: logger.debug("Login failed") - await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, None)) + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, {})) elif packet_type_value == PacketType.STATUS_RESPONSE.value: res = {} @@ -224,16 +269,20 @@ class MessageReader: res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') data_hex = data[8:].hex() logger.debug(f"Status response: {data_hex}") - await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res)) + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res, attributes)) elif packet_type_value == PacketType.LOG_DATA.value: logger.debug(f"Received RF log data: {data.hex()}") # Parse as raw RX data - log_data = { + log_data: Dict[str, Any] = { "raw_hex": data[1:].hex() } - + # First byte is SNR (signed byte, multiplied by 4) if len(data) > 1: snr_byte = data[1] @@ -253,8 +302,12 @@ class MessageReader: log_data["payload"] = data[3:].hex() log_data["payload_length"] = len(data) - 3 + attributes = { + "pubkey_prefix": log_data["raw_hex"], + } + # Dispatch as RF log data - await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data)) + await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data, attributes)) elif packet_type_value == PacketType.TRACE_DATA.value: logger.debug(f"Received trace data: {data.hex()}") @@ -298,7 +351,13 @@ class MessageReader: res["path"] = path_nodes logger.debug(f"Parsed trace data: {res}") - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res)) + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) else: logger.debug(f"Unhandled data received {data}")