meshcore_py/src/meshcore/events.py
2025-06-30 20:23:35 -07:00

200 lines
7.1 KiB
Python

from enum import Enum
import logging
from math import log
from typing import Any, Dict, Optional, Callable, List, Union
import asyncio
from dataclasses import dataclass, field
# Module-level logger. It is looked up by the fixed name "meshcore" (not
# __name__), so it is the same logger object any other code obtains with
# logging.getLogger("meshcore").
logger = logging.getLogger("meshcore")
# Public event types for users to subscribe to
class EventType(Enum):
    """Enumeration of the event kinds that can be subscribed to.

    Every member's value is a short string identifier. Members are kept in
    the groups used by the section comments below.
    """

    CONTACTS = "contacts"
    SELF_INFO = "self_info"
    CONTACT_MSG_RECV = "contact_message"
    CHANNEL_MSG_RECV = "channel_message"
    CURRENT_TIME = "time_update"
    NO_MORE_MSGS = "no_more_messages"
    CONTACT_URI = "contact_uri"
    BATTERY = "battery_info"
    DEVICE_INFO = "device_info"
    CLI_RESPONSE = "cli_response"
    MSG_SENT = "message_sent"

    # ---- Push notifications ----
    ADVERTISEMENT = "advertisement"
    PATH_UPDATE = "path_update"
    ACK = "acknowledgement"
    MESSAGES_WAITING = "messages_waiting"
    RAW_DATA = "raw_data"
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILED = "login_failed"
    STATUS_RESPONSE = "status_response"
    LOG_DATA = "log_data"
    TRACE_DATA = "trace_data"
    RX_LOG_DATA = "rx_log_data"
    TELEMETRY_RESPONSE = "telemetry_response"
    CUSTOM_VARS = "custom_vars"
    CHANNEL_INFO = "channel_info"

    # ---- Command response types ----
    OK = "command_ok"
    ERROR = "command_error"

    # ---- Connection events ----
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"
@dataclass
class Event:
    """A typed event with a payload and filterable attributes.

    The explicit ``__init__`` below is the one that is used; ``@dataclass``
    does not replace an ``__init__`` defined in the class body. The decorator
    still contributes the generated ``__repr__`` and ``__eq__`` for the three
    declared fields.
    """

    type: EventType
    payload: Any
    attributes: Dict[str, Any] = field(default_factory=dict)

    def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs):
        """
        Initialize an Event

        Args:
            type: The event type
            payload: The event payload
            attributes: Dictionary of event attributes for filtering. The
                dictionary is copied; the event never keeps a reference to
                (or modifies) the caller's object.
            **kwargs: Additional attributes to add to the attributes dictionary
        """
        self.type = type
        self.payload = payload
        # Always take a private copy. Storing the caller's dict directly meant
        # the kwargs merge below wrote into the caller's object, and the
        # aliasing was inconsistent anyway (an empty dict was replaced by a
        # fresh one while a non-empty dict was shared).
        self.attributes = dict(attributes) if attributes else {}
        # Merge keyword arguments on top; a no-op when none were given.
        self.attributes.update(kwargs)

    def clone(self):
        """
        Create a copy of the event.

        The copy is shallow: a ``dict`` payload is duplicated with
        ``dict.copy()`` (nested objects stay shared), any other payload
        object is shared by reference, and the attributes dictionary is
        duplicated.

        Returns:
            A new Event object with the same type, payload, and attributes.
        """
        if isinstance(self.payload, dict):
            copied_payload = self.payload.copy()
        else:
            copied_payload = self.payload
        # __init__ copies the attributes mapping, so no extra .copy() here.
        return Event(self.type, copied_payload, self.attributes)
class Subscription:
    """Handle for one registered callback; lets the holder cancel it later."""

    def __init__(self, dispatcher, event_type, callback, attribute_filters=None):
        """
        Record what was subscribed and where.

        Args:
            dispatcher: Object that owns this subscription; it must provide
                ``_remove_subscription(subscription)``.
            event_type: Event type this subscription is registered for.
            callback: Callable stored for this subscription.
            attribute_filters: Optional attribute key/value pairs; a falsy
                value (``None`` or empty) is stored as a new empty dict.
        """
        self.dispatcher = dispatcher
        self.event_type = event_type
        self.callback = callback
        if attribute_filters:
            self.attribute_filters = attribute_filters
        else:
            self.attribute_filters = {}

    def unsubscribe(self):
        """Ask the owning dispatcher to drop this subscription."""
        owner = self.dispatcher
        owner._remove_subscription(self)
class EventDispatcher:
    """
    Queue-backed event dispatcher.

    Events handed to ``dispatch()`` are placed on an ``asyncio.Queue``. Once
    ``start()`` has been called, a background task takes them off the queue
    one at a time and invokes every subscription whose event type and
    attribute filters match.
    """

    def __init__(self):
        self.queue: asyncio.Queue[Event] = asyncio.Queue()
        self.subscriptions: List[Subscription] = []
        self.running = False
        # Background task running _process_events(); None while stopped.
        self._task = None

    def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]],
                  attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription:
        """
        Subscribe to events with optional attribute filtering.

        Parameters:
        -----------
        event_type : EventType or None
            The type of event to subscribe to, or None to subscribe to all events.
        callback : Callable
            Function to call when a matching event is received. It is called
            with a clone of the event; if the call returns a coroutine, the
            coroutine is awaited before the next subscriber runs.
        attribute_filters : Dict[str, Any], optional
            Dictionary of attribute key-value pairs that must match for the event to trigger the callback.

        Returns:
        --------
        Subscription object that can be used to unsubscribe.
        """
        subscription = Subscription(self, event_type, callback, attribute_filters)
        self.subscriptions.append(subscription)
        return subscription

    def _remove_subscription(self, subscription: Subscription):
        """Remove a subscription; does nothing if it is not registered."""
        try:
            self.subscriptions.remove(subscription)
        except ValueError:
            pass

    async def dispatch(self, event: Event):
        """Queue an event for delivery by the background task."""
        await self.queue.put(event)

    @staticmethod
    def _matches(subscription: Subscription, event: Event) -> bool:
        """Return True if the event passes the subscription's type and attribute filters."""
        if subscription.event_type is not None and subscription.event_type != event.type:
            return False
        filters = subscription.attribute_filters
        if not filters:
            return True
        # Every filter key must be present on the event with an equal value.
        return all(event.attributes.get(key) == value for key, value in filters.items())

    async def _process_events(self):
        """Worker loop: deliver queued events to matching subscribers until stopped."""
        while self.running:
            event = await self.queue.get()
            try:
                logger.debug("Dispatching event: %s, %s, %s", event.type, event.payload, event.attributes)
                # Iterate over a snapshot so handlers may subscribe/unsubscribe
                # while the event is being delivered.
                for subscription in self.subscriptions.copy():
                    if not self._matches(subscription, event):
                        continue
                    try:
                        # Each handler gets its own clone of the event.
                        result = subscription.callback(event.clone())
                        if asyncio.iscoroutine(result):
                            await result
                    except Exception as e:
                        # A failing handler must not stop delivery to the
                        # others; log it (with traceback) via the module logger.
                        logger.exception("Error in event handler: %s", e)
            finally:
                # Always balance the get() above, even if the task is
                # cancelled mid-handler, so queue.join() cannot be left
                # waiting on this item.
                self.queue.task_done()

    async def start(self):
        """Start the background delivery task if it is not already running."""
        if not self.running:
            self.running = True
            self._task = asyncio.create_task(self._process_events())

    async def stop(self):
        """
        Deliver everything already queued, then stop the background task.

        Must not be awaited from inside an event handler: the drain step waits
        for the event currently being handled to be marked done, which only
        happens after its handlers return.
        """
        if not self.running:
            return
        worker = self._task
        # Drain while `running` is still True. Clearing the flag first lets
        # the worker loop exit after the event it is currently handling; any
        # further queued events would then never be marked done and
        # queue.join() would wait forever.
        if worker is not None and not worker.done():
            await self.queue.join()
        self.running = False
        self._task = None
        if worker is not None:
            worker.cancel()
            try:
                await worker
            except asyncio.CancelledError:
                pass

    async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None,
                             timeout: float | None = None) -> Optional[Event]:
        """
        Wait for an event of the specified type that matches all attribute filters.

        Parameters:
        -----------
        event_type : EventType
            The type of event to wait for.
        attribute_filters : Dict[str, Any], optional
            Dictionary of attribute key-value pairs that must match for the event to be returned.
        timeout : float | None, optional
            Maximum time to wait for the event, in seconds.

        Returns:
        --------
        The matched event, or None if timeout occurred before a matching event.
        """
        future = asyncio.get_running_loop().create_future()

        def event_handler(event: Event):
            # Only the first matching event resolves the future.
            if not future.done():
                future.set_result(event)

        subscription = self.subscribe(event_type, event_handler, attribute_filters)
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            # Drop the temporary subscription on success, timeout or cancellation.
            subscription.unsubscribe()