From 8f0ecd7d75294f6389448f777d5ea7142f509e38 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Tue, 8 Apr 2025 22:56:16 -0700 Subject: [PATCH] Refactor to event system --- .gitignore | 2 + README.md | 239 +++++++++- examples/ble_t1000_infos.py | 12 +- examples/ble_t1000_msg.py | 5 +- examples/pubsub_example.py | 117 +++++ examples/serial_battery_monitor.py | 53 +++ examples/serial_contacts.py | 3 +- examples/serial_infos.py | 16 +- examples/serial_msg.py | 12 +- examples/serial_repeater_status.py | 34 +- examples/tcp_mchome_contacts.py | 2 +- examples/tcp_mchome_infos.py | 12 +- examples/tcp_mchome_msg.py | 2 +- examples/tcp_mchome_readmsgs.py | 2 +- src/meshcore/__init__.py | 9 +- src/meshcore/ble_cx.py | 20 +- src/meshcore/commands.py | 216 +++++++++ src/meshcore/events.py | 123 ++++++ src/meshcore/meshcore.py | 684 ++++++++++++----------------- src/meshcore/meshcore_new.py | 249 +++++++++++ src/meshcore/packets.py | 30 ++ src/meshcore/reader.py | 235 ++++++++++ src/meshcore/serial_cx.py | 27 +- src/meshcore/tcp_cx.py | 21 +- 24 files changed, 1625 insertions(+), 500 deletions(-) create mode 100644 .gitignore create mode 100644 examples/pubsub_example.py create mode 100644 examples/serial_battery_monitor.py create mode 100644 src/meshcore/commands.py create mode 100644 src/meshcore/events.py create mode 100644 src/meshcore/meshcore_new.py create mode 100644 src/meshcore/packets.py create mode 100644 src/meshcore/reader.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..01d7f95 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +venv \ No newline at end of file diff --git a/README.md b/README.md index 76bb37e..b4f19cb 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,239 @@ -# Python meshcore +# Python MeshCore -Bindings to access your [MeshCore](https://meshcore.co.uk) companion radio nodes in python. +Python library for interacting with [MeshCore](https://meshcore.co.uk) companion radio nodes. -Can be installed via `pip` using `pip install meshcore` +## Installation -Used by [mccli](https://github.com/fdlamotte/mccli). +```bash +pip install meshcore +``` + +## Quick Start + +Connect to your device and send a message: + +```python +import asyncio +from meshcore import MeshCore, EventType + +async def main(): + # Connect to your device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Get your contacts + contacts = await meshcore.commands.get_contacts() + print(f"Found {len(contacts)} contacts") + + # Send a message to the first contact + if contacts: + contact_key = next(iter(contacts.items()))[1]['public_key'] + await meshcore.commands.send_msg(bytes.fromhex(contact_key), "Hello from Python!") + + await meshcore.disconnect() + +asyncio.run(main()) +``` + +## Development Setup + +To set up for development: + +```bash +# Create and activate virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install in development mode +pip install -e . + +# Run examples +python examples/pubsub_example.py -p /dev/ttyUSB0 +``` + +## Usage Guide + +### Connecting to Your Device + +Connect via Serial, BLE, or TCP: + +```python +# Serial connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True) + +# BLE connection (scans for devices if address not provided) +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB") + +# TCP connection +meshcore = await MeshCore.create_tcp("192.168.1.100", 4000) +``` + +### Using Commands (Synchronous Style) + +Send commands and wait for responses: + +```python +# Get device information +device_info = await meshcore.commands.send_device_query() +print(f"Device model: {device_info['model']}") + +# Get list of contacts +contacts = await meshcore.commands.get_contacts() +for contact_id, contact in contacts.items(): + print(f"Contact: {contact['adv_name']} ({contact_id})") + +# Send a message (destination key in bytes) +await meshcore.commands.send_msg(dst_key, "Hello!") + +# Setting device parameters +await meshcore.commands.set_name("My Device") +await meshcore.commands.set_tx_power(20) # Set transmit power +``` + +### Finding Contacts + +Easily find contacts by name or key: + +```python +# Find a contact by name +contact = meshcore.get_contact_by_name("Bob's Radio") +if contact: + print(f"Found Bob at: {contact['adv_lat']}, {contact['adv_lon']}") + +# Find by partial key prefix +contact = meshcore.get_contact_by_key_prefix("a1b2c3") +``` + +### Event-Based Programming (Asynchronous Style) + +Subscribe to events to handle them asynchronously: + +```python +# Subscribe to incoming messages +async def handle_message(event): + data = event.payload + print(f"Message from {data['pubkey_prefix']}: {data['text']}") + +subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, handle_message) + +# Subscribe to advertisements +async def handle_advert(event): + print("Advertisement detected!") + +meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert) + +# When done, unsubscribe +meshcore.unsubscribe(subscription) +``` + +### Hybrid Approach (Commands + Events) + +Combine command-based and event-based styles: + +```python +import asyncio + +async def main(): + # Connect to device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Set up event handlers + async def handle_ack(event): + print("Message acknowledged!") + + async def handle_battery(event): + print(f"Battery level: {event.payload}%") + + # Subscribe to events + meshcore.subscribe(EventType.ACK, handle_ack) + meshcore.subscribe(EventType.BATTERY, handle_battery) + + # Create background task for battery checking + async def check_battery_periodically(): + while True: + # Send command (returns battery level) + result = await meshcore.commands.get_bat() + print(f"Battery check initiated, response: {result}") + await asyncio.sleep(60) # Wait 60 seconds between checks + + # Start the background task + battery_task = asyncio.create_task(check_battery_periodically()) + + # Send manual command and wait for response + await meshcore.commands.send_advert(flood=True) + + try: + # Keep the main program running + await asyncio.sleep(float('inf')) + except asyncio.CancelledError: + # Clean up when program ends + battery_task.cancel() + await meshcore.disconnect() + +# Run the program +asyncio.run(main()) +``` + +### Auto-Fetching Messages + +Let the library automatically fetch incoming messages: + +```python +# Start auto-fetching messages +await meshcore.start_auto_message_fetching() + +# Just subscribe to message events - the library handles fetching +async def on_message(event): + print(f"New message: {event.payload['text']}") + +meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_message) + +# When done +await meshcore.stop_auto_message_fetching() +``` + +### Debug Mode + +Enable debug logging for troubleshooting: + +```python +# Enable debug mode when creating the connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", debug=True) +``` + +This logs detailed information about commands sent and events received. + +## Common Examples + +### Sending Messages to Contacts + +```python +# Get contacts and send to a specific one +contacts = await meshcore.commands.get_contacts() +for key, contact in contacts.items(): + if contact["adv_name"] == "Alice": + # Convert the hex key to bytes + dst_key = bytes.fromhex(contact["public_key"]) + await meshcore.commands.send_msg(dst_key, "Hello Alice!") + break +``` + +### Monitoring Channel Messages + +```python +# Subscribe to channel messages +async def channel_handler(event): + msg = event.payload + print(f"Channel {msg['channel_idx']}: {msg['text']}") + +meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler) +``` + +## Examples in the Repo + +Check the `examples/` directory for more: + +- `pubsub_example.py`: Event subscription system with auto-fetching +- `serial_infos.py`: Quick device info retrieval +- `serial_msg.py`: Message sending and receiving +- `ble_t1000_infos.py`: BLE connections diff --git a/examples/ble_t1000_infos.py b/examples/ble_t1000_infos.py index 132da32..a48d591 100755 --- a/examples/ble_t1000_infos.py +++ b/examples/ble_t1000_infos.py @@ -3,16 +3,14 @@ import asyncio from meshcore import MeshCore -from meshcore import BLEConnection ADDRESS = "t1000" -async def main () : - con = BLEConnection(ADDRESS) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_ble(ADDRESS) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/ble_t1000_msg.py b/examples/ble_t1000_msg.py index b2a5dbd..c9ec543 100755 --- a/examples/ble_t1000_msg.py +++ b/examples/ble_t1000_msg.py @@ -1,7 +1,6 @@ #!/usr/bin/python import asyncio -import json from meshcore import MeshCore from meshcore import BLEConnection @@ -15,7 +14,7 @@ async def main () : mc = MeshCore(con) await mc.connect() - await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.get_contacts() + await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/pubsub_example.py b/examples/pubsub_example.py new file mode 100644 index 0000000..e72d175 --- /dev/null +++ b/examples/pubsub_example.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Example demonstrating the new pub-sub system in MeshCore +Shows how to subscribe to events and use the event system +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def message_callback(event): + print(f"\nReceived message: {event.payload['text']}") + print(f"From: {event.payload.get('pubkey_prefix', 'channel')}") + print(f"Type: {event.payload['type']}") + print(f"Timestamp: {event.payload['sender_timestamp']}") + + +async def advertisement_callback(event): + print("\nDetected advertisement") + + +async def main(): + parser = argparse.ArgumentParser(description="MeshCore Pub-Sub Example") + parser.add_argument( + "--port", "-p", help="Serial port", required=True + ) + parser.add_argument( + "--baud", "-b", type=int, help="Baud rate", default=115200 + ) + args = parser.parse_args() + + print(f"Connecting to {args.port} at {args.baud} baud...") + + # Create MeshCore instance with serial connection + meshcore = await MeshCore.create_serial(args.port, args.baud, debug=True) + + # Connection is already established + success = True + if not success: + print("Failed to connect to MeshCore device") + return + + print("Connected to MeshCore device") + + # Get contacts + contacts = await meshcore.commands.get_contacts() + if contacts: + print(f"\nFound {len(contacts)} contacts:") + for name, contact in contacts.items(): + print(f"- {name}") + + + await meshcore.commands.send_advert(flood=True) + + # Subscribe to private messages + private_subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, message_callback) + + # Subscribe to channel messages + channel_subscription = meshcore.subscribe(EventType.CHANNEL_MSG_RECV, message_callback) + + # Subscribe to advertisements + advert_subscription = meshcore.subscribe(EventType.ADVERTISEMENT, advertisement_callback) + + await meshcore.start_auto_message_fetching() + + print("\nSubscribed to events:") + print("- Private messages") + print("- Channel messages") + print("- Advertisements") + + print("\nWaiting for events. Press Ctrl+C to exit...\n") + + # Get device info + device_info = await meshcore.commands.send_device_query() + if device_info: + print(f"Device info: {device_info}") + + # Get time from the device + device_time = await meshcore.commands.get_time() + print(f"Device time: {device_time}") + + # Access current time through the property + print(f"Current time (property): {meshcore.time}") + + try: + while True: + # Wait for messages + await asyncio.sleep(1) + except KeyboardInterrupt: + meshcore.stop() + print("\nExiting...") + except asyncio.CancelledError: + # Handle task cancellation from KeyboardInterrupt in asyncio.run() + print("\nTask cancelled - cleaning up...") + finally: + # Clean up subscriptions + meshcore.unsubscribe(private_subscription) + meshcore.unsubscribe(channel_subscription) + meshcore.unsubscribe(advert_subscription) + + # Stop auto-message fetching + await meshcore.stop_auto_message_fetching() + + # Disconnect + await meshcore.disconnect() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + # This prevents the KeyboardInterrupt traceback from being shown + print("\nExited cleanly") + except Exception as e: + print(f"Error: {e}") \ No newline at end of file diff --git a/examples/serial_battery_monitor.py b/examples/serial_battery_monitor.py new file mode 100644 index 0000000..6343f4c --- /dev/null +++ b/examples/serial_battery_monitor.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +""" +Simple example of eventing approach with battery monitoring +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def main(): + parser = argparse.ArgumentParser(description="Battery Monitor Example") + parser.add_argument("--port", "-p", help="Serial port", required=True) + parser.add_argument("--baud", "-b", type=int, help="Baud rate", default=115200) + args = parser.parse_args() + + print(f"Connecting to {args.port}...") + + # Connect to device + meshcore = await MeshCore.create_serial(args.port, args.baud) + + # Event handler for battery updates + async def on_battery(event): + print(f"Battery event: {event.payload}mV") + + # Subscribe to battery events + meshcore.subscribe(EventType.BATTERY, on_battery) + + # Background task that checks battery every 10 seconds + async def check_battery(): + while True: + print("Requesting battery level...") + await meshcore.commands.get_bat() # This command will trigger a battery event + await asyncio.sleep(10) + + # Start the battery check task + task = asyncio.create_task(check_battery()) + + try: + # Keep program running + print("Monitoring battery (press Ctrl+C to exit)...") + await asyncio.sleep(float('inf')) + except KeyboardInterrupt: + print("\nExiting...") + finally: + # Clean up + task.cancel() + await meshcore.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/serial_contacts.py b/examples/serial_contacts.py index 2ab5c76..9b07a02 100755 --- a/examples/serial_contacts.py +++ b/examples/serial_contacts.py @@ -2,6 +2,7 @@ import asyncio import json + from meshcore import MeshCore from meshcore import SerialConnection @@ -15,6 +16,6 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/serial_infos.py b/examples/serial_infos.py index cb41be5..7e8000d 100755 --- a/examples/serial_infos.py +++ b/examples/serial_infos.py @@ -3,19 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import SerialConnection -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish - - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_serial(PORT, BAUDRATE) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 6a093ec..f532c38 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -1,9 +1,7 @@ #!/usr/bin/python import asyncio -import json from meshcore import MeshCore -from meshcore import SerialConnection PORT = "/dev/ttyUSB0" BAUDRATE = 115200 @@ -11,14 +9,10 @@ DEST = "mchome" MSG = "hello from serial" async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish - - mc = MeshCore(con) - await mc.connect() + mc = await MeshCore.create_serial(PORT, BAUDRATE) await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6], MSG) asyncio.run(main()) + diff --git a/examples/serial_repeater_status.py b/examples/serial_repeater_status.py index b0ebfdd..2a77336 100755 --- a/examples/serial_repeater_status.py +++ b/examples/serial_repeater_status.py @@ -3,31 +3,27 @@ import asyncio from meshcore import MeshCore -from meshcore import printerr -from meshcore import SerialConnection +from meshcore.events import EventType -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -REPEATER="FdlRoom" -PASSWORD="password" +REPEATER="Orion" +PASSWORD="floopyboopy" async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish + mc = await MeshCore.create_serial(PORT, BAUDRATE) + await mc.commands.get_contacts() + repeater = mc.get_contact_by_name(REPEATER) + + await mc.commands.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - mc = MeshCore(con) - await mc.connect() + print("Login sent ... awaiting") - contacts = await mc.get_contacts() - repeater = contacts[REPEATER] - await mc.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - - printerr("Login sent ... awaiting") - - if await mc.wait_login() : - await mc.send_statusreq(bytes.fromhex(repeater["public_key"])) - print(await mc.wait_status()) + if await mc.wait_for_event(EventType.LOGIN_SUCCESS): + print("Logged in success") + await mc.commands.send_statusreq(bytes.fromhex(repeater["public_key"])) + print("Status request sent ... awaiting") + print(await mc.wait_for_event(EventType.STATUS_RESPONSE)) asyncio.run(main()) diff --git a/examples/tcp_mchome_contacts.py b/examples/tcp_mchome_contacts.py index 3e730d8..a4f71f2 100755 --- a/examples/tcp_mchome_contacts.py +++ b/examples/tcp_mchome_contacts.py @@ -14,5 +14,5 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/tcp_mchome_infos.py b/examples/tcp_mchome_infos.py index d1ec958..b5c24d2 100755 --- a/examples/tcp_mchome_infos.py +++ b/examples/tcp_mchome_infos.py @@ -3,17 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import TCPConnection HOSTNAME = "mchome" PORT = 5000 -async def main () : - con = TCPConnection(HOSTNAME, PORT) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_tcp(HOSTNAME, PORT) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/tcp_mchome_msg.py b/examples/tcp_mchome_msg.py index d3f751e..45f36af 100755 --- a/examples/tcp_mchome_msg.py +++ b/examples/tcp_mchome_msg.py @@ -17,6 +17,6 @@ async def main () : await mc.connect() await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/tcp_mchome_readmsgs.py b/examples/tcp_mchome_readmsgs.py index e575000..4cafca9 100755 --- a/examples/tcp_mchome_readmsgs.py +++ b/examples/tcp_mchome_readmsgs.py @@ -18,7 +18,7 @@ async def main () : res = True while res: - res = await mc.get_msg() + res = await mc.commands.get_msg() if res : print (res) diff --git a/src/meshcore/__init__.py b/src/meshcore/__init__.py index 415d909..0171409 100644 --- a/src/meshcore/__init__.py +++ b/src/meshcore/__init__.py @@ -1,5 +1,10 @@ -from meshcore.meshcore import printerr -from meshcore.meshcore import MeshCore +import logging + +# Setup default logger +logging.basicConfig(level=logging.INFO) + +from meshcore.events import EventType +from meshcore.meshcore import MeshCore, logger from meshcore.tcp_cx import TCPConnection from meshcore.ble_cx import BLEConnection from meshcore.serial_cx import SerialConnection diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index 6280e8c..8f49a1c 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -3,8 +3,10 @@ """ import asyncio import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic @@ -40,11 +42,11 @@ class BLEConnection: if self.address is None or self.address == "" or len(self.address.split(":")) != 6 : scanner = BleakScanner() - printerr("Scanning for devices") + logger.info("Scanning for devices") device = await scanner.find_device_by_filter(match_meshcore_device) if device is None : return None - printerr(f"Found device : {device}") + logger.info(f"Found device : {device}") self.client = BleakClient(device) self.address = self.client.address else: @@ -62,22 +64,22 @@ class BLEConnection: nus = self.client.services.get_service(UART_SERVICE_UUID) self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID) - printerr("BLE Connexion started") + logger.info("BLE Connection started") return self.address def handle_disconnect(self, _: BleakClient): """ Callback to handle disconnection """ - printerr ("Device was disconnected, goodbye.") + logger.info("Device was disconnected, goodbye.") # cancelling all tasks effectively ends the program for task in asyncio.all_tasks(): task.cancel() - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): - if not self.mc is None: - self.mc.handle_rx(data) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(data)) async def send(self, data): await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py new file mode 100644 index 0000000..856a82a --- /dev/null +++ b/src/meshcore/commands.py @@ -0,0 +1,216 @@ +import functools +import asyncio +import logging +import warnings +import time +from typing import Any, Callable, Awaitable, Optional, Union +from .events import EventType + +logger = logging.getLogger("meshcore") + +class CommandError(Exception): + def __init__(self, details=None): + self.details = details + super().__init__(f"Command error: {details}") + +def deprecated(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + warnings.warn( + f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", + DeprecationWarning, + stacklevel=2 + ) + return await func(*args, **kwargs) + return wrapper + + +class CommandHandler: + def __init__(self): + self._sender_func = None + self._reader = None + self.dispatcher = None + + def set_connection(self, connection): + async def sender(data): + await connection.send(data) + self._sender_func = sender + + def set_reader(self, reader): + self._reader = reader + + def set_dispatcher(self, dispatcher): + self.dispatcher = dispatcher + + async def send(self, data, expected_events=None, timeout=5.0): + if not self.dispatcher: + raise RuntimeError("Dispatcher not set, cannot send commands") + + if self._sender_func: + logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}") + await self._sender_func(data) + + if expected_events: + try: + # Convert single event to list if needed + if not isinstance(expected_events, list): + expected_events = [expected_events] + + logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + for event_type in expected_events: + event = await self.dispatcher.wait_for_event(event_type, timeout) + if event: + return event.payload + return False + except asyncio.TimeoutError: + logger.debug(f"Command timed out {data}") + return False + except Exception as e: + logger.debug(f"Command error: {e}") + return {"error": str(e)} + return True + + + async def send_appstart(self): + logger.debug("Sending appstart command") + b1 = bytearray(b'\x01\x03 mccli') + return await self.send(b1, [EventType.SELF_INFO]) + + async def send_device_query(self): + logger.debug("Sending device query command") + return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR]) + + async def send_advert(self, flood=False): + logger.debug(f"Sending advertisement command (flood={flood})") + if flood: + return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR]) + else: + return await self.send(b"\x07", [EventType.OK, EventType.ERROR]) + + async def set_name(self, name): + logger.debug(f"Setting device name to: {name}") + return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR]) + + async def set_coords(self, lat, lon): + logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}") + return await self.send(b'\x0e'\ + + int(lat*1e6).to_bytes(4, 'little', signed=True)\ + + int(lon*1e6).to_bytes(4, 'little', signed=True)\ + + int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def reboot(self): + logger.debug("Sending reboot command") + return await self.send(b'\x13reboot') + + async def get_bat(self): + logger.debug("Getting battery information") + return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR]) + + async def get_time(self): + logger.debug("Getting device time") + return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR]) + + async def set_time(self, val): + logger.debug(f"Setting device time to: {val}") + return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tx_power(self, val): + logger.debug(f"Setting TX power to: {val}") + return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_radio(self, freq, bw, sf, cr): + logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}") + return await self.send(b"\x0b" \ + + int(float(freq)*1000).to_bytes(4, 'little')\ + + int(float(bw)*1000).to_bytes(4, 'little')\ + + int(sf).to_bytes(1, 'little')\ + + int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tuning(self, rx_dly, af): + logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}") + return await self.send(b"\x15" \ + + int(rx_dly).to_bytes(4, 'little')\ + + int(af).to_bytes(4, 'little')\ + + int(0).to_bytes(1, 'little')\ + + int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_devicepin(self, pin): + logger.debug(f"Setting device PIN to: {pin}") + return await self.send(b"\x25" \ + + int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def get_contacts(self): + logger.debug("Getting contacts") + return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR]) + + async def reset_path(self, key): + logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0D" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def share_contact(self, key): + logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x10" + key + return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR]) + + async def export_contact(self, key=b""): + logger.debug(f"Exporting contact: {key.hex() if key else 'all'}") + data = b"\x11" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def remove_contact(self, key): + logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0f" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def get_msg(self): + logger.debug("Requesting pending messages") + return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], 1) + + async def send_login(self, dst, pwd): + logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1a" + dst + pwd.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_statusreq(self, dst): + logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1b" + dst + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cmd(self, dst, cmd, timestamp=None): + logger.debug(f"Sending command to {dst.hex() if isinstance(dst, bytes) else dst}: {cmd}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def send_msg(self, dst, msg, timestamp=None): + logger.debug(f"Sending message to {dst.hex() if isinstance(dst, bytes) else dst}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_chan_msg(self, chan, msg, timestamp=None): + logger.debug(f"Sending channel message to channel {chan}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cli(self, cmd): + logger.debug(f"Sending CLI command: {cmd}") + data = b"\x32" + cmd.encode('ascii') + return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py new file mode 100644 index 0000000..0f3ddf5 --- /dev/null +++ b/src/meshcore/events.py @@ -0,0 +1,123 @@ +from enum import Enum +import logging +from typing import Any, Dict, Optional, Callable, List, Union +import asyncio +from dataclasses import dataclass + +logger = logging.getLogger("meshcore") + +# Public event types for users to subscribe to +class EventType(Enum): + CONTACTS = "contacts" + SELF_INFO = "self_info" + CONTACT_MSG_RECV = "contact_message" + CHANNEL_MSG_RECV = "channel_message" + CURRENT_TIME = "time_update" + NO_MORE_MSGS = "no_more_messages" + CONTACT_SHARE = "contact_share" + BATTERY = "battery_info" + DEVICE_INFO = "device_info" + CLI_RESPONSE = "cli_response" + MSG_SENT = "message_sent" + + # Push notifications + ADVERTISEMENT = "advertisement" + PATH_UPDATE = "path_update" + ACK = "acknowledgement" + MESSAGES_WAITING = "messages_waiting" + RAW_DATA = "raw_data" + LOGIN_SUCCESS = "login_success" + LOGIN_FAILED = "login_failed" + STATUS_RESPONSE = "status_response" + LOG_DATA = "log_data" + + # Command response types + OK = "command_ok" + ERROR = "command_error" + + +@dataclass +class Event: + type: EventType + payload: Any + attributes: Dict[str, Any] = None + + def __post_init__(self): + if self.attributes is None: + self.attributes = {} + + +class Subscription: + def __init__(self, dispatcher, event_type, callback): + self.dispatcher = dispatcher + self.event_type = event_type + self.callback = callback + + def unsubscribe(self): + self.dispatcher._remove_subscription(self) + + +class EventDispatcher: + def __init__(self): + self.queue = asyncio.Queue() + self.subscriptions: List[Subscription] = [] + self.running = False + self._task = None + + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], None]) -> Subscription: + subscription = Subscription(self, event_type, callback) + self.subscriptions.append(subscription) + return subscription + + def _remove_subscription(self, subscription: Subscription): + if subscription in self.subscriptions: + self.subscriptions.remove(subscription) + + async def dispatch(self, event: Event): + await self.queue.put(event) + + async def _process_events(self): + while self.running: + event = await self.queue.get() + logger.debug(f"Dispatching event: {event.type}, {event.payload}") + for subscription in self.subscriptions.copy(): + if subscription.event_type is None or subscription.event_type == event.type: + try: + await subscription.callback(event) + except Exception as e: + print(f"Error in event handler: {e}") + + self.queue.task_done() + + async def start(self): + if not self.running: + self.running = True + self._task = asyncio.create_task(self._process_events()) + + async def stop(self): + if self.running: + self.running = False + if self._task: + await self.queue.join() + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def wait_for_event(self, event_type: EventType, timeout: float = None) -> Optional[Event]: + future = asyncio.Future() + + async def event_handler(event: Event): + if not future.done(): + future.set_result(event) + + subscription = self.subscribe(event_type, event_handler) + + try: + return await asyncio.wait_for(future, timeout) + except asyncio.TimeoutError: + return None + finally: + subscription.unsubscribe() \ No newline at end of file diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index c70c4a9..4ac579c 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,414 +1,288 @@ -""" - mccli.py : CLI interface to MeschCore BLE companion app -""" import asyncio -import sys +import functools +import warnings +import logging +from typing import Optional -def printerr (s) : - sys.stderr.write(str(s)) - sys.stderr.write("\n") - sys.stderr.flush() +from .events import EventDispatcher, EventType +from .reader import MessageReader +from .commands import CommandHandler + + +# Setup default logger +logger = logging.getLogger("meshcore") + + +def deprecated(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + warnings.warn( + f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.", + DeprecationWarning, + stacklevel=2 + ) + return await func(*args, **kwargs) + return wrapper class MeshCore: """ - Interface to a BLE MeshCore device + Interface to a MeshCore device """ - self_info={} - contacts={} - - def __init__(self, cx): - """ Constructor : specify address """ - self.time = 0 - self.result = asyncio.Future() - self.contact_nb = 0 - self.rx_sem = asyncio.Semaphore(0) - self.ack_ev = asyncio.Event() - self.login_resp = asyncio.Future() - self.status_resp = asyncio.Future() - + def __init__(self, cx, debug=False): self.cx = cx - cx.set_mc(self) - - async def connect(self) : - await self.send_appstart() - - def handle_rx(self, data: bytearray): - """ Callback to handle received data """ - match data[0]: - case 0: # ok - if len(data) == 5 : # an integer - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - else: - self.result.set_result(True) - case 1: # error - if len(data) > 1: - res = {} - res["error_code"] = data[1] - self.result.set_result(res) # error code if fw > 1.4 - else: - self.result.set_result(False) - case 2: # contact start - self.contact_nb = int.from_bytes(data[1:5], byteorder='little') - self.contacts={} - case 3: # contact - c = {} - c["public_key"] = data[1:33].hex() - c["type"] = data[33] - c["flags"] = data[34] - c["out_path_len"] = int.from_bytes(data[35:36], signed=True) - plen = int.from_bytes(data[35:36], signed=True) - if plen == -1 : - plen = 0 - c["out_path"] = data[36:36+plen].hex() - c["adv_name"] = data[100:132].decode().replace("\0","") - c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') - c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 - c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 - c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') - self.contacts[c["adv_name"]]=c - case 4: # end of contacts - self.result.set_result(self.contacts) - case 5: # self info - self.self_info["adv_type"] = data[1] - self.self_info["tx_power"] = data[2] - self.self_info["max_tx_power"] = data[3] - self.self_info["public_key"] = data[4:36].hex() - self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 - self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 - #self.self_info["reserved_44:48"] = data[44:48].hex() - self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 - self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 - self.self_info["radio_sf"] = data[56] - self.self_info["radio_cr"] = data[57] - self.self_info["name"] = data[58:].decode() - self.result.set_result(True) - case 6: # msg sent - res = {} - res["type"] = data[1] - res["expected_ack"] = bytes(data[2:6]) - res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') - self.result.set_result(res) - case 7: # contact msg recv - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = data[1:7].hex() - res["path_len"] = data[7] - res["txt_type"] = data[8] - res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') - if data[8] == 2 : # signed packet - res["signature"] = data[13:17].hex() - res["text"] = data[17:].decode() - else : - res["text"] = data[13:].decode() - self.result.set_result(res) - case 16: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["pubkey_prefix"] = data[4:10].hex() - res["path_len"] = data[10] - res["txt_type"] = data[11] - res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') - if data[11] == 2 : # signed packet - res["signature"] = data[16:20].hex() - res["text"] = data[20:].decode() - else : - res["text"] = data[16:].decode() - self.result.set_result(res) - case 8 : # chanel msg recv - res = {} - res["type"] = "CHAN" - res["channel_idx"] = data[1] - res["path_len"] = data[2] - res["txt_type"] = data[3] - res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') - res["text"] = data[8:].decode() - self.result.set_result(res) - case 17: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["channel_idx"] = data[4] - res["path_len"] = data[5] - res["txt_type"] = data[6] - res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') - res["text"] = data[11:].decode() - self.result.set_result(res) - case 9: # current time - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - case 10: # no more msgs - self.result.set_result(False) - case 11: # contact - self.result.set_result("meshcore://" + data[1:].hex()) - case 12: # battery voltage - self.result.set_result(int.from_bytes(data[1:3], byteorder='little')) - case 13: # device info - res = {} - res["fw ver"] = data[1] - if data[1] >= 3: - res["max_contacts"] = data[2] * 2 - res["max_channels"] = data[3] - res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') - res["fw_build"] = data[8:20].decode().replace("\0","") - res["model"] = data[20:60].decode().replace("\0","") - res["ver"] = data[60:80].decode().replace("\0","") - self.result.set_result(res) - case 50: # cli response - res = {} - res["response"] = data[1:].decode() - self.result.set_result(res) - # push notifications - case 0x80: - printerr ("Advertisment received") - case 0x81: - printerr ("Code path update") - case 0x82: - self.ack_ev.set() - printerr ("Received ACK") - case 0x83: - self.rx_sem.release() - printerr ("Msgs are waiting") - case 0x84: - printerr ("Received raw data") - res = {} - res["SNR"] = data[1] / 4 - res["RSSI"] = data[2] - res["payload"] = data[4:].hex() - print(res) - case 0x85: - self.login_resp.set_result(True) - - printerr ("Login success") - case 0x86: - self.login_resp.set_result(False) - printerr ("Login failed") - case 0x87: - res = {} - res["pubkey_pre"] = data[2:8].hex() - res["bat"] = int.from_bytes(data[8:10], byteorder='little') - res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') - res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') - res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) - res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) - res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) - res["airtime"] = int.from_bytes(data[24:28], byteorder='little') - res["uptime"] = int.from_bytes(data[28:32], byteorder='little') - res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') - res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') - res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') - res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') - res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') - res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 - res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') - res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') - self.status_resp.set_result(res) - data_hex = data[8:].hex() - printerr (f"Status response: {data_hex}") - #printerr(res) - case 0x88: - printerr ("Received log data") - # unhandled - case _: - printerr (f"Unhandled data received {data}") - - async def send(self, data, timeout = 5): - """ Helper function to synchronously send (and receive) data to the node """ - self.result = asyncio.Future() - try: - await self.cx.send(data) - res = await asyncio.wait_for(self.result, timeout) - return res - except TimeoutError : - printerr ("Timeout while sending message ...") - return False - - async def send_only(self, data): # don't wait reply - await self.cx.send(data) - - async def send_appstart(self): - """ Send APPSTART to the node """ - b1 = bytearray(b'\x01\x03 mccli') - return await self.send(b1) - - async def send_device_qeury(self): - return await self.send(b"\x16\x03"); - - async def send_advert(self, flood=False): - """ Make the node send an advertisement """ - if flood : - return await self.send(b"\x07\x01") - else : - return await self.send(b"\x07") - - async def set_name(self, name): - """ Changes the name of the node """ - return await self.send(b'\x08' + name.encode("ascii")) - - async def set_coords(self, lat, lon): - return await self.send(b'\x0e'\ - + int(lat*1e6).to_bytes(4, 'little', signed=True)\ - + int(lon*1e6).to_bytes(4, 'little', signed=True)\ - + int(0).to_bytes(4, 'little')) - - async def reboot(self): - await self.send_only(b'\x13reboot') - return True - - async def get_bat(self): - return await self.send(b'\x14') - - async def get_time(self): - """ Get the time (epoch) of the node """ - self.time = await self.send(b"\x05") - return self.time - - async def set_time(self, val): - """ Sets a new epoch """ - return await self.send(b"\x06" + int(val).to_bytes(4, 'little')) - - async def set_tx_power(self, val): - """ Sets tx power """ - return await self.send(b"\x0c" + int(val).to_bytes(4, 'little')) - - async def set_radio (self, freq, bw, sf, cr): - """ Sets radio params """ - return await self.send(b"\x0b" \ - + int(float(freq)*1000).to_bytes(4, 'little')\ - + int(float(bw)*1000).to_bytes(4, 'little')\ - + int(sf).to_bytes(1, 'little')\ - + int(cr).to_bytes(1, 'little')) - - async def set_tuning (self, rx_dly, af): - """ Sets radio params """ - return await self.send(b"\x15" \ - + int(rx_dly).to_bytes(4, 'little')\ - + int(af).to_bytes(4, 'little')\ - + int(0).to_bytes(1, 'little')\ - + int(0).to_bytes(1, 'little')) - - async def set_devicepin (self, pin): - return await self.send(b"\x25" \ - + int(pin).to_bytes(4, 'little')) - - async def get_contacts(self): - """ Starts retreiving contacts """ - return await self.send(b"\x04") - + self.dispatcher = EventDispatcher() + self._reader = MessageReader(self.dispatcher) + self.commands = CommandHandler() + + # Set up logger + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # Set up connections + self.commands.set_connection(cx) + + # Set the dispatcher in the command handler + self.commands.set_dispatcher(self.dispatcher) + self.commands.set_reader(self._reader) + + # Initialize state (private) + self._contacts = {} + self._self_info = {} + self._time = 0 + + # Set up event subscriptions to track data + self._setup_data_tracking() + + cx.set_reader(self._reader) + + @classmethod + async def create_tcp(cls, host: str, port: int, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using TCP connection""" + from .tcp_cx import TCPConnection + + connection = TCPConnection(host, port) + await connection.connect() + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + @classmethod + async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using serial connection""" + from .serial_cx import SerialConnection + import asyncio + + connection = SerialConnection(port, baudrate) + await connection.connect() + await asyncio.sleep(0.1) # Time for transport to establish + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + @classmethod + async def create_ble(cls, address: Optional[str] = None, debug: bool = False) -> 'MeshCore': + """Create and connect a MeshCore instance using BLE connection + + If address is None, it will scan for and connect to the first available MeshCore device. + """ + from .ble_cx import BLEConnection + + connection = BLEConnection(address) + result = await connection.connect() + if result is None: + raise ConnectionError("Failed to connect to BLE device") + + mc = cls(connection, debug=debug) + await mc.connect() + return mc + + async def connect(self): + await self.dispatcher.start() + return await self.commands.send_appstart() + + async def disconnect(self): + await self.dispatcher.stop() + + def stop(self): + """Synchronously stop the event dispatcher task""" + if self.dispatcher._task and not self.dispatcher._task.done(): + self.dispatcher.running = False + self.dispatcher._task.cancel() + + def subscribe(self, event_type: EventType, callback): + """ + Subscribe to events using EventType enum + + Args: + event_type: Type of event to subscribe to, from EventType enum + callback: Async function to call when event occurs + + Returns: + Subscription object that can be used to unsubscribe + """ + return self.dispatcher.subscribe(event_type, callback) + + def unsubscribe(self, subscription): + """ + Unsubscribe from events using a subscription object + + Args: + subscription: Subscription object returned from subscribe() + """ + if subscription: + subscription.unsubscribe() + + async def wait_for_event(self, event_type: EventType, timeout=None): + """ + Wait for an event using EventType enum + + Args: + event_type: Type of event to wait for, from EventType enum + timeout: Maximum time to wait in seconds, or None for no timeout + + Returns: + Event object or None if timeout + """ + return await self.dispatcher.wait_for_event(event_type, timeout) + + def _setup_data_tracking(self): + """Set up event subscriptions to track data internally""" + async def _update_contacts(event): + self._contacts = event.payload + + async def _update_self_info(event): + self._self_info = event.payload + + async def _update_time(event): + self._time = event.payload + + # Subscribe to events to update internal state + self.subscribe(EventType.CONTACTS, _update_contacts) + self.subscribe(EventType.SELF_INFO, _update_self_info) + self.subscribe(EventType.CURRENT_TIME, _update_time) + + # Getter methods for state + @property + def contacts(self): + """Get the current contacts""" + return self._contacts + + @property + def self_info(self): + """Get device self info""" + return self._self_info + + @property + def time(self): + """Get the current device time""" + return self._time + + def get_contact_by_name(self, name): + """ + Find a contact by its name (adv_name field) + + Args: + name: The name to search for + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts: + return None + + for contact_id, contact in self._contacts.items(): + if contact.get("adv_name", "").lower() == name.lower(): + return contact + + return None + + def get_contact_by_key_prefix(self, prefix): + """ + Find a contact by its public key prefix + + Args: + prefix: The public key prefix to search for (can be a partial prefix) + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts or not prefix: + return None + + # Convert the prefix to lowercase for case-insensitive matching + prefix = prefix.lower() + + for contact_id, contact in self._contacts.items(): + public_key = contact.get("public_key", "").lower() + if public_key.startswith(prefix): + return contact + + return None + + async def start_auto_message_fetching(self): + """ + Start automatically fetching messages when messages_waiting events are received. + This will continuously check for new messages when the device indicates + messages are waiting. + """ + self._auto_fetch_task = None + self._auto_fetch_running = True + + async def _handle_messages_waiting(event): + # Only start a new fetch task if one isn't already running + if not self._auto_fetch_task or self._auto_fetch_task.done(): + self._auto_fetch_task = asyncio.create_task(_fetch_messages_loop()) + + async def _fetch_messages_loop(): + while self._auto_fetch_running: + try: + # Request the next message + result = await self.commands.get_msg() + + # If we got a NO_MORE_MSGS event or an error, stop fetching + if not result or isinstance(result, dict) and "error" in result: + break + + # Small delay to prevent overwhelming the device + await asyncio.sleep(0.1) + except Exception as e: + logger.error(f"Error fetching messages: {e}") + break + + # Subscribe to MESSAGES_WAITING events + self._auto_fetch_subscription = self.subscribe(EventType.MESSAGES_WAITING, _handle_messages_waiting) + + # Check for any pending messages immediately + await self.commands.get_msg() + + return self._auto_fetch_subscription + + async def stop_auto_message_fetching(self): + """ + Stop automatically fetching messages when messages_waiting events are received. + """ + if hasattr(self, '_auto_fetch_subscription') and self._auto_fetch_subscription: + self.unsubscribe(self._auto_fetch_subscription) + self._auto_fetch_subscription = None + + if hasattr(self, '_auto_fetch_running'): + self._auto_fetch_running = False + + if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done(): + self._auto_fetch_task.cancel() + try: + await self._auto_fetch_task + except asyncio.CancelledError: + pass + self._auto_fetch_task = None + async def ensure_contacts(self): - if len(self.contacts) == 0 : - await self.get_contacts() - - async def reset_path(self, key): - data = b"\x0D" + key - return await self.send(data) - - async def share_contact(self, key): - data = b"\x10" + key - return await self.send(data) - - async def export_contact(self, key=b""): - data = b"\x11" + key - return await self.send(data) - - async def remove_contact(self, key): - data = b"\x0f" + key - return await self.send(data) - - async def set_out_path(self, contact, path): - contact["out_path"] = path - contact["out_path_len"] = -1 - contact["out_path_len"] = int(len(path) / 2) - - async def update_contact(self, contact): - out_path_hex = contact["out_path"] - out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" - adv_name_hex = contact["adv_name"].encode().hex() - adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" - data = b"\x09" \ - + bytes.fromhex(contact["public_key"])\ - + contact["type"].to_bytes(1)\ - + contact["flags"].to_bytes(1)\ - + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ - + bytes.fromhex(out_path_hex)\ - + bytes.fromhex(adv_name_hex)\ - + contact["last_advert"].to_bytes(4, 'little')\ - + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ - + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) - return await self.send(data) - - async def send_login(self, dst, pwd): - self.login_resp = asyncio.Future() - data = b"\x1a" + dst + pwd.encode("ascii") - return await self.send(data) - - async def wait_login(self, timeout = 5): - try : - return await asyncio.wait_for(self.login_resp, timeout) - except TimeoutError : - printerr ("Timeout ...") - return False - - async def send_statusreq(self, dst): - self.status_resp = asyncio.Future() - data = b"\x1b" + dst - return await self.send(data) - - async def wait_status(self, timeout = 5): - try : - return await asyncio.wait_for(self.status_resp, timeout) - except TimeoutError : - printerr ("Timeout...") - return False - - async def send_cmd(self, dst, cmd): - """ Send a cmd to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") - #self.ack_ev.clear() # no ack ? - return await self.send(data) - - async def send_msg(self, dst, msg): - """ Send a message to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") - self.ack_ev.clear() - return await self.send(data) - - async def send_chan_msg(self, chan, msg): - """ Send a message to a public channel """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") - return await self.send(data) - - async def get_msg(self): - """ Get message from the node (stored in queue) """ - res = await self.send(b"\x0A", 1) - if res is False : - self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue - return res - - async def wait_msg(self, timeout=-1): - """ Wait for a message """ - if timeout == -1 : - await self.rx_sem.acquire() + """Ensure contacts are fetched""" + if not self._contacts: + await self.commands.get_contacts() return True - - try: - await asyncio.wait_for(self.rx_sem.acquire(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting msg") - return False - - async def wait_ack(self, timeout=6): - """ Wait ack """ - try: - await asyncio.wait_for(self.ack_ev.wait(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting ack") - return False - - async def send_cli(self, cmd): - data = b"\x32" + cmd.encode('ascii') - return await self.send(data) + return False \ No newline at end of file diff --git a/src/meshcore/meshcore_new.py b/src/meshcore/meshcore_new.py new file mode 100644 index 0000000..e48caca --- /dev/null +++ b/src/meshcore/meshcore_new.py @@ -0,0 +1,249 @@ +import asyncio +from typing import Dict, Any, Optional, Callable + +from .events import EventDispatcher, MessageType, Event +from .reader import MessageReader +from .commands import CommandHandler, deprecated + + +class MeshCore: + def __init__(self, cx): + self.cx = cx + self.dispatcher = EventDispatcher() + self._reader = MessageReader(self.dispatcher) + self.commands = CommandHandler() + + # Set up connections + self.commands.set_connection(cx) + + # Initialize state + self.contacts = {} + self.self_info = {} + self.time = 0 + + # Set the message handler in the connection + cx.set_mc(self) + + async def connect(self): + # Start the event dispatcher + await self.dispatcher.start() + + # Start the command handler + await self.commands.start() + + # Send the initial app start + return await self.commands.send_appstart() + + async def disconnect(self): + # Stop the event dispatcher + await self.dispatcher.stop() + + # Stop the command handler + await self.commands.stop() + + # Internal method - called by the connection + def handle_rx(self, data: bytearray): + asyncio.create_task(self._reader.handle_rx(data)) + + # Expose subscribe/wait capabilities from the event system + def subscribe(self, message_type, callback): + return self.dispatcher.subscribe(message_type, callback) + + async def wait_for_event(self, message_type, timeout=None): + return await self.dispatcher.wait_for_event(message_type, timeout) + + # Legacy method implementations that delegate to the command handler + # using the deprecated decorator from commands.py + + @deprecated + async def send(self, data, timeout=5): + return await self.commands.send(data, timeout) + + @deprecated + async def send_only(self, data): + await self.commands.send_only(data) + + @deprecated + async def send_appstart(self): + return await self.commands.send_appstart() + + @deprecated + async def send_device_query(self): + return await self.commands.send_device_query() + + @deprecated + async def send_advert(self, flood=False): + return await self.commands.send_advert(flood) + + @deprecated + async def set_name(self, name): + return await self.commands.set_name(name) + + @deprecated + async def set_coords(self, lat, lon): + return await self.commands.set_coords(lat, lon) + + @deprecated + async def reboot(self): + return await self.commands.reboot() + + @deprecated + async def get_bat(self): + return await self.commands.get_bat() + + @deprecated + async def get_time(self): + time_result = await self.commands.get_time() + if isinstance(time_result, int): + self.time = time_result + return self.time + + @deprecated + async def set_time(self, val): + return await self.commands.set_time(val) + + @deprecated + async def set_tx_power(self, val): + return await self.commands.set_tx_power(val) + + @deprecated + async def set_radio(self, freq, bw, sf, cr): + return await self.commands.set_radio(freq, bw, sf, cr) + + @deprecated + async def set_tuning(self, rx_dly, af): + return await self.commands.set_tuning(rx_dly, af) + + @deprecated + async def set_devicepin(self, pin): + return await self.commands.set_devicepin(pin) + + @deprecated + async def get_contacts(self): + await self.commands.get_contacts() + contact_end = await self.dispatcher.wait_for_event(MessageType.CONTACT_END) + if contact_end: + self.contacts = contact_end.payload + return self.contacts + + @deprecated + async def ensure_contacts(self): + if not self.contacts: + await self.get_contacts() + + @deprecated + async def reset_path(self, key): + return await self.commands.reset_path(key) + + @deprecated + async def share_contact(self, key): + return await self.commands.share_contact(key) + + @deprecated + async def export_contact(self, key=b""): + return await self.commands.export_contact(key) + + @deprecated + async def remove_contact(self, key): + return await self.commands.remove_contact(key) + + @deprecated + async def set_out_path(self, contact, path): + contact["out_path"] = path + contact["out_path_len"] = -1 + contact["out_path_len"] = int(len(path) / 2) + + @deprecated + async def update_contact(self, contact): + out_path_hex = contact["out_path"] + out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" + adv_name_hex = contact["adv_name"].encode().hex() + adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" + data = b"\x09" \ + + bytes.fromhex(contact["public_key"])\ + + contact["type"].to_bytes(1)\ + + contact["flags"].to_bytes(1)\ + + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ + + bytes.fromhex(out_path_hex)\ + + bytes.fromhex(adv_name_hex)\ + + contact["last_advert"].to_bytes(4, 'little')\ + + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ + + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) + return await self.send(data) + + @deprecated + async def send_login(self, dst, pwd): + await self.commands.send_login(dst, pwd) + login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, 0.1) + if login_event: + return True + return await self.commands.send_login(dst, pwd) + + @deprecated + async def wait_login(self, timeout=5): + login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, timeout) + if login_event: + return True + login_failed = await self.dispatcher.wait_for_event(MessageType.LOGIN_FAILED, 0) + if login_failed: + return False + return False + + @deprecated + async def send_statusreq(self, dst): + await self.commands.send_statusreq(dst) + + @deprecated + async def wait_status(self, timeout=5): + status_event = await self.dispatcher.wait_for_event(MessageType.STATUS_RESPONSE, timeout) + if status_event: + return status_event.payload + return False + + @deprecated + async def send_cmd(self, dst, cmd): + timestamp = await self.get_time() + return await self.commands.send_cmd(dst, cmd, timestamp.to_bytes(4, 'little')) + + @deprecated + async def send_msg(self, dst, msg): + timestamp = await self.get_time() + result = await self.commands.send_msg(dst, msg, timestamp.to_bytes(4, 'little')) + return result + + @deprecated + async def send_chan_msg(self, chan, msg): + timestamp = await self.get_time() + return await self.commands.send_chan_msg(chan, msg, timestamp.to_bytes(4, 'little')) + + @deprecated + async def get_msg(self): + await self.commands.get_msg() + + # Wait for any message type that could be received + message_types = [ + MessageType.CONTACT_MSG_RECV, + MessageType.CHANNEL_MSG_RECV, + MessageType.NO_MORE_MSGS + ] + + for msg_type in message_types: + event = await self.dispatcher.wait_for_event(msg_type, 0) + if event: + return event.payload + + return False + + @deprecated + async def wait_msg(self, timeout=-1): + msg_event = await self.dispatcher.wait_for_event(MessageType.MESSAGES_WAITING, timeout) + return msg_event is not None + + @deprecated + async def wait_ack(self, timeout=6): + ack_event = await self.dispatcher.wait_for_event(MessageType.ACK, timeout) + return ack_event is not None + + @deprecated + async def send_cli(self, cmd): + return await self.commands.send_cli(cmd) \ No newline at end of file diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py new file mode 100644 index 0000000..2539982 --- /dev/null +++ b/src/meshcore/packets.py @@ -0,0 +1,30 @@ +from enum import Enum + +# Packet prefixes for the protocol +class PacketType(Enum): + OK = 0 + ERROR = 1 + CONTACT_START = 2 + CONTACT = 3 + CONTACT_END = 4 + SELF_INFO = 5 + MSG_SENT = 6 + CONTACT_MSG_RECV = 7 + CHANNEL_MSG_RECV = 8 + CURRENT_TIME = 9 + NO_MORE_MSGS = 10 + CONTACT_SHARE = 11 + BATTERY = 12 + DEVICE_INFO = 13 + CLI_RESPONSE = 50 + + # Push notifications + ADVERTISEMENT = 0x80 + PATH_UPDATE = 0x81 + ACK = 0x82 + MESSAGES_WAITING = 0x83 + RAW_DATA = 0x84 + LOGIN_SUCCESS = 0x85 + LOGIN_FAILED = 0x86 + STATUS_RESPONSE = 0x87 + LOG_DATA = 0x88 \ No newline at end of file diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py new file mode 100644 index 0000000..761bdbb --- /dev/null +++ b/src/meshcore/reader.py @@ -0,0 +1,235 @@ +import sys +import logging +import asyncio +from typing import Any, Optional, Dict +from .events import Event, EventType, EventDispatcher +from .packets import PacketType + +logger = logging.getLogger("meshcore") + + +class MessageReader: + def __init__(self, dispatcher: EventDispatcher): + self.dispatcher = dispatcher + # We're only keeping state here that's needed for processing + # before events are dispatched + self.contacts = {} # Temporary storage during contact list building + self.contact_nb = 0 # Used for contact processing + + async def handle_rx(self, data: bytearray): + packet_type_value = data[0] + logger.debug(f"Received data: {data.hex()}") + + # Handle command responses + if packet_type_value == PacketType.OK.value: + result = None + if len(data) == 5: + result = int.from_bytes(data[1:5], byteorder='little') + else: + result = True + + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) + + elif packet_type_value == PacketType.ERROR.value: + result = False + if len(data) > 1: + result = {"error_code": data[1]} + + # Dispatch event for the ERROR response + await self.dispatcher.dispatch(Event(EventType.ERROR, result)) + + elif packet_type_value == PacketType.CONTACT_START.value: + self.contact_nb = int.from_bytes(data[1:5], byteorder='little') + self.contacts = {} + + elif packet_type_value == PacketType.CONTACT.value: + c = {} + c["public_key"] = data[1:33].hex() + c["type"] = data[33] + c["flags"] = data[34] + c["out_path_len"] = int.from_bytes(data[35:36], signed=True) + plen = int.from_bytes(data[35:36], signed=True) + if plen == -1: + plen = 0 + c["out_path"] = data[36:36+plen].hex() + c["adv_name"] = data[100:132].decode().replace("\0","") + c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') + c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 + c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 + c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.CONTACT_END.value: + await self.dispatcher.dispatch(Event(EventType.CONTACTS, self.contacts)) + + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = data[1] + self_info["tx_power"] = data[2] + self_info["max_tx_power"] = data[3] + self_info["public_key"] = data[4:36].hex() + self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 + self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 + self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 + self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 + self_info["radio_sf"] = data[56] + self_info["radio_cr"] = data[57] + self_info["name"] = data[58:].decode() + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = data[1] + res["expected_ack"] = bytes(data[2:6]) + res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = data[1:7].hex() + res["path_len"] = data[7] + res["txt_type"] = data[8] + res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') + if data[8] == 2: + res["signature"] = data[13:17].hex() + res["text"] = data[17:].decode() + else: + res["text"] = data[13:].decode() + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["pubkey_prefix"] = data[4:10].hex() + res["path_len"] = data[10] + res["txt_type"] = data[11] + res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') + if data[11] == 2: + res["signature"] = data[16:20].hex() + res["text"] = data[20:].decode() + else: + res["text"] = data[16:].decode() + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, {"extended": True})) + + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = data[1] + res["path_len"] = data[2] + res["txt_type"] = data[3] + res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') + res["text"] = data[8:].decode() + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res)) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["channel_idx"] = data[4] + res["path_len"] = data[5] + res["txt_type"] = data[6] + res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') + res["text"] = data[11:].decode() + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, {"extended": True})) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + result = int.from_bytes(data[1:5], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, False)) + + elif packet_type_value == PacketType.CONTACT_SHARE.value: + result = "meshcore://" + data[1:].hex() + await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result)) + + elif packet_type_value == PacketType.BATTERY.value: + result = int.from_bytes(data[1:3], byteorder='little') + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + res["fw ver"] = data[1] + if data[1] >= 3: + res["max_contacts"] = data[2] * 2 + res["max_channels"] = data[3] + res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') + res["fw_build"] = data[8:20].decode().replace("\0","") + res["model"] = data[20:60].decode().replace("\0","") + res["ver"] = data[60:80].decode().replace("\0","") + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CLI_RESPONSE.value: + res = {} + res["response"] = data[1:].decode() + await self.dispatcher.dispatch(Event(EventType.CLI_RESPONSE, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + # todo: Read advertisement? + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, None)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, None)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + await self.dispatcher.dispatch(Event(EventType.ACK, None)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, None)) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = data[1] / 4 + res["RSSI"] = data[2] + res["payload"] = data[4:].hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + logger.debug("Login success") + await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, None)) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + logger.debug("Login failed") + await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, None)) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = {} + res["pubkey_pre"] = data[2:8].hex() + res["bat"] = int.from_bytes(data[8:10], byteorder='little') + res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') + res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') + res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) + res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) + res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) + res["airtime"] = int.from_bytes(data[24:28], byteorder='little') + res["uptime"] = int.from_bytes(data[28:32], byteorder='little') + res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') + res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') + res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') + res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') + res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') + res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 + res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') + res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res)) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug("Received log data") + await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace'))) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") \ No newline at end of file diff --git a/src/meshcore/serial_cx.py b/src/meshcore/serial_cx.py index f7e9b7b..9258a17 100644 --- a/src/meshcore/serial_cx.py +++ b/src/meshcore/serial_cx.py @@ -3,9 +3,11 @@ """ import asyncio import sys +import logging import serial_asyncio -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class SerialConnection: def __init__(self, port, baudrate): @@ -22,21 +24,20 @@ class SerialConnection: def connection_made(self, transport): self.cx.transport = transport -# printerr('port opened') + logger.debug('port opened') transport.serial.rts = False # You can manipulate Serial object via transport def data_received(self, data): -# printerr('data received') self.cx.handle_rx(data) def connection_lost(self, exc): - printerr('port closed') + logger.info('port closed') def pause_writing(self): - printerr('pause writing') + logger.debug('pause writing') def resume_writing(self): - printerr('resume writing') + logger.debug('resume writing') async def connect(self): """ @@ -47,11 +48,11 @@ class SerialConnection: loop, lambda: self.MCSerialClientProtocol(self), self.port, baudrate=self.baudrate) - printerr("Serial Connexion started") + logger.info("Serial Connection started") return self.port - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -69,8 +70,8 @@ class SerialConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -80,5 +81,5 @@ class SerialConnection: async def send(self, data): size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data -# printerr(f"sending pkt : {pkt}") - self.transport.write(pkt) + logger.debug(f"sending pkt : {pkt}") + self.transport.write(pkt) \ No newline at end of file diff --git a/src/meshcore/tcp_cx.py b/src/meshcore/tcp_cx.py index e998a18..60aafc6 100644 --- a/src/meshcore/tcp_cx.py +++ b/src/meshcore/tcp_cx.py @@ -3,8 +3,10 @@ """ import asyncio import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class TCPConnection: def __init__(self, host, port): @@ -22,15 +24,17 @@ class TCPConnection: def connection_made(self, transport): self.cx.transport = transport + logger.debug('connection established') def data_received(self, data): + logger.debug('data received') self.cx.handle_rx(data) def error_received(self, exc): - printerr(f'Error received: {exc}') + logger.error(f'Error received: {exc}') def connection_lost(self, exc): - printerr('The server closed the connection') + logger.info('The server closed the connection') async def connect(self): """ @@ -41,11 +45,11 @@ class TCPConnection: lambda: self.MCClientProtocol(self), self.host, self.port) - printerr("TCP Connexion started") + logger.info("TCP Connection started") return self.host - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -63,8 +67,8 @@ class TCPConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -74,4 +78,5 @@ class TCPConnection: async def send(self, data): size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data + logger.debug(f"sending pkt : {pkt}") self.transport.write(pkt)