meshcore_py/tests/unit/test_events.py
Ventz Petkov 36727f4ea3 feat: Refactor binary commands and apply BLE fixes
Refactored the BinaryCommandHandler to align with the other command handlers, inheriting from CommandHandlerBase. This resolves an AttributeError and simplifies the command structure. Moved binary_commands.py into the commands module. Applied fixes to the BLE connection handler based on feedback, improving reliability on macOS and ensuring the device address is correctly handled.
2025-08-05 15:31:54 -04:00

129 lines
3.5 KiB
Python

import pytest
import asyncio
from unittest.mock import MagicMock
from meshcore.events import EventDispatcher, EventType, Event
pytestmark = pytest.mark.asyncio
@pytest.fixture
def dispatcher():
return EventDispatcher()
async def test_subscribe_with_attribute_filter(dispatcher):
callback = MagicMock()
# Subscribe with attribute filters
dispatcher.subscribe(
EventType.MSG_SENT,
callback,
attribute_filters={"type": 1, "expected_ack": "1234"},
)
# Start the dispatcher
await dispatcher.start()
try:
# Dispatch event that should NOT match (wrong type)
await dispatcher.dispatch(
Event(
EventType.MSG_SENT,
{"some": "data"},
{"type": 2, "expected_ack": "1234"},
)
)
await asyncio.sleep(0.1) # Allow processing
# Callback should NOT have been called
assert callback.call_count == 0
# Dispatch event that should match all filters
await dispatcher.dispatch(
Event(
EventType.MSG_SENT,
{"some": "data"},
{"type": 1, "expected_ack": "1234"},
)
)
await asyncio.sleep(0.1) # Allow processing
# Callback should have been called once
assert callback.call_count == 1
finally:
await dispatcher.stop()
async def test_wait_for_event_with_attribute_filter(dispatcher):
await dispatcher.start()
try:
future_event = asyncio.create_task(
dispatcher.wait_for_event(
EventType.ACK, attribute_filters={"code": "1234"}, timeout=3.0
)
)
await asyncio.sleep(0.1)
await dispatcher.dispatch(
Event(EventType.ACK, {"some": "data"}, {"code": "5678"})
)
await asyncio.sleep(0.1)
await dispatcher.dispatch(
Event(EventType.ACK, {"ack": "data"}, {"code": "1234"})
)
result = await asyncio.wait_for(future_event, 3.0)
assert result is not None
assert result.type == EventType.ACK
assert result.attributes["code"] == "1234"
assert result.payload == {"ack": "data"}
finally:
await dispatcher.stop()
async def test_wait_for_event_timeout_with_filter(dispatcher):
await dispatcher.start()
try:
# Wait for an event that won't arrive
result = await dispatcher.wait_for_event(
EventType.ACK, attribute_filters={"code": "1234"}, timeout=0.1
)
# Should get None due to timeout
assert result is None
finally:
await dispatcher.stop()
async def test_event_init_with_kwargs():
# Test creating an event with keyword attributes
event = Event(EventType.ACK, {"data": "value"}, code="1234", status="ok")
assert event.type == EventType.ACK
assert event.payload == {"data": "value"}
assert event.attributes == {"code": "1234", "status": "ok"}
async def test_channel_info_event():
# Test CHANNEL_INFO event type
channel_payload = {
"channel_idx": 3,
"channel_name": "TestChannel",
"channel_secret": b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10",
}
event = Event(EventType.CHANNEL_INFO, channel_payload)
assert event.type == EventType.CHANNEL_INFO
assert event.payload["channel_idx"] == 3
assert event.payload["channel_name"] == "TestChannel"
assert len(event.payload["channel_secret"]) == 16