meshcore-open/lib/services/message_retry_service.dart
zjs81 b336aedbc5 fix: address PR #296 code review feedback
- Clamp ML predictions between physics floor (raw airtime) and ceiling
  (worst-case formula) so model can never produce unsafe timeouts
- Replace hourOfDay feature with secondsSinceLastRx for network activity
- Remove unused _ContactStats.stdDev and dead model persistence code
- Debounce observation writes (2s) instead of writing on every delivery
- Skip recording observations when pathLength is null to avoid corrupting
  training data
- Add comment explaining global (not per-contact) RX time tracking
- Remove notifyListeners from retrain to avoid unnecessary widget rebuilds
- Run dart format
2026-03-14 17:32:08 -07:00

912 lines
31 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:uuid/uuid.dart';
import 'package:crypto/crypto.dart';
import '../models/contact.dart';
import '../models/message.dart';
import '../models/path_selection.dart';
import 'app_settings_service.dart';
import 'app_debug_log_service.dart';
/// The ACK hashes that were recorded for one message, kept after the message
/// has left the pending maps so later ACKs can be recognised as duplicates.
class _AckHistoryEntry {
  _AckHistoryEntry({
    required this.messageId,
    required this.ackHashes,
    required this.timestamp,
  });

  /// Identifier of the message the hashes belong to.
  final String messageId;

  /// Every ACK hash recorded for the message.
  final List<Uint8List> ackHashes;

  /// When this history entry was created.
  final DateTime timestamp;
}
/// Value stored per ACK hash: the message it belongs to and when the mapping
/// was recorded (the timestamp is used to expire old mappings).
class _AckHashMapping {
  _AckHashMapping({required this.messageId, required this.timestamp});

  /// Identifier of the message this ACK hash was mapped to.
  final String messageId;

  /// When the mapping was created.
  final DateTime timestamp;
}
class MessageRetryService extends ChangeNotifier {
/// Total number of send attempts made before a message is marked failed.
static const int maxRetries = 5;

/// Upper bound on the number of entries kept in the ACK history buffer.
static const int maxAckHistorySize = 100;

/// messageId → currently armed timer (ACK timeout, retry backoff, or the
/// post-failure grace period).
final _timeoutTimers = <String, Timer>{};

/// messageId → latest known state of the tracked message.
final _pendingMessages = <String, Message>{};

/// messageId → recipient contact.
final _pendingContacts = <String, Contact>{};

/// messageId → path selection supplied when the message was submitted.
final _pendingPathSelections = <String, PathSelection>{};

/// ackHashHex → messageId + timestamp for O(1) lookup.
final _ackHashToMessageId = <String, _AckHashMapping>{};

/// Track all expected ACKs for retries (for history).
final _expectedAckHashes = <String, List<Uint8List>>{};

/// Rolling buffer of recent ACK hashes.
final _ackHistory = <_AckHistoryEntry>[];

/// contactPubKeyHex → FIFO queue of messageIds (DEPRECATED - will be removed).
final _pendingMessageQueuePerContact = <String, List<String>>{};

/// contactPubKeyHex → ordered list of messageIds awaiting send.
final _sendQueue = <String, List<String>>{};

/// messageIds currently in-flight (sent/retrying).
final _activeMessages = <String>{};

/// messageIds already resolved (prevents double _onMessageResolved).
final _resolvedMessages = <String>{};

/// expectedAckHashHex → messageId (for matching RESP_CODE_SENT by hash).
final _expectedHashToMessageId = <String, String>{};

// Collaborators injected through [initialize]; all are null until then.
Function(Contact, String, int, int)? _sendMessageCallback;
Function(String, Message)? _addMessageCallback;
Function(Message)? _updateMessageCallback;
Function(Contact)? _clearContactPathCallback;
Function(Contact, Uint8List, int)? _setContactPathCallback;
Function(int, int, {String? contactKey})? _calculateTimeoutCallback;
Uint8List? Function()? _getSelfPublicKeyCallback;
String Function(Contact, String)? _prepareContactOutboundTextCallback;
AppSettingsService? _appSettingsService;
AppDebugLogService? _debugLogService;
Function(String, PathSelection, bool, int?)? _recordPathResultCallback;
Function(String, int, int, int)? _onDeliveryObservedCallback;

/// Creates an unconfigured service; call [initialize] before sending.
MessageRetryService();
/// Wires the service to its collaborators.
///
/// The three required callbacks send a message, add a new message to the
/// conversation and publish message updates. Every other argument is
/// optional; whatever is passed (including `null`) replaces the previously
/// stored value.
void initialize({
  required Function(Contact, String, int, int) sendMessageCallback,
  required Function(String, Message) addMessageCallback,
  required Function(Message) updateMessageCallback,
  Function(Contact)? clearContactPathCallback,
  Function(Contact, Uint8List, int)? setContactPathCallback,
  Function(int pathLength, int messageBytes, {String? contactKey})?
  calculateTimeoutCallback,
  Uint8List? Function()? getSelfPublicKeyCallback,
  String Function(Contact, String)? prepareContactOutboundTextCallback,
  AppSettingsService? appSettingsService,
  AppDebugLogService? debugLogService,
  Function(String, PathSelection, bool, int?)? recordPathResultCallback,
  Function(
    String contactKey,
    int pathLength,
    int messageBytes,
    int tripTimeMs,
  )?
  onDeliveryObservedCallback,
}) {
  // Message lifecycle hooks.
  _sendMessageCallback = sendMessageCallback;
  _addMessageCallback = addMessageCallback;
  _updateMessageCallback = updateMessageCallback;

  // Path management on the device.
  _setContactPathCallback = setContactPathCallback;
  _clearContactPathCallback = clearContactPathCallback;

  // Inputs for ACK hash computation and timeout estimation.
  _getSelfPublicKeyCallback = getSelfPublicKeyCallback;
  _prepareContactOutboundTextCallback = prepareContactOutboundTextCallback;
  _calculateTimeoutCallback = calculateTimeoutCallback;

  // Outcome reporting.
  _recordPathResultCallback = recordPathResultCallback;
  _onDeliveryObservedCallback = onDeliveryObservedCallback;

  // Settings and diagnostics.
  _appSettingsService = appSettingsService;
  _debugLogService = debugLogService;
}
/// Computes the ACK hash expected for a message, using the same algorithm as
/// the firmware:
/// SHA256([timestamp(4)][attempt(1)][text][sender_pubkey(32)]) -> first 4 bytes.
///
/// [timestampSeconds] is written as 4 little-endian bytes, [attempt] is
/// masked to its two lowest bits, [text] is UTF-8 encoded and [senderPubKey]
/// is appended verbatim.
static Uint8List computeExpectedAckHash(
  int timestampSeconds,
  int attempt,
  String text,
  Uint8List senderPubKey,
) {
  final payload = Uint8List.fromList([
    // timestamp (4 bytes, little-endian)
    timestampSeconds & 0xFF,
    (timestampSeconds >> 8) & 0xFF,
    (timestampSeconds >> 16) & 0xFF,
    (timestampSeconds >> 24) & 0xFF,
    // attempt (1 byte)
    attempt & 0x03,
    // text
    ...utf8.encode(text),
    // sender public key
    ...senderPubKey,
  ]);
  // Only the first 4 bytes of the digest form the ACK hash.
  final digest = sha256.convert(payload);
  return Uint8List.fromList(digest.bytes.sublist(0, 4));
}
/// Registers a new outgoing message for [contact] and queues it for sending.
///
/// The path stored on the message is [pathBytes]/[pathLength] when given,
/// otherwise it is resolved from the contact and [pathSelection]. The message
/// is announced through the add-message callback, appended to the contact's
/// send queue, and sent immediately unless another message for the same
/// contact is already in flight.
Future<void> sendMessageWithRetry({
  required Contact contact,
  required String text,
  PathSelection? pathSelection,
  Uint8List? pathBytes,
  int? pathLength,
}) async {
  final messageId = const Uuid().v4();
  final contactKey = contact.publicKeyHex;
  final useFlood = pathSelection?.useFlood ?? false;

  final message = Message(
    senderKey: contact.publicKey,
    text: text,
    timestamp: DateTime.now(),
    isOutgoing: true,
    status: MessageStatus.pending,
    messageId: messageId,
    retryCount: 0,
    pathLength:
        pathLength ??
        _resolveMessagePathLength(contact, useFlood, pathSelection),
    pathBytes:
        pathBytes ?? _resolveMessagePathBytes(contact, useFlood, pathSelection),
  );

  _pendingMessages[messageId] = message;
  _pendingContacts[messageId] = contact;
  if (pathSelection != null) {
    _pendingPathSelections[messageId] = pathSelection;
  }
  _addMessageCallback?.call(contactKey, message);

  // Queue per contact — only one message in-flight at a time to avoid
  // overflowing the firmware's 8-entry expected_ack_table.
  (_sendQueue[contactKey] ??= []).add(messageId);
  final contactBusy = _activeMessages.any(
    (id) => _pendingContacts[id]?.publicKeyHex == contactKey,
  );
  if (!contactBusy) {
    _sendNextForContact(contactKey);
  }
}
/// Starts sending the next still-pending message queued for [contactKey].
///
/// Queue entries whose message is no longer pending are discarded. At most
/// one message is started per call. If the send attempt throws, the message
/// is marked failed and resolved so the queue can advance.
void _sendNextForContact(String contactKey) {
  final queue = _sendQueue[contactKey];
  if (queue == null) return;

  // Iterate rather than recurse while skipping stale entries.
  while (queue.isNotEmpty) {
    final messageId = queue.removeAt(0);
    if (!_pendingMessages.containsKey(messageId)) {
      // Message was cancelled/cleaned up while queued — try next
      continue;
    }

    _activeMessages.add(messageId);
    _attemptSend(messageId).catchError((e) {
      debugPrint('_attemptSend threw for $messageId: $e');
      final msg = _pendingMessages[messageId];
      if (msg != null) {
        final failed = msg.copyWith(status: MessageStatus.failed);
        _pendingMessages[messageId] = failed;
        _updateMessageCallback?.call(failed);
      }
      _onMessageResolved(messageId, contactKey);
    });
    return;
  }
}
/// Marks [messageId] as resolved, frees its in-flight slot and starts the
/// next queued message for [contactKey].
///
/// Does nothing when the message is already marked resolved.
void _onMessageResolved(String messageId, String contactKey) {
  // Set.add returns false when the id was already present.
  final firstResolution = _resolvedMessages.add(messageId);
  if (!firstResolution) return;
  _activeMessages.remove(messageId);
  _sendNextForContact(contactKey);
}
/// Performs one send attempt for the pending message [messageId].
///
/// Steps, in order:
/// 1. Applies the path captured on the message through the set/clear path
///    callbacks (only when both callbacks are configured).
/// 2. Re-checks that the message is still pending, unresolved and on the
///    same retry count after the awaits.
/// 3. Registers the ACK hash expected for this attempt (only when the own
///    public key is available).
/// 4. Hands the message to the send callback. If no send callback is set,
///    the message is marked failed and resolved instead.
///
/// Returns silently when the message or its contact is no longer tracked.
Future<void> _attemptSend(String messageId) async {
final message = _pendingMessages[messageId];
final contact = _pendingContacts[messageId];
if (message == null || contact == null) return;
// Sync path settings with device before sending
// Use the path that was captured when the message was first sent
if (_setContactPathCallback != null && _clearContactPathCallback != null) {
// A negative path length means flood mode: clear the contact's path.
if (message.pathLength != null && message.pathLength! < 0) {
debugPrint(
'Setting flood mode for retry attempt ${message.retryCount}',
);
await _clearContactPathCallback!(contact);
} else if (message.pathLength != null && message.pathLength! >= 0) {
// Hex dump of the path for logging only; empty bytes are shown as 'direct'.
final pathStr = message.pathBytes.isEmpty
? 'direct'
: message.pathBytes
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join(',');
debugPrint(
'Setting path [$pathStr] (${message.pathLength} hops) for retry attempt ${message.retryCount}',
);
await _setContactPathCallback!(
contact,
message.pathBytes,
message.pathLength!,
);
}
// When pathLength is null neither callback is invoked.
}
// Re-validate after async gap — a timer or ACK could have resolved/retried
// this message while we were awaiting the path callback.
final currentMessage = _pendingMessages[messageId];
if (currentMessage == null || _resolvedMessages.contains(messageId)) {
debugPrint(
'_attemptSend: message $messageId resolved during path sync, aborting',
);
return;
}
// If the message was retried by a timer during our await, the retryCount
// will have advanced. Only proceed if it still matches the attempt we started.
if (currentMessage.retryCount != message.retryCount) {
debugPrint(
'_attemptSend: message $messageId retryCount changed during path sync, aborting',
);
return;
}
// The attempt index is capped at 3, so every retry beyond the fourth
// attempt reuses attempt index 3 (computeExpectedAckHash also masks the
// value with 0x03).
final attempt = message.retryCount.clamp(0, 3);
// The hash uses the message's original creation timestamp, not the time of
// this attempt.
final timestampSeconds = message.timestamp.millisecondsSinceEpoch ~/ 1000;
// Compute expected ACK hash that device will return in RESP_CODE_SENT
// IMPORTANT: Use the transformed text (with SMAZ encoding if enabled) to match device's hash
final selfPubKey = _getSelfPublicKeyCallback?.call();
if (selfPubKey != null) {
// Falls back to the raw text when no outbound-text callback is configured.
final outboundText =
_prepareContactOutboundTextCallback?.call(contact, message.text) ??
message.text;
final expectedHash = MessageRetryService.computeExpectedAckHash(
timestampSeconds,
attempt,
outboundText,
selfPubKey,
);
final expectedHashHex = expectedHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
_expectedHashToMessageId[expectedHashHex] = messageId;
// Log at most the first 20 characters of the text.
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.info(
'Sent "$shortText" to ${contact.name} → expect ACK hash $expectedHashHex (attempt $attempt)',
tag: 'AckHash',
);
debugPrint(
'Computed expected ACK hash $expectedHashHex for message $messageId',
);
}
// DEPRECATED: Old queue-based matching (kept for fallback)
// One queue entry is appended per attempt.
_pendingMessageQueuePerContact[contact.publicKeyHex] ??= [];
_pendingMessageQueuePerContact[contact.publicKeyHex]!.add(messageId);
if (_sendMessageCallback != null) {
// The callback receives the untransformed message text; its result is not
// awaited here.
_sendMessageCallback!(contact, message.text, attempt, timestampSeconds);
} else {
// No send callback — message would be stuck forever. Fail it immediately.
debugPrint(
'_attemptSend: no sendMessageCallback, failing message $messageId',
);
final failedMessage = message.copyWith(status: MessageStatus.failed);
_pendingMessages[messageId] = failedMessage;
_updateMessageCallback?.call(failedMessage);
_onMessageResolved(messageId, contact.publicKeyHex);
}
}
/// Processes a send confirmation (RESP_CODE_SENT) carrying [ackHash] and the
/// device-suggested [timeoutMs].
///
/// The hash is matched to a pending message, first through the expected-hash
/// table and then, when [allowQueueFallback] is true, through the deprecated
/// per-contact queues. On a match the message is marked
/// [MessageStatus.sent], the hash is recorded for later ACK lookup, and the
/// ACK timeout timer is (re)started.
///
/// Returns `true` when a pending message was matched and updated, `false`
/// otherwise.
bool updateMessageFromSent(
Uint8List ackHash,
int timeoutMs, {
bool allowQueueFallback = true,
}) {
final ackHashHex = ackHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
// NEW: Try hash-based matching first (fixes LoRa message drops causing mismatches)
// The entry is consumed (removed) by this lookup.
String? messageId = _expectedHashToMessageId.remove(ackHashHex);
Contact? contact;
if (messageId != null) {
contact = _pendingContacts[messageId];
final message = _pendingMessages[messageId];
if (contact != null && message != null) {
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.info(
'RESP_CODE_SENT received: ACK hash $ackHashHex ✓ matched "$shortText" to ${contact.name}',
tag: 'AckHash',
);
debugPrint(
'Hash-based match: ACK hash $ackHashHex → message $messageId',
);
// Remove from old queue since we matched
_pendingMessageQueuePerContact[contact.publicKeyHex]?.remove(messageId);
if (_pendingMessageQueuePerContact[contact.publicKeyHex]?.isEmpty ??
false) {
_pendingMessageQueuePerContact.remove(contact.publicKeyHex);
}
} else {
_debugLogService?.warn(
'RESP_CODE_SENT: ACK hash $ackHashHex matched but message no longer pending',
tag: 'AckHash',
);
debugPrint('Hash matched $messageId but message no longer pending');
// Reset so the queue fallback below (if allowed) gets a chance.
messageId = null;
contact = null;
}
}
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
// NOTE(review): this walks every contact's queue in map iteration order and
// takes the first still-pending entry it finds, so the hash may be attributed
// to a message for a different contact than the one the device confirmed.
if (messageId == null && allowQueueFallback) {
_debugLogService?.warn(
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
tag: 'AckHash',
);
debugPrint(
'Hash-based match failed for $ackHashHex, falling back to queue-based matching',
);
// Search all contact queues so concurrent chats don't miss matches.
final queuesToSearch = _pendingMessageQueuePerContact;
for (var entry in queuesToSearch.entries) {
final contactKey = entry.key;
final queue = entry.value;
// Drain stale entries until we find a valid one or exhaust the queue.
while (queue.isNotEmpty) {
final candidateMessageId = queue.removeAt(0);
if (_pendingMessages.containsKey(candidateMessageId)) {
messageId = candidateMessageId;
contact = _pendingContacts[candidateMessageId];
debugPrint(
'Queue-based match (fallback): $ackHashHex → message $messageId for $contactKey',
);
break;
}
debugPrint('Dequeued stale message $candidateMessageId - skipping');
}
if (messageId != null) break;
}
}
if (messageId == null || contact == null) {
debugPrint('No pending message found for ACK hash: $ackHashHex');
return false;
}
// Store the mapping for future lookups (e.g., when ACK arrives)
// Keep timestamp so we can clean up old mappings later
_ackHashToMessageId[ackHashHex] = _AckHashMapping(
messageId: messageId,
timestamp: DateTime.now(),
);
debugPrint('Mapped ACK hash $ackHashHex to message $messageId');
final message = _pendingMessages[messageId];
final selection = _pendingPathSelections[messageId];
if (message == null) {
debugPrint(
'Message $messageId no longer pending for ACK hash: $ackHashHex',
);
// Undo the mapping stored just above.
_ackHashToMessageId.remove(ackHashHex);
return false;
}
// Add this ACK hash to the list of expected ACKs for this message (for history)
_expectedAckHashes[messageId] ??= [];
if (!_expectedAckHashes[messageId]!.any(
(hash) => listEquals(hash, ackHash),
)) {
// Store a copy so later changes to the caller's buffer cannot affect it.
_expectedAckHashes[messageId]!.add(Uint8List.fromList(ackHash));
debugPrint(
'Added ACK hash $ackHashHex to message $messageId (total: ${_expectedAckHashes[messageId]!.length})',
);
}
// Determine the path length fed to the timeout calculation: the path
// selection wins (flood selections fall back to the contact's path length),
// then the length captured on the message, then the contact's path length.
int pathLengthValue;
if (selection != null) {
pathLengthValue = selection.useFlood ? -1 : selection.hopCount;
if (pathLengthValue < 0) pathLengthValue = contact.pathLength;
} else if (message.pathLength != null) {
pathLengthValue = message.pathLength!;
} else {
pathLengthValue = contact.pathLength;
}
// Start from the device-provided timeout; the calculated value replaces it
// only under the condition below.
int actualTimeout = timeoutMs;
if (_calculateTimeoutCallback != null) {
// NOTE(review): message.text.length counts UTF-16 code units, while the
// callback parameter is named messageBytes — confirm this is intended.
final calculated = _calculateTimeoutCallback!(
pathLengthValue,
message.text.length,
contactKey: contact.publicKeyHex,
);
// calculateTimeout tries ML first, falls back to physics.
// Use calculated value if device didn't provide one, or if ML
// produced a tighter prediction than the device's estimate.
if (timeoutMs <= 0 || calculated < timeoutMs) {
actualTimeout = calculated;
debugPrint(
'Using calculated timeout: ${actualTimeout}ms for path length $pathLengthValue',
);
}
}
final updatedMessage = message.copyWith(
status: MessageStatus.sent,
expectedAckHash: ackHash,
estimatedTimeoutMs: actualTimeout,
sentAt: DateTime.now(),
);
_pendingMessages[messageId] = updatedMessage;
if (_updateMessageCallback != null) {
_updateMessageCallback!(updatedMessage);
}
// Replaces any timer already armed for this message (e.g. a retry backoff).
_startTimeoutTimer(messageId, actualTimeout);
debugPrint('Updated message $messageId with ACK hash: $ackHashHex');
return true;
}
/// Whether at least one message is still tracked in the pending map.
bool get hasPendingMessages {
  return _pendingMessages.isNotEmpty;
}
/// Arms the ACK timeout for [messageId], replacing any timer already stored
/// for it. [_handleTimeout] runs once [timeoutMs] milliseconds have elapsed.
void _startTimeoutTimer(String messageId, int timeoutMs) {
  final previous = _timeoutTimers[messageId];
  previous?.cancel();
  _timeoutTimers[messageId] = Timer(
    Duration(milliseconds: timeoutMs),
    () => _handleTimeout(messageId),
  );
}
/// Handles an expired ACK timeout for [messageId].
///
/// While attempts remain (`retryCount < maxRetries - 1`) the retry count is
/// incremented, the message goes back to [MessageStatus.pending] and a
/// resend is scheduled after an exponential backoff of
/// `1000 * 2^retryCount` ms.
///
/// Once attempts are exhausted the message is marked failed, the contact's
/// path is optionally cleared (per the `clearPathOnMaxRetry` setting), the
/// failure is reported to the path-result callback, listeners are notified
/// and the contact's send queue advances. The message then stays in the
/// pending maps for a 30 second grace period so a late ACK can still flip it
/// to delivered.
///
/// Does nothing when the message or its contact is no longer tracked.
void _handleTimeout(String messageId) {
  final message = _pendingMessages[messageId];
  final contact = _pendingContacts[messageId];
  final selection = _pendingPathSelections[messageId];
  if (message == null || contact == null) {
    debugPrint(
      'Timeout fired but message $messageId no longer pending (likely already delivered)',
    );
    return;
  }
  final contactKey = contact.publicKeyHex;
  final shortText = message.text.length > 20
      ? '${message.text.substring(0, 20)}...'
      : message.text;
  _debugLogService?.warn(
    'Timeout: No ACK received for "$shortText" to ${contact.name} (attempt ${message.retryCount}) → retrying',
    tag: 'AckHash',
  );
  debugPrint(
    'Timeout for message $messageId (retry ${message.retryCount}/${maxRetries - 1})',
  );

  if (message.retryCount < maxRetries - 1) {
    final backoffMs = 1000 * (1 << message.retryCount);
    final updatedMessage = message.copyWith(
      retryCount: message.retryCount + 1,
      status: MessageStatus.pending,
      // Keep expectedAckHash - it will be updated when the new attempt is sent
    );
    _pendingMessages[messageId] = updatedMessage;
    _updateMessageCallback?.call(updatedMessage);
    _debugLogService?.info(
      'Scheduling retry for "$shortText" to ${contact.name} after ${backoffMs}ms backoff',
      tag: 'AckHash',
    );
    debugPrint('Scheduling retry after ${backoffMs}ms');
    // Store the backoff timer so it can be canceled if new RESP_CODE_SENT arrives
    _timeoutTimers[messageId] = Timer(Duration(milliseconds: backoffMs), () {
      // Double-check message is still pending before retry
      if (!_pendingMessages.containsKey(messageId)) {
        debugPrint(
          'Retry cancelled: message $messageId was delivered while waiting',
        );
        return;
      }
      // Handle a throwing retry the same way as a throwing first attempt:
      // without this the error would be unhandled and the message would stay
      // in-flight forever, blocking every later message for this contact.
      _attemptSend(messageId).catchError((e) {
        debugPrint('_attemptSend threw for $messageId: $e');
        final msg = _pendingMessages[messageId];
        if (msg != null) {
          final failed = msg.copyWith(status: MessageStatus.failed);
          _pendingMessages[messageId] = failed;
          _updateMessageCallback?.call(failed);
        }
        _onMessageResolved(messageId, contactKey);
      });
    });
    return;
  }

  // Max retries reached - mark as failed
  final failedMessage = message.copyWith(status: MessageStatus.failed);
  _pendingMessages[messageId] = failedMessage;
  // Check if we should clear the path on max retry
  if (_appSettingsService?.settings.clearPathOnMaxRetry == true &&
      _clearContactPathCallback != null) {
    _clearContactPathCallback!(contact);
  }
  _recordPathResultFromMessage(contactKey, message, selection, false, null);
  _updateMessageCallback?.call(failedMessage);
  notifyListeners();
  // Message is done retrying — send next queued message for this contact
  _onMessageResolved(messageId, contactKey);
  // Keep message in pending maps for 30s grace period so late ACKs
  // can still match and update the message to delivered.
  _timeoutTimers[messageId] = Timer(const Duration(seconds: 30), () {
    _moveAckHashesToHistory(messageId);
    // Clean up ALL hash mappings for this message
    _ackHashToMessageId.removeWhere(
      (_, mapping) => mapping.messageId == messageId,
    );
    _expectedHashToMessageId.removeWhere((_, msgId) => msgId == messageId);
    _pendingMessages.remove(messageId);
    _pendingContacts.remove(messageId);
    _pendingPathSelections.remove(messageId);
    _timeoutTimers.remove(messageId);
    _resolvedMessages.remove(messageId);
    _pendingMessageQueuePerContact[contactKey]?.remove(messageId);
    if (_pendingMessageQueuePerContact[contactKey]?.isEmpty ?? false) {
      _pendingMessageQueuePerContact.remove(contactKey);
    }
  });
}
/// Transfers the ACK hashes recorded for [messageId] into the history buffer.
///
/// The hashes are always removed from the expected-hash map; a history entry
/// is appended only when there was at least one hash. The oldest entries are
/// dropped so the buffer never exceeds [maxAckHistorySize].
void _moveAckHashesToHistory(String messageId) {
  final ackHashes = _expectedAckHashes.remove(messageId);
  if (ackHashes == null || ackHashes.isEmpty) return;

  _ackHistory.add(
    _AckHistoryEntry(
      messageId: messageId,
      ackHashes: ackHashes,
      timestamp: DateTime.now(),
    ),
  );

  // Trim history to max size (rolling buffer)
  final overflow = _ackHistory.length - maxAckHistorySize;
  if (overflow > 0) {
    _ackHistory.removeRange(0, overflow);
  }
  debugPrint(
    'Moved ${ackHashes.length} ACK hashes to history for message $messageId (history size: ${_ackHistory.length})',
  );
}
/// Whether [ackHash] belongs to a message already moved to the ACK history.
///
/// Logs the id and age of the first matching history entry.
bool _checkAckHistory(Uint8List ackHash) {
  for (final entry in _ackHistory) {
    final matches = entry.ackHashes.any(
      (expectedHash) => listEquals(expectedHash, ackHash),
    );
    if (!matches) continue;
    debugPrint(
      'Found ACK match in history: messageId=${entry.messageId}, age=${DateTime.now().difference(entry.timestamp).inSeconds}s',
    );
    return true;
  }
  return false;
}
/// Processes a delivery confirmation (PUSH_CODE_SEND_CONFIRMED) carrying
/// [ackHash] and the measured round trip [tripTimeMs].
///
/// Hash mappings older than 15 minutes are expired first. The hash is then
/// matched through the direct hash → message map, falling back to a scan of
/// every expected hash recorded for the pending messages.
///
/// On a match with a still-tracked message: its timer is cancelled, it is
/// marked [MessageStatus.delivered], all of its bookkeeping is removed, the
/// path result and (when [tripTimeMs] is positive and the message has a path
/// length) the delivery observation are reported, the contact's send queue
/// advances and listeners are notified.
///
/// Without a match the ACK history is consulted only to log whether this was
/// a duplicate ACK or an unknown hash.
void handleAckReceived(Uint8List ackHash, int tripTimeMs) {
  String? matchedMessageId;
  final ackHashHex = ackHash
      .map((b) => b.toRadixString(16).padLeft(2, '0'))
      .join();
  debugPrint('ACK received: $ackHashHex, trip time: ${tripTimeMs}ms');

  // First, clean up old ACK hash mappings (older than 15 minutes)
  final cutoffTime = DateTime.now().subtract(const Duration(minutes: 15));
  final hashesToRemove = <String>[];
  for (final entry in _ackHashToMessageId.entries) {
    if (entry.value.timestamp.isBefore(cutoffTime)) {
      hashesToRemove.add(entry.key);
    }
  }
  for (final hash in hashesToRemove) {
    _ackHashToMessageId.remove(hash);
  }
  if (hashesToRemove.isNotEmpty) {
    debugPrint('Cleaned up ${hashesToRemove.length} old ACK hash mappings');
  }

  // Use direct O(1) lookup via ACK hash mapping
  final mapping = _ackHashToMessageId[ackHashHex];
  if (mapping != null) {
    matchedMessageId = mapping.messageId;
    debugPrint('Matched ACK to message via direct lookup: $matchedMessageId');
  } else {
    _debugLogService?.warn(
      'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex not found in direct mapping, trying fallback',
      tag: 'AckHash',
    );
    // Fallback: Check against ALL expected ACK hashes (from all retry attempts)
    debugPrint(
      'ACK not in mapping, checking _expectedAckHashes (${_expectedAckHashes.length} messages)',
    );
    for (final entry in _expectedAckHashes.entries) {
      final messageId = entry.key;
      final expectedHashes = entry.value;
      for (final expectedHash in expectedHashes) {
        if (listEquals(expectedHash, ackHash)) {
          matchedMessageId = messageId;
          debugPrint(
            'Matched ACK to message via fallback: $matchedMessageId (attempt ${expectedHashes.indexOf(expectedHash)})',
          );
          break;
        }
      }
      if (matchedMessageId != null) break;
    }
  }

  if (matchedMessageId != null) {
    final message = _pendingMessages[matchedMessageId];
    if (message == null) {
      // Message was already cleaned up (e.g. grace period expired)
      _ackHashToMessageId.remove(ackHashHex);
      debugPrint(
        'ACK matched $matchedMessageId but message already cleaned up',
      );
      return;
    }
    final contact = _pendingContacts[matchedMessageId];
    final selection = _pendingPathSelections[matchedMessageId];
    final shortText = message.text.length > 20
        ? '${message.text.substring(0, 20)}...'
        : message.text;
    _debugLogService?.info(
      'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex ✓ "$shortText" delivered to ${contact?.name ?? "unknown"} in ${tripTimeMs}ms',
      tag: 'AckHash',
    );

    // Cancel any pending timeout or retry
    _timeoutTimers[matchedMessageId]?.cancel();
    _timeoutTimers.remove(matchedMessageId);

    final deliveredMessage = message.copyWith(
      status: MessageStatus.delivered,
      deliveredAt: DateTime.now(),
      tripTimeMs: tripTimeMs,
    );

    // Clean up ALL hash mappings for this message (from all retry attempts)
    _ackHashToMessageId.removeWhere(
      (_, mapping) => mapping.messageId == matchedMessageId,
    );
    _expectedHashToMessageId.removeWhere(
      (_, msgId) => msgId == matchedMessageId,
    );
    // Move ACK hashes to history before removing
    _moveAckHashesToHistory(matchedMessageId);
    _pendingMessages.remove(matchedMessageId);
    _pendingContacts.remove(matchedMessageId);
    _pendingPathSelections.remove(matchedMessageId);

    // Clean up the queue entry for this contact (remove any remaining references to this message)
    if (contact != null) {
      final contactKey = contact.publicKeyHex;
      _pendingMessageQueuePerContact[contactKey]?.remove(matchedMessageId);
      if (_pendingMessageQueuePerContact[contactKey]?.isEmpty ?? false) {
        _pendingMessageQueuePerContact.remove(contactKey);
      }
    }

    _updateMessageCallback?.call(deliveredMessage);

    if (contact != null) {
      _recordPathResultFromMessage(
        contact.publicKeyHex,
        message,
        selection,
        true,
        tripTimeMs,
      );
      // Only feed the observation callback with usable samples: a positive
      // trip time and a known path length.
      if (_onDeliveryObservedCallback != null &&
          tripTimeMs > 0 &&
          message.pathLength != null) {
        _onDeliveryObservedCallback!(
          contact.publicKeyHex,
          message.pathLength!,
          message.text.length,
          tripTimeMs,
        );
      }
      // Resolve BEFORE dropping the resolved marker. If the message already
      // failed (late ACK within the grace period) this is a no-op, so the
      // contact's queue is not advanced a second time while another message
      // is in flight.
      _onMessageResolved(matchedMessageId, contact.publicKeyHex);
    }
    // The message is gone from every pending map, so its in-flight and
    // resolved markers are no longer needed; dropping them here keeps both
    // sets from growing with every delivered message.
    _activeMessages.remove(matchedMessageId);
    _resolvedMessages.remove(matchedMessageId);

    notifyListeners();
  } else {
    // Check ACK history for recently completed messages
    if (_checkAckHistory(ackHash)) {
      _debugLogService?.info(
        'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex matched a recently completed message (duplicate ACK)',
        tag: 'AckHash',
      );
      debugPrint('ACK matched a recently completed message from history');
    } else {
      _debugLogService?.error(
        'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex has no matching message!',
        tag: 'AckHash',
      );
      debugPrint('No matching message found for ACK: $ackHashHex');
    }
  }
}
/// Chooses the path bytes to store on a new message for [contact].
///
/// Precedence: the contact's path override, then flood mode (empty path),
/// then a non-empty [selection] path, then the contact's own path.
Uint8List _resolveMessagePathBytes(
  Contact contact,
  bool forceFlood,
  PathSelection? selection,
) {
  // Priority 1: user's path override (a negative override forces flood).
  final override = contact.pathOverride;
  if (override != null) {
    return override < 0
        ? Uint8List(0)
        : (contact.pathOverrideBytes ?? Uint8List(0));
  }

  // Priority 2: flood requested by the caller, the device or the selection.
  final floodRequested =
      forceFlood || contact.pathLength < 0 || selection?.useFlood == true;
  if (floodRequested) {
    return Uint8List(0);
  }

  // Priority 3: PathSelection (auto-rotation), only when it carries bytes.
  if (selection != null && selection.pathBytes.isNotEmpty) {
    return Uint8List.fromList(selection.pathBytes);
  }

  // Priority 4: the device's discovered path.
  return contact.path;
}
/// Chooses the path length to store on a new message for [contact].
///
/// Precedence: the contact's path override, then flood mode (`-1`), then the
/// hop count of a non-empty [selection] path, then the contact's own path
/// length.
int? _resolveMessagePathLength(
  Contact contact,
  bool forceFlood,
  PathSelection? selection,
) {
  // Priority 1: user's path override.
  final override = contact.pathOverride;
  if (override != null) {
    return override;
  }

  // Priority 2: flood requested by the caller, the device or the selection.
  final floodRequested =
      forceFlood || contact.pathLength < 0 || selection?.useFlood == true;
  if (floodRequested) {
    return -1;
  }

  // Priority 3: PathSelection (auto-rotation), only when it carries bytes.
  if (selection != null && selection.pathBytes.isNotEmpty) {
    return selection.hopCount;
  }

  // Priority 4: the device's discovered path.
  return contact.pathLength;
}
/// Returns the public-key hex of the contact whose pending message currently
/// expects [ackHash], or `null` when no pending message expects it (or the
/// first matching message has no contact recorded).
String? getContactKeyForAckHash(Uint8List ackHash) {
  for (final pending in _pendingMessages.entries) {
    final expected = pending.value.expectedAckHash;
    if (expected == null || !listEquals(expected, ackHash)) continue;
    // The first matching message decides the result.
    return _pendingContacts[pending.key]?.publicKeyHex;
  }
  return null;
}
/// Returns a default ACK timeout in milliseconds for [contact]: 15000 when
/// its path length is negative, otherwise 3000 plus 3000 per hop.
int calculateDefaultTimeout(Contact contact) {
  final hops = contact.pathLength;
  return hops < 0 ? 15000 : 3000 + (3000 * hops);
}
/// Reports the outcome of a send to the path-result callback.
///
/// Uses [selection] when given, otherwise a selection derived from the path
/// stored on [message]. Nothing is reported when no callback is configured
/// or no selection can be determined.
void _recordPathResultFromMessage(
  String contactKey,
  Message message,
  PathSelection? selection,
  bool success,
  int? tripTimeMs,
) {
  final report = _recordPathResultCallback;
  if (report == null) return;

  final recordSelection = selection ?? _selectionFromMessage(message);
  if (recordSelection == null) return;

  report(contactKey, recordSelection, success, tripTimeMs);
}
/// Rebuilds a [PathSelection] from the path stored on [message].
///
/// A negative path length yields a flood selection. A message with neither a
/// path length nor path bytes yields `null`. Otherwise the stored bytes are
/// used, with the hop count defaulting to the number of path bytes.
PathSelection? _selectionFromMessage(Message message) {
  final hops = message.pathLength;
  if (hops != null && hops < 0) {
    return const PathSelection(pathBytes: [], hopCount: -1, useFlood: true);
  }
  if (hops == null && message.pathBytes.isEmpty) {
    return null;
  }
  return PathSelection(
    pathBytes: message.pathBytes,
    hopCount: hops ?? message.pathBytes.length,
    useFlood: false,
  );
}
/// Cancels every armed timer and drops all tracked message state before
/// disposing the notifier.
@override
void dispose() {
  for (final timer in _timeoutTimers.values) {
    timer.cancel();
  }
  _timeoutTimers.clear();
  _pendingMessages.clear();
  _pendingContacts.clear();
  _pendingPathSelections.clear();
  _expectedAckHashes.clear();
  _ackHistory.clear();
  _ackHashToMessageId.clear();
  // Previously the only tracking map left uncleared.
  _expectedHashToMessageId.clear();
  _pendingMessageQueuePerContact.clear();
  _sendQueue.clear();
  _activeMessages.clear();
  _resolvedMessages.clear();
  super.dispose();
}
}