Refactor command system to be queue based

This commit is contained in:
Alex Wolden 2025-08-21 19:08:57 -07:00
parent 2b6a4b267b
commit 28957a4b60
6 changed files with 202 additions and 30 deletions

View file

@@ -9,20 +9,22 @@ from meshcore.events import EventType
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection')
parser.add_argument('-p', '--port', required=True, help='Serial port')
parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate')
# parser.add_argument('-p', '--port', required=True, help='Serial port')
# parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate')
parser.add_argument('-r', '--repeater', required=True, help='Repeater name')
parser.add_argument('-pw', '--password', required=True, help='Password for login')
parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds')
# parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds')
args = parser.parse_args()
# Connect to the device
mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True)
mc = await MeshCore.create_ble("lora-py-tester")
534463
try:
# Get contacts
await mc.ensure_contacts()
repeater = mc.get_contact_by_name(args.repeater)
result = await mc.commands.get_contacts()
print(result)
print(mc._contacts)
repeater = mc.get_contact_by_key_prefix(args.repeater)
if repeater is None:
print(f"Repeater '{args.repeater}' not found in contacts.")
@@ -35,14 +37,25 @@ async def main():
if login_event.type != EventType.ERROR:
print("Login successful")
# Send status request
print("Sending status request...")
await mc.commands.send_telemetry_req(repeater)
# Wait for status response
telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout)
print(telemetry_event.payload["lpp"])
# Continuously poll for telemetry every 60 seconds
print("Starting continuous telemetry polling every 60 seconds...")
while True:
try:
# Send status request
print("Sending status request...")
await mc.commands.send_telemetry_req(repeater)
# Wait for status response
telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10)
print(telemetry_event)
# Wait 60 seconds before next poll
await asyncio.sleep(60)
except Exception as e:
print(f"Error during telemetry poll: {e}")
# Wait before retrying
await asyncio.sleep(60)
else:
print("Login failed or timed out")

View file

@@ -1,6 +1,5 @@
import asyncio
import logging
import random
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
from ..events import Event, EventDispatcher, EventType
@@ -53,14 +52,21 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes
class CommandHandlerBase:
DEFAULT_TIMEOUT = 5.0
MAX_QUEUE_SIZE = 100
def __init__(self, default_timeout: Optional[float] = None):
def __init__(self, default_timeout: Optional[float] = None, max_queue_size: Optional[int] = None):
    """Set up connection plumbing, the timeout default, and the command queue.

    Args:
        default_timeout: Per-command timeout in seconds; falls back to
            DEFAULT_TIMEOUT when None.
        max_queue_size: Capacity of the command queue; falls back to
            MAX_QUEUE_SIZE when None.
    """
    # Connection wiring is injected later via set_connection().
    self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
    self._reader: Optional[MessageReader] = None
    self.dispatcher: Optional[EventDispatcher] = None
    if default_timeout is None:
        default_timeout = self.DEFAULT_TIMEOUT
    self.default_timeout = default_timeout
    capacity = self.MAX_QUEUE_SIZE if max_queue_size is None else max_queue_size
    self._command_queue = asyncio.Queue(maxsize=capacity)
    self._start_lock = asyncio.Lock()  # Only for start/stop operations
    self._queue_processor_task: Optional[asyncio.Task] = None
    self._is_running = False
def set_connection(self, connection: Any) -> None:
async def sender(data: bytes) -> None:
@@ -81,7 +87,48 @@ class CommandHandlerBase:
timeout: Optional[float] = None,
) -> Event:
"""
Send a command and wait for expected event responses.
Queue a command for execution and wait for the response.
Args:
data: The data to send
expected_events: EventType or list of EventTypes to wait for
timeout: Timeout in seconds, or None to use default_timeout
Returns:
Event: The full event object that was received in response to the command
Raises:
RuntimeError: If the command queue is full
"""
async with self._start_lock:
if not self._is_running:
await self._start_queue_processor()
future = asyncio.Future()
try:
await asyncio.wait_for(
self._command_queue.put((data, expected_events, timeout, future)),
timeout=1.0
)
except asyncio.TimeoutError:
future.set_exception(RuntimeError(
f"Command queue is full ({self._command_queue.maxsize} commands pending)"
))
except Exception as e:
future.set_exception(e)
return await future
async def _send_internal(
self,
data: bytes,
expected_events: Optional[Union[EventType, List[EventType]]] = None,
timeout: Optional[float] = None,
) -> Event:
"""
Internal method that does the actual sending and waiting for events.
This runs inside the queue processor with lock protection.
Args:
data: The data to send
@ -94,7 +141,6 @@ class CommandHandlerBase:
if not self.dispatcher:
raise RuntimeError("Dispatcher not set, cannot send commands")
# Use the provided timeout or fall back to default_timeout
timeout = timeout if timeout is not None else self.default_timeout
if self._sender_func:
@@ -105,13 +151,11 @@
if expected_events:
try:
# Convert single event to list if needed
if not isinstance(expected_events, list):
expected_events = [expected_events]
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
# Create futures for all expected events
futures = []
for event_type in expected_events:
future = asyncio.create_task(
@@ -119,22 +163,18 @@
)
futures.append(future)
# Wait for the first event to complete or all to timeout
done, pending = await asyncio.wait(
futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
)
# Cancel all pending futures
for future in pending:
future.cancel()
# Check if any future completed successfully
for future in done:
event = await future
if event:
return event
# Create an error event when no event is received
return Event(EventType.ERROR, {"reason": "no_event_received"})
except asyncio.TimeoutError:
logger.debug(f"Command timed out {data}")
@@ -142,5 +182,107 @@
except Exception as e:
logger.debug(f"Command error: {e}")
return Event(EventType.ERROR, {"error": str(e)})
# For commands that don't expect events, return a success event
return Event(EventType.OK, {})
async def start_queue_processor(self):
    """Launch the background command processor if it is not already running.

    Intended to be called once, right after the connection is established.
    Safe to call repeatedly: the start lock serializes the check and the
    running flag turns redundant calls into no-ops.
    """
    async with self._start_lock:
        if self._is_running:
            return
        await self._start_queue_processor()
async def _start_queue_processor(self):
    """Spawn the queue-processing task (caller must hold the start lock)."""
    task = self._queue_processor_task
    if task is not None and not task.done():
        # A live processor already exists; nothing to do.
        return
    self._is_running = True
    self._queue_processor_task = asyncio.create_task(self._process_queue())
    logger.debug("Started command queue processor")
async def _process_queue(self):
    """Drain the command queue, executing queued commands one at a time.

    Runs as a background task started by ``_start_queue_processor``. Each
    queue item is either ``None`` (the shutdown sentinel) or a tuple of
    ``(data, expected_events, timeout, future)``; the result or exception
    of the send is delivered to the caller through ``future``.

    Fix over the original: every successful ``get()`` is now paired with
    ``task_done()`` (via try/finally), so ``Queue.join()`` cannot hang.
    """
    logger.debug("Command queue processor started")
    while self._is_running:
        try:
            item = await self._command_queue.get()
            try:
                # kill queue signal
                if item is None:
                    logger.debug("Received shutdown sentinel")
                    break
                data, expected_events, timeout, future = item
                # Caller already gave up (e.g. cancelled its await) — skip.
                if future.cancelled():
                    continue
                try:
                    logger.debug(f"Processing queued command: {data.hex() if isinstance(data, bytes) else data}")
                    result = await self._send_internal(data, expected_events, timeout)
                    if not future.cancelled():
                        future.set_result(result)
                except Exception as e:
                    logger.error(f"Error processing command: {e}")
                    if not future.cancelled():
                        future.set_exception(e)
                # Small delay between commands to avoid overwhelming the device
                await asyncio.sleep(0.01)
            finally:
                # Pair every successful get() with task_done() so that
                # Queue.join(), if ever used, does not hang forever.
                self._command_queue.task_done()
        except asyncio.CancelledError:
            logger.debug("Queue processor cancelled")
            break
        except Exception as e:
            logger.error(f"Queue processor error: {e}")
            # Continue processing even if there was an error — one bad
            # command must not kill the queue for subsequent callers.
    logger.debug("Command queue processor stopped")
async def stop_queue_processor(self):
    """Stop the queue processor gracefully.

    Sends the ``None`` shutdown sentinel, waits briefly for the processor
    task to exit (cancelling it as a last resort), then cancels any
    commands still sitting in the queue so their callers are not left
    waiting forever.

    Fixes over the original: ``cancelled_count`` only counts futures that
    ``cancel()`` actually cancelled (it returns False for already-done
    futures), and the drain loop catches the specific
    ``asyncio.QueueEmpty`` instead of a blanket ``Exception``.
    """
    logger.debug("Stopping command queue processor")
    if not self._is_running:
        return
    self._is_running = False
    try:
        # send kill signal and wait for it to be processed
        await asyncio.wait_for(self._command_queue.put(None), timeout=1.0)
    except asyncio.TimeoutError:
        logger.warning("Could not send shutdown sentinel (queue may be full)")
    if self._queue_processor_task:
        try:
            await asyncio.wait_for(self._queue_processor_task, timeout=2.0)
        except asyncio.TimeoutError:
            logger.warning("Queue processor did not stop gracefully, cancelling")
            self._queue_processor_task.cancel()
            try:
                await self._queue_processor_task
            except asyncio.CancelledError:
                pass
        self._queue_processor_task = None
    # Fail any commands still queued so their waiters wake up.
    cancelled_count = 0
    while True:
        try:
            item = self._command_queue.get_nowait()
        except asyncio.QueueEmpty:
            break  # queue fully drained
        if item is None:
            continue  # a leftover sentinel carries no future
        if isinstance(item, tuple) and len(item) == 4:
            _, _, _, future = item
            # cancel() returns False when the future is already done or
            # cancelled; only count the ones we actually cancelled here.
            if future.cancel():
                cancelled_count += 1
    if cancelled_count > 0:
        logger.debug(f"Cancelled {cancelled_count} pending commands")
    logger.debug("Command queue processor stopped")

View file

@@ -1,6 +1,9 @@
import logging
from enum import Enum
import json
from mailbox import Message
from meshcore.commands.messaging import MessagingCommands
from .base import CommandHandlerBase
from ..events import EventType
from cayennelpp import LppFrame, LppData
@@ -38,6 +41,9 @@ def lpp_parse_mma(buf):
i = i + 1
type = buf[i]
lpp_type = LppType.get_lpp_type(type)
if lpp_type is None:
logger.error(f"Unknown LPP type: {type}")
return None
size = lpp_type.size
i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
@@ -70,7 +76,7 @@ def parse_acl(buf):
return res
class BinaryCommandHandler(CommandHandlerBase):
class BinaryCommandHandler(MessagingCommands):
"""Helper functions to handle binary requests through binary commands"""
async def req_binary(self, contact, request, timeout=0):
@@ -84,6 +90,9 @@ class BinaryCommandHandler(CommandHandlerBase):
timeout = (
res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
)
if self.dispatcher is None:
logger.error("No dispatcher set, cannot wait for response")
return None
res2 = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": exp_tag},

View file

@@ -21,7 +21,7 @@ class ConnectionProtocol(Protocol):
"""Disconnect from the device/server."""
...
async def send(self, data):
async def send(self, data) -> Any:
"""Send data through the connection."""
...

View file

@@ -1,3 +1,4 @@
from collections.abc import Coroutine
from enum import Enum
import inspect
import logging
@@ -113,7 +114,7 @@ class EventDispatcher:
def subscribe(
self,
event_type: Union[EventType, None],
callback: Callable[[Event], Union[None, asyncio.Future]],
callback: Callable[[Event], Coroutine[Any, Any, None]],
attribute_filters: Optional[Dict[str, Any]] = None,
) -> Subscription:
"""
@@ -226,7 +227,7 @@ class EventDispatcher:
"""
future = asyncio.Future()
def event_handler(event: Event):
async def event_handler(event: Event):
if not future.done():
future.set_result(event)

View file

@@ -162,6 +162,10 @@ class MeshCore:
result = await self.connection_manager.connect()
if result is None:
raise ConnectionError("Failed to connect to device")
# Start the command queue processor after successful connection
await self.commands.start_queue_processor()
return await self.commands.send_appstart()
async def disconnect(self):
@@ -173,6 +177,9 @@ class MeshCore:
if hasattr(self, "_auto_fetch_subscription") and self._auto_fetch_subscription:
await self.stop_auto_message_fetching()
# Stop the command queue processor
await self.commands.stop_queue_processor()
# Disconnect the connection object
await self.connection_manager.disconnect()