From 8b3149cb7eb0da0c10d94a4e474d9fbec6205088 Mon Sep 17 00:00:00 2001 From: pe1hvh Date: Mon, 9 Feb 2026 22:53:27 +0100 Subject: [PATCH] fix: subscribe before send to prevent event race condition --- src/meshcore/commands/base.py | 87 +++++++++++++++++++++++++++++++---- 1 file changed, 78 insertions(+), 9 deletions(-) diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index 4625473..6e89ab1 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -140,6 +140,10 @@ class CommandHandlerBase: """ Send a command and wait for expected event responses. + Uses subscribe-before-send to avoid race conditions where the + device responds before the event listener is registered. This + mirrors the pattern used by the companion apps (JS/iOS/Android). + Args: data: The data to send expected_events: EventType or list of EventTypes to wait for @@ -154,17 +158,82 @@ class CommandHandlerBase: # Use the provided timeout or fall back to default_timeout timeout = timeout if timeout is not None else self.default_timeout - if self._sender_func: - logger.debug( - f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}" - ) - await self._sender_func(data) - if expected_events: - # For commands that don't expect events, return a success event - return await self.wait_for_events(expected_events, timeout) + # ── Subscribe BEFORE sending ────────────────────────── + # Register event listeners first so we never miss a fast + # device response, even on a busy mesh network where the + # asyncio event loop processes RX_LOG events in between. + if not isinstance(expected_events, list): + expected_events = [expected_events] - return Event(EventType.OK, {}) + futures: List[asyncio.Future] = [] + subscriptions = [] + + loop = asyncio.get_event_loop() + for event_type in expected_events: + future = loop.create_future() + + def _handler(event: Event, f: asyncio.Future = future) -> None: + if not f.done(): + f.set_result(event) + + sub = self.dispatcher.subscribe(event_type, _handler) + futures.append(future) + subscriptions.append(sub) + + try: + # ── Now send the command ────────────────────────── + if self._sender_func: + logger.debug( + f"Sending raw data: " + f"{data.hex() if isinstance(data, bytes) else data}" + ) + await self._sender_func(data) + + # ── Wait for the first matching event ───────────── + done, pending = await asyncio.wait( + futures, + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED, + ) + + # Cancel futures we no longer need + for f in pending: + f.cancel() + + # Return the first successfully received event + for f in done: + try: + event = f.result() + if event: + return event + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + return Event(EventType.ERROR, {"reason": "no_event_received"}) + + except asyncio.TimeoutError: + logger.debug( + f"Command timed out waiting for events {expected_events}" + ) + return Event(EventType.ERROR, {"reason": "timeout"}) + except Exception as e: + logger.debug(f"Command error: {e}") + return Event(EventType.ERROR, {"error": str(e)}) + finally: + # Always clean up subscriptions + for sub in subscriptions: + sub.unsubscribe() + + else: + # Fire-and-forget commands (no expected response) + if self._sender_func: + logger.debug( + f"Sending raw data: " + f"{data.hex() if isinstance(data, bytes) else data}" + ) + await self._sender_func(data) + return Event(EventType.OK, {}) # attached at base because its a common method async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, context={}, timeout=None, min_timeout=0) -> Event: