Add private key export support

- Add PRIVATE_KEY and DISABLED event types
- Add packet parsing for private key export responses
- Add export_private_key() method to DeviceCommands
- Add comprehensive unit tests
- Add BLE private key export example
- Update documentation with security notes
This commit is contained in:
agessaman 2025-10-12 18:23:32 -07:00
parent c697c960a6
commit e0f71482c6
6 changed files with 305 additions and 0 deletions

View file

@ -515,6 +515,8 @@ All commands are async methods that return `Event` objects. Commands are organiz
| **Device Actions** ||||
| `send_advert(flood=False)` | `flood: bool` | `OK` | Send advertisement (optionally flood network) |
| `reboot()` | None | None | Reboot device (no response expected) |
| **Security** ||||
| `export_private_key()` | None | `PRIVATE_KEY/DISABLED` | Export device private key (requires PIN auth & enabled firmware) |
#### Contact Commands (`meshcore.commands.*`)
@ -601,6 +603,33 @@ meshcore.subscribe(
)
```
## Private Key Export Example
Export your device's private key over BLE (requires PIN pairing and supported firmware):
```python
import asyncio
from meshcore import MeshCore, EventType
async def main():
# Connect to device over BLE with PIN authentication
meshcore = await MeshCore.create_ble(address="12:34:56:78:90:AB", pin="123456")
# Export the private key
result = await meshcore.commands.export_private_key()
if result.type == EventType.PRIVATE_KEY:
private_key = result.payload["private_key"] # 64 bytes
print(f"Private key: {private_key.hex()}")
elif result.type == EventType.DISABLED:
print("Private key export is disabled on this device.")
await meshcore.disconnect()
asyncio.run(main())
```
## Examples in the Repo
Check the `examples/` directory for more:
@ -609,5 +638,6 @@ Check the `examples/` directory for more:
- `serial_infos.py`: Quick device info retrieval
- `serial_msg.py`: Message sending and receiving
- `ble_pin_pairing_example.py`: BLE connection with PIN pairing
- `ble_private_key_export.py`: BLE private key export with PIN authentication
- `ble_t1000_infos.py`: BLE connections

View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Example: BLE Private Key Export with MeshCore
This example demonstrates how to export the private key from a MeshCore device
using BLE with PIN-based pairing for enhanced security.
Note: This feature requires:
1. A MeshCore device running companion radio firmware
2. ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag enabled
3. Authenticated BLE connection with PIN
"""
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def main():
parser = argparse.ArgumentParser(description="Export private key from MeshCore device via BLE")
parser.add_argument("-a", "--addr", help="BLE address of the device (optional, will scan if not provided)")
parser.add_argument("-p", "--pin", help="PIN for BLE pairing (required for private key export)")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args = parser.parse_args()
if not args.pin:
print("❌ PIN is required for private key export. Use -p or --pin to specify it.")
return 1
try:
print("Connecting to MeshCore device...")
print(f"Using PIN for pairing: {args.pin}")
# Create BLE connection with PIN (required for private key export)
meshcore = await MeshCore.create_ble(
address=args.addr,
pin=args.pin,
debug=args.debug
)
print("✅ Connected successfully!")
# Get device information to verify connection
result = await meshcore.commands.send_device_query()
if result.payload:
print(f"Device model: {result.payload.get('model', 'Unknown')}")
print(f"Firmware version: {result.payload.get('fw ver', 'Unknown')}")
# Export private key
print("\n🔑 Requesting private key export...")
result = await meshcore.commands.export_private_key()
if result.type == EventType.PRIVATE_KEY:
private_key = result.payload["private_key"]
print("✅ Private key exported successfully!")
print(f"Private key (64 bytes): {private_key.hex()}")
print(f"Private key length: {len(private_key)} bytes")
# Optionally save to file
save_to_file = input("\nSave private key to file? (y/N): ").lower().strip()
if save_to_file == 'y':
filename = input("Enter filename (default: private_key.bin): ").strip()
if not filename:
filename = "private_key.bin"
with open(filename, 'wb') as f:
f.write(private_key)
print(f"Private key saved to {filename}")
elif result.type == EventType.DISABLED:
print("❌ Private key export is disabled on this device")
print("This feature requires:")
print(" - Companion radio firmware")
print(" - ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag")
elif result.type == EventType.ERROR:
print(f"❌ Error exporting private key: {result.payload}")
else:
print(f"❌ Unexpected response: {result.type}")
print("\nPrivate key export test completed!")
except ConnectionError as e:
print(f"❌ Failed to connect: {e}")
return 1
except Exception as e:
print(f"❌ Error: {e}")
return 1
finally:
if 'meshcore' in locals():
await meshcore.disconnect()
print("Disconnected from device")
return 0
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))

View file

@ -202,3 +202,7 @@ class DeviceCommands(CommandHandlerBase):
data = b"\x20" + channel_idx.to_bytes(1, "little") + name_bytes + channel_secret
return await self.send(data, [EventType.OK, EventType.ERROR])
async def export_private_key(self) -> Event:
logger.debug("Requesting private key export")
return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR])

View file

@ -41,6 +41,8 @@ class EventType(Enum):
CUSTOM_VARS = "custom_vars"
CHANNEL_INFO = "channel_info"
PATH_RESPONSE = "path_response"
PRIVATE_KEY = "private_key"
DISABLED = "disabled"
# Command response types
OK = "command_ok"

View file

@ -569,6 +569,20 @@ class MessageReader:
Event(EventType.PATH_RESPONSE, res, attributes)
)
elif packet_type_value == PacketType.PRIVATE_KEY.value:
logger.debug(f"Received private key response: {data.hex()}")
if len(data) >= 65: # 1 byte response code + 64 bytes private key
private_key = data[1:65] # Extract 64-byte private key
res = {"private_key": private_key}
await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res))
else:
logger.error(f"Invalid private key response length: {len(data)}")
elif packet_type_value == PacketType.DISABLED.value:
logger.debug("Received disabled response")
res = {"reason": "private_key_export_disabled"}
await self.dispatcher.dispatch(Event(EventType.DISABLED, res))
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")

View file

@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Unit tests for private key export functionality
"""
import pytest
import asyncio
from unittest.mock import MagicMock, AsyncMock
from meshcore.commands import CommandHandler
from meshcore.events import Event, EventType
from meshcore.reader import MessageReader
pytestmark = pytest.mark.asyncio
# Fixtures (consistent with existing test patterns)
@pytest.fixture
def mock_connection():
connection = MagicMock()
connection.send = AsyncMock()
return connection
@pytest.fixture
def mock_dispatcher():
dispatcher = MagicMock()
dispatcher.wait_for_event = AsyncMock()
dispatcher.dispatch = AsyncMock()
return dispatcher
@pytest.fixture
def command_handler(mock_connection, mock_dispatcher):
handler = CommandHandler()
async def sender(data):
await mock_connection.send(data)
handler._sender_func = sender
handler.dispatcher = mock_dispatcher
return handler
# Test helper (consistent with existing patterns)
def setup_event_response(mock_dispatcher, event_type, payload):
async def wait_response(requested_type, filters=None, timeout=None):
if requested_type == event_type:
return Event(event_type, payload)
return None
mock_dispatcher.wait_for_event.side_effect = wait_response
# Command tests
async def test_export_private_key_success(command_handler, mock_connection, mock_dispatcher):
"""Test successful private key export"""
private_key_data = b"x" * 64
expected_payload = {"private_key": private_key_data}
setup_event_response(mock_dispatcher, EventType.PRIVATE_KEY, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.PRIVATE_KEY
assert len(result.payload["private_key"]) == 64
assert result.payload["private_key"] == private_key_data
async def test_export_private_key_disabled(command_handler, mock_connection, mock_dispatcher):
"""Test private key export when disabled"""
expected_payload = {"reason": "private_key_export_disabled"}
setup_event_response(mock_dispatcher, EventType.DISABLED, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.DISABLED
assert result.payload["reason"] == "private_key_export_disabled"
async def test_export_private_key_error(command_handler, mock_connection, mock_dispatcher):
"""Test private key export error handling"""
expected_payload = {"reason": "timeout"}
setup_event_response(mock_dispatcher, EventType.ERROR, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.ERROR
assert result.payload["reason"] == "timeout"
# Packet parsing tests
class MockDispatcher:
def __init__(self):
self.dispatched_events = []
async def dispatch(self, event):
self.dispatched_events.append(event)
async def test_parse_private_key_packet():
"""Test parsing of PRIVATE_KEY packet (type 14)"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a mock private key packet: [14][64 bytes of key data]
private_key_data = b"x" * 64
packet = bytes([14]) + private_key_data # PRIVATE_KEY = 14
await reader.handle_rx(bytearray(packet))
# Verify the event was dispatched
assert len(mock_dispatcher.dispatched_events) == 1
event = mock_dispatcher.dispatched_events[0]
assert event.type == EventType.PRIVATE_KEY
assert event.payload["private_key"] == private_key_data
async def test_parse_private_key_packet_invalid_length():
"""Test parsing of PRIVATE_KEY packet with invalid length"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a packet that's too short
packet = bytes([14]) + b"short" # Only 5 bytes instead of 64
await reader.handle_rx(bytearray(packet))
# Should not dispatch an event for invalid length
assert len(mock_dispatcher.dispatched_events) == 0
async def test_parse_disabled_packet():
"""Test parsing of DISABLED packet (type 15)"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a disabled packet: [15]
packet = bytes([15]) # DISABLED = 15
await reader.handle_rx(bytearray(packet))
# Verify the event was dispatched
assert len(mock_dispatcher.dispatched_events) == 1
event = mock_dispatcher.dispatched_events[0]
assert event.type == EventType.DISABLED
assert event.payload["reason"] == "private_key_export_disabled"