fix: resolve bugs across connection, PKI, admin, packet flow, and stability subsystems (#5011)

This commit is contained in:
James Rich 2026-04-09 08:20:06 -05:00 committed by GitHub
parent cd9f1c0600
commit 60cc2f4237
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 413 additions and 45 deletions

View file

@ -23,6 +23,7 @@ package org.meshtastic.core.common.util
* - `%s`, `%d` positional or sequential string/integer
* - `%N$s`, `%N$d` explicit positional string/integer
* - `%N$.Nf`, `%.Nf` float with decimal precision
* - `%x`, `%X`, `%08x` hexadecimal (lower/upper, optional zero-padded width)
* - `%%` literal percent
*
* This avoids a dependency on `NSString.stringWithFormat` (which uses Obj-C `%@` conventions).
@ -57,7 +58,20 @@ actual fun formatString(pattern: String, vararg args: Any?): String = buildStrin
i = startPos // rewind — digits are part of width/precision, not positional index
}
// Parse optional flags/width (skip for now — not used in this codebase)
// Parse optional flags (zero-pad)
var zeroPad = false
if (i < pattern.length && pattern[i] == '0') {
zeroPad = true
i++
}
// Parse optional width
var width: Int? = null
val widthStart = i
while (i < pattern.length && pattern[i].isDigit()) i++
if (i > widthStart) {
width = pattern.substring(widthStart, i).toInt()
}
// Parse optional precision (.N)
var precision: Int? = null
@ -86,10 +100,24 @@ actual fun formatString(pattern: String, vararg args: Any?): String = buildStrin
val places = precision ?: DEFAULT_FLOAT_PRECISION
append(NumberFormatter.format(value, places))
}
'x',
'X',
-> {
val value = (arg as? Number)?.toLong() ?: 0L
// Mask to 32 bits when the original arg fits in an Int to match unsigned behaviour.
val masked = if (arg is Int) value and INT_MASK else value
var hex = masked.toString(HEX_RADIX)
if (conversion == 'X') hex = hex.uppercase()
val padChar = if (zeroPad) '0' else ' '
val padWidth = width ?: 0
append(hex.padStart(padWidth, padChar))
}
else -> {
// Unknown conversion — reproduce original token
append('%')
if (explicitIndex != null) append("${explicitIndex + 1}$")
if (zeroPad) append('0')
if (width != null) append(width)
if (precision != null) append(".$precision")
append(conversion)
}
@ -98,3 +126,5 @@ actual fun formatString(pattern: String, vararg args: Any?): String = buildStrin
}
// Default number of decimal places for %f conversions when the pattern gives no explicit precision.
private const val DEFAULT_FLOAT_PRECISION = 6
// Radix used to render %x / %X conversions.
private const val HEX_RADIX = 16
// Low-32-bit mask applied when the argument is an Int, so negative Ints format as unsigned hex.
private const val INT_MASK = 0xFFFFFFFFL

View file

@ -98,11 +98,12 @@ class CommandSenderImpl(
/**
* Resolves the correct channel index for sending a packet to [toNum].
*
* When both the local node and the destination support PKC, returns [DataPacket.PKC_CHANNEL_INDEX] so that
* [buildMeshPacket] enables PKI encryption. Otherwise falls back to the node's heard-on channel (for general
* packets) or the dedicated admin channel (for admin packets).
* PKI encryption ([DataPacket.PKC_CHANNEL_INDEX]) is only used for **admin** packets, where end-to-end encryption
* is appropriate. Protocol-level requests (traceroute, telemetry, position, nodeinfo, neighborinfo) must NOT use
* PKI because relay nodes need to read and/or modify the inner payload (e.g. traceroute appends each hop's node
* number). These requests fall back to the node's heard-on channel.
*/
private fun getChannelIndex(toNum: Int, isAdmin: Boolean = false): Int {
private fun getAdminChannelIndex(toNum: Int): Int {
val myNum = nodeManager.myNodeNum.value ?: return 0
val myNode = nodeManager.nodeDBbyNodeNum[myNum]
val destNode = nodeManager.nodeDBbyNodeNum[toNum]
@ -110,15 +111,18 @@ class CommandSenderImpl(
return when {
myNum == toNum -> 0
myNode?.hasPKC == true && destNode?.hasPKC == true -> DataPacket.PKC_CHANNEL_INDEX
isAdmin ->
else ->
channelSet.value.settings
.indexOfFirst { it.name.equals(ADMIN_CHANNEL_NAME, ignoreCase = true) }
.coerceAtLeast(0)
else -> destNode?.channel ?: 0
}
}
private fun getAdminChannelIndex(toNum: Int): Int = getChannelIndex(toNum, isAdmin = true)
/**
 * Returns the heard-on channel for a non-admin request to [toNum] (channel 0 when the node is unknown). Does NOT
 * use PKI — protocol-level requests need clear inner payloads (e.g. traceroute appends each hop's node number).
 */
private fun getChannelIndex(toNum: Int): Int = nodeManager.nodeDBbyNodeNum[toNum]?.channel ?: 0
override fun sendData(p: DataPacket) {
if (p.id == 0) p.id = generatePacketId()

View file

@ -89,6 +89,12 @@ class FromRadioPacketHandlerImpl(
fileInfo != null -> router.value.configFlowManager.handleFileInfo(fileInfo)
xmodemPacket != null -> router.value.xmodemManager.handleIncomingXModem(xmodemPacket)
clientNotification != null -> handleClientNotification(clientNotification)
// Firmware rebooted without a transport-level disconnect (common on serial/TCP).
// Re-handshake immediately rather than waiting for the 30s stall guard.
proto.rebooted != null -> {
Logger.w { "Firmware rebooted (rebooted=${proto.rebooted}), re-initiating handshake" }
router.value.configFlowManager.triggerWantConfig()
}
}
}

View file

@ -248,6 +248,11 @@ class MeshActionHandlerImpl(
override fun handleSetRemoteConfig(id: Int, destNum: Int, payload: ByteArray) {
val c = Config.ADAPTER.decode(payload)
commandSender.sendAdmin(destNum, id) { AdminMessage(set_config = c) }
// When targeting the local node, optimistically persist the config so the
// UI reflects changes immediately (matching handleSetConfig behaviour).
if (destNum == nodeManager.myNodeNum.value) {
scope.handledLaunch { radioConfigRepository.setLocalConfig(c) }
}
}
override fun handleGetRemoteConfig(id: Int, destNum: Int, config: Int) {
@ -310,6 +315,11 @@ class MeshActionHandlerImpl(
if (payload != null) {
val c = Channel.ADAPTER.decode(payload)
commandSender.sendAdmin(destNum, id) { AdminMessage(set_channel = c) }
// When targeting the local node, optimistically persist the channel so
// the UI reflects changes immediately (matching handleSetChannel behaviour).
if (destNum == nodeManager.myNodeNum.value) {
scope.handledLaunch { radioConfigRepository.updateChannelSettings(c) }
}
}
}

View file

@ -17,6 +17,7 @@
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import okio.IOException
@ -59,6 +60,9 @@ class MeshConfigFlowManagerImpl(
private lateinit var scope: CoroutineScope
private val wantConfigDelay = 100L
/** Monotonically increasing generation so async clears from a stale handshake are discarded. */
private val handshakeGeneration = atomic(0L)
override fun start(scope: CoroutineScope) {
this.scope = scope
}
@ -203,12 +207,18 @@ class MeshConfigFlowManagerImpl(
handshakeState = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo)
nodeManager.setMyNodeNum(myInfo.my_node_num)
// Bump the generation so that a pending clear from a prior (interrupted) handshake
// will see a stale snapshot and skip its writes, preventing it from wiping config
// that was saved by this (newer) handshake's incoming packets.
val gen = handshakeGeneration.incrementAndGet()
// Clear persisted radio config so the new handshake starts from a clean slate.
// DataStore serializes its own writes, so the clear will precede subsequent
// setLocalConfig / updateChannelSettings calls dispatched by later packets in this
// session (handleFromRadio processes packets sequentially, so later dispatches always
// occur after this one returns).
scope.handledLaunch {
if (handshakeGeneration.value != gen) return@handledLaunch // Stale handshake; skip.
radioConfigRepository.clearChannelSet()
radioConfigRepository.clearLocalConfig()
radioConfigRepository.clearLocalModuleConfig()

View file

@ -205,6 +205,7 @@ class MeshConnectionManagerImpl(
private fun tearDownConnection() {
packetHandler.stopPacketQueue()
commandSender.setSessionPasskey(okio.ByteString.EMPTY) // Prevent stale passkey on reconnect.
locationManager.stop()
mqttManager.stop()
}
@ -227,8 +228,11 @@ class MeshConnectionManagerImpl(
scope.handledLaunch {
try {
val localConfig = radioConfigRepository.localConfigFlow.first()
val timeout = (localConfig.power?.ls_secs ?: 0) + DEVICE_SLEEP_TIMEOUT_SECONDS
Logger.d { "Waiting for sleeping device, timeout=$timeout secs" }
val rawTimeout = (localConfig.power?.ls_secs ?: 0) + DEVICE_SLEEP_TIMEOUT_SECONDS
// Cap the timeout so routers or power-saving configs (ls_secs=3600) don't
// leave the UI stuck in DeviceSleep for over an hour.
val timeout = rawTimeout.coerceAtMost(MAX_SLEEP_TIMEOUT_SECONDS)
Logger.d { "Waiting for sleeping device, timeout=$timeout secs (raw=$rawTimeout)" }
delay(timeout.seconds)
Logger.w { "Device timed out, setting disconnected" }
onConnectionChanged(ConnectionState.Disconnected)
@ -354,6 +358,12 @@ class MeshConnectionManagerImpl(
companion object {
private const val DEVICE_SLEEP_TIMEOUT_SECONDS = 30
// Maximum time (in seconds) to wait for a sleeping device before declaring it
// disconnected, regardless of the device's ls_secs configuration. Without this
// cap, routers (ls_secs=3600) leave the UI in DeviceSleep for over an hour.
private const val MAX_SLEEP_TIMEOUT_SECONDS = 300
private val HANDSHAKE_TIMEOUT = 30.seconds
// Shorter window for the retry attempt: if the device genuinely didn't receive the

View file

@ -41,6 +41,7 @@ import org.meshtastic.proto.FromRadio
import org.meshtastic.proto.LogRecord
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import kotlin.concurrent.Volatile
import kotlin.uuid.Uuid
/** Implementation of [MeshMessageProcessor] that handles raw radio messages and prepares mesh packets for routing. */
@ -59,6 +60,13 @@ class MeshMessageProcessorImpl(
private val logUuidByPacketId = mutableMapOf<Int, String>()
private val logInsertJobByPacketId = mutableMapOf<Int, Job>()
/**
* Epoch-millisecond timestamp of the last local-node `lastHeard` DB write. Used to throttle updates to at most once
* per [LOCAL_NODE_REFRESH_INTERVAL_MS] so that high-frequency FromRadio variants (log records, queue status) don't
* flood the DB.
*/
@Volatile private var lastLocalNodeRefreshMs = 0L
private val earlyMutex = Mutex()
private val earlyReceivedPackets = kotlin.collections.ArrayDeque<MeshPacket>()
private val maxEarlyPacketBuffer = 10240
@ -95,6 +103,9 @@ class MeshMessageProcessorImpl(
}
private fun processFromRadio(proto: FromRadio, myNodeNum: Int?) {
// Any decoded FromRadio proves the radio link is alive — keep the local node fresh.
refreshLocalNodeLastHeard()
// Audit log every incoming variant
logVariant(proto)
@ -253,5 +264,33 @@ class MeshMessageProcessorImpl(
}
}
/**
 * Refreshes the local node's [Node.lastHeard] to prove the radio link is alive.
 *
 * Without this, [lastHeard] is only set when a [MeshPacket] arrives from another node (see
 * [processReceivedMeshPacket]). On a quiet mesh the heartbeat cycle still exchanges data with the firmware (a
 * ToRadio heartbeat elicits a FromRadio queueStatus every 30 s), but that data never touches [lastHeard], causing
 * the local node to appear stale in the UI even though the connection is healthy.
 *
 * To avoid flooding the DB on high-frequency variants (log records arrive many times per second when debug logging
 * is enabled), writes are throttled to at most once per [LOCAL_NODE_REFRESH_INTERVAL_MS].
 */
private fun refreshLocalNodeLastHeard() {
// Throttle: skip the DB write if one already happened within the refresh interval.
val now = nowMillis
if (now - lastLocalNodeRefreshMs < LOCAL_NODE_REFRESH_INTERVAL_MS) return
lastLocalNodeRefreshMs = now
// No local node number yet (handshake not complete) — nothing to refresh.
val myNum = nodeManager.myNodeNum.value ?: return
nodeManager.updateNode(myNum, withBroadcast = false) { node: Node -> node.copy(lastHeard = nowSeconds.toInt()) }
}
private fun insertMeshLog(log: MeshLog): Job = scope.handledLaunch { meshLogRepository.value.insert(log) }
companion object {
/**
* Minimum interval between local-node `lastHeard` DB writes, in milliseconds. Aligned with the heartbeat
* interval (30 s) so that one write per heartbeat cycle keeps the node fresh without unnecessary DB churn.
*/
private const val LOCAL_NODE_REFRESH_INTERVAL_MS = 30_000L
}
}

View file

@ -110,6 +110,7 @@ class PacketHandlerImpl(
override fun sendToRadio(packet: MeshPacket) {
scope.launch {
queueMutex.withLock {
queueStopped = false // Allow queue to resume after a disconnect/reconnect cycle.
queuedPackets.add(packet)
startPacketQueueLocked()
}
@ -123,6 +124,7 @@ class PacketHandlerImpl(
val deferred = CompletableDeferred<Boolean>()
responseMutex.withLock { queueResponse[packet.id] = deferred }
queueMutex.withLock {
queueStopped = false // Allow queue to resume after a disconnect/reconnect cycle.
queuedPackets.add(packet)
startPacketQueueLocked()
}
@ -199,15 +201,18 @@ class PacketHandlerImpl(
Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" }
} catch (e: TimeoutCancellationException) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" }
// Clean up the deferred for this packet. sendToRadioAndAwait callers
// also clean up in their own finally block (idempotent remove).
responseMutex.withLock { queueResponse.remove(packet.id) }
} catch (e: CancellationException) {
throw e // Preserve structured concurrency cancellation propagation.
} catch (e: Exception) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" }
responseMutex.withLock { queueResponse.remove(packet.id) }
}
// Do NOT remove from queueResponse here. Removal is owned by:
// - handleQueueStatus (normal completion path)
// - sendToRadioAndAwait's finally block (for await-style callers)
// - stopPacketQueue (bulk cleanup on disconnect)
// Deferred cleanup is now handled in the catch blocks above.
// handleQueueStatus (normal success) and stopPacketQueue (bulk cleanup)
// also remove entries, and these removals are idempotent.
}
} finally {
// Hold queueMutex so that clearing queueJob and the restart decision are

View file

@ -21,6 +21,7 @@ import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.repository.PacketHandler
import org.meshtastic.core.repository.XModemFile
import org.meshtastic.core.repository.XModemManager
@ -59,6 +60,8 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage
@Volatile private var transferName = ""
@Volatile private var expectedSeq = INITIAL_SEQ
@Volatile private var lastActivityMillis = 0L
private val blocks = mutableListOf<ByteArray>()
override fun setTransferName(name: String) {
@ -66,6 +69,17 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage
}
override fun handleIncomingXModem(packet: XModem) {
// If blocks have accumulated but no activity for INACTIVITY_TIMEOUT_MS,
// the previous transfer is stale (firmware crash, BLE disconnect, etc.).
if (blocks.isNotEmpty() && lastActivityMillis > 0L) {
val elapsed = nowMillis - lastActivityMillis
if (elapsed > INACTIVITY_TIMEOUT_MS) {
Logger.w { "XModem: inactivity timeout (${elapsed}ms) — resetting stale transfer" }
reset()
}
}
lastActivityMillis = nowMillis
when (packet.control) {
XModem.Control.SOH,
XModem.Control.STX,
@ -135,6 +149,7 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage
expectedSeq = INITIAL_SEQ
blocks.clear()
transferName = ""
lastActivityMillis = 0L
}
// CRC-CCITT: polynomial 0x1021, initial value 0x0000 (XModem variant)
@ -157,5 +172,6 @@ class XModemManagerImpl(private val packetHandler: PacketHandler) : XModemManage
private const val CTRLZ = 0x1A.toByte()
private const val CRC_POLY = 0x1021
private const val BITS_PER_BYTE = 8
private const val INACTIVITY_TIMEOUT_MS = 30_000L
}
}

View file

@ -28,6 +28,7 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.ConnectionState
@ -267,4 +268,46 @@ class MeshConnectionManagerImplTest {
verify { mqttManager.start(any(), true, true) }
verify { historyManager.requestHistoryReplay(any(), any(), any(), any()) }
}
/**
 * Verifies that the DeviceSleep timeout is clamped to MAX_SLEEP_TIMEOUT_SECONDS (300 s). With ls_secs=3600 the
 * raw timeout would be 3630 s; the state must instead transition to Disconnected shortly after the 300 s cap.
 */
@Test
fun `DeviceSleep timeout is capped at MAX_SLEEP_TIMEOUT_SECONDS for high ls_secs`() = runTest(testDispatcher) {
// Router with ls_secs=3600 — previously this created a 3630s timeout.
// With the cap, it should be clamped to 300s.
val config =
LocalConfig(
power = Config.PowerConfig(is_power_saving = true, ls_secs = 3600),
device = Config.DeviceConfig(role = Config.DeviceConfig.Role.ROUTER),
)
// Stub every collaborator touched during the Connected -> DeviceSleep transition.
every { radioConfigRepository.localConfigFlow } returns flowOf(config)
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { serviceNotifications.updateServiceStateNotification(any(), any()) } returns Unit
every { packetHandler.stopPacketQueue() } returns Unit
every { locationManager.stop() } returns Unit
every { mqttManager.stop() } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
manager.start(backgroundScope)
advanceUntilIdle()
// Transition to Connected then DeviceSleep
radioConnectionState.value = ConnectionState.Connected
advanceUntilIdle()
radioConnectionState.value = ConnectionState.DeviceSleep
advanceUntilIdle()
assertEquals(
ConnectionState.DeviceSleep,
serviceRepository.connectionState.value,
"Should be in DeviceSleep initially",
)
// Advance 300 seconds (the cap) + 1 second to trigger the timeout.
advanceTimeBy(301_000L)
assertEquals(
ConnectionState.Disconnected,
serviceRepository.connectionState.value,
"Should transition to Disconnected after capped timeout (300s), not the raw 3630s",
)
}
}

View file

@ -23,6 +23,7 @@ import androidx.datastore.preferences.core.edit
import androidx.datastore.preferences.core.intPreferencesKey
import androidx.datastore.preferences.core.longPreferencesKey
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
@ -135,10 +136,12 @@ open class DatabaseManager(
// Also mark the previous DB as used "just now" so LRU has an accurate, recent timestamp
previousDbName?.let { markLastUsed(it) }
// Now safe to close the previous DB — collectors have switched to the new instance.
if (previousDbName != null && previousDbName != dbName) {
closeCachedDatabase(previousDbName)
}
// Do NOT close the previous DB synchronously here. Even though _currentDb has been
// updated, in-flight `withDb` calls may still hold a reference to the old database
// (captured before the emission). Closing the connection pool while those queries are
// executing causes "Connection pool is closed" crashes. Instead, let LRU eviction
// (enforceCacheLimit) handle cleanup — it only runs on databases that are not the
// active target and have not been used recently.
// Defer LRU eviction so switch is not blocked by filesystem work
managerScope.launch(dispatchers.io) { enforceCacheLimit(activeDbName = dbName) }
@ -167,11 +170,26 @@ open class DatabaseManager(
private val limitedIo = dispatchers.io.limitedParallelism(4)
/** Execute [block] with the current DB instance. */
@Suppress("TooGenericExceptionCaught")
override suspend fun <T> withDb(block: suspend (MeshtasticDatabase) -> T): T? = withContext(limitedIo) {
val db = _currentDb.value ?: return@withContext null
val active = buildDbName(_currentAddress.value)
markLastUsed(active)
block(db)
try {
block(db)
} catch (e: CancellationException) {
throw e // Preserve structured concurrency cancellation propagation.
} catch (e: Exception) {
// If the connection pool was closed between capturing `db` and executing the query
// (e.g., during a database switch), retry once with the current DB instance.
if (e.message?.contains("Connection pool is closed") == true) {
Logger.w { "withDb: connection pool closed, retrying with current DB" }
val retryDb = _currentDb.value ?: return@withContext null
block(retryDb)
} else {
throw e
}
}
}
/** Returns true if a database exists for the given device address. */

View file

@ -289,6 +289,7 @@ interface NodeInfoDao {
@Upsert suspend fun doUpsert(node: NodeEntity)
@Transaction
suspend fun upsert(node: NodeEntity) {
val verifiedNode = getVerifiedNodeForUpsert(node)
doUpsert(verifiedNode)

View file

@ -57,8 +57,8 @@ constructor(
if (nodeNums.isEmpty()) return
nodeRepository.deleteNodes(nodeNums)
val packetId = radioController.getPacketId()
for (nodeNum in nodeNums) {
val packetId = radioController.getPacketId()
radioController.removeByNodenum(packetId, nodeNum)
}
}

View file

@ -22,6 +22,8 @@ import org.meshtastic.core.network.repository.SerialConnection
import org.meshtastic.core.network.repository.SerialConnectionListener
import org.meshtastic.core.network.repository.UsbRepository
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import java.util.concurrent.atomic.AtomicReference
/** An interface that assumes we are talking to a meshtastic device via USB serial */
@ -119,7 +121,14 @@ class SerialInterface(
}
override fun keepAlive() {
Logger.d { "[$address] Serial keepAlive" }
// Send a ToRadio heartbeat so the firmware resets its idle timer and responds with
// a FromRadio queueStatus — proving the serial link is alive. Without this, the
// serial transport has no way to detect a silently dead device (battery depleted,
// firmware crash without the `rebooted` flag). The queueStatus response also feeds
// into MeshMessageProcessorImpl.refreshLocalNodeLastHeard() to keep the local
// node's lastHeard timestamp current.
Logger.d { "[$address] Serial keepAlive — sending heartbeat" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat()).encode())
}
override fun sendBytes(p: ByteArray) {

View file

@ -65,6 +65,18 @@ private const val CONNECTION_TIMEOUT_MS = 15_000L
private const val RECONNECT_FAILURE_THRESHOLD = 3
private const val RECONNECT_BASE_DELAY_MS = 5_000L
private const val RECONNECT_MAX_DELAY_MS = 60_000L
private const val RECONNECT_MAX_FAILURES = 10
/**
 * Minimum milliseconds a BLE connection must stay up before we consider it "stable" and reset
 * [BleRadioInterface.consecutiveFailures]. Without this, a device at the edge of BLE range can repeatedly connect
 * for a fraction of a second and drop — each brief connection resets the failure counter, so
 * [RECONNECT_FAILURE_THRESHOLD] is never reached and the app never signals [ConnectionState.DeviceSleep].
 *
 * The value (5 s) is long enough that only connections that survive past the initial GATT setup are treated as
 * genuine, but short enough that normal reconnects after light-sleep still reset the counter promptly.
 */
private const val MIN_STABLE_CONNECTION_MS = 5_000L
/**
* Returns the reconnect backoff delay in milliseconds for a given consecutive failure count.
@ -181,7 +193,7 @@ class BleRadioInterface(
throw RadioNotConnectedException("Device not found at address $address")
}
@Suppress("LongMethod")
@Suppress("LongMethod", "CyclomaticComplexMethod")
private fun connect() {
connectionJob =
connectionScope.launch {
@ -231,8 +243,9 @@ class BleRadioInterface(
throw RadioNotConnectedException("Failed to connect to device at address $address")
}
// Connection succeeded — reset failure counter
consecutiveFailures = 0
// Connection succeeded — only reset the failure counter if the
// connection stays up long enough. See MIN_STABLE_CONNECTION_MS.
val gattConnectedAt = nowMillis
isFullyConnected = true
onConnected()
@ -257,6 +270,39 @@ class BleRadioInterface(
}
Logger.i { "[$address] BLE connection dropped, preparing to reconnect" }
// Only reset the failure counter if the connection was stable (lasted
// longer than MIN_STABLE_CONNECTION_MS). A connection that drops within
// seconds typically means the device is at the edge of BLE range or
// powered off — the Android BLE stack may briefly "connect" to a cached
// GATT profile before realising the device is gone. Without this guard,
// the failure counter resets on every brief connect, preventing us from
// ever reaching RECONNECT_FAILURE_THRESHOLD and signalling DeviceSleep.
val connectionUptime = nowMillis - gattConnectedAt
if (connectionUptime >= MIN_STABLE_CONNECTION_MS) {
consecutiveFailures = 0
} else {
consecutiveFailures++
Logger.w {
"[$address] Connection lasted only ${connectionUptime}ms " +
"(< ${MIN_STABLE_CONNECTION_MS}ms) — treating as failure " +
"(consecutive failures: $consecutiveFailures)"
}
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
Logger.e { "[$address] Giving up after $consecutiveFailures unstable connections" }
service.onDisconnect(
isPermanent = true,
errorMessage = "Device unreachable (unstable connection)",
)
return@launch
}
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
service.onDisconnect(
isPermanent = false,
errorMessage = "Device unreachable (unstable connection)",
)
}
}
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.d { "[$address] BLE connection coroutine cancelled" }
throw e
@ -268,10 +314,19 @@ class BleRadioInterface(
"(consecutive failures: $consecutiveFailures)"
}
// At the failure threshold, signal DeviceSleep so MeshConnectionManagerImpl can
// start its sleep timeout. Use == (not >=) to fire exactly once; repeated
// onDisconnect signals would reset upstream state machines unnecessarily.
if (consecutiveFailures == RECONNECT_FAILURE_THRESHOLD) {
// After exceeding the max failure limit, give up permanently to stop
// draining battery on a device that is genuinely offline. The user
// must manually reconnect from the connections screen.
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
Logger.e { "[$address] Giving up after $consecutiveFailures consecutive failures" }
val (_, msg) = e.toDisconnectReason()
service.onDisconnect(isPermanent = true, errorMessage = msg)
return@launch
}
// At the failure threshold, signal DeviceSleep so
// MeshConnectionManagerImpl can start its sleep timeout.
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
handleFailure(e)
}
@ -312,10 +367,11 @@ class BleRadioInterface(
"Packets RX: $packetsReceived ($bytesReceived bytes), " +
"Packets TX: $packetsSent ($bytesSent bytes)"
}
// Do NOT call service.onDisconnect() here. The reconnect while-loop handles retries
// internally. Emitting DeviceSleep on every transient disconnect creates competing state
// transitions with MeshConnectionManagerImpl's sleep timeout. Instead, handleFailure()
// is called from the catch block after RECONNECT_FAILURE_THRESHOLD consecutive failures.
// Signal DeviceSleep immediately so the UI reflects the disconnect while the
// reconnect loop continues in the background. The previous approach suppressed
// this signal until RECONNECT_FAILURE_THRESHOLD consecutive failures, leaving the
// UI stuck on "Connected" for 35+ seconds after the device disappeared.
service.onDisconnect(isPermanent = false)
}
private suspend fun discoverServicesAndSetupCharacteristics() {

View file

@ -17,6 +17,8 @@
package org.meshtastic.core.network.radio
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
@ -124,4 +126,52 @@ class BleRadioInterfaceTest {
// Cancel the reconnect loop so runTest can complete.
bleInterface.close()
}
/**
 * After [RECONNECT_MAX_FAILURES] (10) consecutive failures, the reconnect loop should stop and signal a permanent
 * disconnect. This prevents infinite battery drain when the device is genuinely offline.
 *
 * Time budget for 10 failures with a bonded device (no scan): each iteration = 1 s settle + connectAndAwait throw
 * + backoff. Backoffs: 5 s, 10 s, 20 s, 40 s, 60 s, 60 s, 60 s, 60 s, 60 s (exit at failure 10 before backoff).
 * Total = 10 × 1 s settle + (5+10+20+40+60+60+60+60+60) s = 10 + 375 = 385 s (385_000 ms). We use a generous
 * 400_000 ms to cover any timing variance.
 */
@Test
fun `reconnect loop stops after RECONNECT_MAX_FAILURES with permanent disconnect`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Device")
bluetoothRepository.bond(device)
// Every connect attempt fails, so each loop iteration increments the failure counter.
connection.connectException = RadioNotConnectedException("simulated failure")
every { service.onDisconnect(any(), any()) } returns Unit
val bleInterface =
BleRadioInterface(
serviceScope = this,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
address = address,
)
// Advance enough time for all 10 failures to occur.
advanceTimeBy(400_001L)
// Should have been called with isPermanent=true at least once (the final call).
verify { service.onDisconnect(isPermanent = true, errorMessage = any()) }
bleInterface.close()
}
@Test
fun `computeReconnectBackoffMs returns correct backoff values`() {
    // Table-driven check: doubling from 5 s, clamped at 60 s for all large failure counts.
    val expectedByFailureCount =
        listOf(
            0 to 5_000L,
            1 to 5_000L,
            2 to 10_000L,
            3 to 20_000L,
            4 to 40_000L,
            5 to 60_000L,
            10 to 60_000L,
            100 to 60_000L,
        )
    for ((failures, expectedMs) in expectedByFailureCount) {
        assertEquals(expectedMs, computeReconnectBackoffMs(failures))
    }
}
}

View file

@ -25,6 +25,8 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.meshtastic.core.network.radio.StreamInterface
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import java.io.File
/**
@ -137,7 +139,11 @@ private constructor(
}
override fun keepAlive() {
// Not specifically needed for raw serial unless implemented
// Send a ToRadio heartbeat so the firmware resets its idle timer and responds with
// a FromRadio queueStatus — proving the serial link is alive. Without this, the
// serial transport has no way to detect a silently dead device.
Logger.d { "[$portName] Serial keepAlive — sending heartbeat" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat()).encode())
}
private fun closePortResources() {

View file

@ -53,6 +53,7 @@ import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioPrefs
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.core.repository.RadioTransportFactory
import kotlin.concurrent.Volatile
/**
* Shared multiplatform connection orchestrator for Meshtastic radios.
@ -107,8 +108,16 @@ class SharedRadioInterfaceService(
private var heartbeatJob: kotlinx.coroutines.Job? = null
private var lastHeartbeatMillis = 0L
@Volatile private var lastDataReceivedMillis = 0L
companion object {
private const val HEARTBEAT_INTERVAL_MILLIS = 30 * 1000L
// If we haven't received any data from the radio within this window after sending a
// heartbeat while the connection is nominally "Connected", the connection is likely a
// zombie (BLE stack didn't report disconnect). Two missed heartbeat intervals gives
// the firmware a reasonable window to respond or send telemetry.
private const val LIVENESS_TIMEOUT_MILLIS = HEARTBEAT_INTERVAL_MILLIS * 2
}
private val initLock = Mutex()
@ -245,15 +254,36 @@ class SharedRadioInterfaceService(
/**
 * (Re)starts the periodic heartbeat loop: every [HEARTBEAT_INTERVAL_MILLIS] it sends a keep-alive to the radio and
 * then runs [checkLiveness]. Any previously running heartbeat job is cancelled first.
 */
private fun startHeartbeat() {
heartbeatJob?.cancel()
// Reset so the first liveness check after (re)connect doesn't see stale silence from before.
lastDataReceivedMillis = nowMillis
heartbeatJob =
serviceScope.launch {
while (true) {
delay(HEARTBEAT_INTERVAL_MILLIS)
keepAlive()
checkLiveness()
}
}
}
/**
 * Detects zombie connections where the BLE stack never reported a disconnect.
 *
 * While the state is nominally [ConnectionState.Connected], if no data has arrived from the radio within
 * [LIVENESS_TIMEOUT_MILLIS] the link is presumed dead, and a non-permanent disconnect is signalled so the
 * reconnect machinery can take over.
 */
private fun checkLiveness() {
    // Only meaningful while we believe we are connected.
    if (_connectionState.value != ConnectionState.Connected) return
    val silenceMs = nowMillis - lastDataReceivedMillis
    if (silenceMs <= LIVENESS_TIMEOUT_MILLIS) return
    Logger.w {
        "Liveness check failed: no data received for ${silenceMs}ms " +
            "(threshold: ${LIVENESS_TIMEOUT_MILLIS}ms). Treating as disconnect."
    }
    onDisconnect(isPermanent = false, errorMessage = "Connection timeout — no data received")
}
fun keepAlive(now: Long = nowMillis) {
if (now - lastHeartbeatMillis > HEARTBEAT_INTERVAL_MILLIS) {
radioIf?.keepAlive()
@ -271,6 +301,7 @@ class SharedRadioInterfaceService(
@Suppress("TooGenericExceptionCaught")
override fun handleFromRadio(bytes: ByteArray) {
try {
lastDataReceivedMillis = nowMillis
processLifecycle.coroutineScope.launch(dispatchers.io) { _receivedData.emit(bytes) }
_meshActivity.tryEmit(MeshActivity.Receive)
} catch (t: Throwable) {
@ -283,6 +314,7 @@ class SharedRadioInterfaceService(
// launching a coroutine. The async launch pattern introduced a window where a concurrent
// onDisconnect launch could execute AFTER an onConnect launch, leaving the service stuck
// in Connected while the transport was actually disconnected.
lastDataReceivedMillis = nowMillis
if (_connectionState.value != ConnectionState.Connected) {
Logger.d { "Broadcasting connection state change to Connected" }
_connectionState.value = ConnectionState.Connected

View file

@ -21,6 +21,7 @@ import androidx.lifecycle.viewModelScope
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
@ -121,7 +122,11 @@ class FirmwareUpdateViewModel(
/**
 * Lifecycle teardown: releases any temporary firmware file left on disk.
 *
 * viewModelScope is already cancelled by the time onCleared() runs, so launching the
 * cleanup on it would never execute. Use a standalone scope with [NonCancellable] for
 * this short, fire-and-forget cleanup of temporary firmware files. (The diff residue
 * here contained both the old viewModelScope launch and its replacement; only the
 * working NonCancellable version is kept.)
 */
override fun onCleared() {
    super.onCleared()
    kotlinx.coroutines.CoroutineScope(NonCancellable).launch {
        tempFirmwareFile = cleanupTemporaryFiles(fileHandler, tempFirmwareFile)
    }
}
fun setReleaseType(type: FirmwareReleaseType) {

View file

@ -82,6 +82,7 @@ open class BaseMapViewModel(
.getWaypoints()
.mapLatest { list ->
list
.filter { it.waypoint != null }
.associateBy { packet -> packet.waypoint!!.id }
.filterValues {
val expire = it.waypoint?.expire ?: 0

View file

@ -123,7 +123,7 @@ class NodeDetailViewModel(
/**
 * Returns the type-safe navigation route for a direct message to this node.
 *
 * The PKC (private/encrypted) channel index is only usable when BOTH our node and the
 * destination node hold a public key; otherwise the node's regular channel is used.
 * (The diff residue duplicated the `hasPKC` declaration — old and new lines — which
 * cannot compile; only the corrected both-sides check is kept.)
 */
fun getDirectMessageRoute(node: Node, ourNode: Node?): String {
    val hasPKC = ourNode?.hasPKC == true && node.hasPKC
    val channel = if (hasPKC) DataPacket.PKC_CHANNEL_INDEX else node.channel
    return "${channel}${node.user.id}"
}

View file

@ -142,7 +142,7 @@ class LogSearchManager {
return filteredLogs
.flatMapIndexed { logIndex, log ->
searchText.split(" ").flatMap { term ->
val escapedTerm = term // Simple regex escape or just use contains
val escapedTerm = Regex.escape(term)
val regex = escapedTerm.toRegex(RegexOption.IGNORE_CASE)
val messageMatches =
regex.findAll(log.logMessage).map {

View file

@ -585,7 +585,12 @@ open class RadioConfigViewModel(
val route = radioConfigState.value.route
when (result) {
is RadioResponseResult.Error -> sendError(result.message)
is RadioResponseResult.Error -> {
sendError(result.message)
// Abort the AdminRoute flow — do not fire the destructive action
// (reboot/shutdown/factory_reset) if the metadata preflight failed.
return
}
is RadioResponseResult.Success -> {
if (route.isEmpty()) {
val data = packet.decoded!!
@ -705,6 +710,12 @@ open class RadioConfigViewModel(
}
}
// Routing ACKs (Success) share the same request_id as the upcoming ADMIN_APP response.
// Removing the id here would cause the actual admin response to be silently dropped,
// because processRadioResponseUseCase checks `request_id in requestIds`.
// The Success branch already handles its own id removal when route is empty (set flow).
if (result is RadioResponseResult.Success) return
if (AdminRoute.entries.any { it.name == route }) {
sendAdminRequest(destNum)
}

View file

@ -233,13 +233,15 @@ class RadioConfigViewModelTest {
}
@Test
fun `setResponseStateLoading for REBOOT calls useCase after config response`() = runTest {
    // NOTE(review): the diff residue interleaved the old test (Success / "packet response")
    // with the updated one; only the updated version is kept here.
    val node = Node(num = 123, user = User(id = "!123"))
    nodeRepository.setNodes(listOf(node))
    val packetFlow = MutableSharedFlow<MeshPacket>()
    every { serviceRepository.meshPacketFlow } returns packetFlow
    // AdminRoute first sends a session key config request; the admin action fires
    // only after the actual ConfigResponse (not a routing ACK / Success).
    every { processRadioResponseUseCase(any(), any(), any()) } returns RadioResponseResult.ConfigResponse(Config())

    viewModel = createViewModel()

    viewModel.setResponseStateLoading(AdminRoute.REBOOT)

    // Emit a config response packet to trigger processPacketResponse -> sendAdminRequest
    packetFlow.emit(MeshPacket())

    verifySuspend { adminActionsUseCase.reboot(123) }
}
@Test
fun `setResponseStateLoading for FACTORY_RESET calls useCase after packet response`() = runTest {
fun `setResponseStateLoading for FACTORY_RESET calls useCase after config response`() = runTest {
val node = Node(num = 123, user = User(id = "!123"))
nodeRepository.setNodes(listOf(node))
val packetFlow = MutableSharedFlow<MeshPacket>()
every { serviceRepository.meshPacketFlow } returns packetFlow
every { processRadioResponseUseCase(any(), any(), any()) } returns RadioResponseResult.Success
// AdminRoute first sends a session key config request; the admin action fires
// only after the actual ConfigResponse (not a routing ACK / Success).
every { processRadioResponseUseCase(any(), any(), any()) } returns RadioResponseResult.ConfigResponse(Config())
viewModel = createViewModel()
@ -268,7 +272,7 @@ class RadioConfigViewModelTest {
viewModel.setResponseStateLoading(AdminRoute.FACTORY_RESET)
// Emit a packet to trigger processPacketResponse -> sendAdminRequest
// Emit a config response packet to trigger processPacketResponse -> sendAdminRequest
packetFlow.emit(MeshPacket())
verifySuspend { adminActionsUseCase.factoryReset(123, any()) }
@ -449,7 +453,6 @@ class RadioConfigViewModelTest {
nodeRepository.setNodes(listOf(node))
val packetFlow = MutableSharedFlow<MeshPacket>()
every { serviceRepository.meshPacketFlow } returns packetFlow
every { processRadioResponseUseCase(any(), 123, any()) } returns RadioResponseResult.Success
viewModel = createViewModel()
@ -461,13 +464,16 @@ class RadioConfigViewModelTest {
packetFlow.emit(MeshPacket())
viewModel.setResponseStateLoading(AdminRoute.SHUTDOWN)
every { processRadioResponseUseCase(any(), 123, any()) } returns RadioResponseResult.Success
// AdminRoute fires sendAdminRequest after receiving ConfigResponse (session key),
// not after a routing ACK (Success).
every { processRadioResponseUseCase(any(), 123, any()) } returns RadioResponseResult.ConfigResponse(Config())
packetFlow.emit(MeshPacket())
verifySuspend { adminActionsUseCase.shutdown(123) }
// NODEDB_RESET
everySuspend { adminActionsUseCase.nodedbReset(any(), any(), any()) } returns 42
viewModel.setResponseStateLoading(AdminRoute.NODEDB_RESET)
every { processRadioResponseUseCase(any(), 123, any()) } returns RadioResponseResult.ConfigResponse(Config())
packetFlow.emit(MeshPacket())
verifySuspend { adminActionsUseCase.nodedbReset(123, any(), any()) }
}