Fix: Improved BLE Connection Logic on macOS

This commit is contained in:
Ventz Petkov 2025-08-05 07:50:59 -04:00
parent 999bf2ec8b
commit c19fd166f8
4 changed files with 119 additions and 29 deletions

View file

@ -1,2 +1 @@
[pytest]
asyncio_mode = auto

View file

@ -18,40 +18,55 @@ UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
class BLEConnection:
def __init__(self, address):
""" Constructor : specify address """
def __init__(self, address=None, client=None):
"""
Constructor: specify address or an existing BleakClient.
Args:
address (str, optional): The Bluetooth address of the device.
client (BleakClient, optional): An existing BleakClient instance.
"""
self.address = address
self._user_provided_address = address
self.client = None
self.client = client
self.rx_char = None
self._disconnect_callback = None
async def connect(self):
"""
Connects to the device
Connects to the device.
Returns : the address used for connection
If a BleakClient was provided to the constructor, it uses that.
Otherwise, it will scan or connect based on the provided address.
Returns:
The address used for connection, or None on failure.
"""
logger.debug(f"Connecting existing connection: {self.client} with address {self.address}")
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """
if not adv.local_name is None\
and adv.local_name.startswith("MeshCore")\
and (self.address is None or self.address in adv.local_name) :
return True
return False
logger.debug(f"Connecting with client: {self.client}, address: {self.address}")
if self.address is None or self.address == "" or len(self.address.split(":")) != 6:
scanner = BleakScanner()
logger.info("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None:
return None
logger.info(f"Found device : {device}")
self.client = BleakClient(device, disconnected_callback=self.handle_disconnect)
self.address = self.client.address
if self.client:
logger.debug("Using pre-configured BleakClient.")
# If a client is already provided, ensure its disconnect callback is set
self.client._disconnected_callback = self.handle_disconnect
else:
self.client = BleakClient(self.address, disconnected_callback=self.handle_disconnect)
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
"""Filter to match MeshCore devices."""
if adv.local_name and adv.local_name.startswith("MeshCore"):
if self.address is None or self.address in adv.local_name:
return True
return False
if self.address is None or ":" not in self.address:
logger.info("Scanning for devices...")
device = await BleakScanner.find_device_by_filter(match_meshcore_device)
if device is None:
logger.warning("No MeshCore device found during scan.")
return None
logger.info(f"Found device: {device}")
self.client = BleakClient(device, disconnected_callback=self.handle_disconnect)
self.address = self.client.address
else:
self.client = BleakClient(self.address, disconnected_callback=self.handle_disconnect)
try:
await self.client.connect()

View file

@ -83,14 +83,19 @@ class MeshCore:
return mc
@classmethod
async def create_ble(cls, address: Optional[str] = None, debug: bool = False, only_error:bool=False, default_timeout=None,
async def create_ble(cls, address: Optional[str] = None, client=None, debug: bool = False, only_error:bool=False, default_timeout=None,
auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore':
"""Create and connect a MeshCore instance using BLE connection
If address is None, it will scan for and connect to the first available MeshCore device.
"""
Create and connect a MeshCore instance using BLE connection.
Args:
address (str, optional): The Bluetooth address of the device.
client (BleakClient, optional): An existing BleakClient instance to use.
If provided, 'address' is ignored for connection
but can be used for identification.
"""
connection = BLEConnection(address)
connection = BLEConnection(address=address, client=client)
mc = cls(connection, debug=debug, only_error=only_error, default_timeout=default_timeout,
auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts)

View file

@ -0,0 +1,71 @@
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from meshcore.ble_cx import BLEConnection, UART_SERVICE_UUID, UART_TX_CHAR_UUID, UART_RX_CHAR_UUID
class TestBLEConnection(unittest.TestCase):
@patch('meshcore.ble_cx.BleakClient')
def test_ble_connection_and_disconnection(self, mock_bleak_client):
"""
Tests the BLEConnection class for connecting and disconnecting from a BLE device.
"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
ble_conn = BLEConnection(address=address)
# Act
asyncio.run(ble_conn.connect())
asyncio.run(ble_conn.disconnect())
# Assert
mock_client_instance.connect.assert_called_once()
mock_client_instance.start_notify.assert_called_once_with(UART_TX_CHAR_UUID, ble_conn.handle_rx)
mock_client_instance.disconnect.assert_called_once()
@patch('meshcore.ble_cx.BleakClient')
def test_send_data(self, mock_bleak_client):
"""
Tests the send method of the BLEConnection class.
"""
# Arrange
mock_client_instance = self._get_mock_bleak_client()
mock_bleak_client.return_value = mock_client_instance
address = "00:11:22:33:44:55"
ble_conn = BLEConnection(address=address)
asyncio.run(ble_conn.connect())
# Act
data_to_send = b"Hello, BLE"
asyncio.run(ble_conn.send(data_to_send))
# Assert
ble_conn.rx_char.write_gatt_char.assert_called_once_with(ble_conn.rx_char, data_to_send, response=False)
def _get_mock_bleak_client(self):
"""
Creates a mock BleakClient instance with all the necessary async methods and attributes.
"""
mock_client = MagicMock()
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.start_notify = AsyncMock()
mock_client.write_gatt_char = AsyncMock()
mock_client.is_connected = True
mock_service = MagicMock()
mock_char = MagicMock()
mock_char.uuid = UART_RX_CHAR_UUID
mock_char.write_gatt_char = mock_client.write_gatt_char
mock_service.get_characteristic.return_value = mock_char
mock_client.services.get_service.return_value = mock_service
return mock_client
if __name__ == '__main__':
unittest.main()