diff --git a/packet_capture.py b/packet_capture.py index 71f3ea8..c72afce 100644 --- a/packet_capture.py +++ b/packet_capture.py @@ -1035,6 +1035,19 @@ class PacketCapture: try: self.logger.info("Connecting to MeshCore node...") + # Clean up any existing connection before attempting new one + # This prevents pending tasks from interfering with new connections + if self.meshcore: + try: + self.cleanup_event_subscriptions() + self.meshcore.stop() + await self.meshcore.disconnect() + except Exception as cleanup_error: + self.logger.debug(f"Error cleaning up existing connection before reconnect: {cleanup_error}") + self.meshcore = None + # Brief delay to ensure cleanup completes + await asyncio.sleep(0.2) + # Get connection type from environment connection_type = self.get_env('CONNECTION_TYPE', 'ble').lower() self.connection_type = connection_type # Store for health checks @@ -1122,6 +1135,14 @@ class PacketCapture: self.meshcore = await meshcore.MeshCore.create_ble(ble_device_name, debug=False) except Exception as e: self.logger.error(f"Error connecting to device '{ble_device_name}': {e}") + # Clean up any partial connection + if self.meshcore: + try: + self.meshcore.stop() + await self.meshcore.disconnect() + except: + pass + self.meshcore = None # Fallback to general scan self.logger.info("Falling back to general BLE scan...") self.meshcore = await meshcore.MeshCore.create_ble(debug=False) @@ -1130,7 +1151,17 @@ class PacketCapture: self.logger.info("Scanning for available BLE devices...") self.meshcore = await meshcore.MeshCore.create_ble(debug=False) - if self.meshcore.is_connected: + # Wait a brief moment for connection to fully establish (especially for BLE) + if self.meshcore and self.connection_type == 'ble': + await asyncio.sleep(0.5) + # Retry connection check a few times in case it's still establishing + for attempt in range(3): + if self.meshcore.is_connected: + break + if attempt < 2: + await asyncio.sleep(0.5) + + if self.meshcore and self.meshcore.is_connected: self.connected = True # Reset SDK reconnect exhaustion flag on successful connection if self.connection_type == 'tcp': @@ -1200,10 +1231,28 @@ class PacketCapture: return True else: self.logger.error("Failed to connect to MeshCore node") + # Clean up failed connection attempt to prevent pending tasks + if self.meshcore: + try: + self.cleanup_event_subscriptions() + self.meshcore.stop() + await self.meshcore.disconnect() + except Exception as cleanup_error: + self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}") + self.meshcore = None return False except Exception as e: self.logger.error(f"Connection failed: {e}") + # Clean up any partial connection on exception + if self.meshcore: + try: + self.cleanup_event_subscriptions() + self.meshcore.stop() + await self.meshcore.disconnect() + except Exception as cleanup_error: + self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}") + self.meshcore = None return False def cleanup_event_subscriptions(self): @@ -1212,6 +1261,7 @@ class PacketCapture: return try: + # Use meshcore.unsubscribe() method which is the proper API if hasattr(self.meshcore, "dispatcher") and hasattr(self.meshcore.dispatcher, "subscriptions"): subscription_count = len(self.meshcore.dispatcher.subscriptions) if subscription_count > 0: @@ -1219,7 +1269,8 @@ class PacketCapture: # Create a copy of the list to avoid modification during iteration for subscription in list(self.meshcore.dispatcher.subscriptions): try: - subscription.unsubscribe() + # Use meshcore.unsubscribe() - the proper API method + self.meshcore.unsubscribe(subscription) except Exception as e: self.logger.debug(f"Error unsubscribing: {e}") self.logger.debug(f"Cleared {subscription_count} event subscriptions") @@ -1248,12 +1299,21 @@ class PacketCapture: # Clean up existing connection if self.meshcore: try: - # Clean up event subscriptions BEFORE disconnecting to prevent pending tasks + # Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks self.cleanup_event_subscriptions() + # Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors + try: + self.meshcore.stop() + except Exception as e: + self.logger.debug(f"Error stopping meshcore event dispatcher: {e}") + # Disconnect the connection await self.meshcore.disconnect() except Exception as e: self.logger.debug(f"Error disconnecting during reconnect: {e}") self.meshcore = None + # For BLE connections, wait a bit longer to ensure full cleanup + if self.connection_type == 'ble': + await asyncio.sleep(0.5) # Wait before retrying with exponential backoff if delay > 0: @@ -2483,8 +2543,13 @@ class PacketCapture: if self.meshcore and self.get_env('CONNECTION_TYPE', 'ble').lower() == 'ble': try: self.logger.info("Disconnecting BLE device...") - # Clean up event subscriptions BEFORE disconnecting to prevent pending tasks + # Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks self.cleanup_event_subscriptions() + # Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors + try: + self.meshcore.stop() + except Exception as e: + self.logger.debug(f"Error stopping meshcore event dispatcher: {e}") await asyncio.wait_for(self.meshcore.disconnect(), timeout=10.0) # Additional BLE disconnection using bluetoothctl on Linux @@ -2500,14 +2565,22 @@ class PacketCapture: await asyncio.sleep(1) # Give time for disconnection except Exception as e: self.logger.debug(f"Could not force BLE disconnect via bluetoothctl: {e}") + else: + # On non-Linux systems, add a short delay to ensure BLE cleanup completes + await asyncio.sleep(0.5) except asyncio.TimeoutError: self.logger.warning("Timeout disconnecting BLE device") except Exception as e: self.logger.warning(f"Error during BLE disconnection: {e}") elif self.meshcore: try: - # Clean up event subscriptions BEFORE disconnecting to prevent pending tasks + # Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks self.cleanup_event_subscriptions() + # Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors + try: + self.meshcore.stop() + except Exception as e: + self.logger.debug(f"Error stopping meshcore event dispatcher: {e}") await asyncio.wait_for(self.meshcore.disconnect(), timeout=5.0) except asyncio.TimeoutError: self.logger.warning("Timeout disconnecting MeshCore device")