mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
fix: subscribe before send to prevent event race condition
This commit is contained in:
parent
debe6b8770
commit
8b3149cb7e
1 changed files with 78 additions and 9 deletions
|
|
@ -140,6 +140,10 @@ class CommandHandlerBase:
|
|||
"""
|
||||
Send a command and wait for expected event responses.
|
||||
|
||||
Uses subscribe-before-send to avoid race conditions where the
|
||||
device responds before the event listener is registered. This
|
||||
mirrors the pattern used by the companion apps (JS/iOS/Android).
|
||||
|
||||
Args:
|
||||
data: The data to send
|
||||
expected_events: EventType or list of EventTypes to wait for
|
||||
|
|
@ -154,17 +158,82 @@ class CommandHandlerBase:
|
|||
# Use the provided timeout or fall back to default_timeout
|
||||
timeout = timeout if timeout is not None else self.default_timeout
|
||||
|
||||
if self._sender_func:
|
||||
logger.debug(
|
||||
f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}"
|
||||
)
|
||||
await self._sender_func(data)
|
||||
|
||||
if expected_events:
|
||||
# For commands that don't expect events, return a success event
|
||||
return await self.wait_for_events(expected_events, timeout)
|
||||
# ── Subscribe BEFORE sending ──────────────────────────
|
||||
# Register event listeners first so we never miss a fast
|
||||
# device response, even on a busy mesh network where the
|
||||
# asyncio event loop processes RX_LOG events in between.
|
||||
if not isinstance(expected_events, list):
|
||||
expected_events = [expected_events]
|
||||
|
||||
return Event(EventType.OK, {})
|
||||
futures: List[asyncio.Future] = []
|
||||
subscriptions = []
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
for event_type in expected_events:
|
||||
future = loop.create_future()
|
||||
|
||||
def _handler(event: Event, f: asyncio.Future = future) -> None:
|
||||
if not f.done():
|
||||
f.set_result(event)
|
||||
|
||||
sub = self.dispatcher.subscribe(event_type, _handler)
|
||||
futures.append(future)
|
||||
subscriptions.append(sub)
|
||||
|
||||
try:
|
||||
# ── Now send the command ──────────────────────────
|
||||
if self._sender_func:
|
||||
logger.debug(
|
||||
f"Sending raw data: "
|
||||
f"{data.hex() if isinstance(data, bytes) else data}"
|
||||
)
|
||||
await self._sender_func(data)
|
||||
|
||||
# ── Wait for the first matching event ─────────────
|
||||
done, pending = await asyncio.wait(
|
||||
futures,
|
||||
timeout=timeout,
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
|
||||
# Cancel futures we no longer need
|
||||
for f in pending:
|
||||
f.cancel()
|
||||
|
||||
# Return the first successfully received event
|
||||
for f in done:
|
||||
try:
|
||||
event = f.result()
|
||||
if event:
|
||||
return event
|
||||
except (asyncio.CancelledError, asyncio.InvalidStateError):
|
||||
pass
|
||||
|
||||
return Event(EventType.ERROR, {"reason": "no_event_received"})
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.debug(
|
||||
f"Command timed out waiting for events {expected_events}"
|
||||
)
|
||||
return Event(EventType.ERROR, {"reason": "timeout"})
|
||||
except Exception as e:
|
||||
logger.debug(f"Command error: {e}")
|
||||
return Event(EventType.ERROR, {"error": str(e)})
|
||||
finally:
|
||||
# Always clean up subscriptions
|
||||
for sub in subscriptions:
|
||||
sub.unsubscribe()
|
||||
|
||||
else:
|
||||
# Fire-and-forget commands (no expected response)
|
||||
if self._sender_func:
|
||||
logger.debug(
|
||||
f"Sending raw data: "
|
||||
f"{data.hex() if isinstance(data, bytes) else data}"
|
||||
)
|
||||
await self._sender_func(data)
|
||||
return Event(EventType.OK, {})
|
||||
|
||||
# attached at base because its a common method
|
||||
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, context={}, timeout=None, min_timeout=0) -> Event:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue