From a3c0a4832d139b9ed15cc2fdd22d9d2c0bef1db4 Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Sat, 11 Apr 2026 17:56:29 -0500 Subject: [PATCH] fix(transport): Kable BLE audit + thread-safety, MQTT, and logging fixes across transport layers (#5071) --- .../org/meshtastic/app/di/NetworkModule.kt | 6 +- core/ble/build.gradle.kts | 5 +- .../core/ble/AndroidBluetoothRepository.kt | 19 +- .../meshtastic/core/ble/KablePlatformSetup.kt | 22 +- .../core/ble/ActiveBleConnection.kt | 11 +- .../org/meshtastic/core/ble/BleConnection.kt | 16 +- .../meshtastic/core/ble/BleConnectionState.kt | 49 +++- .../core/ble/BleExceptionClassifier.kt | 55 ++++ .../meshtastic/core/ble/DirectBleDevice.kt | 50 ---- .../meshtastic/core/ble/KableBleConnection.kt | 144 ++++++----- .../core/ble/KableBleConnectionFactory.kt | 6 + .../meshtastic/core/ble/KableBleScanner.kt | 15 +- .../core/ble/KableMeshtasticRadioProfile.kt | 113 ++++----- .../meshtastic/core/ble/KableStateMapping.kt | 39 ++- .../meshtastic/core/ble/KermitLogEngine.kt | 51 ++++ .../core/ble/MeshtasticBleConstants.kt | 2 - ...bleBleDevice.kt => MeshtasticBleDevice.kt} | 38 ++- .../core/ble/MeshtasticRadioProfile.kt | 18 ++ .../core/ble/BleExceptionClassifierTest.kt | 67 +++++ .../core/ble/DisconnectReasonTest.kt | 51 ++++ .../ble/KableMeshtasticRadioProfileTest.kt | 129 ++++++++++ .../core/ble/KableStateMappingTest.kt | 143 +++++++++++ .../data/manager/MeshConfigFlowManagerImpl.kt | 4 +- core/network/build.gradle.kts | 1 + .../core/network/radio/InterfaceFactory.kt | 16 +- .../core/network/radio/SerialInterface.kt | 11 +- .../core/network/radio/SerialInterfaceSpec.kt | 13 +- ...rfaceFactorySpi.kt => KermitHttpLogger.kt} | 28 ++- .../core/network/radio/BleRadioInterface.kt | 238 +++++++++--------- .../core/network/radio/StreamInterface.kt | 8 +- .../network/repository/MQTTRepositoryImpl.kt | 36 ++- .../network/radio/BleRadioInterfaceTest.kt | 19 +- .../network/radio/ReconnectBackoffTest.kt | 17 +- .../core/network/radio/TCPInterface.kt | 7 +- .../core/network/transport/TcpTransport.kt | 32 ++- .../org/meshtastic/core/testing/FakeBle.kt | 10 +- desktop/build.gradle.kts | 1 + .../desktop/di/DesktopKoinModule.kt | 15 +- .../feature/firmware/ota/BleOtaTransport.kt | 39 +-- .../feature/firmware/ota/BleScanSupport.kt | 6 +- .../firmware/ota/dfu/SecureDfuTransport.kt | 45 ++-- .../ota/dfu/SecureDfuTransportTest.kt | 4 +- .../wifiprovision/NymeaBleConstants.kt | 10 +- .../wifiprovision/domain/NymeaWifiService.kt | 27 +- 44 files changed, 1123 insertions(+), 513 deletions(-) create mode 100644 core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleExceptionClassifier.kt delete mode 100644 core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/DirectBleDevice.kt create mode 100644 core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KermitLogEngine.kt rename core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/{KableBleDevice.kt => MeshtasticBleDevice.kt} (53%) create mode 100644 core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/BleExceptionClassifierTest.kt create mode 100644 core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/DisconnectReasonTest.kt create mode 100644 core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfileTest.kt create mode 100644 core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableStateMappingTest.kt rename core/network/src/commonMain/kotlin/org/meshtastic/core/network/{radio/InterfaceFactorySpi.kt => KermitHttpLogger.kt} (50%) diff --git a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt index 7f6fb0215..4aa27bf0e 100644 --- a/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt +++ b/app/src/main/kotlin/org/meshtastic/app/di/NetworkModule.kt @@ -40,6 +40,7 @@ import okio.Path.Companion.toOkioPath import org.koin.core.annotation.Module import org.koin.core.annotation.Single import org.meshtastic.core.common.BuildConfigProvider +import org.meshtastic.core.network.KermitHttpLogger private const val DISK_CACHE_PERCENT = 0.02 private const val MEMORY_CACHE_PERCENT = 0.25 @@ -84,7 +85,10 @@ class NetworkModule { HttpClient(engineFactory = Android) { install(plugin = ContentNegotiation) { json(json) } if (buildConfigProvider.isDebug) { - install(plugin = Logging) { level = LogLevel.BODY } + install(plugin = Logging) { + logger = KermitHttpLogger + level = LogLevel.BODY + } } } } diff --git a/core/ble/build.gradle.kts b/core/ble/build.gradle.kts index b61fad0e7..d26431634 100644 --- a/core/ble/build.gradle.kts +++ b/core/ble/build.gradle.kts @@ -46,7 +46,10 @@ kotlin { implementation(libs.jetbrains.lifecycle.runtime) } - commonTest.dependencies { implementation(libs.kotlinx.coroutines.test) } + commonTest.dependencies { + implementation(libs.kotlinx.coroutines.test) + implementation(projects.core.testing) + } val androidHostTest by getting { dependencies { diff --git a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/AndroidBluetoothRepository.kt b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/AndroidBluetoothRepository.kt index c8d444688..5b17e264b 100644 --- a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/AndroidBluetoothRepository.kt +++ b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/AndroidBluetoothRepository.kt @@ -49,7 +49,7 @@ class AndroidBluetoothRepository( private val _state = MutableStateFlow(BluetoothState(hasPermissions = hasBluetoothPermissions())) override val state: StateFlow = _state.asStateFlow() - private val deviceCache = mutableMapOf() + private val deviceCache = mutableMapOf() init { processLifecycle.coroutineScope.launch(dispatchers.default) { updateBluetoothState() } @@ -180,14 +180,15 @@ class AndroidBluetoothRepository( // user renamed the device in firmware since the cache was populated. deviceCache.keys.retainAll(bondedAddresses) return bonded.map { device -> - deviceCache - .getOrPut(device.address) { DirectBleDevice(device.address, device.name) } - .also { cached -> - // Refresh name if it changed (firmware rename, etc.) - if (cached.name != device.name) { - deviceCache[device.address] = DirectBleDevice(device.address, device.name) - } - } + val cached = deviceCache.getOrPut(device.address) { MeshtasticBleDevice(device.address, device.name) } + // If the name changed (firmware rename, etc.), replace the cached entry and return the new one. + if (cached.name != device.name) { + val updated = MeshtasticBleDevice(device.address, device.name) + deviceCache[device.address] = updated + updated + } else { + cached + } } } diff --git a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt index e9928f8d5..b0617635a 100644 --- a/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt +++ b/core/ble/src/androidMain/kotlin/org/meshtastic/core/ble/KablePlatformSetup.kt @@ -20,15 +20,29 @@ import co.touchlab.kermit.Logger import com.juul.kable.AndroidPeripheral import com.juul.kable.Peripheral import com.juul.kable.PeripheralBuilder +import com.juul.kable.PooledThreadingStrategy import com.juul.kable.toIdentifier +/** + * Shared thread pool for Kable BLE connections. + * + * [PooledThreadingStrategy] reuses handler threads across reconnect cycles, avoiding the overhead of creating a new + * thread per connection attempt that [OnDemandThreadingStrategy][com.juul.kable.OnDemandThreadingStrategy] incurs. Idle + * threads are evicted after 1 minute (default). + * + * A single app-wide instance is used because Kable recommends exactly one pool per application. + */ +private val sharedThreadingStrategy = PooledThreadingStrategy() + internal actual fun PeripheralBuilder.platformConfig(device: BleDevice, autoConnect: () -> Boolean) { - // If we're connecting blindly to a bonded device without a fresh scan (DirectBleDevice), - // we MUST use autoConnect = true. Otherwise, Android's direct connect algorithm will often fail - // immediately with GATT 133 or timeout, especially if the device uses random resolvable addresses. - // If we just scanned the device (KableBleDevice), direct connection (autoConnect = false) is faster. + // Bonded devices without a fresh advertisement must use autoConnect = true. Otherwise, + // Android's direct connect algorithm often fails with GATT 133 or times out, especially + // if the device uses random resolvable addresses. Scanned devices (advertisement != null) + // use direct connection (autoConnect = false) for faster initial connects. autoConnectIf(autoConnect) + threadingStrategy = sharedThreadingStrategy + onServicesDiscovered { try { // Android defaults to 23 bytes MTU. Meshtastic packets can be 512 bytes. diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/ActiveBleConnection.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/ActiveBleConnection.kt index 1bfaff648..1ea11622d 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/ActiveBleConnection.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/ActiveBleConnection.kt @@ -19,14 +19,17 @@ package org.meshtastic.core.ble import com.juul.kable.Peripheral import kotlin.concurrent.Volatile +/** Snapshot of the currently active BLE peripheral and its address, updated atomically. */ +internal data class ActiveConnection(val peripheral: Peripheral, val address: String) + /** * A simple global tracker for the currently active BLE connection. This resolves instance mismatch issues between * dynamically created UI devices (scanned vs bonded) and the actual connection. * - * Fields are volatile to ensure visibility across AIDL binder threads and coroutine dispatchers. + * [active] is a single volatile reference so readers always see a consistent peripheral/address pair — the previous + * two-field design (`activePeripheral` + `activeAddress`) was susceptible to TOCTOU races when fields were updated + * non-atomically. */ internal object ActiveBleConnection { - @Volatile var activePeripheral: Peripheral? = null - - @Volatile var activeAddress: String? = null + @Volatile var active: ActiveConnection? = null } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt index 06496aeea..59cf134de 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnection.kt @@ -19,6 +19,7 @@ package org.meshtastic.core.ble import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.onStart import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds import kotlin.uuid.Uuid @@ -49,8 +50,8 @@ interface BleConnection { /** Connects to the given [BleDevice]. */ suspend fun connect(device: BleDevice) - /** Connects to the given [BleDevice] and waits for a terminal state. */ - suspend fun connectAndAwait(device: BleDevice, timeoutMs: Long): BleConnectionState + /** Connects to the given [BleDevice] and waits for a terminal state or [timeout]. */ + suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState /** Disconnects from the current device. */ suspend fun disconnect() @@ -77,6 +78,17 @@ interface BleService { /** Observes notifications/indications from the characteristic. */ fun observe(characteristic: BleCharacteristic): Flow + /** + * Observes notifications/indications from the characteristic with an [onSubscription] action that fires **after** + * notifications are enabled (CCCD written). + * + * The [onSubscription] is re-invoked on every reconnect while the returned [Flow] is active. The default + * implementation invokes [onSubscription] eagerly on flow start so non-Kable implementations still signal + * readiness. + */ + fun observe(characteristic: BleCharacteristic, onSubscription: suspend () -> Unit): Flow = + observe(characteristic).onStart { onSubscription() } + /** Reads the characteristic value once. */ suspend fun read(characteristic: BleCharacteristic): ByteArray diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnectionState.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnectionState.kt index a9f82c5f9..2026b0cb1 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnectionState.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleConnectionState.kt @@ -17,16 +17,53 @@ package org.meshtastic.core.ble /** Represents the state of a BLE connection. */ -sealed class BleConnectionState { - /** The peripheral is disconnected. */ - object Disconnected : BleConnectionState() +sealed interface BleConnectionState { + + /** + * The peripheral is disconnected. + * + * @param reason why the disconnect occurred. [DisconnectReason.Unknown] when the platform doesn't provide status + * information (e.g. JavaScript) or when the disconnect was synthesised locally without a GATT callback. + */ + data class Disconnected(val reason: DisconnectReason = DisconnectReason.Unknown) : BleConnectionState /** The peripheral is connecting. */ - object Connecting : BleConnectionState() + data object Connecting : BleConnectionState /** The peripheral is connected. */ - object Connected : BleConnectionState() + data object Connected : BleConnectionState /** The peripheral is disconnecting. */ - object Disconnecting : BleConnectionState() + data object Disconnecting : BleConnectionState +} + +/** + * Platform-agnostic reason for a BLE disconnect. + * + * Mapped from Kable's [com.juul.kable.State.Disconnected.Status] in `KableStateMapping`. + */ +sealed interface DisconnectReason { + /** Cause is unknown or the platform did not report one. */ + data object Unknown : DisconnectReason + + /** The local app/central initiated the disconnect. */ + data object LocalDisconnect : DisconnectReason + + /** The remote peripheral (firmware) initiated the disconnect. */ + data object RemoteDisconnect : DisconnectReason + + /** A connection attempt failed to establish. */ + data object ConnectionFailed : DisconnectReason + + /** The BLE link supervision timed out (device went out of range). */ + data object Timeout : DisconnectReason + + /** The connection was explicitly cancelled. */ + data object Cancelled : DisconnectReason + + /** An encryption or authentication failure occurred. */ + data object EncryptionFailed : DisconnectReason + + /** Platform-specific status code that doesn't map to a known reason. */ + data class PlatformSpecific(val code: Int) : DisconnectReason } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleExceptionClassifier.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleExceptionClassifier.kt new file mode 100644 index 000000000..6f5180b60 --- /dev/null +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/BleExceptionClassifier.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +@file:Suppress("MatchingDeclarationName") // File groups the classifier function and its result type. + +package org.meshtastic.core.ble + +import com.juul.kable.GattRequestRejectedException +import com.juul.kable.GattStatusException +import com.juul.kable.NotConnectedException +import com.juul.kable.UnmetRequirementException + +/** + * Classification of a BLE-layer exception for the transport layer to act on. + * + * @property isPermanent `true` if the condition won't resolve without user intervention (e.g. Bluetooth disabled). + * @property gattStatus the platform GATT status code when available (Android-specific). + * @property message a human-readable description of the failure. + */ +data class BleExceptionInfo(val isPermanent: Boolean, val gattStatus: Int? = null, val message: String) + +/** + * Inspects this [Throwable] and returns a [BleExceptionInfo] if it is a known Kable exception, or `null` if it is + * unrelated to the BLE layer. + * + * This keeps Kable type knowledge inside `core:ble` so that `core:network` (and other consumers) can classify BLE + * exceptions without depending on Kable directly. + */ +fun Throwable.classifyBleException(): BleExceptionInfo? = when (this) { + is GattStatusException -> + BleExceptionInfo( + isPermanent = false, + gattStatus = status, + message = "GATT error (status $status): $message", + ) + is NotConnectedException -> BleExceptionInfo(isPermanent = false, message = "Not connected") + is GattRequestRejectedException -> + BleExceptionInfo(isPermanent = false, message = "GATT request rejected (busy)") + is UnmetRequirementException -> + BleExceptionInfo(isPermanent = true, message = message ?: "Bluetooth LE unavailable") + else -> null +} diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/DirectBleDevice.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/DirectBleDevice.kt deleted file mode 100644 index 9e32e4602..000000000 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/DirectBleDevice.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (c) 2026 Meshtastic LLC - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.meshtastic.core.ble - -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow - -/** Represents a BLE device known by address only (e.g. from bonded list) without an active advertisement. */ -class DirectBleDevice(override val address: String, override val name: String? = null) : BleDevice { - private val _state = MutableStateFlow(BleConnectionState.Disconnected) - override val state: StateFlow = _state.asStateFlow() - - override val isBonded: Boolean = true - - override val isConnected: Boolean - get() = _state.value is BleConnectionState.Connected || ActiveBleConnection.activeAddress == address - - @OptIn(com.juul.kable.ExperimentalApi::class) - override suspend fun readRssi(): Int { - val peripheral = ActiveBleConnection.activePeripheral - return if (peripheral != null && ActiveBleConnection.activeAddress == address) { - peripheral.rssi() - } else { - 0 - } - } - - override suspend fun bond() { - // DirectBleDevice assumes we are already bonded. - } - - fun updateState(newState: BleConnectionState) { - _state.value = newState - } -} diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt index 5265127c1..dde1955a5 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnection.kt @@ -18,9 +18,11 @@ package org.meshtastic.core.ble import co.touchlab.kermit.Logger import com.juul.kable.Peripheral +import com.juul.kable.PeripheralBuilder import com.juul.kable.State import com.juul.kable.WriteType import com.juul.kable.characteristicOf +import com.juul.kable.logs.Logging import com.juul.kable.writeWithoutResponse import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope @@ -30,7 +32,6 @@ import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.launchIn @@ -39,6 +40,7 @@ import kotlinx.coroutines.job import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds import kotlin.uuid.Uuid /** [BleService] implementation backed by a Kable [Peripheral] for a specific GATT service. */ @@ -50,6 +52,9 @@ class KableBleService(private val peripheral: Peripheral, private val serviceUui override fun observe(characteristic: BleCharacteristic) = peripheral.observe(characteristicOf(serviceUuid, characteristic.uuid)) + override fun observe(characteristic: BleCharacteristic, onSubscription: suspend () -> Unit) = + peripheral.observe(characteristicOf(serviceUuid, characteristic.uuid), onSubscription) + override suspend fun read(characteristic: BleCharacteristic): ByteArray = peripheral.read(characteristicOf(serviceUuid, characteristic.uuid)) @@ -78,8 +83,11 @@ class KableBleService(private val peripheral: Peripheral, private val serviceUui /** * [BleConnection] implementation using Kable for cross-platform BLE communication. * - * Manages peripheral lifecycle (connect with exponential backoff, disconnect, reconnect), connection state tracking, - * and GATT service profile access. + * Manages peripheral lifecycle, connection state tracking, and GATT service profile access. + * + * Connection attempts follow Kable's recommended pattern from the SensorTag sample: try a direct connect first, then + * fall back to `autoConnect = true` on failure. Only two attempts are made per [connect] call — the caller + * ([BleRadioInterface]) owns the macro-level retry/backoff loop. */ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { @@ -88,10 +96,8 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { private var connectionScope: CoroutineScope? = null companion object { - private const val INITIAL_RETRY_DELAY_MS = 1000L - private const val MAX_RETRY_DELAY_MS = 30_000L - private const val MAX_CONNECT_RETRIES = 15 - private const val BACKOFF_MULTIPLIER = 2 + /** Settle delay between a direct connect failure and the autoConnect fallback attempt. */ + private val AUTOCONNECT_FALLBACK_DELAY = 1.seconds } private val _deviceFlow = MutableSharedFlow(replay = 1) @@ -108,47 +114,32 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { ) override val connectionState: SharedFlow = _connectionState.asSharedFlow() - @Suppress("LongMethod", "CyclomaticComplexMethod") + @Suppress("CyclomaticComplexMethod", "LongMethod") override suspend fun connect(device: BleDevice) { - val autoConnect = MutableStateFlow(device is DirectBleDevice) + val meshtasticDevice = device as? MeshtasticBleDevice ?: error("Unsupported BleDevice type: ${device::class}") + var autoConnect = meshtasticDevice.advertisement == null + + /** Applies logging, observation exception handling, and platform config shared by both peripheral types. */ + fun PeripheralBuilder.commonConfig() { + logging { + engine = KermitLogEngine + level = Logging.Level.Events + identifier = device.address + } + observationExceptionHandler { cause -> + Logger.w(cause) { "[${device.address}] Observation failure suppressed" } + } + platformConfig(device) { autoConnect } + } val p = - when (device) { - is KableBleDevice -> - Peripheral(device.advertisement) { - observationExceptionHandler { cause -> - Logger.w(cause) { "[${device.address}] Observation failure suppressed" } - } - platformConfig(device) { autoConnect.value } - } - is DirectBleDevice -> - createPeripheral(device.address) { - observationExceptionHandler { cause -> - Logger.w(cause) { "[${device.address}] Observation failure suppressed" } - } - platformConfig(device) { autoConnect.value } - } - else -> error("Unsupported BleDevice type: ${device::class}") - } + meshtasticDevice.advertisement?.let { adv -> Peripheral(adv) { commonConfig() } } + ?: createPeripheral(device.address) { commonConfig() } - // Clean up previous peripheral under NonCancellable to prevent GATT resource leaks - // if the calling coroutine is cancelled during teardown. - withContext(NonCancellable) { - try { - peripheral?.disconnect() - } catch (@Suppress("TooGenericExceptionCaught") e: Exception) { - Logger.w(e) { "[${device.address}] Failed to disconnect previous peripheral" } - } - try { - peripheral?.close() - } catch (@Suppress("TooGenericExceptionCaught") e: Exception) { - Logger.w(e) { "[${device.address}] Failed to close previous peripheral" } - } - } + cleanUpPeripheral(device.address) peripheral = p - ActiveBleConnection.activePeripheral = p - ActiveBleConnection.activeAddress = device.address + ActiveBleConnection.active = ActiveConnection(p, device.address) _deviceFlow.emit(device) @@ -162,21 +153,15 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { hasStartedConnecting = true } - when (device) { - is KableBleDevice -> device.updateState(mappedState) - is DirectBleDevice -> device.updateState(mappedState) - } + meshtasticDevice.updateState(mappedState) _connectionState.emit(mappedState) } .launchIn(scope) - var retryCount = 0 - var retryDelayMs = INITIAL_RETRY_DELAY_MS while (p.state.value !is State.Connected) { - autoConnect.value = + autoConnect = try { - // Cancel any previous connectionScope to avoid leaking the old coroutine scope. connectionScope?.let { oldScope -> Logger.d { "[${device.address}] Cancelling previous connectionScope before reconnect" } oldScope.coroutineContext.job.cancel() @@ -185,52 +170,50 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { false } catch (e: CancellationException) { throw e - } catch (@Suppress("TooGenericExceptionCaught", "SwallowedException") _: Exception) { - retryCount++ - if (retryCount > MAX_CONNECT_RETRIES) { - Logger.w { "[${device.address}] Max connect retries ($MAX_CONNECT_RETRIES) exceeded" } - _connectionState.emit(BleConnectionState.Disconnected) - return + } catch (@Suppress("TooGenericExceptionCaught", "SwallowedException") e: Exception) { + if (autoConnect) { + // autoConnect already true and still failed — don't loop forever. + Logger.w { "[${device.address}] autoConnect attempt failed, giving up" } + _connectionState.emit(BleConnectionState.Disconnected(DisconnectReason.ConnectionFailed)) + throw e } - Logger.d { "[${device.address}] Connect retry $retryCount, backoff ${retryDelayMs}ms" } - delay(retryDelayMs) - retryDelayMs = (retryDelayMs * BACKOFF_MULTIPLIER).coerceAtMost(MAX_RETRY_DELAY_MS) + Logger.d { "[${device.address}] Direct connect failed, falling back to autoConnect" } + delay(AUTOCONNECT_FALLBACK_DELAY) true } } } @Suppress("TooGenericExceptionCaught", "SwallowedException") - override suspend fun connectAndAwait(device: BleDevice, timeoutMs: Long): BleConnectionState = try { - withTimeout(timeoutMs) { + override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState = try { + withTimeout(timeout) { connect(device) BleConnectionState.Connected } } catch (_: TimeoutCancellationException) { // Our own timeout expired — treat as a failed attempt so callers can retry. - BleConnectionState.Disconnected + BleConnectionState.Disconnected(DisconnectReason.Timeout) } catch (e: CancellationException) { // External cancellation (scope closed) — must propagate. throw e } catch (_: Exception) { - BleConnectionState.Disconnected + BleConnectionState.Disconnected(DisconnectReason.ConnectionFailed) } override suspend fun disconnect() = withContext(NonCancellable) { // Emit Disconnected before cancelling stateJob so downstream collectors see the // state transition. If we cancel stateJob first, the peripheral's state flow // emission of Disconnected is never forwarded to _connectionState. - _connectionState.emit(BleConnectionState.Disconnected) + _connectionState.emit(BleConnectionState.Disconnected(DisconnectReason.LocalDisconnect)) stateJob?.cancel() stateJob = null - peripheral?.disconnect() - peripheral?.close() + + safeClosePeripheral("disconnect") peripheral = null connectionScope = null - ActiveBleConnection.activePeripheral = null - ActiveBleConnection.activeAddress = null + ActiveBleConnection.active = null _deviceFlow.emit(null) } @@ -247,4 +230,29 @@ class KableBleConnection(private val scope: CoroutineScope) : BleConnection { } override fun maximumWriteValueLength(writeType: BleWriteType): Int? = peripheral?.negotiatedMaxWriteLength() + + /** Ensures the previous peripheral's GATT resources are fully released. */ + private suspend fun cleanUpPeripheral(tag: String) { + withContext(NonCancellable) { safeClosePeripheral(tag) } + } + + /** + * Safely disconnects and closes the current [peripheral], logging any failures. + * + * Kable requires `close()` to release broadcast receivers on Android (Kable issue #359). Separate try/catch blocks + * ensure `close()` always runs even if `disconnect()` throws. + */ + @Suppress("TooGenericExceptionCaught") + private suspend fun safeClosePeripheral(tag: String) { + try { + peripheral?.disconnect() + } catch (e: Exception) { + Logger.w(e) { "[$tag] Failed to disconnect peripheral" } + } + try { + peripheral?.close() + } catch (e: Exception) { + Logger.w(e) { "[$tag] Failed to close peripheral" } + } + } } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt index d0f3a7168..13b8a1663 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleConnectionFactory.kt @@ -21,5 +21,11 @@ import org.koin.core.annotation.Single @Single class KableBleConnectionFactory : BleConnectionFactory { + /** + * Creates a new [KableBleConnection]. + * + * [tag] is unused because Kable's own log identifier is set per-peripheral inside [KableBleConnection.connect] + * using the device address, which provides more precise context than a factory-time tag. + */ override fun create(scope: CoroutineScope, tag: String): BleConnection = KableBleConnection(scope) } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt index d9e27704f..5e91b3459 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleScanner.kt @@ -17,6 +17,7 @@ package org.meshtastic.core.ble import com.juul.kable.Scanner +import com.juul.kable.logs.Logging import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.withTimeoutOrNull @@ -28,6 +29,10 @@ import kotlin.uuid.Uuid class KableBleScanner : BleScanner { override fun scan(timeout: Duration, serviceUuid: Uuid?, address: String?): Flow { val scanner = Scanner { + logging { + engine = KermitLogEngine + level = Logging.Level.Events + } // Use separate match blocks so each filter is evaluated independently (OR semantics). // Combining address and service UUID in a single match{} creates an AND filter which // silently drops results on OEM stacks (Samsung, Xiaomi) when the device uses a @@ -43,7 +48,15 @@ class KableBleScanner : BleScanner { // By wrapping it in a channelFlow with a timeout, we enforce the BleScanner contract cleanly. return channelFlow { withTimeoutOrNull(timeout) { - scanner.advertisements.collect { advertisement -> send(KableBleDevice(advertisement)) } + scanner.advertisements.collect { advertisement -> + send( + MeshtasticBleDevice( + address = advertisement.identifier.toString(), + name = advertisement.name, + advertisement = advertisement, + ), + ) + } } } } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt index 46ace854f..3f0e61864 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfile.kt @@ -18,110 +18,101 @@ package org.meshtastic.core.ble import co.touchlab.kermit.Logger import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.launch import org.meshtastic.core.ble.MeshtasticBleConstants.FROMNUM_CHARACTERISTIC -import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIOSYNC_CHARACTERISTIC import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC import org.meshtastic.core.ble.MeshtasticBleConstants.LOGRADIO_CHARACTERISTIC import org.meshtastic.core.ble.MeshtasticBleConstants.TORADIO_CHARACTERISTIC +import kotlin.time.Duration.Companion.milliseconds /** * [MeshtasticRadioProfile] implementation using Kable BLE characteristics. * - * Supports both the modern `FROMRADIOSYNC` characteristic (single observe stream) and the legacy `FROMNUM` + - * `FROMRADIO` polling fallback for older firmware versions. + * Uses the standard Meshtastic BLE protocol: FROMNUM notifications trigger polling reads on the FROMRADIO + * characteristic. The firmware gates FROMNUM notifications behind `STATE_SEND_PACKETS`, so during the config handshake + * we seed the drain trigger to poll proactively. */ class KableMeshtasticRadioProfile(private val service: BleService) : MeshtasticRadioProfile { private val toRadio = service.characteristic(TORADIO_CHARACTERISTIC) private val fromRadioChar = service.characteristic(FROMRADIO_CHARACTERISTIC) - private val fromRadioSync = service.characteristic(FROMRADIOSYNC_CHARACTERISTIC) private val fromNum = service.characteristic(FROMNUM_CHARACTERISTIC) private val logRadioChar = service.characteristic(LOGRADIO_CHARACTERISTIC) companion object { - private const val TRANSIENT_RETRY_DELAY_MS = 500L + private val TRANSIENT_RETRY_DELAY = 500.milliseconds } - // replay = 1: a seed emission placed here before the collector starts is replayed to the - // collector immediately on subscription. This is what drives the initial FROMRADIO poll - // during the config-handshake phase, where the firmware suppresses FROMNUM notifications - // (it only emits them in STATE_SEND_PACKETS). Without the initial replay the entire config - // stream would be silently skipped on devices that lack FROMRADIOSYNC. + private val subscriptionReady = CompletableDeferred() + + /** Seed with replay=1 so the config-handshake drain starts before FROMNUM notifications are gated in. */ private val triggerDrain = MutableSharedFlow(replay = 1, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST) - // Using observe() for fromRadioSync or legacy read loop for fromRadio @Suppress("TooGenericExceptionCaught", "SwallowedException") override val fromRadio: Flow = channelFlow { - // Try to observe FROMRADIOSYNC if available. If it fails, fallback to FROMNUM/FROMRADIO. - // This mirrors the robust fallback logic originally established in the legacy Android Nordic implementation. launch { - try { - if (service.hasCharacteristic(fromRadioSync)) { - service.observe(fromRadioSync).collect { send(it) } - } else { - error("fromRadioSync missing") - } - } catch (e: CancellationException) { - throw e - } catch (_: Exception) { - // Fallback to legacy FROMNUM/FROMRADIO polling. - // Wire up FROMNUM notifications for steady-state packet delivery. - launch { - if (service.hasCharacteristic(fromNum)) { - service.observe(fromNum).collect { triggerDrain.tryEmit(Unit) } + if (service.hasCharacteristic(fromNum)) { + service + .observe(fromNum) { + Logger.d { "FROMNUM CCCD written — notifications enabled" } + subscriptionReady.complete(Unit) } - } - // Seed the replay buffer so the collector below starts draining immediately. - // The firmware does NOT send FROMNUM notifications during the config handshake - // (it gates them on STATE_SEND_PACKETS). Without this seed the entire config - // stream would never be read on devices that lack FROMRADIOSYNC. - triggerDrain.tryEmit(Unit) - triggerDrain.collect { - var keepReading = true - while (keepReading) { - try { - if (!service.hasCharacteristic(fromRadioChar)) { - keepReading = false - continue - } - val packet = service.read(fromRadioChar) - if (packet.isEmpty()) keepReading = false else send(packet) - } catch (e: CancellationException) { - throw e - } catch (e: Exception) { - Logger.w(e) { "FROMRADIO read error, pausing before next drain trigger" } - keepReading = false - // Don't permanently stop — the next triggerDrain emission will retry. - delay(TRANSIENT_RETRY_DELAY_MS) - } + .collect { triggerDrain.tryEmit(Unit) } + } else { + subscriptionReady.complete(Unit) + } + } + triggerDrain.tryEmit(Unit) + triggerDrain.collect { + var keepReading = true + while (keepReading) { + try { + if (!service.hasCharacteristic(fromRadioChar)) { + keepReading = false + continue } + val packet = service.read(fromRadioChar) + if (packet.isEmpty()) keepReading = false else send(packet) + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + Logger.w(e) { "FROMRADIO read error, pausing before next drain trigger" } + keepReading = false + delay(TRANSIENT_RETRY_DELAY) } } } } - @Suppress("TooGenericExceptionCaught", "SwallowedException") - override val logRadio: Flow = channelFlow { - try { - if (service.hasCharacteristic(logRadioChar)) { - service.observe(logRadioChar).collect { send(it) } + override val logRadio: Flow = + if (service.hasCharacteristic(logRadioChar)) { + service.observe(logRadioChar).catch { e -> + if (e is CancellationException) throw e + // logRadio is optional — swallow observation errors silently. } - } catch (e: CancellationException) { - throw e - } catch (_: Exception) { - // logRadio is optional, ignore if not found + } else { + emptyFlow() } - } override suspend fun sendToRadio(packet: ByteArray) { service.write(toRadio, packet, service.preferredWriteType(toRadio)) triggerDrain.tryEmit(Unit) } + + override fun requestDrain() { + triggerDrain.tryEmit(Unit) + } + + override suspend fun awaitSubscriptionReady() { + subscriptionReady.await() + } } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableStateMapping.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableStateMapping.kt index 7a03a3d89..4bd395dc5 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableStateMapping.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableStateMapping.kt @@ -25,14 +25,33 @@ import com.juul.kable.State * state emitted by StateFlow upon subscription. * @return the mapped [BleConnectionState], or null if the state should be ignored. */ -fun State.toBleConnectionState(hasStartedConnecting: Boolean): BleConnectionState? { - return when (this) { - is State.Connecting -> BleConnectionState.Connecting - is State.Connected -> BleConnectionState.Connected - is State.Disconnecting -> BleConnectionState.Disconnecting - is State.Disconnected -> { - if (!hasStartedConnecting) return null - BleConnectionState.Disconnected - } - } +fun State.toBleConnectionState(hasStartedConnecting: Boolean): BleConnectionState? = when (this) { + is State.Connecting -> BleConnectionState.Connecting + is State.Connected -> BleConnectionState.Connected + is State.Disconnecting -> BleConnectionState.Disconnecting + is State.Disconnected -> + if (hasStartedConnecting) BleConnectionState.Disconnected(status.toDisconnectReason()) else null +} + +/** + * Maps Kable's [State.Disconnected.Status] to [DisconnectReason]. + * + * Groups platform-specific GATT/CBError codes into broad categories that the reconnect logic can act on without leaking + * platform details. + */ +fun State.Disconnected.Status?.toDisconnectReason(): DisconnectReason = when (this) { + null -> DisconnectReason.Unknown + State.Disconnected.Status.CentralDisconnected -> DisconnectReason.LocalDisconnect + State.Disconnected.Status.PeripheralDisconnected -> DisconnectReason.RemoteDisconnect + State.Disconnected.Status.Failed, + State.Disconnected.Status.L2CapFailure, + -> DisconnectReason.ConnectionFailed + State.Disconnected.Status.Timeout, + State.Disconnected.Status.LinkManagerProtocolTimeout, + -> DisconnectReason.Timeout + State.Disconnected.Status.Cancelled -> DisconnectReason.Cancelled + State.Disconnected.Status.EncryptionTimedOut -> DisconnectReason.EncryptionFailed + State.Disconnected.Status.ConnectionLimitReached -> DisconnectReason.ConnectionFailed + State.Disconnected.Status.UnknownDevice -> DisconnectReason.ConnectionFailed + is State.Disconnected.Status.Unknown -> DisconnectReason.PlatformSpecific(status) } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KermitLogEngine.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KermitLogEngine.kt new file mode 100644 index 000000000..6884dc9e1 --- /dev/null +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KermitLogEngine.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.ble + +import co.touchlab.kermit.Logger +import com.juul.kable.logs.LogEngine + +/** + * Bridges Kable's internal logging to [Kermit][Logger] so BLE lifecycle events (connect, disconnect, subscribe, GATT + * operations) appear in the standard app logs rather than going to [System.out] via Kable's default + * [com.juul.kable.logs.SystemLogEngine]. + */ +internal object KermitLogEngine : LogEngine { + override fun verbose(throwable: Throwable?, tag: String, message: String) { + Logger.v(throwable) { "[$tag] $message" } + } + + override fun debug(throwable: Throwable?, tag: String, message: String) { + Logger.d(throwable) { "[$tag] $message" } + } + + override fun info(throwable: Throwable?, tag: String, message: String) { + Logger.i(throwable) { "[$tag] $message" } + } + + override fun warn(throwable: Throwable?, tag: String, message: String) { + Logger.w(throwable) { "[$tag] $message" } + } + + override fun error(throwable: Throwable?, tag: String, message: String) { + Logger.e(throwable) { "[$tag] $message" } + } + + override fun assert(throwable: Throwable?, tag: String, message: String) { + Logger.e(throwable) { "[$tag] $message" } + } +} diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleConstants.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleConstants.kt index 389516521..f69214187 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleConstants.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleConstants.kt @@ -38,8 +38,6 @@ object MeshtasticBleConstants { /** Characteristic for receiving log notifications from the radio. */ val LOGRADIO_CHARACTERISTIC: Uuid = Uuid.parse("5a3d6e49-06e6-4423-9944-e9de8cdf9547") - val FROMRADIOSYNC_CHARACTERISTIC: Uuid = Uuid.parse("888a50c3-982d-45db-9963-c7923769165d") - // --- OTA Characteristics --- /** The Meshtastic OTA service UUID (ESP32 Unified OTA). */ diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleDevice.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleDevice.kt similarity index 53% rename from core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleDevice.kt rename to core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleDevice.kt index 455779937..eb2ee2129 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/KableBleDevice.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticBleDevice.kt @@ -19,30 +19,41 @@ package org.meshtastic.core.ble import com.juul.kable.Advertisement import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow -class KableBleDevice(val advertisement: Advertisement) : BleDevice { - override val name: String? - get() = advertisement.name +/** + * Unified [BleDevice] implementation for all BLE devices — scanned, bonded, or both. + * + * When created from a live BLE scan, [advertisement] is populated and used for optimal peripheral construction via + * `Peripheral(advertisement)`. When created from the OS bonded device list (address only), [advertisement] is `null` + * and the peripheral is constructed via `createPeripheral(address)` with `autoConnect = true`. + * + * @param address The device's MAC address (or platform identifier string). + * @param name The device's display name, if known. + * @param advertisement The Kable [Advertisement] from a live scan, or `null` for bonded-only devices. + */ +class MeshtasticBleDevice( + override val address: String, + override val name: String? = null, + val advertisement: Advertisement? = null, +) : BleDevice { - override val address: String - get() = advertisement.identifier.toString() - - private val _state = MutableStateFlow(BleConnectionState.Disconnected) - override val state: StateFlow = _state + private val _state = MutableStateFlow(BleConnectionState.Disconnected()) + override val state: StateFlow = _state.asStateFlow() // Bonding is handled by the OS pairing dialog on Android; on desktop Kable connects directly. override val isBonded: Boolean = true override val isConnected: Boolean - get() = _state.value is BleConnectionState.Connected || ActiveBleConnection.activeAddress == address + get() = _state.value is BleConnectionState.Connected || ActiveBleConnection.active?.address == address @OptIn(com.juul.kable.ExperimentalApi::class) override suspend fun readRssi(): Int { - val peripheral = ActiveBleConnection.activePeripheral - return if (peripheral != null && ActiveBleConnection.activeAddress == address) { - peripheral.rssi() + val active = ActiveBleConnection.active + return if (active != null && active.address == address) { + active.peripheral.rssi() } else { - advertisement.rssi + advertisement?.rssi ?: 0 } } @@ -50,6 +61,7 @@ class KableBleDevice(val advertisement: Advertisement) : BleDevice { // No-op: bonding is OS-managed on Android and not required on desktop. } + /** Updates the tracked connection state. Called by [KableBleConnection] when the peripheral state changes. */ internal fun updateState(newState: BleConnectionState) { _state.value = newState } diff --git a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticRadioProfile.kt b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticRadioProfile.kt index d1a557a42..7a69e9524 100644 --- a/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticRadioProfile.kt +++ b/core/ble/src/commonMain/kotlin/org/meshtastic/core/ble/MeshtasticRadioProfile.kt @@ -28,4 +28,22 @@ interface MeshtasticRadioProfile { /** Sends a packet to the radio. */ suspend fun sendToRadio(packet: ByteArray) + + /** + * Requests a drain of the FROMRADIO characteristic without writing to TORADIO. + * + * This is useful when the firmware has queued a response (e.g. `queueStatus` after a heartbeat) but did not send a + * FROMNUM notification. Without an explicit drain trigger the response would sit unread until the next unrelated + * FROMNUM notification arrives. + */ + fun requestDrain() {} + + /** + * Suspends until GATT notifications are enabled (CCCD written) for the primary observation characteristic. + * + * Callers should await this before triggering the Meshtastic handshake (`want_config_id`) to guarantee that FROMNUM + * notifications will be delivered. The default implementation returns immediately for profiles where CCCD readiness + * is not observable (e.g. fakes and non-BLE transports). + */ + suspend fun awaitSubscriptionReady() {} } diff --git a/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/BleExceptionClassifierTest.kt b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/BleExceptionClassifierTest.kt new file mode 100644 index 000000000..1170b973b --- /dev/null +++ b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/BleExceptionClassifierTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.ble + +import com.juul.kable.GattStatusException +import com.juul.kable.NotConnectedException +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +/** + * Tests for [classifyBleException] — the boundary between Kable types and the transport layer. + * + * [GattRequestRejectedException] and [UnmetRequirementException] have `internal` constructors in Kable, so they cannot + * be instantiated from outside the library. The `else -> null` branch covers the fallback for any unrecognised + * throwable. + */ +class BleExceptionClassifierTest { + + @Test + fun `GattStatusException maps to non-permanent with status code`() { + val ex = GattStatusException(message = "GATT failure", status = 133) + val info = ex.classifyBleException() + assertNotNull(info) + assertFalse(info.isPermanent) + assertEquals(133, info.gattStatus) + assertTrue(info.message.contains("133")) + } + + @Test + fun `NotConnectedException maps to non-permanent without status code`() { + val ex = NotConnectedException("disconnected") + val info = ex.classifyBleException() + assertNotNull(info) + assertFalse(info.isPermanent) + assertNull(info.gattStatus) + assertEquals("Not connected", info.message) + } + + @Test + fun `unrelated exception returns null`() { + val ex = IllegalStateException("something else") + assertNull(ex.classifyBleException()) + } + + @Test + fun `RuntimeException returns null`() { + assertNull(RuntimeException("boom").classifyBleException()) + } +} diff --git a/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/DisconnectReasonTest.kt b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/DisconnectReasonTest.kt new file mode 100644 index 000000000..d947dd04d --- /dev/null +++ b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/DisconnectReasonTest.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.ble + +import kotlin.test.Test +import kotlin.test.assertContains +import kotlin.test.assertEquals + +/** Tests for [DisconnectReason] and [BleConnectionState.Disconnected]. */ +class DisconnectReasonTest { + + @Test + @Suppress("MagicNumber") + fun `PlatformSpecific toString includes status code`() { + val reason = DisconnectReason.PlatformSpecific(133) + val str = reason.toString() + assertContains(str, "133", message = "PlatformSpecific.toString() should include the status code") + } + + @Test + fun `Disconnected default reason is Unknown`() { + val state = BleConnectionState.Disconnected() + assertEquals(DisconnectReason.Unknown, state.reason) + } + + @Test + fun `Disconnected preserves explicit reason`() { + val state = BleConnectionState.Disconnected(DisconnectReason.Timeout) + assertEquals(DisconnectReason.Timeout, state.reason) + } + + @Test + fun `data object reasons are singletons`() { + assertEquals(DisconnectReason.Unknown, DisconnectReason.Unknown) + assertEquals(DisconnectReason.LocalDisconnect, DisconnectReason.LocalDisconnect) + } +} diff --git a/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfileTest.kt b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfileTest.kt new file mode 100644 index 000000000..8068c9387 --- /dev/null +++ b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableMeshtasticRadioProfileTest.kt @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.ble + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.meshtastic.core.testing.FakeBleService +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +/** + * Tests for [KableMeshtasticRadioProfile] — the GATT characteristic orchestration layer. + * + * Uses [FakeBleService] from `core:testing`. Since [FakeBleService] inherits the default [BleService.observe] overload + * (which invokes `onSubscription` via `onStart`), `awaitSubscriptionReady()` completes immediately — matching the + * behaviour expected from non-Kable implementations. + */ +@OptIn(ExperimentalCoroutinesApi::class) +class KableMeshtasticRadioProfileTest { + + private fun createService(): FakeBleService = FakeBleService().apply { + addCharacteristic(MeshtasticBleConstants.FROMNUM_CHARACTERISTIC) + addCharacteristic(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC) + addCharacteristic(MeshtasticBleConstants.TORADIO_CHARACTERISTIC) + } + + @Test + fun `awaitSubscriptionReady completes when using FakeBleService`() = runTest { + val service = createService() + val profile = KableMeshtasticRadioProfile(service) + + // Start collecting fromRadio to activate the observe() flow (which triggers onSubscription) + val collectJob = launch { profile.fromRadio.first() } + advanceUntilIdle() + + // Should not hang — FakeBleService's default observe(char, onSubscription) fires onSubscription eagerly + profile.awaitSubscriptionReady() + + collectJob.cancel() + } + + @Test + fun `sendToRadio writes to TORADIO and triggers drain`() = runTest { + val service = createService() + val profile = KableMeshtasticRadioProfile(service) + val testData = byteArrayOf(1, 2, 3) + + // Enqueue empty read so the drain loop terminates + service.enqueueRead(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC, ByteArray(0)) + + profile.sendToRadio(testData) + + assertEquals(1, service.writes.size) + assertTrue(service.writes[0].data.contentEquals(testData)) + } + + @Test + fun `fromRadio emits packets from FROMRADIO reads`() = runTest { + val service = createService() + val profile = KableMeshtasticRadioProfile(service) + + val packet1 = byteArrayOf(10, 20, 30) + service.enqueueRead(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC, packet1) + // Empty read terminates the drain loop + service.enqueueRead(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC, ByteArray(0)) + + val received = async { profile.fromRadio.first() } + advanceUntilIdle() + + assertTrue(received.await().contentEquals(packet1)) + } + + @Test + fun `requestDrain triggers additional FROMRADIO reads`() = runTest { + val service = createService() + val profile = KableMeshtasticRadioProfile(service) + + val received = mutableListOf() + + // Start the fromRadio collector + val collectJob = launch { profile.fromRadio.collect { received.add(it) } } + advanceUntilIdle() + + // First drain should have completed (initial seed) with nothing queued. + // Now enqueue a packet and trigger a manual drain. + val latePacket = byteArrayOf(99) + service.enqueueRead(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC, latePacket) + service.enqueueRead(MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC, ByteArray(0)) + profile.requestDrain() + advanceUntilIdle() + + assertEquals(1, received.size) + assertTrue(received[0].contentEquals(latePacket)) + + collectJob.cancel() + } + + @Test + fun `MeshtasticRadioProfile default awaitSubscriptionReady returns immediately`() = runTest { + val profile = + object : MeshtasticRadioProfile { + override val fromRadio = kotlinx.coroutines.flow.emptyFlow() + override val logRadio = kotlinx.coroutines.flow.emptyFlow() + + override suspend fun sendToRadio(packet: ByteArray) {} + } + // Should not hang — default implementation is a no-op + profile.awaitSubscriptionReady() + } +} diff --git a/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableStateMappingTest.kt b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableStateMappingTest.kt new file mode 100644 index 000000000..18c7be4da --- /dev/null +++ b/core/ble/src/commonTest/kotlin/org/meshtastic/core/ble/KableStateMappingTest.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.ble + +import com.juul.kable.State +import kotlinx.coroutines.test.TestScope +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNull + +/** Tests for [toBleConnectionState] and [toDisconnectReason] mappings. */ +class KableStateMappingTest { + + // --- toBleConnectionState --- + + @Test + fun `Connecting maps to BleConnectionState Connecting`() { + val result = State.Connecting.Bluetooth.toBleConnectionState(hasStartedConnecting = false) + assertIs(result) + } + + @Test + fun `Connected maps to BleConnectionState Connected`() { + val scope = TestScope() + val result = State.Connected(scope).toBleConnectionState(hasStartedConnecting = true) + assertIs(result) + } + + @Test + fun `Disconnecting maps to BleConnectionState Disconnecting`() { + val result = State.Disconnecting.toBleConnectionState(hasStartedConnecting = true) + assertIs(result) + } + + @Test + fun `Disconnected before connecting started returns null`() { + val result = State.Disconnected(status = null).toBleConnectionState(hasStartedConnecting = false) + assertNull(result) + } + + @Test + fun `Disconnected after connecting started maps with reason`() { + val result = + State.Disconnected(State.Disconnected.Status.Timeout).toBleConnectionState(hasStartedConnecting = true) + assertIs(result) + assertEquals(DisconnectReason.Timeout, result.reason) + } + + // --- toDisconnectReason --- + + @Test + fun `null status maps to Unknown`() { + assertEquals(DisconnectReason.Unknown, null.toDisconnectReason()) + } + + @Test + fun `CentralDisconnected maps to LocalDisconnect`() { + assertEquals( + DisconnectReason.LocalDisconnect, + State.Disconnected.Status.CentralDisconnected.toDisconnectReason(), + ) + } + + @Test + fun `PeripheralDisconnected maps to RemoteDisconnect`() { + assertEquals( + DisconnectReason.RemoteDisconnect, + State.Disconnected.Status.PeripheralDisconnected.toDisconnectReason(), + ) + } + + @Test + fun `Failed maps to ConnectionFailed`() { + assertEquals(DisconnectReason.ConnectionFailed, State.Disconnected.Status.Failed.toDisconnectReason()) + } + + @Test + fun `Timeout maps to Timeout`() { + assertEquals(DisconnectReason.Timeout, State.Disconnected.Status.Timeout.toDisconnectReason()) + } + + @Test + fun `LinkManagerProtocolTimeout maps to Timeout`() { + assertEquals( + DisconnectReason.Timeout, + State.Disconnected.Status.LinkManagerProtocolTimeout.toDisconnectReason(), + ) + } + + @Test + fun `Cancelled maps to Cancelled`() { + assertEquals(DisconnectReason.Cancelled, State.Disconnected.Status.Cancelled.toDisconnectReason()) + } + + @Test + fun `EncryptionTimedOut maps to EncryptionFailed`() { + assertEquals( + DisconnectReason.EncryptionFailed, + State.Disconnected.Status.EncryptionTimedOut.toDisconnectReason(), + ) + } + + @Test + fun `L2CapFailure maps to ConnectionFailed`() { + assertEquals(DisconnectReason.ConnectionFailed, State.Disconnected.Status.L2CapFailure.toDisconnectReason()) + } + + @Test + fun `ConnectionLimitReached maps to ConnectionFailed`() { + assertEquals( + DisconnectReason.ConnectionFailed, + State.Disconnected.Status.ConnectionLimitReached.toDisconnectReason(), + ) + } + + @Test + fun `UnknownDevice maps to ConnectionFailed`() { + assertEquals(DisconnectReason.ConnectionFailed, State.Disconnected.Status.UnknownDevice.toDisconnectReason()) + } + + @Test + @Suppress("MagicNumber") + fun `Unknown status maps to PlatformSpecific with code`() { + val result = State.Disconnected.Status.Unknown(status = 42).toDisconnectReason() + assertIs(result) + assertEquals(42, result.code) + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt index f492dcd65..dc544a300 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshConfigFlowManagerImpl.kt @@ -84,7 +84,7 @@ class MeshConfigFlowManagerImpl( * [rawMyNodeInfo] arrives first (my_info packet); [metadata] may arrive shortly after. Both are consumed * together by [buildMyNodeInfo] at Stage 1 completion. */ - data class ReceivingConfig(val rawMyNodeInfo: ProtoMyNodeInfo, var metadata: DeviceMetadata? = null) : + data class ReceivingConfig(val rawMyNodeInfo: ProtoMyNodeInfo, val metadata: DeviceMetadata? = null) : HandshakeState() /** @@ -231,7 +231,7 @@ class MeshConfigFlowManagerImpl( Logger.i { "Local Metadata received: ${metadata.firmware_version}" } val state = handshakeState if (state is HandshakeState.ReceivingConfig) { - state.metadata = metadata + handshakeState = state.copy(metadata = metadata) // Persist the metadata immediately — buildMyNodeInfo() reads it at Stage 1 complete, // but the DB write does not need to wait until then. if (metadata != DeviceMetadata()) { diff --git a/core/network/build.gradle.kts b/core/network/build.gradle.kts index 1c0d14a01..c3dc2ffd5 100644 --- a/core/network/build.gradle.kts +++ b/core/network/build.gradle.kts @@ -45,6 +45,7 @@ kotlin { implementation(libs.kotlinx.serialization.json) implementation(libs.ktor.client.core) implementation(libs.ktor.client.content.negotiation) + implementation(libs.ktor.client.logging) implementation(libs.ktor.serialization.kotlinx.json) implementation(libs.kermit) implementation(libs.jetbrains.lifecycle.runtime) diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactory.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactory.kt index f33cedfae..b070ba013 100644 --- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactory.kt +++ b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactory.kt @@ -36,14 +36,14 @@ class InterfaceFactory( ) { internal val nopInterface by lazy { nopInterfaceFactory.create("") } - private val specMap: Map> - get() = - mapOf( - InterfaceId.MOCK to mockSpec.value, - InterfaceId.NOP to NopInterfaceSpec(nopInterfaceFactory), - InterfaceId.SERIAL to serialSpec.value, - InterfaceId.TCP to tcpSpec.value, - ) + private val specMap: Map> by lazy { + mapOf( + InterfaceId.MOCK to mockSpec.value, + InterfaceId.NOP to NopInterfaceSpec(nopInterfaceFactory), + InterfaceId.SERIAL to serialSpec.value, + InterfaceId.TCP to tcpSpec.value, + ) + } fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest" diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterface.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterface.kt index e57c4a446..6c843caee 100644 --- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterface.kt +++ b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterface.kt @@ -38,19 +38,14 @@ class SerialInterface( connect() } - override fun onDeviceDisconnect(waitForStopped: Boolean) { + override fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean) { connRef.get()?.close(waitForStopped) - super.onDeviceDisconnect(waitForStopped) + super.onDeviceDisconnect(waitForStopped, isPermanent) } override fun connect() { val deviceMap = usbRepository.serialDevices.value - val device = - if (deviceMap.containsKey(address)) { - deviceMap[address]!! - } else { - deviceMap.map { (_, driver) -> driver }.firstOrNull() - } + val device = deviceMap[address] ?: deviceMap.values.firstOrNull() if (device == null) { Logger.e { "[$address] Serial device not found at address" } } else { diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterfaceSpec.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterfaceSpec.kt index 8597fd060..f510be3bb 100644 --- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterfaceSpec.kt +++ b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/SerialInterfaceSpec.kt @@ -33,19 +33,12 @@ class SerialInterfaceSpec( factory.create(rest, service) override fun addressValid(rest: String): Boolean { - usbRepository.serialDevices.value.filterValues { usbManager.hasPermission(it.device) } - findSerial(rest)?.let { d -> - return usbManager.hasPermission(d.device) - } - return false + val driver = findSerial(rest) ?: return false + return usbManager.hasPermission(driver.device) } internal fun findSerial(rest: String): UsbSerialDriver? { val deviceMap = usbRepository.serialDevices.value - return if (deviceMap.containsKey(rest)) { - deviceMap[rest]!! - } else { - deviceMap.map { (_, driver) -> driver }.firstOrNull() - } + return deviceMap[rest] ?: deviceMap.values.firstOrNull() } } diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactorySpi.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/KermitHttpLogger.kt similarity index 50% rename from core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactorySpi.kt rename to core/network/src/commonMain/kotlin/org/meshtastic/core/network/KermitHttpLogger.kt index 5354f5500..cabeb977a 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/InterfaceFactorySpi.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/KermitHttpLogger.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025-2026 Meshtastic LLC + * Copyright (c) 2026 Meshtastic LLC * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -14,17 +14,27 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -package org.meshtastic.core.network.radio +package org.meshtastic.core.network -import org.meshtastic.core.repository.RadioTransport +import co.touchlab.kermit.Logger +import io.ktor.client.plugins.logging.Logger as KtorLogger /** - * Radio interface factory service provider interface. Each radio backend implementation needs to have a factory to - * create new instances. These instances are specific to a particular address. This interface defines a common API - * across all radio interfaces for obtaining implementation instances. + * Bridges Ktor's HTTP client logging to [Kermit][Logger] so HTTP request/response events appear in the standard app + * logs rather than going to [System.out] via Ktor's default [io.ktor.client.plugins.logging.Logger.DEFAULT]. * - * This is primarily used in conjunction with Dagger assisted injection for each backend interface type. + * Usage: + * ``` + * HttpClient(engine) { + * install(Logging) { + * logger = KermitHttpLogger + * level = LogLevel.HEADERS + * } + * } + * ``` */ -interface InterfaceFactorySpi { - fun create(rest: String): T +object KermitHttpLogger : KtorLogger { + override fun log(message: String) { + Logger.d { message } + } } diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioInterface.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioInterface.kt index 9942eec87..2eda52102 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioInterface.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/BleRadioInterface.kt @@ -45,7 +45,9 @@ import org.meshtastic.core.ble.BleDevice import org.meshtastic.core.ble.BleScanner import org.meshtastic.core.ble.BleWriteType import org.meshtastic.core.ble.BluetoothRepository +import org.meshtastic.core.ble.DisconnectReason import org.meshtastic.core.ble.MeshtasticBleConstants.SERVICE_UUID +import org.meshtastic.core.ble.classifyBleException import org.meshtastic.core.ble.retryBleOperation import org.meshtastic.core.ble.toMeshtasticRadioProfile import org.meshtastic.core.common.util.nowMillis @@ -57,18 +59,23 @@ import org.meshtastic.proto.ToRadio import kotlin.concurrent.Volatile import kotlin.concurrent.atomics.AtomicInt import kotlin.concurrent.atomics.ExperimentalAtomicApi +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds private const val SCAN_RETRY_COUNT = 3 -private const val SCAN_RETRY_DELAY_MS = 1000L -private const val CONNECTION_TIMEOUT_MS = 15_000L +private val SCAN_RETRY_DELAY = 1.seconds +private val CONNECTION_TIMEOUT = 15.seconds private const val RECONNECT_FAILURE_THRESHOLD = 3 -private const val RECONNECT_BASE_DELAY_MS = 5_000L -private const val RECONNECT_MAX_DELAY_MS = 60_000L +private val RECONNECT_BASE_DELAY = 5.seconds +private val RECONNECT_MAX_DELAY = 60.seconds private const val RECONNECT_MAX_FAILURES = 10 +/** Settle delay before each connection attempt to let the Android BLE stack finish any pending disconnect cleanup. */ +private val SETTLE_DELAY = 1.seconds + /** - * Minimum milliseconds a BLE connection must stay up before we consider it "stable" and reset + * Minimum time a BLE connection must stay up before we consider it "stable" and reset * [BleRadioInterface.consecutiveFailures]. Without this, a device at the edge of BLE range can repeatedly connect for a * fraction of a second and drop — each brief connection resets the failure counter so [RECONNECT_FAILURE_THRESHOLD] is * never reached, and the app never signals [ConnectionState.DeviceSleep]. @@ -76,24 +83,29 @@ private const val RECONNECT_MAX_FAILURES = 10 * The value (5 s) is long enough that only connections that survive past the initial GATT setup are treated as genuine, * but short enough that normal reconnects after light-sleep still reset the counter promptly. */ -private const val MIN_STABLE_CONNECTION_MS = 5_000L +private val MIN_STABLE_CONNECTION = 5.seconds /** - * Returns the reconnect backoff delay in milliseconds for a given consecutive failure count. + * Returns the reconnect backoff delay for a given consecutive failure count. * * Backoff schedule: 1 failure → 5 s 2 failures → 10 s 3 failures → 20 s 4 failures → 40 s 5+ failures → 60 s (capped) */ -internal fun computeReconnectBackoffMs(consecutiveFailures: Int): Long { - if (consecutiveFailures <= 0) return RECONNECT_BASE_DELAY_MS - return minOf(RECONNECT_BASE_DELAY_MS * (1L shl (consecutiveFailures - 1).coerceAtMost(4)), RECONNECT_MAX_DELAY_MS) +internal fun computeReconnectBackoff(consecutiveFailures: Int): Duration { + if (consecutiveFailures <= 0) return RECONNECT_BASE_DELAY + val multiplier = 1 shl (consecutiveFailures - 1).coerceAtMost(4) + return minOf(RECONNECT_BASE_DELAY * multiplier, RECONNECT_MAX_DELAY) } -// Milliseconds to wait after launching characteristic observations before triggering the -// Meshtastic handshake. Both fromRadio and logRadio observation flows write the CCCD -// asynchronously via Kable's GATT queue. Without this settle window the want_config_id -// burst from the radio can arrive before notifications are enabled, causing the first -// handshake attempt to look like a stall. -private const val CCCD_SETTLE_MS = 50L +/** + * Delay after writing a heartbeat before re-polling FROMRADIO. + * + * The ESP32 firmware processes TORADIO writes asynchronously (NimBLE callback → FreeRTOS main task queue → + * `handleToRadio()` → `heartbeatReceived = true`). The immediate drain trigger in + * [KableMeshtasticRadioProfile.sendToRadio] fires before this completes, so the `queueStatus` response is not yet + * available. 200 ms is well above observed ESP32 task scheduling latency (~10–50 ms) while remaining imperceptible to + * the user. + */ +private val HEARTBEAT_DRAIN_DELAY = 200.milliseconds private val SCAN_TIMEOUT = 5.seconds private val GATT_CLEANUP_TIMEOUT = 5.seconds @@ -120,7 +132,7 @@ class BleRadioInterface( private val bluetoothRepository: BluetoothRepository, private val connectionFactory: BleConnectionFactory, private val service: RadioInterfaceService, - val address: String, + internal val address: String, ) : RadioTransport { private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> @@ -143,11 +155,15 @@ class BleRadioInterface( private val bleConnection: BleConnection = connectionFactory.create(connectionScope, address) private val writeMutex: Mutex = Mutex() - private var connectionStartTime: Long = 0 - private var packetsReceived: Int = 0 - private var packetsSent: Int = 0 - private var bytesReceived: Long = 0 - private var bytesSent: Long = 0 + @Volatile private var connectionStartTime: Long = 0 + + @Volatile private var packetsReceived: Int = 0 + + @Volatile private var packetsSent: Int = 0 + + @Volatile private var bytesReceived: Long = 0 + + @Volatile private var bytesSent: Long = 0 @Volatile private var isFullyConnected = false private var connectionJob: Job? = null @@ -186,7 +202,7 @@ class BleRadioInterface( } if (attempt < SCAN_RETRY_COUNT - 1) { - delay(SCAN_RETRY_DELAY_MS) + delay(SCAN_RETRY_DELAY) } } @@ -199,23 +215,18 @@ class BleRadioInterface( connectionScope.launch { while (isActive) { try { - // Allow any pending background disconnects to complete and the Android BLE stack - // to settle before we attempt a new connection. - @Suppress("MagicNumber") - val connectDelayMs = 1000L - delay(connectDelayMs) + // Settle delay: let the Android BLE stack finish any pending + // disconnect cleanup before starting a new connection attempt. + delay(SETTLE_DELAY) connectionStartTime = nowMillis Logger.i { "[$address] BLE connection attempt started" } val device = findDevice() - // Ensure the device is bonded before connecting. On Android, the - // firmware may require an encrypted link (pairing mode != NO_PIN). - // Without an explicit bond the GATT connection will fail with - // insufficient-authentication (status 5) or the dreaded status 133. - // On Desktop/JVM this is a no-op since the OS handles pairing during - // the GATT connection when the peripheral requires it. + // Bond before connecting: firmware may require an encrypted link, + // and without a bond Android fails with status 5 or 133. + // No-op on Desktop/JVM where the OS handles pairing automatically. if (!bluetoothRepository.isBonded(address)) { Logger.i { "[$address] Device not bonded, initiating bonding" } @Suppress("TooGenericExceptionCaught") @@ -227,36 +238,26 @@ class BleRadioInterface( } } - var state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS) - - if (state !is BleConnectionState.Connected) { - // Kable on Android occasionally fails the first connection attempt with - // NotConnectedException if the previous peripheral wasn't fully cleaned - // up by the OS. A quick retry resolves it. - Logger.d { "[$address] First connection attempt failed, retrying in 1.5s" } - @Suppress("MagicNumber") - delay(1500L) - state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS) - } + val state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT) if (state !is BleConnectionState.Connected) { throw RadioNotConnectedException("Failed to connect to device at address $address") } - // Connection succeeded — only reset the failure counter if the - // connection stays up long enough. See MIN_STABLE_CONNECTION_MS. + // Only reset failures if connection was stable (see MIN_STABLE_CONNECTION). val gattConnectedAt = nowMillis isFullyConnected = true onConnected() - // Use coroutineScope so that the connectionState listener is scoped to this - // iteration only. When the inner scope exits (on disconnect), the listener is - // cancelled automatically before the next reconnect cycle starts a fresh one. + // Scope the connectionState listener to this iteration so it's + // cancelled automatically before the next reconnect cycle. + var disconnectReason: DisconnectReason = DisconnectReason.Unknown coroutineScope { bleConnection.connectionState .onEach { s -> if (s is BleConnectionState.Disconnected && isFullyConnected) { isFullyConnected = false + disconnectReason = s.reason onDisconnected() } } @@ -265,27 +266,30 @@ class BleRadioInterface( discoverServicesAndSetupCharacteristics() - // Suspend here until Kable drops the connection bleConnection.connectionState.first { it is BleConnectionState.Disconnected } } - Logger.i { "[$address] BLE connection dropped, preparing to reconnect" } + Logger.i { + "[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect" + } - // Only reset the failure counter if the connection was stable (lasted - // longer than MIN_STABLE_CONNECTION_MS). A connection that drops within - // seconds typically means the device is at the edge of BLE range or - // powered off — the Android BLE stack may briefly "connect" to a cached - // GATT profile before realising the device is gone. Without this guard, - // the failure counter resets on every brief connect, preventing us from - // ever reaching RECONNECT_FAILURE_THRESHOLD and signalling DeviceSleep. - val connectionUptime = nowMillis - gattConnectedAt - if (connectionUptime >= MIN_STABLE_CONNECTION_MS) { + // Skip failure counting for intentional disconnects. + if (disconnectReason is DisconnectReason.LocalDisconnect) { + consecutiveFailures = 0 + continue + } + + // A connection that drops almost immediately (< MIN_STABLE_CONNECTION) + // is treated as a failure — the BLE stack may have "connected" to a + // cached GATT profile before realising the device is gone. + val connectionUptime = (nowMillis - gattConnectedAt).milliseconds + if (connectionUptime >= MIN_STABLE_CONNECTION) { consecutiveFailures = 0 } else { consecutiveFailures++ Logger.w { - "[$address] Connection lasted only ${connectionUptime}ms " + - "(< ${MIN_STABLE_CONNECTION_MS}ms) — treating as failure " + + "[$address] Connection lasted only $connectionUptime " + + "(< $MIN_STABLE_CONNECTION) — treating as failure " + "(consecutive failures: $consecutiveFailures)" } if (consecutiveFailures >= RECONNECT_MAX_FAILURES) { @@ -307,16 +311,14 @@ class BleRadioInterface( Logger.d { "[$address] BLE connection coroutine cancelled" } throw e } catch (e: Exception) { - val failureTime = nowMillis - connectionStartTime + val failureTime = (nowMillis - connectionStartTime).milliseconds consecutiveFailures++ Logger.w(e) { - "[$address] Failed to connect to device after ${failureTime}ms " + + "[$address] Failed to connect to device after $failureTime " + "(consecutive failures: $consecutiveFailures)" } - // After exceeding the max failure limit, give up permanently to stop - // draining battery on a device that is genuinely offline. The user - // must manually reconnect from the connections screen. + // Give up permanently to stop draining battery. if (consecutiveFailures >= RECONNECT_MAX_FAILURES) { Logger.e { "[$address] Giving up after $consecutiveFailures consecutive failures" } val (_, msg) = e.toDisconnectReason() @@ -324,18 +326,14 @@ class BleRadioInterface( return@launch } - // At the failure threshold, signal DeviceSleep so - // MeshConnectionManagerImpl can start its sleep timeout. + // Signal DeviceSleep so MeshConnectionManagerImpl starts its sleep timeout. if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) { handleFailure(e) } - // Exponential backoff: 5s → 10s → 20s → 40s → capped at 60s. - // Reduces BLE stack pressure and battery drain when the device is genuinely - // out of range, while still recovering quickly from transient drops. - val backoffMs = computeReconnectBackoffMs(consecutiveFailures) - Logger.d { "[$address] Retrying in ${backoffMs}ms (failure #$consecutiveFailures)" } - delay(backoffMs) + val backoff = computeReconnectBackoff(consecutiveFailures) + Logger.d { "[$address] Retrying in $backoff (failure #$consecutiveFailures)" } + delay(backoff) } } } @@ -354,23 +352,8 @@ class BleRadioInterface( private fun onDisconnected() { radioService = null - - val uptime = - if (connectionStartTime > 0) { - nowMillis - connectionStartTime - } else { - 0 - } - Logger.i { - "[$address] BLE disconnected - " + - "Uptime: ${uptime}ms, " + - "Packets RX: $packetsReceived ($bytesReceived bytes), " + - "Packets TX: $packetsSent ($bytesSent bytes)" - } - // Signal DeviceSleep immediately so the UI reflects the disconnect while the - // reconnect loop continues in the background. The previous approach suppressed - // this signal until RECONNECT_FAILURE_THRESHOLD consecutive failures, leaving the - // UI stuck on "Connected" for 35+ seconds after the device disappeared. + Logger.i { "[$address] BLE disconnected - ${formatSessionStats()}" } + // Signal immediately so the UI reflects the disconnect while reconnect continues. service.onDisconnect(isPermanent = false) } @@ -379,7 +362,6 @@ class BleRadioInterface( bleConnection.profile(serviceUuid = SERVICE_UUID) { service -> val radioService = service.toMeshtasticRadioProfile() - // Wire up notifications radioService.fromRadio .onEach { packet -> Logger.v { "[$address] Received packet fromRadio (${packet.size} bytes)" } @@ -402,16 +384,12 @@ class BleRadioInterface( } .launchIn(this) - // Store reference for handleSendToRadio this@BleRadioInterface.radioService = radioService Logger.i { "[$address] Profile service active and characteristics subscribed" } - // Give Kable's async CCCD writes time to complete before triggering the - // Meshtastic handshake. The fromRadio/logRadio observation flows register - // notifications through the GATT queue asynchronously. Without this settle - // window, the want_config_id burst arrives before notifications are enabled. - delay(CCCD_SETTLE_MS) + // Wait for FROMNUM CCCD write before triggering the Meshtastic handshake. + radioService.awaitSubscriptionReady() // Log negotiated MTU for diagnostics val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE) @@ -421,10 +399,8 @@ class BleRadioInterface( } } catch (e: Exception) { Logger.w(e) { "[$address] Profile service discovery or operation failed" } - // Ensure the peripheral is disconnected so the outer reconnect loop sees a clean - // Disconnected state. Do NOT call handleFailure here — the reconnect loop tracks - // consecutive failures and calls handleFailure after RECONNECT_FAILURE_THRESHOLD, - // preventing premature onDisconnect signals to the service on transient errors. + // Disconnect to let the outer reconnect loop see a clean Disconnected state. + // Do NOT call handleFailure here — the reconnect loop owns failure counting. try { bleConnection.disconnect() } catch (ignored: Exception) { @@ -481,25 +457,25 @@ class BleRadioInterface( val nonce = heartbeatNonce.fetchAndAdd(1) Logger.v { "[$address] BLE keepAlive — sending ToRadio heartbeat (nonce=$nonce)" } handleSendToRadio(ToRadio(heartbeat = Heartbeat(nonce = nonce)).encode()) + + // The firmware responds to heartbeats by queuing a `queueStatus` FromRadio packet + // on the next getFromRadio() call, but it does NOT send a FROMNUM notification for + // it. The immediate drain trigger in sendToRadio() fires before the ESP32's async + // task queue has processed the heartbeat, so the response sits unread. Schedule a + // delayed re-drain to pick it up. + connectionScope.launch { + delay(HEARTBEAT_DRAIN_DELAY) + radioService?.requestDrain() + } } /** Closes the connection to the device. */ override fun close() { - val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0 - Logger.i { - "[$address] Disconnecting. " + - "Uptime: ${uptime}ms, " + - "Packets RX: $packetsReceived ($bytesReceived bytes), " + - "Packets TX: $packetsSent ($bytesSent bytes)" - } - // Cancel the connection scope to break the while(isActive) reconnect loop. + Logger.i { "[$address] Disconnecting. ${formatSessionStats()}" } connectionScope.cancel("close() called") - // GATT cleanup must survive serviceScope cancellation. SharedRadioInterfaceService calls - // close() and then immediately cancels serviceScope — a coroutine launched on serviceScope - // may never be dispatched, leaving the BluetoothGatt object leaked (causes GATT 133 on the - // next connect attempt). GlobalScope is the correct tool here: the cleanup is short-lived, - // fire-and-forget, and must outlive any application-managed scope. - // onDisconnect is handled by SharedRadioInterfaceService.stopInterfaceLocked() directly. + // GATT cleanup must outlive serviceScope cancellation — GlobalScope is intentional. + // SharedRadioInterfaceService cancels serviceScope immediately after close(), so a + // coroutine launched there may never run, leaking BluetoothGatt (causes GATT 133). @OptIn(DelicateCoroutinesApi::class) GlobalScope.launch { try { @@ -525,17 +501,27 @@ class BleRadioInterface( service.onDisconnect(isPermanent, errorMessage = msg) } + /** Formats a one-line session statistics summary for logging. */ + private fun formatSessionStats(): String { + val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0 + return "Uptime: ${uptime}ms, " + + "Packets RX: $packetsReceived ($bytesReceived bytes), " + + "Packets TX: $packetsSent ($bytesSent bytes)" + } + private fun Throwable.toDisconnectReason(): Pair { - val isPermanent = - this::class.simpleName == "BluetoothUnavailableException" || - this::class.simpleName == "ManagerClosedException" + classifyBleException()?.let { + return it.isPermanent to it.message + } + val msg = - when { - this is RadioNotConnectedException -> this.message ?: "Device not found" - this is NoSuchElementException || this is IllegalArgumentException -> "Required characteristic missing" - this::class.simpleName == "GattException" -> "GATT Error: ${this.message}" + when (this) { + is RadioNotConnectedException -> this.message ?: "Device not found" + is NoSuchElementException, + is IllegalArgumentException, + -> "Required characteristic missing" else -> this.message ?: this::class.simpleName ?: "Unknown" } - return Pair(isPermanent, msg) + return false to msg } } diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/StreamInterface.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/StreamInterface.kt index ea985c020..d72c9d0d5 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/StreamInterface.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/radio/StreamInterface.kt @@ -42,11 +42,11 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : R * * @param waitForStopped if true we should wait for the manager to finish - must be false if called from inside the * manager callbacks + * @param isPermanent true if the device is definitely gone (e.g. USB unplugged), false if it may come back (e.g. + * TCP transient disconnect). Defaults to true for serial — subclasses like [TCPInterface] override with false. */ - protected open fun onDeviceDisconnect(waitForStopped: Boolean) { - service.onDisconnect( - isPermanent = true, - ) // if USB device disconnects it is definitely permanently gone, not sleeping) + protected open fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean = true) { + service.onDisconnect(isPermanent = isPermanent) } protected open fun connect() { diff --git a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt index 41fb652ed..56d70d453 100644 --- a/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt +++ b/core/network/src/commonMain/kotlin/org/meshtastic/core/network/repository/MQTTRepositoryImpl.kt @@ -44,6 +44,7 @@ import org.meshtastic.core.model.util.subscribeList import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.RadioConfigRepository import org.meshtastic.proto.MqttClientProxyMessage +import kotlin.concurrent.Volatile @Single(binds = [MQTTRepository::class]) class MQTTRepositoryImpl( @@ -62,7 +63,7 @@ class MQTTRepositoryImpl( private const val RECONNECT_BACKOFF_MULTIPLIER = 2 } - private var client: MQTTClient? = null + @Volatile private var client: MQTTClient? = null @OptIn(ExperimentalSerializationApi::class) private val json = Json { @@ -70,7 +71,8 @@ class MQTTRepositoryImpl( exceptionsWithDebugInfo = false } private val scope = CoroutineScope(dispatchers.default + SupervisorJob()) - private var clientJob: Job? = null + + @Volatile private var clientJob: Job? = null private val publishSemaphore = Semaphore(20) @Suppress("TooGenericExceptionCaught") @@ -149,12 +151,10 @@ class MQTTRepositoryImpl( while (true) { try { Logger.i { "MQTT Starting client loop for $host:$port" } - // Reset backoff on each successful connection establishment. If the broker - // disconnects cleanly after hours of operation, the next reconnect should - // start with the minimum delay rather than whatever was accumulated. - reconnectDelay = INITIAL_RECONNECT_DELAY_MS newClient.runSuspend() - // runSuspend returned normally — broker closed connection. Retry. + // runSuspend returned normally — broker closed connection cleanly. + // Reset backoff so the next reconnect starts with the minimum delay. + reconnectDelay = INITIAL_RECONNECT_DELAY_MS Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" } } catch (e: io.github.davidepianca98.mqtt.MQTTException) { Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" } @@ -199,15 +199,25 @@ class MQTTRepositoryImpl( @OptIn(ExperimentalUnsignedTypes::class) override fun publish(topic: String, data: ByteArray, retained: Boolean) { + val currentClient = client + if (currentClient == null) { + Logger.w { "MQTT publish to $topic dropped: client not connected" } + return + } Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" } scope.launch { publishSemaphore.withPermit { - client?.publish( - retain = retained, - qos = Qos.AT_LEAST_ONCE, - topic = topic, - payload = data.toUByteArray(), - ) + @Suppress("TooGenericExceptionCaught") + try { + currentClient.publish( + retain = retained, + qos = Qos.AT_LEAST_ONCE, + topic = topic, + payload = data.toUByteArray(), + ) + } catch (e: Exception) { + Logger.w(e) { "MQTT publish to $topic failed" } + } } } } diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioInterfaceTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioInterfaceTest.kt index d4fd0dcc1..d4a41ba95 100644 --- a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioInterfaceTest.kt +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/BleRadioInterfaceTest.kt @@ -36,6 +36,7 @@ import org.meshtastic.core.testing.FakeBluetoothRepository import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds @OptIn(ExperimentalCoroutinesApi::class) class BleRadioInterfaceTest { @@ -164,14 +165,14 @@ class BleRadioInterfaceTest { } @Test - fun `computeReconnectBackoffMs returns correct backoff values`() { - assertEquals(5_000L, computeReconnectBackoffMs(0)) - assertEquals(5_000L, computeReconnectBackoffMs(1)) - assertEquals(10_000L, computeReconnectBackoffMs(2)) - assertEquals(20_000L, computeReconnectBackoffMs(3)) - assertEquals(40_000L, computeReconnectBackoffMs(4)) - assertEquals(60_000L, computeReconnectBackoffMs(5)) - assertEquals(60_000L, computeReconnectBackoffMs(10)) - assertEquals(60_000L, computeReconnectBackoffMs(100)) + fun `computeReconnectBackoff returns correct backoff values`() { + assertEquals(5.seconds, computeReconnectBackoff(0)) + assertEquals(5.seconds, computeReconnectBackoff(1)) + assertEquals(10.seconds, computeReconnectBackoff(2)) + assertEquals(20.seconds, computeReconnectBackoff(3)) + assertEquals(40.seconds, computeReconnectBackoff(4)) + assertEquals(60.seconds, computeReconnectBackoff(5)) + assertEquals(60.seconds, computeReconnectBackoff(10)) + assertEquals(60.seconds, computeReconnectBackoff(100)) } } diff --git a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/ReconnectBackoffTest.kt b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/ReconnectBackoffTest.kt index 007b82b45..c4e64d36a 100644 --- a/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/ReconnectBackoffTest.kt +++ b/core/network/src/commonTest/kotlin/org/meshtastic/core/network/radio/ReconnectBackoffTest.kt @@ -19,6 +19,7 @@ package org.meshtastic.core.network.radio import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds /** * Tests the exponential backoff schedule used by [BleRadioInterface] when consecutive connection attempts fail. The @@ -28,42 +29,42 @@ class ReconnectBackoffTest { @Test fun `zero failures yields base delay`() { - assertEquals(5_000L, computeReconnectBackoffMs(0)) + assertEquals(5.seconds, computeReconnectBackoff(0)) } @Test fun `first failure yields 5s`() { - assertEquals(5_000L, computeReconnectBackoffMs(1)) + assertEquals(5.seconds, computeReconnectBackoff(1)) } @Test fun `second failure yields 10s`() { - assertEquals(10_000L, computeReconnectBackoffMs(2)) + assertEquals(10.seconds, computeReconnectBackoff(2)) } @Test fun `third failure yields 20s`() { - assertEquals(20_000L, computeReconnectBackoffMs(3)) + assertEquals(20.seconds, computeReconnectBackoff(3)) } @Test fun `fourth failure yields 40s`() { - assertEquals(40_000L, computeReconnectBackoffMs(4)) + assertEquals(40.seconds, computeReconnectBackoff(4)) } @Test fun `fifth failure is capped at 60s`() { - assertEquals(60_000L, computeReconnectBackoffMs(5)) + assertEquals(60.seconds, computeReconnectBackoff(5)) } @Test fun `large failure count stays capped at 60s`() { - assertEquals(60_000L, computeReconnectBackoffMs(100)) + assertEquals(60.seconds, computeReconnectBackoff(100)) } @Test fun `backoff is strictly increasing up to the cap`() { - val values = (1..5).map { computeReconnectBackoffMs(it) } + val values = (1..5).map { computeReconnectBackoff(it) } for (i in 0 until values.size - 1) { assertTrue( values[i] < values[i + 1], diff --git a/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt b/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt index adab96d4d..0ffb731cf 100644 --- a/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt +++ b/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt @@ -52,7 +52,8 @@ open class TCPInterface( override fun onDisconnected() { // Transport already performed teardown; only propagate lifecycle to StreamInterface. - super@TCPInterface.onDeviceDisconnect(false) + // TCP disconnects are transient (not permanent) — the transport will auto-reconnect. + super@TCPInterface.onDeviceDisconnect(false, isPermanent = false) } override fun onPacketReceived(bytes: ByteArray) { @@ -71,9 +72,9 @@ open class TCPInterface( Logger.d { "[$address] TCPInterface.sendBytes delegated to transport" } } - override fun onDeviceDisconnect(waitForStopped: Boolean) { + override fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean) { transport.stop() - super.onDeviceDisconnect(waitForStopped) + super.onDeviceDisconnect(waitForStopped, isPermanent = false) } override fun connect() { diff --git a/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/transport/TcpTransport.kt b/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/transport/TcpTransport.kt index dcc0a402f..264e42f89 100644 --- a/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/transport/TcpTransport.kt +++ b/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/transport/TcpTransport.kt @@ -65,6 +65,10 @@ class TcpTransport( } companion object { + /** + * Maximum reconnect retries. Set to [Int.MAX_VALUE] to retry indefinitely — the caller ([TcpTransport.stop]) + * owns the cancellation lifecycle. + */ const val MAX_RECONNECT_RETRIES = Int.MAX_VALUE const val MIN_BACKOFF_MILLIS = 1_000L const val MAX_BACKOFF_MILLIS = 5 * 60 * 1_000L @@ -84,18 +88,26 @@ class TcpTransport( ) // TCP socket state - private var socket: Socket? = null - private var outStream: OutputStream? = null - private var connectionJob: Job? = null - private var currentAddress: String? = null + @Volatile private var socket: Socket? = null + + @Volatile private var outStream: OutputStream? = null + + @Volatile private var connectionJob: Job? = null + + @Volatile private var currentAddress: String? = null // Metrics - private var connectionStartTime: Long = 0 - private var packetsReceived: Int = 0 - private var packetsSent: Int = 0 - private var bytesReceived: Long = 0 - private var bytesSent: Long = 0 - private var timeoutEvents: Int = 0 + @Volatile private var connectionStartTime: Long = 0 + + @Volatile private var packetsReceived: Int = 0 + + @Volatile private var packetsSent: Int = 0 + + @Volatile private var bytesReceived: Long = 0 + + @Volatile private var bytesSent: Long = 0 + + @Volatile private var timeoutEvents: Int = 0 /** Whether the transport is currently connected. */ val isConnected: Boolean diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt index 27dc3facc..e5280ec45 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeBle.kt @@ -42,7 +42,7 @@ import kotlin.uuid.Uuid class FakeBleDevice( override val address: String, override val name: String? = "Fake Device", - initialState: BleConnectionState = BleConnectionState.Disconnected, + initialState: BleConnectionState = BleConnectionState.Disconnected(), ) : BaseFake(), BleDevice { private val _state = mutableStateFlow(initialState) @@ -124,11 +124,11 @@ class FakeBleConnection : } } - override suspend fun connectAndAwait(device: BleDevice, timeoutMs: Long): BleConnectionState { + override suspend fun connectAndAwait(device: BleDevice, timeout: Duration): BleConnectionState { connectException?.let { throw it } if (failNextN > 0) { failNextN-- - return BleConnectionState.Disconnected + return BleConnectionState.Disconnected() } connect(device) return BleConnectionState.Connected @@ -137,9 +137,9 @@ class FakeBleConnection : override suspend fun disconnect() { disconnectCalls++ val currentDevice = _device.value - _connectionState.emit(BleConnectionState.Disconnected) + _connectionState.emit(BleConnectionState.Disconnected()) if (currentDevice is FakeBleDevice) { - currentDevice.setState(BleConnectionState.Disconnected) + currentDevice.setState(BleConnectionState.Disconnected()) } _device.value = null _deviceFlow.emit(null) diff --git a/desktop/build.gradle.kts b/desktop/build.gradle.kts index bcaab0590..14075fbda 100644 --- a/desktop/build.gradle.kts +++ b/desktop/build.gradle.kts @@ -285,6 +285,7 @@ dependencies { // Ktor HttpClient (Java engine for JVM/Desktop) implementation(libs.ktor.client.java) implementation(libs.ktor.client.content.negotiation) + implementation(libs.ktor.client.logging) implementation(libs.ktor.serialization.kotlinx.json) implementation(libs.androidx.paging.common) diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt index b93c16a75..978be6b26 100644 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt @@ -20,6 +20,8 @@ package org.meshtastic.desktop.di import io.ktor.client.HttpClient import io.ktor.client.engine.java.Java import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.plugins.logging.LogLevel +import io.ktor.client.plugins.logging.Logging import io.ktor.serialization.kotlinx.json.json import kotlinx.serialization.json.Json import org.koin.dsl.module @@ -30,6 +32,7 @@ import org.meshtastic.core.model.BootloaderOtaQuirk import org.meshtastic.core.model.NetworkDeviceHardware import org.meshtastic.core.model.NetworkFirmwareReleases import org.meshtastic.core.model.RadioController +import org.meshtastic.core.network.KermitHttpLogger import org.meshtastic.core.network.repository.MQTTRepository import org.meshtastic.core.repository.AppWidgetUpdater import org.meshtastic.core.repository.LocationRepository @@ -168,7 +171,17 @@ private fun desktopPlatformStubsModule() = module { } // Ktor HttpClient for JVM/Desktop (equivalent of CoreNetworkAndroidModule on Android) - single { HttpClient(Java) { install(ContentNegotiation) { json(get()) } } } + single { + HttpClient(Java) { + install(ContentNegotiation) { json(get()) } + if (org.meshtastic.desktop.DesktopBuildConfig.IS_DEBUG) { + install(Logging) { + logger = KermitHttpLogger + level = LogLevel.HEADERS + } + } + } + } // Desktop stubs for data sources that load from Android assets on mobile single { diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt index 9d2478f45..3bdb0f1d7 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt @@ -38,6 +38,9 @@ import org.meshtastic.core.ble.BleWriteType import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_NOTIFY_CHARACTERISTIC import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_SERVICE_UUID import org.meshtastic.core.ble.MeshtasticBleConstants.OTA_WRITE_CHARACTERISTIC +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds /** BLE transport implementation for ESP32 Unified OTA protocol using Kable. */ class BleOtaTransport( @@ -68,7 +71,7 @@ class BleOtaTransport( tag = "BLE OTA", serviceUuid = OTA_SERVICE_UUID, retryCount = SCAN_RETRY_COUNT, - retryDelayMs = SCAN_RETRY_DELAY_MS, + retryDelay = SCAN_RETRY_DELAY, ) { it.address in targetAddresses } @@ -76,8 +79,8 @@ class BleOtaTransport( @Suppress("MagicNumber") override suspend fun connect(): Result = runCatching { - Logger.i { "BLE OTA: Waiting ${REBOOT_DELAY_MS}ms for device to reboot into OTA mode..." } - delay(REBOOT_DELAY_MS) + Logger.i { "BLE OTA: Waiting $REBOOT_DELAY for device to reboot into OTA mode..." } + delay(REBOOT_DELAY) Logger.i { "BLE OTA: Connecting to $address using Kable..." } @@ -96,7 +99,7 @@ class BleOtaTransport( .launchIn(transportScope) try { - val finalState = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT_MS) + val finalState = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT) if (finalState is BleConnectionState.Disconnected) { Logger.w { "BLE OTA: Failed to connect to ${device.address} (state=$finalState)" } throw OtaProtocolException.ConnectionFailed("Failed to connect to device at address ${device.address}") @@ -137,7 +140,7 @@ class BleOtaTransport( .launchIn(this) // Allow time for the BLE subscription to be established before proceeding. - delay(SUBSCRIPTION_SETTLE_MS) + delay(SUBSCRIPTION_SETTLE) if (!subscribed.isCompleted) subscribed.complete(Unit) subscribed.await() @@ -156,7 +159,7 @@ class BleOtaTransport( var handshakeComplete = false var responsesReceived = 0 while (!handshakeComplete) { - val response = waitForResponse(ERASING_TIMEOUT_MS) + val response = waitForResponse(ERASING_TIMEOUT) responsesReceived++ when (val parsed = OtaResponse.parse(response)) { is OtaResponse.Ok -> { @@ -203,7 +206,7 @@ class BleOtaTransport( val nextSentBytes = sentBytes + currentChunkSize repeat(packetsSentForChunk) { i -> - val response = waitForResponse(ACK_TIMEOUT_MS) + val response = waitForResponse(ACK_TIMEOUT) val isLastPacketOfChunk = i == packetsSentForChunk - 1 when (val parsed = OtaResponse.parse(response)) { @@ -229,7 +232,7 @@ class BleOtaTransport( onProgress(sentBytes.toFloat() / totalBytes) } - val finalResponse = waitForResponse(VERIFICATION_TIMEOUT_MS) + val finalResponse = waitForResponse(VERIFICATION_TIMEOUT) when (val parsed = OtaResponse.parse(finalResponse)) { is OtaResponse.Ok -> Unit is OtaResponse.Error -> { @@ -274,21 +277,21 @@ class BleOtaTransport( return packetsSent } - private suspend fun waitForResponse(timeoutMs: Long): String = try { - withTimeout(timeoutMs) { responseChannel.receive() } + private suspend fun waitForResponse(timeout: Duration): String = try { + withTimeout(timeout) { responseChannel.receive() } } catch (@Suppress("SwallowedException") e: kotlinx.coroutines.TimeoutCancellationException) { - throw OtaProtocolException.Timeout("Timeout waiting for response after ${timeoutMs}ms") + throw OtaProtocolException.Timeout("Timeout waiting for response after $timeout") } companion object { - private const val CONNECTION_TIMEOUT_MS = 15_000L - private const val SUBSCRIPTION_SETTLE_MS = 500L - private const val ERASING_TIMEOUT_MS = 60_000L - private const val ACK_TIMEOUT_MS = 10_000L - private const val VERIFICATION_TIMEOUT_MS = 10_000L - private const val REBOOT_DELAY_MS = 5_000L + private val CONNECTION_TIMEOUT = 15.seconds + private val SUBSCRIPTION_SETTLE = 500.milliseconds + private val ERASING_TIMEOUT = 60.seconds + private val ACK_TIMEOUT = 10.seconds + private val VERIFICATION_TIMEOUT = 10.seconds + private val REBOOT_DELAY = 5.seconds private const val SCAN_RETRY_COUNT = 3 - private const val SCAN_RETRY_DELAY_MS = 2_000L + private val SCAN_RETRY_DELAY = 2.seconds const val RECOMMENDED_CHUNK_SIZE = 512 } } diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt index 6df54ea43..97fced4c6 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt @@ -26,7 +26,7 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds internal const val DEFAULT_SCAN_RETRY_COUNT = 3 -internal const val DEFAULT_SCAN_RETRY_DELAY_MS = 2_000L +internal val DEFAULT_SCAN_RETRY_DELAY: Duration = 2.seconds internal val DEFAULT_SCAN_TIMEOUT: Duration = 10.seconds private const val MAC_PARTS_COUNT = 6 @@ -59,7 +59,7 @@ internal suspend fun scanForBleDevice( tag: String, serviceUuid: kotlin.uuid.Uuid, retryCount: Int = DEFAULT_SCAN_RETRY_COUNT, - retryDelayMs: Long = DEFAULT_SCAN_RETRY_DELAY_MS, + retryDelay: Duration = DEFAULT_SCAN_RETRY_DELAY, scanTimeout: Duration = DEFAULT_SCAN_TIMEOUT, predicate: (BleDevice) -> Boolean, ): BleDevice? { @@ -80,7 +80,7 @@ internal suspend fun scanForBleDevice( return device } Logger.w { "$tag: Target not in ${foundDevices.size} devices found" } - if (attempt < retryCount - 1) delay(retryDelayMs) + if (attempt < retryCount - 1) delay(retryDelay) } return null } diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt index f3d9d8648..83d0deecc 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt @@ -48,6 +48,9 @@ import org.meshtastic.core.ble.BleWriteType import org.meshtastic.core.ble.DEFAULT_BLE_WRITE_VALUE_LENGTH import org.meshtastic.feature.firmware.ota.calculateMacPlusOne import org.meshtastic.feature.firmware.ota.scanForBleDevice +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds /** * Kable-based transport for the Nordic Secure DFU (Secure DFU over BLE) protocol. @@ -96,7 +99,7 @@ class SecureDfuTransport( ?: throw DfuException.ConnectionFailed("Device $address not found for buttonless DFU trigger") Logger.i { "DFU: Connecting to $address to trigger buttonless DFU..." } - bleConnection.connectAndAwait(device, CONNECT_TIMEOUT_MS) + bleConnection.connectAndAwait(device, CONNECT_TIMEOUT) bleConnection.profile(SecureDfuUuids.SERVICE) { service -> val buttonlessChar = service.characteristic(SecureDfuUuids.BUTTONLESS_NO_BONDS) @@ -111,7 +114,7 @@ class SecureDfuTransport( .catch { e -> Logger.d(e) { "DFU: Buttonless indication stream ended (expected on disconnect)" } } .launchIn(this) - delay(SUBSCRIPTION_SETTLE_MS) + delay(SUBSCRIPTION_SETTLE) Logger.i { "DFU: Writing buttonless DFU trigger..." } service.write(buttonlessChar, byteArrayOf(0x01), BleWriteType.WITH_RESPONSE) @@ -119,7 +122,7 @@ class SecureDfuTransport( // Wait for the indication response (0x20-01-STATUS). The device may disconnect before we receive it — // that's expected and treated as success, matching the Nordic DFU library's behavior. try { - withTimeout(BUTTONLESS_RESPONSE_TIMEOUT_MS) { + withTimeout(BUTTONLESS_RESPONSE_TIMEOUT) { val response = indicationChannel.receive() if (response.size >= 3 && response[0] == BUTTONLESS_RESPONSE_CODE && response[2] != 0x01.toByte()) { Logger.w { "DFU: Buttonless DFU response indicates error: ${response.toHexString()}" } @@ -162,7 +165,7 @@ class SecureDfuTransport( bleConnection.connectionState.onEach { Logger.d { "DFU: Connection state → $it" } }.launchIn(transportScope) - val connected = bleConnection.connectAndAwait(device, CONNECT_TIMEOUT_MS) + val connected = bleConnection.connectAndAwait(device, CONNECT_TIMEOUT) if (connected is BleConnectionState.Disconnected) { throw DfuException.ConnectionFailed("Failed to connect to DFU device ${device.address}") } @@ -188,7 +191,7 @@ class SecureDfuTransport( } .launchIn(this) - delay(SUBSCRIPTION_SETTLE_MS) + delay(SUBSCRIPTION_SETTLE) if (!subscribed.isCompleted) subscribed.complete(Unit) subscribed.await() @@ -286,7 +289,7 @@ class SecureDfuTransport( } catch (e: Throwable) { lastError = e Logger.w(e) { "DFU: Object transfer failed (attempt ${attempt + 1}/$OBJECT_RETRY_COUNT): ${e.message}" } - if (attempt < OBJECT_RETRY_COUNT - 1) delay(RETRY_DELAY_MS) + if (attempt < OBJECT_RETRY_COUNT - 1) delay(RETRY_DELAY) } } throw lastError ?: DfuException.TransferFailed("Object transfer failed after $OBJECT_RETRY_COUNT attempts") @@ -347,7 +350,7 @@ class SecureDfuTransport( // First-chunk delay: some older bootloaders need time to prepare flash after Create. // The Nordic DFU library uses 400ms for the first chunk. if (isFirstChunk) { - delay(FIRST_CHUNK_DELAY_MS) + delay(FIRST_CHUNK_DELAY) isFirstChunk = false } @@ -399,7 +402,7 @@ class SecureDfuTransport( } catch (e: DfuException.ProtocolError) { if (e.resultCode == DfuResultCode.INVALID_OBJECT && offset + objectSize >= totalBytes) { Logger.w { "DFU: Execute returned INVALID_OBJECT on final object, retrying once..." } - delay(RETRY_DELAY_MS) + delay(RETRY_DELAY) sendExecute() } else { throw e @@ -440,7 +443,7 @@ class SecureDfuTransport( // Wait for the device's PRN receipt notification, then validate CRC. // Skip the wait on the last packet — the final CALCULATE_CHECKSUM covers it. if (prnInterval > 0 && packetsSincePrn >= prnInterval && pos < until) { - val response = awaitNotification(COMMAND_TIMEOUT_MS) + val response = awaitNotification(COMMAND_TIMEOUT) if (response is DfuResponse.ChecksumResult) { val expectedCrc = DfuCrc32.calculate(data, length = pos) if (response.offset != pos || response.crc32 != expectedCrc) { @@ -459,7 +462,7 @@ class SecureDfuTransport( val controlChar = service.characteristic(SecureDfuUuids.CONTROL_POINT) service.write(controlChar, payload, BleWriteType.WITH_RESPONSE) } - return awaitNotification(COMMAND_TIMEOUT_MS) + return awaitNotification(COMMAND_TIMEOUT) } private suspend fun setPrn(value: Int) { @@ -506,13 +509,13 @@ class SecureDfuTransport( Logger.d { "DFU: Object executed." } } - private suspend fun awaitNotification(timeoutMs: Long): DfuResponse = try { - withTimeout(timeoutMs) { + private suspend fun awaitNotification(timeout: Duration): DfuResponse = try { + withTimeout(timeout) { val bytes = notificationChannel.receive() DfuResponse.parse(bytes).also { Logger.d { "DFU: Notification → $it" } } } } catch (_: TimeoutCancellationException) { - throw DfuException.Timeout("No response from Control Point after ${timeoutMs}ms") + throw DfuException.Timeout("No response from Control Point after $timeout") } private fun DfuResponse.requireSuccess(expectedOpcode: Byte) { @@ -541,7 +544,7 @@ class SecureDfuTransport( tag = "DFU", serviceUuid = SecureDfuUuids.SERVICE, retryCount = SCAN_RETRY_COUNT, - retryDelayMs = SCAN_RETRY_DELAY_MS, + retryDelay = SCAN_RETRY_DELAY, predicate = predicate, ) @@ -550,14 +553,14 @@ class SecureDfuTransport( // --------------------------------------------------------------------------- companion object { - private const val CONNECT_TIMEOUT_MS = 15_000L - private const val COMMAND_TIMEOUT_MS = 30_000L - private const val SUBSCRIPTION_SETTLE_MS = 500L - private const val BUTTONLESS_RESPONSE_TIMEOUT_MS = 3_000L + private val CONNECT_TIMEOUT = 15.seconds + private val COMMAND_TIMEOUT = 30.seconds + private val SUBSCRIPTION_SETTLE = 500.milliseconds + private val BUTTONLESS_RESPONSE_TIMEOUT = 3.seconds private const val SCAN_RETRY_COUNT = 3 - private const val SCAN_RETRY_DELAY_MS = 2_000L - private const val RETRY_DELAY_MS = 2_000L - private const val FIRST_CHUNK_DELAY_MS = 400L + private val SCAN_RETRY_DELAY = 2.seconds + private val RETRY_DELAY = 2.seconds + private val FIRST_CHUNK_DELAY = 400.milliseconds /** Response code prefix for Buttonless DFU indications (0x20 = response). */ private const val BUTTONLESS_RESPONSE_CODE: Byte = 0x20 diff --git a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt index b6a73bc52..da8f84057 100644 --- a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt +++ b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransportTest.kt @@ -614,8 +614,8 @@ class SecureDfuTransportTest { override suspend fun connect(device: BleDevice) = delegate.connect(device) - override suspend fun connectAndAwait(device: BleDevice, timeoutMs: Long) = - delegate.connectAndAwait(device, timeoutMs) + override suspend fun connectAndAwait(device: BleDevice, timeout: Duration) = + delegate.connectAndAwait(device, timeout) override suspend fun disconnect() = delegate.disconnect() diff --git a/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/NymeaBleConstants.kt b/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/NymeaBleConstants.kt index f174d5746..5b0d8398c 100644 --- a/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/NymeaBleConstants.kt +++ b/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/NymeaBleConstants.kt @@ -16,6 +16,8 @@ */ package org.meshtastic.feature.wifiprovision +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds import kotlin.uuid.Uuid /** @@ -62,14 +64,14 @@ internal object NymeaBleConstants { /** JSON stream terminator — marks the end of a reassembled message. */ const val STREAM_TERMINATOR = '\n' - /** Scan + connect timeout in milliseconds. */ - const val SCAN_TIMEOUT_MS = 10_000L + /** Scan + connect timeout. */ + val SCAN_TIMEOUT = 10.seconds /** Maximum time to wait for a command response. */ - const val RESPONSE_TIMEOUT_MS = 15_000L + val RESPONSE_TIMEOUT = 15.seconds /** Settle time after subscribing to notifications before sending commands. */ - const val SUBSCRIPTION_SETTLE_MS = 300L + val SUBSCRIPTION_SETTLE = 300.milliseconds // endregion // region Wireless Commander command codes diff --git a/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/domain/NymeaWifiService.kt b/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/domain/NymeaWifiService.kt index 067dec798..03330dc3e 100644 --- a/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/domain/NymeaWifiService.kt +++ b/feature/wifi-provision/src/commonMain/kotlin/org/meshtastic/feature/wifiprovision/domain/NymeaWifiService.kt @@ -43,14 +43,13 @@ import org.meshtastic.feature.wifiprovision.NymeaBleConstants.CMD_GET_NETWORKS import org.meshtastic.feature.wifiprovision.NymeaBleConstants.CMD_SCAN import org.meshtastic.feature.wifiprovision.NymeaBleConstants.COMMANDER_RESPONSE_UUID import org.meshtastic.feature.wifiprovision.NymeaBleConstants.RESPONSE_SUCCESS -import org.meshtastic.feature.wifiprovision.NymeaBleConstants.RESPONSE_TIMEOUT_MS -import org.meshtastic.feature.wifiprovision.NymeaBleConstants.SCAN_TIMEOUT_MS -import org.meshtastic.feature.wifiprovision.NymeaBleConstants.SUBSCRIPTION_SETTLE_MS +import org.meshtastic.feature.wifiprovision.NymeaBleConstants.RESPONSE_TIMEOUT +import org.meshtastic.feature.wifiprovision.NymeaBleConstants.SCAN_TIMEOUT +import org.meshtastic.feature.wifiprovision.NymeaBleConstants.SUBSCRIPTION_SETTLE import org.meshtastic.feature.wifiprovision.NymeaBleConstants.WIRELESS_COMMANDER_UUID import org.meshtastic.feature.wifiprovision.NymeaBleConstants.WIRELESS_SERVICE_UUID import org.meshtastic.feature.wifiprovision.model.ProvisionResult import org.meshtastic.feature.wifiprovision.model.WifiNetwork -import kotlin.time.Duration.Companion.milliseconds /** * GATT client for the nymea-networkmanager WiFi provisioning profile. @@ -87,26 +86,20 @@ class NymeaWifiService( * * @param address Optional MAC address filter. If null, the first advertising device is used. * @return The discovered device's advertised name on success. - * @throws IllegalStateException if no device is found within [SCAN_TIMEOUT_MS]. + * @throws IllegalStateException if no device is found within [SCAN_TIMEOUT]. */ suspend fun connect(address: String? = null): Result = runCatching { Logger.i { "$TAG: Scanning for nymea-networkmanager device (address=$address)…" } val device = - withTimeout(SCAN_TIMEOUT_MS) { - scanner - .scan( - timeout = SCAN_TIMEOUT_MS.milliseconds, - serviceUuid = WIRELESS_SERVICE_UUID, - address = address, - ) - .first() + withTimeout(SCAN_TIMEOUT) { + scanner.scan(timeout = SCAN_TIMEOUT, serviceUuid = WIRELESS_SERVICE_UUID, address = address).first() } val deviceName = device.name ?: device.address Logger.i { "$TAG: Found device: ${device.name} @ ${device.address}" } - val state = bleConnection.connectAndAwait(device, SCAN_TIMEOUT_MS) + val state = bleConnection.connectAndAwait(device, SCAN_TIMEOUT) check(state is BleConnectionState.Connected) { "Failed to connect to ${device.address} — final state: $state" } Logger.i { "$TAG: Connected. Discovering wireless service…" } @@ -130,7 +123,7 @@ class NymeaWifiService( } .launchIn(this) - delay(SUBSCRIPTION_SETTLE_MS) + delay(SUBSCRIPTION_SETTLE) if (!subscribed.isCompleted) subscribed.complete(Unit) subscribed.await() @@ -235,8 +228,8 @@ class NymeaWifiService( } } - /** Wait up to [RESPONSE_TIMEOUT_MS] for a complete JSON response from the notification channel. */ - private suspend fun waitForResponse(): String = withTimeout(RESPONSE_TIMEOUT_MS) { responseChannel.receive() } + /** Wait up to [RESPONSE_TIMEOUT] for a complete JSON response from the notification channel. */ + private suspend fun waitForResponse(): String = withTimeout(RESPONSE_TIMEOUT) { responseChannel.receive() } private fun nymeaErrorMessage(code: Int): String = when (code) { NymeaBleConstants.RESPONSE_INVALID_COMMAND -> "Invalid command"