diff --git a/pyproject.toml b/pyproject.toml index 601dfed..10deec2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,9 @@ license = "MIT" license-files = ["LICEN[CS]E*"] dependencies = [ "bleak", "pyserial-asyncio" ] +[project.optional-dependencies] +dev = ["pytest", "pytest-asyncio"] + [project.urls] Homepage = "https://github.com/fdlamotte/meshcore_py" Issues = "https://github.com/fdlamotte/meshcore_py/issues" diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..d280de0 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto \ No newline at end of file diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..d047299 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,25 @@ +# MeshCore Tests + +## Running Tests + +To run the tests, first install the development dependencies: + +```bash +pip install -e ".[dev]" +``` + +Then run the tests using pytest: + +```bash +# Run all tests +pytest + +# Run tests with verbose output +pytest -v + +# Run a specific test file +pytest tests/unit/test_commands.py + +# Run a specific test +pytest tests/unit/test_commands.py::test_send_msg +``` \ No newline at end of file diff --git a/tests/unit/test_commands.py b/tests/unit/test_commands.py new file mode 100644 index 0000000..9b90562 --- /dev/null +++ b/tests/unit/test_commands.py @@ -0,0 +1,234 @@ +import pytest +import asyncio +from unittest.mock import MagicMock, patch, AsyncMock +from meshcore.commands import CommandHandler +from meshcore.events import EventType, Event + +pytestmark = pytest.mark.asyncio + +# Fixtures +@pytest.fixture +def mock_connection(): + connection = MagicMock() + connection.send = AsyncMock() + return connection + +@pytest.fixture +def mock_dispatcher(): + dispatcher = MagicMock() + dispatcher.wait_for_event = AsyncMock() + dispatcher.dispatch = AsyncMock() + return dispatcher + +@pytest.fixture +def command_handler(mock_connection, mock_dispatcher): + handler = CommandHandler() + + async def sender(data): + await mock_connection.send(data) + handler._sender_func = sender + + handler.dispatcher = mock_dispatcher + return handler + +# Test helper +def setup_event_response(mock_dispatcher, event_type, payload, attribute_filters=None): + async def wait_response(requested_type, filters=None, timeout=None): + if requested_type == event_type: + if filters and attribute_filters: + if not all(attribute_filters.get(key) == value for key, value in filters.items()): + return None + return Event(event_type, payload) + return None + + mock_dispatcher.wait_for_event.side_effect = wait_response + +# Basic tests +async def test_send_basic(command_handler, mock_connection): + result = await command_handler.send(b"test_data") + mock_connection.send.assert_called_once_with(b"test_data") + assert result == {"success": True} + +async def test_send_with_event(command_handler, mock_connection, mock_dispatcher): + expected_payload = {"success": True, "value": 42} + setup_event_response(mock_dispatcher, EventType.OK, expected_payload) + + result = await command_handler.send(b"test_command", [EventType.OK]) + + mock_connection.send.assert_called_once_with(b"test_command") + assert result == expected_payload + +async def test_send_timeout(command_handler, mock_connection, mock_dispatcher): + mock_dispatcher.wait_for_event.side_effect = asyncio.TimeoutError + + result = await command_handler.send(b"test_command", [EventType.OK], timeout=0.1) + assert result == {"success": False, "reason": "timeout"} + +# Destination validation tests +async def test_validate_destination_bytes(command_handler, mock_connection): + dst = b"123456789012" # 12 bytes + await command_handler.send_msg(dst, "test message") + + assert mock_connection.send.call_args[0][0].startswith(b"\x02\x00\x00") + assert b"123456" in mock_connection.send.call_args[0][0] + +async def test_validate_destination_hex_string(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.send_msg(dst, "test message") + + assert mock_connection.send.call_args[0][0].startswith(b"\x02\x00\x00") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_validate_destination_contact_object(command_handler, mock_connection): + dst = {"public_key": "0123456789abcdef", "adv_name": "Test Contact"} + await command_handler.send_msg(dst, "test message") + + assert mock_connection.send.call_args[0][0].startswith(b"\x02\x00\x00") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +# Command tests +async def test_send_login(command_handler, mock_connection): + await command_handler.send_login("0123456789abcdef", "password") + + assert mock_connection.send.call_args[0][0].startswith(b"\x1a") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + assert b"password" in mock_connection.send.call_args[0][0] + +async def test_send_msg(command_handler, mock_connection): + await command_handler.send_msg("0123456789abcdef", "hello") + + assert mock_connection.send.call_args[0][0].startswith(b"\x02\x00\x00") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + assert b"hello" in mock_connection.send.call_args[0][0] + +async def test_send_cmd(command_handler, mock_connection): + await command_handler.send_cmd("0123456789abcdef", "test_cmd") + + assert mock_connection.send.call_args[0][0].startswith(b"\x02\x01\x00") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + assert b"test_cmd" in mock_connection.send.call_args[0][0] + +# Device settings tests +async def test_set_name(command_handler, mock_connection): + await command_handler.set_name("Test Device") + + assert mock_connection.send.call_args[0][0].startswith(b"\x08") + assert b"Test Device" in mock_connection.send.call_args[0][0] + +async def test_set_coords(command_handler, mock_connection): + await command_handler.set_coords(37.7749, -122.4194) + + assert mock_connection.send.call_args[0][0].startswith(b"\x0e") + # Could add more detailed assertions for the byte encoding + +async def test_send_appstart(command_handler, mock_connection): + await command_handler.send_appstart() + assert mock_connection.send.call_args[0][0].startswith(b"\x01\x03") + assert b"mccli" in mock_connection.send.call_args[0][0] + +async def test_send_device_query(command_handler, mock_connection): + await command_handler.send_device_query() + assert mock_connection.send.call_args[0][0].startswith(b"\x16\x03") + +async def test_send_advert(command_handler, mock_connection): + # Test without flood + await command_handler.send_advert(flood=False) + assert mock_connection.send.call_args[0][0] == b"\x07" + + # Test with flood + mock_connection.reset_mock() + await command_handler.send_advert(flood=True) + assert mock_connection.send.call_args[0][0] == b"\x07\x01" + +async def test_reboot(command_handler, mock_connection): + await command_handler.reboot() + assert mock_connection.send.call_args[0][0].startswith(b"\x13reboot") + +async def test_get_bat(command_handler, mock_connection): + await command_handler.get_bat() + assert mock_connection.send.call_args[0][0].startswith(b"\x14") + +async def test_get_time(command_handler, mock_connection): + await command_handler.get_time() + assert mock_connection.send.call_args[0][0].startswith(b"\x05") + +async def test_set_time(command_handler, mock_connection): + timestamp = 1620000000 # Example timestamp + await command_handler.set_time(timestamp) + assert mock_connection.send.call_args[0][0].startswith(b"\x06") + +async def test_set_tx_power(command_handler, mock_connection): + await command_handler.set_tx_power(20) + assert mock_connection.send.call_args[0][0].startswith(b"\x0c") + +async def test_get_contacts(command_handler, mock_connection): + await command_handler.get_contacts() + assert mock_connection.send.call_args[0][0].startswith(b"\x04") + +async def test_reset_path(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.reset_path(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x0D") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_share_contact(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.share_contact(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x10") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_export_contact(command_handler, mock_connection): + # Test exporting all contacts + await command_handler.export_contact() + assert mock_connection.send.call_args[0][0] == b"\x11" + + # Test exporting specific contact + mock_connection.reset_mock() + dst = "0123456789abcdef" + await command_handler.export_contact(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x11") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_remove_contact(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.remove_contact(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x0f") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_get_msg(command_handler, mock_connection): + await command_handler.get_msg() + assert mock_connection.send.call_args[0][0].startswith(b"\x0A") + + # Test with custom timeout + mock_connection.reset_mock() + await command_handler.get_msg(timeout=5.0) + assert mock_connection.send.call_args[0][0].startswith(b"\x0A") + +async def test_send_logout(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.send_logout(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x1d") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_send_statusreq(command_handler, mock_connection): + dst = "0123456789abcdef" + await command_handler.send_statusreq(dst) + assert mock_connection.send.call_args[0][0].startswith(b"\x1b") + assert b"\x01\x23\x45\x67\x89\xab" in mock_connection.send.call_args[0][0] + +async def test_send_trace(command_handler, mock_connection): + # Test with minimal parameters + await command_handler.send_trace() + first_call = mock_connection.send.call_args[0][0] + assert first_call.startswith(b"\x24") # 36 in decimal = 0x24 in hex + + # Test with all parameters + mock_connection.reset_mock() + await command_handler.send_trace( + auth_code=12345, + tag=67890, + flags=1, + path="01,23,45" + ) + second_call = mock_connection.send.call_args[0][0] + assert second_call.startswith(b"\x24") \ No newline at end of file diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py new file mode 100644 index 0000000..a2925f6 --- /dev/null +++ b/tests/unit/test_events.py @@ -0,0 +1,112 @@ +import pytest +import asyncio +from unittest.mock import MagicMock, AsyncMock +from meshcore.events import EventDispatcher, EventType, Event + +pytestmark = pytest.mark.asyncio + +@pytest.fixture +def dispatcher(): + return EventDispatcher() + +async def test_subscribe_with_attribute_filter(dispatcher): + callback = MagicMock() + + # Subscribe with attribute filters + subscription = dispatcher.subscribe( + EventType.MSG_SENT, + callback, + attribute_filters={"type": 1, "expected_ack": "1234"} + ) + + # Start the dispatcher + await dispatcher.start() + + try: + # Dispatch event that should NOT match (wrong type) + await dispatcher.dispatch(Event( + EventType.MSG_SENT, + {"some": "data"}, + {"type": 2, "expected_ack": "1234"} + )) + await asyncio.sleep(0.1) # Allow processing + + # Callback should NOT have been called + assert callback.call_count == 0 + + # Dispatch event that should match all filters + await dispatcher.dispatch(Event( + EventType.MSG_SENT, + {"some": "data"}, + {"type": 1, "expected_ack": "1234"} + )) + await asyncio.sleep(0.1) # Allow processing + + # Callback should have been called once + assert callback.call_count == 1 + + finally: + await dispatcher.stop() + +async def test_wait_for_event_with_attribute_filter(dispatcher): + await dispatcher.start() + + try: + future_event = asyncio.create_task( + dispatcher.wait_for_event( + EventType.ACK, + attribute_filters={"code": "1234"}, + timeout=3.0 + ) + ) + + await asyncio.sleep(0.1) + + await dispatcher.dispatch(Event( + EventType.ACK, + {"some": "data"}, + {"code": "5678"} + )) + + await asyncio.sleep(0.1) + + await dispatcher.dispatch(Event( + EventType.ACK, + {"ack": "data"}, + {"code": "1234"} + )) + + result = await asyncio.wait_for(future_event, 3.0) + + assert result is not None + assert result.type == EventType.ACK + assert result.attributes["code"] == "1234" + assert result.payload == {"ack": "data"} + + finally: + await dispatcher.stop() + +async def test_wait_for_event_timeout_with_filter(dispatcher): + await dispatcher.start() + + try: + # Wait for an event that won't arrive + result = await dispatcher.wait_for_event( + EventType.ACK, + attribute_filters={"code": "1234"}, + timeout=0.1 + ) + + # Should get None due to timeout + assert result is None + + finally: + await dispatcher.stop() + +async def test_event_init_with_kwargs(): + # Test creating an event with keyword attributes + event = Event(EventType.ACK, {"data": "value"}, code="1234", status="ok") + + assert event.type == EventType.ACK + assert event.payload == {"data": "value"} + assert event.attributes == {"code": "1234", "status": "ok"} \ No newline at end of file