Introduced brief delays to allow for complete cleanup and connection establishment, particularly for BLE connections. Updated event subscription cleanup.

This commit is contained in:
agessaman 2025-11-02 09:31:40 -08:00
parent fc53df1fdb
commit 155fba7fab

View file

@ -1035,6 +1035,19 @@ class PacketCapture:
try:
self.logger.info("Connecting to MeshCore node...")
# Clean up any existing connection before attempting new one
# This prevents pending tasks from interfering with new connections
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up existing connection before reconnect: {cleanup_error}")
self.meshcore = None
# Brief delay to ensure cleanup completes
await asyncio.sleep(0.2)
# Get connection type from environment
connection_type = self.get_env('CONNECTION_TYPE', 'ble').lower()
self.connection_type = connection_type # Store for health checks
@ -1122,6 +1135,14 @@ class PacketCapture:
self.meshcore = await meshcore.MeshCore.create_ble(ble_device_name, debug=False)
except Exception as e:
self.logger.error(f"Error connecting to device '{ble_device_name}': {e}")
# Clean up any partial connection
if self.meshcore:
try:
self.meshcore.stop()
await self.meshcore.disconnect()
except:
pass
self.meshcore = None
# Fallback to general scan
self.logger.info("Falling back to general BLE scan...")
self.meshcore = await meshcore.MeshCore.create_ble(debug=False)
@ -1130,7 +1151,17 @@ class PacketCapture:
self.logger.info("Scanning for available BLE devices...")
self.meshcore = await meshcore.MeshCore.create_ble(debug=False)
if self.meshcore.is_connected:
# Wait a brief moment for connection to fully establish (especially for BLE)
if self.meshcore and self.connection_type == 'ble':
await asyncio.sleep(0.5)
# Retry connection check a few times in case it's still establishing
for attempt in range(3):
if self.meshcore.is_connected:
break
if attempt < 2:
await asyncio.sleep(0.5)
if self.meshcore and self.meshcore.is_connected:
self.connected = True
# Reset SDK reconnect exhaustion flag on successful connection
if self.connection_type == 'tcp':
@ -1200,10 +1231,28 @@ class PacketCapture:
return True
else:
self.logger.error("Failed to connect to MeshCore node")
# Clean up failed connection attempt to prevent pending tasks
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}")
self.meshcore = None
return False
except Exception as e:
self.logger.error(f"Connection failed: {e}")
# Clean up any partial connection on exception
if self.meshcore:
try:
self.cleanup_event_subscriptions()
self.meshcore.stop()
await self.meshcore.disconnect()
except Exception as cleanup_error:
self.logger.debug(f"Error cleaning up failed connection: {cleanup_error}")
self.meshcore = None
return False
def cleanup_event_subscriptions(self):
@ -1212,6 +1261,7 @@ class PacketCapture:
return
try:
# Use meshcore.unsubscribe() method which is the proper API
if hasattr(self.meshcore, "dispatcher") and hasattr(self.meshcore.dispatcher, "subscriptions"):
subscription_count = len(self.meshcore.dispatcher.subscriptions)
if subscription_count > 0:
@ -1219,7 +1269,8 @@ class PacketCapture:
# Create a copy of the list to avoid modification during iteration
for subscription in list(self.meshcore.dispatcher.subscriptions):
try:
subscription.unsubscribe()
# Use meshcore.unsubscribe() - the proper API method
self.meshcore.unsubscribe(subscription)
except Exception as e:
self.logger.debug(f"Error unsubscribing: {e}")
self.logger.debug(f"Cleared {subscription_count} event subscriptions")
@ -1248,12 +1299,21 @@ class PacketCapture:
# Clean up existing connection
if self.meshcore:
try:
# Clean up event subscriptions BEFORE disconnecting to prevent pending tasks
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
# Disconnect the connection
await self.meshcore.disconnect()
except Exception as e:
self.logger.debug(f"Error disconnecting during reconnect: {e}")
self.meshcore = None
# For BLE connections, wait a bit longer to ensure full cleanup
if self.connection_type == 'ble':
await asyncio.sleep(0.5)
# Wait before retrying with exponential backoff
if delay > 0:
@ -2483,8 +2543,13 @@ class PacketCapture:
if self.meshcore and self.get_env('CONNECTION_TYPE', 'ble').lower() == 'ble':
try:
self.logger.info("Disconnecting BLE device...")
# Clean up event subscriptions BEFORE disconnecting to prevent pending tasks
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
await asyncio.wait_for(self.meshcore.disconnect(), timeout=10.0)
# Additional BLE disconnection using bluetoothctl on Linux
@ -2500,14 +2565,22 @@ class PacketCapture:
await asyncio.sleep(1) # Give time for disconnection
except Exception as e:
self.logger.debug(f"Could not force BLE disconnect via bluetoothctl: {e}")
else:
# On non-Linux systems, add a short delay to ensure BLE cleanup completes
await asyncio.sleep(0.5)
except asyncio.TimeoutError:
self.logger.warning("Timeout disconnecting BLE device")
except Exception as e:
self.logger.warning(f"Error during BLE disconnection: {e}")
elif self.meshcore:
try:
# Clean up event subscriptions BEFORE disconnecting to prevent pending tasks
# Clean up event subscriptions BEFORE stopping/disconnecting to prevent pending tasks
self.cleanup_event_subscriptions()
# Stop the event dispatcher task synchronously to prevent "Task was destroyed" errors
try:
self.meshcore.stop()
except Exception as e:
self.logger.debug(f"Error stopping meshcore event dispatcher: {e}")
await asyncio.wait_for(self.meshcore.disconnect(), timeout=5.0)
except asyncio.TimeoutError:
self.logger.warning("Timeout disconnecting MeshCore device")