feat(ble): Implement keep-alive and improve connection stability (#3359)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2025-10-06 14:45:11 -05:00 committed by GitHub
parent cc64abfc5c
commit c98e74d804
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 80 additions and 40 deletions

View file

@ -19,6 +19,7 @@ package com.geeksville.mesh.repository.radio
import android.annotation.SuppressLint
import android.app.Application
import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCharacteristic
import android.bluetooth.BluetoothGattService
import com.geeksville.mesh.concurrent.handledLaunch
@ -34,9 +35,13 @@ import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.stateIn
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.model.util.anonymize
import timber.log.Timber
@ -140,40 +145,61 @@ constructor(
private lateinit var fromNum: BluetoothGattCharacteristic
// RSSI flow & polling job (null when unavailable / disconnected)
private val _rssiFlow = MutableStateFlow<Int?>(null)
val rssiFlow: StateFlow<Int?> = _rssiFlow
/**
* RSSI flow, which polls the remote device for RSSI only when there are active subscribers. The polling stops
* automatically when the last collector stops.
*/
val rssiFlow: StateFlow<Int?> =
callbackFlow {
// Initial read for faster UI update
safe?.asyncReadRemoteRssi { first -> first.getOrNull()?.let { trySend(it) } }
@Volatile private var rssiPollingJob: Job? = null
// Start polling RSSI every 5 seconds (immediate first read)
@Suppress("MagicNumber", "LoopWithTooManyJumpStatements")
private fun startRssiPolling() {
rssiPollingJob?.cancel()
val s = safe ?: return
// Immediate read for faster UI update
s.asyncReadRemoteRssi { first -> first.getOrNull()?.let { _rssiFlow.value = it } }
rssiPollingJob =
service.serviceScope.handledLaunch {
while (true) {
try {
delay(5000)
if (safe == null) break
safe?.asyncReadRemoteRssi { res -> res.getOrNull()?.let { _rssiFlow.value = it } }
} catch (ex: CancellationException) {
break
} catch (ex: Exception) {
Timber.d("RSSI polling error: ${ex.message}")
// Launch the polling loop on the service scope
@Suppress("LoopWithTooManyJumpStatements", "MagicNumber")
val pollingJob =
service.serviceScope.handledLaunch {
while (true) {
try {
delay(2500) // Poll every 5 seconds
safe?.asyncReadRemoteRssi { res -> res.getOrNull()?.let { trySend(it) } }
} catch (ex: CancellationException) {
break // Stop polling on cancellation
} catch (ex: Exception) {
Timber.d("RSSI polling error: ${ex.message}")
}
}
}
}
}
// Stop polling and clear current value
private fun stopRssiPolling() {
rssiPollingJob?.cancel()
rssiPollingJob = null
_rssiFlow.value = null
// This block executes when the last collector stops.
awaitClose {
pollingJob.cancel()
// Clear the value when the flow is closed (no active subscribers).
trySend(null)
}
}
.distinctUntilChanged()
.stateIn(
scope = service.serviceScope,
// Keep the polling running for 5 seconds after the last collector disappears
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000),
initialValue = null,
)
/**
* If we think we are connected, but we don't hear anything from the device, we might be in a zombie state. This
* function forces a read of a characteristic to see if we are really connected.
*/
override fun keepAlive() {
if (reconnectJob == null) {
// We are not currently trying to reconnect, so lets see if we are really connected
Timber.d("Bluetooth keep-alive, checking connection by reading fromNum")
// This will force a reconnect if the read fails
service.serviceScope.handledLaunch {
if (safe != null) { // if we are closing this will be null
doReadFromRadio(false)
}
}
}
}
/**
@ -240,7 +266,7 @@ constructor(
/** We had some problem, schedule a reconnection attempt (if one isn't already queued) */
private fun scheduleReconnect(reason: String) {
stopRssiPolling()
// stopRssiPolling() is no longer needed, as flow management handles polling lifecycle
if (reconnectJob == null) {
Timber.w("Scheduling reconnect because $reason")
reconnectJob = service.serviceScope.handledLaunch { retryDueToException() }
@ -431,7 +457,11 @@ constructor(
service.serviceScope.handledLaunch {
Timber.i("Connected to radio!")
startRssiPolling()
// The RSSI flow is now managed by its subscription count (WhileSubscribed)
// After connecting, request a high connection priority for better stability
val success = safe?.gatt?.requestConnectionPriority(BluetoothGatt.CONNECTION_PRIORITY_HIGH)
Timber.d("Requested high connection priority: $success")
if (
needForceRefresh
@ -472,7 +502,7 @@ constructor(
override fun close() {
reconnectJob?.cancel() // Cancel any queued reconnect attempts
stopRssiPolling()
// stopRssiPolling() is no longer needed, as flow management handles polling lifecycle
if (safe != null) {
Timber.i("Closing BluetoothInterface")

View file

@ -21,5 +21,10 @@ import java.io.Closeable
interface IRadioInterface : Closeable {
fun handleSendToRadio(p: ByteArray)
}
/**
* If we think we are connected, but we don't hear anything from the device, we might be in a zombie state. This
* function can be implemented by interfaces to see if we are really connected.
*/
fun keepAlive() {}
}

View file

@ -133,17 +133,22 @@ constructor(
}
companion object {
private const val HEARTBEAT_INTERVAL_MILLIS = 5 * 60 * 1000L
private const val HEARTBEAT_INTERVAL_MILLIS = 30 * 1000L
}
private var lastHeartbeatMillis = 0L
fun keepAlive(now: Long = System.currentTimeMillis()) {
if (now - lastHeartbeatMillis > HEARTBEAT_INTERVAL_MILLIS) {
Timber.i("Sending ToRadio heartbeat")
val heartbeat =
MeshProtos.ToRadio.newBuilder().setHeartbeat(MeshProtos.Heartbeat.getDefaultInstance()).build()
handleSendToRadio(heartbeat.toByteArray())
if (radioIf is SerialInterface) {
Timber.i("Sending ToRadio heartbeat")
val heartbeat =
MeshProtos.ToRadio.newBuilder().setHeartbeat(MeshProtos.Heartbeat.getDefaultInstance()).build()
handleSendToRadio(heartbeat.toByteArray())
} else {
// For BLE and TCP this will check if the connection is still alive
radioIf.keepAlive()
}
lastHeartbeatMillis = now
}
}