Merge pull request #29 from agessaman/feature/private-key-export

Feature/private key export
This commit is contained in:
fdlamotte 2025-10-13 12:35:23 +02:00 committed by GitHub
commit 00a32a0255
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 278 additions and 0 deletions

View file

@ -515,6 +515,8 @@ All commands are async methods that return `Event` objects. Commands are organiz
| **Device Actions** ||||
| `send_advert(flood=False)` | `flood: bool` | `OK` | Send advertisement (optionally flood network) |
| `reboot()` | None | None | Reboot device (no response expected) |
| **Security** ||||
| `export_private_key()` | None | `PRIVATE_KEY/DISABLED` | Export device private key (requires PIN auth & enabled firmware) |
#### Contact Commands (`meshcore.commands.*`)
@ -609,5 +611,6 @@ Check the `examples/` directory for more:
- `serial_infos.py`: Quick device info retrieval
- `serial_msg.py`: Message sending and receiving
- `ble_pin_pairing_example.py`: BLE connection with PIN pairing
- `ble_private_key_export.py`: BLE private key export with PIN authentication
- `ble_t1000_infos.py`: BLE connections

View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Example: BLE Private Key Export with MeshCore
This example demonstrates how to export the private key from a MeshCore device
using BLE with PIN-based pairing for enhanced security.
Note: This feature requires:
1. A MeshCore device running companion radio firmware
2. ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag enabled
3. Authenticated BLE connection with PIN
"""
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def main():
parser = argparse.ArgumentParser(description="Export private key from MeshCore device via BLE")
parser.add_argument("-a", "--addr", help="BLE address of the device (optional, will scan if not provided)")
parser.add_argument("-p", "--pin", help="PIN for BLE pairing (required for private key export)")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args = parser.parse_args()
if not args.pin:
print("❌ PIN is required for private key export. Use -p or --pin to specify it.")
return 1
try:
print("Connecting to MeshCore device...")
print(f"Using PIN for pairing: {args.pin}")
# Create BLE connection with PIN (required for private key export)
meshcore = await MeshCore.create_ble(
address=args.addr,
pin=args.pin,
debug=args.debug
)
print("✅ Connected successfully!")
# Get device information to verify connection
result = await meshcore.commands.send_device_query()
if result.payload:
print(f"Device model: {result.payload.get('model', 'Unknown')}")
print(f"Firmware version: {result.payload.get('fw ver', 'Unknown')}")
# Export private key
print("\n🔑 Requesting private key export...")
result = await meshcore.commands.export_private_key()
if result.type == EventType.PRIVATE_KEY:
private_key = result.payload["private_key"]
print("✅ Private key exported successfully!")
print(f"Private key (64 bytes): {private_key.hex()}")
print(f"Private key length: {len(private_key)} bytes")
# Optionally save to file
save_to_file = input("\nSave private key to file? (y/N): ").lower().strip()
if save_to_file == 'y':
filename = input("Enter filename (default: private_key.bin): ").strip()
if not filename:
filename = "private_key.bin"
with open(filename, 'wb') as f:
f.write(private_key)
print(f"Private key saved to {filename}")
elif result.type == EventType.DISABLED:
print("❌ Private key export is disabled on this device")
print("This feature requires:")
print(" - Companion radio firmware")
print(" - ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag")
elif result.type == EventType.ERROR:
print(f"❌ Error exporting private key: {result.payload}")
else:
print(f"❌ Unexpected response: {result.type}")
print("\nPrivate key export test completed!")
except ConnectionError as e:
print(f"❌ Failed to connect: {e}")
return 1
except Exception as e:
print(f"❌ Error: {e}")
return 1
finally:
if 'meshcore' in locals():
await meshcore.disconnect()
print("Disconnected from device")
return 0
if __name__ == "__main__":
import sys
sys.exit(asyncio.run(main()))

View file

@ -202,3 +202,7 @@ class DeviceCommands(CommandHandlerBase):
data = b"\x20" + channel_idx.to_bytes(1, "little") + name_bytes + channel_secret
return await self.send(data, [EventType.OK, EventType.ERROR])
async def export_private_key(self) -> Event:
logger.debug("Requesting private key export")
return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR])

View file

@ -41,6 +41,8 @@ class EventType(Enum):
CUSTOM_VARS = "custom_vars"
CHANNEL_INFO = "channel_info"
PATH_RESPONSE = "path_response"
PRIVATE_KEY = "private_key"
DISABLED = "disabled"
# Command response types
OK = "command_ok"

View file

@ -569,6 +569,20 @@ class MessageReader:
Event(EventType.PATH_RESPONSE, res, attributes)
)
elif packet_type_value == PacketType.PRIVATE_KEY.value:
logger.debug(f"Received private key response: {data.hex()}")
if len(data) >= 65: # 1 byte response code + 64 bytes private key
private_key = data[1:65] # Extract 64-byte private key
res = {"private_key": private_key}
await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res))
else:
logger.error(f"Invalid private key response length: {len(data)}")
elif packet_type_value == PacketType.DISABLED.value:
logger.debug("Received disabled response")
res = {"reason": "private_key_export_disabled"}
await self.dispatcher.dispatch(Event(EventType.DISABLED, res))
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")

View file

@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Unit tests for private key export functionality
"""
import pytest
import asyncio
from unittest.mock import MagicMock, AsyncMock
from meshcore.commands import CommandHandler
from meshcore.events import Event, EventType
from meshcore.reader import MessageReader
pytestmark = pytest.mark.asyncio
# Fixtures (consistent with existing test patterns)
@pytest.fixture
def mock_connection():
connection = MagicMock()
connection.send = AsyncMock()
return connection
@pytest.fixture
def mock_dispatcher():
dispatcher = MagicMock()
dispatcher.wait_for_event = AsyncMock()
dispatcher.dispatch = AsyncMock()
return dispatcher
@pytest.fixture
def command_handler(mock_connection, mock_dispatcher):
handler = CommandHandler()
async def sender(data):
await mock_connection.send(data)
handler._sender_func = sender
handler.dispatcher = mock_dispatcher
return handler
# Test helper (consistent with existing patterns)
def setup_event_response(mock_dispatcher, event_type, payload):
async def wait_response(requested_type, filters=None, timeout=None):
if requested_type == event_type:
return Event(event_type, payload)
return None
mock_dispatcher.wait_for_event.side_effect = wait_response
# Command tests
async def test_export_private_key_success(command_handler, mock_connection, mock_dispatcher):
"""Test successful private key export"""
private_key_data = b"x" * 64
expected_payload = {"private_key": private_key_data}
setup_event_response(mock_dispatcher, EventType.PRIVATE_KEY, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.PRIVATE_KEY
assert len(result.payload["private_key"]) == 64
assert result.payload["private_key"] == private_key_data
async def test_export_private_key_disabled(command_handler, mock_connection, mock_dispatcher):
"""Test private key export when disabled"""
expected_payload = {"reason": "private_key_export_disabled"}
setup_event_response(mock_dispatcher, EventType.DISABLED, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.DISABLED
assert result.payload["reason"] == "private_key_export_disabled"
async def test_export_private_key_error(command_handler, mock_connection, mock_dispatcher):
"""Test private key export error handling"""
expected_payload = {"reason": "timeout"}
setup_event_response(mock_dispatcher, EventType.ERROR, expected_payload)
result = await command_handler.export_private_key()
# Verify the command was sent correctly
mock_connection.send.assert_called_once_with(b"\x17")
assert result.type == EventType.ERROR
assert result.payload["reason"] == "timeout"
# Packet parsing tests
class MockDispatcher:
def __init__(self):
self.dispatched_events = []
async def dispatch(self, event):
self.dispatched_events.append(event)
async def test_parse_private_key_packet():
"""Test parsing of PRIVATE_KEY packet (type 14)"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a mock private key packet: [14][64 bytes of key data]
private_key_data = b"x" * 64
packet = bytes([14]) + private_key_data # PRIVATE_KEY = 14
await reader.handle_rx(bytearray(packet))
# Verify the event was dispatched
assert len(mock_dispatcher.dispatched_events) == 1
event = mock_dispatcher.dispatched_events[0]
assert event.type == EventType.PRIVATE_KEY
assert event.payload["private_key"] == private_key_data
async def test_parse_private_key_packet_invalid_length():
"""Test parsing of PRIVATE_KEY packet with invalid length"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a packet that's too short
packet = bytes([14]) + b"short" # Only 5 bytes instead of 64
await reader.handle_rx(bytearray(packet))
# Should not dispatch an event for invalid length
assert len(mock_dispatcher.dispatched_events) == 0
async def test_parse_disabled_packet():
"""Test parsing of DISABLED packet (type 15)"""
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
# Create a disabled packet: [15]
packet = bytes([15]) # DISABLED = 15
await reader.handle_rx(bytearray(packet))
# Verify the event was dispatched
assert len(mock_dispatcher.dispatched_events) == 1
event = mock_dispatcher.dispatched_events[0]
assert event.type == EventType.DISABLED
assert event.payload["reason"] == "private_key_export_disabled"