From 28957a4b60689bd13221c0343cce975e0711d458 Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Thu, 21 Aug 2025 19:08:57 -0700 Subject: [PATCH] Refactor command system to be queue based --- examples/serial_repeater_telemetry.py | 43 ++++--- src/meshcore/commands/base.py | 164 ++++++++++++++++++++++++-- src/meshcore/commands/binary.py | 11 +- src/meshcore/connection_manager.py | 2 +- src/meshcore/events.py | 5 +- src/meshcore/meshcore.py | 7 ++ 6 files changed, 202 insertions(+), 30 deletions(-) diff --git a/examples/serial_repeater_telemetry.py b/examples/serial_repeater_telemetry.py index b11fd41..dfc5ef4 100755 --- a/examples/serial_repeater_telemetry.py +++ b/examples/serial_repeater_telemetry.py @@ -9,20 +9,22 @@ from meshcore.events import EventType async def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection') - parser.add_argument('-p', '--port', required=True, help='Serial port') - parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') + # parser.add_argument('-p', '--port', required=True, help='Serial port') + # parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') parser.add_argument('-r', '--repeater', required=True, help='Repeater name') parser.add_argument('-pw', '--password', required=True, help='Password for login') - parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') + # parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') args = parser.parse_args() # Connect to the device - mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) - + mc = await MeshCore.create_ble("lora-py-tester") + 534463 try: # Get contacts - await mc.ensure_contacts() - repeater = mc.get_contact_by_name(args.repeater) + result = await mc.commands.get_contacts() + print(result) + print(mc._contacts) + repeater = mc.get_contact_by_key_prefix(args.repeater) if repeater is None: print(f"Repeater '{args.repeater}' not found in contacts.") @@ -35,14 +37,25 @@ async def main(): if login_event.type != EventType.ERROR: print("Login successful") - # Send status request - print("Sending status request...") - await mc.commands.send_telemetry_req(repeater) - - # Wait for status response - telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout) - print(telemetry_event.payload["lpp"]) - + # Continuously poll for telemetry every 60 seconds + print("Starting continuous telemetry polling every 60 seconds...") + while True: + try: + # Send status request + print("Sending status request...") + await mc.commands.send_telemetry_req(repeater) + + # Wait for status response + telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10) + print(telemetry_event) + + # Wait 60 seconds before next poll + await asyncio.sleep(60) + + except Exception as e: + print(f"Error during telemetry poll: {e}") + # Wait before retrying + await asyncio.sleep(60) else: print("Login failed or timed out") diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index ba6faad..e16df57 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -1,6 +1,5 @@ import asyncio import logging -import random from typing import Any, Callable, Coroutine, Dict, List, Optional, Union from ..events import Event, EventDispatcher, EventType @@ -53,14 +52,21 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes class CommandHandlerBase: DEFAULT_TIMEOUT = 5.0 + MAX_QUEUE_SIZE = 100 - def __init__(self, default_timeout: Optional[float] = None): + def __init__(self, default_timeout: Optional[float] = None, max_queue_size: Optional[int] = None): self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._reader: Optional[MessageReader] = None self.dispatcher: Optional[EventDispatcher] = None self.default_timeout = ( default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT ) + + max_size = max_queue_size if max_queue_size is not None else self.MAX_QUEUE_SIZE + self._command_queue = asyncio.Queue(maxsize=max_size) + self._start_lock = asyncio.Lock() # Only for start/stop operations + self._queue_processor_task: Optional[asyncio.Task] = None + self._is_running = False def set_connection(self, connection: Any) -> None: async def sender(data: bytes) -> None: @@ -81,7 +87,48 @@ class CommandHandlerBase: timeout: Optional[float] = None, ) -> Event: """ - Send a command and wait for expected event responses. + Queue a command for execution and wait for the response. + + Args: + data: The data to send + expected_events: EventType or list of EventTypes to wait for + timeout: Timeout in seconds, or None to use default_timeout + + Returns: + Event: The full event object that was received in response to the command + + Raises: + RuntimeError: If the command queue is full + """ + async with self._start_lock: + if not self._is_running: + await self._start_queue_processor() + + future = asyncio.Future() + + try: + await asyncio.wait_for( + self._command_queue.put((data, expected_events, timeout, future)), + timeout=1.0 + ) + except asyncio.TimeoutError: + future.set_exception(RuntimeError( + f"Command queue is full ({self._command_queue.maxsize} commands pending)" + )) + except Exception as e: + future.set_exception(e) + + return await future + + async def _send_internal( + self, + data: bytes, + expected_events: Optional[Union[EventType, List[EventType]]] = None, + timeout: Optional[float] = None, + ) -> Event: + """ + Internal method that does the actual sending and waiting for events. + This runs inside the queue processor with lock protection. Args: data: The data to send @@ -94,7 +141,6 @@ class CommandHandlerBase: if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") - # Use the provided timeout or fall back to default_timeout timeout = timeout if timeout is not None else self.default_timeout if self._sender_func: @@ -105,13 +151,11 @@ class CommandHandlerBase: if expected_events: try: - # Convert single event to list if needed if not isinstance(expected_events, list): expected_events = [expected_events] logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") - # Create futures for all expected events futures = [] for event_type in expected_events: future = asyncio.create_task( @@ -119,22 +163,18 @@ class CommandHandlerBase: ) futures.append(future) - # Wait for the first event to complete or all to timeout done, pending = await asyncio.wait( futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) - # Cancel all pending futures for future in pending: future.cancel() - # Check if any future completed successfully for future in done: event = await future if event: return event - # Create an error event when no event is received return Event(EventType.ERROR, {"reason": "no_event_received"}) except asyncio.TimeoutError: logger.debug(f"Command timed out {data}") @@ -142,5 +182,107 @@ class CommandHandlerBase: except Exception as e: logger.debug(f"Command error: {e}") return Event(EventType.ERROR, {"error": str(e)}) - # For commands that don't expect events, return a success event return Event(EventType.OK, {}) + + async def start_queue_processor(self): + """ + Start the command queue processor. + This should be called once when the connection is established. + """ + async with self._start_lock: + if not self._is_running: + await self._start_queue_processor() + + async def _start_queue_processor(self): + """Internal method to start the background queue processor.""" + if not self._queue_processor_task or self._queue_processor_task.done(): + self._is_running = True + self._queue_processor_task = asyncio.create_task(self._process_queue()) + logger.debug("Started command queue processor") + + async def _process_queue(self): + """Process commands from the queue sequentially.""" + logger.debug("Command queue processor started") + while self._is_running: + try: + item = await self._command_queue.get() + + # kill queue signal + if item is None: + logger.debug("Received shutdown sentinel") + break + + data, expected_events, timeout, future = item + + if future.cancelled(): + continue + + try: + logger.debug(f"Processing queued command: {data.hex() if isinstance(data, bytes) else data}") + result = await self._send_internal(data, expected_events, timeout) + + if not future.cancelled(): + future.set_result(result) + except Exception as e: + logger.error(f"Error processing command: {e}") + if not future.cancelled(): + future.set_exception(e) + + # Small delay between commands to avoid overwhelming the device + await asyncio.sleep(0.01) + + except asyncio.CancelledError: + logger.debug("Queue processor cancelled") + break + except Exception as e: + logger.error(f"Queue processor error: {e}") + # Continue processing even if there was an error + + logger.debug("Command queue processor stopped") + + async def stop_queue_processor(self): + """Stop the queue processor gracefully.""" + logger.debug("Stopping command queue processor") + + if not self._is_running: + return + + self._is_running = False + + try: + # send kill signal and wait for it to be processed + await asyncio.wait_for(self._command_queue.put(None), timeout=1.0) + except asyncio.TimeoutError: + logger.warning("Could not send shutdown sentinel (queue may be full)") + + if self._queue_processor_task: + try: + await asyncio.wait_for(self._queue_processor_task, timeout=2.0) + except asyncio.TimeoutError: + logger.warning("Queue processor did not stop gracefully, cancelling") + self._queue_processor_task.cancel() + try: + await self._queue_processor_task + except asyncio.CancelledError: + pass + self._queue_processor_task = None + + cancelled_count = 0 + while not self._command_queue.empty(): + try: + item = self._command_queue.get_nowait() + if item is None: + continue + if isinstance(item, tuple) and len(item) == 4: + _, _, _, future = item + if not future.cancelled(): + future.cancel() + cancelled_count += 1 + except Exception as e: + logger.debug(f"Error during cleanup: {e}") + break + + if cancelled_count > 0: + logger.debug(f"Cancelled {cancelled_count} pending commands") + + logger.debug("Command queue processor stopped") diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index d35f54f..4bd2392 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -1,6 +1,9 @@ import logging from enum import Enum import json +from mailbox import Message + +from meshcore.commands.messaging import MessagingCommands from .base import CommandHandlerBase from ..events import EventType from cayennelpp import LppFrame, LppData @@ -38,6 +41,9 @@ def lpp_parse_mma(buf): i = i + 1 type = buf[i] lpp_type = LppType.get_lpp_type(type) + if lpp_type is None: + logger.error(f"Unknown LPP type: {type}") + return None size = lpp_type.size i = i + 1 min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) @@ -70,7 +76,7 @@ def parse_acl(buf): return res -class BinaryCommandHandler(CommandHandlerBase): +class BinaryCommandHandler(MessagingCommands): """Helper functions to handle binary requests through binary commands""" async def req_binary(self, contact, request, timeout=0): @@ -84,6 +90,9 @@ class BinaryCommandHandler(CommandHandlerBase): timeout = ( res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout ) + if self.dispatcher is None: + logger.error("No dispatcher set, cannot wait for response") + return None res2 = await self.dispatcher.wait_for_event( EventType.BINARY_RESPONSE, attribute_filters={"tag": exp_tag}, diff --git a/src/meshcore/connection_manager.py b/src/meshcore/connection_manager.py index c95ec37..4b46403 100644 --- a/src/meshcore/connection_manager.py +++ b/src/meshcore/connection_manager.py @@ -21,7 +21,7 @@ class ConnectionProtocol(Protocol): """Disconnect from the device/server.""" ... - async def send(self, data): + async def send(self, data) -> Any: """Send data through the connection.""" ... diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 2ee3d56..137fd43 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -1,3 +1,4 @@ +from collections.abc import Coroutine from enum import Enum import inspect import logging @@ -113,7 +114,7 @@ class EventDispatcher: def subscribe( self, event_type: Union[EventType, None], - callback: Callable[[Event], Union[None, asyncio.Future]], + callback: Callable[[Event], Coroutine[Any, Any, None]], attribute_filters: Optional[Dict[str, Any]] = None, ) -> Subscription: """ @@ -226,7 +227,7 @@ class EventDispatcher: """ future = asyncio.Future() - def event_handler(event: Event): + async def event_handler(event: Event): if not future.done(): future.set_result(event) diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 84e533f..2314da0 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -162,6 +162,10 @@ class MeshCore: result = await self.connection_manager.connect() if result is None: raise ConnectionError("Failed to connect to device") + + # Start the command queue processor after successful connection + await self.commands.start_queue_processor() + return await self.commands.send_appstart() async def disconnect(self): @@ -173,6 +177,9 @@ class MeshCore: if hasattr(self, "_auto_fetch_subscription") and self._auto_fetch_subscription: await self.stop_auto_message_fetching() + # Stop the command queue processor + await self.commands.stop_queue_processor() + # Disconnect the connection object await self.connection_manager.disconnect()