diff --git a/examples/serial_repeater_telemetry.py b/examples/serial_repeater_telemetry.py index dfc5ef4..b11fd41 100755 --- a/examples/serial_repeater_telemetry.py +++ b/examples/serial_repeater_telemetry.py @@ -9,22 +9,20 @@ from meshcore.events import EventType async def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection') - # parser.add_argument('-p', '--port', required=True, help='Serial port') - # parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') + parser.add_argument('-p', '--port', required=True, help='Serial port') + parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') parser.add_argument('-r', '--repeater', required=True, help='Repeater name') parser.add_argument('-pw', '--password', required=True, help='Password for login') - # parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') + parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') args = parser.parse_args() # Connect to the device - mc = await MeshCore.create_ble("lora-py-tester") - 534463 + mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) + try: # Get contacts - result = await mc.commands.get_contacts() - print(result) - print(mc._contacts) - repeater = mc.get_contact_by_key_prefix(args.repeater) + await mc.ensure_contacts() + repeater = mc.get_contact_by_name(args.repeater) if repeater is None: print(f"Repeater '{args.repeater}' not found in contacts.") @@ -37,25 +35,14 @@ async def main(): if login_event.type != EventType.ERROR: print("Login successful") - # Continuously poll for telemetry every 60 seconds - print("Starting continuous telemetry polling every 60 seconds...") - while True: - try: - # Send status request - print("Sending status request...") - await mc.commands.send_telemetry_req(repeater) - - # Wait for status response - telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10) - print(telemetry_event) - - # Wait 60 seconds before next poll - await asyncio.sleep(60) - - except Exception as e: - print(f"Error during telemetry poll: {e}") - # Wait before retrying - await asyncio.sleep(60) + # Send status request + print("Sending status request...") + await mc.commands.send_telemetry_req(repeater) + + # Wait for status response + telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout) + print(telemetry_event.payload["lpp"]) + else: print("Login failed or timed out") diff --git a/examples/test_command_queue.py b/examples/test_command_queue.py new file mode 100644 index 0000000..edb78d0 --- /dev/null +++ b/examples/test_command_queue.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +""" +Test script to verify command queue implementation prevents concurrent command collisions. +Demonstrates that the queue system properly serializes commands to the single-threaded device. +""" + +import asyncio +import sys +import time +from pathlib import Path + +# Add src to path +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from meshcore import MeshCore, SerialConnection + +TESTER_NAME = "lora-py-tester" + +async def test_concurrent_commands(): + """Test that multiple concurrent commands are properly queued.""" + + mc = await MeshCore.create_ble(TESTER_NAME, debug=False) + + try: + await mc.connect() + print("Connected successfully!") + + # Test 1: Send multiple commands concurrently + print("\n=== Test 1: Concurrent Commands ===") + print("Sending 4 commands simultaneously...") + start_time = time.time() + + # Create multiple command tasks that would normally collide + tasks = [ + asyncio.create_task(mc.commands.get_time()), + asyncio.create_task(mc.commands.get_bat()), + asyncio.create_task(mc.commands.send_device_query()), + asyncio.create_task(mc.commands.get_contacts()), + ] + + # Wait for all to complete + results = await asyncio.gather(*tasks, return_exceptions=True) + + elapsed = time.time() - start_time + print(f"Completed {len(tasks)} commands in {elapsed:.2f} seconds") + + # Check results + for i, result in enumerate(results): + if isinstance(result, Exception): + print(f" Task {i}: ERROR - {result}") + else: + print(f" Task {i}: {result.type.name}") # type: ignore + + # Test 2: Rapid sequential commands + print("\n=== Test 2: Rapid Sequential Commands ===") + print("Sending 5 commands rapidly without delay...") + start_time = time.time() + + for i in range(5): + result = await mc.commands.get_time() + print(f" Command {i}: {result.payload}") + # No delay - commands should still work due to queue + + elapsed = time.time() - start_time + print(f"Completed 5 sequential commands in {elapsed:.2f} seconds") + + print("\nāœ… All tests completed successfully!") + print("The queue system is properly serializing commands.") + + except Exception as e: + print(f"āŒ Test failed: {e}") + import traceback + traceback.print_exc() + + finally: + await mc.disconnect() + print("Disconnected") + + + +async def test_cleanup_on_disconnect(): + """Test that queue properly cleans up on disconnect.""" + + print("\n=== Test 3: Clean Disconnect ===") + print("Testing queue cleanup on disconnect...") + + mc = await MeshCore.create_ble(TESTER_NAME, debug=False) + + try: + await mc.connect() + + # Start some commands but disconnect immediately + tasks = [ + asyncio.create_task(mc.commands.get_contacts()), + asyncio.create_task(mc.commands.get_time()), + asyncio.create_task(mc.commands.get_bat()), + ] + + # Give them time to queue + await asyncio.sleep(0.1) + + # Disconnect (should cancel pending commands) + print("Disconnecting with commands in queue...") + await mc.disconnect() + + # Check that tasks were handled properly + cancelled = 0 + completed = 0 + + for task in tasks: + if task.done(): + try: + result = task.result() + completed += 1 + print(f" Task completed: {result.type.name}") + except asyncio.CancelledError: + cancelled += 1 + print(f" Task was properly cancelled") + except Exception as e: + print(f" Task failed: {e}") + + print(f"Results: {completed} completed, {cancelled} cancelled") + print("āœ… Cleanup test passed!") + + except Exception as e: + print(f"āŒ Cleanup test failed: {e}") + + +async def main(): + """Run all queue tests.""" + print("=" * 60) + print("Command Queue Implementation Tests") + print("=" * 60) + print("\nThis tests the command queue system that prevents") + print("multiple commands from colliding on the single-threaded device.") + print("=" * 60) + + # Run tests + await test_concurrent_commands() + print("\n" + "=" * 60) + await test_cleanup_on_disconnect() + + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/meshcore/commands/__init__.py b/src/meshcore/commands/__init__.py index 0638847..1402a03 100644 --- a/src/meshcore/commands/__init__.py +++ b/src/meshcore/commands/__init__.py @@ -10,7 +10,7 @@ from .messaging import MessagingCommands class CommandHandler( - DeviceCommands, ContactCommands, MessagingCommands, BinaryCommandHandler + BinaryCommandHandler, DeviceCommands, ContactCommands, MessagingCommands ): pass diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index e16df57..fe524f7 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -141,6 +141,7 @@ class CommandHandlerBase: if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") + # Use the provided timeout or fall back to default_timeout timeout = timeout if timeout is not None else self.default_timeout if self._sender_func: @@ -151,11 +152,13 @@ class CommandHandlerBase: if expected_events: try: + # Convert single event to list if needed if not isinstance(expected_events, list): expected_events = [expected_events] logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + # Create futures for all expected events futures = [] for event_type in expected_events: future = asyncio.create_task( @@ -163,18 +166,22 @@ class CommandHandlerBase: ) futures.append(future) + # Wait for the first event to complete or all to timeout done, pending = await asyncio.wait( futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) + # Cancel all pending futures for future in pending: future.cancel() + # Check if any future completed successfully for future in done: event = await future if event: return event + # Create an error event when no event is received return Event(EventType.ERROR, {"reason": "no_event_received"}) except asyncio.TimeoutError: logger.debug(f"Command timed out {data}") @@ -182,6 +189,7 @@ class CommandHandlerBase: except Exception as e: logger.debug(f"Command error: {e}") return Event(EventType.ERROR, {"error": str(e)}) + # For commands that don't expect events, return a success event return Event(EventType.OK, {}) async def start_queue_processor(self): diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index 085d6f9..a189152 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -1,10 +1,8 @@ import logging from enum import Enum import json -from mailbox import Message from meshcore.commands.messaging import MessagingCommands -from .base import CommandHandlerBase from ..events import EventType from cayennelpp import LppFrame, LppData from cayennelpp.lpp_type import LppType diff --git a/tests/unit/test_commands.py b/tests/unit/test_commands.py index 91d531b..7c500f5 100644 --- a/tests/unit/test_commands.py +++ b/tests/unit/test_commands.py @@ -1,4 +1,5 @@ import pytest +import pytest_asyncio import asyncio from unittest.mock import MagicMock, AsyncMock from meshcore.commands import CommandHandler @@ -23,17 +24,23 @@ def mock_dispatcher(): return dispatcher -@pytest.fixture -def command_handler(mock_connection, mock_dispatcher): +@pytest_asyncio.fixture +async def command_handler(mock_connection, mock_dispatcher): handler = CommandHandler() async def sender(data): await mock_connection.send(data) handler._sender_func = sender - handler.dispatcher = mock_dispatcher - return handler + + # Start the queue processor for tests + await handler.start_queue_processor() + + yield handler + + # Clean up after tests + await handler.stop_queue_processor() # Test helper