fix: improve PKI message routing and resolve database migration racecondition (#4996)

This commit is contained in:
James Rich 2026-04-04 19:37:20 -05:00 committed by GitHub
parent d0e3b682ab
commit b3be9e2c38
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 277 additions and 37 deletions

View file

@ -95,23 +95,31 @@ class CommandSenderImpl(
private fun computeHopLimit(): Int = (localConfig.value.lora?.hop_limit ?: 0).takeIf { it > 0 } ?: DEFAULT_HOP_LIMIT
private fun getAdminChannelIndex(toNum: Int): Int {
/**
* Resolves the correct channel index for sending a packet to [toNum].
*
* When both the local node and the destination support PKC, returns [DataPacket.PKC_CHANNEL_INDEX] so that
* [buildMeshPacket] enables PKI encryption. Otherwise falls back to the node's heard-on channel (for general
* packets) or the dedicated admin channel (for admin packets).
*/
private fun getChannelIndex(toNum: Int, isAdmin: Boolean = false): Int {
val myNum = nodeManager.myNodeNum.value ?: return 0
val myNode = nodeManager.nodeDBbyNodeNum[myNum]
val destNode = nodeManager.nodeDBbyNodeNum[toNum]
val adminChannelIndex =
when {
myNum == toNum -> 0
myNode?.hasPKC == true && destNode?.hasPKC == true -> DataPacket.PKC_CHANNEL_INDEX
else ->
channelSet.value.settings
.indexOfFirst { it.name.equals(ADMIN_CHANNEL_NAME, ignoreCase = true) }
.coerceAtLeast(0)
}
return adminChannelIndex
return when {
myNum == toNum -> 0
myNode?.hasPKC == true && destNode?.hasPKC == true -> DataPacket.PKC_CHANNEL_INDEX
isAdmin ->
channelSet.value.settings
.indexOfFirst { it.name.equals(ADMIN_CHANNEL_NAME, ignoreCase = true) }
.coerceAtLeast(0)
else -> destNode?.channel ?: 0
}
}
private fun getAdminChannelIndex(toNum: Int): Int = getChannelIndex(toNum, isAdmin = true)
override fun sendData(p: DataPacket) {
if (p.id == 0) p.id = generatePacketId()
val bytes = p.bytes ?: ByteString.EMPTY
@ -191,7 +199,7 @@ class CommandSenderImpl(
packetHandler.sendToRadio(
buildMeshPacket(
to = idNum,
channel = if (destNum == null) 0 else nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = if (destNum == null) 0 else getChannelIndex(destNum),
priority = MeshPacket.Priority.BACKGROUND,
decoded =
Data(
@ -214,7 +222,7 @@ class CommandSenderImpl(
packetHandler.sendToRadio(
buildMeshPacket(
to = destNum,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
priority = MeshPacket.Priority.BACKGROUND,
decoded =
Data(
@ -249,7 +257,7 @@ class CommandSenderImpl(
packetHandler.sendToRadio(
buildMeshPacket(
to = destNum,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
decoded =
Data(
portnum = PortNum.NODEINFO_APP,
@ -267,7 +275,7 @@ class CommandSenderImpl(
to = destNum,
wantAck = true,
id = requestId,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
decoded = Data(portnum = PortNum.TRACEROUTE_APP, want_response = true, dest = destNum),
),
)
@ -305,7 +313,7 @@ class CommandSenderImpl(
buildMeshPacket(
to = destNum,
id = requestId,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
decoded = Data(portnum = portNum, payload = payloadBytes, want_response = true, dest = destNum),
),
)
@ -342,7 +350,7 @@ class CommandSenderImpl(
to = destNum,
wantAck = true,
id = requestId,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
decoded =
Data(
portnum = PortNum.NEIGHBORINFO_APP,
@ -358,7 +366,7 @@ class CommandSenderImpl(
to = destNum,
wantAck = true,
id = requestId,
channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0,
channel = getChannelIndex(destNum),
decoded = Data(portnum = PortNum.NEIGHBORINFO_APP, want_response = true, dest = destNum),
),
)
@ -397,7 +405,14 @@ class CommandSenderImpl(
if (channel == DataPacket.PKC_CHANNEL_INDEX) {
pkiEncrypted = true
publicKey = nodeManager.nodeDBbyNodeNum[to]?.user?.public_key ?: ByteString.EMPTY
val destNode = nodeManager.nodeDBbyNodeNum[to]
// Resolve the public key using the same fallback as Node.hasPKC:
// standalone publicKey (populated after Room round-trip) first, then
// the embedded user.public_key (always available in-memory).
publicKey = destNode?.let { it.publicKey ?: it.user.public_key } ?: ByteString.EMPTY
if (publicKey.size == 0) {
Logger.w { "buildMeshPacket: no public key for node ${to.toUInt()}, PKI encryption will fail" }
}
actualChannel = 0
}

View file

@ -99,6 +99,7 @@ class HistoryManagerImpl(private val meshPrefs: MeshPrefs, private val packetHan
MeshPacket(
from = myNodeNum,
to = myNodeNum,
id = kotlin.random.Random.nextInt(1, Int.MAX_VALUE),
decoded = Data(portnum = PortNum.STORE_FORWARD_APP, payload = request.encode().toByteString()),
priority = MeshPacket.Priority.BACKGROUND,
),

View file

@ -79,7 +79,14 @@ class MeshActionHandlerImpl(
override suspend fun onServiceAction(action: ServiceAction) {
Logger.d { "ServiceAction dispatched: ${action::class.simpleName}" }
ignoreExceptionSuspend {
val myNodeNum = nodeManager.myNodeNum.value ?: return@ignoreExceptionSuspend
val myNodeNum = nodeManager.myNodeNum.value
if (myNodeNum == null) {
Logger.w { "MeshActionHandlerImpl: myNodeNum is null, skipping ServiceAction!" }
if (action is ServiceAction.SendContact) {
action.result.complete(false)
}
return@ignoreExceptionSuspend
}
when (action) {
is ServiceAction.Favorite -> handleFavorite(action, myNodeNum)
is ServiceAction.Ignore -> handleIgnore(action, myNodeNum)

View file

@ -304,7 +304,7 @@ class MeshDataHandlerImpl(
if (p != null && p.status != MessageStatus.RECEIVED) {
val updatedPacket =
p.copy(status = m, relays = if (isAck) p.relays + 1 else p.relays, relayNode = relayNode)
packetRepository.value.update(updatedPacket)
packetRepository.value.update(updatedPacket, routingError = routingError)
}
reaction?.let { r ->

View file

@ -103,7 +103,9 @@ class NodeManagerImpl(
val byId = mutableMapOf<String, Node>()
nodes.values.forEach { byId[it.user.id] = it }
_nodeDBbyID.value = persistentMapOf<String, Node>().putAll(byId)
myNodeNum.value = nodeRepository.myNodeInfo.value?.myNodeNum
if (myNodeNum.value == null) {
myNodeNum.value = nodeRepository.myNodeInfo.value?.myNodeNum
}
}
}
@ -195,7 +197,12 @@ class NodeManagerImpl(
} else {
val keyMatch = !node.hasPKC || node.user.public_key == p.public_key
val newUser = if (keyMatch) p else p.copy(public_key = ByteString.EMPTY)
node.copy(user = newUser, channel = channel, manuallyVerified = manuallyVerified)
node.copy(
user = newUser,
publicKey = newUser.public_key,
channel = channel,
manuallyVerified = manuallyVerified,
)
}
if (newNode && !shouldPreserve) {
scope.handledLaunch {
@ -278,7 +285,7 @@ class NodeManagerImpl(
if (info.via_mqtt) {
newUser = newUser.copy(long_name = "${newUser.long_name} (MQTT)")
}
next = next.copy(user = newUser)
next = next.copy(user = newUser, publicKey = newUser.public_key)
}
}
val position = info.position

View file

@ -256,12 +256,20 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
insertRoomPacket(packetToSave)
}
override suspend fun update(packet: DataPacket): Unit = withContext(dispatchers.io) {
override suspend fun update(packet: DataPacket, routingError: Int): Unit = withContext(dispatchers.io) {
val dao = dbManager.currentDb.value.packetDao()
// Match on key fields that identify the packet, rather than the entire data object
dao.findPacketsWithId(packet.id)
.find { it.data.id == packet.id && it.data.from == packet.from && it.data.to == packet.to }
?.let { dao.update(it.copy(data = packet)) }
?.let { existing ->
val updated =
if (routingError >= 0) {
existing.copy(data = packet, routingError = routingError)
} else {
existing.copy(data = packet)
}
dao.update(updated)
}
}
override suspend fun insertReaction(reaction: Reaction, myNodeNum: Int) =

View file

@ -18,6 +18,8 @@ package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.mock
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
import org.meshtastic.core.repository.NodeRepository
@ -34,6 +36,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import org.meshtastic.proto.NodeInfo as ProtoNodeInfo
import org.meshtastic.proto.Position as ProtoPosition
class NodeManagerImplTest {
@ -226,4 +229,103 @@ class NodeManagerImplTest {
assertTrue(!nodeManager.nodeDBbyNodeNum.containsKey(nodeNum))
assertTrue(!nodeManager.nodeDBbyID.containsKey("!testnode"))
}
@Test
fun `handleReceivedUser sets publicKey from user public_key`() {
val nodeNum = 1234
val pk = ByteArray(32) { (it + 1).toByte() }.toByteString()
val existingUser =
User(id = "!12345678", long_name = "Existing", short_name = "EX", hw_model = HardwareModel.TLORA_V2)
nodeManager.updateNode(nodeNum) { it.copy(user = existingUser) }
val incomingUser =
User(
id = "!12345678",
long_name = "Updated",
short_name = "UP",
hw_model = HardwareModel.TLORA_V2,
public_key = pk,
)
nodeManager.handleReceivedUser(nodeNum, incomingUser)
val result = nodeManager.nodeDBbyNodeNum[nodeNum]!!
assertEquals(pk, result.publicKey)
assertEquals(pk, result.user.public_key)
assertTrue(result.hasPKC)
}
@Test
fun `handleReceivedUser sets empty publicKey when key mismatch clears user key`() {
val nodeNum = 1234
val existingPk = ByteArray(32) { (it + 1).toByte() }.toByteString()
val existingUser =
User(
id = "!12345678",
long_name = "Existing",
short_name = "EX",
hw_model = HardwareModel.TLORA_V2,
public_key = existingPk,
)
nodeManager.updateNode(nodeNum) { it.copy(user = existingUser, publicKey = existingPk) }
val differentPk = ByteArray(32) { (it + 10).toByte() }.toByteString()
val incomingUser =
User(
id = "!12345678",
long_name = "Updated",
short_name = "UP",
hw_model = HardwareModel.TLORA_V2,
public_key = differentPk,
)
nodeManager.handleReceivedUser(nodeNum, incomingUser)
val result = nodeManager.nodeDBbyNodeNum[nodeNum]!!
// Key mismatch: newUser gets public_key cleared to EMPTY, and publicKey should match
assertEquals(ByteString.EMPTY, result.publicKey)
assertEquals(ByteString.EMPTY, result.user.public_key)
}
@Test
fun `installNodeInfo sets publicKey from user public_key`() {
val nodeNum = 5678
val pk = ByteArray(32) { (it + 1).toByte() }.toByteString()
val user =
User(
id = "!abcd1234",
long_name = "Remote Node",
short_name = "RN",
hw_model = HardwareModel.HELTEC_V3,
public_key = pk,
)
val info = ProtoNodeInfo(num = nodeNum, user = user, last_heard = 1000, channel = 0)
nodeManager.installNodeInfo(info)
val result = nodeManager.nodeDBbyNodeNum[nodeNum]!!
assertEquals(pk, result.publicKey)
assertEquals(pk, result.user.public_key)
assertTrue(result.hasPKC)
}
@Test
fun `installNodeInfo clears publicKey for licensed users`() {
val nodeNum = 5678
val pk = ByteArray(32) { (it + 1).toByte() }.toByteString()
val user =
User(
id = "!abcd1234",
long_name = "Licensed Op",
short_name = "LO",
hw_model = HardwareModel.HELTEC_V3,
public_key = pk,
is_licensed = true,
)
val info = ProtoNodeInfo(num = nodeNum, user = user, last_heard = 1000, channel = 0)
nodeManager.installNodeInfo(info)
val result = nodeManager.nodeDBbyNodeNum[nodeNum]!!
assertEquals(ByteString.EMPTY, result.publicKey)
assertEquals(ByteString.EMPTY, result.user.public_key)
}
}

View file

@ -125,16 +125,21 @@ open class DatabaseManager(
// Build/open Room DB off the main thread
val db = withContext(dispatchers.io) { getOrOpenDatabase(dbName) }
if (previousDbName != null && previousDbName != dbName) {
closeCachedDatabase(previousDbName)
}
// Emit the new DB BEFORE closing the old one. flatMapLatest collectors on
// currentDb will cancel their in-flight queries on the previous database once
// the new value is emitted. Closing the old pool first would race with those
// collectors, causing "Connection pool is closed" crashes.
_currentDb.value = db
_currentAddress.value = address
markLastUsed(dbName)
// Also mark the previous DB as used "just now" so LRU has an accurate, recent timestamp
previousDbName?.let { markLastUsed(it) }
// Now safe to close the previous DB — collectors have switched to the new instance.
if (previousDbName != null && previousDbName != dbName) {
closeCachedDatabase(previousDbName)
}
// Defer LRU eviction so switch is not blocked by filesystem work
managerScope.launch(dispatchers.io) { enforceCacheLimit(activeDbName = dbName) }

View file

@ -21,10 +21,12 @@ import org.meshtastic.core.resources.Res
import org.meshtastic.core.resources.delivery_confirmed
import org.meshtastic.core.resources.error
import org.meshtastic.core.resources.message_delivery_status
import org.meshtastic.core.resources.message_status_delivered
import org.meshtastic.core.resources.message_status_enroute
import org.meshtastic.core.resources.message_status_queued
import org.meshtastic.core.resources.message_status_sfpp_confirmed
import org.meshtastic.core.resources.message_status_sfpp_routing
import org.meshtastic.core.resources.message_status_unknown
import org.meshtastic.core.resources.routing_error_admin_bad_session_key
import org.meshtastic.core.resources.routing_error_admin_public_key_unauthorized
import org.meshtastic.core.resources.routing_error_bad_request
@ -103,7 +105,11 @@ data class Message(
MessageStatus.ENROUTE -> Res.string.message_status_enroute
MessageStatus.SFPP_ROUTING -> Res.string.message_status_sfpp_routing
MessageStatus.SFPP_CONFIRMED -> Res.string.message_status_sfpp_confirmed
else -> getStringResFrom(routingError)
MessageStatus.DELIVERED -> Res.string.message_status_delivered
MessageStatus.ERROR -> getStringResFrom(routingError)
MessageStatus.UNKNOWN,
null,
-> Res.string.message_status_unknown
}
return title to text
}

View file

@ -175,8 +175,8 @@ interface PacketRepository {
filtered: Boolean = false,
)
/** Updates an existing packet in the database. */
suspend fun update(packet: DataPacket)
/** Updates an existing packet in the database, optionally setting a routing error code. */
suspend fun update(packet: DataPacket, routingError: Int = -1)
/** Persists a message reaction (emoji). */
suspend fun insertReaction(reaction: Reaction, myNodeNum: Int)

View file

@ -71,16 +71,24 @@ class SendMessageUseCaseImpl(
val ourNode = nodeRepository.ourNodeInfo.value
val fromId = ourNode?.user?.id ?: DataPacket.ID_LOCAL
// logic for direct messages
if (channel == null) {
// Direct message side-effects: share the contact's public key (PKI) or
// favorite the node (legacy) before sending the first message. PKI DMs use
// channel == PKC_CHANNEL_INDEX (8); legacy DMs have no channel prefix
// (channel == null). Both formats target a specific node.
val isDirectMessage = channel == null || channel == DataPacket.PKC_CHANNEL_INDEX
if (isDirectMessage) {
val destNode = nodeRepository.getNode(dest)
val fwVersion = ourNode?.metadata?.firmware_version
val isClientBase = ourNode?.user?.role == Config.DeviceConfig.Role.CLIENT_BASE
val capabilities = Capabilities(fwVersion)
if (capabilities.canSendVerifiedContacts) {
// Best-effort: inform firmware of the destination's public key
// for its NodeDB cache. The MeshPacket itself carries the key
// directly, so the message can be encrypted regardless.
sendSharedContact(destNode)
} else {
} else if (channel == null) {
// Legacy favoriting only applies to old-style DMs without PKI
if (!destNode.isFavorite && !isClientBase) {
favoriteNode(destNode)
}

View file

@ -138,4 +138,77 @@ class SendMessageUseCaseTest {
// Assert
// Verified by observing that no exception is thrown and coverage is hit.
}
@Test
fun `invoke with PKI DM triggers sendSharedContact`() = runTest {
// Arrange: PKI DMs use contactKey = "8!nodeHex" (PKC_CHANNEL_INDEX = 8)
val ourNode =
Node(
num = 1,
user = User(id = "!local", role = Config.DeviceConfig.Role.CLIENT),
metadata = DeviceMetadata(firmware_version = "2.7.12"),
)
nodeRepository.setOurNode(ourNode)
val destNode = Node(num = 0x70fdde9b.toInt(), user = User(id = "!70fdde9b"))
nodeRepository.upsert(destNode)
appPreferences.homoglyph.setHomoglyphEncodingEnabled(false)
// Act — PKI DM: channel 8 + node ID
useCase("PKI direct message", "${DataPacket.PKC_CHANNEL_INDEX}!70fdde9b", null)
// Assert — sendSharedContact should be called for PKI DMs
radioController.sentSharedContacts.size shouldBe 1
radioController.sentSharedContacts[0] shouldBe 0x70fdde9b.toInt()
radioController.favoritedNodes.size shouldBe 0
}
@Test
fun `invoke with channel DM does not trigger sendSharedContact or favorite`() = runTest {
// Arrange: channel-based DMs use contactKey = "<ch>!nodeHex" where ch is 0-7
val ourNode =
Node(
num = 1,
user = User(id = "!local", role = Config.DeviceConfig.Role.CLIENT),
metadata = DeviceMetadata(firmware_version = "2.7.12"),
)
nodeRepository.setOurNode(ourNode)
val destNode = Node(num = 0x12345678, user = User(id = "!12345678"))
nodeRepository.upsert(destNode)
appPreferences.homoglyph.setHomoglyphEncodingEnabled(false)
// Act — channel 1 DM (not PKI, not legacy)
useCase("Channel DM", "1!12345678", null)
// Assert — neither sendSharedContact nor favorite should be called for channel DMs
radioController.sentSharedContacts.size shouldBe 0
radioController.favoritedNodes.size shouldBe 0
}
@Test
fun `invoke with PKI DM to older firmware does not trigger favorite`() = runTest {
// Arrange: PKI DMs with old firmware should NOT fall through to favoriting
val ourNode =
Node(
num = 1,
user = User(id = "!local", role = Config.DeviceConfig.Role.CLIENT),
metadata = DeviceMetadata(firmware_version = "2.0.0"),
)
nodeRepository.setOurNode(ourNode)
val destNode = Node(num = 0xABCDEF01.toInt(), user = User(id = "!abcdef01"))
nodeRepository.upsert(destNode)
appPreferences.homoglyph.setHomoglyphEncodingEnabled(false)
// Act — PKI DM with firmware that doesn't support verified contacts
useCase("Old PKI DM", "${DataPacket.PKC_CHANNEL_INDEX}!abcdef01", null)
// Assert — PKI DMs should not trigger legacy favoriting (that's only for channel==null)
radioController.sentSharedContacts.size shouldBe 0
radioController.favoritedNodes.size shouldBe 0
}
}

View file

@ -57,6 +57,8 @@
<string name="unrecognized">Unrecognized</string>
<string name="message_status_enroute">Waiting to be acknowledged</string>
<string name="message_status_queued">Queued for sending</string>
<string name="message_status_delivered">Delivered to mesh</string>
<string name="message_status_unknown">Unknown</string>
<string name="message_status_sfpp_routing">Routing via SF++ chain…</string>
<string name="message_status_sfpp_confirmed">Confirmed on SF++ chain</string>
<string name="routing_error_none">Acknowledged</string>

View file

@ -65,8 +65,10 @@ import org.meshtastic.core.resources.Res
import org.meshtastic.core.resources.delivery_confirmed
import org.meshtastic.core.resources.error
import org.meshtastic.core.resources.message_delivery_status
import org.meshtastic.core.resources.message_status_delivered
import org.meshtastic.core.resources.message_status_enroute
import org.meshtastic.core.resources.message_status_queued
import org.meshtastic.core.resources.message_status_unknown
import org.meshtastic.core.resources.react
import org.meshtastic.core.resources.you
import org.meshtastic.core.ui.component.BottomSheetDialog
@ -210,7 +212,11 @@ internal fun ReactionDialog(
MessageStatus.RECEIVED -> Res.string.delivery_confirmed
MessageStatus.QUEUED -> Res.string.message_status_queued
MessageStatus.ENROUTE -> Res.string.message_status_enroute
else -> getStringResFrom(reaction.routingError)
MessageStatus.DELIVERED -> Res.string.message_status_delivered
MessageStatus.SFPP_ROUTING -> Res.string.message_status_enroute
MessageStatus.SFPP_CONFIRMED -> Res.string.delivery_confirmed
MessageStatus.ERROR -> getStringResFrom(reaction.routingError)
MessageStatus.UNKNOWN -> Res.string.message_status_unknown
}
val relayNodeName =