perf(messaging): batch node + reply lookups in message loading (#5149)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich 2026-04-15 10:48:26 -05:00 committed by GitHub
parent dea364dd17
commit 878905aea3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 49 additions and 18 deletions

View file

@ -28,6 +28,7 @@ import kotlinx.coroutines.withContext
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.database.DatabaseProvider
import org.meshtastic.core.database.entity.PacketEntity
import org.meshtastic.core.database.entity.toReaction
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.ContactSettings
@ -154,13 +155,14 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
else -> dao.getMessagesFrom(contact)
}
flow.mapLatest { packets ->
val cachedGetNode = memoize(getNode)
val replyIds = packets.mapNotNull { it.packet.data.replyId?.takeIf { id -> id != 0 } }.distinct()
val replyMap = batchGetPacketsByIds(replyIds)
packets.map { packet ->
val message = packet.toMessage(getNode)
message.replyId
.takeIf { it != null && it != 0 }
?.let { getPacketByPacketIdInternal(it) }
?.let { originalPacket -> originalPacket.toMessage(getNode) }
?.let { originalMessage -> message.copy(originalMessage = originalMessage) } ?: message
val message = packet.toMessage(cachedGetNode)
val replyId = message.replyId?.takeIf { it != 0 }
val originalMessage = replyId?.let { replyMap[it] }?.toMessage(cachedGetNode)
if (originalMessage != null) message.copy(originalMessage = originalMessage) else message
}
}
}
@ -177,13 +179,16 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
)
.flow
.map { pagingData ->
val cachedGetNode = memoize(getNode)
val replyCache = mutableMapOf<Int, PacketEntity?>()
pagingData.map { packet ->
val message = packet.toMessage(getNode)
message.replyId
.takeIf { it != null && it != 0 }
?.let { getPacketByPacketIdInternal(it) }
?.let { originalPacket -> originalPacket.toMessage(getNode) }
?.let { originalMessage -> message.copy(originalMessage = originalMessage) } ?: message
val message = packet.toMessage(cachedGetNode)
val replyId = message.replyId?.takeIf { it != 0 }
val originalMessage =
replyId
?.let { id -> replyCache.getOrPut(id) { getPacketByPacketIdInternal(id) } }
?.toMessage(cachedGetNode)
if (originalMessage != null) message.copy(originalMessage = originalMessage) else message
}
}
@ -204,13 +209,16 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
)
.flow
.map { pagingData ->
val cachedGetNode = memoize(getNode)
val replyCache = mutableMapOf<Int, PacketEntity?>()
pagingData.map { packet ->
val message = packet.toMessage(getNode)
message.replyId
.takeIf { it != null && it != 0 }
?.let { getPacketByPacketIdInternal(it) }
?.let { originalPacket -> originalPacket.toMessage(getNode) }
?.let { originalMessage -> message.copy(originalMessage = originalMessage) } ?: message
val message = packet.toMessage(cachedGetNode)
val replyId = message.replyId?.takeIf { it != 0 }
val originalMessage =
replyId
?.let { id -> replyCache.getOrPut(id) { getPacketByPacketIdInternal(id) } }
?.toMessage(cachedGetNode)
if (originalMessage != null) message.copy(originalMessage = originalMessage) else message
}
}
@ -230,6 +238,19 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
private suspend fun getPacketByPacketIdInternal(packetId: Int) =
withContext(dispatchers.io) { dbManager.currentDb.value.packetDao().getPacketByPacketId(packetId) }
private suspend fun batchGetPacketsByIds(ids: List<Int>): Map<Int, PacketEntity> = if (ids.isEmpty()) {
emptyMap()
} else {
withContext(dispatchers.io) {
dbManager.currentDb.value.packetDao().getPacketsByPacketIds(ids).associateBy { it.packet.packetId }
}
}
private fun memoize(getNode: suspend (String?) -> Node): suspend (String?) -> Node {
val cache = mutableMapOf<String?, Node>()
return { id -> cache.getOrPut(id) { getNode(id) } }
}
override suspend fun insert(
packet: DataPacket,
myNodeNum: Int,

View file

@ -309,6 +309,16 @@ interface PacketDao {
)
suspend fun getPacketByPacketId(packetId: Int): PacketEntity?
@Transaction
@Query(
"""
SELECT * FROM packet
WHERE packet_id IN (:packetIds)
AND (myNodeNum = 0 OR myNodeNum = (SELECT myNodeNum FROM my_node))
""",
)
suspend fun getPacketsByPacketIds(packetIds: List<Int>): List<PacketEntity>
@Query(
"""
SELECT * FROM packet