mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
Add event filtering to support ACK tracking
This commit is contained in:
parent
478bcd92c1
commit
6dc87bafbb
9 changed files with 325 additions and 80 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from enum import Enum
|
||||
import logging
|
||||
from math import log
|
||||
from typing import Any, Dict, Optional, Callable, List, Union
|
||||
import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
|
|
@ -44,16 +45,31 @@ class Event:
|
|||
payload: Any
|
||||
attributes: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.attributes is None:
|
||||
self.attributes = {}
|
||||
def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs):
|
||||
"""
|
||||
Initialize an Event
|
||||
|
||||
Args:
|
||||
type: The event type
|
||||
payload: The event payload
|
||||
attributes: Dictionary of event attributes for filtering
|
||||
**kwargs: Additional attributes to add to the attributes dictionary
|
||||
"""
|
||||
self.type = type
|
||||
self.payload = payload
|
||||
self.attributes = attributes or {}
|
||||
|
||||
# Add any keyword arguments to the attributes dictionary
|
||||
if kwargs:
|
||||
self.attributes.update(kwargs)
|
||||
|
||||
|
||||
class Subscription:
|
||||
def __init__(self, dispatcher, event_type, callback):
|
||||
def __init__(self, dispatcher, event_type, callback, attribute_filters=None):
|
||||
self.dispatcher = dispatcher
|
||||
self.event_type = event_type
|
||||
self.callback = callback
|
||||
self.attribute_filters = attribute_filters or {}
|
||||
|
||||
def unsubscribe(self):
|
||||
self.dispatcher._remove_subscription(self)
|
||||
|
|
@ -66,8 +82,25 @@ class EventDispatcher:
|
|||
self.running = False
|
||||
self._task = None
|
||||
|
||||
def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]]) -> Subscription:
|
||||
subscription = Subscription(self, event_type, callback)
|
||||
def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]],
|
||||
attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription:
|
||||
"""
|
||||
Subscribe to events with optional attribute filtering.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
event_type : EventType or None
|
||||
The type of event to subscribe to, or None to subscribe to all events.
|
||||
callback : Callable
|
||||
Function to call when a matching event is received.
|
||||
attribute_filters : Dict[str, Any], optional
|
||||
Dictionary of attribute key-value pairs that must match for the event to trigger the callback.
|
||||
|
||||
Returns:
|
||||
--------
|
||||
Subscription object that can be used to unsubscribe.
|
||||
"""
|
||||
subscription = Subscription(self, event_type, callback, attribute_filters)
|
||||
self.subscriptions.append(subscription)
|
||||
return subscription
|
||||
|
||||
|
|
@ -81,9 +114,16 @@ class EventDispatcher:
|
|||
async def _process_events(self):
|
||||
while self.running:
|
||||
event = await self.queue.get()
|
||||
logger.debug(f"Dispatching event: {event.type}, {event.payload}")
|
||||
logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}")
|
||||
for subscription in self.subscriptions.copy():
|
||||
# Check if event type matches
|
||||
if subscription.event_type is None or subscription.event_type == event.type:
|
||||
# Check if all attribute filters match
|
||||
if subscription.attribute_filters:
|
||||
# Skip if any filter doesn't match the corresponding event attribute
|
||||
if not all(event.attributes.get(key) == value
|
||||
for key, value in subscription.attribute_filters.items()):
|
||||
continue
|
||||
try:
|
||||
result = subscription.callback(event)
|
||||
if asyncio.iscoroutine(result):
|
||||
|
|
@ -110,14 +150,31 @@ class EventDispatcher:
|
|||
pass
|
||||
self._task = None
|
||||
|
||||
async def wait_for_event(self, event_type: EventType, timeout: float | None = None) -> Optional[Event]:
|
||||
async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None,
|
||||
timeout: float | None = None) -> Optional[Event]:
|
||||
"""
|
||||
Wait for an event of the specified type that matches all attribute filters.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
event_type : EventType
|
||||
The type of event to wait for.
|
||||
attribute_filters : Dict[str, Any], optional
|
||||
Dictionary of attribute key-value pairs that must match for the event to be returned.
|
||||
timeout : float | None, optional
|
||||
Maximum time to wait for the event, in seconds.
|
||||
|
||||
Returns:
|
||||
--------
|
||||
The matched event, or None if timeout occurred before a matching event.
|
||||
"""
|
||||
future = asyncio.Future()
|
||||
|
||||
def event_handler(event: Event):
|
||||
if not future.done():
|
||||
future.set_result(event)
|
||||
|
||||
subscription = self.subscribe(event_type, event_handler)
|
||||
subscription = self.subscribe(event_type, event_handler, attribute_filters)
|
||||
|
||||
try:
|
||||
return await asyncio.wait_for(future, timeout)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue