meshcore-open/lib/services/message_retry_service.dart

884 lines
30 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:uuid/uuid.dart';
import 'package:crypto/crypto.dart';
import '../models/contact.dart';
import '../models/message.dart';
import '../models/path_selection.dart';
import 'app_settings_service.dart';
import 'app_debug_log_service.dart';
class _AckHistoryEntry {
final String messageId;
final List<Uint8List> ackHashes;
final DateTime timestamp;
_AckHistoryEntry({
required this.messageId,
required this.ackHashes,
required this.timestamp,
});
}
class _AckHashMapping {
final String messageId;
final DateTime timestamp;
_AckHashMapping({required this.messageId, required this.timestamp});
}
class MessageRetryService extends ChangeNotifier {
static const int maxRetries = 5;
static const int maxAckHistorySize = 100;
final Map<String, Timer> _timeoutTimers = {};
final Map<String, Message> _pendingMessages = {};
final Map<String, Contact> _pendingContacts = {};
final Map<String, PathSelection> _pendingPathSelections = {};
final Map<String, _AckHashMapping> _ackHashToMessageId =
{}; // ackHashHex → messageId + timestamp for O(1) lookup
final Map<String, List<Uint8List>> _expectedAckHashes =
{}; // Track all expected ACKs for retries (for history)
final List<_AckHistoryEntry> _ackHistory =
[]; // Rolling buffer of recent ACK hashes
final Map<String, List<String>> _pendingMessageQueuePerContact =
{}; // contactPubKeyHex → FIFO queue of messageIds (DEPRECATED - will be removed)
final Map<String, List<String>> _sendQueue =
{}; // contactPubKeyHex → ordered list of messageIds awaiting send
final Set<String> _activeMessages =
{}; // messageIds currently in-flight (sent/retrying)
final Set<String> _resolvedMessages =
{}; // messageIds already resolved (prevents double _onMessageResolved)
final Map<String, String> _expectedHashToMessageId =
{}; // expectedAckHashHex → messageId (for matching RESP_CODE_SENT by hash)
Function(Contact, String, int, int)? _sendMessageCallback;
Function(String, Message)? _addMessageCallback;
Function(Message)? _updateMessageCallback;
Function(Contact)? _clearContactPathCallback;
Function(Contact, Uint8List, int)? _setContactPathCallback;
Function(int, int)? _calculateTimeoutCallback;
Uint8List? Function()? _getSelfPublicKeyCallback;
String Function(Contact, String)? _prepareContactOutboundTextCallback;
AppSettingsService? _appSettingsService;
AppDebugLogService? _debugLogService;
Function(String, PathSelection, bool, int?)? _recordPathResultCallback;
MessageRetryService();
void initialize({
required Function(Contact, String, int, int) sendMessageCallback,
required Function(String, Message) addMessageCallback,
required Function(Message) updateMessageCallback,
Function(Contact)? clearContactPathCallback,
Function(Contact, Uint8List, int)? setContactPathCallback,
Function(int pathLength, int messageBytes)? calculateTimeoutCallback,
Uint8List? Function()? getSelfPublicKeyCallback,
String Function(Contact, String)? prepareContactOutboundTextCallback,
AppSettingsService? appSettingsService,
AppDebugLogService? debugLogService,
Function(String, PathSelection, bool, int?)? recordPathResultCallback,
}) {
_sendMessageCallback = sendMessageCallback;
_addMessageCallback = addMessageCallback;
_updateMessageCallback = updateMessageCallback;
_clearContactPathCallback = clearContactPathCallback;
_setContactPathCallback = setContactPathCallback;
_calculateTimeoutCallback = calculateTimeoutCallback;
_getSelfPublicKeyCallback = getSelfPublicKeyCallback;
_prepareContactOutboundTextCallback = prepareContactOutboundTextCallback;
_appSettingsService = appSettingsService;
_debugLogService = debugLogService;
_recordPathResultCallback = recordPathResultCallback;
}
/// Compute expected ACK hash using same algorithm as firmware:
/// SHA256([timestamp(4)][attempt(1)][text][sender_pubkey(32)]) -> first 4 bytes
static Uint8List computeExpectedAckHash(
int timestampSeconds,
int attempt,
String text,
Uint8List senderPubKey,
) {
final textBytes = utf8.encode(text);
final buffer = Uint8List(4 + 1 + textBytes.length + senderPubKey.length);
int offset = 0;
// timestamp (4 bytes, little-endian)
buffer[offset++] = timestampSeconds & 0xFF;
buffer[offset++] = (timestampSeconds >> 8) & 0xFF;
buffer[offset++] = (timestampSeconds >> 16) & 0xFF;
buffer[offset++] = (timestampSeconds >> 24) & 0xFF;
// attempt (1 byte)
buffer[offset++] = attempt & 0x03;
// text
buffer.setRange(offset, offset + textBytes.length, textBytes);
offset += textBytes.length;
// sender public key (32 bytes)
buffer.setRange(offset, offset + senderPubKey.length, senderPubKey);
// Compute SHA256 and return first 4 bytes
final hash = sha256.convert(buffer);
return Uint8List.fromList(hash.bytes.sublist(0, 4));
}
Future<void> sendMessageWithRetry({
required Contact contact,
required String text,
PathSelection? pathSelection,
Uint8List? pathBytes,
int? pathLength,
}) async {
final messageId = const Uuid().v4();
final useFlood = pathSelection?.useFlood ?? false;
final messagePathBytes =
pathBytes ?? _resolveMessagePathBytes(contact, useFlood, pathSelection);
final messagePathLength =
pathLength ??
_resolveMessagePathLength(contact, useFlood, pathSelection);
final message = Message(
senderKey: contact.publicKey,
text: text,
timestamp: DateTime.now(),
isOutgoing: true,
status: MessageStatus.pending,
messageId: messageId,
retryCount: 0,
pathLength: messagePathLength,
pathBytes: messagePathBytes,
);
_pendingMessages[messageId] = message;
_pendingContacts[messageId] = contact;
if (pathSelection != null) {
_pendingPathSelections[messageId] = pathSelection;
}
if (_addMessageCallback != null) {
_addMessageCallback!(contact.publicKeyHex, message);
}
// Queue per contact — only one message in-flight at a time to avoid
// overflowing the firmware's 8-entry expected_ack_table.
final contactKey = contact.publicKeyHex;
_sendQueue[contactKey] ??= [];
_sendQueue[contactKey]!.add(messageId);
if (!_activeMessages.any(
(id) => _pendingContacts[id]?.publicKeyHex == contactKey,
)) {
_sendNextForContact(contactKey);
}
}
void _sendNextForContact(String contactKey) {
final queue = _sendQueue[contactKey];
if (queue == null) return;
// Drain stale entries iteratively instead of recursing.
while (queue.isNotEmpty) {
final messageId = queue.removeAt(0);
if (_pendingMessages.containsKey(messageId)) {
_activeMessages.add(messageId);
_attemptSend(messageId).catchError((e) {
debugPrint('_attemptSend threw for $messageId: $e');
final msg = _pendingMessages[messageId];
if (msg != null) {
final failed = msg.copyWith(status: MessageStatus.failed);
_pendingMessages[messageId] = failed;
_updateMessageCallback?.call(failed);
}
_onMessageResolved(messageId, contactKey);
});
return;
}
// Message was cancelled/cleaned up while queued — try next
}
}
void _onMessageResolved(String messageId, String contactKey) {
if (_resolvedMessages.contains(messageId)) return;
_resolvedMessages.add(messageId);
_activeMessages.remove(messageId);
_sendNextForContact(contactKey);
}
Future<void> _attemptSend(String messageId) async {
final message = _pendingMessages[messageId];
final contact = _pendingContacts[messageId];
if (message == null || contact == null) return;
// Sync path settings with device before sending
// Use the path that was captured when the message was first sent
if (_setContactPathCallback != null && _clearContactPathCallback != null) {
if (message.pathLength != null && message.pathLength! < 0) {
debugPrint(
'Setting flood mode for retry attempt ${message.retryCount}',
);
await _clearContactPathCallback!(contact);
} else if (message.pathLength != null && message.pathLength! >= 0) {
final pathStr = message.pathBytes.isEmpty
? 'direct'
: message.pathBytes
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join(',');
debugPrint(
'Setting path [$pathStr] (${message.pathLength} hops) for retry attempt ${message.retryCount}',
);
await _setContactPathCallback!(
contact,
message.pathBytes,
message.pathLength!,
);
}
}
// Re-validate after async gap — a timer or ACK could have resolved/retried
// this message while we were awaiting the path callback.
final currentMessage = _pendingMessages[messageId];
if (currentMessage == null || _resolvedMessages.contains(messageId)) {
debugPrint(
'_attemptSend: message $messageId resolved during path sync, aborting',
);
return;
}
// If the message was retried by a timer during our await, the retryCount
// will have advanced. Only proceed if it still matches the attempt we started.
if (currentMessage.retryCount != message.retryCount) {
debugPrint(
'_attemptSend: message $messageId retryCount changed during path sync, aborting',
);
return;
}
final attempt = message.retryCount.clamp(0, 3);
final timestampSeconds = message.timestamp.millisecondsSinceEpoch ~/ 1000;
// Compute expected ACK hash that device will return in RESP_CODE_SENT
// IMPORTANT: Use the transformed text (with SMAZ encoding if enabled) to match device's hash
final selfPubKey = _getSelfPublicKeyCallback?.call();
if (selfPubKey != null) {
final outboundText =
_prepareContactOutboundTextCallback?.call(contact, message.text) ??
message.text;
final expectedHash = MessageRetryService.computeExpectedAckHash(
timestampSeconds,
attempt,
outboundText,
selfPubKey,
);
final expectedHashHex = expectedHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
_expectedHashToMessageId[expectedHashHex] = messageId;
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.info(
'Sent "$shortText" to ${contact.name} → expect ACK hash $expectedHashHex (attempt $attempt)',
tag: 'AckHash',
);
debugPrint(
'Computed expected ACK hash $expectedHashHex for message $messageId',
);
}
// DEPRECATED: Old queue-based matching (kept for fallback)
_pendingMessageQueuePerContact[contact.publicKeyHex] ??= [];
_pendingMessageQueuePerContact[contact.publicKeyHex]!.add(messageId);
if (_sendMessageCallback != null) {
_sendMessageCallback!(contact, message.text, attempt, timestampSeconds);
} else {
// No send callback — message would be stuck forever. Fail it immediately.
debugPrint(
'_attemptSend: no sendMessageCallback, failing message $messageId',
);
final failedMessage = message.copyWith(status: MessageStatus.failed);
_pendingMessages[messageId] = failedMessage;
_updateMessageCallback?.call(failedMessage);
_onMessageResolved(messageId, contact.publicKeyHex);
}
}
bool updateMessageFromSent(
Uint8List ackHash,
int timeoutMs, {
bool allowQueueFallback = true,
}) {
final ackHashHex = ackHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
// NEW: Try hash-based matching first (fixes LoRa message drops causing mismatches)
String? messageId = _expectedHashToMessageId.remove(ackHashHex);
Contact? contact;
if (messageId != null) {
contact = _pendingContacts[messageId];
final message = _pendingMessages[messageId];
if (contact != null && message != null) {
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.info(
'RESP_CODE_SENT received: ACK hash $ackHashHex ✓ matched "$shortText" to ${contact.name}',
tag: 'AckHash',
);
debugPrint(
'Hash-based match: ACK hash $ackHashHex → message $messageId',
);
// Remove from old queue since we matched
_pendingMessageQueuePerContact[contact.publicKeyHex]?.remove(messageId);
if (_pendingMessageQueuePerContact[contact.publicKeyHex]?.isEmpty ??
false) {
_pendingMessageQueuePerContact.remove(contact.publicKeyHex);
}
} else {
_debugLogService?.warn(
'RESP_CODE_SENT: ACK hash $ackHashHex matched but message no longer pending',
tag: 'AckHash',
);
debugPrint('Hash matched $messageId but message no longer pending');
messageId = null;
contact = null;
}
}
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
// Only match within a single contact's queue to avoid cross-contact mismatches.
if (messageId == null && allowQueueFallback) {
_debugLogService?.warn(
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
tag: 'AckHash',
);
debugPrint(
'Hash-based match failed for $ackHashHex, falling back to queue-based matching',
);
// Search all contact queues so concurrent chats don't miss matches.
final queuesToSearch = _pendingMessageQueuePerContact;
for (var entry in queuesToSearch.entries) {
final contactKey = entry.key;
final queue = entry.value;
// Drain stale entries until we find a valid one or exhaust the queue.
while (queue.isNotEmpty) {
final candidateMessageId = queue.removeAt(0);
if (_pendingMessages.containsKey(candidateMessageId)) {
messageId = candidateMessageId;
contact = _pendingContacts[candidateMessageId];
debugPrint(
'Queue-based match (fallback): $ackHashHex → message $messageId for $contactKey',
);
break;
}
debugPrint('Dequeued stale message $candidateMessageId - skipping');
}
if (messageId != null) break;
}
}
if (messageId == null || contact == null) {
debugPrint('No pending message found for ACK hash: $ackHashHex');
return false;
}
// Store the mapping for future lookups (e.g., when ACK arrives)
// Keep timestamp so we can clean up old mappings later
_ackHashToMessageId[ackHashHex] = _AckHashMapping(
messageId: messageId,
timestamp: DateTime.now(),
);
debugPrint('Mapped ACK hash $ackHashHex to message $messageId');
final message = _pendingMessages[messageId];
final selection = _pendingPathSelections[messageId];
if (message == null) {
debugPrint(
'Message $messageId no longer pending for ACK hash: $ackHashHex',
);
_ackHashToMessageId.remove(ackHashHex);
return false;
}
// Add this ACK hash to the list of expected ACKs for this message (for history)
_expectedAckHashes[messageId] ??= [];
if (!_expectedAckHashes[messageId]!.any(
(hash) => listEquals(hash, ackHash),
)) {
_expectedAckHashes[messageId]!.add(Uint8List.fromList(ackHash));
debugPrint(
'Added ACK hash $ackHashHex to message $messageId (total: ${_expectedAckHashes[messageId]!.length})',
);
}
// Use device-provided timeout, or calculate from radio settings if timeout is 0 or invalid
int actualTimeout = timeoutMs;
if (timeoutMs <= 0 && _calculateTimeoutCallback != null) {
int pathLengthValue;
if (selection != null) {
pathLengthValue = selection.useFlood ? -1 : selection.hopCount;
if (pathLengthValue < 0) pathLengthValue = contact.pathLength;
} else if (message.pathLength != null) {
pathLengthValue = message.pathLength!;
} else {
pathLengthValue = contact.pathLength;
}
actualTimeout = _calculateTimeoutCallback!(
pathLengthValue,
message.text.length,
);
debugPrint(
'Using calculated timeout: ${actualTimeout}ms for path length $pathLengthValue',
);
}
final updatedMessage = message.copyWith(
status: MessageStatus.sent,
expectedAckHash: ackHash,
estimatedTimeoutMs: actualTimeout,
sentAt: DateTime.now(),
);
_pendingMessages[messageId] = updatedMessage;
if (_updateMessageCallback != null) {
_updateMessageCallback!(updatedMessage);
}
_startTimeoutTimer(messageId, actualTimeout);
debugPrint('Updated message $messageId with ACK hash: $ackHashHex');
return true;
}
bool get hasPendingMessages => _pendingMessages.isNotEmpty;
void _startTimeoutTimer(String messageId, int timeoutMs) {
_timeoutTimers[messageId]?.cancel();
_timeoutTimers[messageId] = Timer(Duration(milliseconds: timeoutMs), () {
_handleTimeout(messageId);
});
}
void _handleTimeout(String messageId) {
final message = _pendingMessages[messageId];
final contact = _pendingContacts[messageId];
final selection = _pendingPathSelections[messageId];
if (message == null || contact == null) {
debugPrint(
'Timeout fired but message $messageId no longer pending (likely already delivered)',
);
return;
}
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.warn(
'Timeout: No ACK received for "$shortText" to ${contact.name} (attempt ${message.retryCount}) → retrying',
tag: 'AckHash',
);
debugPrint(
'Timeout for message $messageId (retry ${message.retryCount}/${maxRetries - 1})',
);
if (message.retryCount < maxRetries - 1) {
final backoffMs = 1000 * (1 << message.retryCount);
final updatedMessage = message.copyWith(
retryCount: message.retryCount + 1,
status: MessageStatus.pending,
// Keep expectedAckHash - it will be updated when the new attempt is sent
);
_pendingMessages[messageId] = updatedMessage;
if (_updateMessageCallback != null) {
_updateMessageCallback!(updatedMessage);
}
_debugLogService?.info(
'Scheduling retry for "$shortText" to ${contact.name} after ${backoffMs}ms backoff',
tag: 'AckHash',
);
debugPrint('Scheduling retry after ${backoffMs}ms');
// Store the backoff timer so it can be canceled if new RESP_CODE_SENT arrives
_timeoutTimers[messageId] = Timer(Duration(milliseconds: backoffMs), () {
// Double-check message is still pending before retry
if (_pendingMessages.containsKey(messageId)) {
_attemptSend(messageId);
} else {
debugPrint(
'Retry cancelled: message $messageId was delivered while waiting',
);
}
});
} else {
// Max retries reached - mark as failed
final failedMessage = message.copyWith(status: MessageStatus.failed);
_pendingMessages[messageId] = failedMessage;
// Check if we should clear the path on max retry
if (_appSettingsService?.settings.clearPathOnMaxRetry == true &&
_clearContactPathCallback != null) {
_clearContactPathCallback!(contact);
}
_recordPathResultFromMessage(
contact.publicKeyHex,
message,
selection,
false,
null,
);
if (_updateMessageCallback != null) {
_updateMessageCallback!(failedMessage);
}
notifyListeners();
// Message is done retrying — send next queued message for this contact
_onMessageResolved(messageId, contact.publicKeyHex);
// Keep message in pending maps for 30s grace period so late ACKs
// can still match and update the message to delivered.
_timeoutTimers[messageId] = Timer(const Duration(seconds: 30), () {
_moveAckHashesToHistory(messageId);
// Clean up ALL hash mappings for this message
_ackHashToMessageId.removeWhere(
(_, mapping) => mapping.messageId == messageId,
);
_expectedHashToMessageId.removeWhere((_, msgId) => msgId == messageId);
_pendingMessages.remove(messageId);
_pendingContacts.remove(messageId);
_pendingPathSelections.remove(messageId);
_timeoutTimers.remove(messageId);
_resolvedMessages.remove(messageId);
final contactKey = contact.publicKeyHex;
_pendingMessageQueuePerContact[contactKey]?.remove(messageId);
if (_pendingMessageQueuePerContact[contactKey]?.isEmpty ?? false) {
_pendingMessageQueuePerContact.remove(contactKey);
}
});
}
}
void _moveAckHashesToHistory(String messageId) {
final ackHashes = _expectedAckHashes.remove(messageId);
if (ackHashes != null && ackHashes.isNotEmpty) {
_ackHistory.add(
_AckHistoryEntry(
messageId: messageId,
ackHashes: ackHashes,
timestamp: DateTime.now(),
),
);
// Trim history to max size (rolling buffer)
while (_ackHistory.length > maxAckHistorySize) {
_ackHistory.removeAt(0);
}
debugPrint(
'Moved ${ackHashes.length} ACK hashes to history for message $messageId (history size: ${_ackHistory.length})',
);
}
}
bool _checkAckHistory(Uint8List ackHash) {
for (final entry in _ackHistory) {
for (final expectedHash in entry.ackHashes) {
if (listEquals(expectedHash, ackHash)) {
debugPrint(
'Found ACK match in history: messageId=${entry.messageId}, age=${DateTime.now().difference(entry.timestamp).inSeconds}s',
);
return true;
}
}
}
return false;
}
void handleAckReceived(Uint8List ackHash, int tripTimeMs) {
String? matchedMessageId;
final ackHashHex = ackHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
debugPrint('ACK received: $ackHashHex, trip time: ${tripTimeMs}ms');
// First, clean up old ACK hash mappings (older than 15 minutes)
final cutoffTime = DateTime.now().subtract(const Duration(minutes: 15));
final hashesToRemove = <String>[];
for (var entry in _ackHashToMessageId.entries) {
if (entry.value.timestamp.isBefore(cutoffTime)) {
hashesToRemove.add(entry.key);
}
}
for (var hash in hashesToRemove) {
_ackHashToMessageId.remove(hash);
}
if (hashesToRemove.isNotEmpty) {
debugPrint('Cleaned up ${hashesToRemove.length} old ACK hash mappings');
}
// Use direct O(1) lookup via ACK hash mapping
final mapping = _ackHashToMessageId[ackHashHex];
if (mapping != null) {
matchedMessageId = mapping.messageId;
debugPrint('Matched ACK to message via direct lookup: $matchedMessageId');
} else {
_debugLogService?.warn(
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex not found in direct mapping, trying fallback',
tag: 'AckHash',
);
// Fallback: Check against ALL expected ACK hashes (from all retry attempts)
debugPrint(
'ACK not in mapping, checking _expectedAckHashes (${_expectedAckHashes.length} messages)',
);
for (var entry in _expectedAckHashes.entries) {
final messageId = entry.key;
final expectedHashes = entry.value;
for (final expectedHash in expectedHashes) {
if (listEquals(expectedHash, ackHash)) {
matchedMessageId = messageId;
debugPrint(
'Matched ACK to message via fallback: $matchedMessageId (attempt ${expectedHashes.indexOf(expectedHash)})',
);
break;
}
}
if (matchedMessageId != null) break;
}
}
if (matchedMessageId != null) {
final message = _pendingMessages[matchedMessageId];
if (message == null) {
// Message was already cleaned up (e.g. grace period expired)
_ackHashToMessageId.remove(ackHashHex);
debugPrint(
'ACK matched $matchedMessageId but message already cleaned up',
);
return;
}
final contact = _pendingContacts[matchedMessageId];
final selection = _pendingPathSelections[matchedMessageId];
final shortText = message.text.length > 20
? '${message.text.substring(0, 20)}...'
: message.text;
_debugLogService?.info(
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex ✓ "$shortText" delivered to ${contact?.name ?? "unknown"} in ${tripTimeMs}ms',
tag: 'AckHash',
);
// Cancel any pending timeout or retry
_timeoutTimers[matchedMessageId]?.cancel();
_timeoutTimers.remove(matchedMessageId);
final deliveredMessage = message.copyWith(
status: MessageStatus.delivered,
deliveredAt: DateTime.now(),
tripTimeMs: tripTimeMs,
);
// Clean up ALL hash mappings for this message (from all retry attempts)
_ackHashToMessageId.removeWhere(
(_, mapping) => mapping.messageId == matchedMessageId,
);
_expectedHashToMessageId.removeWhere(
(_, msgId) => msgId == matchedMessageId,
);
// Move ACK hashes to history before removing
_moveAckHashesToHistory(matchedMessageId);
_pendingMessages.remove(matchedMessageId);
_pendingContacts.remove(matchedMessageId);
_pendingPathSelections.remove(matchedMessageId);
_resolvedMessages.remove(matchedMessageId);
// Clean up the queue entry for this contact (remove any remaining references to this message)
if (contact != null) {
_pendingMessageQueuePerContact[contact.publicKeyHex]?.remove(
matchedMessageId,
);
if (_pendingMessageQueuePerContact[contact.publicKeyHex]?.isEmpty ??
false) {
_pendingMessageQueuePerContact.remove(contact.publicKeyHex);
}
}
if (_updateMessageCallback != null) {
_updateMessageCallback!(deliveredMessage);
}
if (contact != null) {
_recordPathResultFromMessage(
contact.publicKeyHex,
message,
selection,
true,
tripTimeMs,
);
_onMessageResolved(matchedMessageId, contact.publicKeyHex);
}
notifyListeners();
} else {
// Check ACK history for recently completed messages
if (_checkAckHistory(ackHash)) {
_debugLogService?.info(
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex matched a recently completed message (duplicate ACK)',
tag: 'AckHash',
);
debugPrint('ACK matched a recently completed message from history');
} else {
_debugLogService?.error(
'PUSH_CODE_SEND_CONFIRMED: ACK hash $ackHashHex has no matching message!',
tag: 'AckHash',
);
debugPrint('No matching message found for ACK: $ackHashHex');
}
}
}
Uint8List _resolveMessagePathBytes(
Contact contact,
bool forceFlood,
PathSelection? selection,
) {
// Priority 1: Check user's path override
if (contact.pathOverride != null) {
if (contact.pathOverride! < 0) {
return Uint8List(0); // Force flood
}
return contact.pathOverrideBytes ?? Uint8List(0);
}
// Priority 2: Check forceFlood or device flood mode
if (forceFlood || contact.pathLength < 0 || selection?.useFlood == true) {
return Uint8List(0);
}
// Priority 3: Check PathSelection (auto-rotation)
if (selection != null && selection.pathBytes.isNotEmpty) {
return Uint8List.fromList(selection.pathBytes);
}
// Priority 4: Use device's discovered path
return contact.path;
}
int? _resolveMessagePathLength(
Contact contact,
bool forceFlood,
PathSelection? selection,
) {
// Priority 1: Check user's path override
if (contact.pathOverride != null) {
return contact.pathOverride;
}
// Priority 2: Check forceFlood or device flood mode
if (forceFlood || contact.pathLength < 0 || selection?.useFlood == true) {
return -1;
}
// Priority 3: Check PathSelection (auto-rotation)
if (selection != null && selection.pathBytes.isNotEmpty) {
return selection.hopCount;
}
// Priority 4: Use device's discovered path
return contact.pathLength;
}
String? getContactKeyForAckHash(Uint8List ackHash) {
for (var entry in _pendingMessages.entries) {
final message = entry.value;
if (message.expectedAckHash != null &&
listEquals(message.expectedAckHash, ackHash)) {
final contact = _pendingContacts[entry.key];
return contact?.publicKeyHex;
}
}
return null;
}
int calculateDefaultTimeout(Contact contact) {
if (contact.pathLength < 0) {
return 15000;
} else {
return 3000 + (3000 * contact.pathLength);
}
}
void _recordPathResultFromMessage(
String contactKey,
Message message,
PathSelection? selection,
bool success,
int? tripTimeMs,
) {
if (_recordPathResultCallback == null) return;
final recordSelection = selection ?? _selectionFromMessage(message);
if (recordSelection == null) return;
_recordPathResultCallback!(
contactKey,
recordSelection,
success,
tripTimeMs,
);
}
PathSelection? _selectionFromMessage(Message message) {
if (message.pathLength != null && message.pathLength! < 0) {
return const PathSelection(pathBytes: [], hopCount: -1, useFlood: true);
}
if (message.pathBytes.isEmpty && message.pathLength == null) {
return null;
}
return PathSelection(
pathBytes: message.pathBytes,
hopCount: message.pathLength ?? message.pathBytes.length,
useFlood: false,
);
}
@override
void dispose() {
for (var timer in _timeoutTimers.values) {
timer.cancel();
}
_timeoutTimers.clear();
_pendingMessages.clear();
_pendingContacts.clear();
_pendingPathSelections.clear();
_expectedAckHashes.clear();
_ackHistory.clear();
_ackHashToMessageId.clear();
_pendingMessageQueuePerContact.clear();
_sendQueue.clear();
_activeMessages.clear();
_resolvedMessages.clear();
super.dispose();
}
}