feat(core): implement batched node info handling for mesh handshake

This commit optimizes the mesh handshake protocol by introducing explicit support for `NodeInfoBatch` messages. It updates the configuration flow to handle both primary batched node delivery and legacy single-node delivery for backwards compatibility with older firmware.

Key changes include:

- **Batch Processing Optimization:**
    - Added `handleNodeInfoBatch` to the `MeshConfigFlowManager` interface to allow bulk processing of node information, reducing per-item overhead during the initial handshake.
    - Updated `MeshConfigFlowManagerImpl` to accumulate batched nodes efficiently using `addAll`.
    - Refactored `FromRadioPacketHandlerImpl` to delegate batch processing directly to the manager instead of iterating through individual items.

- **Handshake Protocol Updates:**
    - Updated `HandshakeConstants` to distinguish between `BATCH_NODE_INFO_NONCE` (primary Stage 2) and `NODE_INFO_NONCE` (legacy Stage 2).
    - Modified `handleConfigComplete` logic to trigger Stage 2 completion for both batched and legacy nonces.
    - Ensured `MeshConnectionManager` prioritizes the batch nonce when requesting node information.

- **Testing & Simulation:**
    - Created `MeshConfigFlowManagerImplTest` to validate node accumulation, batch handling, and handshake nonce routing.
    - Improved `MockInterface` to better simulate real-world packet ordering by delaying live traffic until after the handshake completion coroutine has processed the node database.
    - Added verification tests to ensure the connection manager uses the correct batching nonces.

Specific changes:
- Added `handleNodeInfoBatch` implementation to `MeshConfigFlowManagerImpl`.
- Updated documentation in `HandshakeConstants` regarding two-stage mesh handshake protocol.
- Refactored `MockInterface.sendStage2NodeInfoResponse` to handle packet encoding and simulation delays.
- Added unit tests covering edge cases for empty and mixed node info batches.
This commit is contained in:
James Rich 2026-04-03 10:36:16 -05:00
parent b708b2ff76
commit a74d5d470a
8 changed files with 271 additions and 30 deletions

View file

@ -82,7 +82,7 @@ class FromRadioPacketHandlerImpl(
serviceRepository.setConnectionProgress("Nodes (${router.value.configFlowManager.newNodeCount})")
}
nodeInfoBatch != null -> {
nodeInfoBatch.items.forEach { info -> router.value.configFlowManager.handleNodeInfo(info) }
router.value.configFlowManager.handleNodeInfoBatch(nodeInfoBatch.items)
serviceRepository.setConnectionProgress("Nodes (${router.value.configFlowManager.newNodeCount})")
}
configCompleteId != null -> router.value.configFlowManager.handleConfigComplete(configCompleteId)

View file

@ -77,8 +77,8 @@ class MeshConfigFlowManagerImpl(
override fun handleConfigComplete(configCompleteId: Int) {
when (configCompleteId) {
HandshakeConstants.CONFIG_NONCE -> handleConfigOnlyComplete()
HandshakeConstants.NODE_INFO_NONCE,
HandshakeConstants.BATCH_NODE_INFO_NONCE,
HandshakeConstants.NODE_INFO_NONCE,
-> handleNodeInfoComplete()
else -> Logger.w { "Config complete id mismatch: $configCompleteId" }
}
@ -171,6 +171,10 @@ class MeshConfigFlowManagerImpl(
newNodes.add(info)
}
override fun handleNodeInfoBatch(items: List<NodeInfo>) {
newNodes.addAll(items)
}
override fun handleFileInfo(info: FileInfo) {
Logger.d { "FileInfo received: ${info.file_name} (${info.size_bytes} bytes)" }
scope.handledLaunch { radioConfigRepository.addFileInfo(info) }

View file

@ -163,20 +163,19 @@ class FromRadioPacketHandlerImplTest {
}
@Test
fun `handleFromRadio routes NODE_INFO_BATCH items to configFlowManager and updates status`() {
fun `handleFromRadio routes NODE_INFO_BATCH to handleNodeInfoBatch and updates status`() {
val node1 = ProtoNodeInfo(num = 1111)
val node2 = ProtoNodeInfo(num = 2222)
val node3 = ProtoNodeInfo(num = 3333)
val batch = NodeInfoBatch(items = listOf(node1, node2, node3))
val items = listOf(node1, node2, node3)
val batch = NodeInfoBatch(items = items)
val proto = FromRadio(node_info_batch = batch)
every { configFlowManager.newNodeCount } returns 3
handler.handleFromRadio(proto)
verify { configFlowManager.handleNodeInfo(node1) }
verify { configFlowManager.handleNodeInfo(node2) }
verify { configFlowManager.handleNodeInfo(node3) }
verify { configFlowManager.handleNodeInfoBatch(items) }
verify { serviceRepository.setConnectionProgress("Nodes (3)") }
}

View file

@ -0,0 +1,209 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verifyNoMoreCalls
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.Node
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.HandshakeConstants
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.PacketHandler
import org.meshtastic.core.repository.PlatformAnalytics
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.core.testing.FakeServiceRepository
import org.meshtastic.proto.NodeInfo
import org.meshtastic.proto.ToRadio
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class)
class MeshConfigFlowManagerImplTest {
private val connectionManager = mock<MeshConnectionManager>(MockMode.autofill)
private val nodeRepository = FakeNodeRepository()
private val serviceRepository = FakeServiceRepository()
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val serviceBroadcasts = mock<ServiceBroadcasts>(MockMode.autofill)
private val analytics = mock<PlatformAnalytics>(MockMode.autofill)
private val commandSender = mock<CommandSender>(MockMode.autofill)
private val packetHandler = mock<PacketHandler>(MockMode.autofill)
private val nodeManager = mock<NodeManager>(MockMode.autofill)
// Tracks nodes installed via nodeManager.installNodeInfo so assertions can inspect them
private val installedNodes: MutableMap<Int, Node> = mutableMapOf()
private val testDispatcher = UnconfinedTestDispatcher()
private lateinit var manager: MeshConfigFlowManagerImpl
@BeforeTest
fun setUp() {
every { connectionManager.startNodeInfoOnly() } returns Unit
every { connectionManager.onRadioConfigLoaded() } returns Unit
every { connectionManager.onNodeDbReady() } returns Unit
every { packetHandler.sendToRadio(any<ToRadio>()) } returns Unit
every { serviceBroadcasts.broadcastConnection() } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns installedNodes
every { nodeManager.myNodeNum } returns null
every { nodeManager.setNodeDbReady(any<Boolean>()) } returns Unit
every { nodeManager.setAllowNodeDbWrites(any<Boolean>()) } returns Unit
every { nodeManager.installNodeInfo(any<NodeInfo>(), any<Boolean>()) } returns Unit
manager =
MeshConfigFlowManagerImpl(
nodeManager = nodeManager,
connectionManager = lazy { connectionManager },
nodeRepository = nodeRepository,
radioConfigRepository = radioConfigRepository,
serviceRepository = serviceRepository,
serviceBroadcasts = serviceBroadcasts,
analytics = analytics,
commandSender = commandSender,
packetHandler = packetHandler,
)
}
// -------------------------------------------------------------------------
// handleNodeInfo / handleNodeInfoBatch — accumulation
// -------------------------------------------------------------------------
@Test
fun `handleNodeInfo accumulates nodes and increments newNodeCount`() {
manager.handleNodeInfo(NodeInfo(num = 1))
manager.handleNodeInfo(NodeInfo(num = 2))
assertEquals(2, manager.newNodeCount)
}
@Test
fun `handleNodeInfoBatch adds all items in one shot`() {
val items = listOf(NodeInfo(num = 10), NodeInfo(num = 11), NodeInfo(num = 12))
manager.handleNodeInfoBatch(items)
assertEquals(3, manager.newNodeCount)
}
@Test
fun `handleNodeInfoBatch with empty list leaves count at zero`() {
manager.handleNodeInfoBatch(emptyList())
assertEquals(0, manager.newNodeCount)
}
@Test
fun `handleNodeInfoBatch with single item equals count of one`() {
manager.handleNodeInfoBatch(listOf(NodeInfo(num = 7)))
assertEquals(1, manager.newNodeCount)
}
@Test
fun `handleNodeInfoBatch and handleNodeInfo accumulate together`() {
manager.handleNodeInfo(NodeInfo(num = 1))
manager.handleNodeInfoBatch(listOf(NodeInfo(num = 2), NodeInfo(num = 3)))
assertEquals(3, manager.newNodeCount)
}
// -------------------------------------------------------------------------
// handleConfigComplete — nonce routing
// -------------------------------------------------------------------------
@Test
fun `handleConfigComplete with BATCH_NODE_INFO_NONCE triggers Stage 2 completion`() = runTest(testDispatcher) {
manager.start(backgroundScope)
manager.handleNodeInfoBatch(listOf(NodeInfo(num = 100)))
// installNodeInfo is a no-op mock; seed the map so nodeDBbyNodeNum[num] is non-null
installedNodes[100] = Node(num = 100)
manager.handleConfigComplete(HandshakeConstants.BATCH_NODE_INFO_NONCE)
advanceUntilIdle()
verify { connectionManager.onNodeDbReady() }
}
@Test
fun `handleConfigComplete with NODE_INFO_NONCE (legacy) also triggers Stage 2 completion`() =
runTest(testDispatcher) {
manager.start(backgroundScope)
manager.handleNodeInfo(NodeInfo(num = 200))
installedNodes[200] = Node(num = 200)
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { connectionManager.onNodeDbReady() }
}
@Test
fun `handleConfigComplete with unknown nonce takes no action`() = runTest(testDispatcher) {
manager.start(backgroundScope)
manager.handleConfigComplete(99999)
advanceUntilIdle()
verifyNoMoreCalls(connectionManager)
}
// -------------------------------------------------------------------------
// handleConfigComplete Stage 2 — newNodes cleared after completion
// -------------------------------------------------------------------------
@Test
fun `newNodeCount resets to zero after Stage 2 completion`() = runTest(testDispatcher) {
manager.start(backgroundScope)
manager.handleNodeInfoBatch(listOf(NodeInfo(num = 1), NodeInfo(num = 2)))
installedNodes[1] = Node(num = 1)
installedNodes[2] = Node(num = 2)
assertEquals(2, manager.newNodeCount)
manager.handleConfigComplete(HandshakeConstants.BATCH_NODE_INFO_NONCE)
advanceUntilIdle()
assertEquals(0, manager.newNodeCount)
}
// -------------------------------------------------------------------------
// handleConfigComplete Stage 2 — empty batch edge case
// -------------------------------------------------------------------------
@Test
fun `Stage 2 completion with empty batch signals readiness with no installed nodes`() = runTest(testDispatcher) {
manager.start(backgroundScope)
// Intentionally skip any handleNodeInfo* calls — simulates an empty NodeInfoBatch
manager.handleConfigComplete(HandshakeConstants.BATCH_NODE_INFO_NONCE)
advanceUntilIdle()
verify { connectionManager.onNodeDbReady() }
}
}

View file

@ -35,6 +35,7 @@ import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
import org.meshtastic.core.repository.AppWidgetUpdater
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.HandshakeConstants
import org.meshtastic.core.repository.HistoryManager
import org.meshtastic.core.repository.MeshLocationManager
import org.meshtastic.core.repository.MeshServiceNotifications
@ -54,6 +55,7 @@ import org.meshtastic.proto.Config
import org.meshtastic.proto.LocalConfig
import org.meshtastic.proto.LocalModuleConfig
import org.meshtastic.proto.ModuleConfig
import org.meshtastic.proto.ToRadio
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
@ -151,17 +153,6 @@ class MeshConnectionManagerImplTest {
@Test
fun `Disconnected state stops services`() = runTest(testDispatcher) {
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
every { packetHandler.stopPacketQueue() } returns Unit
every { locationManager.stop() } returns Unit
every { mqttManager.stop() } returns Unit
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
every { packetHandler.stopPacketQueue() } returns Unit
every { locationManager.stop() } returns Unit
every { mqttManager.stop() } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
manager.start(backgroundScope)
// Transition to Connected first so that Disconnected actually does something
radioConnectionState.value = ConnectionState.Connected
@ -267,4 +258,25 @@ class MeshConnectionManagerImplTest {
verify { mqttManager.start(any(), true, true) }
verify { historyManager.requestHistoryReplay(any(), any(), any(), any()) }
}
@Test
fun `startNodeInfoOnly sends BATCH_NODE_INFO_NONCE not the legacy nonce`() = runTest(testDispatcher) {
val sentPackets = mutableListOf<ToRadio>()
every { packetHandler.sendToRadio(any<ToRadio>()) } calls
{ call ->
sentPackets.add(call.arg(0))
Unit
}
manager.start(backgroundScope)
manager.startNodeInfoOnly()
advanceUntilIdle()
val nodeInfoPacket = sentPackets.firstOrNull { (it.want_config_id ?: 0) != 0 }
assertEquals(
HandshakeConstants.BATCH_NODE_INFO_NONCE,
nodeInfoPacket?.want_config_id,
"startNodeInfoOnly must use BATCH_NODE_INFO_NONCE, not the legacy NODE_INFO_NONCE",
)
}
}

View file

@ -352,8 +352,9 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
}
/**
* Stage 2: send all nodes as a single [NodeInfoBatch], then config_complete_id. After the handshake completes,
* simulate live traffic.
* Stage 2: send all nodes as a single [NodeInfoBatch], then config_complete_id. Live traffic is simulated in a
* separate coroutine after a short delay so it arrives *after* the handshake completion coroutine has had a chance
* to commit the node DB matching real-world ordering.
*/
private fun sendStage2NodeInfoResponse(configId: Int) {
val batch =
@ -364,18 +365,22 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
makeSimNodeInfo(MY_NODE + 1, 32.960758, -96.733521), // richardson
),
)
val packets =
arrayOf(
FromRadio(node_info_batch = batch),
FromRadio(config_complete_id = configId),
listOf(FromRadio(node_info_batch = batch), FromRadio(config_complete_id = configId)).forEach { p ->
service.handleFromRadio(p.encode())
}
// Simulate live traffic after handshake
// Simulate live traffic after the handshake has completed. Launched in a separate
// coroutine with a small delay so these packets arrive after onNodeDbReady() runs.
service.serviceScope.handledLaunch {
delay(200)
listOf(
makeTextMessage(MY_NODE + 1),
makeNeighborInfo(MY_NODE + 1),
makePosition(MY_NODE + 1),
makeTelemetry(MY_NODE + 1),
makeNodeStatus(MY_NODE + 1),
)
packets.forEach { p -> service.handleFromRadio(p.encode()) }
.forEach { p -> service.handleFromRadio(p.encode()) }
}
}
}

View file

@ -18,9 +18,9 @@ package org.meshtastic.core.repository
/**
* Shared constants for the two-stage mesh handshake protocol.
*
* Stage 1 (`CONFIG_NONCE`): requests device config, module config, and channels. Stage 2 (`BATCH_NODE_INFO_NONCE`):
* requests the full node database with batched NodeInfo delivery.
* - Stage 1 ([CONFIG_NONCE]): requests device config, module config, and channels.
* - Stage 2 ([BATCH_NODE_INFO_NONCE], primary): requests the full node database with batched [NodeInfoBatch] delivery.
* - Stage 2 ([NODE_INFO_NONCE], legacy): requests node info one-at-a-time; kept for firmware that pre-dates batching.
*
* Both [MeshConfigFlowManager] (consumer) and [MeshConnectionManager] (sender) reference these.
*/
@ -31,6 +31,8 @@ object HandshakeConstants {
/** Nonce sent in `want_config_id` to request node info only — unbatched legacy (Stage 2). */
const val NODE_INFO_NONCE = 69421
/** Nonce sent in `want_config_id` to request node info only — batched (Stage 2). */
// 69422 intentionally skipped — reserved for future use.
/** Nonce sent in `want_config_id` to request node info only — batched (Stage 2, primary). */
const val BATCH_NODE_INFO_NONCE = 69423
}

View file

@ -36,6 +36,16 @@ interface MeshConfigFlowManager {
/** Handles received node information. */
fun handleNodeInfo(info: NodeInfo)
/**
* Handles a batch of node information records delivered in a single [NodeInfoBatch] message.
*
* The default implementation simply delegates to [handleNodeInfo] for each item. Implementations should override
* this with a bulk `addAll` to avoid per-item overhead on large meshes.
*/
fun handleNodeInfoBatch(items: List<NodeInfo>) {
items.forEach { handleNodeInfo(it) }
}
/**
* Handles a [FileInfo] packet received during STATE_SEND_FILEMANIFEST.
*