diff --git a/README.md b/README.md index 21738f6..f76de78 100644 --- a/README.md +++ b/README.md @@ -107,10 +107,36 @@ meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True) # BLE connection (scans for devices if address not provided) meshcore = await MeshCore.create_ble("12:34:56:78:90:AB") +# BLE connection with PIN pairing for enhanced security +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB", pin="123456") + # TCP connection meshcore = await MeshCore.create_tcp("192.168.1.100", 4000) ``` +#### BLE PIN Pairing + +For enhanced security, MeshCore supports BLE PIN pairing. This requires the device to be configured with a PIN and the client to provide the matching PIN during connection: + +```python +# First configure the device PIN (if not already set) +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB") +await meshcore.commands.set_devicepin(123456) + +# Then connect with PIN pairing +meshcore = await MeshCore.create_ble("12:34:56:78:90:AB", pin="123456") +``` + +**PIN Pairing Features:** +- Automatic pairing initiation when PIN is provided +- Graceful fallback if pairing fails (connection continues if device is already paired) +- Compatible with all BLE connection methods (address, scanning, pre-configured client) +- Logging of pairing success/failure for debugging + +**Note:** BLE pairing behavior may vary by platform: +- **Linux/Windows**: PIN pairing is fully supported +- **macOS**: Pairing may be handled automatically by the system UI + #### Auto-Reconnect and Connection Events Enable automatic reconnection when connections are lost: @@ -582,5 +608,6 @@ Check the `examples/` directory for more: - `pubsub_example.py`: Event subscription system with auto-fetching - `serial_infos.py`: Quick device info retrieval - `serial_msg.py`: Message sending and receiving +- `ble_pin_pairing_example.py`: BLE connection with PIN pairing - `ble_t1000_infos.py`: BLE connections diff --git a/examples/ble_pin_pairing_example.py b/examples/ble_pin_pairing_example.py new file mode 100644 index 0000000..01417ef --- /dev/null +++ b/examples/ble_pin_pairing_example.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +""" +Example: BLE PIN Pairing with MeshCore + +This example demonstrates how to connect to a MeshCore device using BLE +with PIN-based pairing for enhanced security. +""" + +import asyncio +import argparse +from meshcore import MeshCore + + +async def main(): + parser = argparse.ArgumentParser(description="Connect to MeshCore device with BLE PIN pairing") + parser.add_argument("-a", "--addr", help="BLE address of the device (optional, will scan if not provided)") + parser.add_argument("-p", "--pin", help="PIN for BLE pairing (optional)") + parser.add_argument("--debug", action="store_true", help="Enable debug logging") + + args = parser.parse_args() + + try: + print("Connecting to MeshCore device...") + if args.pin: + print(f"Using PIN for pairing: {args.pin}") + + # Create BLE connection with optional PIN + meshcore = await MeshCore.create_ble( + address=args.addr, + pin=args.pin, + debug=args.debug + ) + + print("✅ Connected successfully!") + + # Get device information to verify connection + result = await meshcore.commands.send_device_query() + if result.payload: + print(f"Device model: {result.payload.get('model', 'Unknown')}") + print(f"Firmware version: {result.payload.get('fw_version', 'Unknown')}") + + # Get device self-info + result = await meshcore.commands.send_appstart() + if result.payload: + print(f"Device public key: {result.payload.get('public_key', 'Unknown')[:16]}...") + + print("\nConnection test completed successfully!") + + except ConnectionError as e: + print(f"❌ Failed to connect: {e}") + return 1 + except Exception as e: + print(f"❌ Error: {e}") + return 1 + finally: + if 'meshcore' in locals(): + await meshcore.disconnect() + print("Disconnected from device") + + return 0 + + +if __name__ == "__main__": + import sys + try: + sys.exit(asyncio.run(main())) + except KeyboardInterrupt: + print("\nInterrupted by user") + sys.exit(1) \ No newline at end of file diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index cc40c55..afe0c05 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -20,13 +20,15 @@ UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" class BLEConnection: - def __init__(self, address=None, device=None, client=None): + def __init__(self, address=None, device=None, client=None, pin=None): """ Constructor: specify address or an existing BleakClient. Args: address (str, optional): The Bluetooth address of the device. + device (BLEDevice, optional): A BLEDevice instance. client (BleakClient, optional): An existing BleakClient instance. + pin (str, optional): PIN for BLE pairing authentication. """ self.address = address self._user_provided_address = address @@ -34,6 +36,7 @@ class BLEConnection: self._user_provided_client = client self.device = device self._user_provided_device = device + self.pin = pin self.rx_char = None self._disconnect_callback = None @@ -93,6 +96,18 @@ class BLEConnection: try: await self.client.connect() + + # Perform pairing if PIN is provided + if self.pin is not None: + logger.debug(f"Attempting BLE pairing with PIN") + try: + await self.client.pair() + logger.info("BLE pairing successful") + except Exception as e: + logger.warning(f"BLE pairing failed: {e}") + # Don't fail the connection if pairing fails, as the device + # might already be paired or not require pairing + except BleakDeviceNotFoundError: return None except TimeoutError: diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 910f608..17aba46 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -128,6 +128,7 @@ class MeshCore: address: Optional[str] = None, client=None, device=None, + pin: Optional[str] = None, debug: bool = False, only_error: bool = False, default_timeout=None, @@ -143,8 +144,10 @@ class MeshCore: client (BleakClient, optional): An existing BleakClient instance to use. If provided, 'address' is ignored for connection but can be used for identification. + device (BLEDevice, optional): A BLEDevice instance to use for connection. + pin (str, optional): PIN for BLE pairing authentication. """ - connection = BLEConnection(address=address, client=client, device=device) + connection = BLEConnection(address=address, client=client, device=device, pin=pin) mc = cls( connection, diff --git a/tests/test_ble_connection.py b/tests/test_ble_connection.py index 15512e0..d417302 100644 --- a/tests/test_ble_connection.py +++ b/tests/test_ble_connection.py @@ -53,7 +53,7 @@ class TestBLEConnection(unittest.TestCase): # Assert ble_conn.rx_char.write_gatt_char.assert_called_once_with( - ble_conn.rx_char, data_to_send, response=False + ble_conn.rx_char, data_to_send, response=True ) def _get_mock_bleak_client(self): diff --git a/tests/test_ble_pin_pairing.py b/tests/test_ble_pin_pairing.py new file mode 100644 index 0000000..aaa3acc --- /dev/null +++ b/tests/test_ble_pin_pairing.py @@ -0,0 +1,121 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from meshcore.ble_cx import ( + BLEConnection, + UART_SERVICE_UUID, + UART_TX_CHAR_UUID, + UART_RX_CHAR_UUID, +) + + +class TestBLEPinPairing(unittest.TestCase): + """Test BLE PIN pairing functionality""" + + @patch("meshcore.ble_cx.BleakClient") + def test_ble_connection_with_pin_successful_pairing(self, mock_bleak_client): + """Test BLE connection with PIN when pairing succeeds""" + # Arrange + mock_client_instance = self._get_mock_bleak_client() + mock_bleak_client.return_value = mock_client_instance + + address = "00:11:22:33:44:55" + pin = "123456" + ble_conn = BLEConnection(address=address, pin=pin) + + # Act + result = asyncio.run(ble_conn.connect()) + + # Assert + mock_client_instance.connect.assert_called_once() + mock_client_instance.pair.assert_called_once() + mock_client_instance.start_notify.assert_called_once_with( + UART_TX_CHAR_UUID, ble_conn.handle_rx + ) + self.assertEqual(result, address) + + @patch("meshcore.ble_cx.BleakClient") + def test_ble_connection_with_pin_failed_pairing(self, mock_bleak_client): + """Test BLE connection with PIN when pairing fails but connection continues""" + # Arrange + mock_client_instance = self._get_mock_bleak_client() + mock_client_instance.pair = AsyncMock(side_effect=Exception("Pairing failed")) + mock_bleak_client.return_value = mock_client_instance + + address = "00:11:22:33:44:55" + pin = "123456" + ble_conn = BLEConnection(address=address, pin=pin) + + # Act + result = asyncio.run(ble_conn.connect()) + + # Assert + mock_client_instance.connect.assert_called_once() + mock_client_instance.pair.assert_called_once() + mock_client_instance.start_notify.assert_called_once_with( + UART_TX_CHAR_UUID, ble_conn.handle_rx + ) + # Connection should still succeed even if pairing fails + self.assertEqual(result, address) + + @patch("meshcore.ble_cx.BleakClient") + def test_ble_connection_without_pin_no_pairing(self, mock_bleak_client): + """Test BLE connection without PIN - no pairing should be attempted""" + # Arrange + mock_client_instance = self._get_mock_bleak_client() + mock_bleak_client.return_value = mock_client_instance + + address = "00:11:22:33:44:55" + ble_conn = BLEConnection(address=address) + + # Act + result = asyncio.run(ble_conn.connect()) + + # Assert + mock_client_instance.connect.assert_called_once() + mock_client_instance.pair.assert_not_called() + mock_client_instance.start_notify.assert_called_once_with( + UART_TX_CHAR_UUID, ble_conn.handle_rx + ) + self.assertEqual(result, address) + + @patch("meshcore.ble_cx.BleakClient") + def test_ble_connection_pin_constructor_parameter(self, mock_bleak_client): + """Test that PIN parameter is properly stored in constructor""" + # Arrange + address = "00:11:22:33:44:55" + pin = "654321" + + # Act + ble_conn = BLEConnection(address=address, pin=pin) + + # Assert + self.assertEqual(ble_conn.pin, pin) + self.assertEqual(ble_conn.address, address) + + def _get_mock_bleak_client(self): + """ + Creates a mock BleakClient instance with all the necessary async methods and attributes. + """ + mock_client = MagicMock() + mock_client.address = "00:11:22:33:44:55" + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.start_notify = AsyncMock() + mock_client.write_gatt_char = AsyncMock() + mock_client.pair = AsyncMock() + mock_client.is_connected = True + + mock_service = MagicMock() + mock_char = MagicMock() + mock_char.uuid = UART_RX_CHAR_UUID + mock_char.write_gatt_char = mock_client.write_gatt_char + mock_service.get_characteristic.return_value = mock_char + mock_client.services.get_service.return_value = mock_service + + return mock_client + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_meshcore_ble_pin.py b/tests/test_meshcore_ble_pin.py new file mode 100644 index 0000000..10ecc88 --- /dev/null +++ b/tests/test_meshcore_ble_pin.py @@ -0,0 +1,35 @@ +import unittest +from meshcore.ble_cx import BLEConnection + + +class TestMeshCoreBLEPin(unittest.TestCase): + """Test MeshCore BLE PIN pairing functionality""" + + def test_ble_connection_pin_parameter_propagation(self): + """Test that PIN parameter is properly passed to BLEConnection""" + # Arrange + address = "00:11:22:33:44:55" + pin = "654321" + + # Act + connection = BLEConnection(address=address, pin=pin) + + # Assert + self.assertEqual(connection.pin, pin) + self.assertEqual(connection.address, address) + + def test_ble_connection_pin_none_by_default(self): + """Test that PIN is None by default when not specified""" + # Arrange + address = "00:11:22:33:44:55" + + # Act + connection = BLEConnection(address=address) + + # Assert + self.assertIsNone(connection.pin) + self.assertEqual(connection.address, address) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file