Add retry logic for device commands in auth_token and packet_capture modules

- Introduced `_retryable_device_sign` in `auth_token.py` to handle transient errors during device signing operations with exponential backoff.
- Implemented `retryable_device_command` in `packet_capture.py` for executing device commands with timeout and retry logic, enhancing robustness against communication issues.
- Updated various device command calls in `packet_capture.py` to utilize the new retry logic, improving error handling and reliability in device interactions.
This commit is contained in:
agessaman 2025-12-18 08:04:02 -08:00
parent d825d2860d
commit 6d030af555
2 changed files with 275 additions and 36 deletions

View file

@ -211,6 +211,91 @@ def _create_auth_token_internal(
return f"{header_encoded}.{payload_encoded}.{signature_hex}"
async def _retryable_device_sign(
sign_func,
command_name: str = "sign",
timeout: float = 20.0,
max_retries: int = 3,
retry_delay: float = 0.3,
backoff_multiplier: float = 1.5
):
"""
Execute a device signing command with timeout and retry logic.
Args:
sign_func: Async function that returns a meshcore Event
command_name: Name of the command for logging
timeout: Timeout in seconds for each attempt
max_retries: Maximum number of retry attempts (including initial attempt)
retry_delay: Initial delay between retries in seconds
backoff_multiplier: Multiplier for exponential backoff
Returns:
Event object from the command
Raises:
Exception: If all retries fail
"""
from meshcore import EventType
last_error = None
current_delay = retry_delay
for attempt in range(max_retries):
try:
# Add small delay between retries (except first attempt)
if attempt > 0:
await asyncio.sleep(current_delay)
current_delay *= backoff_multiplier # Exponential backoff
# Execute command with timeout
result = await asyncio.wait_for(
sign_func(),
timeout=timeout
)
# Check if result is an error
if result and hasattr(result, 'type'):
if result.type == EventType.ERROR:
error_payload = result.payload if hasattr(result, 'payload') else {}
error_reason = error_payload.get('reason', 'unknown')
# Check if it's a transient error that we should retry
if error_reason == 'no_event_received' and attempt < max_retries - 1:
last_error = f"{command_name} failed: {error_reason}"
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
# Permanent error or last attempt
raise Exception(f"{command_name} failed: {error_payload}")
else:
# Success - return the result
if attempt > 0:
logger.debug(f"{command_name} succeeded on attempt {attempt + 1}")
return result
else:
# Unexpected result format
raise Exception(f"{command_name} returned unexpected result format")
except asyncio.TimeoutError:
last_error = f"{command_name} timed out after {timeout}s"
if attempt < max_retries - 1:
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"{last_error} (all {max_retries} attempts exhausted)")
except Exception as e:
# Re-raise if it's not a retryable error
if "failed:" in str(e) and attempt < max_retries - 1:
last_error = str(e)
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
raise
# All retries failed
raise Exception(f"{command_name} failed after {max_retries} attempts: {last_error}")
async def _create_auth_token_with_device(
payload: AuthTokenPayload,
public_key_hex: str,
@ -383,31 +468,48 @@ async def _create_auth_token_with_device(
logger.debug(f"Total bytes sent: {total_sent} (expected: {len(signing_input_bytes)})")
# Manual sign_finish
# Manual sign_finish with retry logic
logger.debug("Calling sign_finish...")
sig_evt = await meshcore_instance.commands.sign_finish(timeout=None, data_size=len(signing_input_bytes))
sig_evt = await _retryable_device_sign(
lambda: meshcore_instance.commands.sign_finish(timeout=None, data_size=len(signing_input_bytes)),
"sign_finish",
timeout=20.0,
max_retries=3,
retry_delay=0.3
)
except Exception as e:
logger.debug(f"Manual signing flow failed: {e}")
raise
else:
# Use the high-level sign() method with library defaults
# Use the high-level sign() method with library defaults and retry logic
# Call sign() - it internally:
# 1. Calls sign_start() to initialize
# 2. Calls sign_data() for each chunk (using library's default chunk_size)
# 3. Calls sign_finish() with calculated timeout based on data_size
# The timeout parameter lets sign_finish() calculate timeout based on data_size
# For JWT tokens (~372 bytes), it will use at least 15 seconds
# Try with timeout first (newer dev branch), fall back if not supported
try:
sig_evt = await meshcore_instance.commands.sign(
signing_input_bytes,
timeout=None # Let sign_finish() calculate timeout based on data_size (15s minimum)
)
except TypeError:
# Older version doesn't support timeout parameter
sig_evt = await meshcore_instance.commands.sign(
signing_input_bytes
)
# Wrap in retry logic to handle transient BLE communication issues
async def sign_with_retry():
# Try with timeout first (newer dev branch), fall back if not supported
try:
return await meshcore_instance.commands.sign(
signing_input_bytes,
timeout=None # Let sign_finish() calculate timeout based on data_size (15s minimum)
)
except TypeError:
# Older version doesn't support timeout parameter
return await meshcore_instance.commands.sign(
signing_input_bytes
)
# Use retry helper with longer timeout for signing operations
sig_evt = await _retryable_device_sign(
sign_with_retry,
"sign",
timeout=20.0, # Longer timeout for signing (device needs time to process)
max_retries=3,
retry_delay=0.3
)
# Check for error first (as shown in example)
if sig_evt.type == EventType.ERROR:

View file

@ -474,6 +474,93 @@ class PacketCapture:
else:
await asyncio.sleep(timeout)
return False
async def retryable_device_command(self, command_func, command_name: str,
timeout: float = 10.0, max_retries: int = 3,
retry_delay: float = 0.2, backoff_multiplier: float = 1.5):
"""
Execute a device command with timeout and retry logic.
Args:
command_func: Async function that returns a meshcore Event
command_name: Name of the command for logging
timeout: Timeout in seconds for each attempt
max_retries: Maximum number of retry attempts (including initial attempt)
retry_delay: Initial delay between retries in seconds
backoff_multiplier: Multiplier for exponential backoff
Returns:
Event object from the command, or None if all retries failed
"""
if not self.meshcore or not self.meshcore.is_connected:
self.logger.debug(f"Cannot execute {command_name} - not connected to device")
return None
last_error = None
current_delay = retry_delay
for attempt in range(max_retries):
try:
# Add small delay between retries (except first attempt)
if attempt > 0:
await asyncio.sleep(current_delay)
current_delay *= backoff_multiplier # Exponential backoff
# Execute command with timeout
result = await asyncio.wait_for(
command_func(),
timeout=timeout
)
# Check if result is an error
if result and hasattr(result, 'type'):
if result.type == EventType.ERROR:
error_payload = result.payload if hasattr(result, 'payload') else {}
error_reason = error_payload.get('reason', 'unknown')
# Check if it's a transient error that we should retry
if error_reason == 'no_event_received' and attempt < max_retries - 1:
last_error = f"{command_name} failed: {error_reason}"
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
# Permanent error or last attempt
self.logger.debug(f"{command_name} failed: {error_payload}")
return result
else:
# Success - return the result
if attempt > 0:
self.logger.debug(f"{command_name} succeeded on attempt {attempt + 1}")
return result
else:
# Unexpected result format
self.logger.debug(f"{command_name} returned unexpected result format")
return result
except asyncio.TimeoutError:
last_error = f"{command_name} timed out after {timeout}s"
if attempt < max_retries - 1:
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)")
return None
except Exception as e:
last_error = f"{command_name} raised exception: {e}"
if attempt < max_retries - 1:
if self.debug:
self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)")
return None
# All retries failed
if last_error:
self.logger.debug(f"{command_name} failed after {max_retries} attempts: {last_error}")
return None
def should_exit_for_systemd_restart(self) -> bool:
"""Determine if we should exit to allow systemd restart"""
@ -546,8 +633,17 @@ class PacketCapture:
return {"model": "unknown", "version": "unknown"}
self.logger.debug("Querying device for firmware info...")
# Use send_device_query() to get firmware version
result = await self.meshcore.commands.send_device_query()
# Use send_device_query() to get firmware version with retry logic
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query",
timeout=10.0,
max_retries=3
)
if result is None:
self.logger.debug("Device query failed after retries")
return {"model": "unknown", "version": "unknown"}
self.logger.debug(f"Device query result type: {result.type}")
self.logger.debug(f"Device query result: {result}")
@ -722,10 +818,16 @@ class PacketCapture:
self.logger.warning("Cannot set radio clock - not connected to device")
return False
# Get current device time
# Get current device time with retry logic
self.logger.info("Checking device time...")
time_result = await self.meshcore.commands.get_time()
if time_result.type == EventType.ERROR:
time_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_time(),
"get_time",
timeout=8.0,
max_retries=2,
retry_delay=0.2
)
if time_result is None or time_result.type == EventType.ERROR:
self.logger.warning("Device does not support time commands")
return False
@ -739,8 +841,14 @@ class PacketCapture:
time_diff = current_time - device_time
self.logger.info(f"Device time is {time_diff} seconds behind, updating...")
result = await self.meshcore.commands.set_time(current_time)
if result.type == EventType.OK:
result = await self.retryable_device_command(
lambda: self.meshcore.commands.set_time(current_time),
"set_time",
timeout=8.0,
max_retries=2,
retry_delay=0.2
)
if result and result.type == EventType.OK:
self.logger.info(f"✓ Radio clock updated to: {current_time}")
self.last_clock_sync_time = current_time
return True
@ -764,8 +872,19 @@ class PacketCapture:
self.logger.error("Cannot fetch private key - not connected to device")
return False
# Use meshcore library to export private key
result = await self.meshcore.commands.export_private_key()
# Use meshcore library to export private key with retry logic
result = await self.retryable_device_command(
lambda: self.meshcore.commands.export_private_key(),
"export_private_key",
timeout=10.0,
max_retries=3,
retry_delay=0.3 # Slightly longer delay for private key operations
)
if result is None:
self.logger.error("Error fetching private key: command failed after retries")
self.private_key_export_available = False
return False
if result.type == EventType.PRIVATE_KEY:
self.device_private_key = result.payload["private_key"]
@ -1065,14 +1184,17 @@ class PacketCapture:
self.logger.warning("TCP transport is closed or closing")
return False
# 3. Try a lightweight command with timeout
# 3. Try a lightweight command with timeout and retry
# Use longer timeout for TCP with SDK auto-reconnect (device might be busy)
health_check_timeout = 8.0 if (self.connection_type == 'tcp' and self.tcp_sdk_auto_reconnect_enabled) else 5.0
try:
result = await asyncio.wait_for(
self.meshcore.commands.send_device_query(),
timeout=health_check_timeout
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query (health check)",
timeout=health_check_timeout,
max_retries=2, # Fewer retries for health checks
retry_delay=0.2
)
if result and hasattr(result, 'type') and result.type != EventType.ERROR:
return True
@ -1334,9 +1456,12 @@ class PacketCapture:
if not self_info_populated and hasattr(self.meshcore, 'commands'):
try:
self.logger.debug("Attempting to query device info...")
await asyncio.wait_for(
self.meshcore.commands.send_device_query(),
timeout=3.0
result = await self.retryable_device_command(
lambda: self.meshcore.commands.send_device_query(),
"send_device_query (device info)",
timeout=3.0,
max_retries=2, # Fewer retries for device info query
retry_delay=0.2
)
# Wait a bit more after query
await asyncio.sleep(0.5)
@ -2033,19 +2158,31 @@ class PacketCapture:
stats_payload = {}
try:
core_result = await self.meshcore.commands.get_stats_core()
if core_result.type == EventType.STATS_CORE and core_result.payload:
core_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_stats_core(),
"get_stats_core",
timeout=8.0,
max_retries=2, # Fewer retries for stats (non-critical)
retry_delay=0.2
)
if core_result and core_result.type == EventType.STATS_CORE and core_result.payload:
stats_payload.update(core_result.payload)
elif core_result.type == EventType.ERROR:
elif core_result and core_result.type == EventType.ERROR:
self.logger.debug(f"Core stats unavailable: {core_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching core stats: {exc}")
try:
radio_result = await self.meshcore.commands.get_stats_radio()
if radio_result.type == EventType.STATS_RADIO and radio_result.payload:
radio_result = await self.retryable_device_command(
lambda: self.meshcore.commands.get_stats_radio(),
"get_stats_radio",
timeout=8.0,
max_retries=2, # Fewer retries for stats (non-critical)
retry_delay=0.2
)
if radio_result and radio_result.type == EventType.STATS_RADIO and radio_result.payload:
stats_payload.update(radio_result.payload)
elif radio_result.type == EventType.ERROR:
elif radio_result and radio_result.type == EventType.ERROR:
self.logger.debug(f"Radio stats unavailable: {radio_result.payload}")
except Exception as exc:
self.logger.debug(f"Error fetching radio stats: {exc}")