G3: F03 — restructure _attempt_reconnect from tail-recursion to loop

_attempt_reconnect previously tail-recursed via asyncio.create_task,
re-assigning self._reconnect_task from inside the running coroutine.
This orphaned the current task — disconnect() cancelled only the
newest pointer, leaving previous-generation attempts in flight.  Those
orphaned tasks could set _is_connected = True after the caller thought
the session was closed.

Fix: replace with a single iterative while loop that holds one
persistent task for the entire reconnect session.  The task is created
once in handle_disconnect and torn down only on success, max attempts
exhausted, or disconnect() cancellation.

Refs: Forensics report finding F03
This commit is contained in:
Matthew Wolter 2026-04-11 19:47:12 -07:00
parent 115a402ac2
commit ab4c27dcae

View file

@ -118,45 +118,41 @@ class ConnectionManager:
)
async def _attempt_reconnect(self):
"""Attempt to reconnect with flat delay."""
logger.debug(
f"Attempting reconnection ({self._reconnect_attempts + 1}/{self.max_reconnect_attempts})"
)
self._reconnect_attempts += 1
"""Attempt to reconnect using an iterative loop.
# Flat 1 second delay for all attempts
await asyncio.sleep(1)
Runs as a single persistent task for the entire reconnect session.
Previous implementation used tail-recursion via create_task which
orphaned the running task reference disconnect() could only cancel
the newest pointer, leaving earlier attempts in flight (F03).
"""
while self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_attempts += 1
logger.debug(
f"Attempting reconnection ({self._reconnect_attempts}/{self.max_reconnect_attempts})"
)
try:
result = await self.connection.connect()
if result is not None:
self._is_connected = True
self._reconnect_attempts = 0
await self._emit_event(
EventType.CONNECTED,
{"connection_info": result, "reconnected": True},
)
logger.debug("Reconnected successfully")
else:
# Reconnection failed, try again if we haven't exceeded max attempts
if self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_task = asyncio.create_task(
self._attempt_reconnect()
)
else:
# Flat 1 second delay for all attempts
await asyncio.sleep(1)
try:
result = await self.connection.connect()
if result is not None:
self._is_connected = True
self._reconnect_attempts = 0
await self._emit_event(
EventType.DISCONNECTED,
{"reason": "reconnect_failed", "max_attempts_exceeded": True},
EventType.CONNECTED,
{"connection_info": result, "reconnected": True},
)
except Exception as e:
logger.debug(f"Reconnection attempt failed: {e}")
if self._reconnect_attempts < self.max_reconnect_attempts:
self._reconnect_task = asyncio.create_task(self._attempt_reconnect())
else:
await self._emit_event(
EventType.DISCONNECTED,
{"reason": f"reconnect_error: {e}", "max_attempts_exceeded": True},
)
logger.debug("Reconnected successfully")
return
except Exception as e:
logger.debug(f"Reconnection attempt failed: {e}")
# All attempts exhausted
await self._emit_event(
EventType.DISCONNECTED,
{"reason": "reconnect_failed", "max_attempts_exceeded": True},
)
async def _emit_event(self, event_type: EventType, payload: dict):
"""Emit connection events if dispatcher is available."""