diff --git a/README.md b/README.md index b4f19cb..bea2c49 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,47 @@ meshcore.subscribe(EventType.ADVERTISEMENT, handle_advert) meshcore.unsubscribe(subscription) ``` +#### Filtering Events by Attributes + +Filter events based on their attributes to handle only specific ones: + +```python +# Subscribe only to messages from a specific contact +async def handle_specific_contact_messages(event): + print(f"Message from Alice: {event.payload['text']}") + +contact = meshcore.get_contact_by_name("Alice") +if contact: + alice_subscription = meshcore.subscribe( + EventType.CONTACT_MSG_RECV, + handle_specific_contact_messages, + attribute_filters={"pubkey_prefix": contact["public_key"][:12]} + ) + +# Send a message and wait for its specific acknowledgment +async def send_and_confirm_message(meshcore, dst_key, message): + # Send the message and get information about the sent message + sent_result = await meshcore.commands.send_msg(dst_key, message) + + # Extract the expected acknowledgment code from the message sent event + expected_ack = sent_result["expected_ack"].hex() + print(f"Message sent, waiting for ack with code: {expected_ack}") + + # Wait specifically for this acknowledgment + result = await meshcore.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=10.0 + ) + + if result: + print("Message confirmed delivered!") + return True + else: + print("Message delivery confirmation timed out") + return False +``` + ### Hybrid Approach (Commands + Events) Combine command-based and event-based styles: diff --git a/examples/serial_msg.py b/examples/serial_msg.py index 522a68f..cb21a10 100755 --- a/examples/serial_msg.py +++ b/examples/serial_msg.py @@ -1,23 +1,67 @@ #!/usr/bin/python +""" +Example of sending a message and waiting for its specific acknowledgment +using event attribute filtering. +""" import asyncio -from meshcore import MeshCore +import argparse +from meshcore import MeshCore, EventType -PORT = "/dev/tty.usbserial-583A0069501" -BAUDRATE = 115200 -DEST = "🦄" -MSG = "hello from serial" +async def main(): + # Parse command line arguments + parser = argparse.ArgumentParser(description="Send a message and wait for ACK") + parser.add_argument("-p", "--port", required=True, help="Serial port") + parser.add_argument("-b", "--baudrate", type=int, default=115200, help="Baud rate") + parser.add_argument("-d", "--dest", default="🦄", help="Destination contact name") + parser.add_argument("-m", "--message", default="hello from serial", help="Message to send") + parser.add_argument("--timeout", type=float, default=30.0, help="ACK timeout in seconds") + args = parser.parse_args() -async def main () : - mc = await MeshCore.create_serial(PORT, BAUDRATE) + # Connect to the device + mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) - await mc.ensure_contacts() - contact = mc.get_contact_by_name(DEST) - if not contact: - print(f"Contact {DEST} not found") - return - await mc.commands.send_msg(bytes.fromhex(contact["public_key"])[0:6], MSG) - print ("Message sent ... awaiting") + try: + # Make sure we have contacts loaded + await mc.ensure_contacts() + + # Find the contact by name + contact = mc.get_contact_by_name(args.dest) + if not contact: + print(f"Contact '{args.dest}' not found. Available contacts:") + for name, c in mc.contacts.items(): + print(f"- {c.get('adv_name', 'Unknown')}") + return + + print(f"Found contact: {contact.get('adv_name')} ({contact['public_key'][:12]}...)") + + # Send the message and get the MSG_SENT event + print(f"Sending message: '{args.message}'") + send_result = await mc.commands.send_msg( + bytes.fromhex(contact["public_key"])[0:6], + args.message + ) + + # Extract the expected ACK code + expected_ack = send_result["expected_ack"].hex() + print(f"Message sent, waiting for ACK with code: {expected_ack}") + + # Wait for the specific ACK that matches our message + ack_event = await mc.wait_for_event( + EventType.ACK, + attribute_filters={"code": expected_ack}, + timeout=args.timeout + ) + + if ack_event: + print(f"✅ Message confirmed delivered! (ACK received)") + else: + print(f"⚠️ Timed out waiting for ACK after {args.timeout} seconds") + + finally: + # Always disconnect + await mc.disconnect() -asyncio.run(main()) +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/serial_trace.py b/examples/serial_trace.py index cd609e9..b169edd 100644 --- a/examples/serial_trace.py +++ b/examples/serial_trace.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio import argparse +import random from meshcore import MeshCore from meshcore.events import EventType @@ -29,14 +30,23 @@ async def main(): # Send trace packet print(f"Sending trace packet...") - result = await mc.commands.send_trace(path=args.path) + # Send trace with a path if provided + tag = random.randint(1, 0xFFFFFFFF) + result = await mc.commands.send_trace(path=args.path, tag=tag) - if result: - print("Trace packet sent successfully") - print("Waiting for trace response...") + # Check if the result has a success indicator + if result.get("success") == False: + print(f"Failed to send trace packet: {result.get('reason', 'unknown error')}") + elif result: + print(f"Trace packet sent successfully with tag={tag}") + print("Waiting for trace response matching our tag...") - # Wait for a trace response with 15-second timeout - event = await mc.wait_for_event(EventType.TRACE_DATA, timeout=15) + # Wait for a trace response with our specific tag + event = await mc.wait_for_event( + EventType.TRACE_DATA, + attribute_filters={"tag": tag}, + timeout=15 + ) if event: trace = event.payload diff --git a/examples/tcp_mchome_msg.py b/examples/tcp_mchome_msg.py index 45f36af..ec44d38 100755 --- a/examples/tcp_mchome_msg.py +++ b/examples/tcp_mchome_msg.py @@ -17,6 +17,6 @@ async def main () : await mc.connect() await mc.ensure_contacts() - await mc.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) + await mc.commands.send_msg(bytes.fromhex(mc.get_contact_by_name(DEST)["public_key"])[0:6],MSG) asyncio.run(main()) diff --git a/examples/tcp_mchome_readmsgs.py b/examples/tcp_mchome_readmsgs.py index 4cafca9..59b2cde 100755 --- a/examples/tcp_mchome_readmsgs.py +++ b/examples/tcp_mchome_readmsgs.py @@ -18,8 +18,10 @@ async def main () : res = True while res: - res = await mc.commands.get_msg() - if res : - print (res) + result = await mc.commands.get_msg() + if result.get("success") == False: + res = False + print("No more messages") + print (result) asyncio.run(main()) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 018cdbf..56e0f4b 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -1,8 +1,9 @@ import asyncio import logging -from typing import Any +from typing import Any, Dict from .events import EventType - +import random + logger = logging.getLogger("meshcore") class CommandHandler: @@ -25,7 +26,18 @@ class CommandHandler: def set_dispatcher(self, dispatcher): self.dispatcher = dispatcher - async def send(self, data, expected_events=None, timeout=None): + async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]: + """ + Send a command and wait for expected event responses. + + Args: + data: The data to send + expected_events: EventType or list of EventTypes to wait for + timeout: Timeout in seconds, or None to use default_timeout + + Returns: + Dict[str, Any]: Dictionary containing the response data or status + """ if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") @@ -44,17 +56,18 @@ class CommandHandler: logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") for event_type in expected_events: - event = await self.dispatcher.wait_for_event(event_type, timeout) + # don't apply any filters for now, might change later + event = await self.dispatcher.wait_for_event(event_type, {}, timeout) if event: return event.payload - return False + return {"success": False, "reason": "no_event_received"} except asyncio.TimeoutError: logger.debug(f"Command timed out {data}") - return False + return {"success": False, "reason": "timeout"} except Exception as e: logger.debug(f"Command error: {e}") return {"error": str(e)} - return True + return {"success": True} async def send_appstart(self): @@ -222,8 +235,9 @@ class CommandHandler: """ # Generate random tag if not provided if tag is None: - import random tag = random.randint(1, 0xFFFFFFFF) + if auth_code is None: + auth_code = random.randint(1, 0xFFFFFFFF) logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}") @@ -245,11 +259,11 @@ class CommandHandler: cmd_data.extend(path_bytes) except ValueError as e: logger.error(f"Invalid path format: {e}") - return False + return { "success": False, "reason": "invalid_path_format" } elif isinstance(path, (bytes, bytearray)): cmd_data.extend(path) else: logger.error(f"Unsupported path type: {type(path)}") - return False + return { "success": False, "reason": "unsupported_path_type" } return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR]) \ No newline at end of file diff --git a/src/meshcore/events.py b/src/meshcore/events.py index cd28d63..8a72664 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -1,5 +1,6 @@ from enum import Enum import logging +from math import log from typing import Any, Dict, Optional, Callable, List, Union import asyncio from dataclasses import dataclass, field @@ -44,16 +45,31 @@ class Event: payload: Any attributes: Dict[str, Any] = field(default_factory=dict) - def __post_init__(self): - if self.attributes is None: - self.attributes = {} + def __init__(self, type: EventType, payload: Any, attributes: Optional[Dict[str, Any]] = None, **kwargs): + """ + Initialize an Event + + Args: + type: The event type + payload: The event payload + attributes: Dictionary of event attributes for filtering + **kwargs: Additional attributes to add to the attributes dictionary + """ + self.type = type + self.payload = payload + self.attributes = attributes or {} + + # Add any keyword arguments to the attributes dictionary + if kwargs: + self.attributes.update(kwargs) class Subscription: - def __init__(self, dispatcher, event_type, callback): + def __init__(self, dispatcher, event_type, callback, attribute_filters=None): self.dispatcher = dispatcher self.event_type = event_type self.callback = callback + self.attribute_filters = attribute_filters or {} def unsubscribe(self): self.dispatcher._remove_subscription(self) @@ -66,8 +82,25 @@ class EventDispatcher: self.running = False self._task = None - def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]]) -> Subscription: - subscription = Subscription(self, event_type, callback) + def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], Union[None, asyncio.Future]], + attribute_filters: Optional[Dict[str, Any]] = None) -> Subscription: + """ + Subscribe to events with optional attribute filtering. + + Parameters: + ----------- + event_type : EventType or None + The type of event to subscribe to, or None to subscribe to all events. + callback : Callable + Function to call when a matching event is received. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to trigger the callback. + + Returns: + -------- + Subscription object that can be used to unsubscribe. + """ + subscription = Subscription(self, event_type, callback, attribute_filters) self.subscriptions.append(subscription) return subscription @@ -81,9 +114,16 @@ class EventDispatcher: async def _process_events(self): while self.running: event = await self.queue.get() - logger.debug(f"Dispatching event: {event.type}, {event.payload}") + logger.debug(f"Dispatching event: {event.type}, {event.payload}, {event.attributes}") for subscription in self.subscriptions.copy(): + # Check if event type matches if subscription.event_type is None or subscription.event_type == event.type: + # Check if all attribute filters match + if subscription.attribute_filters: + # Skip if any filter doesn't match the corresponding event attribute + if not all(event.attributes.get(key) == value + for key, value in subscription.attribute_filters.items()): + continue try: result = subscription.callback(event) if asyncio.iscoroutine(result): @@ -110,14 +150,31 @@ class EventDispatcher: pass self._task = None - async def wait_for_event(self, event_type: EventType, timeout: float | None = None) -> Optional[Event]: + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, + timeout: float | None = None) -> Optional[Event]: + """ + Wait for an event of the specified type that matches all attribute filters. + + Parameters: + ----------- + event_type : EventType + The type of event to wait for. + attribute_filters : Dict[str, Any], optional + Dictionary of attribute key-value pairs that must match for the event to be returned. + timeout : float | None, optional + Maximum time to wait for the event, in seconds. + + Returns: + -------- + The matched event, or None if timeout occurred before a matching event. + """ future = asyncio.Future() def event_handler(event: Event): if not future.done(): future.set_result(event) - subscription = self.subscribe(event_type, event_handler) + subscription = self.subscribe(event_type, event_handler, attribute_filters) try: return await asyncio.wait_for(future, timeout) diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index bd26b53..9d065f0 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -1,6 +1,6 @@ import asyncio import logging -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Union from .events import EventDispatcher, EventType from .reader import MessageReader @@ -99,18 +99,27 @@ class MeshCore: self.dispatcher.running = False self.dispatcher._task.cancel() - def subscribe(self, event_type: EventType, callback): + def subscribe(self, event_type: EventType, callback, attribute_filters: Optional[Dict[str, Any]] = None): """ - Subscribe to events using EventType enum + Subscribe to events using EventType enum with optional attribute filtering Args: event_type: Type of event to subscribe to, from EventType enum callback: Async function to call when event occurs + attribute_filters: Dictionary of attribute key-value pairs that must match for the event to trigger the callback Returns: Subscription object that can be used to unsubscribe + + Example: + # Subscribe to ACK events where the 'code' attribute has a specific value + mc.subscribe( + EventType.ACK, + my_callback_function, + attribute_filters={'code': 'SUCCESS'} + ) """ - return self.dispatcher.subscribe(event_type, callback) + return self.dispatcher.subscribe(event_type, callback, attribute_filters) def unsubscribe(self, subscription): """ @@ -122,22 +131,31 @@ class MeshCore: if subscription: subscription.unsubscribe() - async def wait_for_event(self, event_type: EventType, timeout=None): + async def wait_for_event(self, event_type: EventType, attribute_filters: Optional[Dict[str, Any]] = None, timeout=None): """ - Wait for an event using EventType enum + Wait for an event using EventType enum with optional attribute filtering Args: event_type: Type of event to wait for, from EventType enum + attribute_filters: Dictionary of attribute key-value pairs to match against the event timeout: Maximum time to wait in seconds, or None to use default_timeout Returns: Event object or None if timeout + + Example: + # Wait for an ACK event where the 'code' attribute has a specific value + await mc.wait_for_event( + EventType.ACK, + attribute_filters={'code': 'SUCCESS'}, + timeout=30.0 + ) """ # Use the provided timeout or fall back to default_timeout if timeout is None: timeout = self.default_timeout - return await self.dispatcher.wait_for_event(event_type, timeout) + return await self.dispatcher.wait_for_event(event_type, attribute_filters, timeout) def _setup_data_tracking(self): """Set up event subscriptions to track data internally""" @@ -148,7 +166,7 @@ class MeshCore: self._self_info = event.payload async def _update_time(event): - self._time = event.payload + self._time = event.payload.get("time", 0) # Subscribe to events to update internal state self.subscribe(EventType.CONTACTS, _update_contacts) @@ -244,7 +262,7 @@ class MeshCore: result = await self.commands.get_msg() # If we got a NO_MORE_MSGS event or an error, stop fetching - if not result or isinstance(result, dict) and "error" in result: + if not result.get("success") or isinstance(result, dict) and "error" in result: break # Small delay to prevent overwhelming the device diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index c779500..ebcdc03 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -22,19 +22,18 @@ class MessageReader: # Handle command responses if packet_type_value == PacketType.OK.value: - result = None + result: Dict[str, Any] = {"success": True} if len(data) == 5: - result = int.from_bytes(data[1:5], byteorder='little') - else: - result = True + result["value"] = int.from_bytes(data[1:5], byteorder='little') # Dispatch event for the OK response await self.dispatcher.dispatch(Event(EventType.OK, result)) elif packet_type_value == PacketType.ERROR.value: - result = False if len(data) > 1: - result = {"error_code": data[1]} + result = {"success": False, "error_code": data[1]} + else: + result = {"success": False} # Dispatch event for the ERROR response await self.dispatcher.dispatch(Event(EventType.ERROR, result)) @@ -84,7 +83,13 @@ class MessageReader: res["type"] = data[1] res["expected_ack"] = bytes(data[2:6]) res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res)) + + attributes = { + "type": res["type"], + "expected_ack": res["expected_ack"].hex() + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: res = {} @@ -98,7 +103,13 @@ class MessageReader: res["text"] = data[17:].decode() else: res["text"] = data[13:].decode() - await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res)) + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} @@ -113,7 +124,13 @@ class MessageReader: res["text"] = data[20:].decode() else: res["text"] = data[16:].decode() - await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, {"extended": True})) + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, attributes)) elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: res = {} @@ -123,7 +140,13 @@ class MessageReader: res["txt_type"] = data[3] res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little') res["text"] = data[8:].decode() - await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res)) + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} @@ -134,21 +157,31 @@ class MessageReader: res["txt_type"] = data[6] res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little') res["text"] = data[11:].decode() - await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, {"extended": True})) + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"] + } + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, attributes)) elif packet_type_value == PacketType.CURRENT_TIME.value: - result = int.from_bytes(data[1:5], byteorder='little') + time_value = int.from_bytes(data[1:5], byteorder='little') + result = {"time": time_value} await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) elif packet_type_value == PacketType.NO_MORE_MSGS.value: - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, False)) + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) elif packet_type_value == PacketType.CONTACT_SHARE.value: - result = "meshcore://" + data[1:].hex() + contact_uri = "meshcore://" + data[1:].hex() + result = {"uri": contact_uri} await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result)) elif packet_type_value == PacketType.BATTERY.value: - result = int.from_bytes(data[1:3], byteorder='little') + battery_level = int.from_bytes(data[1:3], byteorder='little') + result = {"level": battery_level} await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) elif packet_type_value == PacketType.DEVICE_INFO.value: @@ -171,20 +204,30 @@ class MessageReader: # Push notifications elif packet_type_value == PacketType.ADVERTISEMENT.value: logger.debug("Advertisement received") - # todo: Read advertisement? - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, None)) + # TODO: Read advertisement attributes + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, {})) elif packet_type_value == PacketType.PATH_UPDATE.value: logger.debug("Code path update") - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, None)) + # TODO: Read path update attributes + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, {})) elif packet_type_value == PacketType.ACK.value: logger.debug("Received ACK") - await self.dispatcher.dispatch(Event(EventType.ACK, None)) + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = bytes(data[1:5]).hex() + + attributes = { + "code": ack_data.get("code", "") + } + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) elif packet_type_value == PacketType.MESSAGES_WAITING.value: logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, None)) + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) elif packet_type_value == PacketType.RAW_DATA.value: res = {} @@ -197,11 +240,13 @@ class MessageReader: elif packet_type_value == PacketType.LOGIN_SUCCESS.value: logger.debug("Login success") - await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, None)) + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, {})) elif packet_type_value == PacketType.LOGIN_FAILED.value: logger.debug("Login failed") - await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, None)) + # TODO: Read login attributes + await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, {})) elif packet_type_value == PacketType.STATUS_RESPONSE.value: res = {} @@ -224,16 +269,20 @@ class MessageReader: res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little') data_hex = data[8:].hex() logger.debug(f"Status response: {data_hex}") - await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res)) + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res, attributes)) elif packet_type_value == PacketType.LOG_DATA.value: logger.debug(f"Received RF log data: {data.hex()}") # Parse as raw RX data - log_data = { + log_data: Dict[str, Any] = { "raw_hex": data[1:].hex() } - + # First byte is SNR (signed byte, multiplied by 4) if len(data) > 1: snr_byte = data[1] @@ -253,8 +302,12 @@ class MessageReader: log_data["payload"] = data[3:].hex() log_data["payload_length"] = len(data) - 3 + attributes = { + "pubkey_prefix": log_data["raw_hex"], + } + # Dispatch as RF log data - await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data)) + await self.dispatcher.dispatch(Event(EventType.RX_LOG_DATA, log_data, attributes)) elif packet_type_value == PacketType.TRACE_DATA.value: logger.debug(f"Received trace data: {data.hex()}") @@ -298,7 +351,13 @@ class MessageReader: res["path"] = path_nodes logger.debug(f"Parsed trace data: {res}") - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res)) + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) else: logger.debug(f"Unhandled data received {data}")