From 6d030af555cf9e4ee3efc0958eede22a4bb4ccfc Mon Sep 17 00:00:00 2001 From: agessaman Date: Thu, 18 Dec 2025 08:04:02 -0800 Subject: [PATCH] Add retry logic for device commands in auth_token and packet_capture modules - Introduced `_retryable_device_sign` in `auth_token.py` to handle transient errors during device signing operations with exponential backoff. - Implemented `retryable_device_command` in `packet_capture.py` for executing device commands with timeout and retry logic, enhancing robustness against communication issues. - Updated various device command calls in `packet_capture.py` to utilize the new retry logic, improving error handling and reliability in device interactions. --- auth_token.py | 130 +++++++++++++++++++++++++++++---- packet_capture.py | 181 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 275 insertions(+), 36 deletions(-) diff --git a/auth_token.py b/auth_token.py index ba8d902..bdf4996 100644 --- a/auth_token.py +++ b/auth_token.py @@ -211,6 +211,91 @@ def _create_auth_token_internal( return f"{header_encoded}.{payload_encoded}.{signature_hex}" +async def _retryable_device_sign( + sign_func, + command_name: str = "sign", + timeout: float = 20.0, + max_retries: int = 3, + retry_delay: float = 0.3, + backoff_multiplier: float = 1.5 +): + """ + Execute a device signing command with timeout and retry logic. + + Args: + sign_func: Async function that returns a meshcore Event + command_name: Name of the command for logging + timeout: Timeout in seconds for each attempt + max_retries: Maximum number of retry attempts (including initial attempt) + retry_delay: Initial delay between retries in seconds + backoff_multiplier: Multiplier for exponential backoff + + Returns: + Event object from the command + + Raises: + Exception: If all retries fail + """ + from meshcore import EventType + + last_error = None + current_delay = retry_delay + + for attempt in range(max_retries): + try: + # Add small delay between retries (except first attempt) + if attempt > 0: + await asyncio.sleep(current_delay) + current_delay *= backoff_multiplier # Exponential backoff + + # Execute command with timeout + result = await asyncio.wait_for( + sign_func(), + timeout=timeout + ) + + # Check if result is an error + if result and hasattr(result, 'type'): + if result.type == EventType.ERROR: + error_payload = result.payload if hasattr(result, 'payload') else {} + error_reason = error_payload.get('reason', 'unknown') + + # Check if it's a transient error that we should retry + if error_reason == 'no_event_received' and attempt < max_retries - 1: + last_error = f"{command_name} failed: {error_reason}" + logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + else: + # Permanent error or last attempt + raise Exception(f"{command_name} failed: {error_payload}") + else: + # Success - return the result + if attempt > 0: + logger.debug(f"{command_name} succeeded on attempt {attempt + 1}") + return result + else: + # Unexpected result format + raise Exception(f"{command_name} returned unexpected result format") + + except asyncio.TimeoutError: + last_error = f"{command_name} timed out after {timeout}s" + if attempt < max_retries - 1: + logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + else: + raise Exception(f"{last_error} (all {max_retries} attempts exhausted)") + except Exception as e: + # Re-raise if it's not a retryable error + if "failed:" in str(e) and attempt < max_retries - 1: + last_error = str(e) + logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + raise + + # All retries failed + raise Exception(f"{command_name} failed after {max_retries} attempts: {last_error}") + + async def _create_auth_token_with_device( payload: AuthTokenPayload, public_key_hex: str, @@ -383,31 +468,48 @@ async def _create_auth_token_with_device( logger.debug(f"Total bytes sent: {total_sent} (expected: {len(signing_input_bytes)})") - # Manual sign_finish + # Manual sign_finish with retry logic logger.debug("Calling sign_finish...") - sig_evt = await meshcore_instance.commands.sign_finish(timeout=None, data_size=len(signing_input_bytes)) + sig_evt = await _retryable_device_sign( + lambda: meshcore_instance.commands.sign_finish(timeout=None, data_size=len(signing_input_bytes)), + "sign_finish", + timeout=20.0, + max_retries=3, + retry_delay=0.3 + ) except Exception as e: logger.debug(f"Manual signing flow failed: {e}") raise else: - # Use the high-level sign() method with library defaults + # Use the high-level sign() method with library defaults and retry logic # Call sign() - it internally: # 1. Calls sign_start() to initialize # 2. Calls sign_data() for each chunk (using library's default chunk_size) # 3. Calls sign_finish() with calculated timeout based on data_size # The timeout parameter lets sign_finish() calculate timeout based on data_size # For JWT tokens (~372 bytes), it will use at least 15 seconds - # Try with timeout first (newer dev branch), fall back if not supported - try: - sig_evt = await meshcore_instance.commands.sign( - signing_input_bytes, - timeout=None # Let sign_finish() calculate timeout based on data_size (15s minimum) - ) - except TypeError: - # Older version doesn't support timeout parameter - sig_evt = await meshcore_instance.commands.sign( - signing_input_bytes - ) + # Wrap in retry logic to handle transient BLE communication issues + async def sign_with_retry(): + # Try with timeout first (newer dev branch), fall back if not supported + try: + return await meshcore_instance.commands.sign( + signing_input_bytes, + timeout=None # Let sign_finish() calculate timeout based on data_size (15s minimum) + ) + except TypeError: + # Older version doesn't support timeout parameter + return await meshcore_instance.commands.sign( + signing_input_bytes + ) + + # Use retry helper with longer timeout for signing operations + sig_evt = await _retryable_device_sign( + sign_with_retry, + "sign", + timeout=20.0, # Longer timeout for signing (device needs time to process) + max_retries=3, + retry_delay=0.3 + ) # Check for error first (as shown in example) if sig_evt.type == EventType.ERROR: diff --git a/packet_capture.py b/packet_capture.py index 814c8bc..c9ceaa1 100644 --- a/packet_capture.py +++ b/packet_capture.py @@ -474,6 +474,93 @@ class PacketCapture: else: await asyncio.sleep(timeout) return False + + async def retryable_device_command(self, command_func, command_name: str, + timeout: float = 10.0, max_retries: int = 3, + retry_delay: float = 0.2, backoff_multiplier: float = 1.5): + """ + Execute a device command with timeout and retry logic. + + Args: + command_func: Async function that returns a meshcore Event + command_name: Name of the command for logging + timeout: Timeout in seconds for each attempt + max_retries: Maximum number of retry attempts (including initial attempt) + retry_delay: Initial delay between retries in seconds + backoff_multiplier: Multiplier for exponential backoff + + Returns: + Event object from the command, or None if all retries failed + """ + if not self.meshcore or not self.meshcore.is_connected: + self.logger.debug(f"Cannot execute {command_name} - not connected to device") + return None + + last_error = None + current_delay = retry_delay + + for attempt in range(max_retries): + try: + # Add small delay between retries (except first attempt) + if attempt > 0: + await asyncio.sleep(current_delay) + current_delay *= backoff_multiplier # Exponential backoff + + # Execute command with timeout + result = await asyncio.wait_for( + command_func(), + timeout=timeout + ) + + # Check if result is an error + if result and hasattr(result, 'type'): + if result.type == EventType.ERROR: + error_payload = result.payload if hasattr(result, 'payload') else {} + error_reason = error_payload.get('reason', 'unknown') + + # Check if it's a transient error that we should retry + if error_reason == 'no_event_received' and attempt < max_retries - 1: + last_error = f"{command_name} failed: {error_reason}" + if self.debug: + self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + else: + # Permanent error or last attempt + self.logger.debug(f"{command_name} failed: {error_payload}") + return result + else: + # Success - return the result + if attempt > 0: + self.logger.debug(f"{command_name} succeeded on attempt {attempt + 1}") + return result + else: + # Unexpected result format + self.logger.debug(f"{command_name} returned unexpected result format") + return result + + except asyncio.TimeoutError: + last_error = f"{command_name} timed out after {timeout}s" + if attempt < max_retries - 1: + if self.debug: + self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + else: + self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)") + return None + except Exception as e: + last_error = f"{command_name} raised exception: {e}" + if attempt < max_retries - 1: + if self.debug: + self.logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})") + continue + else: + self.logger.debug(f"{last_error} (all {max_retries} attempts exhausted)") + return None + + # All retries failed + if last_error: + self.logger.debug(f"{command_name} failed after {max_retries} attempts: {last_error}") + return None def should_exit_for_systemd_restart(self) -> bool: """Determine if we should exit to allow systemd restart""" @@ -546,8 +633,17 @@ class PacketCapture: return {"model": "unknown", "version": "unknown"} self.logger.debug("Querying device for firmware info...") - # Use send_device_query() to get firmware version - result = await self.meshcore.commands.send_device_query() + # Use send_device_query() to get firmware version with retry logic + result = await self.retryable_device_command( + lambda: self.meshcore.commands.send_device_query(), + "send_device_query", + timeout=10.0, + max_retries=3 + ) + + if result is None: + self.logger.debug("Device query failed after retries") + return {"model": "unknown", "version": "unknown"} self.logger.debug(f"Device query result type: {result.type}") self.logger.debug(f"Device query result: {result}") @@ -722,10 +818,16 @@ class PacketCapture: self.logger.warning("Cannot set radio clock - not connected to device") return False - # Get current device time + # Get current device time with retry logic self.logger.info("Checking device time...") - time_result = await self.meshcore.commands.get_time() - if time_result.type == EventType.ERROR: + time_result = await self.retryable_device_command( + lambda: self.meshcore.commands.get_time(), + "get_time", + timeout=8.0, + max_retries=2, + retry_delay=0.2 + ) + if time_result is None or time_result.type == EventType.ERROR: self.logger.warning("Device does not support time commands") return False @@ -739,8 +841,14 @@ class PacketCapture: time_diff = current_time - device_time self.logger.info(f"Device time is {time_diff} seconds behind, updating...") - result = await self.meshcore.commands.set_time(current_time) - if result.type == EventType.OK: + result = await self.retryable_device_command( + lambda: self.meshcore.commands.set_time(current_time), + "set_time", + timeout=8.0, + max_retries=2, + retry_delay=0.2 + ) + if result and result.type == EventType.OK: self.logger.info(f"✓ Radio clock updated to: {current_time}") self.last_clock_sync_time = current_time return True @@ -764,8 +872,19 @@ class PacketCapture: self.logger.error("Cannot fetch private key - not connected to device") return False - # Use meshcore library to export private key - result = await self.meshcore.commands.export_private_key() + # Use meshcore library to export private key with retry logic + result = await self.retryable_device_command( + lambda: self.meshcore.commands.export_private_key(), + "export_private_key", + timeout=10.0, + max_retries=3, + retry_delay=0.3 # Slightly longer delay for private key operations + ) + + if result is None: + self.logger.error("Error fetching private key: command failed after retries") + self.private_key_export_available = False + return False if result.type == EventType.PRIVATE_KEY: self.device_private_key = result.payload["private_key"] @@ -1065,14 +1184,17 @@ class PacketCapture: self.logger.warning("TCP transport is closed or closing") return False - # 3. Try a lightweight command with timeout + # 3. Try a lightweight command with timeout and retry # Use longer timeout for TCP with SDK auto-reconnect (device might be busy) health_check_timeout = 8.0 if (self.connection_type == 'tcp' and self.tcp_sdk_auto_reconnect_enabled) else 5.0 try: - result = await asyncio.wait_for( - self.meshcore.commands.send_device_query(), - timeout=health_check_timeout + result = await self.retryable_device_command( + lambda: self.meshcore.commands.send_device_query(), + "send_device_query (health check)", + timeout=health_check_timeout, + max_retries=2, # Fewer retries for health checks + retry_delay=0.2 ) if result and hasattr(result, 'type') and result.type != EventType.ERROR: return True @@ -1334,9 +1456,12 @@ class PacketCapture: if not self_info_populated and hasattr(self.meshcore, 'commands'): try: self.logger.debug("Attempting to query device info...") - await asyncio.wait_for( - self.meshcore.commands.send_device_query(), - timeout=3.0 + result = await self.retryable_device_command( + lambda: self.meshcore.commands.send_device_query(), + "send_device_query (device info)", + timeout=3.0, + max_retries=2, # Fewer retries for device info query + retry_delay=0.2 ) # Wait a bit more after query await asyncio.sleep(0.5) @@ -2033,19 +2158,31 @@ class PacketCapture: stats_payload = {} try: - core_result = await self.meshcore.commands.get_stats_core() - if core_result.type == EventType.STATS_CORE and core_result.payload: + core_result = await self.retryable_device_command( + lambda: self.meshcore.commands.get_stats_core(), + "get_stats_core", + timeout=8.0, + max_retries=2, # Fewer retries for stats (non-critical) + retry_delay=0.2 + ) + if core_result and core_result.type == EventType.STATS_CORE and core_result.payload: stats_payload.update(core_result.payload) - elif core_result.type == EventType.ERROR: + elif core_result and core_result.type == EventType.ERROR: self.logger.debug(f"Core stats unavailable: {core_result.payload}") except Exception as exc: self.logger.debug(f"Error fetching core stats: {exc}") try: - radio_result = await self.meshcore.commands.get_stats_radio() - if radio_result.type == EventType.STATS_RADIO and radio_result.payload: + radio_result = await self.retryable_device_command( + lambda: self.meshcore.commands.get_stats_radio(), + "get_stats_radio", + timeout=8.0, + max_retries=2, # Fewer retries for stats (non-critical) + retry_delay=0.2 + ) + if radio_result and radio_result.type == EventType.STATS_RADIO and radio_result.payload: stats_payload.update(radio_result.payload) - elif radio_result.type == EventType.ERROR: + elif radio_result and radio_result.type == EventType.ERROR: self.logger.debug(f"Radio stats unavailable: {radio_result.payload}") except Exception as exc: self.logger.debug(f"Error fetching radio stats: {exc}")