diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/BluetoothInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/BluetoothInterface.kt index 1ed6fb51c..9721e5661 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/radio/BluetoothInterface.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/radio/BluetoothInterface.kt @@ -19,6 +19,7 @@ package com.geeksville.mesh.repository.radio import android.annotation.SuppressLint import android.app.Application +import android.bluetooth.BluetoothGatt import android.bluetooth.BluetoothGattCharacteristic import android.bluetooth.BluetoothGattService import com.geeksville.mesh.concurrent.handledLaunch @@ -34,9 +35,13 @@ import dagger.assisted.Assisted import dagger.assisted.AssistedInject import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.stateIn import org.meshtastic.core.analytics.platform.PlatformAnalytics import org.meshtastic.core.model.util.anonymize import timber.log.Timber @@ -140,40 +145,61 @@ constructor( private lateinit var fromNum: BluetoothGattCharacteristic - // RSSI flow & polling job (null when unavailable / disconnected) - private val _rssiFlow = MutableStateFlow(null) - val rssiFlow: StateFlow = _rssiFlow + /** + * RSSI flow, which polls the remote device for RSSI only when there are active subscribers. The polling stops + * automatically when the last collector stops. + */ + val rssiFlow: StateFlow = + callbackFlow { + // Initial read for faster UI update + safe?.asyncReadRemoteRssi { first -> first.getOrNull()?.let { trySend(it) } } - @Volatile private var rssiPollingJob: Job? = null - - // Start polling RSSI every 5 seconds (immediate first read) - @Suppress("MagicNumber", "LoopWithTooManyJumpStatements") - private fun startRssiPolling() { - rssiPollingJob?.cancel() - val s = safe ?: return - // Immediate read for faster UI update - s.asyncReadRemoteRssi { first -> first.getOrNull()?.let { _rssiFlow.value = it } } - rssiPollingJob = - service.serviceScope.handledLaunch { - while (true) { - try { - delay(5000) - if (safe == null) break - safe?.asyncReadRemoteRssi { res -> res.getOrNull()?.let { _rssiFlow.value = it } } - } catch (ex: CancellationException) { - break - } catch (ex: Exception) { - Timber.d("RSSI polling error: ${ex.message}") + // Launch the polling loop on the service scope + @Suppress("LoopWithTooManyJumpStatements", "MagicNumber") + val pollingJob = + service.serviceScope.handledLaunch { + while (true) { + try { + delay(2500) // Poll every 5 seconds + safe?.asyncReadRemoteRssi { res -> res.getOrNull()?.let { trySend(it) } } + } catch (ex: CancellationException) { + break // Stop polling on cancellation + } catch (ex: Exception) { + Timber.d("RSSI polling error: ${ex.message}") + } } } - } - } - // Stop polling and clear current value - private fun stopRssiPolling() { - rssiPollingJob?.cancel() - rssiPollingJob = null - _rssiFlow.value = null + // This block executes when the last collector stops. + awaitClose { + pollingJob.cancel() + // Clear the value when the flow is closed (no active subscribers). + trySend(null) + } + } + .distinctUntilChanged() + .stateIn( + scope = service.serviceScope, + // Keep the polling running for 5 seconds after the last collector disappears + started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5000), + initialValue = null, + ) + + /** + * If we think we are connected, but we don't hear anything from the device, we might be in a zombie state. This + * function forces a read of a characteristic to see if we are really connected. + */ + override fun keepAlive() { + if (reconnectJob == null) { + // We are not currently trying to reconnect, so lets see if we are really connected + Timber.d("Bluetooth keep-alive, checking connection by reading fromNum") + // This will force a reconnect if the read fails + service.serviceScope.handledLaunch { + if (safe != null) { // if we are closing this will be null + doReadFromRadio(false) + } + } + } } /** @@ -240,7 +266,7 @@ constructor( /** We had some problem, schedule a reconnection attempt (if one isn't already queued) */ private fun scheduleReconnect(reason: String) { - stopRssiPolling() + // stopRssiPolling() is no longer needed, as flow management handles polling lifecycle if (reconnectJob == null) { Timber.w("Scheduling reconnect because $reason") reconnectJob = service.serviceScope.handledLaunch { retryDueToException() } @@ -431,7 +457,11 @@ constructor( service.serviceScope.handledLaunch { Timber.i("Connected to radio!") - startRssiPolling() + // The RSSI flow is now managed by its subscription count (WhileSubscribed) + + // After connecting, request a high connection priority for better stability + val success = safe?.gatt?.requestConnectionPriority(BluetoothGatt.CONNECTION_PRIORITY_HIGH) + Timber.d("Requested high connection priority: $success") if ( needForceRefresh @@ -472,7 +502,7 @@ constructor( override fun close() { reconnectJob?.cancel() // Cancel any queued reconnect attempts - stopRssiPolling() + // stopRssiPolling() is no longer needed, as flow management handles polling lifecycle if (safe != null) { Timber.i("Closing BluetoothInterface") diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/IRadioInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/IRadioInterface.kt index f0f08f6da..7690bebea 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/radio/IRadioInterface.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/radio/IRadioInterface.kt @@ -21,5 +21,10 @@ import java.io.Closeable interface IRadioInterface : Closeable { fun handleSendToRadio(p: ByteArray) -} + /** + * If we think we are connected, but we don't hear anything from the device, we might be in a zombie state. This + * function can be implemented by interfaces to see if we are really connected. + */ + fun keepAlive() {} +} diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt index 2199f896d..46a5697b9 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt @@ -133,17 +133,22 @@ constructor( } companion object { - private const val HEARTBEAT_INTERVAL_MILLIS = 5 * 60 * 1000L + private const val HEARTBEAT_INTERVAL_MILLIS = 30 * 1000L } private var lastHeartbeatMillis = 0L fun keepAlive(now: Long = System.currentTimeMillis()) { if (now - lastHeartbeatMillis > HEARTBEAT_INTERVAL_MILLIS) { - Timber.i("Sending ToRadio heartbeat") - val heartbeat = - MeshProtos.ToRadio.newBuilder().setHeartbeat(MeshProtos.Heartbeat.getDefaultInstance()).build() - handleSendToRadio(heartbeat.toByteArray()) + if (radioIf is SerialInterface) { + Timber.i("Sending ToRadio heartbeat") + val heartbeat = + MeshProtos.ToRadio.newBuilder().setHeartbeat(MeshProtos.Heartbeat.getDefaultInstance()).build() + handleSendToRadio(heartbeat.toByteArray()) + } else { + // For BLE and TCP this will check if the connection is still alive + radioIf.keepAlive() + } lastHeartbeatMillis = now } }