fix(transport): Kable BLE audit + thread-safety, MQTT, and logging fixes across transport layers (#5071)

This commit is contained in:
James Rich 2026-04-11 17:56:29 -05:00 committed by GitHub
parent 5f0e60eb21
commit a3c0a4832d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
44 changed files with 1123 additions and 513 deletions

View file

@ -36,14 +36,14 @@ class InterfaceFactory(
) {
internal val nopInterface by lazy { nopInterfaceFactory.create("") }
private val specMap: Map<InterfaceId, InterfaceSpec<*>>
get() =
mapOf(
InterfaceId.MOCK to mockSpec.value,
InterfaceId.NOP to NopInterfaceSpec(nopInterfaceFactory),
InterfaceId.SERIAL to serialSpec.value,
InterfaceId.TCP to tcpSpec.value,
)
private val specMap: Map<InterfaceId, InterfaceSpec<*>> by lazy {
mapOf(
InterfaceId.MOCK to mockSpec.value,
InterfaceId.NOP to NopInterfaceSpec(nopInterfaceFactory),
InterfaceId.SERIAL to serialSpec.value,
InterfaceId.TCP to tcpSpec.value,
)
}
fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest"

View file

@ -38,19 +38,14 @@ class SerialInterface(
connect()
}
override fun onDeviceDisconnect(waitForStopped: Boolean) {
override fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean) {
connRef.get()?.close(waitForStopped)
super.onDeviceDisconnect(waitForStopped)
super.onDeviceDisconnect(waitForStopped, isPermanent)
}
override fun connect() {
val deviceMap = usbRepository.serialDevices.value
val device =
if (deviceMap.containsKey(address)) {
deviceMap[address]!!
} else {
deviceMap.map { (_, driver) -> driver }.firstOrNull()
}
val device = deviceMap[address] ?: deviceMap.values.firstOrNull()
if (device == null) {
Logger.e { "[$address] Serial device not found at address" }
} else {

View file

@ -33,19 +33,12 @@ class SerialInterfaceSpec(
factory.create(rest, service)
override fun addressValid(rest: String): Boolean {
usbRepository.serialDevices.value.filterValues { usbManager.hasPermission(it.device) }
findSerial(rest)?.let { d ->
return usbManager.hasPermission(d.device)
}
return false
val driver = findSerial(rest) ?: return false
return usbManager.hasPermission(driver.device)
}
internal fun findSerial(rest: String): UsbSerialDriver? {
val deviceMap = usbRepository.serialDevices.value
return if (deviceMap.containsKey(rest)) {
deviceMap[rest]!!
} else {
deviceMap.map { (_, driver) -> driver }.firstOrNull()
}
return deviceMap[rest] ?: deviceMap.values.firstOrNull()
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,17 +14,27 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
package org.meshtastic.core.network
import org.meshtastic.core.repository.RadioTransport
import co.touchlab.kermit.Logger
import io.ktor.client.plugins.logging.Logger as KtorLogger
/**
* Radio interface factory service provider interface. Each radio backend implementation needs to have a factory to
* create new instances. These instances are specific to a particular address. This interface defines a common API
* across all radio interfaces for obtaining implementation instances.
* Bridges Ktor's HTTP client logging to [Kermit][Logger] so HTTP request/response events appear in the standard app
* logs rather than going to [System.out] via Ktor's default [io.ktor.client.plugins.logging.Logger.DEFAULT].
*
* This is primarily used in conjunction with Dagger assisted injection for each backend interface type.
* Usage:
* ```
* HttpClient(engine) {
* install(Logging) {
* logger = KermitHttpLogger
* level = LogLevel.HEADERS
* }
* }
* ```
*/
interface InterfaceFactorySpi<T : RadioTransport> {
fun create(rest: String): T
object KermitHttpLogger : KtorLogger {
override fun log(message: String) {
Logger.d { message }
}
}

View file

@ -45,7 +45,9 @@ import org.meshtastic.core.ble.BleDevice
import org.meshtastic.core.ble.BleScanner
import org.meshtastic.core.ble.BleWriteType
import org.meshtastic.core.ble.BluetoothRepository
import org.meshtastic.core.ble.DisconnectReason
import org.meshtastic.core.ble.MeshtasticBleConstants.SERVICE_UUID
import org.meshtastic.core.ble.classifyBleException
import org.meshtastic.core.ble.retryBleOperation
import org.meshtastic.core.ble.toMeshtasticRadioProfile
import org.meshtastic.core.common.util.nowMillis
@ -57,18 +59,23 @@ import org.meshtastic.proto.ToRadio
import kotlin.concurrent.Volatile
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private const val SCAN_RETRY_COUNT = 3
private const val SCAN_RETRY_DELAY_MS = 1000L
private const val CONNECTION_TIMEOUT_MS = 15_000L
private val SCAN_RETRY_DELAY = 1.seconds
private val CONNECTION_TIMEOUT = 15.seconds
private const val RECONNECT_FAILURE_THRESHOLD = 3
private const val RECONNECT_BASE_DELAY_MS = 5_000L
private const val RECONNECT_MAX_DELAY_MS = 60_000L
private val RECONNECT_BASE_DELAY = 5.seconds
private val RECONNECT_MAX_DELAY = 60.seconds
private const val RECONNECT_MAX_FAILURES = 10
/** Settle delay before each connection attempt to let the Android BLE stack finish any pending disconnect cleanup. */
private val SETTLE_DELAY = 1.seconds
/**
* Minimum milliseconds a BLE connection must stay up before we consider it "stable" and reset
* Minimum time a BLE connection must stay up before we consider it "stable" and reset
* [BleRadioInterface.consecutiveFailures]. Without this, a device at the edge of BLE range can repeatedly connect for a
* fraction of a second and drop — each brief connection resets the failure counter so [RECONNECT_FAILURE_THRESHOLD] is
* never reached, and the app never signals [ConnectionState.DeviceSleep].
@ -76,24 +83,29 @@ private const val RECONNECT_MAX_FAILURES = 10
* The value (5 s) is long enough that only connections that survive past the initial GATT setup are treated as genuine,
* but short enough that normal reconnects after light-sleep still reset the counter promptly.
*/
private const val MIN_STABLE_CONNECTION_MS = 5_000L
private val MIN_STABLE_CONNECTION = 5.seconds
/**
* Returns the reconnect backoff delay in milliseconds for a given consecutive failure count.
* Returns the reconnect backoff delay for a given consecutive failure count.
*
* Backoff schedule: 1 failure → 5 s, 2 failures → 10 s, 3 failures → 20 s, 4 failures → 40 s, 5+ failures → 60 s (capped)
*/
internal fun computeReconnectBackoffMs(consecutiveFailures: Int): Long {
if (consecutiveFailures <= 0) return RECONNECT_BASE_DELAY_MS
return minOf(RECONNECT_BASE_DELAY_MS * (1L shl (consecutiveFailures - 1).coerceAtMost(4)), RECONNECT_MAX_DELAY_MS)
internal fun computeReconnectBackoff(consecutiveFailures: Int): Duration {
if (consecutiveFailures <= 0) return RECONNECT_BASE_DELAY
val multiplier = 1 shl (consecutiveFailures - 1).coerceAtMost(4)
return minOf(RECONNECT_BASE_DELAY * multiplier, RECONNECT_MAX_DELAY)
}
// Milliseconds to wait after launching characteristic observations before triggering the
// Meshtastic handshake. Both fromRadio and logRadio observation flows write the CCCD
// asynchronously via Kable's GATT queue. Without this settle window the want_config_id
// burst from the radio can arrive before notifications are enabled, causing the first
// handshake attempt to look like a stall.
private const val CCCD_SETTLE_MS = 50L
/**
* Delay after writing a heartbeat before re-polling FROMRADIO.
*
* The ESP32 firmware processes TORADIO writes asynchronously (NimBLE callback → FreeRTOS main task queue →
* `handleToRadio()` → `heartbeatReceived = true`). The immediate drain trigger in
* [KableMeshtasticRadioProfile.sendToRadio] fires before this completes, so the `queueStatus` response is not yet
* available. 200 ms is well above observed ESP32 task scheduling latency (~10–50 ms) while remaining imperceptible to
* the user.
*/
private val HEARTBEAT_DRAIN_DELAY = 200.milliseconds
private val SCAN_TIMEOUT = 5.seconds
private val GATT_CLEANUP_TIMEOUT = 5.seconds
@ -120,7 +132,7 @@ class BleRadioInterface(
private val bluetoothRepository: BluetoothRepository,
private val connectionFactory: BleConnectionFactory,
private val service: RadioInterfaceService,
val address: String,
internal val address: String,
) : RadioTransport {
private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
@ -143,11 +155,15 @@ class BleRadioInterface(
private val bleConnection: BleConnection = connectionFactory.create(connectionScope, address)
private val writeMutex: Mutex = Mutex()
private var connectionStartTime: Long = 0
private var packetsReceived: Int = 0
private var packetsSent: Int = 0
private var bytesReceived: Long = 0
private var bytesSent: Long = 0
@Volatile private var connectionStartTime: Long = 0
@Volatile private var packetsReceived: Int = 0
@Volatile private var packetsSent: Int = 0
@Volatile private var bytesReceived: Long = 0
@Volatile private var bytesSent: Long = 0
@Volatile private var isFullyConnected = false
private var connectionJob: Job? = null
@ -186,7 +202,7 @@ class BleRadioInterface(
}
if (attempt < SCAN_RETRY_COUNT - 1) {
delay(SCAN_RETRY_DELAY_MS)
delay(SCAN_RETRY_DELAY)
}
}
@ -199,23 +215,18 @@ class BleRadioInterface(
connectionScope.launch {
while (isActive) {
try {
// Allow any pending background disconnects to complete and the Android BLE stack
// to settle before we attempt a new connection.
@Suppress("MagicNumber")
val connectDelayMs = 1000L
delay(connectDelayMs)
// Settle delay: let the Android BLE stack finish any pending
// disconnect cleanup before starting a new connection attempt.
delay(SETTLE_DELAY)
connectionStartTime = nowMillis
Logger.i { "[$address] BLE connection attempt started" }
val device = findDevice()
// Ensure the device is bonded before connecting. On Android, the
// firmware may require an encrypted link (pairing mode != NO_PIN).
// Without an explicit bond the GATT connection will fail with
// insufficient-authentication (status 5) or the dreaded status 133.
// On Desktop/JVM this is a no-op since the OS handles pairing during
// the GATT connection when the peripheral requires it.
// Bond before connecting: firmware may require an encrypted link,
// and without a bond Android fails with status 5 or 133.
// No-op on Desktop/JVM where the OS handles pairing automatically.
if (!bluetoothRepository.isBonded(address)) {
Logger.i { "[$address] Device not bonded, initiating bonding" }
@Suppress("TooGenericExceptionCaught")
@ -227,36 +238,26 @@ class BleRadioInterface(
}
}
var state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS)
if (state !is BleConnectionState.Connected) {
// Kable on Android occasionally fails the first connection attempt with
// NotConnectedException if the previous peripheral wasn't fully cleaned
// up by the OS. A quick retry resolves it.
Logger.d { "[$address] First connection attempt failed, retrying in 1.5s" }
@Suppress("MagicNumber")
delay(1500L)
state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS)
}
val state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT)
if (state !is BleConnectionState.Connected) {
throw RadioNotConnectedException("Failed to connect to device at address $address")
}
// Connection succeeded — only reset the failure counter if the
// connection stays up long enough. See MIN_STABLE_CONNECTION_MS.
// Only reset failures if connection was stable (see MIN_STABLE_CONNECTION).
val gattConnectedAt = nowMillis
isFullyConnected = true
onConnected()
// Use coroutineScope so that the connectionState listener is scoped to this
// iteration only. When the inner scope exits (on disconnect), the listener is
// cancelled automatically before the next reconnect cycle starts a fresh one.
// Scope the connectionState listener to this iteration so it's
// cancelled automatically before the next reconnect cycle.
var disconnectReason: DisconnectReason = DisconnectReason.Unknown
coroutineScope {
bleConnection.connectionState
.onEach { s ->
if (s is BleConnectionState.Disconnected && isFullyConnected) {
isFullyConnected = false
disconnectReason = s.reason
onDisconnected()
}
}
@ -265,27 +266,30 @@ class BleRadioInterface(
discoverServicesAndSetupCharacteristics()
// Suspend here until Kable drops the connection
bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
}
Logger.i { "[$address] BLE connection dropped, preparing to reconnect" }
Logger.i {
"[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect"
}
// Only reset the failure counter if the connection was stable (lasted
// longer than MIN_STABLE_CONNECTION_MS). A connection that drops within
// seconds typically means the device is at the edge of BLE range or
// powered off — the Android BLE stack may briefly "connect" to a cached
// GATT profile before realising the device is gone. Without this guard,
// the failure counter resets on every brief connect, preventing us from
// ever reaching RECONNECT_FAILURE_THRESHOLD and signalling DeviceSleep.
val connectionUptime = nowMillis - gattConnectedAt
if (connectionUptime >= MIN_STABLE_CONNECTION_MS) {
// Skip failure counting for intentional disconnects.
if (disconnectReason is DisconnectReason.LocalDisconnect) {
consecutiveFailures = 0
continue
}
// A connection that drops almost immediately (< MIN_STABLE_CONNECTION)
// is treated as a failure — the BLE stack may have "connected" to a
// cached GATT profile before realising the device is gone.
val connectionUptime = (nowMillis - gattConnectedAt).milliseconds
if (connectionUptime >= MIN_STABLE_CONNECTION) {
consecutiveFailures = 0
} else {
consecutiveFailures++
Logger.w {
"[$address] Connection lasted only ${connectionUptime}ms " +
"(< ${MIN_STABLE_CONNECTION_MS}ms) — treating as failure " +
"[$address] Connection lasted only $connectionUptime " +
"(< $MIN_STABLE_CONNECTION) — treating as failure " +
"(consecutive failures: $consecutiveFailures)"
}
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
@ -307,16 +311,14 @@ class BleRadioInterface(
Logger.d { "[$address] BLE connection coroutine cancelled" }
throw e
} catch (e: Exception) {
val failureTime = nowMillis - connectionStartTime
val failureTime = (nowMillis - connectionStartTime).milliseconds
consecutiveFailures++
Logger.w(e) {
"[$address] Failed to connect to device after ${failureTime}ms " +
"[$address] Failed to connect to device after $failureTime " +
"(consecutive failures: $consecutiveFailures)"
}
// After exceeding the max failure limit, give up permanently to stop
// draining battery on a device that is genuinely offline. The user
// must manually reconnect from the connections screen.
// Give up permanently to stop draining battery.
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
Logger.e { "[$address] Giving up after $consecutiveFailures consecutive failures" }
val (_, msg) = e.toDisconnectReason()
@ -324,18 +326,14 @@ class BleRadioInterface(
return@launch
}
// At the failure threshold, signal DeviceSleep so
// MeshConnectionManagerImpl can start its sleep timeout.
// Signal DeviceSleep so MeshConnectionManagerImpl starts its sleep timeout.
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
handleFailure(e)
}
// Exponential backoff: 5s → 10s → 20s → 40s → capped at 60s.
// Reduces BLE stack pressure and battery drain when the device is genuinely
// out of range, while still recovering quickly from transient drops.
val backoffMs = computeReconnectBackoffMs(consecutiveFailures)
Logger.d { "[$address] Retrying in ${backoffMs}ms (failure #$consecutiveFailures)" }
delay(backoffMs)
val backoff = computeReconnectBackoff(consecutiveFailures)
Logger.d { "[$address] Retrying in $backoff (failure #$consecutiveFailures)" }
delay(backoff)
}
}
}
@ -354,23 +352,8 @@ class BleRadioInterface(
private fun onDisconnected() {
radioService = null
val uptime =
if (connectionStartTime > 0) {
nowMillis - connectionStartTime
} else {
0
}
Logger.i {
"[$address] BLE disconnected - " +
"Uptime: ${uptime}ms, " +
"Packets RX: $packetsReceived ($bytesReceived bytes), " +
"Packets TX: $packetsSent ($bytesSent bytes)"
}
// Signal DeviceSleep immediately so the UI reflects the disconnect while the
// reconnect loop continues in the background. The previous approach suppressed
// this signal until RECONNECT_FAILURE_THRESHOLD consecutive failures, leaving the
// UI stuck on "Connected" for 35+ seconds after the device disappeared.
Logger.i { "[$address] BLE disconnected - ${formatSessionStats()}" }
// Signal immediately so the UI reflects the disconnect while reconnect continues.
service.onDisconnect(isPermanent = false)
}
@ -379,7 +362,6 @@ class BleRadioInterface(
bleConnection.profile(serviceUuid = SERVICE_UUID) { service ->
val radioService = service.toMeshtasticRadioProfile()
// Wire up notifications
radioService.fromRadio
.onEach { packet ->
Logger.v { "[$address] Received packet fromRadio (${packet.size} bytes)" }
@ -402,16 +384,12 @@ class BleRadioInterface(
}
.launchIn(this)
// Store reference for handleSendToRadio
this@BleRadioInterface.radioService = radioService
Logger.i { "[$address] Profile service active and characteristics subscribed" }
// Give Kable's async CCCD writes time to complete before triggering the
// Meshtastic handshake. The fromRadio/logRadio observation flows register
// notifications through the GATT queue asynchronously. Without this settle
// window, the want_config_id burst arrives before notifications are enabled.
delay(CCCD_SETTLE_MS)
// Wait for FROMNUM CCCD write before triggering the Meshtastic handshake.
radioService.awaitSubscriptionReady()
// Log negotiated MTU for diagnostics
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
@ -421,10 +399,8 @@ class BleRadioInterface(
}
} catch (e: Exception) {
Logger.w(e) { "[$address] Profile service discovery or operation failed" }
// Ensure the peripheral is disconnected so the outer reconnect loop sees a clean
// Disconnected state. Do NOT call handleFailure here — the reconnect loop tracks
// consecutive failures and calls handleFailure after RECONNECT_FAILURE_THRESHOLD,
// preventing premature onDisconnect signals to the service on transient errors.
// Disconnect to let the outer reconnect loop see a clean Disconnected state.
// Do NOT call handleFailure here — the reconnect loop owns failure counting.
try {
bleConnection.disconnect()
} catch (ignored: Exception) {
@ -481,25 +457,25 @@ class BleRadioInterface(
val nonce = heartbeatNonce.fetchAndAdd(1)
Logger.v { "[$address] BLE keepAlive — sending ToRadio heartbeat (nonce=$nonce)" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat(nonce = nonce)).encode())
// The firmware responds to heartbeats by queuing a `queueStatus` FromRadio packet
// on the next getFromRadio() call, but it does NOT send a FROMNUM notification for
// it. The immediate drain trigger in sendToRadio() fires before the ESP32's async
// task queue has processed the heartbeat, so the response sits unread. Schedule a
// delayed re-drain to pick it up.
connectionScope.launch {
delay(HEARTBEAT_DRAIN_DELAY)
radioService?.requestDrain()
}
}
/** Closes the connection to the device. */
override fun close() {
val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0
Logger.i {
"[$address] Disconnecting. " +
"Uptime: ${uptime}ms, " +
"Packets RX: $packetsReceived ($bytesReceived bytes), " +
"Packets TX: $packetsSent ($bytesSent bytes)"
}
// Cancel the connection scope to break the while(isActive) reconnect loop.
Logger.i { "[$address] Disconnecting. ${formatSessionStats()}" }
connectionScope.cancel("close() called")
// GATT cleanup must survive serviceScope cancellation. SharedRadioInterfaceService calls
// close() and then immediately cancels serviceScope — a coroutine launched on serviceScope
// may never be dispatched, leaving the BluetoothGatt object leaked (causes GATT 133 on the
// next connect attempt). GlobalScope is the correct tool here: the cleanup is short-lived,
// fire-and-forget, and must outlive any application-managed scope.
// onDisconnect is handled by SharedRadioInterfaceService.stopInterfaceLocked() directly.
// GATT cleanup must outlive serviceScope cancellation — GlobalScope is intentional.
// SharedRadioInterfaceService cancels serviceScope immediately after close(), so a
// coroutine launched there may never run, leaking BluetoothGatt (causes GATT 133).
@OptIn(DelicateCoroutinesApi::class)
GlobalScope.launch {
try {
@ -525,17 +501,27 @@ class BleRadioInterface(
service.onDisconnect(isPermanent, errorMessage = msg)
}
/** Formats a one-line session statistics summary for logging. */
private fun formatSessionStats(): String {
val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0
return "Uptime: ${uptime}ms, " +
"Packets RX: $packetsReceived ($bytesReceived bytes), " +
"Packets TX: $packetsSent ($bytesSent bytes)"
}
private fun Throwable.toDisconnectReason(): Pair<Boolean, String> {
val isPermanent =
this::class.simpleName == "BluetoothUnavailableException" ||
this::class.simpleName == "ManagerClosedException"
classifyBleException()?.let {
return it.isPermanent to it.message
}
val msg =
when {
this is RadioNotConnectedException -> this.message ?: "Device not found"
this is NoSuchElementException || this is IllegalArgumentException -> "Required characteristic missing"
this::class.simpleName == "GattException" -> "GATT Error: ${this.message}"
when (this) {
is RadioNotConnectedException -> this.message ?: "Device not found"
is NoSuchElementException,
is IllegalArgumentException,
-> "Required characteristic missing"
else -> this.message ?: this::class.simpleName ?: "Unknown"
}
return Pair(isPermanent, msg)
return false to msg
}
}

View file

@ -42,11 +42,11 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : R
*
* @param waitForStopped if true we should wait for the manager to finish - must be false if called from inside the
* manager callbacks
* @param isPermanent true if the device is definitely gone (e.g. USB unplugged), false if it may come back (e.g.
* TCP transient disconnect). Defaults to true for serial — subclasses like [TCPInterface] override with false.
*/
protected open fun onDeviceDisconnect(waitForStopped: Boolean) {
service.onDisconnect(
isPermanent = true,
) // if USB device disconnects it is definitely permanently gone, not sleeping)
protected open fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean = true) {
service.onDisconnect(isPermanent = isPermanent)
}
protected open fun connect() {

View file

@ -44,6 +44,7 @@ import org.meshtastic.core.model.util.subscribeList
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.proto.MqttClientProxyMessage
import kotlin.concurrent.Volatile
@Single(binds = [MQTTRepository::class])
class MQTTRepositoryImpl(
@ -62,7 +63,7 @@ class MQTTRepositoryImpl(
private const val RECONNECT_BACKOFF_MULTIPLIER = 2
}
private var client: MQTTClient? = null
@Volatile private var client: MQTTClient? = null
@OptIn(ExperimentalSerializationApi::class)
private val json = Json {
@ -70,7 +71,8 @@ class MQTTRepositoryImpl(
exceptionsWithDebugInfo = false
}
private val scope = CoroutineScope(dispatchers.default + SupervisorJob())
private var clientJob: Job? = null
@Volatile private var clientJob: Job? = null
private val publishSemaphore = Semaphore(20)
@Suppress("TooGenericExceptionCaught")
@ -149,12 +151,10 @@ class MQTTRepositoryImpl(
while (true) {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
// Reset backoff on each successful connection establishment. If the broker
// disconnects cleanly after hours of operation, the next reconnect should
// start with the minimum delay rather than whatever was accumulated.
reconnectDelay = INITIAL_RECONNECT_DELAY_MS
newClient.runSuspend()
// runSuspend returned normally — broker closed connection. Retry.
// runSuspend returned normally — broker closed connection cleanly.
// Reset backoff so the next reconnect starts with the minimum delay.
reconnectDelay = INITIAL_RECONNECT_DELAY_MS
Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" }
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" }
@ -199,15 +199,25 @@ class MQTTRepositoryImpl(
@OptIn(ExperimentalUnsignedTypes::class)
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
val currentClient = client
if (currentClient == null) {
Logger.w { "MQTT publish to $topic dropped: client not connected" }
return
}
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
scope.launch {
publishSemaphore.withPermit {
client?.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
)
@Suppress("TooGenericExceptionCaught")
try {
currentClient.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
)
} catch (e: Exception) {
Logger.w(e) { "MQTT publish to $topic failed" }
}
}
}
}

View file

@ -36,6 +36,7 @@ import org.meshtastic.core.testing.FakeBluetoothRepository
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
class BleRadioInterfaceTest {
@ -164,14 +165,14 @@ class BleRadioInterfaceTest {
}
@Test
fun `computeReconnectBackoffMs returns correct backoff values`() {
assertEquals(5_000L, computeReconnectBackoffMs(0))
assertEquals(5_000L, computeReconnectBackoffMs(1))
assertEquals(10_000L, computeReconnectBackoffMs(2))
assertEquals(20_000L, computeReconnectBackoffMs(3))
assertEquals(40_000L, computeReconnectBackoffMs(4))
assertEquals(60_000L, computeReconnectBackoffMs(5))
assertEquals(60_000L, computeReconnectBackoffMs(10))
assertEquals(60_000L, computeReconnectBackoffMs(100))
fun `computeReconnectBackoff returns correct backoff values`() {
assertEquals(5.seconds, computeReconnectBackoff(0))
assertEquals(5.seconds, computeReconnectBackoff(1))
assertEquals(10.seconds, computeReconnectBackoff(2))
assertEquals(20.seconds, computeReconnectBackoff(3))
assertEquals(40.seconds, computeReconnectBackoff(4))
assertEquals(60.seconds, computeReconnectBackoff(5))
assertEquals(60.seconds, computeReconnectBackoff(10))
assertEquals(60.seconds, computeReconnectBackoff(100))
}
}

View file

@ -19,6 +19,7 @@ package org.meshtastic.core.network.radio
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds
/**
* Tests the exponential backoff schedule used by [BleRadioInterface] when consecutive connection attempts fail. The
@ -28,42 +29,42 @@ class ReconnectBackoffTest {
@Test
fun `zero failures yields base delay`() {
assertEquals(5_000L, computeReconnectBackoffMs(0))
assertEquals(5.seconds, computeReconnectBackoff(0))
}
@Test
fun `first failure yields 5s`() {
assertEquals(5_000L, computeReconnectBackoffMs(1))
assertEquals(5.seconds, computeReconnectBackoff(1))
}
@Test
fun `second failure yields 10s`() {
assertEquals(10_000L, computeReconnectBackoffMs(2))
assertEquals(10.seconds, computeReconnectBackoff(2))
}
@Test
fun `third failure yields 20s`() {
assertEquals(20_000L, computeReconnectBackoffMs(3))
assertEquals(20.seconds, computeReconnectBackoff(3))
}
@Test
fun `fourth failure yields 40s`() {
assertEquals(40_000L, computeReconnectBackoffMs(4))
assertEquals(40.seconds, computeReconnectBackoff(4))
}
@Test
fun `fifth failure is capped at 60s`() {
assertEquals(60_000L, computeReconnectBackoffMs(5))
assertEquals(60.seconds, computeReconnectBackoff(5))
}
@Test
fun `large failure count stays capped at 60s`() {
assertEquals(60_000L, computeReconnectBackoffMs(100))
assertEquals(60.seconds, computeReconnectBackoff(100))
}
@Test
fun `backoff is strictly increasing up to the cap`() {
val values = (1..5).map { computeReconnectBackoffMs(it) }
val values = (1..5).map { computeReconnectBackoff(it) }
for (i in 0 until values.size - 1) {
assertTrue(
values[i] < values[i + 1],

View file

@ -52,7 +52,8 @@ open class TCPInterface(
override fun onDisconnected() {
// Transport already performed teardown; only propagate lifecycle to StreamInterface.
super@TCPInterface.onDeviceDisconnect(false)
// TCP disconnects are transient (not permanent) — the transport will auto-reconnect.
super@TCPInterface.onDeviceDisconnect(false, isPermanent = false)
}
override fun onPacketReceived(bytes: ByteArray) {
@ -71,9 +72,9 @@ open class TCPInterface(
Logger.d { "[$address] TCPInterface.sendBytes delegated to transport" }
}
override fun onDeviceDisconnect(waitForStopped: Boolean) {
override fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean) {
transport.stop()
super.onDeviceDisconnect(waitForStopped)
super.onDeviceDisconnect(waitForStopped, isPermanent = false)
}
override fun connect() {

View file

@ -65,6 +65,10 @@ class TcpTransport(
}
companion object {
/**
* Maximum reconnect retries. Set to [Int.MAX_VALUE] to retry indefinitely — the caller ([TcpTransport.stop])
* owns the cancellation lifecycle.
*/
const val MAX_RECONNECT_RETRIES = Int.MAX_VALUE
const val MIN_BACKOFF_MILLIS = 1_000L
const val MAX_BACKOFF_MILLIS = 5 * 60 * 1_000L
@ -84,18 +88,26 @@ class TcpTransport(
)
// TCP socket state
private var socket: Socket? = null
private var outStream: OutputStream? = null
private var connectionJob: Job? = null
private var currentAddress: String? = null
@Volatile private var socket: Socket? = null
@Volatile private var outStream: OutputStream? = null
@Volatile private var connectionJob: Job? = null
@Volatile private var currentAddress: String? = null
// Metrics
private var connectionStartTime: Long = 0
private var packetsReceived: Int = 0
private var packetsSent: Int = 0
private var bytesReceived: Long = 0
private var bytesSent: Long = 0
private var timeoutEvents: Int = 0
@Volatile private var connectionStartTime: Long = 0
@Volatile private var packetsReceived: Int = 0
@Volatile private var packetsSent: Int = 0
@Volatile private var bytesReceived: Long = 0
@Volatile private var bytesSent: Long = 0
@Volatile private var timeoutEvents: Int = 0
/** Whether the transport is currently connected. */
val isConnected: Boolean