Merge pull request #2 from awolden/eventing-refactor

Let's move to an hybrid API (synchronous/event based) ... paving the road for v2 !
This commit is contained in:
fdlamotte 2025-04-13 21:49:03 +02:00 committed by GitHub
commit cc4e8953d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1889 additions and 519 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
__pycache__
venv

280
README.md
View file

@ -1,8 +1,280 @@
# Python meshcore
# Python MeshCore
Bindings to access your [MeshCore](https://meshcore.co.uk) companion radio nodes in python.
Python library for interacting with [MeshCore](https://meshcore.co.uk) companion radio nodes.
Can be installed via `pip` using `pip install meshcore`
## Installation
Used by [mccli](https://github.com/fdlamotte/mccli).
```bash
pip install meshcore
```
## Quick Start
Connect to your device and send a message:
```python
import asyncio
from meshcore import MeshCore, EventType
async def main():
# Connect to your device
meshcore = await MeshCore.create_serial("/dev/ttyUSB0")
# Get your contacts
contacts = await meshcore.commands.get_contacts()
print(f"Found {len(contacts)} contacts")
# Send a message to the first contact
if contacts:
contact_key = next(iter(contacts.items()))[1]['public_key']
await meshcore.commands.send_msg(bytes.fromhex(contact_key), "Hello from Python!")
await meshcore.disconnect()
asyncio.run(main())
```
## Development Setup
To set up for development:
```bash
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode
pip install -e .
# Run examples
python examples/pubsub_example.py -p /dev/ttyUSB0
```
## Usage Guide
### Connecting to Your Device
Connect via Serial, BLE, or TCP:
```python
# Serial connection
meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True)
# BLE connection (scans for devices if address not provided)
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB")
# TCP connection
meshcore = await MeshCore.create_tcp("192.168.1.100", 4000)
```
### Using Commands (Synchronous Style)
Send commands and wait for responses:
```python
# Get device information
device_info = await meshcore.commands.send_device_query()
print(f"Device model: {device_info['model']}")
# Get list of contacts
contacts = await meshcore.commands.get_contacts()
for contact_id, contact in contacts.items():
print(f"Contact: {contact['adv_name']} ({contact_id})")
# Send a message (destination key in bytes)
await meshcore.commands.send_msg(dst_key, "Hello!")
# Setting device parameters
await meshcore.commands.set_name("My Device")
await meshcore.commands.set_tx_power(20) # Set transmit power
```
### Finding Contacts
Easily find contacts by name or key:
```python
# Find a contact by name
contact = meshcore.get_contact_by_name("Bob's Radio")
if contact:
print(f"Found Bob at: {contact['adv_lat']}, {contact['adv_lon']}")
# Find by partial key prefix
contact = meshcore.get_contact_by_key_prefix("a1b2c3")
```
### Event-Based Programming (Asynchronous Style)
Subscribe to events to handle them asynchronously:
```python
# Subscribe to incoming messages
async def handle_message(event):
data = event.payload
print(f"Message from {data['pubkey_prefix']}: {data['text']}")
subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, handle_message)
# Subscribe to advertisements
async def handle_advert(event):
print("Advertisement detected!")
meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert)
# When done, unsubscribe
meshcore.unsubscribe(subscription)
```
#### Filtering Events by Attributes
Filter events based on their attributes to handle only specific ones:
```python
# Subscribe only to messages from a specific contact
async def handle_specific_contact_messages(event):
print(f"Message from Alice: {event.payload['text']}")
contact = meshcore.get_contact_by_name("Alice")
if contact:
alice_subscription = meshcore.subscribe(
EventType.CONTACT_MSG_RECV,
handle_specific_contact_messages,
attribute_filters={"pubkey_prefix": contact["public_key"][:12]}
)
# Send a message and wait for its specific acknowledgment
async def send_and_confirm_message(meshcore, dst_key, message):
# Send the message and get information about the sent message
sent_result = await meshcore.commands.send_msg(dst_key, message)
# Extract the expected acknowledgment code from the message sent event
expected_ack = sent_result["expected_ack"].hex()
print(f"Message sent, waiting for ack with code: {expected_ack}")
# Wait specifically for this acknowledgment
result = await meshcore.wait_for_event(
EventType.ACK,
attribute_filters={"code": expected_ack},
timeout=10.0
)
if result:
print("Message confirmed delivered!")
return True
else:
print("Message delivery confirmation timed out")
return False
```
### Hybrid Approach (Commands + Events)
Combine command-based and event-based styles:
```python
import asyncio
async def main():
# Connect to device
meshcore = await MeshCore.create_serial("/dev/ttyUSB0")
# Set up event handlers
async def handle_ack(event):
print("Message acknowledged!")
async def handle_battery(event):
print(f"Battery level: {event.payload}%")
# Subscribe to events
meshcore.subscribe(EventType.ACK, handle_ack)
meshcore.subscribe(EventType.BATTERY, handle_battery)
# Create background task for battery checking
async def check_battery_periodically():
while True:
# Send command (returns battery level)
result = await meshcore.commands.get_bat()
print(f"Battery check initiated, response: {result}")
await asyncio.sleep(60) # Wait 60 seconds between checks
# Start the background task
battery_task = asyncio.create_task(check_battery_periodically())
# Send manual command and wait for response
await meshcore.commands.send_advert(flood=True)
try:
# Keep the main program running
await asyncio.sleep(float('inf'))
except asyncio.CancelledError:
# Clean up when program ends
battery_task.cancel()
await meshcore.disconnect()
# Run the program
asyncio.run(main())
```
### Auto-Fetching Messages
Let the library automatically fetch incoming messages:
```python
# Start auto-fetching messages
await meshcore.start_auto_message_fetching()
# Just subscribe to message events - the library handles fetching
async def on_message(event):
print(f"New message: {event.payload['text']}")
meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_message)
# When done
await meshcore.stop_auto_message_fetching()
```
### Debug Mode
Enable debug logging for troubleshooting:
```python
# Enable debug mode when creating the connection
meshcore = await MeshCore.create_serial("/dev/ttyUSB0", debug=True)
```
This logs detailed information about commands sent and events received.
## Common Examples
### Sending Messages to Contacts
```python
# Get contacts and send to a specific one
contacts = await meshcore.commands.get_contacts()
for key, contact in contacts.items():
if contact["adv_name"] == "Alice":
# Convert the hex key to bytes
dst_key = bytes.fromhex(contact["public_key"])
await meshcore.commands.send_msg(dst_key, "Hello Alice!")
break
```
### Monitoring Channel Messages
```python
# Subscribe to channel messages
async def channel_handler(event):
msg = event.payload
print(f"Channel {msg['channel_idx']}: {msg['text']}")
meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler)
```
## Examples in the Repo
Check the `examples/` directory for more:
- `pubsub_example.py`: Event subscription system with auto-fetching
- `serial_infos.py`: Quick device info retrieval
- `serial_msg.py`: Message sending and receiving
- `ble_t1000_infos.py`: BLE connections

View file

@ -3,16 +3,14 @@
import asyncio
from meshcore import MeshCore
from meshcore import BLEConnection
ADDRESS = "t1000"
async def main () :
con = BLEConnection(ADDRESS)
await con.connect()
mc = MeshCore(con)
await mc.connect()
async def main():
mc = await MeshCore.create_ble(ADDRESS)
print(mc.self_info)
await mc.disconnect()
asyncio.run(main())

View file

@ -1,7 +1,6 @@
#!/usr/bin/python
import asyncio
import json
from meshcore import MeshCore
from meshcore import BLEConnection
@ -15,7 +14,7 @@ async def main () :
mc = MeshCore(con)
await mc.connect()
await mc.ensure_contacts()
await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG)
await mc.get_contacts()
await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG)
asyncio.run(main())

117
examples/pubsub_example.py Normal file
View file

@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""
Example demonstrating the new pub-sub system in MeshCore
Shows how to subscribe to events and use the event system
"""
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def message_callback(event):
print(f"\nReceived message: {event.payload['text']}")
print(f"From: {event.payload.get('pubkey_prefix', 'channel')}")
print(f"Type: {event.payload['type']}")
print(f"Timestamp: {event.payload['sender_timestamp']}")
async def advertisement_callback(event):
print("\nDetected advertisement")
async def main():
parser = argparse.ArgumentParser(description="MeshCore Pub-Sub Example")
parser.add_argument(
"--port", "-p", help="Serial port", required=True
)
parser.add_argument(
"--baud", "-b", type=int, help="Baud rate", default=115200
)
args = parser.parse_args()
print(f"Connecting to {args.port} at {args.baud} baud...")
# Create MeshCore instance with serial connection
meshcore = await MeshCore.create_serial(args.port, args.baud, debug=True)
# Connection is already established
success = True
if not success:
print("Failed to connect to MeshCore device")
return
print("Connected to MeshCore device")
# Get contacts
contacts = await meshcore.commands.get_contacts()
if contacts:
print(f"\nFound {len(contacts)} contacts:")
for name, contact in contacts.items():
print(f"- {name}")
await meshcore.commands.send_advert(flood=True)
# Subscribe to private messages
private_subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, message_callback)
# Subscribe to channel messages
channel_subscription = meshcore.subscribe(EventType.CHANNEL_MSG_RECV, message_callback)
# Subscribe to advertisements
advert_subscription = meshcore.subscribe(EventType.ADVERTISEMENT, advertisement_callback)
await meshcore.start_auto_message_fetching()
print("\nSubscribed to events:")
print("- Private messages")
print("- Channel messages")
print("- Advertisements")
print("\nWaiting for events. Press Ctrl+C to exit...\n")
# Get device info
device_info = await meshcore.commands.send_device_query()
if device_info:
print(f"Device info: {device_info}")
# Get time from the device
device_time = await meshcore.commands.get_time()
print(f"Device time: {device_time}")
# Access current time through the property
print(f"Current time (property): {meshcore.time}")
try:
while True:
# Wait for messages
await asyncio.sleep(1)
except KeyboardInterrupt:
meshcore.stop()
print("\nExiting...")
except asyncio.CancelledError:
# Handle task cancellation from KeyboardInterrupt in asyncio.run()
print("\nTask cancelled - cleaning up...")
finally:
# Clean up subscriptions
meshcore.unsubscribe(private_subscription)
meshcore.unsubscribe(channel_subscription)
meshcore.unsubscribe(advert_subscription)
# Stop auto-message fetching
await meshcore.stop_auto_message_fetching()
# Disconnect
await meshcore.disconnect()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
# This prevents the KeyboardInterrupt traceback from being shown
print("\nExited cleanly")
except Exception as e:
print(f"Error: {e}")

View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def main():
parser = argparse.ArgumentParser(description='MeshCore RF Packet Monitor')
parser.add_argument('-p', '--port', required=True, help='Serial port path')
args = parser.parse_args()
try:
# Connect to device
print(f"Connecting to {args.port}...")
mc = await MeshCore.create_serial(args.port, 115200)
def handle_rf_packet(event):
packet = event.payload
if isinstance(packet, dict):
print(f"Raw RF packet received:")
if 'snr' in packet:
print(f" SNR: {packet['snr']:.1f} dB")
if 'rssi' in packet:
print(f" RSSI: {packet['rssi']} dBm")
if 'payload_length' in packet:
print(f" Payload length: {packet['payload_length']} bytes")
if 'payload' in packet:
print(f" Payload (hex): {packet['payload']}")
else:
print(f"RF packet received: {packet}")
# Subscribe to RF log data
subscription = mc.subscribe(EventType.RX_LOG_DATA, handle_rf_packet)
print("Waiting for log data (press Ctrl+C to exit)...")
try:
# Keep the script running to receive logs
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nExiting...")
# Clean up
mc.unsubscribe(subscription)
await mc.disconnect()
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
asyncio.run(main())

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
Simple example of eventing approach with battery monitoring
"""
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def main():
parser = argparse.ArgumentParser(description="Battery Monitor Example")
parser.add_argument("--port", "-p", help="Serial port", required=True)
parser.add_argument("--baud", "-b", type=int, help="Baud rate", default=115200)
args = parser.parse_args()
print(f"Connecting to {args.port}...")
# Connect to device
meshcore = await MeshCore.create_serial(args.port, args.baud)
# Event handler for battery updates
async def on_battery(event):
print(f"Battery event: {event.payload}mV")
# Subscribe to battery events
meshcore.subscribe(EventType.BATTERY, on_battery)
# Background task that checks battery every 10 seconds
async def check_battery():
while True:
print("Requesting battery level...")
await meshcore.commands.get_bat() # This command will trigger a battery event
await asyncio.sleep(10)
# Start the battery check task
task = asyncio.create_task(check_battery())
try:
# Keep program running
print("Monitoring battery (press Ctrl+C to exit)...")
await asyncio.sleep(float('inf'))
except KeyboardInterrupt:
print("\nExiting...")
finally:
# Clean up
task.cancel()
await meshcore.disconnect()
if __name__ == "__main__":
asyncio.run(main())

View file

@ -2,6 +2,7 @@
import asyncio
import json
from meshcore import MeshCore
from meshcore import SerialConnection
@ -15,6 +16,6 @@ async def main () :
mc = MeshCore(con)
await mc.connect()
print(json.dumps(await mc.get_contacts(),indent=4))
print(json.dumps(await mc.commands.get_contacts(),indent=4))
asyncio.run(main())

View file

@ -3,19 +3,15 @@
import asyncio
from meshcore import MeshCore
from meshcore import SerialConnection
PORT = "/dev/ttyUSB0"
PORT = "/dev/tty.usbserial-583A0069501"
BAUDRATE = 115200
async def main () :
con = SerialConnection(PORT, BAUDRATE)
await con.connect()
await asyncio.sleep(0.1) # time for transport to establish
mc = MeshCore(con)
await mc.connect()
async def main():
mc = await MeshCore.create_serial(PORT, BAUDRATE)
print(mc.self_info)
await mc.disconnect()
asyncio.run(main())

View file

@ -1,24 +1,67 @@
#!/usr/bin/python
"""
Example of sending a message and waiting for its specific acknowledgment
using event attribute filtering.
"""
import asyncio
import json
from meshcore import MeshCore
from meshcore import SerialConnection
import argparse
from meshcore import MeshCore, EventType
PORT = "/dev/ttyUSB0"
BAUDRATE = 115200
DEST = "mchome"
MSG = "hello from serial"
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Send a message and wait for ACK")
parser.add_argument("-p", "--port", required=True, help="Serial port")
parser.add_argument("-b", "--baudrate", type=int, default=115200, help="Baud rate")
parser.add_argument("-d", "--dest", default="🦄", help="Destination contact name")
parser.add_argument("-m", "--message", default="hello from serial", help="Message to send")
parser.add_argument("--timeout", type=float, default=30.0, help="ACK timeout in seconds")
args = parser.parse_args()
async def main () :
con = SerialConnection(PORT, BAUDRATE)
await con.connect()
await asyncio.sleep(0.1) # time for transport to establish
# Connect to the device
mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True)
mc = MeshCore(con)
await mc.connect()
try:
# Make sure we have contacts loaded
await mc.ensure_contacts()
# Find the contact by name
contact = mc.get_contact_by_name(args.dest)
if not contact:
print(f"Contact '{args.dest}' not found. Available contacts:")
for name, c in mc.contacts.items():
print(f"- {c.get('adv_name', 'Unknown')}")
return
print(f"Found contact: {contact.get('adv_name')} ({contact['public_key'][:12]}...)")
# Send the message and get the MSG_SENT event
print(f"Sending message: '{args.message}'")
send_result = await mc.commands.send_msg(
bytes.fromhex(contact["public_key"])[0:6],
args.message
)
# Extract the expected ACK code
expected_ack = send_result["expected_ack"].hex()
print(f"Message sent, waiting for ACK with code: {expected_ack}")
# Wait for the specific ACK that matches our message
ack_event = await mc.wait_for_event(
EventType.ACK,
attribute_filters={"code": expected_ack},
timeout=args.timeout
)
if ack_event:
print(f"✅ Message confirmed delivered! (ACK received)")
else:
print(f"⚠️ Timed out waiting for ACK after {args.timeout} seconds")
finally:
# Always disconnect
await mc.disconnect()
await mc.ensure_contacts()
await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG)
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

View file

@ -3,31 +3,27 @@
import asyncio
from meshcore import MeshCore
from meshcore import printerr
from meshcore import SerialConnection
from meshcore.events import EventType
PORT = "/dev/ttyUSB0"
PORT = "/dev/tty.usbserial-583A0069501"
BAUDRATE = 115200
REPEATER="FdlRoom"
PASSWORD="password"
REPEATER="Orion"
PASSWORD="floopyboopy"
async def main () :
con = SerialConnection(PORT, BAUDRATE)
await con.connect()
await asyncio.sleep(0.1) # time for transport to establish
mc = await MeshCore.create_serial(PORT, BAUDRATE)
await mc.commands.get_contacts()
repeater = mc.get_contact_by_name(REPEATER)
await mc.commands.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD)
mc = MeshCore(con)
await mc.connect()
print("Login sent ... awaiting")
contacts = await mc.get_contacts()
repeater = contacts[REPEATER]
await mc.send_login(bytes.fromhex(repeater["public_key"]), PASSWORD)
printerr("Login sent ... awaiting")
if await mc.wait_login() :
await mc.send_statusreq(bytes.fromhex(repeater["public_key"]))
print(await mc.wait_status())
if await mc.wait_for_event(EventType.LOGIN_SUCCESS):
print("Logged in success")
await mc.commands.send_statusreq(bytes.fromhex(repeater["public_key"]))
print("Status request sent ... awaiting")
print(await mc.wait_for_event(EventType.STATUS_RESPONSE))
asyncio.run(main())

80
examples/serial_trace.py Normal file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
import asyncio
import argparse
import random
from meshcore import MeshCore
from meshcore.events import EventType
async def get_repeater_name(mc, hash_prefix):
"""Find a contact by its 2-character hash prefix and return its name"""
# Ensure contacts are available
await mc.ensure_contacts()
# Find contact with matching hash prefix
contact = mc.get_contact_by_key_prefix(hash_prefix)
if contact:
return contact.get("adv_name", f"Unknown ({hash_prefix})")
else:
return f"Unknown ({hash_prefix})"
async def main():
parser = argparse.ArgumentParser(description='MeshCore Serial Trace Example')
parser.add_argument('-p', '--port', required=True, help='Serial port path')
parser.add_argument('--path', type=str, help='Trace path (comma-separated hex values)')
args = parser.parse_args()
try:
# Connect to device
print(f"Connecting to {args.port}...")
mc = await MeshCore.create_serial(args.port, 115200, debug=True)
# Send trace packet
print(f"Sending trace packet...")
# Send trace with a path if provided
tag = random.randint(1, 0xFFFFFFFF)
result = await mc.commands.send_trace(path=args.path, tag=tag)
# Check if the result has a success indicator
if result.get("success") == False:
print(f"Failed to send trace packet: {result.get('reason', 'unknown error')}")
elif result:
print(f"Trace packet sent successfully with tag={tag}")
print("Waiting for trace response matching our tag...")
# Wait for a trace response with our specific tag
event = await mc.wait_for_event(
EventType.TRACE_DATA,
attribute_filters={"tag": tag},
timeout=15
)
if event:
trace = event.payload
print(f"Trace data received:")
print(f" Tag: {trace['tag']}")
print(f" Flags: {trace.get('flags', 0)}")
print(f" Path Length: {trace.get('path_len', 0)}")
if trace.get('path'):
print(f" Path ({len(trace['path'])} nodes):")
# Process nodes with hash (repeaters)
for i, node in enumerate(trace['path']):
if 'hash' in node:
# Look up repeater name
repeater_name = await get_repeater_name(mc, node['hash'])
print(f" Node {i+1}: {repeater_name}, SNR={node['snr']:.1f} dB")
else:
print(f" Node {i+1}: SNR={node['snr']:.1f} dB (final node)")
else:
print("No trace response received within timeout")
else:
print("Failed to send trace packet")
await mc.disconnect()
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
asyncio.run(main())

View file

@ -14,5 +14,5 @@ async def main () :
mc = MeshCore(con)
await mc.connect()
print(json.dumps(await mc.get_contacts(),indent=4))
print(json.dumps(await mc.commands.get_contacts(),indent=4))
asyncio.run(main())

View file

@ -3,17 +3,15 @@
import asyncio
from meshcore import MeshCore
from meshcore import TCPConnection
HOSTNAME = "mchome"
PORT = 5000
async def main () :
con = TCPConnection(HOSTNAME, PORT)
await con.connect()
mc = MeshCore(con)
await mc.connect()
async def main():
mc = await MeshCore.create_tcp(HOSTNAME, PORT)
print(mc.self_info)
await mc.disconnect()
asyncio.run(main())

View file

@ -17,6 +17,6 @@ async def main () :
await mc.connect()
await mc.ensure_contacts()
await mc.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG)
await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG)
asyncio.run(main())

View file

@ -18,8 +18,10 @@ async def main () :
res = True
while res:
res = await mc.get_msg()
if res :
print (res)
result = await mc.commands.get_msg()
if result.get("success") == False:
res = False
print("No more messages")
print (result)
asyncio.run(main())

View file

@ -1,5 +1,10 @@
from meshcore.meshcore import printerr
from meshcore.meshcore import MeshCore
import logging
# Setup default logger
logging.basicConfig(level=logging.INFO)
from meshcore.events import EventType
from meshcore.meshcore import MeshCore, logger
from meshcore.tcp_cx import TCPConnection
from meshcore.ble_cx import BLEConnection
from meshcore.serial_cx import SerialConnection

View file

@ -2,9 +2,10 @@
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import logging
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
@ -40,11 +41,11 @@ class BLEConnection:
if self.address is None or self.address == "" or len(self.address.split(":")) != 6 :
scanner = BleakScanner()
printerr("Scanning for devices")
logger.info("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None :
return None
printerr(f"Found device : {device}")
logger.info(f"Found device : {device}")
self.client = BleakClient(device)
self.address = self.client.address
else:
@ -60,24 +61,33 @@ class BLEConnection:
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
nus = self.client.services.get_service(UART_SERVICE_UUID)
if nus is None:
logger.error("Could not find UART service")
return None
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
printerr("BLE Connexion started")
logger.info("BLE Connection started")
return self.address
def handle_disconnect(self, _: BleakClient):
""" Callback to handle disconnection """
printerr ("Device was disconnected, goodbye.")
logger.info("Device was disconnected, goodbye.")
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
if not self.mc is None:
self.mc.handle_rx(data)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(data))
async def send(self, data):
if not self.client:
logger.error("Client is not connected")
return False
if not self.rx_char:
logger.error("RX characteristic not found")
return False
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)

269
src/meshcore/commands.py Normal file
View file

@ -0,0 +1,269 @@
import asyncio
import logging
from typing import Any, Dict
from .events import EventType
import random
logger = logging.getLogger("meshcore")
class CommandHandler:
DEFAULT_TIMEOUT = 5.0
def __init__(self, default_timeout=None):
self._sender_func = None
self._reader = None
self.dispatcher = None
self.default_timeout = default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
def set_connection(self, connection):
async def sender(data):
await connection.send(data)
self._sender_func = sender
def set_reader(self, reader):
self._reader = reader
def set_dispatcher(self, dispatcher):
self.dispatcher = dispatcher
async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]:
"""
Send a command and wait for expected event responses.
Args:
data: The data to send
expected_events: EventType or list of EventTypes to wait for
timeout: Timeout in seconds, or None to use default_timeout
Returns:
Dict[str, Any]: Dictionary containing the response data or status
"""
if not self.dispatcher:
raise RuntimeError("Dispatcher not set, cannot send commands")
# Use the provided timeout or fall back to default_timeout
timeout = timeout if timeout is not None else self.default_timeout
if self._sender_func:
logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}")
await self._sender_func(data)
if expected_events:
try:
# Convert single event to list if needed
if not isinstance(expected_events, list):
expected_events = [expected_events]
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
for event_type in expected_events:
# don't apply any filters for now, might change later
event = await self.dispatcher.wait_for_event(event_type, {}, timeout)
if event:
return event.payload
return {"success": False, "reason": "no_event_received"}
except asyncio.TimeoutError:
logger.debug(f"Command timed out {data}")
return {"success": False, "reason": "timeout"}
except Exception as e:
logger.debug(f"Command error: {e}")
return {"error": str(e)}
return {"success": True}
async def send_appstart(self):
logger.debug("Sending appstart command")
b1 = bytearray(b'\x01\x03 mccli')
return await self.send(b1, [EventType.SELF_INFO])
async def send_device_query(self):
logger.debug("Sending device query command")
return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR])
async def send_advert(self, flood=False):
logger.debug(f"Sending advertisement command (flood={flood})")
if flood:
return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR])
else:
return await self.send(b"\x07", [EventType.OK, EventType.ERROR])
async def set_name(self, name):
logger.debug(f"Setting device name to: {name}")
return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR])
async def set_coords(self, lat, lon):
logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}")
return await self.send(b'\x0e'\
+ int(lat*1e6).to_bytes(4, 'little', signed=True)\
+ int(lon*1e6).to_bytes(4, 'little', signed=True)\
+ int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def reboot(self):
logger.debug("Sending reboot command")
return await self.send(b'\x13reboot')
async def get_bat(self):
logger.debug("Getting battery information")
return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR])
async def get_time(self):
logger.debug("Getting device time")
return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR])
async def set_time(self, val):
logger.debug(f"Setting device time to: {val}")
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_tx_power(self, val):
logger.debug(f"Setting TX power to: {val}")
return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_radio(self, freq, bw, sf, cr):
logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}")
return await self.send(b"\x0b" \
+ int(float(freq)*1000).to_bytes(4, 'little')\
+ int(float(bw)*1000).to_bytes(4, 'little')\
+ int(sf).to_bytes(1, 'little')\
+ int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_tuning(self, rx_dly, af):
logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}")
return await self.send(b"\x15" \
+ int(rx_dly).to_bytes(4, 'little')\
+ int(af).to_bytes(4, 'little')\
+ int(0).to_bytes(1, 'little')\
+ int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_devicepin(self, pin):
logger.debug(f"Setting device PIN to: {pin}")
return await self.send(b"\x25" \
+ int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def get_contacts(self):
logger.debug("Getting contacts")
return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR])
async def reset_path(self, key):
logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0D" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def share_contact(self, key):
logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x10" + key
return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR])
async def export_contact(self, key=b""):
logger.debug(f"Exporting contact: {key.hex() if key else 'all'}")
data = b"\x11" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def remove_contact(self, key):
logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0f" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def get_msg(self, timeout=1):
logger.debug("Requesting pending messages")
return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], timeout)
async def send_login(self, dst, pwd):
logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}")
data = b"\x1a" + dst + pwd.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_logout(self, dst):
self.login_resp = asyncio.Future()
data = b"\x1d" + dst
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_statusreq(self, dst):
logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}")
data = b"\x1b" + dst
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_cmd(self, dst, cmd, timestamp=None):
logger.debug(f"Sending command to {dst.hex() if isinstance(dst, bytes) else dst}: {cmd}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii")
return await self.send(data, [EventType.OK, EventType.ERROR])
async def send_msg(self, dst, msg, timestamp=None):
logger.debug(f"Sending message to {dst.hex() if isinstance(dst, bytes) else dst}: {msg}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_chan_msg(self, chan, msg, timestamp=None):
logger.debug(f"Sending channel message to channel {chan}: {msg}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_cli(self, cmd):
logger.debug(f"Sending CLI command: {cmd}")
data = b"\x32" + cmd.encode('ascii')
return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR])
async def send_trace(self, auth_code=0, tag=None, flags=0, path=None):
"""
Send a trace packet to test routing through specific repeaters
Args:
auth_code: 32-bit authentication code (default: 0)
tag: 32-bit integer to identify this trace (default: random)
flags: 8-bit flags field (default: 0)
path: Optional string with comma-separated hex values representing repeater pubkeys (e.g. "23,5f,3a")
or a bytes/bytearray object with the raw path data
Returns:
Dictionary with sent status, tag, and estimated timeout in milliseconds, or False if command failed
"""
# Generate random tag if not provided
if tag is None:
tag = random.randint(1, 0xFFFFFFFF)
if auth_code is None:
auth_code = random.randint(1, 0xFFFFFFFF)
logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}")
# Prepare the command packet: CMD(1) + tag(4) + auth_code(4) + flags(1) + [path]
cmd_data = bytearray([36]) # CMD_SEND_TRACE_PATH
cmd_data.extend(tag.to_bytes(4, 'little'))
cmd_data.extend(auth_code.to_bytes(4, 'little'))
cmd_data.append(flags)
# Process path if provided
if path:
if isinstance(path, str):
# Convert comma-separated hex values to bytes
try:
path_bytes = bytearray()
for hex_val in path.split(','):
hex_val = hex_val.strip()
path_bytes.append(int(hex_val, 16))
cmd_data.extend(path_bytes)
except ValueError as e:
logger.error(f"Invalid path format: {e}")
return { "success": False, "reason": "invalid_path_format" }
elif isinstance(path, (bytes, bytearray)):
cmd_data.extend(path)
else:
logger.error(f"Unsupported path type: {type(path)}")
return { "success": False, "reason": "unsupported_path_type" }
return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR])

184
src/meshcore/events.py Normal file
View file

@ -0,0 +1,184 @@
from enum import Enum
import logging
from math import log
from typing import Any, Dict, Optional, Callable, List, Union
import asyncio
from dataclasses import dataclass, field
logger = logging.getLogger("meshcore")
# Public event types for users to subscribe to
class EventType(Enum):
CONTACTS = "contacts"
SELF_INFO = "self_info"
CONTACT_MSG_RECV = "contact_message"
CHANNEL_MSG_RECV = "channel_message"
CURRENT_TIME = "time_update"
NO_MORE_MSGS = "no_more_messages"
CONTACT_SHARE = "contact_share"
BATTERY = "battery_info"
DEVICE_INFO = "device_info"
CLI_RESPONSE = "cli_response"
MSG_SENT = "message_sent"
# Push notifications
ADVERTISEMENT = "advertisement"
PATH_UPDATE = "path_update"
ACK = "acknowledgement"
MESSAGES_WAITING = "messages_waiting"
RAW_DATA = "raw_data"
LOGIN_SUCCESS = "login_success"
LOGIN_FAILED = "login_failed"
STATUS_RESPONSE = "status_response"
LOG_DATA = "log_data"
TRACE_DATA = "trace_data"
RX_LOG_DATA = "rx_log_data"
# Command response types
OK = "command_ok"
ERROR = "command_error"
@dataclass
class Event:
type: EventType
payload: Any
attributes: Dict[str, Any] = field(default_factory=dict)
def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs):
"""
Initialize an Event
Args:
type: The event type
payload: The event payload
attributes: Dictionary of event attributes for filtering
**kwargs: Additional attributes to add to the attributes dictionary
"""
self.type = type
self.payload = payload
self.attributes = attributes or {}
# Add any keyword arguments to the attributes dictionary
if kwargs:
self.attributes.update(kwargs)
class Subscription:
def __init__(self, dispatcher, event_type, callback, attribute_filters=None):
self.dispatcher = dispatcher
self.event_type = event_type
self.callback = callback
self.attribute_filters = attribute_filters or {}
def unsubscribe(self):
self.dispatcher._remove_subscription(self)
class EventDispatcher:
def __init__(self):
self.queue = asyncio.Queue()
self.subscriptions: List[Subscription] = []
self.running = False
self._task = None
def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]],
attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription:
"""
Subscribe to events with optional attribute filtering.
Parameters:
-----------
event_type : EventType or None
The type of event to subscribe to, or None to subscribe to all events.
callback : Callable
Function to call when a matching event is received.
attribute_filters : Dict[str, Any], optional
Dictionary of attribute key-value pairs that must match for the event to trigger the callback.
Returns:
--------
Subscription object that can be used to unsubscribe.
"""
subscription = Subscription(self, event_type, callback, attribute_filters)
self.subscriptions.append(subscription)
return subscription
def _remove_subscription(self, subscription: Subscription):
if subscription in self.subscriptions:
self.subscriptions.remove(subscription)
async def dispatch(self, event: Event):
await self.queue.put(event)
async def _process_events(self):
while self.running:
event = await self.queue.get()
logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}")
for subscription in self.subscriptions.copy():
# Check if event type matches
if subscription.event_type is None or subscription.event_type == event.type:
# Check if all attribute filters match
if subscription.attribute_filters:
# Skip if any filter doesn't match the corresponding event attribute
if not all(event.attributes.get(key) == value
for key, value in subscription.attribute_filters.items()):
continue
try:
result = subscription.callback(event)
if asyncio.iscoroutine(result):
await result
except Exception as e:
print(f"Error in event handler: {e}")
self.queue.task_done()
async def start(self):
if not self.running:
self.running = True
self._task = asyncio.create_task(self._process_events())
async def stop(self):
if self.running:
self.running = False
if self._task:
await self.queue.join()
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None,
timeout: float | None = None) -> Optional[Event]:
"""
Wait for an event of the specified type that matches all attribute filters.
Parameters:
-----------
event_type : EventType
The type of event to wait for.
attribute_filters : Dict[str, Any], optional
Dictionary of attribute key-value pairs that must match for the event to be returned.
timeout : float | None, optional
Maximum time to wait for the event, in seconds.
Returns:
--------
The matched event, or None if timeout occurred before a matching event.
"""
future = asyncio.Future()
def event_handler(event: Event):
if not future.done():
future.set_result(event)
subscription = self.subscribe(event_type, event_handler, attribute_filters)
try:
return await asyncio.wait_for(future, timeout)
except asyncio.TimeoutError:
return None
finally:
subscription.unsubscribe()

View file

@ -1,419 +1,306 @@
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import logging
from typing import Optional, Dict, Any, Union
def printerr (s) :
sys.stderr.write(str(s))
sys.stderr.write("\n")
sys.stderr.flush()
from .events import EventDispatcher, EventType
from .reader import MessageReader
from .commands import CommandHandler
# Setup default logger
logger = logging.getLogger("meshcore")
class MeshCore:
"""
Interface to a BLE MeshCore device
Interface to a MeshCore device
"""
self_info={}
contacts={}
def __init__(self, cx):
""" Constructor : specify address """
self.time = 0
self.result = asyncio.Future()
self.contact_nb = 0
self.rx_sem = asyncio.Semaphore(0)
self.ack_ev = asyncio.Event()
self.login_resp = asyncio.Future()
self.status_resp = asyncio.Future()
def __init__(self, cx, debug=False, default_timeout=None):
self.cx = cx
cx.set_mc(self)
async def connect(self) :
await self.send_appstart()
def handle_rx(self, data: bytearray):
""" Callback to handle received data """
match data[0]:
case 0: # ok
if len(data) == 5 : # an integer
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
else:
self.result.set_result(True)
case 1: # error
if len(data) > 1:
res = {}
res["error_code"] = data[1]
self.result.set_result(res) # error code if fw > 1.4
else:
self.result.set_result(False)
case 2: # contact start
self.contact_nb = int.from_bytes(data[1:5], byteorder='little')
self.contacts={}
case 3: # contact
c = {}
c["public_key"] = data[1:33].hex()
c["type"] = data[33]
c["flags"] = data[34]
c["out_path_len"] = int.from_bytes(data[35:36], signed=True)
plen = int.from_bytes(data[35:36], signed=True)
if plen == -1 :
plen = 0
c["out_path"] = data[36:36+plen].hex()
c["adv_name"] = data[100:132].decode().replace("\0","")
c["last_advert"] = int.from_bytes(data[132:136], byteorder='little')
c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6
c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6
c["lastmod"] = int.from_bytes(data[144:148], byteorder='little')
self.contacts[c["adv_name"]]=c
case 4: # end of contacts
self.result.set_result(self.contacts)
case 5: # self info
self.self_info["adv_type"] = data[1]
self.self_info["tx_power"] = data[2]
self.self_info["max_tx_power"] = data[3]
self.self_info["public_key"] = data[4:36].hex()
self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6
self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6
#self.self_info["reserved_44:48"] = data[44:48].hex()
self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000
self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000
self.self_info["radio_sf"] = data[56]
self.self_info["radio_cr"] = data[57]
self.self_info["name"] = data[58:].decode()
self.result.set_result(True)
case 6: # msg sent
res = {}
res["type"] = data[1]
res["expected_ack"] = bytes(data[2:6])
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little')
self.result.set_result(res)
case 7: # contact msg recv
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex()
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
if data[8] == 2 : # signed packet
res["signature"] = data[13:17].hex()
res["text"] = data[17:].decode()
else :
res["text"] = data[13:].decode()
self.result.set_result(res)
case 16: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "PRIV"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4;
res["pubkey_prefix"] = data[4:10].hex()
res["path_len"] = data[10]
res["txt_type"] = data[11]
res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little')
if data[11] == 2 : # signed packet
res["signature"] = data[16:20].hex()
res["text"] = data[20:].decode()
else :
res["text"] = data[16:].decode()
self.result.set_result(res)
case 8 : # chanel msg recv
res = {}
res["type"] = "CHAN"
res["channel_idx"] = data[1]
res["path_len"] = data[2]
res["txt_type"] = data[3]
res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little')
res["text"] = data[8:].decode()
self.result.set_result(res)
case 17: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "CHAN"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4;
res["channel_idx"] = data[4]
res["path_len"] = data[5]
res["txt_type"] = data[6]
res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little')
res["text"] = data[11:].decode()
self.result.set_result(res)
case 9: # current time
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
case 10: # no more msgs
self.result.set_result(False)
case 11: # contact
self.result.set_result("meshcore://" + data[1:].hex())
case 12: # battery voltage
self.result.set_result(int.from_bytes(data[1:3], byteorder='little'))
case 13: # device info
res = {}
res["fw ver"] = data[1]
if data[1] >= 3:
res["max_contacts"] = data[2] * 2
res["max_channels"] = data[3]
res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little')
res["fw_build"] = data[8:20].decode().replace("\0","")
res["model"] = data[20:60].decode().replace("\0","")
res["ver"] = data[60:80].decode().replace("\0","")
self.result.set_result(res)
case 50: # cli response
res = {}
res["response"] = data[1:].decode()
self.result.set_result(res)
# push notifications
case 0x80:
printerr ("Advertisment received")
case 0x81:
printerr ("Code path update")
case 0x82:
self.ack_ev.set()
printerr ("Received ACK")
case 0x83:
self.rx_sem.release()
printerr ("Msgs are waiting")
case 0x84:
printerr ("Received raw data")
res = {}
res["SNR"] = data[1] / 4
res["RSSI"] = data[2]
res["payload"] = data[4:].hex()
print(res)
case 0x85:
self.login_resp.set_result(True)
printerr ("Login success")
case 0x86:
self.login_resp.set_result(False)
printerr ("Login failed")
case 0x87:
res = {}
res["pubkey_pre"] = data[2:8].hex()
res["bat"] = int.from_bytes(data[8:10], byteorder='little')
res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little')
res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little')
res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True)
res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False)
res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False)
res["airtime"] = int.from_bytes(data[24:28], byteorder='little')
res["uptime"] = int.from_bytes(data[28:32], byteorder='little')
res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little')
res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little')
res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little')
res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little')
res["full_evts"] = int.from_bytes(data[48:50], byteorder='little')
res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4
res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little')
res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little')
self.status_resp.set_result(res)
data_hex = data[8:].hex()
printerr (f"Status response: {data_hex}")
#printerr(res)
case 0x88:
printerr ("Received log data")
# unhandled
case _:
printerr (f"Unhandled data received {data}")
async def send(self, data, timeout = 5):
""" Helper function to synchronously send (and receive) data to the node """
self.result = asyncio.Future()
try:
await self.cx.send(data)
res = await asyncio.wait_for(self.result, timeout)
return res
except TimeoutError :
printerr ("Timeout while sending message ...")
return False
async def send_only(self, data): # don't wait reply
await self.cx.send(data)
async def send_appstart(self):
""" Send APPSTART to the node """
b1 = bytearray(b'\x01\x03 mccli')
return await self.send(b1)
async def send_device_qeury(self):
return await self.send(b"\x16\x03");
async def send_advert(self, flood=False):
""" Make the node send an advertisement """
if flood :
return await self.send(b"\x07\x01")
else :
return await self.send(b"\x07")
async def set_name(self, name):
""" Changes the name of the node """
return await self.send(b'\x08' + name.encode("ascii"))
async def set_coords(self, lat, lon):
return await self.send(b'\x0e'\
+ int(lat*1e6).to_bytes(4, 'little', signed=True)\
+ int(lon*1e6).to_bytes(4, 'little', signed=True)\
+ int(0).to_bytes(4, 'little'))
async def reboot(self):
await self.send_only(b'\x13reboot')
return True
async def get_bat(self):
return await self.send(b'\x14')
async def get_time(self):
""" Get the time (epoch) of the node """
self.time = await self.send(b"\x05")
return self.time
async def set_time(self, val):
""" Sets a new epoch """
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'))
async def set_tx_power(self, val):
""" Sets tx power """
return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'))
async def set_radio (self, freq, bw, sf, cr):
""" Sets radio params """
return await self.send(b"\x0b" \
+ int(float(freq)*1000).to_bytes(4, 'little')\
+ int(float(bw)*1000).to_bytes(4, 'little')\
+ int(sf).to_bytes(1, 'little')\
+ int(cr).to_bytes(1, 'little'))
async def set_tuning (self, rx_dly, af):
""" Sets radio params """
return await self.send(b"\x15" \
+ int(rx_dly).to_bytes(4, 'little')\
+ int(af).to_bytes(4, 'little')\
+ int(0).to_bytes(1, 'little')\
+ int(0).to_bytes(1, 'little'))
async def set_devicepin (self, pin):
return await self.send(b"\x25" \
+ int(pin).to_bytes(4, 'little'))
async def get_contacts(self):
""" Starts retreiving contacts """
return await self.send(b"\x04")
self.dispatcher = EventDispatcher()
self._reader = MessageReader(self.dispatcher)
self.commands = CommandHandler(default_timeout=default_timeout)
# Set up logger
if debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
# Set up connections
self.commands.set_connection(cx)
# Set the dispatcher in the command handler
self.commands.set_dispatcher(self.dispatcher)
self.commands.set_reader(self._reader)
# Initialize state (private)
self._contacts = {}
self._self_info = {}
self._time = 0
# Set up event subscriptions to track data
self._setup_data_tracking()
cx.set_reader(self._reader)
@classmethod
async def create_tcp(cls, host: str, port: int, debug: bool = False, default_timeout=None) -> 'MeshCore':
"""Create and connect a MeshCore instance using TCP connection"""
from .tcp_cx import TCPConnection
connection = TCPConnection(host, port)
await connection.connect()
mc = cls(connection, debug=debug, default_timeout=default_timeout)
await mc.connect()
return mc
@classmethod
async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False, default_timeout=None) -> 'MeshCore':
"""Create and connect a MeshCore instance using serial connection"""
from .serial_cx import SerialConnection
import asyncio
connection = SerialConnection(port, baudrate)
await connection.connect()
await asyncio.sleep(0.1) # Time for transport to establish
mc = cls(connection, debug=debug, default_timeout=default_timeout)
await mc.connect()
return mc
@classmethod
async def create_ble(cls, address: Optional[str] = None, debug: bool = False, default_timeout=None) -> 'MeshCore':
"""Create and connect a MeshCore instance using BLE connection
If address is None, it will scan for and connect to the first available MeshCore device.
"""
from .ble_cx import BLEConnection
connection = BLEConnection(address)
result = await connection.connect()
if result is None:
raise ConnectionError("Failed to connect to BLE device")
mc = cls(connection, debug=debug, default_timeout=default_timeout)
await mc.connect()
return mc
async def connect(self):
await self.dispatcher.start()
return await self.commands.send_appstart()
async def disconnect(self):
await self.dispatcher.stop()
def stop(self):
"""Synchronously stop the event dispatcher task"""
if self.dispatcher._task and not self.dispatcher._task.done():
self.dispatcher.running = False
self.dispatcher._task.cancel()
def subscribe(self, event_type: EventType, callback, attribute_filters: Optional[Dict[str, Any]] = None):
"""
Subscribe to events using EventType enum with optional attribute filtering
Args:
event_type: Type of event to subscribe to, from EventType enum
callback: Async function to call when event occurs
attribute_filters: Dictionary of attribute key-value pairs that must match for the event to trigger the callback
Returns:
Subscription object that can be used to unsubscribe
Example:
# Subscribe to ACK events where the 'code' attribute has a specific value
mc.subscribe(
EventType.ACK,
my_callback_function,
attribute_filters={'code': 'SUCCESS'}
)
"""
return self.dispatcher.subscribe(event_type, callback, attribute_filters)
def unsubscribe(self, subscription):
"""
Unsubscribe from events using a subscription object
Args:
subscription: Subscription object returned from subscribe()
"""
if subscription:
subscription.unsubscribe()
async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, timeout=None):
"""
Wait for an event using EventType enum with optional attribute filtering
Args:
event_type: Type of event to wait for, from EventType enum
attribute_filters: Dictionary of attribute key-value pairs to match against the event
timeout: Maximum time to wait in seconds, or None to use default_timeout
Returns:
Event object or None if timeout
Example:
# Wait for an ACK event where the 'code' attribute has a specific value
await mc.wait_for_event(
EventType.ACK,
attribute_filters={'code': 'SUCCESS'},
timeout=30.0
)
"""
# Use the provided timeout or fall back to default_timeout
if timeout is None:
timeout = self.default_timeout
return await self.dispatcher.wait_for_event(event_type, attribute_filters, timeout)
def _setup_data_tracking(self):
"""Set up event subscriptions to track data internally"""
async def _update_contacts(event):
self._contacts = event.payload
async def _update_self_info(event):
self._self_info = event.payload
async def _update_time(event):
self._time = event.payload.get("time", 0)
# Subscribe to events to update internal state
self.subscribe(EventType.CONTACTS, _update_contacts)
self.subscribe(EventType.SELF_INFO, _update_self_info)
self.subscribe(EventType.CURRENT_TIME, _update_time)
# Getter methods for state
@property
def contacts(self):
"""Get the current contacts"""
return self._contacts
@property
def self_info(self):
"""Get device self info"""
return self._self_info
@property
def time(self):
"""Get the current device time"""
return self._time
@property
def default_timeout(self):
"""Get the default timeout for commands"""
return self.commands.default_timeout
@default_timeout.setter
def default_timeout(self, value):
"""Set the default timeout for commands"""
self.commands.default_timeout = value
def get_contact_by_name(self, name) -> Optional[Dict[str, Any]]:
"""
Find a contact by its name (adv_name field)
Args:
name: The name to search for
Returns:
Contact dictionary or None if not found
"""
if not self._contacts:
return None
for _, contact in self._contacts.items():
if contact.get("adv_name", "").lower() == name.lower():
return contact
return None
def get_contact_by_key_prefix(self, prefix) -> Optional[Dict[str, Any]]:
"""
Find a contact by its public key prefix
Args:
prefix: The public key prefix to search for (can be a partial prefix)
Returns:
Contact dictionary or None if not found
"""
if not self._contacts or not prefix:
return None
# Convert the prefix to lowercase for case-insensitive matching
prefix = prefix.lower()
for contact_id, contact in self._contacts.items():
public_key = contact.get("public_key", "").lower()
if public_key.startswith(prefix):
return contact
return None
async def start_auto_message_fetching(self):
"""
Start automatically fetching messages when messages_waiting events are received.
This will continuously check for new messages when the device indicates
messages are waiting.
"""
self._auto_fetch_task = None
self._auto_fetch_running = True
async def _handle_messages_waiting(event):
# Only start a new fetch task if one isn't already running
if not self._auto_fetch_task or self._auto_fetch_task.done():
self._auto_fetch_task = asyncio.create_task(_fetch_messages_loop())
async def _fetch_messages_loop():
while self._auto_fetch_running:
try:
# Request the next message
result = await self.commands.get_msg()
# If we got a NO_MORE_MSGS event or an error, stop fetching
if not result.get("success") or isinstance(result, dict) and "error" in result:
break
# Small delay to prevent overwhelming the device
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error fetching messages: {e}")
break
# Subscribe to MESSAGES_WAITING events
self._auto_fetch_subscription = self.subscribe(EventType.MESSAGES_WAITING, _handle_messages_waiting)
# Check for any pending messages immediately
await self.commands.get_msg()
return self._auto_fetch_subscription
async def stop_auto_message_fetching(self):
"""
Stop automatically fetching messages when messages_waiting events are received.
"""
if hasattr(self, '_auto_fetch_subscription') and self._auto_fetch_subscription:
self.unsubscribe(self._auto_fetch_subscription)
self._auto_fetch_subscription = None
if hasattr(self, '_auto_fetch_running'):
self._auto_fetch_running = False
if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done():
self._auto_fetch_task.cancel()
try:
await self._auto_fetch_task # type: ignore
except asyncio.CancelledError:
pass
self._auto_fetch_task = None
async def ensure_contacts(self):
if len(self.contacts) == 0 :
await self.get_contacts()
async def reset_path(self, key):
data = b"\x0D" + key
return await self.send(data)
async def share_contact(self, key):
data = b"\x10" + key
return await self.send(data)
async def export_contact(self, key=b""):
data = b"\x11" + key
return await self.send(data)
async def remove_contact(self, key):
data = b"\x0f" + key
return await self.send(data)
async def set_out_path(self, contact, path):
contact["out_path"] = path
contact["out_path_len"] = -1
contact["out_path_len"] = int(len(path) / 2)
async def update_contact(self, contact):
out_path_hex = contact["out_path"]
out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0"
adv_name_hex = contact["adv_name"].encode().hex()
adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0"
data = b"\x09" \
+ bytes.fromhex(contact["public_key"])\
+ contact["type"].to_bytes(1)\
+ contact["flags"].to_bytes(1)\
+ contact["out_path_len"].to_bytes(1, 'little', signed=True)\
+ bytes.fromhex(out_path_hex)\
+ bytes.fromhex(adv_name_hex)\
+ contact["last_advert"].to_bytes(4, 'little')\
+ int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\
+ int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True)
return await self.send(data)
async def send_login(self, dst, pwd):
self.login_resp = asyncio.Future()
data = b"\x1a" + dst + pwd.encode("ascii")
return await self.send(data)
async def wait_login(self, timeout = 5):
try :
return await asyncio.wait_for(self.login_resp, timeout)
except TimeoutError :
printerr ("Timeout ...")
return False
async def send_logout(self, dst):
self.login_resp = asyncio.Future()
data = b"\x1d" + dst
return await self.send(data)
async def send_statusreq(self, dst):
self.status_resp = asyncio.Future()
data = b"\x1b" + dst
return await self.send(data)
async def wait_status(self, timeout = 5):
try :
return await asyncio.wait_for(self.status_resp, timeout)
except TimeoutError :
printerr ("Timeout...")
return False
async def send_cmd(self, dst, cmd):
""" Send a cmd to a node """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii")
#self.ack_ev.clear() # no ack ?
return await self.send(data)
async def send_msg(self, dst, msg):
""" Send a message to a node """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
self.ack_ev.clear()
return await self.send(data)
async def send_chan_msg(self, chan, msg):
""" Send a message to a public channel """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii")
return await self.send(data)
async def get_msg(self):
""" Get message from the node (stored in queue) """
res = await self.send(b"\x0A", 1)
if res is False :
self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue
return res
async def wait_msg(self, timeout=-1):
""" Wait for a message """
if timeout == -1 :
await self.rx_sem.acquire()
"""Ensure contacts are fetched"""
if not self._contacts:
await self.commands.get_contacts()
return True
try:
await asyncio.wait_for(self.rx_sem.acquire(), timeout)
return True
except TimeoutError :
printerr("Timeout waiting msg")
return False
async def wait_ack(self, timeout=6):
""" Wait ack """
try:
await asyncio.wait_for(self.ack_ev.wait(), timeout)
return True
except TimeoutError :
printerr("Timeout waiting ack")
return False
async def send_cli(self, cmd):
data = b"\x32" + cmd.encode('ascii')
return await self.send(data)
return False

31
src/meshcore/packets.py Normal file
View file

@ -0,0 +1,31 @@
from enum import Enum
# Packet prefixes for the protocol
class PacketType(Enum):
OK = 0
ERROR = 1
CONTACT_START = 2
CONTACT = 3
CONTACT_END = 4
SELF_INFO = 5
MSG_SENT = 6
CONTACT_MSG_RECV = 7
CHANNEL_MSG_RECV = 8
CURRENT_TIME = 9
NO_MORE_MSGS = 10
CONTACT_SHARE = 11
BATTERY = 12
DEVICE_INFO = 13
CLI_RESPONSE = 50
# Push notifications
ADVERTISEMENT = 0x80
PATH_UPDATE = 0x81
ACK = 0x82
MESSAGES_WAITING = 0x83
RAW_DATA = 0x84
LOGIN_SUCCESS = 0x85
LOGIN_FAILED = 0x86
STATUS_RESPONSE = 0x87
LOG_DATA = 0x88
TRACE_DATA = 0x89

364
src/meshcore/reader.py Normal file
View file

@ -0,0 +1,364 @@
import sys
import logging
import asyncio
from typing import Any, Optional, Dict
from .events import Event, EventType, EventDispatcher
from .packets import PacketType
logger = logging.getLogger("meshcore")
class MessageReader:
def __init__(self, dispatcher: EventDispatcher):
self.dispatcher = dispatcher
# We're only keeping state here that's needed for processing
# before events are dispatched
self.contacts = {} # Temporary storage during contact list building
self.contact_nb = 0 # Used for contact processing
async def handle_rx(self, data: bytearray):
packet_type_value = data[0]
logger.debug(f"Received data: {data.hex()}")
# Handle command responses
if packet_type_value == PacketType.OK.value:
result: Dict[str, Any] = {"success": True}
if len(data) == 5:
result["value"] = int.from_bytes(data[1:5], byteorder='little')
# Dispatch event for the OK response
await self.dispatcher.dispatch(Event(EventType.OK, result))
elif packet_type_value == PacketType.ERROR.value:
if len(data) > 1:
result = {"success": False, "error_code": data[1]}
else:
result = {"success": False}
# Dispatch event for the ERROR response
await self.dispatcher.dispatch(Event(EventType.ERROR, result))
elif packet_type_value == PacketType.CONTACT_START.value:
self.contact_nb = int.from_bytes(data[1:5], byteorder='little')
self.contacts = {}
elif packet_type_value == PacketType.CONTACT.value:
c = {}
c["public_key"] = data[1:33].hex()
c["type"] = data[33]
c["flags"] = data[34]
c["out_path_len"] = int.from_bytes(data[35:36], signed=True)
plen = int.from_bytes(data[35:36], signed=True)
if plen == -1:
plen = 0
c["out_path"] = data[36:36+plen].hex()
c["adv_name"] = data[100:132].decode().replace("\0","")
c["last_advert"] = int.from_bytes(data[132:136], byteorder='little')
c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6
c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6
c["lastmod"] = int.from_bytes(data[144:148], byteorder='little')
self.contacts[c["public_key"]] = c
elif packet_type_value == PacketType.CONTACT_END.value:
await self.dispatcher.dispatch(Event(EventType.CONTACTS, self.contacts))
elif packet_type_value == PacketType.SELF_INFO.value:
self_info = {}
self_info["adv_type"] = data[1]
self_info["tx_power"] = data[2]
self_info["max_tx_power"] = data[3]
self_info["public_key"] = data[4:36].hex()
self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6
self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6
self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000
self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000
self_info["radio_sf"] = data[56]
self_info["radio_cr"] = data[57]
self_info["name"] = data[58:].decode()
await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info))
elif packet_type_value == PacketType.MSG_SENT.value:
res = {}
res["type"] = data[1]
res["expected_ack"] = bytes(data[2:6])
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little')
attributes = {
"type": res["type"],
"expected_ack": res["expected_ack"].hex()
}
await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes))
elif packet_type_value == PacketType.CONTACT_MSG_RECV.value:
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex()
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
if data[8] == 2:
res["signature"] = data[13:17].hex()
res["text"] = data[17:].decode()
else:
res["text"] = data[13:].decode()
attributes = {
"pubkey_prefix": res["pubkey_prefix"],
"txt_type": res["txt_type"]
}
await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes))
elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "PRIV"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4
res["pubkey_prefix"] = data[4:10].hex()
res["path_len"] = data[10]
res["txt_type"] = data[11]
res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little')
if data[11] == 2:
res["signature"] = data[16:20].hex()
res["text"] = data[20:].decode()
else:
res["text"] = data[16:].decode()
attributes = {
"pubkey_prefix": res["pubkey_prefix"],
"txt_type": res["txt_type"]
}
await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes))
elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value:
res = {}
res["type"] = "CHAN"
res["channel_idx"] = data[1]
res["path_len"] = data[2]
res["txt_type"] = data[3]
res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little')
res["text"] = data[8:].decode()
attributes = {
"channel_idx": res["channel_idx"],
"txt_type": res["txt_type"]
}
await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes))
elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "CHAN"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4
res["channel_idx"] = data[4]
res["path_len"] = data[5]
res["txt_type"] = data[6]
res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little')
res["text"] = data[11:].decode()
attributes = {
"channel_idx": res["channel_idx"],
"txt_type": res["txt_type"]
}
await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes))
elif packet_type_value == PacketType.CURRENT_TIME.value:
time_value = int.from_bytes(data[1:5], byteorder='little')
result = {"time": time_value}
await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result))
elif packet_type_value == PacketType.NO_MORE_MSGS.value:
result = {"messages_available": False}
await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result))
elif packet_type_value == PacketType.CONTACT_SHARE.value:
contact_uri = "meshcore://" + data[1:].hex()
result = {"uri": contact_uri}
await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result))
elif packet_type_value == PacketType.BATTERY.value:
battery_level = int.from_bytes(data[1:3], byteorder='little')
result = {"level": battery_level}
await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
elif packet_type_value == PacketType.DEVICE_INFO.value:
res = {}
res["fw ver"] = data[1]
if data[1] >= 3:
res["max_contacts"] = data[2] * 2
res["max_channels"] = data[3]
res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little')
res["fw_build"] = data[8:20].decode().replace("\0","")
res["model"] = data[20:60].decode().replace("\0","")
res["ver"] = data[60:80].decode().replace("\0","")
await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res))
elif packet_type_value == PacketType.CLI_RESPONSE.value:
res = {}
res["response"] = data[1:].decode()
await self.dispatcher.dispatch(Event(EventType.CLI_RESPONSE, res))
# Push notifications
elif packet_type_value == PacketType.ADVERTISEMENT.value:
logger.debug("Advertisement received")
# TODO: Read advertisement attributes
await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, {}))
elif packet_type_value == PacketType.PATH_UPDATE.value:
logger.debug("Code path update")
# TODO: Read path update attributes
await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, {}))
elif packet_type_value == PacketType.ACK.value:
logger.debug("Received ACK")
ack_data = {}
if len(data) >= 5:
ack_data["code"] = bytes(data[1:5]).hex()
attributes = {
"code": ack_data.get("code", "")
}
await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes))
elif packet_type_value == PacketType.MESSAGES_WAITING.value:
logger.debug("Msgs are waiting")
await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {}))
elif packet_type_value == PacketType.RAW_DATA.value:
res = {}
res["SNR"] = data[1] / 4
res["RSSI"] = data[2]
res["payload"] = data[4:].hex()
logger.debug("Received raw data")
print(res)
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
logger.debug("Login success")
# TODO: Read login attributes
await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, {}))
elif packet_type_value == PacketType.LOGIN_FAILED.value:
logger.debug("Login failed")
# TODO: Read login attributes
await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, {}))
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
res = {}
res["pubkey_pre"] = data[2:8].hex()
res["bat"] = int.from_bytes(data[8:10], byteorder='little')
res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little')
res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little')
res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True)
res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False)
res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False)
res["airtime"] = int.from_bytes(data[24:28], byteorder='little')
res["uptime"] = int.from_bytes(data[28:32], byteorder='little')
res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little')
res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little')
res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little')
res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little')
res["full_evts"] = int.from_bytes(data[48:50], byteorder='little')
res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4
res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little')
res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little')
data_hex = data[8:].hex()
logger.debug(f"Status response: {data_hex}")
attributes = {
"pubkey_prefix": res["pubkey_pre"],
}
await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res, attributes))
elif packet_type_value == PacketType.LOG_DATA.value:
logger.debug(f"Received RF log data: {data.hex()}")
# Parse as raw RX data
log_data: Dict[str, Any] = {
"raw_hex": data[1:].hex()
}
# First byte is SNR (signed byte, multiplied by 4)
if len(data) > 1:
snr_byte = data[1]
# Convert to signed value
snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0
log_data["snr"] = snr
# Second byte is RSSI (signed byte)
if len(data) > 2:
rssi_byte = data[2]
# Convert to signed value
rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256
log_data["rssi"] = rssi
# Remaining bytes are the raw data payload
if len(data) > 3:
log_data["payload"] = data[3:].hex()
log_data["payload_length"] = len(data) - 3
attributes = {
"pubkey_prefix": log_data["raw_hex"],
}
# Dispatch as RF log data
await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data, attributes))
elif packet_type_value == PacketType.TRACE_DATA.value:
logger.debug(f"Received trace data: {data.hex()}")
res = {}
# According to the source, format is:
# 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr
reserved = data[1]
path_len = data[2]
flags = data[3]
tag = int.from_bytes(data[4:8], byteorder='little')
auth_code = int.from_bytes(data[8:12], byteorder='little')
# Initialize result
res["tag"] = tag
res["auth"] = auth_code
res["flags"] = flags
res["path_len"] = path_len
# Process path as array of objects with hash and SNR
path_nodes = []
if path_len > 0 and len(data) >= 12 + path_len*2 + 1:
# Extract path with hash and SNR pairs
for i in range(path_len):
node = {
"hash": f"{data[12+i]:02x}",
# SNR is stored as a signed byte representing SNR * 4
"snr": (data[12+path_len+i] if data[12+path_len+i] < 128 else data[12+path_len+i] - 256) / 4.0
}
path_nodes.append(node)
# Add the final node (our device) with its SNR
final_snr_byte = data[12+path_len*2]
final_snr = (final_snr_byte if final_snr_byte < 128 else final_snr_byte - 256) / 4.0
path_nodes.append({
"snr": final_snr
})
res["path"] = path_nodes
logger.debug(f"Parsed trace data: {res}")
attributes = {
"tag": res["tag"],
"auth_code": res["auth"],
}
await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes))
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")

View file

@ -2,10 +2,11 @@
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import logging
import serial_asyncio
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
class SerialConnection:
def __init__(self, port, baudrate):
@ -13,6 +14,7 @@ class SerialConnection:
self.baudrate = baudrate
self.frame_started = False
self.frame_size = 0
self.transport = None
self.header = b""
self.inframe = b""
@ -22,21 +24,21 @@ class SerialConnection:
def connection_made(self, transport):
self.cx.transport = transport
# printerr('port opened')
transport.serial.rts = False # You can manipulate Serial object via transport
logger.debug('port opened')
if isinstance(transport, serial_asyncio.SerialTransport) and transport.serial:
transport.serial.rts = False # You can manipulate Serial object via transport
def data_received(self, data):
# printerr('data received')
self.cx.handle_rx(data)
def connection_lost(self, exc):
printerr('port closed')
logger.info('port closed')
def pause_writing(self):
printerr('pause writing')
logger.debug('pause writing')
def resume_writing(self):
printerr('resume writing')
logger.debug('resume writing')
async def connect(self):
"""
@ -47,11 +49,11 @@ class SerialConnection:
loop, lambda: self.MCSerialClientProtocol(self),
self.port, baudrate=self.baudrate)
printerr("Serial Connexion started")
logger.info("Serial Connection started")
return self.port
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
@ -69,8 +71,8 @@ class SerialConnection:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(self.inframe))
self.frame_started = False
self.header = b""
self.inframe = b""
@ -78,7 +80,10 @@ class SerialConnection:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
if not self.transport:
logger.error("Transport not connected, cannot send data")
return
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
# printerr(f"sending pkt : {pkt}")
self.transport.write(pkt)
logger.debug(f"sending pkt : {pkt}")
self.transport.write(pkt)

View file

@ -2,9 +2,10 @@
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import logging
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
class TCPConnection:
def __init__(self, host, port):
@ -16,21 +17,23 @@ class TCPConnection:
self.header = b""
self.inframe = b""
class MCClientProtocol:
class MCClientProtocol(asyncio.Protocol):
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
logger.debug('connection established')
def data_received(self, data):
logger.debug('data received')
self.cx.handle_rx(data)
def error_received(self, exc):
printerr(f'Error received: {exc}')
logger.error(f'Error received: {exc}')
def connection_lost(self, exc):
printerr('The server closed the connection')
logger.info('The server closed the connection')
async def connect(self):
"""
@ -41,11 +44,11 @@ class TCPConnection:
lambda: self.MCClientProtocol(self),
self.host, self.port)
printerr("TCP Connexion started")
logger.info("TCP Connection started")
return self.host
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
@ -63,8 +66,8 @@ class TCPConnection:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(self.inframe))
self.frame_started = False
self.header = b""
self.inframe = b""
@ -72,6 +75,10 @@ class TCPConnection:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
if not self.transport:
logger.error("Transport not connected, cannot send data")
return
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
logger.debug(f"sending pkt : {pkt}")
self.transport.write(pkt)