diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/DataLayerHeartbeatSender.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/DataLayerHeartbeatSender.kt new file mode 100644 index 000000000..6ca10df26 --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/DataLayerHeartbeatSender.kt @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.manager + +import co.touchlab.kermit.Logger +import kotlinx.atomicfu.atomic +import org.koin.core.annotation.Single +import org.meshtastic.core.repository.PacketHandler +import org.meshtastic.proto.Heartbeat +import org.meshtastic.proto.ToRadio + +/** + * Centralized heartbeat sender for the data layer. + * + * Consolidates heartbeat nonce management into a single monotonically increasing counter, preventing the firmware's + * per-connection duplicate-write filter (byte-level memcmp) from silently dropping consecutive heartbeats. + * + * This is distinct from [org.meshtastic.core.network.transport.HeartbeatSender], which operates at the transport layer + * with raw byte encoding. This class works at the protobuf/data layer through [PacketHandler]. + */ +@Single +class DataLayerHeartbeatSender(private val packetHandler: PacketHandler) { + private val nonce = atomic(0) + + /** + * Enqueues a heartbeat with a unique nonce. + * + * @param tag descriptive label for log messages (e.g. "pre-handshake", "inter-stage") + */ + @Suppress("TooGenericExceptionCaught") + fun sendHeartbeat(tag: String = "handshake") { + try { + val n = nonce.incrementAndGet() + packetHandler.sendToRadio(ToRadio(heartbeat = Heartbeat(nonce = n))) + Logger.d { "[$tag] Heartbeat enqueued (nonce=$n)" } + } catch (e: Exception) { + Logger.w(e) { "[$tag] Failed to enqueue heartbeat; proceeding" } + } + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt index b7b27aa4e..cc5cc4319 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt @@ -20,18 +20,17 @@ import co.touchlab.kermit.Logger import kotlinx.atomicfu.atomic import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay -import okio.IOException import org.koin.core.annotation.Named import org.koin.core.annotation.Single import org.meshtastic.core.common.util.handledLaunch import org.meshtastic.core.model.ConnectionState +import org.meshtastic.core.model.DeviceVersion import org.meshtastic.core.repository.CommandSender import org.meshtastic.core.repository.HandshakeConstants import org.meshtastic.core.repository.MeshConfigFlowManager import org.meshtastic.core.repository.MeshConnectionManager import org.meshtastic.core.repository.NodeManager import org.meshtastic.core.repository.NodeRepository -import org.meshtastic.core.repository.PacketHandler import org.meshtastic.core.repository.PlatformAnalytics import org.meshtastic.core.repository.RadioConfigRepository import org.meshtastic.core.repository.ServiceBroadcasts @@ -39,9 +38,7 @@ import org.meshtastic.core.repository.ServiceRepository import org.meshtastic.proto.DeviceMetadata import org.meshtastic.proto.FileInfo import org.meshtastic.proto.HardwareModel -import org.meshtastic.proto.Heartbeat import org.meshtastic.proto.NodeInfo -import org.meshtastic.proto.ToRadio import org.meshtastic.core.model.MyNodeInfo as SharedMyNodeInfo import org.meshtastic.proto.MyNodeInfo as ProtoMyNodeInfo @@ -56,7 +53,7 @@ class MeshConfigFlowManagerImpl( private val serviceBroadcasts: ServiceBroadcasts, private val analytics: PlatformAnalytics, private val commandSender: CommandSender, - private val packetHandler: PacketHandler, + private val heartbeatSender: DataLayerHeartbeatSender, @Named("ServiceScope") private val scope: CoroutineScope, ) : MeshConfigFlowManager { private val wantConfigDelay = 100L @@ -90,10 +87,8 @@ class MeshConfigFlowManagerImpl( * [myNodeInfo] was committed at the Stage 1→2 transition. [nodes] accumulates [NodeInfo] packets until * `config_complete_id` arrives. */ - data class ReceivingNodeInfo( - val myNodeInfo: SharedMyNodeInfo, - val nodes: MutableList = mutableListOf(), - ) : HandshakeState() + data class ReceivingNodeInfo(val myNodeInfo: SharedMyNodeInfo, val nodes: List = emptyList()) : + HandshakeState() /** Both stages finished. The app is fully connected. */ data class Complete(val myNodeInfo: SharedMyNodeInfo) : HandshakeState() @@ -139,28 +134,31 @@ class MeshConfigFlowManagerImpl( return } + // Warn if firmware is below the absolute minimum supported version. + // The UI layer already enforces this via FirmwareVersionCheck, so we just log here + // for diagnostics rather than hard-disconnecting. + finalizedInfo.firmwareVersion?.let { fwVersion -> + if (DeviceVersion(fwVersion) < DeviceVersion(DeviceVersion.ABS_MIN_FW_VERSION)) { + Logger.w { + "Firmware $fwVersion is below minimum ${DeviceVersion.ABS_MIN_FW_VERSION} — " + + "protocol incompatibilities may occur" + } + } + } + handshakeState = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo) Logger.i { "myNodeInfo committed (nodeNum=${finalizedInfo.myNodeNum})" } connectionManager.value.onRadioConfigLoaded() scope.handledLaunch { delay(wantConfigDelay) - sendHeartbeat() + heartbeatSender.sendHeartbeat("inter-stage") delay(wantConfigDelay) Logger.i { "Requesting NodeInfo (Stage 2)" } connectionManager.value.startNodeInfoOnly() } } - private fun sendHeartbeat() { - try { - packetHandler.sendToRadio(ToRadio(heartbeat = Heartbeat())) - Logger.d { "Heartbeat sent between nonce stages" } - } catch (ex: IOException) { - Logger.w(ex) { "Failed to send heartbeat; proceeding with node-info stage" } - } - } - private fun handleNodeInfoComplete(state: HandshakeState.ReceivingNodeInfo) { Logger.i { "NodeInfo complete (Stage 2)" } @@ -168,16 +166,12 @@ class MeshConfigFlowManagerImpl( // Transition state immediately (synchronously) to prevent duplicate handling. // The async work below (DB writes, broadcasts) proceeds without the guard. + // Because nodes is now immutable, no snapshot is needed — state.nodes IS the snapshot. + // Any stall-guard retry that re-enters handleNodeInfo will see Complete state and be ignored. handshakeState = HandshakeState.Complete(myNodeInfo = info) - // Snapshot and clear immediately so that a concurrent stall-guard retry (which - // resends want_config_id and causes the firmware to restart the node_info burst) - // starts accumulating into a fresh list rather than doubling this batch. - val nodesToProcess = state.nodes.toList() - state.nodes.clear() - val entities = - nodesToProcess.mapNotNull { nodeInfo -> + state.nodes.mapNotNull { nodeInfo -> nodeManager.installNodeInfo(nodeInfo, withBroadcast = false) nodeManager.nodeDBbyNodeNum[nodeInfo.num] ?: run { @@ -242,7 +236,7 @@ class MeshConfigFlowManagerImpl( override fun handleNodeInfo(info: NodeInfo) { val state = handshakeState if (state is HandshakeState.ReceivingNodeInfo) { - state.nodes.add(info) + handshakeState = state.copy(nodes = state.nodes + info) } else { Logger.w { "Ignoring NodeInfo outside Stage 2 (state=$state)" } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt index 94b405953..a60dc85c5 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImpl.kt @@ -84,6 +84,7 @@ class MeshConnectionManagerImpl( private val packetRepository: PacketRepository, private val workerManager: MeshWorkerManager, private val appWidgetUpdater: AppWidgetUpdater, + private val heartbeatSender: DataLayerHeartbeatSender, @Named("ServiceScope") private val scope: CoroutineScope, ) : MeshConnectionManager { /** @@ -92,6 +93,7 @@ class MeshConnectionManagerImpl( */ private val connectionMutex = Mutex() + private var preHandshakeJob: Job? = null private var sleepTimeout: Job? = null private var locationRequestsJob: Job? = null private var handshakeTimeout: Job? = null @@ -172,6 +174,8 @@ class MeshConnectionManagerImpl( sleepTimeout?.cancel() sleepTimeout = null + preHandshakeJob?.cancel() + preHandshakeJob = null handshakeTimeout?.cancel() handshakeTimeout = null @@ -192,9 +196,19 @@ class MeshConnectionManagerImpl( serviceRepository.setConnectionState(ConnectionState.Connecting) } serviceBroadcasts.broadcastConnection() - Logger.i { "Starting mesh handshake (Stage 1)" } connectTimeMsec = nowMillis - startConfigOnly() + + // Send a wake-up heartbeat before the config request. The firmware may be in a + // power-saving state where the NimBLE callback context needs warming up. The 100ms + // delay ensures the heartbeat BLE write is enqueued before the want_config_id + // (sendToRadio is fire-and-forget through async coroutine launches). + preHandshakeJob = + scope.handledLaunch { + heartbeatSender.sendHeartbeat("pre-handshake") + delay(PRE_HANDSHAKE_SETTLE_MS) + Logger.i { "Starting mesh handshake (Stage 1)" } + startConfigOnly() + } } private fun startHandshakeStallGuard(stage: Int, action: () -> Unit) { @@ -381,6 +395,15 @@ class MeshConnectionManagerImpl( // cap, routers (ls_secs=3600) leave the UI in DeviceSleep for over an hour. private const val MAX_SLEEP_TIMEOUT_SECONDS = 300 + /** + * Delay between the pre-handshake heartbeat and the want_config_id send. + * + * Ensures the heartbeat BLE write completes and the firmware's NimBLE callback context is warmed up before the + * config request arrives. 100ms is well above observed ESP32 task scheduling latency (~10–50ms) while adding + * negligible connection latency. + */ + private const val PRE_HANDSHAKE_SETTLE_MS = 100L + private val HANDSHAKE_TIMEOUT = 30.seconds // Shorter window for the retry attempt: if the device genuinely didn't receive the diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt index e05c6f20a..fdcd8ed44 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImplTest.kt @@ -17,6 +17,7 @@ package org.meshtastic.core.data.manager import dev.mokkery.MockMode +import dev.mokkery.answering.calls import dev.mokkery.answering.returns import dev.mokkery.every import dev.mokkery.matcher.any @@ -97,7 +98,7 @@ class MeshConfigFlowManagerImplTest { serviceBroadcasts = serviceBroadcasts, analytics = analytics, commandSender = commandSender, - packetHandler = packetHandler, + heartbeatSender = DataLayerHeartbeatSender(packetHandler), scope = testScope, ) } @@ -174,6 +175,49 @@ class MeshConfigFlowManagerImplTest { verify { connectionManager.startNodeInfoOnly() } } + @Test + fun `Stage 1 complete sends heartbeat with non-zero nonce between stages`() = testScope.runTest { + val sentPackets = mutableListOf() + every { packetHandler.sendToRadio(any()) } calls + { call -> + sentPackets.add(call.arg(0)) + } + + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(metadata) + advanceUntilIdle() + + sentPackets.clear() // Clear any packets from prior phases + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceUntilIdle() + + val heartbeats = sentPackets.filter { it.heartbeat != null } + assertEquals(1, heartbeats.size, "Expected exactly one inter-stage heartbeat") + assertEquals( + true, + heartbeats[0].heartbeat!!.nonce != 0, + "Inter-stage heartbeat should have a non-zero nonce", + ) + } + + @Test + fun `Stage 1 complete with old firmware logs warning but continues handshake`() = testScope.runTest { + val oldMetadata = + DeviceMetadata(firmware_version = "2.3.0", hw_model = HardwareModel.HELTEC_V3, hasWifi = false) + manager.handleMyInfo(protoMyNodeInfo) + advanceUntilIdle() + manager.handleLocalMetadata(oldMetadata) + advanceUntilIdle() + + manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE) + advanceUntilIdle() + + // Handshake should still progress despite old firmware + verify { connectionManager.onRadioConfigLoaded() } + verify { connectionManager.startNodeInfoOnly() } + } + @Test fun `Stage 1 complete without metadata still succeeds with null firmware version`() = testScope.runTest { manager.handleMyInfo(protoMyNodeInfo) diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt index c6dfa7f43..07c8914ad 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/manager/MeshConnectionManagerImplTest.kt @@ -129,6 +129,7 @@ class MeshConnectionManagerImplTest { packetRepository, workerManager, appWidgetUpdater, + DataLayerHeartbeatSender(packetHandler), scope, ) @@ -148,6 +149,59 @@ class MeshConnectionManagerImplTest { verify { serviceBroadcasts.broadcastConnection() } } + @Test + fun `Connected state sends pre-handshake heartbeat before config request`() = runTest(testDispatcher) { + val sentPackets = mutableListOf() + every { packetHandler.sendToRadio(any()) } calls + { call -> + sentPackets.add(call.arg(0)) + } + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Advance past PRE_HANDSHAKE_SETTLE_MS (100ms) but NOT the 30s stall guard timeout + advanceTimeBy(200) + + // First ToRadio should be a heartbeat, second should be want_config_id + assertEquals(2, sentPackets.size, "Expected heartbeat + want_config_id, got ${sentPackets.size} packets") + val heartbeat = sentPackets[0] + val wantConfig = sentPackets[1] + + assertEquals(true, heartbeat.heartbeat != null, "First packet should be a heartbeat") + assertEquals(true, heartbeat.heartbeat!!.nonce != 0, "Heartbeat should have a non-zero nonce") + assertEquals( + org.meshtastic.core.repository.HandshakeConstants.CONFIG_NONCE, + wantConfig.want_config_id, + "Second packet should be want_config_id with CONFIG_NONCE", + ) + } + + @Test + fun `Disconnect during pre-handshake settle cancels config start`() = runTest(testDispatcher) { + val sentPackets = mutableListOf() + every { packetHandler.sendToRadio(any()) } calls + { call -> + sentPackets.add(call.arg(0)) + } + every { nodeManager.nodeDBbyNodeNum } returns emptyMap() + + manager = createManager(backgroundScope) + radioConnectionState.value = ConnectionState.Connected + // Advance only 50ms — within the 100ms settle window + advanceTimeBy(50) + + // Should have sent only the heartbeat so far, not want_config_id + assertEquals(1, sentPackets.size, "Only heartbeat should be sent before settle completes") + + // Disconnect before the settle delay completes — should cancel the pending config start + radioConnectionState.value = ConnectionState.Disconnected + advanceTimeBy(200) + + // The want_config_id should NOT have been sent because the job was cancelled + val configPackets = sentPackets.filter { it.want_config_id != null } + assertEquals(0, configPackets.size, "want_config_id should not be sent after disconnect") + } + @Test fun `Disconnected state stops services`() = runTest(testDispatcher) { every { nodeManager.nodeDBbyNodeNum } returns emptyMap()