mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
Refactored the BinaryCommandHandler to align with the other command handlers, inheriting from CommandHandlerBase. This resolves an AttributeError and simplifies the command structure. Moved binary_commands.py into the commands module. Applied fixes to the BLE connection handler based on feedback, improving reliability on macOS and ensuring the device address is correctly handled.
129 lines
3.5 KiB
Python
129 lines
3.5 KiB
Python
import pytest
|
|
import asyncio
|
|
from unittest.mock import MagicMock
|
|
from meshcore.events import EventDispatcher, EventType, Event
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
@pytest.fixture
def dispatcher():
    """Provide a newly constructed EventDispatcher to the requesting test.

    The fixture only builds the object; tests that need event processing
    call ``start()`` and ``stop()`` on it themselves.
    """
    event_dispatcher = EventDispatcher()
    return event_dispatcher
|
|
|
|
|
|
async def test_subscribe_with_attribute_filter(dispatcher):
    """Check that a filtered subscription only fires on a full attribute match.

    A mock is subscribed to MSG_SENT with two attribute filters. An event
    whose ``type`` attribute differs must leave the mock untouched, while an
    event carrying both expected attribute values must invoke it exactly once.
    """
    handler = MagicMock()

    # Register the handler before the dispatcher is started.
    dispatcher.subscribe(
        EventType.MSG_SENT,
        handler,
        attribute_filters={"type": 1, "expected_ack": "1234"},
    )
    await dispatcher.start()

    try:
        # Same expected_ack, but "type" is 2 instead of the filtered value 1.
        mismatched_event = Event(
            EventType.MSG_SENT,
            {"some": "data"},
            {"type": 2, "expected_ack": "1234"},
        )
        await dispatcher.dispatch(mismatched_event)
        await asyncio.sleep(0.1)  # give the dispatcher time to process
        assert handler.call_count == 0

        # Both attributes now agree with the subscription filters.
        matching_event = Event(
            EventType.MSG_SENT,
            {"some": "data"},
            {"type": 1, "expected_ack": "1234"},
        )
        await dispatcher.dispatch(matching_event)
        await asyncio.sleep(0.1)  # give the dispatcher time to process
        assert handler.call_count == 1
    finally:
        # Always shut the dispatcher down, even when an assertion fails.
        await dispatcher.stop()
|
|
|
|
|
|
async def test_wait_for_event_with_attribute_filter(dispatcher):
    """Check that wait_for_event resolves with the event matching its filter.

    A waiter for ACK events with ``code == "1234"`` is started in the
    background. A non-matching ACK is dispatched first, then a matching one;
    the waiter's result must be the second event.
    """
    await dispatcher.start()

    try:
        waiter = asyncio.create_task(
            dispatcher.wait_for_event(
                EventType.ACK, attribute_filters={"code": "1234"}, timeout=3.0
            )
        )

        # (payload, attributes) pairs in dispatch order: the first carries a
        # different code, the second carries the code the waiter filters on.
        ack_sequence = (
            ({"some": "data"}, {"code": "5678"}),
            ({"ack": "data"}, {"code": "1234"}),
        )
        for payload, attributes in ack_sequence:
            # Pause before every dispatch, as the first pause also lets the
            # background waiter task begin running.
            await asyncio.sleep(0.1)
            await dispatcher.dispatch(Event(EventType.ACK, payload, attributes))

        received = await asyncio.wait_for(waiter, 3.0)

        assert received is not None
        assert received.type == EventType.ACK
        assert received.attributes["code"] == "1234"
        assert received.payload == {"ack": "data"}
    finally:
        # Always shut the dispatcher down, even when an assertion fails.
        await dispatcher.stop()
|
|
|
|
|
|
async def test_wait_for_event_timeout_with_filter(dispatcher):
    """Check that wait_for_event yields None when no matching event arrives.

    Nothing is dispatched, so the filtered wait is expected to give up after
    its short timeout and return ``None``.
    """
    await dispatcher.start()

    try:
        code_filter = {"code": "1234"}
        outcome = await dispatcher.wait_for_event(
            EventType.ACK, attribute_filters=code_filter, timeout=0.1
        )

        # No event was sent, so the wait should have timed out.
        assert outcome is None
    finally:
        # Always shut the dispatcher down, even when the assertion fails.
        await dispatcher.stop()
|
|
|
|
|
|
async def test_event_init_with_kwargs():
    """Check that keyword arguments given to Event end up in its attributes."""
    keyword_attributes = {"code": "1234", "status": "ok"}

    # Unpacking passes the same code= and status= keyword arguments.
    created = Event(EventType.ACK, {"data": "value"}, **keyword_attributes)

    assert created.type == EventType.ACK
    assert created.payload == {"data": "value"}
    assert created.attributes == {"code": "1234", "status": "ok"}
|
|
|
|
|
|
async def test_channel_info_event():
    """Check that a CHANNEL_INFO event exposes its type and payload fields."""
    secret = b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10"
    channel_details = {
        "channel_idx": 3,
        "channel_name": "TestChannel",
        "channel_secret": secret,
    }

    channel_event = Event(EventType.CHANNEL_INFO, channel_details)

    assert channel_event.type == EventType.CHANNEL_INFO

    # Each payload field must be readable back from the event.
    stored = channel_event.payload
    assert stored["channel_idx"] == 3
    assert stored["channel_name"] == "TestChannel"
    assert len(stored["channel_secret"]) == 16
|