Merge pull request #28 from alex-vg/main

Implement BLE PIN pairing support for enhanced security
This commit is contained in:
fdlamotte 2025-09-28 20:40:24 +02:00 committed by GitHub
commit ff09da49ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 273 additions and 3 deletions

View file

@ -107,10 +107,36 @@ meshcore = await MeshCore.create_serial("/dev/ttyUSB0", 115200, debug=True)
# BLE connection (scans for devices if address not provided)
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB")
# BLE connection with PIN pairing for enhanced security
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB", pin="123456")
# TCP connection
meshcore = await MeshCore.create_tcp("192.168.1.100", 4000)
```
#### BLE PIN Pairing
For enhanced security, MeshCore supports BLE PIN pairing. This requires the device to be configured with a PIN and the client to provide the matching PIN during connection:
```python
# First configure the device PIN (if not already set)
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB")
await meshcore.commands.set_devicepin(123456)
# Then connect with PIN pairing
meshcore = await MeshCore.create_ble("12:34:56:78:90:AB", pin="123456")
```
**PIN Pairing Features:**
- Automatic pairing initiation when PIN is provided
- Graceful fallback if pairing fails (connection continues if device is already paired)
- Compatible with all BLE connection methods (address, scanning, pre-configured client)
- Logging of pairing success/failure for debugging
**Note:** BLE pairing behavior may vary by platform:
- **Linux/Windows**: PIN pairing is fully supported
- **macOS**: Pairing may be handled automatically by the system UI
#### Auto-Reconnect and Connection Events
Enable automatic reconnection when connections are lost:
@ -582,5 +608,6 @@ Check the `examples/` directory for more:
- `pubsub_example.py`: Event subscription system with auto-fetching
- `serial_infos.py`: Quick device info retrieval
- `serial_msg.py`: Message sending and receiving
- `ble_pin_pairing_example.py`: BLE connection with PIN pairing
- `ble_t1000_infos.py`: BLE connections

View file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
"""
Example: BLE PIN Pairing with MeshCore
This example demonstrates how to connect to a MeshCore device using BLE
with PIN-based pairing for enhanced security.
"""
import asyncio
import argparse
from meshcore import MeshCore
async def main():
parser = argparse.ArgumentParser(description="Connect to MeshCore device with BLE PIN pairing")
parser.add_argument("-a", "--addr", help="BLE address of the device (optional, will scan if not provided)")
parser.add_argument("-p", "--pin", help="PIN for BLE pairing (optional)")
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args = parser.parse_args()
try:
print("Connecting to MeshCore device...")
if args.pin:
print(f"Using PIN for pairing: {args.pin}")
# Create BLE connection with optional PIN
meshcore = await MeshCore.create_ble(
address=args.addr,
pin=args.pin,
debug=args.debug
)
print("✅ Connected successfully!")
# Get device information to verify connection
result = await meshcore.commands.send_device_query()
if result.payload:
print(f"Device model: {result.payload.get('model', 'Unknown')}")
print(f"Firmware version: {result.payload.get('fw_version', 'Unknown')}")
# Get device self-info
result = await meshcore.commands.send_appstart()
if result.payload:
print(f"Device public key: {result.payload.get('public_key', 'Unknown')[:16]}...")
print("\nConnection test completed successfully!")
except ConnectionError as e:
print(f"❌ Failed to connect: {e}")
return 1
except Exception as e:
print(f"❌ Error: {e}")
return 1
finally:
if 'meshcore' in locals():
await meshcore.disconnect()
print("Disconnected from device")
return 0
if __name__ == "__main__":
import sys
try:
sys.exit(asyncio.run(main()))
except KeyboardInterrupt:
print("\nInterrupted by user")
sys.exit(1)

View file

@ -20,13 +20,15 @@ UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
class BLEConnection:
def __init__(self, address=None, device=None, client=None):
def __init__(self, address=None, device=None, client=None, pin=None):
"""
Constructor: specify address or an existing BleakClient.
Args:
address (str, optional): The Bluetooth address of the device.
device (BLEDevice, optional): A BLEDevice instance.
client (BleakClient, optional): An existing BleakClient instance.
pin (str, optional): PIN for BLE pairing authentication.
"""
self.address = address
self._user_provided_address = address
@ -34,6 +36,7 @@ class BLEConnection:
self._user_provided_client = client
self.device = device
self._user_provided_device = device
self.pin = pin
self.rx_char = None
self._disconnect_callback = None
@ -93,6 +96,18 @@ class BLEConnection:
try:
await self.client.connect()
# Perform pairing if PIN is provided
if self.pin is not None:
logger.debug(f"Attempting BLE pairing with PIN")
try:
await self.client.pair()
logger.info("BLE pairing successful")
except Exception as e:
logger.warning(f"BLE pairing failed: {e}")
# Don't fail the connection if pairing fails, as the device
# might already be paired or not require pairing
except BleakDeviceNotFoundError:
return None
except TimeoutError:

View file

@ -128,6 +128,7 @@ class MeshCore:
address: Optional[str] = None,
client=None,
device=None,
pin: Optional[str] = None,
debug: bool = False,
only_error: bool = False,
default_timeout=None,
@ -143,8 +144,10 @@ class MeshCore:
client (BleakClient, optional): An existing BleakClient instance to use.
If provided, 'address' is ignored for connection
but can be used for identification.
device (BLEDevice, optional): A BLEDevice instance to use for connection.
pin (str, optional): PIN for BLE pairing authentication.
"""
connection = BLEConnection(address=address, client=client, device=device)
connection = BLEConnection(address=address, client=client, device=device, pin=pin)
mc = cls(
connection,

View file

@ -53,7 +53,7 @@ class TestBLEConnection(unittest.TestCase):
# Assert
ble_conn.rx_char.write_gatt_char.assert_called_once_with(
ble_conn.rx_char, data_to_send, response=False
ble_conn.rx_char, data_to_send, response=True
)
def _get_mock_bleak_client(self):

View file

@ -0,0 +1,121 @@
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from meshcore.ble_cx import (
BLEConnection,
UART_SERVICE_UUID,
UART_TX_CHAR_UUID,
UART_RX_CHAR_UUID,
)
class TestBLEPinPairing(unittest.TestCase):
"""Test BLE PIN pairing functionality"""
@patch("meshcore.ble_cx.BleakClient")
def test_ble_connection_with_pin_successful_pairing(self, mock_bleak_client):
"""Test BLE connection with PIN when pairing succeeds"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
pin = "123456"
ble_conn = BLEConnection(address=address, pin=pin)
# Act
result = asyncio.run(ble_conn.connect())
# Assert
mock_client_instance.connect.assert_called_once()
mock_client_instance.pair.assert_called_once()
mock_client_instance.start_notify.assert_called_once_with(
UART_TX_CHAR_UUID, ble_conn.handle_rx
)
self.assertEqual(result, address)
@patch("meshcore.ble_cx.BleakClient")
def test_ble_connection_with_pin_failed_pairing(self, mock_bleak_client):
"""Test BLE connection with PIN when pairing fails but connection continues"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_client_instance.pair = AsyncMock(side_effect=Exception("Pairing failed"))
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
pin = "123456"
ble_conn = BLEConnection(address=address, pin=pin)
# Act
result = asyncio.run(ble_conn.connect())
# Assert
mock_client_instance.connect.assert_called_once()
mock_client_instance.pair.assert_called_once()
mock_client_instance.start_notify.assert_called_once_with(
UART_TX_CHAR_UUID, ble_conn.handle_rx
)
# Connection should still succeed even if pairing fails
self.assertEqual(result, address)
@patch("meshcore.ble_cx.BleakClient")
def test_ble_connection_without_pin_no_pairing(self, mock_bleak_client):
"""Test BLE connection without PIN - no pairing should be attempted"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
ble_conn = BLEConnection(address=address)
# Act
result = asyncio.run(ble_conn.connect())
# Assert
mock_client_instance.connect.assert_called_once()
mock_client_instance.pair.assert_not_called()
mock_client_instance.start_notify.assert_called_once_with(
UART_TX_CHAR_UUID, ble_conn.handle_rx
)
self.assertEqual(result, address)
@patch("meshcore.ble_cx.BleakClient")
def test_ble_connection_pin_constructor_parameter(self, mock_bleak_client):
"""Test that PIN parameter is properly stored in constructor"""
# Arrange
address = "00:11:22:33:44:55"
pin = "654321"
# Act
ble_conn = BLEConnection(address=address, pin=pin)
# Assert
self.assertEqual(ble_conn.pin, pin)
self.assertEqual(ble_conn.address, address)
def _get_mock_bleak_client(self):
"""
Creates a mock BleakClient instance with all the necessary async methods and attributes.
"""
mock_client = MagicMock()
mock_client.address = "00:11:22:33:44:55"
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.start_notify = AsyncMock()
mock_client.write_gatt_char = AsyncMock()
mock_client.pair = AsyncMock()
mock_client.is_connected = True
mock_service = MagicMock()
mock_char = MagicMock()
mock_char.uuid = UART_RX_CHAR_UUID
mock_char.write_gatt_char = mock_client.write_gatt_char
mock_service.get_characteristic.return_value = mock_char
mock_client.services.get_service.return_value = mock_service
return mock_client
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,35 @@
import unittest
from meshcore.ble_cx import BLEConnection
class TestMeshCoreBLEPin(unittest.TestCase):
"""Test MeshCore BLE PIN pairing functionality"""
def test_ble_connection_pin_parameter_propagation(self):
"""Test that PIN parameter is properly passed to BLEConnection"""
# Arrange
address = "00:11:22:33:44:55"
pin = "654321"
# Act
connection = BLEConnection(address=address, pin=pin)
# Assert
self.assertEqual(connection.pin, pin)
self.assertEqual(connection.address, address)
def test_ble_connection_pin_none_by_default(self):
"""Test that PIN is None by default when not specified"""
# Arrange
address = "00:11:22:33:44:55"
# Act
connection = BLEConnection(address=address)
# Assert
self.assertIsNone(connection.pin)
self.assertEqual(connection.address, address)
if __name__ == "__main__":
unittest.main()