mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
commit
3e70d009bc
6 changed files with 221 additions and 1 deletions
|
|
@ -468,6 +468,8 @@ All events in MeshCore are represented by the `EventType` enum. These events are
|
|||
| `LOG_DATA` | `"log_data"` | Generic log data | Various log information |
|
||||
| **Binary Protocol Events** |||
|
||||
| `BINARY_RESPONSE` | `"binary_response"` | Generic binary response | Tag and hex data |
|
||||
| `SIGN_START` | `"sign_start"` | Start of an on-device signing session | Maximum buffer size (bytes) for data to sign |
|
||||
| `SIGNATURE` | `"signature"` | Resulting on-device signature | Raw signature bytes |
|
||||
| **Authentication Events** |||
|
||||
| `LOGIN_SUCCESS` | `"login_success"` | Successful login | Permissions, admin status, pubkey prefix |
|
||||
| `LOGIN_FAILED` | `"login_failed"` | Failed login attempt | Pubkey prefix |
|
||||
|
|
@ -564,6 +566,9 @@ All commands are async methods that return `Event` objects. Commands are organiz
|
|||
| `req_telemetry(contact, timeout=0)` | `contact: dict, timeout: float` | `TELEMETRY_RESPONSE` | Get telemetry via binary protocol |
|
||||
| `req_mma(contact, start, end, timeout=0)` | `contact: dict, start: int, end: int, timeout: float` | `MMA_RESPONSE` | Get historical telemetry data |
|
||||
| `req_acl(contact, timeout=0)` | `contact: dict, timeout: float` | `ACL_RESPONSE` | Get access control list |
|
||||
| `sign_start()` | None | `SIGN_START` | Begin a signing session; returns maximum buffer size for data to sign |
|
||||
| `sign_data(chunk)` | `chunk: bytes` | `OK` | Append a data chunk to the current signing session (can be called multiple times) |
|
||||
| `sign_finish()` | None | `SIGNATURE` | Finalize signing and return the signature for all accumulated data |
|
||||
|
||||
### Helper Methods
|
||||
|
||||
|
|
@ -571,6 +576,7 @@ All commands are async methods that return `Event` objects. Commands are organiz
|
|||
|--------|---------|-------------|
|
||||
| `get_contact_by_name(name)` | `dict/None` | Find contact by advertisement name |
|
||||
| `get_contact_by_key_prefix(prefix)` | `dict/None` | Find contact by partial public key |
|
||||
| `sign(data, chunk_size=512)` | `Event` (`SIGNATURE`/`ERROR`) | High-level helper to sign arbitrary data on-device, handling chunking for you |
|
||||
| `is_connected` | `bool` | Check if device is currently connected |
|
||||
| `subscribe(event_type, callback, filters=None)` | `Subscription` | Subscribe to events with optional filtering |
|
||||
| `unsubscribe(subscription)` | None | Remove event subscription |
|
||||
|
|
|
|||
136
examples/ble_sign_example.py
Normal file
136
examples/ble_sign_example.py
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Example: Sign arbitrary data with a MeshCore device over BLE.
|
||||
|
||||
The device performs signing on its private key via the CMD_SIGN_* flow:
|
||||
- sign_start(): initializes a signing session and returns max buffer size (8KB on firmware)
|
||||
- sign_data(): streams one or more data chunks
|
||||
- sign_finish(): returns the signature
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from textwrap import wrap
|
||||
|
||||
# Ensure local src/ is on path when running from repo root
|
||||
repo_root = Path(__file__).resolve().parents[1]
|
||||
src_path = repo_root / "src"
|
||||
if src_path.exists():
|
||||
sys.path.insert(0, str(src_path))
|
||||
|
||||
from meshcore import MeshCore, EventType
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Sign data using a MeshCore device over BLE"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-a",
|
||||
"--addr",
|
||||
help="BLE address of the device (optional, will scan if not provided)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--pin",
|
||||
help="PIN for BLE pairing (optional)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--data",
|
||||
default="Hello from meshcore_py!",
|
||||
help="ASCII data to sign (will be UTF-8 encoded)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--chunk-size",
|
||||
type=int,
|
||||
default=120,
|
||||
help="Chunk size to stream to the device (bytes). Default 120 for BLE (frames under 128 bytes work better). For serial/TCP, larger values (e.g., 512) work fine.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timeout",
|
||||
type=float,
|
||||
default=None,
|
||||
help="Timeout for sign_finish operation in seconds (default: 15s minimum, longer for large data like JWT tokens)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="Enable debug logging",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
meshcore = None
|
||||
try:
|
||||
print("Connecting to MeshCore device...")
|
||||
meshcore = await MeshCore.create_ble(address=args.addr, pin=args.pin, debug=args.debug)
|
||||
print("✅ Connected.")
|
||||
|
||||
data_bytes = args.data.encode("utf-8")
|
||||
print(f"Data to sign: {len(data_bytes)} bytes")
|
||||
if args.debug:
|
||||
print(f"Data hex (first 100 bytes): {data_bytes[:100].hex()}")
|
||||
|
||||
sig_evt = await meshcore.commands.sign(data_bytes, chunk_size=max(1, args.chunk_size), timeout=args.timeout)
|
||||
if sig_evt.type == EventType.ERROR:
|
||||
raise RuntimeError(f"sign failed: {sig_evt.payload}")
|
||||
signature = sig_evt.payload.get("signature", b"")
|
||||
print(f"Signature ({len(signature)} bytes):")
|
||||
# Pretty-print hex in 32-byte lines
|
||||
hex_sig = signature.hex()
|
||||
for line in wrap(hex_sig, 64):
|
||||
print(line)
|
||||
|
||||
# Verify signature with device's public key
|
||||
try:
|
||||
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
# Get device's public key from self_info
|
||||
self_info = meshcore.self_info
|
||||
if not self_info or "public_key" not in self_info:
|
||||
print("\n⚠️ Could not get device public key for verification")
|
||||
else:
|
||||
pubkey_hex = self_info["public_key"]
|
||||
pubkey_bytes = bytes.fromhex(pubkey_hex)
|
||||
|
||||
try:
|
||||
public_key = Ed25519PublicKey.from_public_bytes(pubkey_bytes)
|
||||
public_key.verify(signature, data_bytes)
|
||||
print("\n✅ Signature verification: SUCCESS (signature is valid)")
|
||||
except InvalidSignature:
|
||||
print("\n❌ Signature verification: FAILED (signature is invalid)")
|
||||
if args.debug:
|
||||
print(f" Public key: {pubkey_hex}")
|
||||
print(f" Data length: {len(data_bytes)} bytes")
|
||||
print(f" Signature length: {len(signature)} bytes")
|
||||
print(f" Data (first 50 bytes): {data_bytes[:50].hex()}")
|
||||
except Exception as e:
|
||||
print(f"\n⚠️ Signature verification error: {e}")
|
||||
except ImportError:
|
||||
print("\n⚠️ cryptography library not available - skipping signature verification")
|
||||
print(" Install with: pip install cryptography")
|
||||
|
||||
print("\nSigning flow completed!")
|
||||
|
||||
except ConnectionError as e:
|
||||
print(f"❌ Failed to connect: {e}")
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(f"❌ Error: {e}")
|
||||
return 1
|
||||
finally:
|
||||
if meshcore:
|
||||
await meshcore.disconnect()
|
||||
print("Disconnected.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
sys.exit(asyncio.run(main()))
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user")
|
||||
sys.exit(1)
|
||||
|
||||
|
|
@ -122,7 +122,7 @@ class CommandHandlerBase:
|
|||
# Create an error event when no event is received
|
||||
return Event(EventType.ERROR, {"reason": "no_event_received"})
|
||||
except asyncio.TimeoutError:
|
||||
logger.debug(f"Command timed out {data}")
|
||||
logger.debug(f"Command timed out waiting for events {expected_events}")
|
||||
return Event(EventType.ERROR, {"reason": "timeout"})
|
||||
except Exception as e:
|
||||
logger.debug(f"Command error: {e}")
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from hashlib import sha256
|
||||
from typing import Optional
|
||||
|
|
@ -211,6 +212,67 @@ class DeviceCommands(CommandHandlerBase):
|
|||
data = b"\x18" + key
|
||||
return await self.send(data, [EventType.OK, EventType.ERROR])
|
||||
|
||||
async def sign_start(self) -> Event:
|
||||
logger.debug("Starting signing session on device")
|
||||
return await self.send(b"\x21", [EventType.SIGN_START, EventType.ERROR])
|
||||
|
||||
async def sign_data(self, chunk: bytes) -> Event:
|
||||
if not isinstance(chunk, (bytes, bytearray)):
|
||||
raise TypeError("chunk must be bytes-like")
|
||||
logger.debug(f"Sending signing data chunk ({len(chunk)} bytes)")
|
||||
data = b"\x22" + bytes(chunk)
|
||||
result = await self.send(data, [EventType.OK, EventType.ERROR], timeout=5.0)
|
||||
|
||||
# If we got an error (not just timeout), return it immediately
|
||||
if result.type == EventType.ERROR:
|
||||
# If it's a timeout/no_event, log a warning but continue - the data may have been received
|
||||
if result.payload.get("reason") in ("timeout", "no_event_received"):
|
||||
logger.warning(
|
||||
f"sign_data OK response not received (timeout), but continuing - "
|
||||
f"data may have been processed by device"
|
||||
)
|
||||
return Event(EventType.OK, {})
|
||||
# For actual errors (bad state, table full, etc.), return the error
|
||||
return result
|
||||
|
||||
return result
|
||||
|
||||
async def sign_finish(self, timeout: Optional[float] = None, data_size: int = 0) -> Event:
|
||||
logger.debug("Finalizing signing session on device")
|
||||
if timeout is None:
|
||||
base_timeout = max(self.default_timeout * 3, 15.0)
|
||||
size_bonus = min(data_size / 2048.0, 5.0)
|
||||
timeout = base_timeout + size_bonus
|
||||
logger.debug(f"sign_finish using timeout={timeout:.1f} seconds (data_size={data_size} bytes)")
|
||||
return await self.send(b"\x23", [EventType.SIGNATURE, EventType.ERROR], timeout=timeout)
|
||||
|
||||
async def sign(self, data: bytes, chunk_size: int = 120, timeout: Optional[float] = None) -> Event:
|
||||
if not isinstance(data, (bytes, bytearray)):
|
||||
raise TypeError("data must be bytes-like")
|
||||
if chunk_size <= 0:
|
||||
raise ValueError("chunk_size must be > 0")
|
||||
|
||||
start_evt = await self.sign_start()
|
||||
if start_evt.type == EventType.ERROR:
|
||||
return start_evt
|
||||
|
||||
max_len = start_evt.payload.get("max_length", 0)
|
||||
if max_len and len(data) > max_len:
|
||||
return Event(EventType.ERROR, {"reason": "data_too_large", "max_length": max_len, "len": len(data)})
|
||||
|
||||
for idx in range(0, len(data), chunk_size):
|
||||
chunk = data[idx : idx + chunk_size]
|
||||
chunk_num = (idx // chunk_size) + 1
|
||||
total_chunks = (len(data) + chunk_size - 1) // chunk_size
|
||||
logger.debug(f"Sending chunk {chunk_num}/{total_chunks} ({len(chunk)} bytes)")
|
||||
evt = await self.sign_data(chunk)
|
||||
if evt.type == EventType.ERROR:
|
||||
logger.error(f"Error sending chunk {chunk_num}/{total_chunks}: {evt.payload}")
|
||||
return evt
|
||||
logger.debug(f"Chunk {chunk_num}/{total_chunks} sent successfully")
|
||||
|
||||
return await self.sign_finish(timeout=timeout, data_size=len(data))
|
||||
|
||||
async def get_stats_core(self) -> Event:
|
||||
logger.debug("Getting core statistics")
|
||||
# CMD_GET_STATS (56) + STATS_TYPE_CORE (0)
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ class EventType(Enum):
|
|||
CONTROL_DATA = "control_data"
|
||||
DISCOVER_RESPONSE = "discover_response"
|
||||
NEIGHBOURS_RESPONSE = "neighbours_response"
|
||||
SIGN_START = "sign_start"
|
||||
SIGNATURE = "signature"
|
||||
|
||||
# Command response types
|
||||
OK = "command_ok"
|
||||
|
|
|
|||
|
|
@ -710,6 +710,20 @@ class MessageReader:
|
|||
else:
|
||||
logger.error(f"Invalid private key response length: {len(data)}")
|
||||
|
||||
elif packet_type_value == PacketType.SIGN_START.value:
|
||||
logger.debug(f"Received sign start response: {data.hex()}")
|
||||
# Payload: 1 reserved byte, 4-byte max length
|
||||
dbuf.read(1)
|
||||
max_len = int.from_bytes(dbuf.read(4), "little")
|
||||
res = {"max_length": max_len}
|
||||
await self.dispatcher.dispatch(Event(EventType.SIGN_START, res))
|
||||
|
||||
elif packet_type_value == PacketType.SIGNATURE.value:
|
||||
logger.debug(f"Received signature: {data.hex()}")
|
||||
signature = dbuf.read()
|
||||
res = {"signature": signature}
|
||||
await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res))
|
||||
|
||||
elif packet_type_value == PacketType.DISABLED.value:
|
||||
logger.debug("Received disabled response")
|
||||
res = {"reason": "private_key_export_disabled"}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue