meshcore_py/tests/unit/test_events.py

130 lines
3.5 KiB
Python
Raw Permalink Normal View History

2025-04-13 22:55:31 -07:00
import pytest
import asyncio
from unittest.mock import MagicMock
2025-04-13 22:55:31 -07:00
from meshcore.events import EventDispatcher, EventType, Event
# Module-level pytest mark: applies the asyncio marker to every test in this
# file so the `async def` tests below are run as coroutines.
pytestmark = pytest.mark.asyncio
2025-04-13 22:55:31 -07:00
@pytest.fixture
def dispatcher():
    """Provide a newly constructed EventDispatcher.

    The fixture does not start the dispatcher; each test calls
    ``start()`` / ``stop()`` itself.
    """
    event_dispatcher = EventDispatcher()
    return event_dispatcher
2025-04-13 22:55:31 -07:00
async def test_subscribe_with_attribute_filter(dispatcher):
    """A filtered subscriber is invoked only when every attribute filter matches."""
    handler = MagicMock()
    dispatcher.subscribe(
        EventType.MSG_SENT,
        handler,
        attribute_filters={"type": 1, "expected_ack": "1234"},
    )

    await dispatcher.start()
    try:
        # Same event type and ack, but "type" is 2 instead of 1: must be ignored.
        non_matching = Event(
            EventType.MSG_SENT,
            {"some": "data"},
            {"type": 2, "expected_ack": "1234"},
        )
        await dispatcher.dispatch(non_matching)
        await asyncio.sleep(0.1)  # give the dispatcher time to process
        assert handler.call_count == 0

        # Both filtered attributes agree: the subscriber must fire exactly once.
        matching = Event(
            EventType.MSG_SENT,
            {"some": "data"},
            {"type": 1, "expected_ack": "1234"},
        )
        await dispatcher.dispatch(matching)
        await asyncio.sleep(0.1)  # give the dispatcher time to process
        assert handler.call_count == 1
    finally:
        await dispatcher.stop()
2025-04-13 22:55:31 -07:00
async def test_wait_for_event_with_attribute_filter(dispatcher):
    """wait_for_event skips non-matching events and returns the first match.

    A waiter is started in the background for an ACK with ``code == "1234"``.
    An ACK with a different code is dispatched first, followed by the matching
    one; the waiter must resolve to the matching event.
    """
    await dispatcher.start()

    waiter = None
    try:
        waiter = asyncio.create_task(
            dispatcher.wait_for_event(
                EventType.ACK, attribute_filters={"code": "1234"}, timeout=3.0
            )
        )

        # Let the waiter task start before dispatching anything.
        await asyncio.sleep(0.1)
        # Wrong code: the waiter is expected to ignore this event.
        await dispatcher.dispatch(
            Event(EventType.ACK, {"some": "data"}, {"code": "5678"})
        )

        await asyncio.sleep(0.1)
        # Matching code: this is the event the waiter should return.
        await dispatcher.dispatch(
            Event(EventType.ACK, {"ack": "data"}, {"code": "1234"})
        )

        result = await asyncio.wait_for(waiter, 3.0)

        assert result is not None
        assert result.type == EventType.ACK
        assert result.attributes["code"] == "1234"
        assert result.payload == {"ack": "data"}
    finally:
        # If a dispatch or assertion failed before the waiter finished, cancel
        # and drain it so no pending task is left behind at loop teardown.
        if waiter is not None and not waiter.done():
            waiter.cancel()
            await asyncio.gather(waiter, return_exceptions=True)
        await dispatcher.stop()
2025-04-13 22:55:31 -07:00
async def test_wait_for_event_timeout_with_filter(dispatcher):
    """wait_for_event yields None when no matching event arrives in time."""
    await dispatcher.start()
    try:
        # Nothing is dispatched, so the 0.1 s timeout is expected to elapse.
        outcome = await dispatcher.wait_for_event(
            EventType.ACK,
            attribute_filters={"code": "1234"},
            timeout=0.1,
        )
        assert outcome is None
    finally:
        await dispatcher.stop()
2025-04-13 22:55:31 -07:00
async def test_event_init_with_kwargs():
    """Keyword arguments given to Event end up in its ``attributes`` mapping."""
    evt = Event(
        EventType.ACK,
        {"data": "value"},
        code="1234",
        status="ok",
    )

    assert evt.type == EventType.ACK
    assert evt.payload == {"data": "value"}
    assert evt.attributes == {"code": "1234", "status": "ok"}
async def test_channel_info_event():
    """An Event of type CHANNEL_INFO exposes its type and payload fields unchanged."""
    secret = b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10"
    evt = Event(
        EventType.CHANNEL_INFO,
        {
            "channel_idx": 3,
            "channel_name": "TestChannel",
            "channel_secret": secret,
        },
    )

    assert evt.type == EventType.CHANNEL_INFO

    payload = evt.payload
    assert payload["channel_idx"] == 3
    assert payload["channel_name"] == "TestChannel"
    assert len(payload["channel_secret"]) == 16