feat: implement XModem file transfers and enhance BLE connection robustness (#4959)
Some checks are pending
Dependency Submission / dependency-submission (push) Waiting to run
Main CI (Verify & Build) / validate-and-build (push) Waiting to run
Main Push Changelog / Generate main push changelog (push) Waiting to run

This commit is contained in:
James Rich 2026-03-30 22:49:31 -05:00 committed by GitHub
parent ae4465d7c8
commit c75c9b34d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 1100 additions and 120 deletions

View file

@ -22,6 +22,7 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
@ -35,6 +36,8 @@ import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import org.meshtastic.core.ble.BleConnection
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleConnectionState
@ -49,13 +52,37 @@ import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.RadioNotConnectedException
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import kotlin.concurrent.Volatile
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.time.Duration.Companion.seconds
private const val SCAN_RETRY_COUNT = 3
private const val SCAN_RETRY_DELAY_MS = 1000L
private const val CONNECTION_TIMEOUT_MS = 15_000L
private const val RECONNECT_FAILURE_THRESHOLD = 3
private const val RECONNECT_BASE_DELAY_MS = 5_000L
private const val RECONNECT_MAX_DELAY_MS = 60_000L

/**
 * Computes how long to wait, in milliseconds, before the next reconnect attempt.
 *
 * The delay starts at [RECONNECT_BASE_DELAY_MS], doubles with each additional consecutive failure, and is capped at
 * [RECONNECT_MAX_DELAY_MS]:
 * - 1 failure → 5 s
 * - 2 failures → 10 s
 * - 3 failures → 20 s
 * - 4 failures → 40 s
 * - 5 or more failures → 60 s (capped)
 *
 * @param consecutiveFailures number of connection attempts that have failed in a row; zero or negative values are
 *   treated like the first failure and yield the base delay.
 * @return the backoff delay in milliseconds.
 */
internal fun computeReconnectBackoffMs(consecutiveFailures: Int): Long {
    // Zero or negative counts get the un-doubled base delay.
    if (consecutiveFailures < 1) return RECONNECT_BASE_DELAY_MS

    // Limit the shift to four doublings so the intermediate value stays small for any failure count.
    val doublings = minOf(consecutiveFailures - 1, 4)
    val uncappedMs = RECONNECT_BASE_DELAY_MS shl doublings
    return uncappedMs.coerceAtMost(RECONNECT_MAX_DELAY_MS)
}
// Milliseconds to wait after launching characteristic observations before triggering the
// Meshtastic handshake. Both fromRadio and logRadio observation flows write the CCCD
// asynchronously via Kable's GATT queue. Without this settle window the want_config_id
// burst from the radio can arrive before notifications are enabled, causing the first
// handshake attempt to look like a stall.
private const val CCCD_SETTLE_MS = 50L
// Upper bound for a single scan attempt in findDevice(): passed to the scanner as its own
// timeout and also used as the withTimeoutOrNull guard wrapped around that scan.
private val SCAN_TIMEOUT = 5.seconds
/**
@ -113,6 +140,9 @@ class BleRadioInterface(
private var connectionJob: Job? = null
private var consecutiveFailures = 0
@OptIn(ExperimentalAtomicApi::class)
private val heartbeatNonce = AtomicInt(0)
init {
connect()
}
@ -122,7 +152,7 @@ class BleRadioInterface(
/** Robustly finds the device. First checks bonded devices, then performs a short scan if not found. */
private suspend fun findDevice(): BleDevice {
bluetoothRepository.state.value.bondedDevices
.firstOrNull { it.address == address }
.firstOrNull { it.address.equals(address, ignoreCase = true) }
?.let {
return it
}
@ -132,9 +162,9 @@ class BleRadioInterface(
repeat(SCAN_RETRY_COUNT) { attempt ->
try {
val d =
kotlinx.coroutines.withTimeoutOrNull(SCAN_TIMEOUT) {
withTimeoutOrNull(SCAN_TIMEOUT) {
scanner.scan(timeout = SCAN_TIMEOUT, serviceUuid = SERVICE_UUID, address = address).first {
it.address == address
it.address.equals(address, ignoreCase = true)
}
}
if (d != null) return d
@ -150,6 +180,7 @@ class BleRadioInterface(
throw RadioNotConnectedException("Device not found at address $address")
}
@Suppress("LongMethod")
private fun connect() {
connectionJob = connectionScope.launch {
while (isActive) {
@ -158,22 +189,39 @@ class BleRadioInterface(
// to settle before we attempt a new connection.
@Suppress("MagicNumber")
val connectDelayMs = 1000L
kotlinx.coroutines.delay(connectDelayMs)
delay(connectDelayMs)
connectionStartTime = nowMillis
Logger.i { "[$address] BLE connection attempt started" }
val device = findDevice()
// Ensure the device is bonded before connecting. On Android, the
// firmware may require an encrypted link (pairing mode != NO_PIN).
// Without an explicit bond the GATT connection will fail with
// insufficient-authentication (status 5) or the dreaded status 133.
// On Desktop/JVM this is a no-op since the OS handles pairing during
// the GATT connection when the peripheral requires it.
if (!bluetoothRepository.isBonded(address)) {
Logger.i { "[$address] Device not bonded, initiating bonding..." }
@Suppress("TooGenericExceptionCaught")
try {
bluetoothRepository.bond(device)
Logger.i { "[$address] Bonding successful" }
} catch (e: Exception) {
Logger.w(e) { "[$address] Bonding failed, attempting connection anyway" }
}
}
var state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS)
if (state !is BleConnectionState.Connected) {
// Kable on Android occasionally fails the first connection attempt with NotConnectedException
// if the previous peripheral wasn't fully cleaned up by the OS. A quick retry resolves it.
// Kable on Android occasionally fails the first connection attempt with
// NotConnectedException if the previous peripheral wasn't fully cleaned
// up by the OS. A quick retry resolves it.
Logger.w { "[$address] First connection attempt failed, retrying in 1.5s..." }
@Suppress("MagicNumber")
val retryDelayMs = 1500L
kotlinx.coroutines.delay(retryDelayMs)
delay(1500L)
state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS)
}
@ -218,15 +266,19 @@ class BleRadioInterface(
"(consecutive failures: $consecutiveFailures)"
}
// After repeated failures, signal DeviceSleep so MeshConnectionManagerImpl can
// start its sleep timeout. handleFailure covers permanent-error cases.
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
// At the failure threshold, signal DeviceSleep so MeshConnectionManagerImpl can
// start its sleep timeout. Use == (not >=) to fire exactly once; repeated
// onDisconnect signals would reset upstream state machines unnecessarily.
if (consecutiveFailures == RECONNECT_FAILURE_THRESHOLD) {
handleFailure(e)
}
// Wait before retrying to prevent hot loops
@Suppress("MagicNumber")
kotlinx.coroutines.delay(5000L)
// Exponential backoff: 5s → 10s → 20s → 40s → capped at 60s.
// Reduces BLE stack pressure and battery drain when the device is genuinely
// out of range, while still recovering quickly from transient drops.
val backoffMs = computeReconnectBackoffMs(consecutiveFailures)
Logger.d { "[$address] Retrying in ${backoffMs}ms (failure #$consecutiveFailures)" }
delay(backoffMs)
}
}
}
@ -297,6 +349,12 @@ class BleRadioInterface(
Logger.i { "[$address] Profile service active and characteristics subscribed" }
// Give Kable's async CCCD writes time to complete before triggering the
// Meshtastic handshake. The fromRadio/logRadio observation flows register
// notifications through the GATT queue asynchronously. Without this settle
// window, the want_config_id burst arrives before notifications are enabled.
delay(CCCD_SETTLE_MS)
// Log negotiated MTU for diagnostics
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
Logger.i { "[$address] BLE Radio Session Ready. Max write length (WITHOUT_RESPONSE): $maxLen bytes" }
@ -305,8 +363,15 @@ class BleRadioInterface(
}
} catch (e: Exception) {
Logger.w(e) { "[$address] Profile service discovery or operation failed" }
bleConnection.disconnect()
handleFailure(e)
// Ensure the peripheral is disconnected so the outer reconnect loop sees a clean
// Disconnected state. Do NOT call handleFailure here — the reconnect loop tracks
// consecutive failures and calls handleFailure after RECONNECT_FAILURE_THRESHOLD,
// preventing premature onDisconnect signals to the service on transient errors.
try {
bleConnection.disconnect()
} catch (ignored: Exception) {
Logger.w(ignored) { "[$address] disconnect() failed after profile error" }
}
}
}
@ -347,34 +412,57 @@ class BleRadioInterface(
}
}
/**
* Sends an application-level heartbeat to the radio.
*
* Takes the current value of [heartbeatNonce] (incrementing the counter for the next call), wraps it in a
* [ToRadio] heartbeat message and passes the encoded bytes to `handleSendToRadio`.
*/
@OptIn(ExperimentalAtomicApi::class)
override fun keepAlive() {
// NOTE(review): two debug lines are emitted per call (this one and the nonce-bearing one below);
// this first line may be a leftover from before the heartbeat write was added — confirm.
Logger.d { "[$address] BLE keepAlive" }
// Send a ToRadio heartbeat so the firmware resets its power-saving idle timer.
// The firmware only resets the timer on writes to the TORADIO characteristic; a
// BLE-level GATT keepalive is invisible to it. Without this the device may enter
// light-sleep and drop the BLE connection after ~60 s of application inactivity.
//
// Each heartbeat uses a distinct nonce to vary the wire bytes, preventing the
// firmware's per-connection duplicate-write filter from silently dropping it.
val nonce = heartbeatNonce.fetchAndAdd(1)
Logger.d { "[$address] BLE keepAlive — sending ToRadio heartbeat (nonce=$nonce)" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat(nonce = nonce)).encode())
}
/** Closes the connection to the device. */
override fun close() {
val uptime =
if (connectionStartTime > 0) {
nowMillis - connectionStartTime
} else {
0
}
val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0
Logger.i {
"[$address] Disconnecting. " +
"Uptime: ${uptime}ms, " +
"Packets RX: $packetsReceived ($bytesReceived bytes), " +
"Packets TX: $packetsSent ($bytesSent bytes)"
}
// Cancel the connection scope FIRST to break the while(isActive) reconnect loop,
// then perform async cleanup on the parent serviceScope.
// Cancel the connection scope to break the while(isActive) reconnect loop.
connectionScope.cancel("close() called")
// GATT cleanup must run regardless of serviceScope lifecycle. SharedRadioInterfaceService
// cancels serviceScope immediately after calling close(), so launching on serviceScope is
// not reliable — the coroutine may never start. We use withContext(NonCancellable) inside
// a serviceScope.launch to guarantee cleanup completes even if the scope is cancelled
// mid-flight, preventing leaked BluetoothGatt objects (GATT 133 errors).
// onDisconnect is handled by SharedRadioInterfaceService.stopInterfaceLocked() directly.
serviceScope.launch {
try {
bleConnection.disconnect()
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.w(e) { "[$address] Failed to disconnect in close()" }
withContext(NonCancellable) {
// Send ToRadio.disconnect before dropping the BLE link. The firmware calls its
// own close() immediately on receipt, resetting the PhoneAPI state machine
// (config nonce, packet queue, observers) without waiting for the 6-second BLE
// supervision timeout. Best-effort: if the write fails we still disconnect below.
val currentService = radioService
if (currentService != null) {
try {
withTimeoutOrNull(2_000L) { currentService.sendToRadio(ToRadio(disconnect = true).encode()) }
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.w(e) { "[$address] Failed to send disconnect signal" }
}
}
try {
bleConnection.disconnect()
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
Logger.w(e) { "[$address] Failed to disconnect in close()" }
}
}
service.onDisconnect(true)
}
}

View file

@ -17,10 +17,14 @@
package org.meshtastic.core.network.radio
import dev.mokkery.MockMode
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.RadioNotConnectedException
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.testing.FakeBleConnection
import org.meshtastic.core.testing.FakeBleConnectionFactory
@ -82,4 +86,42 @@ class BleRadioInterfaceTest {
)
assertEquals(address, bleInterface.address)
}
/**
* After [RECONNECT_FAILURE_THRESHOLD] consecutive connection failures, [RadioInterfaceService.onDisconnect] must be
* called so the higher layers can react (e.g. start the device-sleep timeout in [MeshConnectionManagerImpl]).
*
* Virtual-time breakdown (RECONNECT_FAILURE_THRESHOLD = 3):
* - t = 1 000 ms: iteration 1 settle delay elapses, connectAndAwait throws, 5 s backoff starts
* - t = 6 000 ms: backoff ends
* - t = 7 000 ms: iteration 2 settle delay elapses, connectAndAwait throws, 10 s backoff starts
* - t = 17 000 ms: backoff ends
* - t = 18 000 ms: iteration 3 settle delay elapses, connectAndAwait throws, onDisconnect is called
*/
@Test
fun `onDisconnect is called after RECONNECT_FAILURE_THRESHOLD consecutive failures`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Device")
bluetoothRepository.bond(device) // skip BLE scan — device is already bonded
// Make every connectAndAwait call throw so each iteration counts as one failure.
connection.connectException = RadioNotConnectedException("simulated failure")
val bleInterface =
BleRadioInterface(
serviceScope = this,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
address = address,
)
// Advance through exactly 3 failure iterations (≈18 001 ms virtual time); the extra
// millisecond moves the clock just past the t = 18 000 ms mark of the third failure.
// The 20 s backoff that follows the 3rd failure hasn't elapsed yet, so the coroutine
// is suspended and advanceTimeBy returns cleanly.
advanceTimeBy(18_001L)
verify { service.onDisconnect(any(), any()) }
// Cancel the reconnect loop so runTest can complete.
bleInterface.close()
}
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
 * Pins the exponential backoff schedule that [BleRadioInterface] applies between consecutive failed connection
 * attempts, as produced by [computeReconnectBackoffMs]:
 * - failure #1 → 5 s
 * - failure #2 → 10 s
 * - failure #3 → 20 s
 * - failure #4 → 40 s
 * - failure #5 and beyond → 60 s (capped)
 */
class ReconnectBackoffTest {

    @Test
    fun `zero failures yields base delay`() = assertEquals(5_000L, computeReconnectBackoffMs(0))

    @Test
    fun `first failure yields 5s`() = assertEquals(5_000L, computeReconnectBackoffMs(1))

    @Test
    fun `second failure yields 10s`() = assertEquals(10_000L, computeReconnectBackoffMs(2))

    @Test
    fun `third failure yields 20s`() = assertEquals(20_000L, computeReconnectBackoffMs(3))

    @Test
    fun `fourth failure yields 40s`() = assertEquals(40_000L, computeReconnectBackoffMs(4))

    @Test
    fun `fifth failure is capped at 60s`() = assertEquals(60_000L, computeReconnectBackoffMs(5))

    @Test
    fun `large failure count stays capped at 60s`() = assertEquals(60_000L, computeReconnectBackoffMs(100))

    @Test
    fun `backoff is strictly increasing up to the cap`() {
        // Delays for failure counts 1 through 5; each must be larger than its predecessor.
        val values = (1..5).map(::computeReconnectBackoffMs)
        repeat(values.size - 1) { i ->
            assertTrue(
                values[i] < values[i + 1],
                "Expected backoff[${i + 1}] (${values[i]}) < backoff[${i + 2}] (${values[i + 1]})",
            )
        }
    }
}