diff --git a/pytest.ini b/pytest.ini index d280de0..eea2c18 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1 @@ [pytest] -asyncio_mode = auto \ No newline at end of file diff --git a/src/meshcore/ble_cx.py b/src/meshcore/ble_cx.py index e68d7b4..517ef1b 100644 --- a/src/meshcore/ble_cx.py +++ b/src/meshcore/ble_cx.py @@ -1,57 +1,80 @@ -""" - mccli.py : CLI interface to MeschCore BLE companion app """ +mccli.py : CLI interface to MeschCore BLE companion app +""" + import asyncio import logging -# Get logger -logger = logging.getLogger("meshcore") - from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData from bleak.exc import BleakDeviceNotFoundError +# Get logger +logger = logging.getLogger("meshcore") + UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" + class BLEConnection: - def __init__(self, address): - """ Constructor : specify address """ + def __init__(self, address=None, client=None): + """ + Constructor: specify address or an existing BleakClient. + + Args: + address (str, optional): The Bluetooth address of the device. + client (BleakClient, optional): An existing BleakClient instance. + """ self.address = address self._user_provided_address = address - self.client = None + self.client = client self.rx_char = None self._disconnect_callback = None async def connect(self): """ - Connects to the device + Connects to the device. - Returns : the address used for connection + If a BleakClient was provided to the constructor, it uses that. + Otherwise, it will scan or connect based on the provided address. + + Returns: + The address used for connection, or None on failure. """ - logger.debug(f"Connecting existing connection: {self.client} with address {self.address}") - def match_meshcore_device(_: BLEDevice, adv: AdvertisementData): - """ Filter to mach MeshCore devices """ - if not adv.local_name is None\ - and adv.local_name.startswith("MeshCore")\ - and (self.address is None or self.address in adv.local_name) : - return True - return False + logger.debug(f"Connecting with client: {self.client}, address: {self.address}") - if self.address is None or self.address == "" or len(self.address.split(":")) != 6: - scanner = BleakScanner() - logger.info("Scanning for devices") - device = await scanner.find_device_by_filter(match_meshcore_device) - if device is None: - return None - logger.info(f"Found device : {device}") - self.client = BleakClient(device, disconnected_callback=self.handle_disconnect) + if self.client: + logger.debug("Using pre-configured BleakClient.") + # If a client is already provided, ensure its disconnect callback is set + self.client._disconnected_callback = self.handle_disconnect self.address = self.client.address else: - self.client = BleakClient(self.address, disconnected_callback=self.handle_disconnect) + + def match_meshcore_device(_: BLEDevice, adv: AdvertisementData): + """Filter to match MeshCore devices.""" + if adv.local_name and adv.local_name.startswith("MeshCore"): + if self.address is None or self.address in adv.local_name: + return True + return False + + if self.address is None or ":" not in self.address: + logger.info("Scanning for devices...") + device = await BleakScanner.find_device_by_filter(match_meshcore_device) + if device is None: + logger.warning("No MeshCore device found during scan.") + return None + logger.info(f"Found device: {device}") + self.client = BleakClient( + device, disconnected_callback=self.handle_disconnect + ) + self.address = self.client.address + else: + self.client = BleakClient( + self.address, disconnected_callback=self.handle_disconnect + ) try: await self.client.connect() @@ -72,24 +95,26 @@ class BLEConnection: return self.address def handle_disconnect(self, client: BleakClient): - """ Callback to handle disconnection """ - logger.debug(f"BLE device disconnected: {client.address} (is_connected: {client.is_connected})") + """Callback to handle disconnection""" + logger.debug( + f"BLE device disconnected: {client.address} (is_connected: {client.is_connected})" + ) # Reset the address we found to what user specified # this allows to reconnect to the same device self.address = self._user_provided_address - + if self._disconnect_callback: asyncio.create_task(self._disconnect_callback("ble_disconnect")) - + def set_disconnect_callback(self, callback): """Set callback to handle disconnections.""" self._disconnect_callback = callback - def set_reader(self, reader) : + def set_reader(self, reader): self.reader = reader def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray): - if not self.reader is None: + if self.reader is not None: asyncio.create_task(self.reader.handle_rx(data)) async def send(self, data): @@ -99,8 +124,8 @@ class BLEConnection: if not self.rx_char: logger.error("RX characteristic not found") return False - await self.client.write_gatt_char(self.rx_char, bytes(data), response=False) - + await self.client.write_gatt_char(self.rx_char, bytes(data), response=True) + async def disconnect(self): """Disconnect from the BLE device.""" if self.client and self.client.is_connected: diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 0ce7a6c..699cc92 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -83,14 +83,19 @@ class MeshCore: return mc @classmethod - async def create_ble(cls, address: Optional[str] = None, debug: bool = False, only_error:bool=False, default_timeout=None, + async def create_ble(cls, address: Optional[str] = None, client=None, debug: bool = False, only_error:bool=False, default_timeout=None, auto_reconnect: bool = False, max_reconnect_attempts: int = 3) -> 'MeshCore': - """Create and connect a MeshCore instance using BLE connection - - If address is None, it will scan for and connect to the first available MeshCore device. + """ + Create and connect a MeshCore instance using BLE connection. + + Args: + address (str, optional): The Bluetooth address of the device. + client (BleakClient, optional): An existing BleakClient instance to use. + If provided, 'address' is ignored for connection + but can be used for identification. """ - connection = BLEConnection(address) + connection = BLEConnection(address=address, client=client) mc = cls(connection, debug=debug, only_error=only_error, default_timeout=default_timeout, auto_reconnect=auto_reconnect, max_reconnect_attempts=max_reconnect_attempts) diff --git a/tests/test_ble_connection.py b/tests/test_ble_connection.py new file mode 100644 index 0000000..dcd7c56 --- /dev/null +++ b/tests/test_ble_connection.py @@ -0,0 +1,71 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from meshcore.ble_cx import BLEConnection, UART_SERVICE_UUID, UART_TX_CHAR_UUID, UART_RX_CHAR_UUID + +class TestBLEConnection(unittest.TestCase): + @patch('meshcore.ble_cx.BleakClient') + def test_ble_connection_and_disconnection(self, mock_bleak_client): + """ + Tests the BLEConnection class for connecting and disconnecting from a BLE device. + """ + # Arrange + mock_client_instance = self._get_mock_bleak_client() + mock_bleak_client.return_value = mock_client_instance + + address = "00:11:22:33:44:55" + ble_conn = BLEConnection(address=address) + + # Act + asyncio.run(ble_conn.connect()) + asyncio.run(ble_conn.disconnect()) + + # Assert + mock_client_instance.connect.assert_called_once() + mock_client_instance.start_notify.assert_called_once_with(UART_TX_CHAR_UUID, ble_conn.handle_rx) + mock_client_instance.disconnect.assert_called_once() + + @patch('meshcore.ble_cx.BleakClient') + def test_send_data(self, mock_bleak_client): + """ + Tests the send method of the BLEConnection class. + """ + # Arrange + mock_client_instance = self._get_mock_bleak_client() + mock_bleak_client.return_value = mock_client_instance + + address = "00:11:22:33:44:55" + ble_conn = BLEConnection(address=address) + asyncio.run(ble_conn.connect()) + + # Act + data_to_send = b"Hello, BLE" + asyncio.run(ble_conn.send(data_to_send)) + + # Assert + ble_conn.rx_char.write_gatt_char.assert_called_once_with(ble_conn.rx_char, data_to_send, response=False) + + def _get_mock_bleak_client(self): + """ + Creates a mock BleakClient instance with all the necessary async methods and attributes. + """ + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.start_notify = AsyncMock() + mock_client.write_gatt_char = AsyncMock() + mock_client.is_connected = True + + mock_service = MagicMock() + mock_char = MagicMock() + mock_char.uuid = UART_RX_CHAR_UUID + mock_char.write_gatt_char = mock_client.write_gatt_char + + mock_service.get_characteristic.return_value = mock_char + mock_client.services.get_service.return_value = mock_service + + return mock_client + +if __name__ == '__main__': + unittest.main()