feat: Add SFPP confirmed status to Messages and Reactions (#4139)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
Co-authored-by: Mac DeCourcy <github.znq26@slmail.me>
This commit is contained in:
James Rich 2026-01-08 07:21:21 -06:00 committed by GitHub
parent 78bd1ad6dd
commit 782c068ead
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 2699 additions and 61 deletions

View file

@ -51,7 +51,12 @@ constructor(
dbManager.currentDb.flatMapLatest { db -> db.packetDao().getContactKeys() }
fun getContactsPaged(): Flow<PagingData<Packet>> = Pager(
config = PagingConfig(pageSize = 30, enablePlaceholders = false, initialLoadSize = 30),
config =
PagingConfig(
pageSize = CONTACTS_PAGE_SIZE,
enablePlaceholders = false,
initialLoadSize = CONTACTS_PAGE_SIZE,
),
pagingSourceFactory = { dbManager.currentDb.value.packetDao().getContactKeysPaged() },
)
.flow
@ -113,7 +118,12 @@ constructor(
}
fun getMessagesFromPaged(contact: String, getNode: suspend (String?) -> Node): Flow<PagingData<Message>> = Pager(
config = PagingConfig(pageSize = 50, enablePlaceholders = false, initialLoadSize = 50),
config =
PagingConfig(
pageSize = MESSAGES_PAGE_SIZE,
enablePlaceholders = false,
initialLoadSize = MESSAGES_PAGE_SIZE,
),
pagingSourceFactory = { dbManager.currentDb.value.packetDao().getMessagesFromPaged(contact) },
)
.flow
@ -140,8 +150,100 @@ constructor(
suspend fun getPacketByPacketId(packetId: Int) =
withContext(dispatchers.io) { dbManager.currentDb.value.packetDao().getPacketByPacketId(packetId) }
@Suppress("CyclomaticComplexMethod")
suspend fun updateSFPPStatus(
packetId: Int,
from: Int,
to: Int,
hash: ByteArray,
status: MessageStatus = MessageStatus.SFPP_CONFIRMED,
rxTime: Long = 0,
myNodeNum: Int? = null,
) = withContext(dispatchers.io) {
val dao = dbManager.currentDb.value.packetDao()
val packets = dao.findPacketsWithId(packetId)
val reactions = dao.findReactionsWithId(packetId)
val fromId = DataPacket.nodeNumToDefaultId(from)
val isFromLocalNode = myNodeNum != null && from == myNodeNum
val toId =
if (to == 0 || to == DataPacket.NODENUM_BROADCAST) {
DataPacket.ID_BROADCAST
} else {
DataPacket.nodeNumToDefaultId(to)
}
packets.forEach { packet ->
// For sent messages, from is stored as ID_LOCAL, but SFPP packet has node number
val fromMatches =
packet.data.from == fromId || (isFromLocalNode && packet.data.from == DataPacket.ID_LOCAL)
co.touchlab.kermit.Logger.d {
"SFPP match check: packetFrom=${packet.data.from} fromId=$fromId " +
"isFromLocal=$isFromLocalNode fromMatches=$fromMatches " +
"packetTo=${packet.data.to} toId=$toId toMatches=${packet.data.to == toId}"
}
if (fromMatches && packet.data.to == toId) {
// If it's already confirmed, don't downgrade it to routing
if (packet.data.status == MessageStatus.SFPP_CONFIRMED && status == MessageStatus.SFPP_ROUTING) {
return@forEach
}
val newTime = if (rxTime > 0) rxTime * MILLISECONDS_IN_SECOND else packet.received_time
val updatedData = packet.data.copy(status = status, sfppHash = hash, time = newTime)
dao.update(packet.copy(data = updatedData, sfpp_hash = hash, received_time = newTime))
}
}
reactions.forEach { reaction ->
val reactionFrom = reaction.userId
// For sent reactions, from is stored as ID_LOCAL, but SFPP packet has node number
val fromMatches = reactionFrom == fromId || (isFromLocalNode && reactionFrom == DataPacket.ID_LOCAL)
val toMatches = reaction.to == toId
co.touchlab.kermit.Logger.d {
"SFPP reaction match check: reactionFrom=$reactionFrom fromId=$fromId " +
"isFromLocal=$isFromLocalNode fromMatches=$fromMatches " +
"reactionTo=${reaction.to} toId=$toId toMatches=$toMatches"
}
if (fromMatches && (reaction.to == null || toMatches)) {
if (reaction.status == MessageStatus.SFPP_CONFIRMED && status == MessageStatus.SFPP_ROUTING) {
return@forEach
}
val newTime = if (rxTime > 0) rxTime * MILLISECONDS_IN_SECOND else reaction.timestamp
val updatedReaction = reaction.copy(status = status, sfpp_hash = hash, timestamp = newTime)
dao.update(updatedReaction)
}
}
}
suspend fun updateSFPPStatusByHash(
hash: ByteArray,
status: MessageStatus = MessageStatus.SFPP_CONFIRMED,
rxTime: Long = 0,
) = withContext(dispatchers.io) {
val dao = dbManager.currentDb.value.packetDao()
dao.findPacketBySfppHash(hash)?.let { packet ->
// If it's already confirmed, don't downgrade it
if (packet.data.status == MessageStatus.SFPP_CONFIRMED && status == MessageStatus.SFPP_ROUTING) {
return@let
}
val newTime = if (rxTime > 0) rxTime * MILLISECONDS_IN_SECOND else packet.received_time
val updatedData = packet.data.copy(status = status, sfppHash = hash, time = newTime)
dao.update(packet.copy(data = updatedData, sfpp_hash = hash, received_time = newTime))
}
dao.findReactionBySfppHash(hash)?.let { reaction ->
if (reaction.status == MessageStatus.SFPP_CONFIRMED && status == MessageStatus.SFPP_ROUTING) {
return@let
}
val newTime = if (rxTime > 0) rxTime * MILLISECONDS_IN_SECOND else reaction.timestamp
val updatedReaction = reaction.copy(status = status, sfpp_hash = hash, timestamp = newTime)
dao.update(updatedReaction)
}
}
suspend fun deleteMessages(uuidList: List<Long>) = withContext(dispatchers.io) {
for (chunk in uuidList.chunked(500)) {
for (chunk in uuidList.chunked(DELETE_CHUNK_SIZE)) {
// Fetch DAO per chunk to avoid holding a stale reference if the active DB switches
dbManager.currentDb.value.packetDao().deleteMessages(chunk)
}
@ -187,4 +289,11 @@ constructor(
private fun org.meshtastic.core.database.dao.PacketDao.getAllWaypointsFlow(): Flow<List<Packet>> =
getAllPackets(PortNum.WAYPOINT_APP_VALUE)
companion object {
private const val CONTACTS_PAGE_SIZE = 30
private const val MESSAGES_PAGE_SIZE = 50
private const val DELETE_CHUNK_SIZE = 500
private const val MILLISECONDS_IN_SECOND = 1000L
}
}