mirror of
https://github.com/meshtastic/Meshtastic-Android.git
synced 2026-04-20 22:23:37 +00:00
fix: align BLE connection handshake with firmware protocol expectations (#5141)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
parent
96419f3251
commit
84621acb04
5 changed files with 199 additions and 30 deletions
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright (c) 2026 Meshtastic LLC
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.meshtastic.core.data.manager
|
||||
|
||||
import co.touchlab.kermit.Logger
|
||||
import kotlinx.atomicfu.atomic
|
||||
import org.koin.core.annotation.Single
|
||||
import org.meshtastic.core.repository.PacketHandler
|
||||
import org.meshtastic.proto.Heartbeat
|
||||
import org.meshtastic.proto.ToRadio
|
||||
|
||||
/**
|
||||
* Centralized heartbeat sender for the data layer.
|
||||
*
|
||||
* Consolidates heartbeat nonce management into a single monotonically increasing counter, preventing the firmware's
|
||||
* per-connection duplicate-write filter (byte-level memcmp) from silently dropping consecutive heartbeats.
|
||||
*
|
||||
* This is distinct from [org.meshtastic.core.network.transport.HeartbeatSender], which operates at the transport layer
|
||||
* with raw byte encoding. This class works at the protobuf/data layer through [PacketHandler].
|
||||
*/
|
||||
@Single
|
||||
class DataLayerHeartbeatSender(private val packetHandler: PacketHandler) {
|
||||
private val nonce = atomic(0)
|
||||
|
||||
/**
|
||||
* Enqueues a heartbeat with a unique nonce.
|
||||
*
|
||||
* @param tag descriptive label for log messages (e.g. "pre-handshake", "inter-stage")
|
||||
*/
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
fun sendHeartbeat(tag: String = "handshake") {
|
||||
try {
|
||||
val n = nonce.incrementAndGet()
|
||||
packetHandler.sendToRadio(ToRadio(heartbeat = Heartbeat(nonce = n)))
|
||||
Logger.d { "[$tag] Heartbeat enqueued (nonce=$n)" }
|
||||
} catch (e: Exception) {
|
||||
Logger.w(e) { "[$tag] Failed to enqueue heartbeat; proceeding" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -20,18 +20,17 @@ import co.touchlab.kermit.Logger
|
|||
import kotlinx.atomicfu.atomic
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.delay
|
||||
import okio.IOException
|
||||
import org.koin.core.annotation.Named
|
||||
import org.koin.core.annotation.Single
|
||||
import org.meshtastic.core.common.util.handledLaunch
|
||||
import org.meshtastic.core.model.ConnectionState
|
||||
import org.meshtastic.core.model.DeviceVersion
|
||||
import org.meshtastic.core.repository.CommandSender
|
||||
import org.meshtastic.core.repository.HandshakeConstants
|
||||
import org.meshtastic.core.repository.MeshConfigFlowManager
|
||||
import org.meshtastic.core.repository.MeshConnectionManager
|
||||
import org.meshtastic.core.repository.NodeManager
|
||||
import org.meshtastic.core.repository.NodeRepository
|
||||
import org.meshtastic.core.repository.PacketHandler
|
||||
import org.meshtastic.core.repository.PlatformAnalytics
|
||||
import org.meshtastic.core.repository.RadioConfigRepository
|
||||
import org.meshtastic.core.repository.ServiceBroadcasts
|
||||
|
|
@ -39,9 +38,7 @@ import org.meshtastic.core.repository.ServiceRepository
|
|||
import org.meshtastic.proto.DeviceMetadata
|
||||
import org.meshtastic.proto.FileInfo
|
||||
import org.meshtastic.proto.HardwareModel
|
||||
import org.meshtastic.proto.Heartbeat
|
||||
import org.meshtastic.proto.NodeInfo
|
||||
import org.meshtastic.proto.ToRadio
|
||||
import org.meshtastic.core.model.MyNodeInfo as SharedMyNodeInfo
|
||||
import org.meshtastic.proto.MyNodeInfo as ProtoMyNodeInfo
|
||||
|
||||
|
|
@ -56,7 +53,7 @@ class MeshConfigFlowManagerImpl(
|
|||
private val serviceBroadcasts: ServiceBroadcasts,
|
||||
private val analytics: PlatformAnalytics,
|
||||
private val commandSender: CommandSender,
|
||||
private val packetHandler: PacketHandler,
|
||||
private val heartbeatSender: DataLayerHeartbeatSender,
|
||||
@Named("ServiceScope") private val scope: CoroutineScope,
|
||||
) : MeshConfigFlowManager {
|
||||
private val wantConfigDelay = 100L
|
||||
|
|
@ -90,10 +87,8 @@ class MeshConfigFlowManagerImpl(
|
|||
* [myNodeInfo] was committed at the Stage 1→2 transition. [nodes] accumulates [NodeInfo] packets until
|
||||
* `config_complete_id` arrives.
|
||||
*/
|
||||
data class ReceivingNodeInfo(
|
||||
val myNodeInfo: SharedMyNodeInfo,
|
||||
val nodes: MutableList<NodeInfo> = mutableListOf(),
|
||||
) : HandshakeState()
|
||||
data class ReceivingNodeInfo(val myNodeInfo: SharedMyNodeInfo, val nodes: List<NodeInfo> = emptyList()) :
|
||||
HandshakeState()
|
||||
|
||||
/** Both stages finished. The app is fully connected. */
|
||||
data class Complete(val myNodeInfo: SharedMyNodeInfo) : HandshakeState()
|
||||
|
|
@ -139,28 +134,31 @@ class MeshConfigFlowManagerImpl(
|
|||
return
|
||||
}
|
||||
|
||||
// Warn if firmware is below the absolute minimum supported version.
|
||||
// The UI layer already enforces this via FirmwareVersionCheck, so we just log here
|
||||
// for diagnostics rather than hard-disconnecting.
|
||||
finalizedInfo.firmwareVersion?.let { fwVersion ->
|
||||
if (DeviceVersion(fwVersion) < DeviceVersion(DeviceVersion.ABS_MIN_FW_VERSION)) {
|
||||
Logger.w {
|
||||
"Firmware $fwVersion is below minimum ${DeviceVersion.ABS_MIN_FW_VERSION} — " +
|
||||
"protocol incompatibilities may occur"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handshakeState = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo)
|
||||
Logger.i { "myNodeInfo committed (nodeNum=${finalizedInfo.myNodeNum})" }
|
||||
connectionManager.value.onRadioConfigLoaded()
|
||||
|
||||
scope.handledLaunch {
|
||||
delay(wantConfigDelay)
|
||||
sendHeartbeat()
|
||||
heartbeatSender.sendHeartbeat("inter-stage")
|
||||
delay(wantConfigDelay)
|
||||
Logger.i { "Requesting NodeInfo (Stage 2)" }
|
||||
connectionManager.value.startNodeInfoOnly()
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendHeartbeat() {
|
||||
try {
|
||||
packetHandler.sendToRadio(ToRadio(heartbeat = Heartbeat()))
|
||||
Logger.d { "Heartbeat sent between nonce stages" }
|
||||
} catch (ex: IOException) {
|
||||
Logger.w(ex) { "Failed to send heartbeat; proceeding with node-info stage" }
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleNodeInfoComplete(state: HandshakeState.ReceivingNodeInfo) {
|
||||
Logger.i { "NodeInfo complete (Stage 2)" }
|
||||
|
||||
|
|
@ -168,16 +166,12 @@ class MeshConfigFlowManagerImpl(
|
|||
|
||||
// Transition state immediately (synchronously) to prevent duplicate handling.
|
||||
// The async work below (DB writes, broadcasts) proceeds without the guard.
|
||||
// Because nodes is now immutable, no snapshot is needed — state.nodes IS the snapshot.
|
||||
// Any stall-guard retry that re-enters handleNodeInfo will see Complete state and be ignored.
|
||||
handshakeState = HandshakeState.Complete(myNodeInfo = info)
|
||||
|
||||
// Snapshot and clear immediately so that a concurrent stall-guard retry (which
|
||||
// resends want_config_id and causes the firmware to restart the node_info burst)
|
||||
// starts accumulating into a fresh list rather than doubling this batch.
|
||||
val nodesToProcess = state.nodes.toList()
|
||||
state.nodes.clear()
|
||||
|
||||
val entities =
|
||||
nodesToProcess.mapNotNull { nodeInfo ->
|
||||
state.nodes.mapNotNull { nodeInfo ->
|
||||
nodeManager.installNodeInfo(nodeInfo, withBroadcast = false)
|
||||
nodeManager.nodeDBbyNodeNum[nodeInfo.num]
|
||||
?: run {
|
||||
|
|
@ -242,7 +236,7 @@ class MeshConfigFlowManagerImpl(
|
|||
override fun handleNodeInfo(info: NodeInfo) {
|
||||
val state = handshakeState
|
||||
if (state is HandshakeState.ReceivingNodeInfo) {
|
||||
state.nodes.add(info)
|
||||
handshakeState = state.copy(nodes = state.nodes + info)
|
||||
} else {
|
||||
Logger.w { "Ignoring NodeInfo outside Stage 2 (state=$state)" }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ class MeshConnectionManagerImpl(
|
|||
private val packetRepository: PacketRepository,
|
||||
private val workerManager: MeshWorkerManager,
|
||||
private val appWidgetUpdater: AppWidgetUpdater,
|
||||
private val heartbeatSender: DataLayerHeartbeatSender,
|
||||
@Named("ServiceScope") private val scope: CoroutineScope,
|
||||
) : MeshConnectionManager {
|
||||
/**
|
||||
|
|
@ -92,6 +93,7 @@ class MeshConnectionManagerImpl(
|
|||
*/
|
||||
private val connectionMutex = Mutex()
|
||||
|
||||
private var preHandshakeJob: Job? = null
|
||||
private var sleepTimeout: Job? = null
|
||||
private var locationRequestsJob: Job? = null
|
||||
private var handshakeTimeout: Job? = null
|
||||
|
|
@ -172,6 +174,8 @@ class MeshConnectionManagerImpl(
|
|||
|
||||
sleepTimeout?.cancel()
|
||||
sleepTimeout = null
|
||||
preHandshakeJob?.cancel()
|
||||
preHandshakeJob = null
|
||||
handshakeTimeout?.cancel()
|
||||
handshakeTimeout = null
|
||||
|
||||
|
|
@ -192,9 +196,19 @@ class MeshConnectionManagerImpl(
|
|||
serviceRepository.setConnectionState(ConnectionState.Connecting)
|
||||
}
|
||||
serviceBroadcasts.broadcastConnection()
|
||||
Logger.i { "Starting mesh handshake (Stage 1)" }
|
||||
connectTimeMsec = nowMillis
|
||||
startConfigOnly()
|
||||
|
||||
// Send a wake-up heartbeat before the config request. The firmware may be in a
|
||||
// power-saving state where the NimBLE callback context needs warming up. The 100ms
|
||||
// delay ensures the heartbeat BLE write is enqueued before the want_config_id
|
||||
// (sendToRadio is fire-and-forget through async coroutine launches).
|
||||
preHandshakeJob =
|
||||
scope.handledLaunch {
|
||||
heartbeatSender.sendHeartbeat("pre-handshake")
|
||||
delay(PRE_HANDSHAKE_SETTLE_MS)
|
||||
Logger.i { "Starting mesh handshake (Stage 1)" }
|
||||
startConfigOnly()
|
||||
}
|
||||
}
|
||||
|
||||
private fun startHandshakeStallGuard(stage: Int, action: () -> Unit) {
|
||||
|
|
@ -381,6 +395,15 @@ class MeshConnectionManagerImpl(
|
|||
// cap, routers (ls_secs=3600) leave the UI in DeviceSleep for over an hour.
|
||||
private const val MAX_SLEEP_TIMEOUT_SECONDS = 300
|
||||
|
||||
/**
|
||||
* Delay between the pre-handshake heartbeat and the want_config_id send.
|
||||
*
|
||||
* Ensures the heartbeat BLE write completes and the firmware's NimBLE callback context is warmed up before the
|
||||
* config request arrives. 100ms is well above observed ESP32 task scheduling latency (~10–50ms) while adding
|
||||
* negligible connection latency.
|
||||
*/
|
||||
private const val PRE_HANDSHAKE_SETTLE_MS = 100L
|
||||
|
||||
private val HANDSHAKE_TIMEOUT = 30.seconds
|
||||
|
||||
// Shorter window for the retry attempt: if the device genuinely didn't receive the
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.meshtastic.core.data.manager
|
||||
|
||||
import dev.mokkery.MockMode
|
||||
import dev.mokkery.answering.calls
|
||||
import dev.mokkery.answering.returns
|
||||
import dev.mokkery.every
|
||||
import dev.mokkery.matcher.any
|
||||
|
|
@ -97,7 +98,7 @@ class MeshConfigFlowManagerImplTest {
|
|||
serviceBroadcasts = serviceBroadcasts,
|
||||
analytics = analytics,
|
||||
commandSender = commandSender,
|
||||
packetHandler = packetHandler,
|
||||
heartbeatSender = DataLayerHeartbeatSender(packetHandler),
|
||||
scope = testScope,
|
||||
)
|
||||
}
|
||||
|
|
@ -174,6 +175,49 @@ class MeshConfigFlowManagerImplTest {
|
|||
verify { connectionManager.startNodeInfoOnly() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Stage 1 complete sends heartbeat with non-zero nonce between stages`() = testScope.runTest {
|
||||
val sentPackets = mutableListOf<org.meshtastic.proto.ToRadio>()
|
||||
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } calls
|
||||
{ call ->
|
||||
sentPackets.add(call.arg(0))
|
||||
}
|
||||
|
||||
manager.handleMyInfo(protoMyNodeInfo)
|
||||
advanceUntilIdle()
|
||||
manager.handleLocalMetadata(metadata)
|
||||
advanceUntilIdle()
|
||||
|
||||
sentPackets.clear() // Clear any packets from prior phases
|
||||
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
|
||||
advanceUntilIdle()
|
||||
|
||||
val heartbeats = sentPackets.filter { it.heartbeat != null }
|
||||
assertEquals(1, heartbeats.size, "Expected exactly one inter-stage heartbeat")
|
||||
assertEquals(
|
||||
true,
|
||||
heartbeats[0].heartbeat!!.nonce != 0,
|
||||
"Inter-stage heartbeat should have a non-zero nonce",
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Stage 1 complete with old firmware logs warning but continues handshake`() = testScope.runTest {
|
||||
val oldMetadata =
|
||||
DeviceMetadata(firmware_version = "2.3.0", hw_model = HardwareModel.HELTEC_V3, hasWifi = false)
|
||||
manager.handleMyInfo(protoMyNodeInfo)
|
||||
advanceUntilIdle()
|
||||
manager.handleLocalMetadata(oldMetadata)
|
||||
advanceUntilIdle()
|
||||
|
||||
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
|
||||
advanceUntilIdle()
|
||||
|
||||
// Handshake should still progress despite old firmware
|
||||
verify { connectionManager.onRadioConfigLoaded() }
|
||||
verify { connectionManager.startNodeInfoOnly() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Stage 1 complete without metadata still succeeds with null firmware version`() = testScope.runTest {
|
||||
manager.handleMyInfo(protoMyNodeInfo)
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ class MeshConnectionManagerImplTest {
|
|||
packetRepository,
|
||||
workerManager,
|
||||
appWidgetUpdater,
|
||||
DataLayerHeartbeatSender(packetHandler),
|
||||
scope,
|
||||
)
|
||||
|
||||
|
|
@ -148,6 +149,59 @@ class MeshConnectionManagerImplTest {
|
|||
verify { serviceBroadcasts.broadcastConnection() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Connected state sends pre-handshake heartbeat before config request`() = runTest(testDispatcher) {
|
||||
val sentPackets = mutableListOf<org.meshtastic.proto.ToRadio>()
|
||||
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } calls
|
||||
{ call ->
|
||||
sentPackets.add(call.arg(0))
|
||||
}
|
||||
|
||||
manager = createManager(backgroundScope)
|
||||
radioConnectionState.value = ConnectionState.Connected
|
||||
// Advance past PRE_HANDSHAKE_SETTLE_MS (100ms) but NOT the 30s stall guard timeout
|
||||
advanceTimeBy(200)
|
||||
|
||||
// First ToRadio should be a heartbeat, second should be want_config_id
|
||||
assertEquals(2, sentPackets.size, "Expected heartbeat + want_config_id, got ${sentPackets.size} packets")
|
||||
val heartbeat = sentPackets[0]
|
||||
val wantConfig = sentPackets[1]
|
||||
|
||||
assertEquals(true, heartbeat.heartbeat != null, "First packet should be a heartbeat")
|
||||
assertEquals(true, heartbeat.heartbeat!!.nonce != 0, "Heartbeat should have a non-zero nonce")
|
||||
assertEquals(
|
||||
org.meshtastic.core.repository.HandshakeConstants.CONFIG_NONCE,
|
||||
wantConfig.want_config_id,
|
||||
"Second packet should be want_config_id with CONFIG_NONCE",
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Disconnect during pre-handshake settle cancels config start`() = runTest(testDispatcher) {
|
||||
val sentPackets = mutableListOf<org.meshtastic.proto.ToRadio>()
|
||||
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } calls
|
||||
{ call ->
|
||||
sentPackets.add(call.arg(0))
|
||||
}
|
||||
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
|
||||
|
||||
manager = createManager(backgroundScope)
|
||||
radioConnectionState.value = ConnectionState.Connected
|
||||
// Advance only 50ms — within the 100ms settle window
|
||||
advanceTimeBy(50)
|
||||
|
||||
// Should have sent only the heartbeat so far, not want_config_id
|
||||
assertEquals(1, sentPackets.size, "Only heartbeat should be sent before settle completes")
|
||||
|
||||
// Disconnect before the settle delay completes — should cancel the pending config start
|
||||
radioConnectionState.value = ConnectionState.Disconnected
|
||||
advanceTimeBy(200)
|
||||
|
||||
// The want_config_id should NOT have been sent because the job was cancelled
|
||||
val configPackets = sentPackets.filter { it.want_config_id != null }
|
||||
assertEquals(0, configPackets.size, "want_config_id should not be sent after disconnect")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Disconnected state stops services`() = runTest(testDispatcher) {
|
||||
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue