Update BLE signing example to use a default chunk size of 120 bytes and add signature verification with the device's public key. Enhance debug output for data length and signature verification results.

This commit is contained in:
agessaman 2025-12-15 21:29:12 -08:00
parent 1ecc1d8055
commit 45c265f9c9
2 changed files with 57 additions and 63 deletions

View file

@ -46,8 +46,8 @@ async def main():
parser.add_argument(
"--chunk-size",
type=int,
default=512,
help="Chunk size to stream to the device (bytes)",
default=120,
help="Chunk size to stream to the device (bytes). Default 120 for BLE (frames under 128 bytes work better). For serial/TCP, larger values (e.g., 512) work fine.",
)
parser.add_argument(
"--timeout",
@ -69,6 +69,10 @@ async def main():
print("✅ Connected.")
data_bytes = args.data.encode("utf-8")
print(f"Data to sign: {len(data_bytes)} bytes")
if args.debug:
print(f"Data hex (first 100 bytes): {data_bytes[:100].hex()}")
sig_evt = await meshcore.commands.sign(data_bytes, chunk_size=max(1, args.chunk_size), timeout=args.timeout)
if sig_evt.type == EventType.ERROR:
raise RuntimeError(f"sign failed: {sig_evt.payload}")
@ -79,6 +83,36 @@ async def main():
for line in wrap(hex_sig, 64):
print(line)
# Verify signature with device's public key
try:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from cryptography.exceptions import InvalidSignature
# Get device's public key from self_info
self_info = meshcore.self_info
if not self_info or "public_key" not in self_info:
print("\n⚠️ Could not get device public key for verification")
else:
pubkey_hex = self_info["public_key"]
pubkey_bytes = bytes.fromhex(pubkey_hex)
try:
public_key = Ed25519PublicKey.from_public_bytes(pubkey_bytes)
public_key.verify(signature, data_bytes)
print("\n✅ Signature verification: SUCCESS (signature is valid)")
except InvalidSignature:
print("\n❌ Signature verification: FAILED (signature is invalid)")
if args.debug:
print(f" Public key: {pubkey_hex}")
print(f" Data length: {len(data_bytes)} bytes")
print(f" Signature length: {len(signature)} bytes")
print(f" Data (first 50 bytes): {data_bytes[:50].hex()}")
except Exception as e:
print(f"\n⚠️ Signature verification error: {e}")
except ImportError:
print("\n⚠️ cryptography library not available - skipping signature verification")
print(" Install with: pip install cryptography")
print("\nSigning flow completed!")
except ConnectionError as e:

View file

@ -1,3 +1,4 @@
import asyncio
import logging
from hashlib import sha256
from typing import Optional
@ -212,86 +213,40 @@ class DeviceCommands(CommandHandlerBase):
return await self.send(data, [EventType.OK, EventType.ERROR])
async def sign_start(self) -> Event:
"""
Initialize a signing session on the device.
Returns the available buffer size for signing data.
"""
logger.debug("Starting signing session on device")
return await self.send(b"\x21", [EventType.SIGN_START, EventType.ERROR])
async def sign_data(self, chunk: bytes) -> Event:
"""
Send a chunk of data to be included in the device-side signature.
The device accepts up to 8KB total across chunks; caller is responsible
for chunking appropriately.
Note: The device does not send OK responses for sign_data commands.
It accumulates data silently and only responds at sign_finish().
Errors will still be reported if they occur.
This is a fire-and-forget operation.
"""
if not isinstance(chunk, (bytes, bytearray)):
raise TypeError("chunk must be bytes-like")
logger.debug(f"Sending signing data chunk ({len(chunk)} bytes)")
data = b"\x22" + bytes(chunk)
result = await self.send(data, [EventType.OK, EventType.ERROR], timeout=5.0)
# The device doesn't send OK for sign_data - it's fire-and-forget until sign_finish.
# We send the data and return success immediately. If there's an error,
# it will be reported at sign_finish().
if not self.dispatcher:
raise RuntimeError("Dispatcher not set, cannot send commands")
# If we got an error (not just timeout), return it immediately
if result.type == EventType.ERROR:
# If it's a timeout/no_event, log a warning but continue - the data may have been received
if result.payload.get("reason") in ("timeout", "no_event_received"):
logger.warning(
f"sign_data OK response not received (timeout), but continuing - "
f"data may have been processed by device"
)
return Event(EventType.OK, {})
# For actual errors (bad state, table full, etc.), return the error
return result
if self._sender_func:
logger.debug(
f"Sending raw data: {(b'\x22' + bytes(chunk)).hex() if isinstance(chunk, bytes) else chunk}"
)
await self._sender_func(b"\x22" + bytes(chunk))
# Return success immediately - device accumulates data silently
return Event(EventType.OK, {})
return result
async def sign_finish(self, timeout: Optional[float] = None, data_size: int = 0) -> Event:
"""
Finalize signing and retrieve the signature produced by the device.
This operation performs the actual cryptographic signing on the device.
The timeout accounts for BLE communication delays and device processing overhead.
Args:
timeout: Timeout in seconds. If None, uses a calculated default based on
data size and default_timeout (minimum 15 seconds).
data_size: Size of data that was signed (in bytes). Used to calculate
appropriate timeout if timeout is None. Defaults to 0.
"""
logger.debug("Finalizing signing session on device")
# Use a longer default timeout for sign_finish since it performs crypto operations
# Ed25519 signing is fast, but we need time for BLE communication and device overhead
if timeout is None:
# Base timeout: at least 15 seconds, or 3x default (whichever is larger)
base_timeout = max(self.default_timeout * 3, 15.0)
# Add extra time for very large data (1 second per 2KB, capped at +5 seconds)
# This accounts for potential device processing delays with large payloads
size_bonus = min(data_size / 2048.0, 5.0)
timeout = base_timeout + size_bonus
logger.debug(f"sign_finish using timeout={timeout:.1f} seconds (data_size={data_size} bytes)")
return await self.send(b"\x23", [EventType.SIGNATURE, EventType.ERROR], timeout=timeout)
async def sign(self, data: bytes, chunk_size: int = 512, timeout: Optional[float] = None) -> Event:
"""
Convenience: sign the given data on device, handling chunking.
Args:
data: The data to sign
chunk_size: Size of chunks to send (default: 512 bytes)
timeout: Timeout for sign_finish operation in seconds. If None, uses
a longer default (15 seconds minimum) since cryptographic
operations can take time, especially for larger data like JWT tokens.
Returns:
The signature event or an error event.
"""
async def sign(self, data: bytes, chunk_size: int = 120, timeout: Optional[float] = None) -> Event:
if not isinstance(data, (bytes, bytearray)):
raise TypeError("data must be bytes-like")
if chunk_size <= 0:
@ -307,9 +262,14 @@ class DeviceCommands(CommandHandlerBase):
for idx in range(0, len(data), chunk_size):
chunk = data[idx : idx + chunk_size]
chunk_num = (idx // chunk_size) + 1
total_chunks = (len(data) + chunk_size - 1) // chunk_size
logger.debug(f"Sending chunk {chunk_num}/{total_chunks} ({len(chunk)} bytes)")
evt = await self.sign_data(chunk)
if evt.type == EventType.ERROR:
logger.error(f"Error sending chunk {chunk_num}/{total_chunks}: {evt.payload}")
return evt
logger.debug(f"Chunk {chunk_num}/{total_chunks} sent successfully")
return await self.sign_finish(timeout=timeout, data_size=len(data))