diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..01d7f95 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +venv \ No newline at end of file diff --git a/README.md b/README.md index 76bb37e..bea2c49 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,280 @@ -# Python meshcore +# Python MeshCore -Bindings to access your [MeshCore](https://meshcore.co.uk) companion radio nodes in python. +Python library for interacting with [MeshCore](https://meshcore.co.uk) companion radio nodes. -Can be installed via `pip` using `pip install meshcore` +## Installation -Used by [mccli](https://github.com/fdlamotte/mccli). +```bash +pip install meshcore +``` + +## Quick Start + +Connect to your device and send a message: + +```python +import asyncio +from meshcore import MeshCore, EventType + +async def main(): + # Connect to your device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Get your contacts + contacts = await meshcore.commands.get_contacts() + print(f"Found {len(contacts)} contacts") + + # Send a message to the first contact + if contacts: + contact_key = next(iter(contacts.items()))[1]['public_key'] + await meshcore.commands.send_msg(bytes.fromhex(contact_key), "Hello from Python!") + + await meshcore.disconnect() + +asyncio.run(main()) +``` + +## Development Setup + +To set up for development: + +```bash +# Create and activate virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install in development mode +pip install -e . + +# Run examples +python examples/pubsub_example.py -p /dev/ttyUSB0 +``` + +## Usage Guide + +### Connecting to Your Device + +Connect via Serial, BLE, or TCP: + +```python +# Serial connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True) + +# BLE connection (scans for devices if address not provided) +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB") + +# TCP connection +meshcore = await MeshCore.create_tcp("192.168.1.100", 4000) +``` + +### Using Commands (Synchronous Style) + +Send commands and wait for responses: + +```python +# Get device information +device_info = await meshcore.commands.send_device_query() +print(f"Device model: {device_info['model']}") + +# Get list of contacts +contacts = await meshcore.commands.get_contacts() +for contact_id, contact in contacts.items(): + print(f"Contact: {contact['adv_name']} ({contact_id})") + +# Send a message (destination key in bytes) +await meshcore.commands.send_msg(dst_key, "Hello!") + +# Setting device parameters +await meshcore.commands.set_name("My Device") +await meshcore.commands.set_tx_power(20) # Set transmit power +``` + +### Finding Contacts + +Easily find contacts by name or key: + +```python +# Find a contact by name +contact = meshcore.get_contact_by_name("Bob's Radio") +if contact: + print(f"Found Bob at: {contact['adv_lat']}, {contact['adv_lon']}") + +# Find by partial key prefix +contact = meshcore.get_contact_by_key_prefix("a1b2c3") +``` + +### Event-Based Programming (Asynchronous Style) + +Subscribe to events to handle them asynchronously: + +```python +# Subscribe to incoming messages +async def handle_message(event): + data = event.payload + print(f"Message from {data['pubkey_prefix']}: {data['text']}") + +subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, handle_message) + +# Subscribe to advertisements +async def handle_advert(event): + print("Advertisement detected!") + +meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert) + +# When done, unsubscribe +meshcore.unsubscribe(subscription) +``` + +#### Filtering Events by Attributes + +Filter events based on their attributes to handle only specific ones: + +```python +# Subscribe only to messages from a specific contact +async def handle_specific_contact_messages(event): + print(f"Message from Alice: {event.payload['text']}") + +contact = meshcore.get_contact_by_name("Alice") +if contact: + alice_subscription = meshcore.subscribe( + EventType.CONTACT_MSG_RECV, + handle_specific_contact_messages, + attribute_filters={"pubkey_prefix": contact["public_key"][:12]} + ) + +# Send a message and wait for its specific acknowledgment +async def send_and_confirm_message(meshcore, dst_key, message): + # Send the message and get information about the sent message + sent_result = await meshcore.commands.send_msg(dst_key, message) + + # Extract the expected acknowledgment code from the message sent event + expected_ack = sent_result["expected_ack"].hex() + print(f"Message sent, waiting for ack with code: {expected_ack}") + + # Wait specifically for this acknowledgment + result = await meshcore.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=10.0 + ) + + if result: + print("Message confirmed delivered!") + return True + else: + print("Message delivery confirmation timed out") + return False +``` + +### Hybrid Approach (Commands + Events) + +Combine command-based and event-based styles: + +```python +import asyncio + +async def main(): + # Connect to device + meshcore = await MeshCore.create_serial("/dev/ttyUSB0") + + # Set up event handlers + async def handle_ack(event): + print("Message acknowledged!") + + async def handle_battery(event): + print(f"Battery level: {event.payload}%") + + # Subscribe to events + meshcore.subscribe(EventType.ACK, handle_ack) + meshcore.subscribe(EventType.BATTERY, handle_battery) + + # Create background task for battery checking + async def check_battery_periodically(): + while True: + # Send command (returns battery level) + result = await meshcore.commands.get_bat() + print(f"Battery check initiated, response: {result}") + await asyncio.sleep(60) # Wait 60 seconds between checks + + # Start the background task + battery_task = asyncio.create_task(check_battery_periodically()) + + # Send manual command and wait for response + await meshcore.commands.send_advert(flood=True) + + try: + # Keep the main program running + await asyncio.sleep(float('inf')) + except asyncio.CancelledError: + # Clean up when program ends + battery_task.cancel() + await meshcore.disconnect() + +# Run the program +asyncio.run(main()) +``` + +### Auto-Fetching Messages + +Let the library automatically fetch incoming messages: + +```python +# Start auto-fetching messages +await meshcore.start_auto_message_fetching() + +# Just subscribe to message events - the library handles fetching +async def on_message(event): + print(f"New message: {event.payload['text']}") + +meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_message) + +# When done +await meshcore.stop_auto_message_fetching() +``` + +### Debug Mode + +Enable debug logging for troubleshooting: + +```python +# Enable debug mode when creating the connection +meshcore = await MeshCore.create_serial("/dev/ttyUSB0", debug=True) +``` + +This logs detailed information about commands sent and events received. + +## Common Examples + +### Sending Messages to Contacts + +```python +# Get contacts and send to a specific one +contacts = await meshcore.commands.get_contacts() +for key, contact in contacts.items(): + if contact["adv_name"] == "Alice": + # Convert the hex key to bytes + dst_key = bytes.fromhex(contact["public_key"]) + await meshcore.commands.send_msg(dst_key, "Hello Alice!") + break +``` + +### Monitoring Channel Messages + +```python +# Subscribe to channel messages +async def channel_handler(event): + msg = event.payload + print(f"Channel {msg['channel_idx']}: {msg['text']}") + +meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler) +``` + +## Examples in the Repo + +Check the `examples/` directory for more: + +- `pubsub_example.py`: Event subscription system with auto-fetching +- `serial_infos.py`: Quick device info retrieval +- `serial_msg.py`: Message sending and receiving +- `ble_t1000_infos.py`: BLE connections diff --git a/examples/ble_t1000_infos.py b/examples/ble_t1000_infos.py index 132da32..a48d591 100755 --- a/examples/ble_t1000_infos.py +++ b/examples/ble_t1000_infos.py @@ -3,16 +3,14 @@ import asyncio from meshcore import MeshCore -from meshcore import BLEConnection ADDRESS = "t1000" -async def main () : - con = BLEConnection(ADDRESS) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_ble(ADDRESS) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/ble_t1000_msg.py b/examples/ble_t1000_msg.py index b2a5dbd..c9ec543 100755 --- a/examples/ble_t1000_msg.py +++ b/examples/ble_t1000_msg.py @@ -1,7 +1,6 @@ #!/usr/bin/python import asyncio -import json from meshcore import MeshCore from meshcore import BLEConnection @@ -15,7 +14,7 @@ async def main () : mc = MeshCore(con) await mc.connect() - await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.get_contacts() + await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/pubsub_example.py b/examples/pubsub_example.py new file mode 100644 index 0000000..e72d175 --- /dev/null +++ b/examples/pubsub_example.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Example demonstrating the new pub-sub system in MeshCore +Shows how to subscribe to events and use the event system +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def message_callback(event): + print(f"\nReceived message: {event.payload['text']}") + print(f"From: {event.payload.get('pubkey_prefix', 'channel')}") + print(f"Type: {event.payload['type']}") + print(f"Timestamp: {event.payload['sender_timestamp']}") + + +async def advertisement_callback(event): + print("\nDetected advertisement") + + +async def main(): + parser = argparse.ArgumentParser(description="MeshCore Pub-Sub Example") + parser.add_argument( + "--port", "-p", help="Serial port", required=True + ) + parser.add_argument( + "--baud", "-b", type=int, help="Baud rate", default=115200 + ) + args = parser.parse_args() + + print(f"Connecting to {args.port} at {args.baud} baud...") + + # Create MeshCore instance with serial connection + meshcore = await MeshCore.create_serial(args.port, args.baud, debug=True) + + # Connection is already established + success = True + if not success: + print("Failed to connect to MeshCore device") + return + + print("Connected to MeshCore device") + + # Get contacts + contacts = await meshcore.commands.get_contacts() + if contacts: + print(f"\nFound {len(contacts)} contacts:") + for name, contact in contacts.items(): + print(f"- {name}") + + + await meshcore.commands.send_advert(flood=True) + + # Subscribe to private messages + private_subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, message_callback) + + # Subscribe to channel messages + channel_subscription = meshcore.subscribe(EventType.CHANNEL_MSG_RECV, message_callback) + + # Subscribe to advertisements + advert_subscription = meshcore.subscribe(EventType.ADVERTISEMENT, advertisement_callback) + + await meshcore.start_auto_message_fetching() + + print("\nSubscribed to events:") + print("- Private messages") + print("- Channel messages") + print("- Advertisements") + + print("\nWaiting for events. Press Ctrl+C to exit...\n") + + # Get device info + device_info = await meshcore.commands.send_device_query() + if device_info: + print(f"Device info: {device_info}") + + # Get time from the device + device_time = await meshcore.commands.get_time() + print(f"Device time: {device_time}") + + # Access current time through the property + print(f"Current time (property): {meshcore.time}") + + try: + while True: + # Wait for messages + await asyncio.sleep(1) + except KeyboardInterrupt: + meshcore.stop() + print("\nExiting...") + except asyncio.CancelledError: + # Handle task cancellation from KeyboardInterrupt in asyncio.run() + print("\nTask cancelled - cleaning up...") + finally: + # Clean up subscriptions + meshcore.unsubscribe(private_subscription) + meshcore.unsubscribe(channel_subscription) + meshcore.unsubscribe(advert_subscription) + + # Stop auto-message fetching + await meshcore.stop_auto_message_fetching() + + # Disconnect + await meshcore.disconnect() + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + # This prevents the KeyboardInterrupt traceback from being shown + print("\nExited cleanly") + except Exception as e: + print(f"Error: {e}") \ No newline at end of file diff --git a/examples/rf_packet_monitor.py b/examples/rf_packet_monitor.py new file mode 100644 index 0000000..47b6408 --- /dev/null +++ b/examples/rf_packet_monitor.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +import asyncio +import argparse +from meshcore import MeshCore +from meshcore.events import EventType + +async def main(): + parser = argparse.ArgumentParser(description='MeshCore RF Packet Monitor') + parser.add_argument('-p', '--port', required=True, help='Serial port path') + args = parser.parse_args() + + try: + # Connect to device + print(f"Connecting to {args.port}...") + mc = await MeshCore.create_serial(args.port, 115200) + + def handle_rf_packet(event): + packet = event.payload + if isinstance(packet, dict): + print(f"Raw RF packet received:") + if 'snr' in packet: + print(f" SNR: {packet['snr']:.1f} dB") + if 'rssi' in packet: + print(f" RSSI: {packet['rssi']} dBm") + if 'payload_length' in packet: + print(f" Payload length: {packet['payload_length']} bytes") + if 'payload' in packet: + print(f" Payload (hex): {packet['payload']}") + else: + print(f"RF packet received: {packet}") + + # Subscribe to RF log data + subscription = mc.subscribe(EventType.RX_LOG_DATA, handle_rf_packet) + + print("Waiting for log data (press Ctrl+C to exit)...") + try: + # Keep the script running to receive logs + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + print("\nExiting...") + + # Clean up + mc.unsubscribe(subscription) + await mc.disconnect() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/examples/serial_battery_monitor.py b/examples/serial_battery_monitor.py new file mode 100644 index 0000000..6343f4c --- /dev/null +++ b/examples/serial_battery_monitor.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +""" +Simple example of eventing approach with battery monitoring +""" +import asyncio +import argparse + +from meshcore import MeshCore +from meshcore.events import EventType + + +async def main(): + parser = argparse.ArgumentParser(description="Battery Monitor Example") + parser.add_argument("--port", "-p", help="Serial port", required=True) + parser.add_argument("--baud", "-b", type=int, help="Baud rate", default=115200) + args = parser.parse_args() + + print(f"Connecting to {args.port}...") + + # Connect to device + meshcore = await MeshCore.create_serial(args.port, args.baud) + + # Event handler for battery updates + async def on_battery(event): + print(f"Battery event: {event.payload}mV") + + # Subscribe to battery events + meshcore.subscribe(EventType.BATTERY, on_battery) + + # Background task that checks battery every 10 seconds + async def check_battery(): + while True: + print("Requesting battery level...") + await meshcore.commands.get_bat() # This command will trigger a battery event + await asyncio.sleep(10) + + # Start the battery check task + task = asyncio.create_task(check_battery()) + + try: + # Keep program running + print("Monitoring battery (press Ctrl+C to exit)...") + await asyncio.sleep(float('inf')) + except KeyboardInterrupt: + print("\nExiting...") + finally: + # Clean up + task.cancel() + await meshcore.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/examples/serial_contacts.py b/examples/serial_contacts.py index 2ab5c76..9b07a02 100755 --- a/examples/serial_contacts.py +++ b/examples/serial_contacts.py @@ -2,6 +2,7 @@ import asyncio import json + from meshcore import MeshCore from meshcore import SerialConnection @@ -15,6 +16,6 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/serial_infos.py b/examples/serial_infos.py index cb41be5..7e8000d 100755 --- a/examples/serial_infos.py +++ b/examples/serial_infos.py @@ -3,19 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import SerialConnection -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish - - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_serial(PORT, BAUDRATE) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 6a093ec..cb21a10 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -1,24 +1,67 @@ #!/usr/bin/python +""" +Example of sending a message and waiting for its specific acknowledgment +using event attribute filtering. +""" import asyncio -import json -from meshcore import MeshCore -from meshcore import SerialConnection +import argparse +from meshcore import MeshCore, EventType -PORT = "/dev/ttyUSB0" -BAUDRATE = 115200 -DEST = "mchome" -MSG = "hello from serial" +async def main(): + # Parse command line arguments + parser = argparse.ArgumentParser(description="Send a message and wait for ACK") + parser.add_argument("-p", "--port", required=True, help="Serial port") + parser.add_argument("-b", "--baudrate", type=int, default=115200, help="Baud rate") + parser.add_argument("-d", "--dest", default="🦄", help="Destination contact name") + parser.add_argument("-m", "--message", default="hello from serial", help="Message to send") + parser.add_argument("--timeout", type=float, default=30.0, help="ACK timeout in seconds") + args = parser.parse_args() -async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish + # Connect to the device + mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) - mc = MeshCore(con) - await mc.connect() + try: + # Make sure we have contacts loaded + await mc.ensure_contacts() + + # Find the contact by name + contact = mc.get_contact_by_name(args.dest) + if not contact: + print(f"Contact '{args.dest}' not found. Available contacts:") + for name, c in mc.contacts.items(): + print(f"- {c.get('adv_name', 'Unknown')}") + return + + print(f"Found contact: {contact.get('adv_name')} ({contact['public_key'][:12]}...)") + + # Send the message and get the MSG_SENT event + print(f"Sending message: '{args.message}'") + send_result = await mc.commands.send_msg( + bytes.fromhex(contact["public_key"])[0:6], + args.message + ) + + # Extract the expected ACK code + expected_ack = send_result["expected_ack"].hex() + print(f"Message sent, waiting for ACK with code: {expected_ack}") + + # Wait for the specific ACK that matches our message + ack_event = await mc.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=args.timeout + ) + + if ack_event: + print(f"✅ Message confirmed delivered! (ACK received)") + else: + print(f"⚠️ Timed out waiting for ACK after {args.timeout} seconds") + + finally: + # Always disconnect + await mc.disconnect() - await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) +if __name__ == "__main__": + asyncio.run(main()) -asyncio.run(main()) diff --git a/examples/serial_repeater_status.py b/examples/serial_repeater_status.py index b0ebfdd..2a77336 100755 --- a/examples/serial_repeater_status.py +++ b/examples/serial_repeater_status.py @@ -3,31 +3,27 @@ import asyncio from meshcore import MeshCore -from meshcore import printerr -from meshcore import SerialConnection +from meshcore.events import EventType -PORT = "/dev/ttyUSB0" +PORT = "/dev/tty.usbserial-583A0069501" BAUDRATE = 115200 -REPEATER="FdlRoom" -PASSWORD="password" +REPEATER="Orion" +PASSWORD="floopyboopy" async def main () : - con = SerialConnection(PORT, BAUDRATE) - await con.connect() - await asyncio.sleep(0.1) # time for transport to establish + mc = await MeshCore.create_serial(PORT, BAUDRATE) + await mc.commands.get_contacts() + repeater = mc.get_contact_by_name(REPEATER) + + await mc.commands.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - mc = MeshCore(con) - await mc.connect() + print("Login sent ... awaiting") - contacts = await mc.get_contacts() - repeater = contacts[REPEATER] - await mc.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD) - - printerr("Login sent ... awaiting") - - if await mc.wait_login() : - await mc.send_statusreq(bytes.fromhex(repeater["public_key"])) - print(await mc.wait_status()) + if await mc.wait_for_event(EventType.LOGIN_SUCCESS): + print("Logged in success") + await mc.commands.send_statusreq(bytes.fromhex(repeater["public_key"])) + print("Status request sent ... awaiting") + print(await mc.wait_for_event(EventType.STATUS_RESPONSE)) asyncio.run(main()) diff --git a/examples/serial_trace.py b/examples/serial_trace.py new file mode 100644 index 0000000..b169edd --- /dev/null +++ b/examples/serial_trace.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +import asyncio +import argparse +import random +from meshcore import MeshCore +from meshcore.events import EventType + +async def get_repeater_name(mc, hash_prefix): + """Find a contact by its 2-character hash prefix and return its name""" + # Ensure contacts are available + await mc.ensure_contacts() + + # Find contact with matching hash prefix + contact = mc.get_contact_by_key_prefix(hash_prefix) + if contact: + return contact.get("adv_name", f"Unknown ({hash_prefix})") + else: + return f"Unknown ({hash_prefix})" + +async def main(): + parser = argparse.ArgumentParser(description='MeshCore Serial Trace Example') + parser.add_argument('-p', '--port', required=True, help='Serial port path') + parser.add_argument('--path', type=str, help='Trace path (comma-separated hex values)') + args = parser.parse_args() + + try: + # Connect to device + print(f"Connecting to {args.port}...") + mc = await MeshCore.create_serial(args.port, 115200, debug=True) + + # Send trace packet + print(f"Sending trace packet...") + # Send trace with a path if provided + tag = random.randint(1, 0xFFFFFFFF) + result = await mc.commands.send_trace(path=args.path, tag=tag) + + # Check if the result has a success indicator + if result.get("success") == False: + print(f"Failed to send trace packet: {result.get('reason', 'unknown error')}") + elif result: + print(f"Trace packet sent successfully with tag={tag}") + print("Waiting for trace response matching our tag...") + + # Wait for a trace response with our specific tag + event = await mc.wait_for_event( + EventType.TRACE_DATA, + attribute_filters={"tag": tag}, + timeout=15 + ) + + if event: + trace = event.payload + print(f"Trace data received:") + print(f" Tag: {trace['tag']}") + print(f" Flags: {trace.get('flags', 0)}") + print(f" Path Length: {trace.get('path_len', 0)}") + + if trace.get('path'): + print(f" Path ({len(trace['path'])} nodes):") + + # Process nodes with hash (repeaters) + for i, node in enumerate(trace['path']): + if 'hash' in node: + # Look up repeater name + repeater_name = await get_repeater_name(mc, node['hash']) + print(f" Node {i+1}: {repeater_name}, SNR={node['snr']:.1f} dB") + else: + print(f" Node {i+1}: SNR={node['snr']:.1f} dB (final node)") + else: + print("No trace response received within timeout") + else: + print("Failed to send trace packet") + + await mc.disconnect() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/examples/tcp_mchome_contacts.py b/examples/tcp_mchome_contacts.py index 3e730d8..a4f71f2 100755 --- a/examples/tcp_mchome_contacts.py +++ b/examples/tcp_mchome_contacts.py @@ -14,5 +14,5 @@ async def main () : mc = MeshCore(con) await mc.connect() - print(json.dumps(await mc.get_contacts(),indent=4)) + print(json.dumps(await mc.commands.get_contacts(),indent=4)) asyncio.run(main()) diff --git a/examples/tcp_mchome_infos.py b/examples/tcp_mchome_infos.py index d1ec958..b5c24d2 100755 --- a/examples/tcp_mchome_infos.py +++ b/examples/tcp_mchome_infos.py @@ -3,17 +3,15 @@ import asyncio from meshcore import MeshCore -from meshcore import TCPConnection HOSTNAME = "mchome" PORT = 5000 -async def main () : - con = TCPConnection(HOSTNAME, PORT) - await con.connect() - mc = MeshCore(con) - await mc.connect() - +async def main(): + mc = await MeshCore.create_tcp(HOSTNAME, PORT) + print(mc.self_info) + + await mc.disconnect() asyncio.run(main()) diff --git a/examples/tcp_mchome_msg.py b/examples/tcp_mchome_msg.py index d3f751e..ec44d38 100755 --- a/examples/tcp_mchome_msg.py +++ b/examples/tcp_mchome_msg.py @@ -17,6 +17,6 @@ async def main () : await mc.connect() await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/tcp_mchome_readmsgs.py b/examples/tcp_mchome_readmsgs.py index e575000..59b2cde 100755 --- a/examples/tcp_mchome_readmsgs.py +++ b/examples/tcp_mchome_readmsgs.py @@ -18,8 +18,10 @@ async def main () : res = True while res: - res = await mc.get_msg() - if res : - print (res) + result = await mc.commands.get_msg() + if result.get("success") == False: + res = False + print("No more messages") + print (result) asyncio.run(main()) diff --git a/src/meshcore/__init__.py b/src/meshcore/__init__.py index 415d909..0171409 100644 --- a/src/meshcore/__init__.py +++ b/src/meshcore/__init__.py @@ -1,5 +1,10 @@ -from meshcore.meshcore import printerr -from meshcore.meshcore import MeshCore +import logging + +# Setup default logger +logging.basicConfig(level=logging.INFO) + +from meshcore.events import EventType +from meshcore.meshcore import MeshCore, logger from meshcore.tcp_cx import TCPConnection from meshcore.ble_cx import BLEConnection from meshcore.serial_cx import SerialConnection diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index 6280e8c..fbdca89 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -2,9 +2,10 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic @@ -40,11 +41,11 @@ class BLEConnection: if self.address is None or self.address == "" or len(self.address.split(":")) != 6 : scanner = BleakScanner() - printerr("Scanning for devices") + logger.info("Scanning for devices") device = await scanner.find_device_by_filter(match_meshcore_device) if device is None : return None - printerr(f"Found device : {device}") + logger.info(f"Found device : {device}") self.client = BleakClient(device) self.address = self.client.address else: @@ -60,24 +61,33 @@ class BLEConnection: await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx) nus = self.client.services.get_service(UART_SERVICE_UUID) + if nus is None: + logger.error("Could not find UART service") + return None self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID) - printerr("BLE Connexion started") + logger.info("BLE Connection started") return self.address def handle_disconnect(self, _: BleakClient): """ Callback to handle disconnection """ - printerr ("Device was disconnected, goodbye.") + logger.info("Device was disconnected, goodbye.") # cancelling all tasks effectively ends the program for task in asyncio.all_tasks(): task.cancel() - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): - if not self.mc is None: - self.mc.handle_rx(data) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(data)) async def send(self, data): + if not self.client: + logger.error("Client is not connected") + return False + if not self.rx_char: + logger.error("RX characteristic not found") + return False await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py new file mode 100644 index 0000000..56e0f4b --- /dev/null +++ b/src/meshcore/commands.py @@ -0,0 +1,269 @@ +import asyncio +import logging +from typing import Any, Dict +from .events import EventType +import random + +logger = logging.getLogger("meshcore") + +class CommandHandler: + DEFAULT_TIMEOUT = 5.0 + + def __init__(self, default_timeout=None): + self._sender_func = None + self._reader = None + self.dispatcher = None + self.default_timeout = default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT + + def set_connection(self, connection): + async def sender(data): + await connection.send(data) + self._sender_func = sender + + def set_reader(self, reader): + self._reader = reader + + def set_dispatcher(self, dispatcher): + self.dispatcher = dispatcher + + async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]: + """ + Send a command and wait for expected event responses. + + Args: + data: The data to send + expected_events: EventType or list of EventTypes to wait for + timeout: Timeout in seconds, or None to use default_timeout + + Returns: + Dict[str, Any]: Dictionary containing the response data or status + """ + if not self.dispatcher: + raise RuntimeError("Dispatcher not set, cannot send commands") + + # Use the provided timeout or fall back to default_timeout + timeout = timeout if timeout is not None else self.default_timeout + + if self._sender_func: + logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}") + await self._sender_func(data) + + if expected_events: + try: + # Convert single event to list if needed + if not isinstance(expected_events, list): + expected_events = [expected_events] + + logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + for event_type in expected_events: + # don't apply any filters for now, might change later + event = await self.dispatcher.wait_for_event(event_type, {}, timeout) + if event: + return event.payload + return {"success": False, "reason": "no_event_received"} + except asyncio.TimeoutError: + logger.debug(f"Command timed out {data}") + return {"success": False, "reason": "timeout"} + except Exception as e: + logger.debug(f"Command error: {e}") + return {"error": str(e)} + return {"success": True} + + + async def send_appstart(self): + logger.debug("Sending appstart command") + b1 = bytearray(b'\x01\x03 mccli') + return await self.send(b1, [EventType.SELF_INFO]) + + async def send_device_query(self): + logger.debug("Sending device query command") + return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR]) + + async def send_advert(self, flood=False): + logger.debug(f"Sending advertisement command (flood={flood})") + if flood: + return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR]) + else: + return await self.send(b"\x07", [EventType.OK, EventType.ERROR]) + + async def set_name(self, name): + logger.debug(f"Setting device name to: {name}") + return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR]) + + async def set_coords(self, lat, lon): + logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}") + return await self.send(b'\x0e'\ + + int(lat*1e6).to_bytes(4, 'little', signed=True)\ + + int(lon*1e6).to_bytes(4, 'little', signed=True)\ + + int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def reboot(self): + logger.debug("Sending reboot command") + return await self.send(b'\x13reboot') + + async def get_bat(self): + logger.debug("Getting battery information") + return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR]) + + async def get_time(self): + logger.debug("Getting device time") + return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR]) + + async def set_time(self, val): + logger.debug(f"Setting device time to: {val}") + return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tx_power(self, val): + logger.debug(f"Setting TX power to: {val}") + return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_radio(self, freq, bw, sf, cr): + logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}") + return await self.send(b"\x0b" \ + + int(float(freq)*1000).to_bytes(4, 'little')\ + + int(float(bw)*1000).to_bytes(4, 'little')\ + + int(sf).to_bytes(1, 'little')\ + + int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_tuning(self, rx_dly, af): + logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}") + return await self.send(b"\x15" \ + + int(rx_dly).to_bytes(4, 'little')\ + + int(af).to_bytes(4, 'little')\ + + int(0).to_bytes(1, 'little')\ + + int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) + + async def set_devicepin(self, pin): + logger.debug(f"Setting device PIN to: {pin}") + return await self.send(b"\x25" \ + + int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) + + async def get_contacts(self): + logger.debug("Getting contacts") + return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR]) + + async def reset_path(self, key): + logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0D" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def share_contact(self, key): + logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x10" + key + return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR]) + + async def export_contact(self, key=b""): + logger.debug(f"Exporting contact: {key.hex() if key else 'all'}") + data = b"\x11" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def remove_contact(self, key): + logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}") + data = b"\x0f" + key + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def get_msg(self, timeout=1): + logger.debug("Requesting pending messages") + return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], timeout) + + async def send_login(self, dst, pwd): + logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1a" + dst + pwd.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_logout(self, dst): + self.login_resp = asyncio.Future() + data = b"\x1d" + dst + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_statusreq(self, dst): + logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}") + data = b"\x1b" + dst + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cmd(self, dst, cmd, timestamp=None): + logger.debug(f"Sending command to {dst.hex() if isinstance(dst, bytes) else dst}: {cmd}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def send_msg(self, dst, msg, timestamp=None): + logger.debug(f"Sending message to {dst.hex() if isinstance(dst, bytes) else dst}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_chan_msg(self, chan, msg, timestamp=None): + logger.debug(f"Sending channel message to channel {chan}: {msg}") + + # Default to current time if timestamp not provided + if timestamp is None: + import time + timestamp = int(time.time()).to_bytes(4, 'little') + + data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + + async def send_cli(self, cmd): + logger.debug(f"Sending CLI command: {cmd}") + data = b"\x32" + cmd.encode('ascii') + return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) + + async def send_trace(self, auth_code=0, tag=None, flags=0, path=None): + """ + Send a trace packet to test routing through specific repeaters + + Args: + auth_code: 32-bit authentication code (default: 0) + tag: 32-bit integer to identify this trace (default: random) + flags: 8-bit flags field (default: 0) + path: Optional string with comma-separated hex values representing repeater pubkeys (e.g. "23,5f,3a") + or a bytes/bytearray object with the raw path data + + Returns: + Dictionary with sent status, tag, and estimated timeout in milliseconds, or False if command failed + """ + # Generate random tag if not provided + if tag is None: + tag = random.randint(1, 0xFFFFFFFF) + if auth_code is None: + auth_code = random.randint(1, 0xFFFFFFFF) + + logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}") + + # Prepare the command packet: CMD(1) + tag(4) + auth_code(4) + flags(1) + [path] + cmd_data = bytearray([36]) # CMD_SEND_TRACE_PATH + cmd_data.extend(tag.to_bytes(4, 'little')) + cmd_data.extend(auth_code.to_bytes(4, 'little')) + cmd_data.append(flags) + + # Process path if provided + if path: + if isinstance(path, str): + # Convert comma-separated hex values to bytes + try: + path_bytes = bytearray() + for hex_val in path.split(','): + hex_val = hex_val.strip() + path_bytes.append(int(hex_val, 16)) + cmd_data.extend(path_bytes) + except ValueError as e: + logger.error(f"Invalid path format: {e}") + return { "success": False, "reason": "invalid_path_format" } + elif isinstance(path, (bytes, bytearray)): + cmd_data.extend(path) + else: + logger.error(f"Unsupported path type: {type(path)}") + return { "success": False, "reason": "unsupported_path_type" } + + return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py new file mode 100644 index 0000000..8a72664 --- /dev/null +++ b/src/meshcore/events.py @@ -0,0 +1,184 @@ +from enum import Enum +import logging +from math import log +from typing import Any, Dict, Optional, Callable, List, Union +import asyncio +from dataclasses import dataclass, field + +logger = logging.getLogger("meshcore") + +# Public event types for users to subscribe to +class EventType(Enum): + CONTACTS = "contacts" + SELF_INFO = "self_info" + CONTACT_MSG_RECV = "contact_message" + CHANNEL_MSG_RECV = "channel_message" + CURRENT_TIME = "time_update" + NO_MORE_MSGS = "no_more_messages" + CONTACT_SHARE = "contact_share" + BATTERY = "battery_info" + DEVICE_INFO = "device_info" + CLI_RESPONSE = "cli_response" + MSG_SENT = "message_sent" + + # Push notifications + ADVERTISEMENT = "advertisement" + PATH_UPDATE = "path_update" + ACK = "acknowledgement" + MESSAGES_WAITING = "messages_waiting" + RAW_DATA = "raw_data" + LOGIN_SUCCESS = "login_success" + LOGIN_FAILED = "login_failed" + STATUS_RESPONSE = "status_response" + LOG_DATA = "log_data" + TRACE_DATA = "trace_data" + RX_LOG_DATA = "rx_log_data" + + # Command response types + OK = "command_ok" + ERROR = "command_error" + + +@dataclass +class Event: + type: EventType + payload: Any + attributes: Dict[str, Any] = field(default_factory=dict) + + def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs): + """ + Initialize an Event + + Args: + type: The event type + payload: The event payload + attributes: Dictionary of event attributes for filtering + **kwargs: Additional attributes to add to the attributes dictionary + """ + self.type = type + self.payload = payload + self.attributes = attributes or {} + + # Add any keyword arguments to the attributes dictionary + if kwargs: + self.attributes.update(kwargs) + + +class Subscription: + def __init__(self, dispatcher, event_type, callback, attribute_filters=None): + self.dispatcher = dispatcher + self.event_type = event_type + self.callback = callback + self.attribute_filters = attribute_filters or {} + + def unsubscribe(self): + self.dispatcher._remove_subscription(self) + + +class EventDispatcher: + def __init__(self): + self.queue = asyncio.Queue() + self.subscriptions: List[Subscription] = [] + self.running = False + self._task = None + + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]], + attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription: + """ + Subscribe to events with optional attribute filtering. + + Parameters: + ----------- + event_type : EventType or None + The type of event to subscribe to, or None to subscribe to all events. + callback : Callable + Function to call when a matching event is received. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to trigger the callback. + + Returns: + -------- + Subscription object that can be used to unsubscribe. + """ + subscription = Subscription(self, event_type, callback, attribute_filters) + self.subscriptions.append(subscription) + return subscription + + def _remove_subscription(self, subscription: Subscription): + if subscription in self.subscriptions: + self.subscriptions.remove(subscription) + + async def dispatch(self, event: Event): + await self.queue.put(event) + + async def _process_events(self): + while self.running: + event = await self.queue.get() + logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}") + for subscription in self.subscriptions.copy(): + # Check if event type matches + if subscription.event_type is None or subscription.event_type == event.type: + # Check if all attribute filters match + if subscription.attribute_filters: + # Skip if any filter doesn't match the corresponding event attribute + if not all(event.attributes.get(key) == value + for key, value in subscription.attribute_filters.items()): + continue + try: + result = subscription.callback(event) + if asyncio.iscoroutine(result): + await result + except Exception as e: + print(f"Error in event handler: {e}") + + self.queue.task_done() + + async def start(self): + if not self.running: + self.running = True + self._task = asyncio.create_task(self._process_events()) + + async def stop(self): + if self.running: + self.running = False + if self._task: + await self.queue.join() + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, + timeout: float | None = None) -> Optional[Event]: + """ + Wait for an event of the specified type that matches all attribute filters. + + Parameters: + ----------- + event_type : EventType + The type of event to wait for. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to be returned. + timeout : float | None, optional + Maximum time to wait for the event, in seconds. + + Returns: + -------- + The matched event, or None if timeout occurred before a matching event. + """ + future = asyncio.Future() + + def event_handler(event: Event): + if not future.done(): + future.set_result(event) + + subscription = self.subscribe(event_type, event_handler, attribute_filters) + + try: + return await asyncio.wait_for(future, timeout) + except asyncio.TimeoutError: + return None + finally: + subscription.unsubscribe() \ No newline at end of file diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 52a9919..9d065f0 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,419 +1,306 @@ -""" - mccli.py : CLI interface to MeschCore BLE companion app -""" import asyncio -import sys +import logging +from typing import Optional, Dict, Any, Union -def printerr (s) : - sys.stderr.write(str(s)) - sys.stderr.write("\n") - sys.stderr.flush() +from .events import EventDispatcher, EventType +from .reader import MessageReader +from .commands import CommandHandler + + +# Setup default logger +logger = logging.getLogger("meshcore") class MeshCore: """ - Interface to a BLE MeshCore device + Interface to a MeshCore device """ - self_info={} - contacts={} - - def __init__(self, cx): - """ Constructor : specify address """ - self.time = 0 - self.result = asyncio.Future() - self.contact_nb = 0 - self.rx_sem = asyncio.Semaphore(0) - self.ack_ev = asyncio.Event() - self.login_resp = asyncio.Future() - self.status_resp = asyncio.Future() - + def __init__(self, cx, debug=False, default_timeout=None): self.cx = cx - cx.set_mc(self) - - async def connect(self) : - await self.send_appstart() - - def handle_rx(self, data: bytearray): - """ Callback to handle received data """ - match data[0]: - case 0: # ok - if len(data) == 5 : # an integer - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - else: - self.result.set_result(True) - case 1: # error - if len(data) > 1: - res = {} - res["error_code"] = data[1] - self.result.set_result(res) # error code if fw > 1.4 - else: - self.result.set_result(False) - case 2: # contact start - self.contact_nb = int.from_bytes(data[1:5], byteorder='little') - self.contacts={} - case 3: # contact - c = {} - c["public_key"] = data[1:33].hex() - c["type"] = data[33] - c["flags"] = data[34] - c["out_path_len"] = int.from_bytes(data[35:36], signed=True) - plen = int.from_bytes(data[35:36], signed=True) - if plen == -1 : - plen = 0 - c["out_path"] = data[36:36+plen].hex() - c["adv_name"] = data[100:132].decode().replace("\0","") - c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') - c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 - c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 - c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') - self.contacts[c["adv_name"]]=c - case 4: # end of contacts - self.result.set_result(self.contacts) - case 5: # self info - self.self_info["adv_type"] = data[1] - self.self_info["tx_power"] = data[2] - self.self_info["max_tx_power"] = data[3] - self.self_info["public_key"] = data[4:36].hex() - self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 - self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 - #self.self_info["reserved_44:48"] = data[44:48].hex() - self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 - self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 - self.self_info["radio_sf"] = data[56] - self.self_info["radio_cr"] = data[57] - self.self_info["name"] = data[58:].decode() - self.result.set_result(True) - case 6: # msg sent - res = {} - res["type"] = data[1] - res["expected_ack"] = bytes(data[2:6]) - res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') - self.result.set_result(res) - case 7: # contact msg recv - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = data[1:7].hex() - res["path_len"] = data[7] - res["txt_type"] = data[8] - res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') - if data[8] == 2 : # signed packet - res["signature"] = data[13:17].hex() - res["text"] = data[17:].decode() - else : - res["text"] = data[13:].decode() - self.result.set_result(res) - case 16: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["pubkey_prefix"] = data[4:10].hex() - res["path_len"] = data[10] - res["txt_type"] = data[11] - res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') - if data[11] == 2 : # signed packet - res["signature"] = data[16:20].hex() - res["text"] = data[20:].decode() - else : - res["text"] = data[16:].decode() - self.result.set_result(res) - case 8 : # chanel msg recv - res = {} - res["type"] = "CHAN" - res["channel_idx"] = data[1] - res["path_len"] = data[2] - res["txt_type"] = data[3] - res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') - res["text"] = data[8:].decode() - self.result.set_result(res) - case 17: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4; - res["channel_idx"] = data[4] - res["path_len"] = data[5] - res["txt_type"] = data[6] - res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') - res["text"] = data[11:].decode() - self.result.set_result(res) - case 9: # current time - self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) - case 10: # no more msgs - self.result.set_result(False) - case 11: # contact - self.result.set_result("meshcore://" + data[1:].hex()) - case 12: # battery voltage - self.result.set_result(int.from_bytes(data[1:3], byteorder='little')) - case 13: # device info - res = {} - res["fw ver"] = data[1] - if data[1] >= 3: - res["max_contacts"] = data[2] * 2 - res["max_channels"] = data[3] - res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') - res["fw_build"] = data[8:20].decode().replace("\0","") - res["model"] = data[20:60].decode().replace("\0","") - res["ver"] = data[60:80].decode().replace("\0","") - self.result.set_result(res) - case 50: # cli response - res = {} - res["response"] = data[1:].decode() - self.result.set_result(res) - # push notifications - case 0x80: - printerr ("Advertisment received") - case 0x81: - printerr ("Code path update") - case 0x82: - self.ack_ev.set() - printerr ("Received ACK") - case 0x83: - self.rx_sem.release() - printerr ("Msgs are waiting") - case 0x84: - printerr ("Received raw data") - res = {} - res["SNR"] = data[1] / 4 - res["RSSI"] = data[2] - res["payload"] = data[4:].hex() - print(res) - case 0x85: - self.login_resp.set_result(True) - - printerr ("Login success") - case 0x86: - self.login_resp.set_result(False) - printerr ("Login failed") - case 0x87: - res = {} - res["pubkey_pre"] = data[2:8].hex() - res["bat"] = int.from_bytes(data[8:10], byteorder='little') - res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') - res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') - res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) - res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) - res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) - res["airtime"] = int.from_bytes(data[24:28], byteorder='little') - res["uptime"] = int.from_bytes(data[28:32], byteorder='little') - res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') - res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') - res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') - res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') - res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') - res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 - res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') - res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') - self.status_resp.set_result(res) - data_hex = data[8:].hex() - printerr (f"Status response: {data_hex}") - #printerr(res) - case 0x88: - printerr ("Received log data") - # unhandled - case _: - printerr (f"Unhandled data received {data}") - - async def send(self, data, timeout = 5): - """ Helper function to synchronously send (and receive) data to the node """ - self.result = asyncio.Future() - try: - await self.cx.send(data) - res = await asyncio.wait_for(self.result, timeout) - return res - except TimeoutError : - printerr ("Timeout while sending message ...") - return False - - async def send_only(self, data): # don't wait reply - await self.cx.send(data) - - async def send_appstart(self): - """ Send APPSTART to the node """ - b1 = bytearray(b'\x01\x03 mccli') - return await self.send(b1) - - async def send_device_qeury(self): - return await self.send(b"\x16\x03"); - - async def send_advert(self, flood=False): - """ Make the node send an advertisement """ - if flood : - return await self.send(b"\x07\x01") - else : - return await self.send(b"\x07") - - async def set_name(self, name): - """ Changes the name of the node """ - return await self.send(b'\x08' + name.encode("ascii")) - - async def set_coords(self, lat, lon): - return await self.send(b'\x0e'\ - + int(lat*1e6).to_bytes(4, 'little', signed=True)\ - + int(lon*1e6).to_bytes(4, 'little', signed=True)\ - + int(0).to_bytes(4, 'little')) - - async def reboot(self): - await self.send_only(b'\x13reboot') - return True - - async def get_bat(self): - return await self.send(b'\x14') - - async def get_time(self): - """ Get the time (epoch) of the node """ - self.time = await self.send(b"\x05") - return self.time - - async def set_time(self, val): - """ Sets a new epoch """ - return await self.send(b"\x06" + int(val).to_bytes(4, 'little')) - - async def set_tx_power(self, val): - """ Sets tx power """ - return await self.send(b"\x0c" + int(val).to_bytes(4, 'little')) - - async def set_radio (self, freq, bw, sf, cr): - """ Sets radio params """ - return await self.send(b"\x0b" \ - + int(float(freq)*1000).to_bytes(4, 'little')\ - + int(float(bw)*1000).to_bytes(4, 'little')\ - + int(sf).to_bytes(1, 'little')\ - + int(cr).to_bytes(1, 'little')) - - async def set_tuning (self, rx_dly, af): - """ Sets radio params """ - return await self.send(b"\x15" \ - + int(rx_dly).to_bytes(4, 'little')\ - + int(af).to_bytes(4, 'little')\ - + int(0).to_bytes(1, 'little')\ - + int(0).to_bytes(1, 'little')) - - async def set_devicepin (self, pin): - return await self.send(b"\x25" \ - + int(pin).to_bytes(4, 'little')) - - async def get_contacts(self): - """ Starts retreiving contacts """ - return await self.send(b"\x04") - + self.dispatcher = EventDispatcher() + self._reader = MessageReader(self.dispatcher) + self.commands = CommandHandler(default_timeout=default_timeout) + + # Set up logger + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + + # Set up connections + self.commands.set_connection(cx) + + # Set the dispatcher in the command handler + self.commands.set_dispatcher(self.dispatcher) + self.commands.set_reader(self._reader) + + # Initialize state (private) + self._contacts = {} + self._self_info = {} + self._time = 0 + + # Set up event subscriptions to track data + self._setup_data_tracking() + + cx.set_reader(self._reader) + + @classmethod + async def create_tcp(cls, host: str, port: int, debug: bool = False, default_timeout=None) -> 'MeshCore': + """Create and connect a MeshCore instance using TCP connection""" + from .tcp_cx import TCPConnection + + connection = TCPConnection(host, port) + await connection.connect() + + mc = cls(connection, debug=debug, default_timeout=default_timeout) + await mc.connect() + return mc + + @classmethod + async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False, default_timeout=None) -> 'MeshCore': + """Create and connect a MeshCore instance using serial connection""" + from .serial_cx import SerialConnection + import asyncio + + connection = SerialConnection(port, baudrate) + await connection.connect() + await asyncio.sleep(0.1) # Time for transport to establish + + mc = cls(connection, debug=debug, default_timeout=default_timeout) + await mc.connect() + return mc + + @classmethod + async def create_ble(cls, address: Optional[str] = None, debug: bool = False, default_timeout=None) -> 'MeshCore': + """Create and connect a MeshCore instance using BLE connection + + If address is None, it will scan for and connect to the first available MeshCore device. + """ + from .ble_cx import BLEConnection + + connection = BLEConnection(address) + result = await connection.connect() + if result is None: + raise ConnectionError("Failed to connect to BLE device") + + mc = cls(connection, debug=debug, default_timeout=default_timeout) + await mc.connect() + return mc + + async def connect(self): + await self.dispatcher.start() + return await self.commands.send_appstart() + + async def disconnect(self): + await self.dispatcher.stop() + + def stop(self): + """Synchronously stop the event dispatcher task""" + if self.dispatcher._task and not self.dispatcher._task.done(): + self.dispatcher.running = False + self.dispatcher._task.cancel() + + def subscribe(self, event_type: EventType, callback, attribute_filters: Optional[Dict[str, Any]] = None): + """ + Subscribe to events using EventType enum with optional attribute filtering + + Args: + event_type: Type of event to subscribe to, from EventType enum + callback: Async function to call when event occurs + attribute_filters: Dictionary of attribute key-value pairs that must match for the event to trigger the callback + + Returns: + Subscription object that can be used to unsubscribe + + Example: + # Subscribe to ACK events where the 'code' attribute has a specific value + mc.subscribe( + EventType.ACK, + my_callback_function, + attribute_filters={'code': 'SUCCESS'} + ) + """ + return self.dispatcher.subscribe(event_type, callback, attribute_filters) + + def unsubscribe(self, subscription): + """ + Unsubscribe from events using a subscription object + + Args: + subscription: Subscription object returned from subscribe() + """ + if subscription: + subscription.unsubscribe() + + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, timeout=None): + """ + Wait for an event using EventType enum with optional attribute filtering + + Args: + event_type: Type of event to wait for, from EventType enum + attribute_filters: Dictionary of attribute key-value pairs to match against the event + timeout: Maximum time to wait in seconds, or None to use default_timeout + + Returns: + Event object or None if timeout + + Example: + # Wait for an ACK event where the 'code' attribute has a specific value + await mc.wait_for_event( + EventType.ACK, + attribute_filters={'code': 'SUCCESS'}, + timeout=30.0 + ) + """ + # Use the provided timeout or fall back to default_timeout + if timeout is None: + timeout = self.default_timeout + + return await self.dispatcher.wait_for_event(event_type, attribute_filters, timeout) + + def _setup_data_tracking(self): + """Set up event subscriptions to track data internally""" + async def _update_contacts(event): + self._contacts = event.payload + + async def _update_self_info(event): + self._self_info = event.payload + + async def _update_time(event): + self._time = event.payload.get("time", 0) + + # Subscribe to events to update internal state + self.subscribe(EventType.CONTACTS, _update_contacts) + self.subscribe(EventType.SELF_INFO, _update_self_info) + self.subscribe(EventType.CURRENT_TIME, _update_time) + + # Getter methods for state + @property + def contacts(self): + """Get the current contacts""" + return self._contacts + + @property + def self_info(self): + """Get device self info""" + return self._self_info + + @property + def time(self): + """Get the current device time""" + return self._time + + @property + def default_timeout(self): + """Get the default timeout for commands""" + return self.commands.default_timeout + + @default_timeout.setter + def default_timeout(self, value): + """Set the default timeout for commands""" + self.commands.default_timeout = value + + def get_contact_by_name(self, name) -> Optional[Dict[str, Any]]: + """ + Find a contact by its name (adv_name field) + + Args: + name: The name to search for + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts: + return None + + for _, contact in self._contacts.items(): + if contact.get("adv_name", "").lower() == name.lower(): + return contact + + return None + + def get_contact_by_key_prefix(self, prefix) -> Optional[Dict[str, Any]]: + """ + Find a contact by its public key prefix + + Args: + prefix: The public key prefix to search for (can be a partial prefix) + + Returns: + Contact dictionary or None if not found + """ + if not self._contacts or not prefix: + return None + + # Convert the prefix to lowercase for case-insensitive matching + prefix = prefix.lower() + + for contact_id, contact in self._contacts.items(): + public_key = contact.get("public_key", "").lower() + if public_key.startswith(prefix): + return contact + + return None + + async def start_auto_message_fetching(self): + """ + Start automatically fetching messages when messages_waiting events are received. + This will continuously check for new messages when the device indicates + messages are waiting. + """ + self._auto_fetch_task = None + self._auto_fetch_running = True + + async def _handle_messages_waiting(event): + # Only start a new fetch task if one isn't already running + if not self._auto_fetch_task or self._auto_fetch_task.done(): + self._auto_fetch_task = asyncio.create_task(_fetch_messages_loop()) + + async def _fetch_messages_loop(): + while self._auto_fetch_running: + try: + # Request the next message + result = await self.commands.get_msg() + + # If we got a NO_MORE_MSGS event or an error, stop fetching + if not result.get("success") or isinstance(result, dict) and "error" in result: + break + + # Small delay to prevent overwhelming the device + await asyncio.sleep(0.1) + except Exception as e: + logger.error(f"Error fetching messages: {e}") + break + + # Subscribe to MESSAGES_WAITING events + self._auto_fetch_subscription = self.subscribe(EventType.MESSAGES_WAITING, _handle_messages_waiting) + + # Check for any pending messages immediately + await self.commands.get_msg() + + return self._auto_fetch_subscription + + async def stop_auto_message_fetching(self): + """ + Stop automatically fetching messages when messages_waiting events are received. + """ + if hasattr(self, '_auto_fetch_subscription') and self._auto_fetch_subscription: + self.unsubscribe(self._auto_fetch_subscription) + self._auto_fetch_subscription = None + + if hasattr(self, '_auto_fetch_running'): + self._auto_fetch_running = False + + if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done(): + self._auto_fetch_task.cancel() + try: + await self._auto_fetch_task # type: ignore + except asyncio.CancelledError: + pass + self._auto_fetch_task = None + async def ensure_contacts(self): - if len(self.contacts) == 0 : - await self.get_contacts() - - async def reset_path(self, key): - data = b"\x0D" + key - return await self.send(data) - - async def share_contact(self, key): - data = b"\x10" + key - return await self.send(data) - - async def export_contact(self, key=b""): - data = b"\x11" + key - return await self.send(data) - - async def remove_contact(self, key): - data = b"\x0f" + key - return await self.send(data) - - async def set_out_path(self, contact, path): - contact["out_path"] = path - contact["out_path_len"] = -1 - contact["out_path_len"] = int(len(path) / 2) - - async def update_contact(self, contact): - out_path_hex = contact["out_path"] - out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0" - adv_name_hex = contact["adv_name"].encode().hex() - adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0" - data = b"\x09" \ - + bytes.fromhex(contact["public_key"])\ - + contact["type"].to_bytes(1)\ - + contact["flags"].to_bytes(1)\ - + contact["out_path_len"].to_bytes(1, 'little', signed=True)\ - + bytes.fromhex(out_path_hex)\ - + bytes.fromhex(adv_name_hex)\ - + contact["last_advert"].to_bytes(4, 'little')\ - + int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\ - + int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True) - return await self.send(data) - - async def send_login(self, dst, pwd): - self.login_resp = asyncio.Future() - data = b"\x1a" + dst + pwd.encode("ascii") - return await self.send(data) - - async def wait_login(self, timeout = 5): - try : - return await asyncio.wait_for(self.login_resp, timeout) - except TimeoutError : - printerr ("Timeout ...") - return False - - async def send_logout(self, dst): - self.login_resp = asyncio.Future() - data = b"\x1d" + dst - return await self.send(data) - - async def send_statusreq(self, dst): - self.status_resp = asyncio.Future() - data = b"\x1b" + dst - return await self.send(data) - - async def wait_status(self, timeout = 5): - try : - return await asyncio.wait_for(self.status_resp, timeout) - except TimeoutError : - printerr ("Timeout...") - return False - - async def send_cmd(self, dst, cmd): - """ Send a cmd to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii") - #self.ack_ev.clear() # no ack ? - return await self.send(data) - - async def send_msg(self, dst, msg): - """ Send a message to a node """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") - self.ack_ev.clear() - return await self.send(data) - - async def send_chan_msg(self, chan, msg): - """ Send a message to a public channel """ - timestamp = (await self.get_time()).to_bytes(4, 'little') - data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii") - return await self.send(data) - - async def get_msg(self): - """ Get message from the node (stored in queue) """ - res = await self.send(b"\x0A", 1) - if res is False : - self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue - return res - - async def wait_msg(self, timeout=-1): - """ Wait for a message """ - if timeout == -1 : - await self.rx_sem.acquire() + """Ensure contacts are fetched""" + if not self._contacts: + await self.commands.get_contacts() return True - - try: - await asyncio.wait_for(self.rx_sem.acquire(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting msg") - return False - - async def wait_ack(self, timeout=6): - """ Wait ack """ - try: - await asyncio.wait_for(self.ack_ev.wait(), timeout) - return True - except TimeoutError : - printerr("Timeout waiting ack") - return False - - async def send_cli(self, cmd): - data = b"\x32" + cmd.encode('ascii') - return await self.send(data) + return False \ No newline at end of file diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py new file mode 100644 index 0000000..6797489 --- /dev/null +++ b/src/meshcore/packets.py @@ -0,0 +1,31 @@ +from enum import Enum + +# Packet prefixes for the protocol +class PacketType(Enum): + OK = 0 + ERROR = 1 + CONTACT_START = 2 + CONTACT = 3 + CONTACT_END = 4 + SELF_INFO = 5 + MSG_SENT = 6 + CONTACT_MSG_RECV = 7 + CHANNEL_MSG_RECV = 8 + CURRENT_TIME = 9 + NO_MORE_MSGS = 10 + CONTACT_SHARE = 11 + BATTERY = 12 + DEVICE_INFO = 13 + CLI_RESPONSE = 50 + + # Push notifications + ADVERTISEMENT = 0x80 + PATH_UPDATE = 0x81 + ACK = 0x82 + MESSAGES_WAITING = 0x83 + RAW_DATA = 0x84 + LOGIN_SUCCESS = 0x85 + LOGIN_FAILED = 0x86 + STATUS_RESPONSE = 0x87 + LOG_DATA = 0x88 + TRACE_DATA = 0x89 \ No newline at end of file diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py new file mode 100644 index 0000000..ebcdc03 --- /dev/null +++ b/src/meshcore/reader.py @@ -0,0 +1,364 @@ +import sys +import logging +import asyncio +from typing import Any, Optional, Dict +from .events import Event, EventType, EventDispatcher +from .packets import PacketType + +logger = logging.getLogger("meshcore") + + +class MessageReader: + def __init__(self, dispatcher: EventDispatcher): + self.dispatcher = dispatcher + # We're only keeping state here that's needed for processing + # before events are dispatched + self.contacts = {} # Temporary storage during contact list building + self.contact_nb = 0 # Used for contact processing + + async def handle_rx(self, data: bytearray): + packet_type_value = data[0] + logger.debug(f"Received data: {data.hex()}") + + # Handle command responses + if packet_type_value == PacketType.OK.value: + result: Dict[str, Any] = {"success": True} + if len(data) == 5: + result["value"] = int.from_bytes(data[1:5], byteorder='little') + + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) + + elif packet_type_value == PacketType.ERROR.value: + if len(data) > 1: + result = {"success": False, "error_code": data[1]} + else: + result = {"success": False} + + # Dispatch event for the ERROR response + await self.dispatcher.dispatch(Event(EventType.ERROR, result)) + + elif packet_type_value == PacketType.CONTACT_START.value: + self.contact_nb = int.from_bytes(data[1:5], byteorder='little') + self.contacts = {} + + elif packet_type_value == PacketType.CONTACT.value: + c = {} + c["public_key"] = data[1:33].hex() + c["type"] = data[33] + c["flags"] = data[34] + c["out_path_len"] = int.from_bytes(data[35:36], signed=True) + plen = int.from_bytes(data[35:36], signed=True) + if plen == -1: + plen = 0 + c["out_path"] = data[36:36+plen].hex() + c["adv_name"] = data[100:132].decode().replace("\0","") + c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') + c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6 + c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6 + c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.CONTACT_END.value: + await self.dispatcher.dispatch(Event(EventType.CONTACTS, self.contacts)) + + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = data[1] + self_info["tx_power"] = data[2] + self_info["max_tx_power"] = data[3] + self_info["public_key"] = data[4:36].hex() + self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 + self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 + self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000 + self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000 + self_info["radio_sf"] = data[56] + self_info["radio_cr"] = data[57] + self_info["name"] = data[58:].decode() + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = data[1] + res["expected_ack"] = bytes(data[2:6]) + res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') + + attributes = { + "type": res["type"], + "expected_ack": res["expected_ack"].hex() + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = data[1:7].hex() + res["path_len"] = data[7] + res["txt_type"] = data[8] + res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') + if data[8] == 2: + res["signature"] = data[13:17].hex() + res["text"] = data[17:].decode() + else: + res["text"] = data[13:].decode() + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["pubkey_prefix"] = data[4:10].hex() + res["path_len"] = data[10] + res["txt_type"] = data[11] + res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little') + if data[11] == 2: + res["signature"] = data[16:20].hex() + res["text"] = data[20:].decode() + else: + res["text"] = data[16:].decode() + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) + + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = data[1] + res["path_len"] = data[2] + res["txt_type"] = data[3] + res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') + res["text"] = data[8:].decode() + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4 + res["channel_idx"] = data[4] + res["path_len"] = data[5] + res["txt_type"] = data[6] + res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') + res["text"] = data[11:].decode() + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + time_value = int.from_bytes(data[1:5], byteorder='little') + result = {"time": time_value} + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) + + elif packet_type_value == PacketType.CONTACT_SHARE.value: + contact_uri = "meshcore://" + data[1:].hex() + result = {"uri": contact_uri} + await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result)) + + elif packet_type_value == PacketType.BATTERY.value: + battery_level = int.from_bytes(data[1:3], byteorder='little') + result = {"level": battery_level} + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + res["fw ver"] = data[1] + if data[1] >= 3: + res["max_contacts"] = data[2] * 2 + res["max_channels"] = data[3] + res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little') + res["fw_build"] = data[8:20].decode().replace("\0","") + res["model"] = data[20:60].decode().replace("\0","") + res["ver"] = data[60:80].decode().replace("\0","") + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CLI_RESPONSE.value: + res = {} + res["response"] = data[1:].decode() + await self.dispatcher.dispatch(Event(EventType.CLI_RESPONSE, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + # TODO: Read advertisement attributes + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, {})) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + # TODO: Read path update attributes + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, {})) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = bytes(data[1:5]).hex() + + attributes = { + "code": ack_data.get("code", "") + } + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = data[1] / 4 + res["RSSI"] = data[2] + res["payload"] = data[4:].hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + logger.debug("Login success") + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, {})) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + logger.debug("Login failed") + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, {})) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = {} + res["pubkey_pre"] = data[2:8].hex() + res["bat"] = int.from_bytes(data[8:10], byteorder='little') + res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little') + res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little') + res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True) + res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False) + res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False) + res["airtime"] = int.from_bytes(data[24:28], byteorder='little') + res["uptime"] = int.from_bytes(data[28:32], byteorder='little') + res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little') + res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little') + res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little') + res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little') + res["full_evts"] = int.from_bytes(data[48:50], byteorder='little') + res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4 + res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little') + res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res, attributes)) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data: Dict[str, Any] = { + "raw_hex": data[1:].hex() + } + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = data[1] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = data[2] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + if len(data) > 3: + log_data["payload"] = data[3:].hex() + log_data["payload_length"] = len(data) - 3 + + attributes = { + "pubkey_prefix": log_data["raw_hex"], + } + + # Dispatch as RF log data + await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data, attributes)) + + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = data[1] + path_len = data[2] + flags = data[3] + tag = int.from_bytes(data[4:8], byteorder='little') + auth_code = int.from_bytes(data[8:12], byteorder='little') + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len*2 + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": f"{data[12+i]:02x}", + # SNR is stored as a signed byte representing SNR * 4 + "snr": (data[12+path_len+i] if data[12+path_len+i] < 128 else data[12+path_len+i] - 256) / 4.0 + } + path_nodes.append(node) + + # Add the final node (our device) with its SNR + final_snr_byte = data[12+path_len*2] + final_snr = (final_snr_byte if final_snr_byte < 128 else final_snr_byte - 256) / 4.0 + path_nodes.append({ + "snr": final_snr + }) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") \ No newline at end of file diff --git a/src/meshcore/serial_cx.py b/src/meshcore/serial_cx.py index f7e9b7b..b7b07c6 100644 --- a/src/meshcore/serial_cx.py +++ b/src/meshcore/serial_cx.py @@ -2,10 +2,11 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys +import logging import serial_asyncio -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class SerialConnection: def __init__(self, port, baudrate): @@ -13,6 +14,7 @@ class SerialConnection: self.baudrate = baudrate self.frame_started = False self.frame_size = 0 + self.transport = None self.header = b"" self.inframe = b"" @@ -22,21 +24,21 @@ class SerialConnection: def connection_made(self, transport): self.cx.transport = transport -# printerr('port opened') - transport.serial.rts = False # You can manipulate Serial object via transport + logger.debug('port opened') + if isinstance(transport, serial_asyncio.SerialTransport) and transport.serial: + transport.serial.rts = False # You can manipulate Serial object via transport def data_received(self, data): -# printerr('data received') self.cx.handle_rx(data) def connection_lost(self, exc): - printerr('port closed') + logger.info('port closed') def pause_writing(self): - printerr('pause writing') + logger.debug('pause writing') def resume_writing(self): - printerr('resume writing') + logger.debug('resume writing') async def connect(self): """ @@ -47,11 +49,11 @@ class SerialConnection: loop, lambda: self.MCSerialClientProtocol(self), self.port, baudrate=self.baudrate) - printerr("Serial Connexion started") + logger.info("Serial Connection started") return self.port - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -69,8 +71,8 @@ class SerialConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -78,7 +80,10 @@ class SerialConnection: self.handle_rx(data[self.frame_size-framelen:]) async def send(self, data): + if not self.transport: + logger.error("Transport not connected, cannot send data") + return size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data -# printerr(f"sending pkt : {pkt}") - self.transport.write(pkt) + logger.debug(f"sending pkt : {pkt}") + self.transport.write(pkt) \ No newline at end of file diff --git a/src/meshcore/tcp_cx.py b/src/meshcore/tcp_cx.py index e998a18..25c38c8 100644 --- a/src/meshcore/tcp_cx.py +++ b/src/meshcore/tcp_cx.py @@ -2,9 +2,10 @@ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio -import sys +import logging -from meshcore import printerr +# Get logger +logger = logging.getLogger("meshcore") class TCPConnection: def __init__(self, host, port): @@ -16,21 +17,23 @@ class TCPConnection: self.header = b"" self.inframe = b"" - class MCClientProtocol: + class MCClientProtocol(asyncio.Protocol): def __init__(self, cx): self.cx = cx def connection_made(self, transport): self.cx.transport = transport + logger.debug('connection established') def data_received(self, data): + logger.debug('data received') self.cx.handle_rx(data) def error_received(self, exc): - printerr(f'Error received: {exc}') + logger.error(f'Error received: {exc}') def connection_lost(self, exc): - printerr('The server closed the connection') + logger.info('The server closed the connection') async def connect(self): """ @@ -41,11 +44,11 @@ class TCPConnection: lambda: self.MCClientProtocol(self), self.host, self.port) - printerr("TCP Connexion started") + logger.info("TCP Connection started") return self.host - def set_mc(self, mc) : - self.mc = mc + def set_reader(self, reader) : + self.reader = reader def handle_rx(self, data: bytearray): headerlen = len(self.header) @@ -63,8 +66,8 @@ class TCPConnection: self.inframe = self.inframe + data else: self.inframe = self.inframe + data[:self.frame_size-framelen] - if not self.mc is None: - self.mc.handle_rx(self.inframe) + if not self.reader is None: + asyncio.create_task(self.reader.handle_rx(self.inframe)) self.frame_started = False self.header = b"" self.inframe = b"" @@ -72,6 +75,10 @@ class TCPConnection: self.handle_rx(data[self.frame_size-framelen:]) async def send(self, data): + if not self.transport: + logger.error("Transport not connected, cannot send data") + return size = len(data) pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data + logger.debug(f"sending pkt : {pkt}") self.transport.write(pkt)