From c42c30c25d577d2ca541abf20b72ad2be5cf1756 Mon Sep 17 00:00:00 2001 From: agessaman Date: Sun, 14 Dec 2025 22:23:33 -0800 Subject: [PATCH] implement device signing binary frames and sign() command for on-device signing. --- examples/ble_sign_example.py | 96 +++++++++++++++++++++++++++++++++ src/meshcore/commands/device.py | 55 +++++++++++++++++++ src/meshcore/events.py | 2 + src/meshcore/reader.py | 14 +++++ 4 files changed, 167 insertions(+) create mode 100644 examples/ble_sign_example.py diff --git a/examples/ble_sign_example.py b/examples/ble_sign_example.py new file mode 100644 index 0000000..f8808f5 --- /dev/null +++ b/examples/ble_sign_example.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Example: Sign arbitrary data with a MeshCore device over BLE. + +The device performs signing on its private key via the CMD_SIGN_* flow: +- sign_start(): initializes a signing session and returns max buffer size (8KB on firmware) +- sign_data(): streams one or more data chunks +- sign_finish(): returns the signature +""" + +import argparse +import asyncio +from pathlib import Path +import sys +from textwrap import wrap + +# Ensure local src/ is on path when running from repo root +repo_root = Path(__file__).resolve().parents[1] +src_path = repo_root / "src" +if src_path.exists(): + sys.path.insert(0, str(src_path)) + +from meshcore import MeshCore, EventType + + +async def main(): + parser = argparse.ArgumentParser( + description="Sign data using a MeshCore device over BLE" + ) + parser.add_argument( + "-a", + "--addr", + help="BLE address of the device (optional, will scan if not provided)", + ) + parser.add_argument( + "-p", + "--pin", + help="PIN for BLE pairing (optional)", + ) + parser.add_argument( + "-d", + "--data", + default="Hello from meshcore_py!", + help="ASCII data to sign (will be UTF-8 encoded)", + ) + parser.add_argument( + "--chunk-size", + type=int, + default=512, + help="Chunk size to stream to the device (bytes)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + args = parser.parse_args() + + meshcore = None + try: + print("Connecting to MeshCore device...") + meshcore = await MeshCore.create_ble(address=args.addr, pin=args.pin, debug=args.debug) + print("✅ Connected.") + + data_bytes = args.data.encode("utf-8") + sig_evt = await meshcore.commands.sign(data_bytes, chunk_size=max(1, args.chunk_size)) + if sig_evt.type == EventType.ERROR: + raise RuntimeError(f"sign failed: {sig_evt.payload}") + signature = sig_evt.payload.get("signature", b"") + print(f"Signature ({len(signature)} bytes):") + # Pretty-print hex in 32-byte lines + hex_sig = signature.hex() + for line in wrap(hex_sig, 64): + print(line) + + print("\nSigning flow completed!") + + except ConnectionError as e: + print(f"❌ Failed to connect: {e}") + return 1 + except Exception as e: + print(f"❌ Error: {e}") + return 1 + finally: + if meshcore: + await meshcore.disconnect() + print("Disconnected.") + + +if __name__ == "__main__": + try: + sys.exit(asyncio.run(main())) + except KeyboardInterrupt: + print("\nInterrupted by user") + sys.exit(1) + diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index a2d8f29..d9654cd 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -211,6 +211,61 @@ class DeviceCommands(CommandHandlerBase): data = b"\x18" + key return await self.send(data, [EventType.OK, EventType.ERROR]) + async def sign_start(self) -> Event: + """ + Initialize a signing session on the device. + + Returns the available buffer size for signing data. + """ + logger.debug("Starting signing session on device") + return await self.send(b"\x21", [EventType.SIGN_START, EventType.ERROR]) + + async def sign_data(self, chunk: bytes) -> Event: + """ + Send a chunk of data to be included in the device-side signature. + + The device accepts up to 8KB total across chunks; caller is responsible + for chunking appropriately. + """ + if not isinstance(chunk, (bytes, bytearray)): + raise TypeError("chunk must be bytes-like") + logger.debug(f"Sending signing data chunk ({len(chunk)} bytes)") + return await self.send(b"\x22" + bytes(chunk), [EventType.OK, EventType.ERROR]) + + async def sign_finish(self) -> Event: + """ + Finalize signing and retrieve the signature produced by the device. + """ + logger.debug("Finalizing signing session on device") + return await self.send(b"\x23", [EventType.SIGNATURE, EventType.ERROR]) + + async def sign(self, data: bytes, chunk_size: int = 512) -> Event: + """ + Convenience: sign the given data on device, handling chunking. + + Returns the signature event or an error event. + """ + if not isinstance(data, (bytes, bytearray)): + raise TypeError("data must be bytes-like") + if chunk_size <= 0: + raise ValueError("chunk_size must be > 0") + + start_evt = await self.sign_start() + if start_evt.type == EventType.ERROR: + return start_evt + + max_len = start_evt.payload.get("max_length", 0) + if max_len and len(data) > max_len: + return Event(EventType.ERROR, {"reason": "data_too_large", "max_length": max_len, "len": len(data)}) + + for idx in range(0, len(data), chunk_size): + chunk = data[idx : idx + chunk_size] + evt = await self.sign_data(chunk) + if evt.type == EventType.ERROR: + return evt + + return await self.sign_finish() + async def get_stats_core(self) -> Event: logger.debug("Getting core statistics") # CMD_GET_STATS (56) + STATS_TYPE_CORE (0) diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 56a5611..d3b3820 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -50,6 +50,8 @@ class EventType(Enum): CONTROL_DATA = "control_data" DISCOVER_RESPONSE = "discover_response" NEIGHBOURS_RESPONSE = "neighbours_response" + SIGN_START = "sign_start" + SIGNATURE = "signature" # Command response types OK = "command_ok" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 69fb669..c65a8eb 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -710,6 +710,20 @@ class MessageReader: else: logger.error(f"Invalid private key response length: {len(data)}") + elif packet_type_value == PacketType.SIGN_START.value: + logger.debug(f"Received sign start response: {data.hex()}") + # Payload: 1 reserved byte, 4-byte max length + dbuf.read(1) + max_len = int.from_bytes(dbuf.read(4), "little") + res = {"max_length": max_len} + await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) + + elif packet_type_value == PacketType.SIGNATURE.value: + logger.debug(f"Received signature: {data.hex()}") + signature = dbuf.read() + res = {"signature": signature} + await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) + elif packet_type_value == PacketType.DISABLED.value: logger.debug("Received disabled response") res = {"reason": "private_key_export_disabled"}