Mark pending channel messages sent on RESP_CODE_SENT (#186)

* Mark pending channel message sent on RESP_CODE_SENT

* Disambiguate RESP_CODE_SENT handling for direct vs channel

* Handle channel sent feedback when firmware returns RESP_CODE_OK

* Correlate channel OK ACKs and queue reaction channel sends
This commit is contained in:
Aaron Easterling 2026-02-21 18:31:51 -05:00 committed by GitHub
parent 51d70ce086
commit 2feff809ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 175 additions and 15 deletions

View file

@ -114,6 +114,10 @@ class MeshCoreConnector extends ChangeNotifier {
final List<Channel> _channels = [];
final Map<String, List<Message>> _conversations = {};
final Map<int, List<ChannelMessage>> _channelMessages = {};
final List<String> _pendingChannelSentQueue = [];
final List<_PendingCommandAck> _pendingGenericAckQueue = [];
static const String _reactionSendQueuePrefix = '__reaction_send__';
int _reactionSendQueueSequence = 0;
final Set<String> _loadedConversationKeys = {};
final Map<int, Set<String>> _processedChannelReactions =
{}; // channelIndex -> Set of "targetHash_emoji"
@ -988,6 +992,9 @@ class MeshCoreConnector extends ChangeNotifier {
_isSyncingChannels = false;
_channelSyncInFlight = false;
_hasLoadedChannels = false;
_pendingChannelSentQueue.clear();
_pendingGenericAckQueue.clear();
_reactionSendQueueSequence = 0;
_setState(MeshCoreConnectionState.disconnected);
if (!manual) {
@ -995,7 +1002,11 @@ class MeshCoreConnector extends ChangeNotifier {
}
}
Future<void> sendFrame(Uint8List data) async {
Future<void> sendFrame(
Uint8List data, {
String? channelSendQueueId,
bool expectsGenericAck = false,
}) async {
if (!isConnected || _rxCharacteristic == null) {
throw Exception("Not connected to a MeshCore device");
}
@ -1014,6 +1025,11 @@ class MeshCoreConnector extends ChangeNotifier {
data.toList(),
withoutResponse: canWriteWithoutResponse,
);
_trackPendingGenericAck(
data,
channelSendQueueId: channelSendQueueId,
expectsGenericAck: expectsGenericAck,
);
}
Future<void> requestBatteryStatus({bool force = false}) async {
@ -1369,7 +1385,13 @@ class MeshCoreConnector extends ChangeNotifier {
notifyListeners();
// Send the reaction to the device (don't add as a visible message)
await sendFrame(buildSendChannelTextMsgFrame(channel.index, text));
final reactionQueueId = _nextReactionSendQueueId();
_pendingChannelSentQueue.add(reactionQueueId);
await sendFrame(
buildSendChannelTextMsgFrame(channel.index, text),
channelSendQueueId: reactionQueueId,
expectsGenericAck: true,
);
return;
}
@ -1379,6 +1401,7 @@ class MeshCoreConnector extends ChangeNotifier {
channel.index,
);
_addChannelMessage(channel.index, message);
_pendingChannelSentQueue.add(message.messageId);
notifyListeners();
final trimmed = text.trim();
@ -1388,7 +1411,11 @@ class MeshCoreConnector extends ChangeNotifier {
(isChannelSmazEnabled(channel.index) && !isStructuredPayload)
? Smaz.encodeIfSmaller(text)
: text;
await sendFrame(buildSendChannelTextMsgFrame(channel.index, outboundText));
await sendFrame(
buildSendChannelTextMsgFrame(channel.index, outboundText),
channelSendQueueId: message.messageId,
expectsGenericAck: true,
);
}
Future<void> removeContact(Contact contact) async {
@ -1735,6 +1762,9 @@ class MeshCoreConnector extends ChangeNotifier {
debugPrint('RX frame: code=$code len=${frame.length}');
switch (code) {
case respCodeOk:
_handleOk();
break;
case respCodeDeviceInfo:
_handleDeviceInfo(frame);
break;
@ -1829,6 +1859,17 @@ class MeshCoreConnector extends ChangeNotifier {
'Firmware responded with error code: $errCode',
tag: 'Protocol',
);
if (_pendingGenericAckQueue.isEmpty) {
return;
}
final failedAck = _pendingGenericAckQueue.removeAt(0);
if (failedAck.commandCode != cmdSendChannelTxtMsg ||
failedAck.channelSendQueueId == null) {
return;
}
_pendingChannelSentQueue.remove(failedAck.channelSendQueueId);
}
void _handlePathUpdated(Uint8List frame) {
@ -2611,8 +2652,22 @@ class MeshCoreConnector extends ChangeNotifier {
return;
}
if (_retryService != null) {
_retryService!.updateMessageFromSent(ackHash, timeoutMs);
final retryService = _retryService;
if (retryService != null &&
retryService.updateMessageFromSent(
ackHash,
timeoutMs,
allowQueueFallback: false,
)) {
return;
}
if (_markNextPendingChannelMessageSent()) {
return;
}
if (retryService != null) {
retryService.updateMessageFromSent(ackHash, timeoutMs);
}
} else {
// Fallback to old behavior
@ -2629,6 +2684,64 @@ class MeshCoreConnector extends ChangeNotifier {
}
}
bool _markNextPendingChannelMessageSent() {
while (_pendingChannelSentQueue.isNotEmpty) {
final queuedMessageId = _pendingChannelSentQueue.removeAt(0);
if (_isReactionSendQueueId(queuedMessageId)) {
return true;
}
if (_markPendingChannelMessageSentById(queuedMessageId)) {
return true;
}
}
return false;
}
bool _markPendingChannelMessageSentById(String messageId) {
for (final entry in _channelMessages.entries) {
final channelMessages = entry.value;
for (int i = channelMessages.length - 1; i >= 0; i--) {
final message = channelMessages[i];
if (message.messageId != messageId) {
continue;
}
if (!message.isOutgoing ||
message.status != ChannelMessageStatus.pending) {
return false;
}
channelMessages[i] = message.copyWith(
status: ChannelMessageStatus.sent,
);
_pendingChannelSentQueue.remove(messageId);
unawaited(
_channelMessageStore.saveChannelMessages(entry.key, channelMessages),
);
notifyListeners();
return true;
}
}
return false;
}
void _handleOk() {
if (_pendingGenericAckQueue.isEmpty) {
return;
}
final pendingAck = _pendingGenericAckQueue.removeAt(0);
if (pendingAck.commandCode != cmdSendChannelTxtMsg ||
pendingAck.channelSendQueueId == null) {
return;
}
final queueId = pendingAck.channelSendQueueId!;
_pendingChannelSentQueue.remove(queueId);
if (_isReactionSendQueueId(queueId)) {
return;
}
_markPendingChannelMessageSentById(queueId);
}
void _handleSendConfirmed(Uint8List frame) {
// Frame format from C++:
// [0] = PUSH_CODE_SEND_CONFIRMED
@ -3207,18 +3320,22 @@ class MeshCoreConnector extends ChangeNotifier {
mergedPathBytes.length,
);
final newRepeatCount = existing.repeatCount + 1;
final promotedFromPending =
newRepeatCount == 1 &&
existing.status == ChannelMessageStatus.pending;
messages[existingIndex] = existing.copyWith(
repeatCount: newRepeatCount,
pathLength: mergedPathLength,
pathBytes: mergedPathBytes,
pathVariants: mergedPathVariants,
// Mark as sent when first repeat is heard
status:
newRepeatCount == 1 &&
existing.status == ChannelMessageStatus.pending
status: promotedFromPending
? ChannelMessageStatus.sent
: existing.status,
);
if (promotedFromPending) {
_pendingChannelSentQueue.remove(existing.messageId);
}
} else {
messages.add(processedMessage);
}
@ -3391,11 +3508,37 @@ class MeshCoreConnector extends ChangeNotifier {
_queuedMessageSyncInFlight = false;
_isSyncingChannels = false;
_channelSyncInFlight = false;
_pendingChannelSentQueue.clear();
_pendingGenericAckQueue.clear();
_reactionSendQueueSequence = 0;
_setState(MeshCoreConnectionState.disconnected);
_scheduleReconnect();
}
void _trackPendingGenericAck(
Uint8List data, {
String? channelSendQueueId,
required bool expectsGenericAck,
}) {
if (!expectsGenericAck || data.isEmpty) return;
_pendingGenericAckQueue.add(
_PendingCommandAck(
commandCode: data[0],
channelSendQueueId: channelSendQueueId,
),
);
}
String _nextReactionSendQueueId() {
_reactionSendQueueSequence++;
return '$_reactionSendQueuePrefix$_reactionSendQueueSequence';
}
bool _isReactionSendQueueId(String queueId) {
return queueId.startsWith(_reactionSendQueuePrefix);
}
Map<String, String> _parseKeyValueString(String input) {
final result = <String, String>{};
@ -3691,3 +3834,10 @@ class _RepeaterAckContext {
required this.messageBytes,
});
}
class _PendingCommandAck {
final int commandCode;
final String? channelSendQueueId;
_PendingCommandAck({required this.commandCode, this.channelSendQueueId});
}

View file

@ -183,14 +183,17 @@ class _ContactsScreenState extends State<ContactsScreen>
final connector = Provider.of<MeshCoreConnector>(context, listen: false);
final exportContactFrame = buildExportContactFrame(pubKey);
_pendingOperations.add(ContactOperationType.export);
await connector.sendFrame(exportContactFrame);
await connector.sendFrame(exportContactFrame, expectsGenericAck: true);
}
Future<void> _contactZeroHop(Uint8List pubKey) async {
final connector = Provider.of<MeshCoreConnector>(context, listen: false);
final exportContactZeroHopFrame = buildZeroHopContact(pubKey);
_pendingOperations.add(ContactOperationType.zeroHopShare);
await connector.sendFrame(exportContactZeroHopFrame);
await connector.sendFrame(
exportContactZeroHopFrame,
expectsGenericAck: true,
);
}
Future<void> _contactImport() async {
@ -217,7 +220,7 @@ class _ContactsScreenState extends State<ContactsScreen>
try {
final importContactFrame = buildImportContactFrame(hexString);
_pendingOperations.add(ContactOperationType.import);
await connector.sendFrame(importContactFrame);
await connector.sendFrame(importContactFrame, expectsGenericAck: true);
} catch (e) {
if (mounted) {
ScaffoldMessenger.of(context).showSnackBar(

View file

@ -234,7 +234,11 @@ class MessageRetryService extends ChangeNotifier {
}
}
void updateMessageFromSent(Uint8List ackHash, int timeoutMs) {
bool updateMessageFromSent(
Uint8List ackHash,
int timeoutMs, {
bool allowQueueFallback = true,
}) {
final ackHashHex = ackHash
.map((b) => b.toRadixString(16).padLeft(2, '0'))
.join();
@ -277,7 +281,7 @@ class MessageRetryService extends ChangeNotifier {
}
// FALLBACK: Old queue-based matching (for messages sent before hash computation was added)
if (messageId == null) {
if (messageId == null && allowQueueFallback) {
_debugLogService?.warn(
'RESP_CODE_SENT: ACK hash $ackHashHex not found in hash table, falling back to queue',
tag: 'AckHash',
@ -320,7 +324,7 @@ class MessageRetryService extends ChangeNotifier {
if (messageId == null || contact == null) {
debugPrint('No pending message found for ACK hash: $ackHashHex');
return;
return false;
}
// Store the mapping for future lookups (e.g., when ACK arrives)
@ -339,7 +343,7 @@ class MessageRetryService extends ChangeNotifier {
'Message $messageId no longer pending for ACK hash: $ackHashHex',
);
_ackHashToMessageId.remove(ackHashHex);
return;
return false;
}
// Add this ACK hash to the list of expected ACKs for this message (for history)
@ -389,8 +393,11 @@ class MessageRetryService extends ChangeNotifier {
_startTimeoutTimer(messageId, actualTimeout);
debugPrint('Updated message $messageId with ACK hash: $ackHashHex');
return true;
}
bool get hasPendingMessages => _pendingMessages.isNotEmpty;
void _startTimeoutTimer(String messageId, int timeoutMs) {
_timeoutTimers[messageId]?.cancel();
_timeoutTimers[messageId] = Timer(Duration(milliseconds: timeoutMs), () {