diff --git a/README.md b/README.md index f76de78..5296d87 100644 --- a/README.md +++ b/README.md @@ -515,6 +515,8 @@ All commands are async methods that return `Event` objects. Commands are organiz | **Device Actions** |||| | `send_advert(flood=False)` | `flood: bool` | `OK` | Send advertisement (optionally flood network) | | `reboot()` | None | None | Reboot device (no response expected) | +| **Security** |||| +| `export_private_key()` | None | `PRIVATE_KEY/DISABLED` | Export device private key (requires PIN auth & enabled firmware) | #### Contact Commands (`meshcore.commands.*`) @@ -609,5 +611,6 @@ Check the `examples/` directory for more: - `serial_infos.py`: Quick device info retrieval - `serial_msg.py`: Message sending and receiving - `ble_pin_pairing_example.py`: BLE connection with PIN pairing +- `ble_private_key_export.py`: BLE private key export with PIN authentication - `ble_t1000_infos.py`: BLE connections diff --git a/examples/ble_private_key_export.py b/examples/ble_private_key_export.py new file mode 100644 index 0000000..c01efca --- /dev/null +++ b/examples/ble_private_key_export.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Example: BLE Private Key Export with MeshCore + +This example demonstrates how to export the private key from a MeshCore device +using BLE with PIN-based pairing for enhanced security. + +Note: This feature requires: +1. A MeshCore device running companion radio firmware +2. ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag enabled +3. Authenticated BLE connection with PIN +""" + +import asyncio +import argparse +from meshcore import MeshCore +from meshcore.events import EventType + + +async def main(): + parser = argparse.ArgumentParser(description="Export private key from MeshCore device via BLE") + parser.add_argument("-a", "--addr", help="BLE address of the device (optional, will scan if not provided)") + parser.add_argument("-p", "--pin", help="PIN for BLE pairing (required for private key export)") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") + + args = parser.parse_args() + + if not args.pin: + print("āŒ PIN is required for private key export. Use -p or --pin to specify it.") + return 1 + + try: + print("Connecting to MeshCore device...") + print(f"Using PIN for pairing: {args.pin}") + + # Create BLE connection with PIN (required for private key export) + meshcore = await MeshCore.create_ble( + address=args.addr, + pin=args.pin, + debug=args.debug + ) + + print("āœ… Connected successfully!") + + # Get device information to verify connection + result = await meshcore.commands.send_device_query() + if result.payload: + print(f"Device model: {result.payload.get('model', 'Unknown')}") + print(f"Firmware version: {result.payload.get('fw ver', 'Unknown')}") + + # Export private key + print("\nšŸ”‘ Requesting private key export...") + result = await meshcore.commands.export_private_key() + + if result.type == EventType.PRIVATE_KEY: + private_key = result.payload["private_key"] + print("āœ… Private key exported successfully!") + print(f"Private key (64 bytes): {private_key.hex()}") + print(f"Private key length: {len(private_key)} bytes") + + # Optionally save to file + save_to_file = input("\nSave private key to file? (y/N): ").lower().strip() + if save_to_file == 'y': + filename = input("Enter filename (default: private_key.bin): ").strip() + if not filename: + filename = "private_key.bin" + + with open(filename, 'wb') as f: + f.write(private_key) + print(f"Private key saved to {filename}") + + elif result.type == EventType.DISABLED: + print("āŒ Private key export is disabled on this device") + print("This feature requires:") + print(" - Companion radio firmware") + print(" - ENABLE_PRIVATE_KEY_EXPORT=1 compile-time flag") + + elif result.type == EventType.ERROR: + print(f"āŒ Error exporting private key: {result.payload}") + + else: + print(f"āŒ Unexpected response: {result.type}") + + print("\nPrivate key export test completed!") + + except ConnectionError as e: + print(f"āŒ Failed to connect: {e}") + return 1 + except Exception as e: + print(f"āŒ Error: {e}") + return 1 + finally: + if 'meshcore' in locals(): + await meshcore.disconnect() + print("Disconnected from device") + + return 0 + + +if __name__ == "__main__": + import sys + sys.exit(asyncio.run(main())) diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index 5e39dcf..0b87bb3 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -202,3 +202,7 @@ class DeviceCommands(CommandHandlerBase): data = b"\x20" + channel_idx.to_bytes(1, "little") + name_bytes + channel_secret return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def export_private_key(self) -> Event: + logger.debug("Requesting private key export") + return await self.send(b"\x17", [EventType.PRIVATE_KEY, EventType.DISABLED, EventType.ERROR]) diff --git a/src/meshcore/events.py b/src/meshcore/events.py index cdc5d9b..4fbcc6d 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -41,6 +41,8 @@ class EventType(Enum): CUSTOM_VARS = "custom_vars" CHANNEL_INFO = "channel_info" PATH_RESPONSE = "path_response" + PRIVATE_KEY = "private_key" + DISABLED = "disabled" # Command response types OK = "command_ok" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 1ea8587..7671509 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -569,6 +569,20 @@ class MessageReader: Event(EventType.PATH_RESPONSE, res, attributes) ) + elif packet_type_value == PacketType.PRIVATE_KEY.value: + logger.debug(f"Received private key response: {data.hex()}") + if len(data) >= 65: # 1 byte response code + 64 bytes private key + private_key = data[1:65] # Extract 64-byte private key + res = {"private_key": private_key} + await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) + else: + logger.error(f"Invalid private key response length: {len(data)}") + + elif packet_type_value == PacketType.DISABLED.value: + logger.debug("Received disabled response") + res = {"reason": "private_key_export_disabled"} + await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) + else: logger.debug(f"Unhandled data received {data}") logger.debug(f"Unhandled packet type: {packet_type_value}") diff --git a/tests/unit/test_private_key_export.py b/tests/unit/test_private_key_export.py new file mode 100644 index 0000000..71e1ccf --- /dev/null +++ b/tests/unit/test_private_key_export.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +""" +Unit tests for private key export functionality +""" + +import pytest +import asyncio +from unittest.mock import MagicMock, AsyncMock +from meshcore.commands import CommandHandler +from meshcore.events import Event, EventType +from meshcore.reader import MessageReader + +pytestmark = pytest.mark.asyncio + + +# Fixtures (consistent with existing test patterns) +@pytest.fixture +def mock_connection(): + connection = MagicMock() + connection.send = AsyncMock() + return connection + + +@pytest.fixture +def mock_dispatcher(): + dispatcher = MagicMock() + dispatcher.wait_for_event = AsyncMock() + dispatcher.dispatch = AsyncMock() + return dispatcher + + +@pytest.fixture +def command_handler(mock_connection, mock_dispatcher): + handler = CommandHandler() + + async def sender(data): + await mock_connection.send(data) + + handler._sender_func = sender + handler.dispatcher = mock_dispatcher + return handler + + +# Test helper (consistent with existing patterns) +def setup_event_response(mock_dispatcher, event_type, payload): + async def wait_response(requested_type, filters=None, timeout=None): + if requested_type == event_type: + return Event(event_type, payload) + return None + + mock_dispatcher.wait_for_event.side_effect = wait_response + + +# Command tests +async def test_export_private_key_success(command_handler, mock_connection, mock_dispatcher): + """Test successful private key export""" + private_key_data = b"x" * 64 + expected_payload = {"private_key": private_key_data} + setup_event_response(mock_dispatcher, EventType.PRIVATE_KEY, expected_payload) + + result = await command_handler.export_private_key() + + # Verify the command was sent correctly + mock_connection.send.assert_called_once_with(b"\x17") + assert result.type == EventType.PRIVATE_KEY + assert len(result.payload["private_key"]) == 64 + assert result.payload["private_key"] == private_key_data + + +async def test_export_private_key_disabled(command_handler, mock_connection, mock_dispatcher): + """Test private key export when disabled""" + expected_payload = {"reason": "private_key_export_disabled"} + setup_event_response(mock_dispatcher, EventType.DISABLED, expected_payload) + + result = await command_handler.export_private_key() + + # Verify the command was sent correctly + mock_connection.send.assert_called_once_with(b"\x17") + assert result.type == EventType.DISABLED + assert result.payload["reason"] == "private_key_export_disabled" + + +async def test_export_private_key_error(command_handler, mock_connection, mock_dispatcher): + """Test private key export error handling""" + expected_payload = {"reason": "timeout"} + setup_event_response(mock_dispatcher, EventType.ERROR, expected_payload) + + result = await command_handler.export_private_key() + + # Verify the command was sent correctly + mock_connection.send.assert_called_once_with(b"\x17") + assert result.type == EventType.ERROR + assert result.payload["reason"] == "timeout" + + +# Packet parsing tests +class MockDispatcher: + def __init__(self): + self.dispatched_events = [] + + async def dispatch(self, event): + self.dispatched_events.append(event) + + +async def test_parse_private_key_packet(): + """Test parsing of PRIVATE_KEY packet (type 14)""" + mock_dispatcher = MockDispatcher() + reader = MessageReader(mock_dispatcher) + + # Create a mock private key packet: [14][64 bytes of key data] + private_key_data = b"x" * 64 + packet = bytes([14]) + private_key_data # PRIVATE_KEY = 14 + + await reader.handle_rx(bytearray(packet)) + + # Verify the event was dispatched + assert len(mock_dispatcher.dispatched_events) == 1 + event = mock_dispatcher.dispatched_events[0] + + assert event.type == EventType.PRIVATE_KEY + assert event.payload["private_key"] == private_key_data + + +async def test_parse_private_key_packet_invalid_length(): + """Test parsing of PRIVATE_KEY packet with invalid length""" + mock_dispatcher = MockDispatcher() + reader = MessageReader(mock_dispatcher) + + # Create a packet that's too short + packet = bytes([14]) + b"short" # Only 5 bytes instead of 64 + + await reader.handle_rx(bytearray(packet)) + + # Should not dispatch an event for invalid length + assert len(mock_dispatcher.dispatched_events) == 0 + + +async def test_parse_disabled_packet(): + """Test parsing of DISABLED packet (type 15)""" + mock_dispatcher = MockDispatcher() + reader = MessageReader(mock_dispatcher) + + # Create a disabled packet: [15] + packet = bytes([15]) # DISABLED = 15 + + await reader.handle_rx(bytearray(packet)) + + # Verify the event was dispatched + assert len(mock_dispatcher.dispatched_events) == 1 + event = mock_dispatcher.dispatched_events[0] + + assert event.type == EventType.DISABLED + assert event.payload["reason"] == "private_key_export_disabled"