Refactor unread message tracking and implement channel caching (#126)

* Refactor unread message tracking and implement channel caching

* formatted files
This commit is contained in:
zjs81 2026-02-04 20:34:03 -07:00 committed by GitHub
parent b3645481c7
commit c320378be1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 243 additions and 179 deletions

View file

@ -24,6 +24,7 @@ import '../services/notification_service.dart';
import '../storage/channel_message_store.dart';
import '../storage/channel_order_store.dart';
import '../storage/channel_settings_store.dart';
import '../storage/channel_store.dart';
import '../storage/contact_settings_store.dart';
import '../storage/contact_store.dart';
import '../storage/message_store.dart';
@ -139,14 +140,15 @@ class MeshCoreConnector extends ChangeNotifier {
final ChannelSettingsStore _channelSettingsStore = ChannelSettingsStore();
final ContactSettingsStore _contactSettingsStore = ContactSettingsStore();
final ContactStore _contactStore = ContactStore();
final ChannelStore _channelStore = ChannelStore();
final UnreadStore _unreadStore = UnreadStore();
List<Channel> _cachedChannels = [];
final Map<int, bool> _channelSmazEnabled = {};
bool _lastSentWasCliCommand =
false; // Track if last sent message was a CLI command
final Map<String, bool> _contactSmazEnabled = {};
final Set<String> _knownContactKeys = {};
final Map<String, int> _contactLastReadMs = {};
final Map<int, int> _channelLastReadMs = {};
final Map<String, int> _contactUnreadCount = {};
bool _unreadStateLoaded = false;
final Map<String, _RepeaterAckContext> _pendingRepeaterAcks = {};
String? _activeContactKey;
@ -321,17 +323,7 @@ class MeshCoreConnector extends ChangeNotifier {
int getUnreadCountForContactKey(String contactKeyHex) {
if (!_unreadStateLoaded) return 0;
if (!_shouldTrackUnreadForContactKey(contactKeyHex)) return 0;
final messages = _conversations[contactKeyHex];
if (messages == null || messages.isEmpty) return 0;
final lastReadMs = _contactLastReadMs[contactKeyHex] ?? 0;
var count = 0;
for (final message in messages) {
if (message.isOutgoing || message.isCli) continue;
if (message.timestamp.millisecondsSinceEpoch > lastReadMs) {
count++;
}
}
return count;
return _contactUnreadCount[contactKeyHex] ?? 0;
}
int getUnreadCountForChannel(Channel channel) {
@ -340,17 +332,7 @@ class MeshCoreConnector extends ChangeNotifier {
int getUnreadCountForChannelIndex(int channelIndex) {
if (!_unreadStateLoaded) return 0;
final messages = _channelMessages[channelIndex];
if (messages == null || messages.isEmpty) return 0;
final lastReadMs = _channelLastReadMs[channelIndex] ?? 0;
var count = 0;
for (final message in messages) {
if (message.isOutgoing) continue;
if (message.timestamp.millisecondsSinceEpoch > lastReadMs) {
count++;
}
}
return count;
return _findChannelByIndex(channelIndex)?.unreadCount ?? 0;
}
int getTotalUnreadCount() {
@ -380,16 +362,17 @@ class MeshCoreConnector extends ChangeNotifier {
}
Future<void> loadUnreadState() async {
_contactLastReadMs
_contactUnreadCount
..clear()
..addAll(await _unreadStore.loadContactLastRead());
_channelLastReadMs
..clear()
..addAll(await _unreadStore.loadChannelLastRead());
..addAll(await _unreadStore.loadContactUnreadCount());
_unreadStateLoaded = true;
notifyListeners();
}
Future<void> loadCachedChannels() async {
_cachedChannels = await _channelStore.loadChannels();
}
void setActiveContact(String? contactKeyHex) {
if (contactKeyHex != null &&
!_shouldTrackUnreadForContactKey(contactKeyHex)) {
@ -411,17 +394,36 @@ class MeshCoreConnector extends ChangeNotifier {
void markContactRead(String contactKeyHex) {
if (!_shouldTrackUnreadForContactKey(contactKeyHex)) return;
final markMs = _calculateReadTimestampMs(
_conversations[contactKeyHex]?.map((m) => m.timestamp),
);
_setContactLastReadMs(contactKeyHex, markMs);
final previousCount = _contactUnreadCount[contactKeyHex] ?? 0;
if (previousCount > 0) {
_contactUnreadCount[contactKeyHex] = 0;
_appDebugLogService?.info(
'Contact $contactKeyHex marked as read (was $previousCount unread)',
tag: 'Unread',
);
_unreadStore.saveContactUnreadCount(
Map<String, int>.from(_contactUnreadCount),
);
notifyListeners();
}
}
void markChannelRead(int channelIndex) {
final markMs = _calculateReadTimestampMs(
_channelMessages[channelIndex]?.map((m) => m.timestamp),
);
_setChannelLastReadMs(channelIndex, markMs);
final channel = _findChannelByIndex(channelIndex);
if (channel != null && channel.unreadCount > 0) {
final previousCount = channel.unreadCount;
channel.unreadCount = 0;
_appDebugLogService?.info(
'Channel ${channel.name.isNotEmpty ? channel.name : channelIndex} marked as read (was $previousCount unread)',
tag: 'Unread',
);
unawaited(
_channelStore.saveChannels(
_channels.isNotEmpty ? _channels : _cachedChannels,
),
);
notifyListeners();
}
}
Future<void> setChannelSmazEnabled(int channelIndex, bool enabled) async {
@ -788,6 +790,9 @@ class MeshCoreConnector extends ChangeNotifier {
// Keep device clock aligned on every connection.
await syncTime();
// Fetch channels so we can track unread counts for incoming messages
unawaited(getChannels());
} catch (e) {
debugPrint("Connection error: $e");
await disconnect(manual: false);
@ -1341,8 +1346,10 @@ class MeshCoreConnector extends ChangeNotifier {
unawaited(_persistContacts());
_conversations.remove(contact.publicKeyHex);
_loadedConversationKeys.remove(contact.publicKeyHex);
_contactLastReadMs.remove(contact.publicKeyHex);
_unreadStore.saveContactLastRead(Map<String, int>.from(_contactLastReadMs));
_contactUnreadCount.remove(contact.publicKeyHex);
_unreadStore.saveContactUnreadCount(
Map<String, int>.from(_contactUnreadCount),
);
_messageStore.clearMessages(contact.publicKeyHex);
notifyListeners();
}
@ -1617,6 +1624,10 @@ class MeshCoreConnector extends ChangeNotifier {
_cleanupChannelSync(completed: true);
// Cache channels for offline use
_cachedChannels = List<Channel>.from(_channels);
unawaited(_channelStore.saveChannels(_channels));
// Apply ordering and notify UI
_applyChannelOrder();
notifyListeners();
@ -1651,8 +1662,6 @@ class MeshCoreConnector extends ChangeNotifier {
// Delete by setting empty name and zero PSK
await sendFrame(buildSetChannelFrame(index, '', Uint8List(16)));
_channelLastReadMs.remove(index);
_unreadStore.saveChannelLastRead(Map<int, int>.from(_channelLastReadMs));
// Clear stored messages for this channel
await _channelMessageStore.clearChannelMessages(index);
// Clear in-memory messages for this channel
@ -1930,9 +1939,9 @@ class MeshCoreConnector extends ChangeNotifier {
final contact = Contact.fromFrame(frame);
if (contact != null) {
if (contact.type == advTypeRepeater) {
_contactLastReadMs.remove(contact.publicKeyHex);
_unreadStore.saveContactLastRead(
Map<String, int>.from(_contactLastReadMs),
_contactUnreadCount.remove(contact.publicKeyHex);
_unreadStore.saveContactUnreadCount(
Map<String, int>.from(_contactUnreadCount),
);
}
// Check if this is a new contact
@ -2157,7 +2166,7 @@ class MeshCoreConnector extends ChangeNotifier {
}
}
_addMessage(message.senderKeyHex, message);
_maybeMarkActiveContactRead(message);
_maybeIncrementContactUnread(message);
notifyListeners();
// Show notification for new incoming message
@ -2348,7 +2357,7 @@ class MeshCoreConnector extends ChangeNotifier {
pathBytes: message.pathBytes,
);
final isNew = _addChannelMessage(message.channelIndex!, message);
_maybeMarkActiveChannelRead(message);
_maybeIncrementChannelUnread(message, isNew: isNew);
notifyListeners();
if (isNew) {
_maybeNotifyChannelMessage(message);
@ -2370,7 +2379,9 @@ class MeshCoreConnector extends ChangeNotifier {
final channelHash = payload[0];
final encrypted = Uint8List.fromList(payload.sublist(1));
for (final channel in _channels) {
// Use cached channels as fallback if live channels not yet loaded
final channelsToSearch = _channels.isNotEmpty ? _channels : _cachedChannels;
for (final channel in channelsToSearch) {
if (channel.isEmpty) continue;
final hash = _computeChannelHash(channel.psk);
if (hash != channelHash) continue;
@ -2409,7 +2420,7 @@ class MeshCoreConnector extends ChangeNotifier {
pathBytes: message.pathBytes,
);
final isNew = _addChannelMessage(channel.index, message);
_maybeMarkActiveChannelRead(message);
_maybeIncrementChannelUnread(message, isNew: isNew);
notifyListeners();
if (isNew) {
final label = channel.name.isEmpty
@ -2539,6 +2550,15 @@ class MeshCoreConnector extends ChangeNotifier {
'[ChannelSync] Received channel ${channel.index}: ${channel.isEmpty ? "empty" : channel.name}',
);
// Preserve unread count from cached channel
final cachedChannel = _cachedChannels.cast<Channel?>().firstWhere(
(c) => c?.index == channel.index,
orElse: () => null,
);
if (cachedChannel != null) {
channel.unreadCount = cachedChannel.unreadCount;
}
// If we're syncing and this is the channel we're waiting for
if (_isSyncingChannels && _channelSyncInFlight) {
if (channel.index == _nextChannelIndexToRequest) {
@ -2578,6 +2598,8 @@ class MeshCoreConnector extends ChangeNotifier {
(c) => c.index == channel.index,
);
if (existingIndex >= 0) {
// Preserve unread count from existing channel
channel.unreadCount = _channels[existingIndex].unreadCount;
_channels[existingIndex] = channel;
} else {
_channels.add(channel);
@ -2628,67 +2650,98 @@ class MeshCoreConnector extends ChangeNotifier {
return contact.type != advTypeRepeater;
}
int _calculateReadTimestampMs(Iterable<DateTime>? timestamps) {
var latestMs = 0;
if (timestamps != null) {
for (final timestamp in timestamps) {
final ms = timestamp.millisecondsSinceEpoch;
if (ms > latestMs) {
latestMs = ms;
}
}
}
return latestMs;
Channel? _findChannelByIndex(int index) {
return _channels.cast<Channel?>().firstWhere(
(c) => c?.index == index,
orElse: () => null,
) ??
_cachedChannels.cast<Channel?>().firstWhere(
(c) => c?.index == index,
orElse: () => null,
);
}
void _setContactLastReadMs(
String contactKeyHex,
int timestampMs, {
bool notify = true,
void _maybeIncrementChannelUnread(
ChannelMessage message, {
required bool isNew,
}) {
if (!_shouldTrackUnreadForContactKey(contactKeyHex)) return;
final existing = _contactLastReadMs[contactKeyHex] ?? 0;
if (timestampMs <= existing) return;
_contactLastReadMs[contactKeyHex] = timestampMs;
_unreadStore.saveContactLastRead(Map<String, int>.from(_contactLastReadMs));
if (notify) {
notifyListeners();
if (!isNew || message.isOutgoing) {
_appDebugLogService?.info(
'Skip unread increment: isNew=$isNew, isOutgoing=${message.isOutgoing}',
tag: 'Unread',
);
return;
}
}
void _setChannelLastReadMs(
int channelIndex,
int timestampMs, {
bool notify = true,
}) {
final existing = _channelLastReadMs[channelIndex] ?? 0;
if (timestampMs <= existing) return;
_channelLastReadMs[channelIndex] = timestampMs;
_unreadStore.saveChannelLastRead(Map<int, int>.from(_channelLastReadMs));
if (notify) {
notifyListeners();
}
}
void _maybeMarkActiveContactRead(Message message) {
if (message.isOutgoing || message.isCli) return;
if (_activeContactKey != message.senderKeyHex) return;
if (!_shouldTrackUnreadForContactKey(message.senderKeyHex)) return;
_setContactLastReadMs(
message.senderKeyHex,
message.timestamp.millisecondsSinceEpoch,
notify: false,
);
}
void _maybeMarkActiveChannelRead(ChannelMessage message) {
if (message.isOutgoing) return;
final channelIndex = message.channelIndex;
if (channelIndex == null || _activeChannelIndex != channelIndex) return;
_setChannelLastReadMs(
channelIndex,
message.timestamp.millisecondsSinceEpoch,
notify: false,
if (channelIndex == null) {
_appDebugLogService?.info(
'Skip unread increment: channelIndex is null',
tag: 'Unread',
);
return;
}
// Don't increment if user is viewing this channel
if (_activeChannelIndex == channelIndex) {
_appDebugLogService?.info(
'Skip unread increment: channel $channelIndex is active',
tag: 'Unread',
);
return;
}
final channel = _findChannelByIndex(channelIndex);
if (channel != null) {
channel.unreadCount++;
_appDebugLogService?.info(
'Channel ${channel.name.isNotEmpty ? channel.name : channelIndex} unread count incremented to ${channel.unreadCount}',
tag: 'Unread',
);
unawaited(
_channelStore.saveChannels(
_channels.isNotEmpty ? _channels : _cachedChannels,
),
);
} else {
_appDebugLogService?.info(
'Channel $channelIndex not found in _channels (${_channels.length}) or _cachedChannels (${_cachedChannels.length})',
tag: 'Unread',
);
}
}
void _maybeIncrementContactUnread(Message message) {
if (message.isOutgoing || message.isCli) {
_appDebugLogService?.info(
'Skip contact unread increment: isOutgoing=${message.isOutgoing}, isCli=${message.isCli}',
tag: 'Unread',
);
return;
}
final contactKey = message.senderKeyHex;
if (!_shouldTrackUnreadForContactKey(contactKey)) {
_appDebugLogService?.info(
'Skip contact unread increment: should not track for $contactKey',
tag: 'Unread',
);
return;
}
// Don't increment if user is viewing this contact
if (_activeContactKey == contactKey) {
_appDebugLogService?.info(
'Skip contact unread increment: contact $contactKey is active',
tag: 'Unread',
);
return;
}
final currentCount = _contactUnreadCount[contactKey] ?? 0;
_contactUnreadCount[contactKey] = currentCount + 1;
_appDebugLogService?.info(
'Contact $contactKey unread count incremented to ${currentCount + 1}',
tag: 'Unread',
);
_unreadStore.saveContactUnreadCount(
Map<String, int>.from(_contactUnreadCount),
);
}

View file

@ -60,6 +60,7 @@ void main() async {
await connector.loadContactCache();
await connector.loadChannelSettings();
await connector.loadCachedChannels();
// Load persisted channel messages
await connector.loadAllChannelMessages();

View file

@ -9,8 +9,14 @@ class Channel {
final int index;
final String name;
final Uint8List psk; // 16 bytes
int unreadCount;
Channel({required this.index, required this.name, required this.psk});
Channel({
required this.index,
required this.name,
required this.psk,
this.unreadCount = 0,
});
String get pskHex => _bytesToHex(psk);

View file

@ -41,6 +41,8 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
final Map<String, GlobalKey> _messageKeys = {};
bool _isLoadingOlder = false;
MeshCoreConnector? _connector;
@override
void initState() {
super.initState();
@ -48,7 +50,8 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
_scrollController.onScrollNearTop = _loadOlderMessages;
SchedulerBinding.instance.addPostFrameCallback((_) {
if (!mounted) return;
context.read<MeshCoreConnector>().setActiveChannel(widget.channel.index);
_connector = context.read<MeshCoreConnector>();
_connector?.setActiveChannel(widget.channel.index);
});
}
@ -72,7 +75,7 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
@override
void dispose() {
context.read<MeshCoreConnector>().setActiveChannel(null);
_connector?.setActiveChannel(null);
_textFieldFocusNode.removeListener(_onTextFieldFocusChange);
_textFieldFocusNode.dispose();
_textController.dispose();

View file

@ -44,6 +44,7 @@ class _ChatScreenState extends State<ChatScreen> {
final _scrollController = ChatScrollController();
final _textFieldFocusNode = FocusNode();
bool _isLoadingOlder = false;
MeshCoreConnector? _connector;
@override
void initState() {
@ -52,9 +53,8 @@ class _ChatScreenState extends State<ChatScreen> {
_scrollController.onScrollNearTop = _loadOlderMessages;
SchedulerBinding.instance.addPostFrameCallback((_) {
if (!mounted) return;
context.read<MeshCoreConnector>().setActiveContact(
widget.contact.publicKeyHex,
);
_connector = context.read<MeshCoreConnector>();
_connector?.setActiveContact(widget.contact.publicKeyHex);
});
}
@ -78,7 +78,7 @@ class _ChatScreenState extends State<ChatScreen> {
@override
void dispose() {
context.read<MeshCoreConnector>().setActiveContact(null);
_connector?.setActiveContact(null);
_textFieldFocusNode.removeListener(_onTextFieldFocusChange);
_textFieldFocusNode.dispose();
_textController.dispose();

View file

@ -0,0 +1,50 @@
import 'dart:convert';
import 'dart:typed_data';
import '../models/channel.dart';
import 'prefs_manager.dart';
class ChannelStore {
static const String _key = 'channels';
Future<List<Channel>> loadChannels() async {
final prefs = PrefsManager.instance;
final jsonStr = prefs.getString(_key);
if (jsonStr == null) return [];
try {
final jsonList = jsonDecode(jsonStr) as List<dynamic>;
return jsonList
.map((entry) => _fromJson(entry as Map<String, dynamic>))
.toList();
} catch (_) {
return [];
}
}
Future<void> saveChannels(List<Channel> channels) async {
final prefs = PrefsManager.instance;
final jsonList = channels.map(_toJson).toList();
await prefs.setString(_key, jsonEncode(jsonList));
}
Map<String, dynamic> _toJson(Channel channel) {
return {
'index': channel.index,
'name': channel.name,
'psk': base64Encode(channel.psk),
'unreadCount': channel.unreadCount,
};
}
Channel _fromJson(Map<String, dynamic> json) {
return Channel(
index: json['index'] as int,
name: json['name'] as String? ?? '',
psk: json['psk'] != null
? Uint8List.fromList(base64Decode(json['psk'] as String))
: Uint8List(16),
unreadCount: json['unreadCount'] as int? ?? 0,
);
}
}

View file

@ -5,27 +5,23 @@ import 'prefs_manager.dart';
/// Storage for unread message tracking with debounced writes to reduce I/O.
class UnreadStore {
static const String _contactLastReadKey = 'contact_last_read';
static const String _channelLastReadKey = 'channel_last_read';
static const String _contactUnreadCountKey = 'contact_unread_count';
// Debounce timers to batch rapid writes
Timer? _contactSaveTimer;
Timer? _channelSaveTimer;
Timer? _contactUnreadSaveTimer;
static const Duration _saveDebounceDuration = Duration(milliseconds: 500);
// Pending write data
Map<String, int>? _pendingContactLastRead;
Map<int, int>? _pendingChannelLastRead;
Map<String, int>? _pendingContactUnreadCount;
/// Dispose timers when no longer needed
void dispose() {
_contactSaveTimer?.cancel();
_channelSaveTimer?.cancel();
_contactUnreadSaveTimer?.cancel();
}
Future<Map<String, int>> loadContactLastRead() async {
Future<Map<String, int>> loadContactUnreadCount() async {
final prefs = PrefsManager.instance;
final jsonStr = prefs.getString(_contactLastReadKey);
final jsonStr = prefs.getString(_contactUnreadCountKey);
if (jsonStr == null) return {};
try {
@ -36,75 +32,30 @@ class UnreadStore {
}
}
/// Save contact last read timestamps with debouncing.
/// Writes are delayed by 500ms and batched to reduce I/O operations.
void saveContactLastRead(Map<String, int> lastReadMs) {
_pendingContactLastRead = lastReadMs;
void saveContactUnreadCount(Map<String, int> counts) {
_pendingContactUnreadCount = counts;
// Cancel existing timer
_contactSaveTimer?.cancel();
_contactUnreadSaveTimer?.cancel();
// Schedule new write
_contactSaveTimer = Timer(_saveDebounceDuration, () async {
if (_pendingContactLastRead != null) {
await _flushContactLastRead();
_contactUnreadSaveTimer = Timer(_saveDebounceDuration, () async {
if (_pendingContactUnreadCount != null) {
await _flushContactUnreadCount();
}
});
}
Future<Map<int, int>> loadChannelLastRead() async {
final prefs = PrefsManager.instance;
final jsonStr = prefs.getString(_channelLastReadKey);
if (jsonStr == null) return {};
try {
final json = jsonDecode(jsonStr) as Map<String, dynamic>;
return json.map((key, value) => MapEntry(int.parse(key), value as int));
} catch (_) {
return {};
}
}
/// Save channel last read timestamps with debouncing.
/// Writes are delayed by 500ms and batched to reduce I/O operations.
void saveChannelLastRead(Map<int, int> lastReadMs) {
_pendingChannelLastRead = lastReadMs;
_channelSaveTimer?.cancel();
_channelSaveTimer = Timer(_saveDebounceDuration, () async {
if (_pendingChannelLastRead != null) {
await _flushChannelLastRead();
}
});
}
Future<void> _flushContactLastRead() async {
if (_pendingContactLastRead == null) return;
Future<void> _flushContactUnreadCount() async {
if (_pendingContactUnreadCount == null) return;
final prefs = PrefsManager.instance;
final jsonStr = jsonEncode(_pendingContactLastRead);
await prefs.setString(_contactLastReadKey, jsonStr);
_pendingContactLastRead = null;
}
Future<void> _flushChannelLastRead() async {
if (_pendingChannelLastRead == null) return;
final prefs = PrefsManager.instance;
final asString = _pendingChannelLastRead!.map(
(key, value) => MapEntry(key.toString(), value),
);
final jsonStr = jsonEncode(asString);
await prefs.setString(_channelLastReadKey, jsonStr);
_pendingChannelLastRead = null;
final jsonStr = jsonEncode(_pendingContactUnreadCount);
await prefs.setString(_contactUnreadCountKey, jsonStr);
_pendingContactUnreadCount = null;
}
/// Immediately flush pending writes (call before app termination or disposal)
Future<void> flush() async {
_contactSaveTimer?.cancel();
_channelSaveTimer?.cancel();
await Future.wait([_flushContactLastRead(), _flushChannelLastRead()]);
_contactUnreadSaveTimer?.cancel();
await _flushContactUnreadCount();
}
}