Handle error events properly in commands

This commit is contained in:
Alex Wolden 2025-04-14 09:03:56 -07:00
parent fa8ebcb50a
commit 52553a41bd
4 changed files with 117 additions and 26 deletions

View file

@ -95,11 +95,32 @@ class CommandHandler:
expected_events = [expected_events]
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
# Create futures for all expected events
futures = []
for event_type in expected_events:
# don't apply any filters for now, might change later
event = await self.dispatcher.wait_for_event(event_type, {}, timeout)
future = asyncio.create_task(
self.dispatcher.wait_for_event(event_type, {}, timeout)
)
futures.append(future)
# Wait for the first event to complete or all to timeout
done, pending = await asyncio.wait(
futures,
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel all pending futures
for future in pending:
future.cancel()
# Check if any future completed successfully
for future in done:
event = await future
if event:
return event.payload
return {"success": False, "reason": "no_event_received"}
except asyncio.TimeoutError:
logger.debug(f"Command timed out {data}")

View file

@ -116,10 +116,11 @@ class EventDispatcher:
event = await self.queue.get()
logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}")
for subscription in self.subscriptions.copy():
logger.debug(f"Checking subscription: {subscription.event_type}, {subscription.attribute_filters}")
# Check if event type matches
if subscription.event_type is None or subscription.event_type == event.type:
# Check if all attribute filters match
if subscription.attribute_filters:
if subscription.attribute_filters and subscription.attribute_filters != {}:
# Skip if any filter doesn't match the corresponding event attribute
if not all(event.attributes.get(key) == value
for key, value in subscription.attribute_filters.items()):