diff --git a/lib/connector/meshcore_connector.dart b/lib/connector/meshcore_connector.dart index 10ee15b..2fbddc7 100644 --- a/lib/connector/meshcore_connector.dart +++ b/lib/connector/meshcore_connector.dart @@ -114,6 +114,10 @@ class MeshCoreConnector extends ChangeNotifier { final List _channels = []; final Map> _conversations = {}; final Map> _channelMessages = {}; + final List _pendingChannelSentQueue = []; + final List<_PendingCommandAck> _pendingGenericAckQueue = []; + static const String _reactionSendQueuePrefix = '__reaction_send__'; + int _reactionSendQueueSequence = 0; final Set _loadedConversationKeys = {}; final Map> _processedChannelReactions = {}; // channelIndex -> Set of "targetHash_emoji" @@ -988,6 +992,9 @@ class MeshCoreConnector extends ChangeNotifier { _isSyncingChannels = false; _channelSyncInFlight = false; _hasLoadedChannels = false; + _pendingChannelSentQueue.clear(); + _pendingGenericAckQueue.clear(); + _reactionSendQueueSequence = 0; _setState(MeshCoreConnectionState.disconnected); if (!manual) { @@ -995,7 +1002,11 @@ class MeshCoreConnector extends ChangeNotifier { } } - Future sendFrame(Uint8List data) async { + Future sendFrame( + Uint8List data, { + String? channelSendQueueId, + bool expectsGenericAck = false, + }) async { if (!isConnected || _rxCharacteristic == null) { throw Exception("Not connected to a MeshCore device"); } @@ -1014,6 +1025,11 @@ class MeshCoreConnector extends ChangeNotifier { data.toList(), withoutResponse: canWriteWithoutResponse, ); + _trackPendingGenericAck( + data, + channelSendQueueId: channelSendQueueId, + expectsGenericAck: expectsGenericAck, + ); } Future requestBatteryStatus({bool force = false}) async { @@ -1369,7 +1385,13 @@ class MeshCoreConnector extends ChangeNotifier { notifyListeners(); // Send the reaction to the device (don't add as a visible message) - await sendFrame(buildSendChannelTextMsgFrame(channel.index, text)); + final reactionQueueId = _nextReactionSendQueueId(); + _pendingChannelSentQueue.add(reactionQueueId); + await sendFrame( + buildSendChannelTextMsgFrame(channel.index, text), + channelSendQueueId: reactionQueueId, + expectsGenericAck: true, + ); return; } @@ -1379,6 +1401,7 @@ class MeshCoreConnector extends ChangeNotifier { channel.index, ); _addChannelMessage(channel.index, message); + _pendingChannelSentQueue.add(message.messageId); notifyListeners(); final trimmed = text.trim(); @@ -1388,7 +1411,11 @@ class MeshCoreConnector extends ChangeNotifier { (isChannelSmazEnabled(channel.index) && !isStructuredPayload) ? Smaz.encodeIfSmaller(text) : text; - await sendFrame(buildSendChannelTextMsgFrame(channel.index, outboundText)); + await sendFrame( + buildSendChannelTextMsgFrame(channel.index, outboundText), + channelSendQueueId: message.messageId, + expectsGenericAck: true, + ); } Future removeContact(Contact contact) async { @@ -1735,6 +1762,9 @@ class MeshCoreConnector extends ChangeNotifier { debugPrint('RX frame: code=$code len=${frame.length}'); switch (code) { + case respCodeOk: + _handleOk(); + break; case respCodeDeviceInfo: _handleDeviceInfo(frame); break; @@ -1829,6 +1859,17 @@ class MeshCoreConnector extends ChangeNotifier { 'Firmware responded with error code: $errCode', tag: 'Protocol', ); + + if (_pendingGenericAckQueue.isEmpty) { + return; + } + + final failedAck = _pendingGenericAckQueue.removeAt(0); + if (failedAck.commandCode != cmdSendChannelTxtMsg || + failedAck.channelSendQueueId == null) { + return; + } + _pendingChannelSentQueue.remove(failedAck.channelSendQueueId); } void _handlePathUpdated(Uint8List frame) { @@ -2611,8 +2652,22 @@ class MeshCoreConnector extends ChangeNotifier { return; } - if (_retryService != null) { - _retryService!.updateMessageFromSent(ackHash, timeoutMs); + final retryService = _retryService; + if (retryService != null && + retryService.updateMessageFromSent( + ackHash, + timeoutMs, + allowQueueFallback: false, + )) { + return; + } + + if (_markNextPendingChannelMessageSent()) { + return; + } + + if (retryService != null) { + retryService.updateMessageFromSent(ackHash, timeoutMs); } } else { // Fallback to old behavior @@ -2629,6 +2684,64 @@ class MeshCoreConnector extends ChangeNotifier { } } + bool _markNextPendingChannelMessageSent() { + while (_pendingChannelSentQueue.isNotEmpty) { + final queuedMessageId = _pendingChannelSentQueue.removeAt(0); + if (_isReactionSendQueueId(queuedMessageId)) { + return true; + } + if (_markPendingChannelMessageSentById(queuedMessageId)) { + return true; + } + } + return false; + } + + bool _markPendingChannelMessageSentById(String messageId) { + for (final entry in _channelMessages.entries) { + final channelMessages = entry.value; + for (int i = channelMessages.length - 1; i >= 0; i--) { + final message = channelMessages[i]; + if (message.messageId != messageId) { + continue; + } + if (!message.isOutgoing || + message.status != ChannelMessageStatus.pending) { + return false; + } + channelMessages[i] = message.copyWith( + status: ChannelMessageStatus.sent, + ); + _pendingChannelSentQueue.remove(messageId); + unawaited( + _channelMessageStore.saveChannelMessages(entry.key, channelMessages), + ); + notifyListeners(); + return true; + } + } + return false; + } + + void _handleOk() { + if (_pendingGenericAckQueue.isEmpty) { + return; + } + + final pendingAck = _pendingGenericAckQueue.removeAt(0); + if (pendingAck.commandCode != cmdSendChannelTxtMsg || + pendingAck.channelSendQueueId == null) { + return; + } + + final queueId = pendingAck.channelSendQueueId!; + _pendingChannelSentQueue.remove(queueId); + if (_isReactionSendQueueId(queueId)) { + return; + } + _markPendingChannelMessageSentById(queueId); + } + void _handleSendConfirmed(Uint8List frame) { // Frame format from C++: // [0] = PUSH_CODE_SEND_CONFIRMED @@ -3207,18 +3320,22 @@ class MeshCoreConnector extends ChangeNotifier { mergedPathBytes.length, ); final newRepeatCount = existing.repeatCount + 1; + final promotedFromPending = + newRepeatCount == 1 && + existing.status == ChannelMessageStatus.pending; messages[existingIndex] = existing.copyWith( repeatCount: newRepeatCount, pathLength: mergedPathLength, pathBytes: mergedPathBytes, pathVariants: mergedPathVariants, // Mark as sent when first repeat is heard - status: - newRepeatCount == 1 && - existing.status == ChannelMessageStatus.pending + status: promotedFromPending ? ChannelMessageStatus.sent : existing.status, ); + if (promotedFromPending) { + _pendingChannelSentQueue.remove(existing.messageId); + } } else { messages.add(processedMessage); } @@ -3391,11 +3508,37 @@ class MeshCoreConnector extends ChangeNotifier { _queuedMessageSyncInFlight = false; _isSyncingChannels = false; _channelSyncInFlight = false; + _pendingChannelSentQueue.clear(); + _pendingGenericAckQueue.clear(); + _reactionSendQueueSequence = 0; _setState(MeshCoreConnectionState.disconnected); _scheduleReconnect(); } + void _trackPendingGenericAck( + Uint8List data, { + String? channelSendQueueId, + required bool expectsGenericAck, + }) { + if (!expectsGenericAck || data.isEmpty) return; + _pendingGenericAckQueue.add( + _PendingCommandAck( + commandCode: data[0], + channelSendQueueId: channelSendQueueId, + ), + ); + } + + String _nextReactionSendQueueId() { + _reactionSendQueueSequence++; + return '$_reactionSendQueuePrefix$_reactionSendQueueSequence'; + } + + bool _isReactionSendQueueId(String queueId) { + return queueId.startsWith(_reactionSendQueuePrefix); + } + Map _parseKeyValueString(String input) { final result = {}; @@ -3691,3 +3834,10 @@ class _RepeaterAckContext { required this.messageBytes, }); } + +class _PendingCommandAck { + final int commandCode; + final String? channelSendQueueId; + + _PendingCommandAck({required this.commandCode, this.channelSendQueueId}); +} diff --git a/lib/screens/contacts_screen.dart b/lib/screens/contacts_screen.dart index a6828dd..c3f783c 100644 --- a/lib/screens/contacts_screen.dart +++ b/lib/screens/contacts_screen.dart @@ -183,14 +183,17 @@ class _ContactsScreenState extends State final connector = Provider.of(context, listen: false); final exportContactFrame = buildExportContactFrame(pubKey); _pendingOperations.add(ContactOperationType.export); - await connector.sendFrame(exportContactFrame); + await connector.sendFrame(exportContactFrame, expectsGenericAck: true); } Future _contactZeroHop(Uint8List pubKey) async { final connector = Provider.of(context, listen: false); final exportContactZeroHopFrame = buildZeroHopContact(pubKey); _pendingOperations.add(ContactOperationType.zeroHopShare); - await connector.sendFrame(exportContactZeroHopFrame); + await connector.sendFrame( + exportContactZeroHopFrame, + expectsGenericAck: true, + ); } Future _contactImport() async { @@ -217,7 +220,7 @@ class _ContactsScreenState extends State try { final importContactFrame = buildImportContactFrame(hexString); _pendingOperations.add(ContactOperationType.import); - await connector.sendFrame(importContactFrame); + await connector.sendFrame(importContactFrame, expectsGenericAck: true); } catch (e) { if (mounted) { ScaffoldMessenger.of(context).showSnackBar( diff --git a/lib/services/message_retry_service.dart b/lib/services/message_retry_service.dart index 9cbd68f..694a616 100644 --- a/lib/services/message_retry_service.dart +++ b/lib/services/message_retry_service.dart @@ -234,7 +234,11 @@ class MessageRetryService extends ChangeNotifier { } } - void updateMessageFromSent(Uint8List ackHash, int timeoutMs) { + bool updateMessageFromSent( + Uint8List ackHash, + int timeoutMs, { + bool allowQueueFallback = true, + }) { final ackHashHex = ackHash .map((b) => b.toRadixString(16).padLeft(2, '0')) .join(); @@ -277,7 +281,7 @@ class MessageRetryService extends ChangeNotifier { } // FALLBACK: Old queue-based matching (for messages sent before hash computation was added) - if (messageId == null) { + if (messageId == null && allowQueueFallback) { _debugLogService?.warn( 'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue', tag: 'AckHash', @@ -320,7 +324,7 @@ class MessageRetryService extends ChangeNotifier { if (messageId == null || contact == null) { debugPrint('No pending message found for ACK hash: $ackHashHex'); - return; + return false; } // Store the mapping for future lookups (e.g., when ACK arrives) @@ -339,7 +343,7 @@ class MessageRetryService extends ChangeNotifier { 'Message $messageId no longer pending for ACK hash: $ackHashHex', ); _ackHashToMessageId.remove(ackHashHex); - return; + return false; } // Add this ACK hash to the list of expected ACKs for this message (for history) @@ -389,8 +393,11 @@ class MessageRetryService extends ChangeNotifier { _startTimeoutTimer(messageId, actualTimeout); debugPrint('Updated message $messageId with ACK hash: $ackHashHex'); + return true; } + bool get hasPendingMessages => _pendingMessages.isNotEmpty; + void _startTimeoutTimer(String messageId, int timeoutMs) { _timeoutTimers[messageId]?.cancel(); _timeoutTimers[messageId] = Timer(Duration(milliseconds: timeoutMs), () {