Fix race conditions

This commit is contained in:
zach 2025-12-30 21:42:14 -07:00
parent 83b2817cc4
commit be97e5c7fc
4 changed files with 197 additions and 10 deletions

View file

@ -7,19 +7,35 @@ import '../models/path_selection.dart';
import 'storage_service.dart';
import 'app_settings_service.dart';
class _AckHistoryEntry {
final String messageId;
final List<Uint8List> ackHashes;
final DateTime timestamp;
_AckHistoryEntry({
required this.messageId,
required this.ackHashes,
required this.timestamp,
});
}
class MessageRetryService extends ChangeNotifier {
static const int maxRetries = 5;
static const int maxAckHistorySize = 100;
final StorageService _storage;
final Map<String, Timer> _timeoutTimers = {};
final Map<String, Message> _pendingMessages = {};
final Map<String, Contact> _pendingContacts = {};
final Map<String, PathSelection> _pendingPathSelections = {};
final Map<String, List<Uint8List>> _expectedAckHashes = {}; // Track all expected ACKs for retries
final List<_AckHistoryEntry> _ackHistory = []; // Rolling buffer of recent ACK hashes
Function(Contact, String, int, int)? _sendMessageCallback;
Function(String, Message)? _addMessageCallback;
Function(Message)? _updateMessageCallback;
Function(Contact)? _clearContactPathCallback;
Function(Contact, Uint8List, int)? _setContactPathCallback;
Function(int, int)? _calculateTimeoutCallback;
AppSettingsService? _appSettingsService;
Function(String, PathSelection, bool, int?)? _recordPathResultCallback;
@ -31,6 +47,7 @@ class MessageRetryService extends ChangeNotifier {
required Function(String, Message) addMessageCallback,
required Function(Message) updateMessageCallback,
Function(Contact)? clearContactPathCallback,
Function(Contact, Uint8List, int)? setContactPathCallback,
Function(int pathLength, int messageBytes)? calculateTimeoutCallback,
AppSettingsService? appSettingsService,
Function(String, PathSelection, bool, int?)? recordPathResultCallback,
@ -39,6 +56,7 @@ class MessageRetryService extends ChangeNotifier {
_addMessageCallback = addMessageCallback;
_updateMessageCallback = updateMessageCallback;
_clearContactPathCallback = clearContactPathCallback;
_setContactPathCallback = setContactPathCallback;
_calculateTimeoutCallback = calculateTimeoutCallback;
_appSettingsService = appSettingsService;
_recordPathResultCallback = recordPathResultCallback;
@ -89,6 +107,23 @@ class MessageRetryService extends ChangeNotifier {
if (message == null || contact == null) return;
// Sync path settings with device before sending
// Use the path that was captured when the message was first sent
if (_setContactPathCallback != null && _clearContactPathCallback != null) {
if (message.pathLength != null && message.pathLength! < 0) {
// Flood mode - clear the path
debugPrint('Setting flood mode for retry attempt ${message.retryCount}');
_clearContactPathCallback!(contact);
} else if (message.pathLength != null && message.pathLength! >= 0) {
// Specific path (including direct neighbor with pathLength=0)
final pathStr = message.pathBytes.isEmpty
? 'direct'
: message.pathBytes.map((b) => b.toRadixString(16).padLeft(2, '0')).join(',');
debugPrint('Setting path [$pathStr] (${message.pathLength} hops) for retry attempt ${message.retryCount}');
await _setContactPathCallback!(contact, message.pathBytes, message.pathLength!);
}
}
final attempt = message.retryCount.clamp(0, 3);
if (_sendMessageCallback != null) {
@ -105,10 +140,21 @@ class MessageRetryService extends ChangeNotifier {
void updateMessageFromSent(Uint8List ackHash, int timeoutMs) {
for (var entry in _pendingMessages.entries) {
final message = entry.value;
if (message.status == MessageStatus.pending) {
// Only update if pending (waiting to send) or already sent with matching ACK
if (message.status == MessageStatus.pending ||
(message.status == MessageStatus.sent &&
message.expectedAckHash != null &&
listEquals(message.expectedAckHash, ackHash))) {
final contact = _pendingContacts[entry.key];
final selection = _pendingPathSelections[entry.key];
// Add this ACK hash to the list of expected ACKs for this message
_expectedAckHashes[entry.key] ??= [];
if (!_expectedAckHashes[entry.key]!.any((hash) => listEquals(hash, ackHash))) {
_expectedAckHashes[entry.key]!.add(Uint8List.fromList(ackHash));
debugPrint('Added ACK hash ${ackHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join()} to message ${entry.key} (total: ${_expectedAckHashes[entry.key]!.length})');
}
// Use device-provided timeout, or calculate from radio settings if timeout is 0 or invalid
int actualTimeout = timeoutMs;
if (timeoutMs <= 0 && _calculateTimeoutCallback != null && contact != null) {
@ -127,7 +173,7 @@ class MessageRetryService extends ChangeNotifier {
final updatedMessage = message.copyWith(
status: MessageStatus.sent,
expectedAckHash: ackHash,
expectedAckHash: ackHash, // Keep the most recent one for display
estimatedTimeoutMs: actualTimeout,
sentAt: DateTime.now(),
);
@ -139,9 +185,11 @@ class MessageRetryService extends ChangeNotifier {
}
_startTimeoutTimer(entry.key, actualTimeout);
debugPrint('Updated message ${entry.key} with ACK hash: ${ackHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join()}');
return;
}
}
debugPrint('No pending message found for ACK hash: ${ackHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join()}');
}
void _startTimeoutTimer(String messageId, int timeoutMs) {
@ -158,12 +206,15 @@ class MessageRetryService extends ChangeNotifier {
if (message == null || contact == null) return;
debugPrint('Timeout for message $messageId (retry ${message.retryCount}/${maxRetries - 1})');
if (message.retryCount < maxRetries - 1) {
final backoffMs = 1000 * (1 << message.retryCount);
final updatedMessage = message.copyWith(
retryCount: message.retryCount + 1,
status: MessageStatus.pending,
// Keep expectedAckHash - it will be updated when the new attempt is sent
);
_pendingMessages[messageId] = updatedMessage;
@ -172,6 +223,7 @@ class MessageRetryService extends ChangeNotifier {
_updateMessageCallback!(updatedMessage);
}
debugPrint('Scheduling retry after ${backoffMs}ms');
Timer(Duration(milliseconds: backoffMs), () {
_attemptSend(messageId);
});
@ -179,6 +231,9 @@ class MessageRetryService extends ChangeNotifier {
// Max retries reached - mark as failed
final failedMessage = message.copyWith(status: MessageStatus.failed);
// Move ACK hashes to history before removing
_moveAckHashesToHistory(messageId);
_pendingMessages.remove(messageId);
_pendingContacts.remove(messageId);
_pendingPathSelections.remove(messageId);
@ -201,22 +256,71 @@ class MessageRetryService extends ChangeNotifier {
}
}
void _moveAckHashesToHistory(String messageId) {
final ackHashes = _expectedAckHashes.remove(messageId);
if (ackHashes != null && ackHashes.isNotEmpty) {
_ackHistory.add(_AckHistoryEntry(
messageId: messageId,
ackHashes: ackHashes,
timestamp: DateTime.now(),
));
// Trim history to max size (rolling buffer)
while (_ackHistory.length > maxAckHistorySize) {
_ackHistory.removeAt(0);
}
debugPrint('Moved ${ackHashes.length} ACK hashes to history for message $messageId (history size: ${_ackHistory.length})');
}
}
bool _checkAckHistory(Uint8List ackHash) {
for (final entry in _ackHistory) {
for (final expectedHash in entry.ackHashes) {
if (listEquals(expectedHash, ackHash)) {
debugPrint('Found ACK match in history: messageId=${entry.messageId}, age=${DateTime.now().difference(entry.timestamp).inSeconds}s');
return true;
}
}
}
return false;
}
void handleAckReceived(Uint8List ackHash, int tripTimeMs) {
String? matchedMessageId;
final ackHashHex = ackHash.map((b) => b.toRadixString(16).padLeft(2, '0')).join();
debugPrint('ACK received: $ackHashHex, trip time: ${tripTimeMs}ms');
debugPrint('Pending messages:');
for (var entry in _pendingMessages.entries) {
final message = entry.value;
if (message.expectedAckHash != null &&
listEquals(message.expectedAckHash, ackHash)) {
matchedMessageId = entry.key;
break;
final expectedHex = message.expectedAckHash?.map((b) => b.toRadixString(16).padLeft(2, '0')).join() ?? 'none';
final allExpectedHashes = _expectedAckHashes[entry.key]?.map((h) => h.map((b) => b.toRadixString(16).padLeft(2, '0')).join()).join(', ') ?? 'none';
debugPrint(' ${entry.key}: status=${message.status}, latestAck=$expectedHex, allAcks=[$allExpectedHashes], retry=${message.retryCount}');
}
// Check against ALL expected ACK hashes (from all retry attempts)
for (var entry in _expectedAckHashes.entries) {
final messageId = entry.key;
final expectedHashes = entry.value;
for (final expectedHash in expectedHashes) {
if (listEquals(expectedHash, ackHash)) {
matchedMessageId = messageId;
debugPrint('Matched ACK to message: $matchedMessageId (matched hash from attempt ${expectedHashes.indexOf(expectedHash)})');
break;
}
}
if (matchedMessageId != null) break;
}
if (matchedMessageId != null) {
final message = _pendingMessages[matchedMessageId]!;
final contact = _pendingContacts[matchedMessageId];
final selection = _pendingPathSelections[matchedMessageId];
// Cancel any pending timeout or retry
_timeoutTimers[matchedMessageId]?.cancel();
_timeoutTimers.remove(matchedMessageId);
@ -226,6 +330,9 @@ class MessageRetryService extends ChangeNotifier {
tripTimeMs: tripTimeMs,
);
// Move ACK hashes to history before removing
_moveAckHashesToHistory(matchedMessageId);
_pendingMessages.remove(matchedMessageId);
_pendingContacts.remove(matchedMessageId);
_pendingPathSelections.remove(matchedMessageId);
@ -239,6 +346,13 @@ class MessageRetryService extends ChangeNotifier {
}
notifyListeners();
} else {
// Check ACK history for recently completed messages
if (_checkAckHistory(ackHash)) {
debugPrint('ACK matched a recently completed message from history');
} else {
debugPrint('No matching message found for ACK: $ackHashHex');
}
}
}
@ -326,6 +440,8 @@ class MessageRetryService extends ChangeNotifier {
_pendingMessages.clear();
_pendingContacts.clear();
_pendingPathSelections.clear();
_expectedAckHashes.clear();
_ackHistory.clear();
super.dispose();
}
}