| examples | ||
| src/meshcore | ||
| tests | ||
| .gitignore | ||
| LICENSE | ||
| pyproject.toml | ||
| pytest.ini | ||
| README.md | ||
Python MeshCore
Python library for interacting with MeshCore companion radio nodes.
Installation
pip install meshcore
Quick Start
Connect to your device and send a message:
import asyncio
from meshcore import MeshCore, EventType
async def main():
# Connect to your device
meshcore = await MeshCore.create_serial("/dev/ttyUSB0")
# Get your contacts
result = await meshcore.commands.get_contacts()
if result.type == EventType.ERROR:
print(f"Error getting contacts: {result.payload}")
return
contacts = result.payload
print(f"Found {len(contacts)} contacts")
# Send a message to the first contact
if contacts:
# Get the first contact
contact = next(iter(contacts.items()))[1]
# Pass the contact object directly to send_msg
result = await meshcore.commands.send_msg(contact, "Hello from Python!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
else:
print("Message sent successfully!")
await meshcore.disconnect()
asyncio.run(main())
Development Setup
To set up for development:
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode
pip install -e .
# Run examples
python examples/pubsub_example.py -p /dev/ttyUSB0
Usage Guide
Command Return Values
All command methods in MeshCore return an Event object that contains both the event type and its payload. This allows for consistent error handling and type checking:
# Command result structure
result = await meshcore.commands.some_command()
# Check if the command was successful or resulted in an error
if result.type == EventType.ERROR:
# Handle error case
print(f"Command failed: {result.payload}")
else:
# Handle success case - the event type will be specific to the command
# (e.g., EventType.DEVICE_INFO, EventType.CONTACTS, EventType.MSG_SENT)
print(f"Command succeeded with event type: {result.type}")
# Access the payload data
data = result.payload
Common error handling pattern:
result = await meshcore.commands.send_msg(contact, "Hello!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
else:
# For send_msg, a successful result will have type EventType.MSG_SENT
print(f"Message sent with expected ack: {result.payload['expected_ack'].hex()}")
Connecting to Your Device
Connect via Serial, BLE, or TCP:
# Serial connection
meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True)
# BLE connection (scans for devices if address not provided)
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB")
# TCP connection
meshcore = await MeshCore.create_tcp("192.168.1.100", 4000)
Auto-Reconnect and Connection Events
Enable automatic reconnection when connections are lost:
# Enable auto-reconnect with custom retry limits
meshcore = await MeshCore.create_tcp(
"192.168.1.100", 4000,
auto_reconnect=True,
max_reconnect_attempts=5
)
# Subscribe to connection events
async def on_connected(event):
print(f"Connected: {event.payload}")
if event.payload.get('reconnected'):
print("Successfully reconnected!")
async def on_disconnected(event):
print(f"Disconnected: {event.payload['reason']}")
if event.payload.get('max_attempts_exceeded'):
print("Max reconnection attempts exceeded")
meshcore.subscribe(EventType.CONNECTED, on_connected)
meshcore.subscribe(EventType.DISCONNECTED, on_disconnected)
# Check connection status
if meshcore.is_connected:
print("Device is currently connected")
Auto-reconnect features:
- Exponential backoff (1s, 2s, 4s, 8s max delay)
- Configurable retry limits (default: 3 attempts)
- Automatic disconnect detection (especially useful for TCP connections)
- Connection events with detailed information
Using Commands (Synchronous Style)
Send commands and wait for responses:
# Get device information
result = await meshcore.commands.send_device_query()
if result.type == EventType.ERROR:
print(f"Error getting device info: {result.payload}")
else:
print(f"Device model: {result.payload['model']}")
# Get list of contacts
result = await meshcore.commands.get_contacts()
if result.type == EventType.ERROR:
print(f"Error getting contacts: {result.payload}")
else:
contacts = result.payload
for contact_id, contact in contacts.items():
print(f"Contact: {contact['adv_name']} ({contact_id})")
# Send a message (destination key in bytes)
result = await meshcore.commands.send_msg(dst_key, "Hello!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
# Setting device parameters
result = await meshcore.commands.set_name("My Device")
if result.type == EventType.ERROR:
print(f"Error setting name: {result.payload}")
result = await meshcore.commands.set_tx_power(20) # Set transmit power
if result.type == EventType.ERROR:
print(f"Error setting TX power: {result.payload}")
Finding Contacts
Easily find contacts by name or key:
# Find a contact by name
contact = meshcore.get_contact_by_name("Bob's Radio")
if contact:
print(f"Found Bob at: {contact['adv_lat']}, {contact['adv_lon']}")
# Find by partial key prefix
contact = meshcore.get_contact_by_key_prefix("a1b2c3")
Event-Based Programming (Asynchronous Style)
Subscribe to events to handle them asynchronously:
# Subscribe to incoming messages
async def handle_message(event):
data = event.payload
print(f"Message from {data['pubkey_prefix']}: {data['text']}")
subscription = meshcore.subscribe(EventType.CONTACT_MSG_RECV, handle_message)
# Subscribe to advertisements
async def handle_advert(event):
print("Advertisement detected!")
meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert)
# When done, unsubscribe
meshcore.unsubscribe(subscription)
Filtering Events by Attributes
Filter events based on their attributes to handle only specific ones:
# Subscribe only to messages from a specific contact
async def handle_specific_contact_messages(event):
print(f"Message from Alice: {event.payload['text']}")
contact = meshcore.get_contact_by_name("Alice")
if contact:
alice_subscription = meshcore.subscribe(
EventType.CONTACT_MSG_RECV,
handle_specific_contact_messages,
attribute_filters={"pubkey_prefix": contact["public_key"][:12]}
)
# Send a message and wait for its specific acknowledgment
async def send_and_confirm_message(meshcore, dst_key, message):
# Send the message and get information about the sent message
sent_result = await meshcore.commands.send_msg(dst_key, message)
# Extract the expected acknowledgment code from the message sent event
if sent_result.type == EventType.ERROR:
print(f"Error sending message: {sent_result.payload}")
return False
expected_ack = sent_result.payload["expected_ack"].hex()
print(f"Message sent, waiting for ack with code: {expected_ack}")
# Wait specifically for this acknowledgment
result = await meshcore.wait_for_event(
EventType.ACK,
attribute_filters={"code": expected_ack},
timeout=10.0
)
if result:
print("Message confirmed delivered!")
return True
else:
print("Message delivery confirmation timed out")
return False
Hybrid Approach (Commands + Events)
Combine command-based and event-based styles:
import asyncio
async def main():
# Connect to device
meshcore = await MeshCore.create_serial("/dev/ttyUSB0")
# Set up event handlers
async def handle_ack(event):
print("Message acknowledged!")
async def handle_battery(event):
print(f"Battery level: {event.payload}%")
# Subscribe to events
meshcore.subscribe(EventType.ACK, handle_ack)
meshcore.subscribe(EventType.BATTERY, handle_battery)
# Create background task for battery checking
async def check_battery_periodically():
while True:
# Send command (returns battery level)
result = await meshcore.commands.get_bat()
if result.type == EventType.ERROR:
print(f"Error checking battery: {result.payload}")
else:
print(f"Battery level: {result.payload.get('level', 'unknown')}%")
await asyncio.sleep(60) # Wait 60 seconds between checks
# Start the background task
battery_task = asyncio.create_task(check_battery_periodically())
# Send manual command and wait for response
await meshcore.commands.send_advert(flood=True)
try:
# Keep the main program running
await asyncio.sleep(float('inf'))
except asyncio.CancelledError:
# Clean up when program ends
battery_task.cancel()
await meshcore.disconnect()
# Run the program
asyncio.run(main())
Auto-Fetching Messages
Let the library automatically fetch incoming messages:
# Start auto-fetching messages
await meshcore.start_auto_message_fetching()
# Just subscribe to message events - the library handles fetching
async def on_message(event):
print(f"New message: {event.payload['text']}")
meshcore.subscribe(EventType.CONTACT_MSG_RECV, on_message)
# When done
await meshcore.stop_auto_message_fetching()
Debug Mode
Enable debug logging for troubleshooting:
# Enable debug mode when creating the connection
meshcore = await MeshCore.create_serial("/dev/ttyUSB0", debug=True)
This logs detailed information about commands sent and events received.
Common Examples
Sending Messages to Contacts
Commands that require a destination (send_msg, send_login, send_statusreq, etc.) now accept either:
- A string with the hex representation of a public key
- A contact object with a "public_key" field
- Bytes object (for backward compatibility)
# Get contacts and send to a specific one
result = await meshcore.commands.get_contacts()
if result.type == EventType.ERROR:
print(f"Error getting contacts: {result.payload}")
else:
contacts = result.payload
for key, contact in contacts.items():
if contact["adv_name"] == "Alice":
# Option 1: Pass the contact object directly
result = await meshcore.commands.send_msg(contact, "Hello Alice!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
# Option 2: Use the public key string
result = await meshcore.commands.send_msg(contact["public_key"], "Hello again Alice!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
# Option 3 (backward compatible): Convert the hex key to bytes
dst_key = bytes.fromhex(contact["public_key"])
result = await meshcore.commands.send_msg(dst_key, "Hello once more Alice!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
break
# You can also directly use a contact found by name
contact = meshcore.get_contact_by_name("Bob")
if contact:
result = await meshcore.commands.send_msg(contact, "Hello Bob!")
if result.type == EventType.ERROR:
print(f"Error sending message: {result.payload}")
Monitoring Channel Messages
# Subscribe to channel messages
async def channel_handler(event):
msg = event.payload
print(f"Channel {msg['channel_idx']}: {msg['text']}")
meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler)
Examples in the Repo
Check the examples/ directory for more:
pubsub_example.py: Event subscription system with auto-fetchingserial_infos.py: Quick device info retrievalserial_msg.py: Message sending and receivingble_t1000_infos.py: BLE connections