refactor(transport): complete transport architecture overhaul — extract callback, wire BleReconnectPolicy, fix safety issues (#5080)

This commit is contained in:
James Rich 2026-04-11 23:22:18 -05:00 committed by GitHub
parent 962c619c4c
commit e85300531e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
64 changed files with 1184 additions and 1018 deletions

View file

@ -17,6 +17,7 @@
package org.meshtastic.core.network.radio
import android.content.Context
import android.hardware.usb.UsbManager
import android.provider.Settings
import org.koin.core.annotation.Single
import org.meshtastic.core.ble.BleConnectionFactory
@ -25,21 +26,23 @@ import org.meshtastic.core.ble.BluetoothRepository
import org.meshtastic.core.common.BuildConfigProvider
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.DeviceType
import org.meshtastic.core.model.InterfaceId
import org.meshtastic.core.network.repository.UsbRepository
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.core.repository.RadioTransportFactory
/**
* Android implementation of [RadioTransportFactory]. Handles pure-KMP transports (BLE) via [BaseRadioTransportFactory]
* while delegating legacy platform-specific connections (like USB/Serial, TCP, and Mocks) to the Android-specific
* [InterfaceFactory].
* while creating platform-specific connections (TCP, USB/Serial, Mock, NOP) directly in [createPlatformTransport].
*/
@Single(binds = [RadioTransportFactory::class])
@Suppress("LongParameterList")
class AndroidRadioTransportFactory(
private val context: Context,
private val interfaceFactory: Lazy<InterfaceFactory>,
private val buildConfigProvider: BuildConfigProvider,
private val usbRepository: UsbRepository,
private val usbManager: UsbManager,
scanner: BleScanner,
bluetoothRepository: BluetoothRepository,
connectionFactory: BleConnectionFactory,
@ -48,13 +51,50 @@ class AndroidRadioTransportFactory(
override val supportedDeviceTypes: List<DeviceType> = listOf(DeviceType.BLE, DeviceType.TCP, DeviceType.USB)
override fun isMockInterface(): Boolean =
override fun isMockTransport(): Boolean =
buildConfigProvider.isDebug || Settings.System.getString(context.contentResolver, "firebase.test.lab") == "true"
override fun isPlatformAddressValid(address: String): Boolean = interfaceFactory.value.addressValid(address)
override fun isPlatformAddressValid(address: String): Boolean {
val interfaceId = address.firstOrNull()?.let { InterfaceId.forIdChar(it) } ?: return false
val rest = address.substring(1)
return when (interfaceId) {
InterfaceId.MOCK,
InterfaceId.NOP,
InterfaceId.TCP,
-> true
InterfaceId.SERIAL -> {
val deviceMap = usbRepository.serialDevices.value
val driver = deviceMap[rest] ?: deviceMap.values.firstOrNull()
driver != null && usbManager.hasPermission(driver.device)
}
InterfaceId.BLUETOOTH -> true // Handled by base class
}
}
override fun createPlatformTransport(address: String, service: RadioInterfaceService): RadioTransport {
// Fallback to legacy factory for Serial, Mocks, and NOPs
return interfaceFactory.value.createInterface(address, service)
val interfaceId = address.firstOrNull()?.let { InterfaceId.forIdChar(it) }
val rest = address.substring(1)
return when (interfaceId) {
InterfaceId.MOCK -> MockRadioTransport(callback = service, scope = service.serviceScope, address = rest)
InterfaceId.TCP ->
TcpRadioTransport(
callback = service,
scope = service.serviceScope,
dispatchers = dispatchers,
address = rest,
)
InterfaceId.SERIAL ->
SerialRadioTransport(
callback = service,
scope = service.serviceScope,
usbRepository = usbRepository,
address = rest,
)
InterfaceId.NOP,
null,
-> NopRadioTransport(rest)
InterfaceId.BLUETOOTH -> error("BLE addresses should be handled by BaseRadioTransportFactory")
}
}
}

View file

@ -1,66 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.model.InterfaceId
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
/**
* Entry point for create radio backend instances given a specific address.
*
* This class is responsible for building and dissecting radio addresses based upon their interface type and the "rest"
* of the address (which varies per implementation).
*/
@Single
class InterfaceFactory(
private val nopInterfaceFactory: NopInterfaceFactory,
private val mockSpec: Lazy<MockInterfaceSpec>,
private val serialSpec: Lazy<SerialInterfaceSpec>,
private val tcpSpec: Lazy<TCPInterfaceSpec>,
) {
internal val nopInterface by lazy { nopInterfaceFactory.create("") }
private val specMap: Map<InterfaceId, InterfaceSpec<*>> by lazy {
mapOf(
InterfaceId.MOCK to mockSpec.value,
InterfaceId.NOP to NopInterfaceSpec(nopInterfaceFactory),
InterfaceId.SERIAL to serialSpec.value,
InterfaceId.TCP to tcpSpec.value,
)
}
fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest"
fun createInterface(address: String, service: RadioInterfaceService): RadioTransport {
val (spec, rest) = splitAddress(address)
return spec?.createInterface(rest, service) ?: nopInterface
}
fun addressValid(address: String?): Boolean = address?.let {
val (spec, rest) = splitAddress(it)
spec?.addressValid(rest)
} ?: false
private fun splitAddress(address: String): Pair<InterfaceSpec<*>?, String> {
if (address.isEmpty()) return Pair(null, "")
val c = address[0].let { InterfaceId.forIdChar(it) }?.let { specMap[it] }
val rest = address.substring(1)
return Pair(c, rest)
}
}

View file

@ -1,28 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.network.repository.UsbRepository
import org.meshtastic.core.repository.RadioInterfaceService
/** Factory for creating `SerialInterface` instances. */
@Single
class SerialInterfaceFactory(private val usbRepository: UsbRepository) {
fun create(rest: String, service: RadioInterfaceService): SerialInterface =
SerialInterface(service, usbRepository, rest)
}

View file

@ -1,44 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import android.hardware.usb.UsbManager
import com.hoho.android.usbserial.driver.UsbSerialDriver
import org.koin.core.annotation.Single
import org.meshtastic.core.network.repository.UsbRepository
import org.meshtastic.core.repository.RadioInterfaceService
/** Serial/USB interface backend implementation. */
@Single
class SerialInterfaceSpec(
private val factory: SerialInterfaceFactory,
private val usbManager: UsbManager,
private val usbRepository: UsbRepository,
) : InterfaceSpec<SerialInterface> {
override fun createInterface(rest: String, service: RadioInterfaceService): SerialInterface =
factory.create(rest, service)
override fun addressValid(rest: String): Boolean {
val driver = findSerial(rest) ?: return false
return usbManager.hasPermission(driver.device)
}
internal fun findSerial(rest: String): UsbSerialDriver? {
val deviceMap = usbRepository.serialDevices.value
return deviceMap[rest] ?: deviceMap.values.firstOrNull()
}
}

View file

@ -17,24 +17,28 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.network.repository.SerialConnection
import org.meshtastic.core.network.repository.SerialConnectionListener
import org.meshtastic.core.network.repository.UsbRepository
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import org.meshtastic.core.network.transport.HeartbeatSender
import org.meshtastic.core.repository.RadioTransportCallback
import java.util.concurrent.atomic.AtomicReference
/** An interface that assumes we are talking to a meshtastic device via USB serial */
class SerialInterface(
service: RadioInterfaceService,
/** An Android USB/serial [RadioTransport] implementation. */
class SerialRadioTransport(
callback: RadioTransportCallback,
scope: CoroutineScope,
private val usbRepository: UsbRepository,
private val address: String,
) : StreamInterface(service) {
) : StreamTransport(callback, scope) {
private var connRef = AtomicReference<SerialConnection?>()
init {
private val heartbeatSender = HeartbeatSender(sendToRadio = ::handleSendToRadio, logTag = "Serial[$address]")
override fun start() {
connect()
}
@ -116,14 +120,9 @@ class SerialInterface(
}
override fun keepAlive() {
// Send a ToRadio heartbeat so the firmware resets its idle timer and responds with
// a FromRadio queueStatus — proving the serial link is alive. Without this, the
// serial transport has no way to detect a silently dead device (battery depleted,
// firmware crash without the `rebooted` flag). The queueStatus response also feeds
// into MeshMessageProcessorImpl.refreshLocalNodeLastHeard() to keep the local
// node's lastHeard timestamp current.
Logger.d { "[$address] Serial keepAlive — sending heartbeat" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat()).encode())
// Delegate to HeartbeatSender which sends a ToRadio heartbeat to prove the serial
// link is alive and keep the local node's lastHeard timestamp current.
scope.handledLaunch { heartbeatSender.sendHeartbeat() }
}
override fun sendBytes(p: ByteArray) {

View file

@ -1,27 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.repository.RadioInterfaceService
/** Factory for creating `TCPInterface` instances. */
@Single
class TCPInterfaceFactory(private val dispatchers: CoroutineDispatchers) {
fun create(rest: String, service: RadioInterfaceService): TCPInterface = TCPInterface(service, dispatchers, rest)
}

View file

@ -1,27 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.repository.RadioInterfaceService
/** TCP interface backend implementation. */
@Single
class TCPInterfaceSpec(private val factory: TCPInterfaceFactory) : InterfaceSpec<TCPInterface> {
override fun createInterface(rest: String, service: RadioInterfaceService): TCPInterface =
factory.create(rest, service)
}

View file

@ -38,40 +38,41 @@ abstract class BaseRadioTransportFactory(
override fun isAddressValid(address: String?): Boolean {
val spec = address?.firstOrNull() ?: return false
return spec in
listOf(InterfaceId.TCP.id, InterfaceId.SERIAL.id, InterfaceId.BLUETOOTH.id, InterfaceId.MOCK.id) ||
spec == '!' ||
isPlatformAddressValid(address)
return when (spec) {
InterfaceId.TCP.id,
InterfaceId.SERIAL.id,
InterfaceId.BLUETOOTH.id,
InterfaceId.MOCK.id,
'!',
-> true
else -> isPlatformAddressValid(address)
}
}
protected open fun isPlatformAddressValid(address: String): Boolean = false
override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest"
override fun createTransport(address: String, service: RadioInterfaceService): RadioTransport = when {
address.startsWith(InterfaceId.BLUETOOTH.id) -> {
BleRadioInterface(
serviceScope = service.serviceScope,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
address = address.removePrefix(InterfaceId.BLUETOOTH.id.toString()),
)
}
address.startsWith("!") -> {
BleRadioInterface(
serviceScope = service.serviceScope,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
address = address.removePrefix("!"),
)
}
else -> createPlatformTransport(address, service)
override fun createTransport(address: String, service: RadioInterfaceService): RadioTransport {
val transport =
when {
address.startsWith(InterfaceId.BLUETOOTH.id) || address.startsWith("!") -> {
val bleAddress = address.removePrefix(InterfaceId.BLUETOOTH.id.toString()).removePrefix("!")
BleRadioTransport(
scope = service.serviceScope,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
callback = service,
address = bleAddress,
)
}
else -> createPlatformTransport(address, service)
}
transport.start()
return transport
}
/** Delegate to platform for Mock, TCP, or Serial/USB interfaces. */
/** Delegate to platform for Mock, TCP, or Serial/USB transports. */
protected abstract fun createPlatformTransport(address: String, service: RadioInterfaceService): RadioTransport
}

View file

@ -19,6 +19,7 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
@ -32,7 +33,6 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
@ -47,54 +47,22 @@ import org.meshtastic.core.ble.BleWriteType
import org.meshtastic.core.ble.BluetoothRepository
import org.meshtastic.core.ble.DisconnectReason
import org.meshtastic.core.ble.MeshtasticBleConstants.SERVICE_UUID
import org.meshtastic.core.ble.MeshtasticRadioProfile
import org.meshtastic.core.ble.classifyBleException
import org.meshtastic.core.ble.retryBleOperation
import org.meshtastic.core.ble.toMeshtasticRadioProfile
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.RadioNotConnectedException
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.network.transport.HeartbeatSender
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import org.meshtastic.core.repository.RadioTransportCallback
import kotlin.concurrent.Volatile
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private const val SCAN_RETRY_COUNT = 3
private val SCAN_RETRY_DELAY = 1.seconds
private val CONNECTION_TIMEOUT = 15.seconds
private const val RECONNECT_FAILURE_THRESHOLD = 3
private val RECONNECT_BASE_DELAY = 5.seconds
private val RECONNECT_MAX_DELAY = 60.seconds
private const val RECONNECT_MAX_FAILURES = 10
/** Settle delay before each connection attempt to let the Android BLE stack finish any pending disconnect cleanup. */
private val SETTLE_DELAY = 1.seconds
/**
* Minimum time a BLE connection must stay up before we consider it "stable" and reset
* [BleRadioInterface.consecutiveFailures]. Without this, a device at the edge of BLE range can repeatedly connect for a
* fraction of a second and drop each brief connection resets the failure counter so [RECONNECT_FAILURE_THRESHOLD] is
* never reached, and the app never signals [ConnectionState.DeviceSleep].
*
* The value (5 s) is long enough that only connections that survive past the initial GATT setup are treated as genuine,
* but short enough that normal reconnects after light-sleep still reset the counter promptly.
*/
private val MIN_STABLE_CONNECTION = 5.seconds
/**
* Returns the reconnect backoff delay for a given consecutive failure count.
*
* Backoff schedule: 1 failure 5 s 2 failures 10 s 3 failures 20 s 4 failures 40 s 5+ failures 60 s (capped)
*/
internal fun computeReconnectBackoff(consecutiveFailures: Int): Duration {
if (consecutiveFailures <= 0) return RECONNECT_BASE_DELAY
val multiplier = 1 shl (consecutiveFailures - 1).coerceAtMost(4)
return minOf(RECONNECT_BASE_DELAY * multiplier, RECONNECT_MAX_DELAY)
}
/**
* Delay after writing a heartbeat before re-polling FROMRADIO.
@ -117,27 +85,27 @@ private val GATT_CLEANUP_TIMEOUT = 5.seconds
* - Bonding and discovery.
* - Automatic reconnection logic.
* - MTU and connection parameter monitoring.
* - Routing raw byte packets between the radio and [RadioInterfaceService].
* - Routing raw byte packets between the radio and [RadioTransportCallback].
*
* @param serviceScope The coroutine scope to use for launching coroutines.
* @param scope The coroutine scope to use for launching coroutines.
* @param scanner The BLE scanner.
* @param bluetoothRepository The Bluetooth repository.
* @param connectionFactory The BLE connection factory.
* @param service The [RadioInterfaceService] to use for handling radio events.
* @param callback The [RadioTransportCallback] to use for handling radio events.
* @param address The BLE address of the device to connect to.
*/
class BleRadioInterface(
private val serviceScope: CoroutineScope,
class BleRadioTransport(
private val scope: CoroutineScope,
private val scanner: BleScanner,
private val bluetoothRepository: BluetoothRepository,
private val connectionFactory: BleConnectionFactory,
private val service: RadioInterfaceService,
private val callback: RadioTransportCallback,
internal val address: String,
) : RadioTransport {
private val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
Logger.w(throwable) { "[$address] Uncaught exception in connectionScope" }
serviceScope.launch {
scope.launch {
try {
bleConnection.disconnect()
} catch (e: Exception) {
@ -145,13 +113,11 @@ class BleRadioInterface(
}
}
val (isPermanent, msg) = throwable.toDisconnectReason()
service.onDisconnect(isPermanent, errorMessage = msg)
callback.onDisconnect(isPermanent, errorMessage = msg)
}
private val connectionScope: CoroutineScope =
CoroutineScope(
serviceScope.coroutineContext + SupervisorJob(serviceScope.coroutineContext.job) + exceptionHandler,
)
CoroutineScope(scope.coroutineContext + SupervisorJob(scope.coroutineContext.job) + exceptionHandler)
private val bleConnection: BleConnection = connectionFactory.create(connectionScope, address)
private val writeMutex: Mutex = Mutex()
@ -167,12 +133,19 @@ class BleRadioInterface(
@Volatile private var isFullyConnected = false
private var connectionJob: Job? = null
private var consecutiveFailures = 0
private val reconnectPolicy = BleReconnectPolicy()
@OptIn(ExperimentalAtomicApi::class)
private val heartbeatNonce = AtomicInt(0)
private val heartbeatSender =
HeartbeatSender(
sendToRadio = ::handleSendToRadio,
afterHeartbeat = {
delay(HEARTBEAT_DRAIN_DELAY)
radioService?.requestDrain()
},
logTag = address,
)
init {
override fun start() {
connect()
}
@ -209,134 +182,104 @@ class BleRadioInterface(
throw RadioNotConnectedException("Device not found at address $address")
}
@Suppress("LongMethod", "CyclomaticComplexMethod")
private fun connect() {
connectionJob =
connectionScope.launch {
while (isActive) {
try {
// Settle delay: let the Android BLE stack finish any pending
// disconnect cleanup before starting a new connection attempt.
delay(SETTLE_DELAY)
connectionStartTime = nowMillis
Logger.i { "[$address] BLE connection attempt started" }
val device = findDevice()
// Bond before connecting: firmware may require an encrypted link,
// and without a bond Android fails with status 5 or 133.
// No-op on Desktop/JVM where the OS handles pairing automatically.
if (!bluetoothRepository.isBonded(address)) {
Logger.i { "[$address] Device not bonded, initiating bonding" }
@Suppress("TooGenericExceptionCaught")
try {
bluetoothRepository.bond(device)
Logger.i { "[$address] Bonding successful" }
} catch (e: Exception) {
Logger.w(e) { "[$address] Bonding failed, attempting connection anyway" }
}
reconnectPolicy.execute(
attempt = {
try {
attemptConnection()
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
val failureTime = (nowMillis - connectionStartTime).milliseconds
Logger.w(e) { "[$address] Failed to connect after $failureTime" }
BleReconnectPolicy.Outcome.Failed(e)
}
},
onTransientDisconnect = { error ->
val msg = error?.toDisconnectReason()?.second ?: "Device unreachable"
callback.onDisconnect(isPermanent = false, errorMessage = msg)
},
onPermanentDisconnect = { error ->
val msg = error?.toDisconnectReason()?.second ?: "Device unreachable"
callback.onDisconnect(isPermanent = true, errorMessage = msg)
},
)
}
}
val state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT)
/**
* Performs a single BLE connect-and-wait cycle.
*
* Finds the device, bonds if needed, connects, discovers services, and waits for disconnect. Returns a
* [BleReconnectPolicy.Outcome] describing how the connection ended.
*/
@Suppress("CyclomaticComplexMethod")
private suspend fun attemptConnection(): BleReconnectPolicy.Outcome {
connectionStartTime = nowMillis
Logger.i { "[$address] BLE connection attempt started" }
if (state !is BleConnectionState.Connected) {
throw RadioNotConnectedException("Failed to connect to device at address $address")
}
val device = findDevice()
// Only reset failures if connection was stable (see MIN_STABLE_CONNECTION).
val gattConnectedAt = nowMillis
isFullyConnected = true
onConnected()
// Bond before connecting: firmware may require an encrypted link,
// and without a bond Android fails with status 5 or 133.
// No-op on Desktop/JVM where the OS handles pairing automatically.
if (!bluetoothRepository.isBonded(address)) {
Logger.i { "[$address] Device not bonded, initiating bonding" }
@Suppress("TooGenericExceptionCaught")
try {
bluetoothRepository.bond(device)
Logger.i { "[$address] Bonding successful" }
} catch (e: Exception) {
Logger.w(e) { "[$address] Bonding failed, attempting connection anyway" }
}
}
// Scope the connectionState listener to this iteration so it's
// cancelled automatically before the next reconnect cycle.
var disconnectReason: DisconnectReason = DisconnectReason.Unknown
coroutineScope {
bleConnection.connectionState
.onEach { s ->
if (s is BleConnectionState.Disconnected && isFullyConnected) {
isFullyConnected = false
disconnectReason = s.reason
onDisconnected()
}
}
.catch { e -> Logger.w(e) { "[$address] bleConnection.connectionState flow crashed" } }
.launchIn(this)
val state = bleConnection.connectAndAwait(device, CONNECTION_TIMEOUT)
discoverServicesAndSetupCharacteristics()
if (state !is BleConnectionState.Connected) {
throw RadioNotConnectedException("Failed to connect to device at address $address")
}
bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
}
val gattConnectedAt = nowMillis
isFullyConnected = true
onConnected()
Logger.i {
"[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect"
}
// Skip failure counting for intentional disconnects.
if (disconnectReason is DisconnectReason.LocalDisconnect) {
consecutiveFailures = 0
continue
}
// A connection that drops almost immediately (< MIN_STABLE_CONNECTION)
// is treated as a failure — the BLE stack may have "connected" to a
// cached GATT profile before realising the device is gone.
val connectionUptime = (nowMillis - gattConnectedAt).milliseconds
if (connectionUptime >= MIN_STABLE_CONNECTION) {
consecutiveFailures = 0
} else {
consecutiveFailures++
Logger.w {
"[$address] Connection lasted only $connectionUptime " +
"(< $MIN_STABLE_CONNECTION) — treating as failure " +
"(consecutive failures: $consecutiveFailures)"
}
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
Logger.e { "[$address] Giving up after $consecutiveFailures unstable connections" }
service.onDisconnect(
isPermanent = true,
errorMessage = "Device unreachable (unstable connection)",
)
return@launch
}
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
service.onDisconnect(
isPermanent = false,
errorMessage = "Device unreachable (unstable connection)",
)
}
}
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.d { "[$address] BLE connection coroutine cancelled" }
throw e
} catch (e: Exception) {
val failureTime = (nowMillis - connectionStartTime).milliseconds
consecutiveFailures++
Logger.w(e) {
"[$address] Failed to connect to device after $failureTime " +
"(consecutive failures: $consecutiveFailures)"
}
// Give up permanently to stop draining battery.
if (consecutiveFailures >= RECONNECT_MAX_FAILURES) {
Logger.e { "[$address] Giving up after $consecutiveFailures consecutive failures" }
val (_, msg) = e.toDisconnectReason()
service.onDisconnect(isPermanent = true, errorMessage = msg)
return@launch
}
// Signal DeviceSleep so MeshConnectionManagerImpl starts its sleep timeout.
if (consecutiveFailures >= RECONNECT_FAILURE_THRESHOLD) {
handleFailure(e)
}
val backoff = computeReconnectBackoff(consecutiveFailures)
Logger.d { "[$address] Retrying in $backoff (failure #$consecutiveFailures)" }
delay(backoff)
// Scope the connectionState listener to this iteration so it's
// cancelled automatically before the next reconnect cycle.
var disconnectReason: DisconnectReason = DisconnectReason.Unknown
coroutineScope {
bleConnection.connectionState
.onEach { s ->
if (s is BleConnectionState.Disconnected && isFullyConnected) {
isFullyConnected = false
disconnectReason = s.reason
onDisconnected()
}
}
.catch { e -> Logger.w(e) { "[$address] bleConnection.connectionState flow crashed" } }
.launchIn(this)
discoverServicesAndSetupCharacteristics()
bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
}
Logger.i { "[$address] BLE connection dropped (reason: $disconnectReason), preparing to reconnect" }
val wasIntentional = disconnectReason is DisconnectReason.LocalDisconnect
val connectionUptime = (nowMillis - gattConnectedAt).milliseconds
val wasStable = connectionUptime >= reconnectPolicy.minStableConnection
if (!wasStable && !wasIntentional) {
Logger.w {
"[$address] Connection lasted only $connectionUptime " +
"(< ${reconnectPolicy.minStableConnection}) — treating as unstable"
}
}
return BleReconnectPolicy.Outcome.Disconnected(wasStable = wasStable, wasIntentional = wasIntentional)
}
private suspend fun onConnected() {
@ -354,7 +297,7 @@ class BleRadioInterface(
radioService = null
Logger.i { "[$address] BLE disconnected - ${formatSessionStats()}" }
// Signal immediately so the UI reflects the disconnect while reconnect continues.
service.onDisconnect(isPermanent = false)
callback.onDisconnect(isPermanent = false)
}
private suspend fun discoverServicesAndSetupCharacteristics() {
@ -384,7 +327,7 @@ class BleRadioInterface(
}
.launchIn(this)
this@BleRadioInterface.radioService = radioService
this@BleRadioTransport.radioService = radioService
Logger.i { "[$address] Profile service active and characteristics subscribed" }
@ -395,7 +338,7 @@ class BleRadioInterface(
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
Logger.i { "[$address] BLE Radio Session Ready. Max write length (WITHOUT_RESPONSE): $maxLen bytes" }
this@BleRadioInterface.service.onConnect()
this@BleRadioTransport.callback.onConnect()
}
} catch (e: Exception) {
Logger.w(e) { "[$address] Profile service discovery or operation failed" }
@ -409,7 +352,7 @@ class BleRadioInterface(
}
}
@Volatile private var radioService: org.meshtastic.core.ble.MeshtasticRadioProfile? = null
@Volatile private var radioService: MeshtasticRadioProfile? = null
// --- RadioTransport Implementation ---
@ -445,36 +388,19 @@ class BleRadioInterface(
}
}
@OptIn(ExperimentalAtomicApi::class)
override fun keepAlive() {
// Send a ToRadio heartbeat so the firmware resets its power-saving idle timer.
// The firmware only resets the timer on writes to the TORADIO characteristic; a
// BLE-level GATT keepalive is invisible to it. Without this the device may enter
// light-sleep and drop the BLE connection after ~60 s of application inactivity.
//
// Each heartbeat uses a distinct nonce to vary the wire bytes, preventing the
// firmware's per-connection duplicate-write filter from silently dropping it.
val nonce = heartbeatNonce.fetchAndAdd(1)
Logger.v { "[$address] BLE keepAlive — sending ToRadio heartbeat (nonce=$nonce)" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat(nonce = nonce)).encode())
// The firmware responds to heartbeats by queuing a `queueStatus` FromRadio packet
// on the next getFromRadio() call, but it does NOT send a FROMNUM notification for
// it. The immediate drain trigger in sendToRadio() fires before the ESP32's async
// task queue has processed the heartbeat, so the response sits unread. Schedule a
// delayed re-drain to pick it up.
connectionScope.launch {
delay(HEARTBEAT_DRAIN_DELAY)
radioService?.requestDrain()
}
// Delegate to HeartbeatSender which sends a ToRadio heartbeat with a unique nonce
// so the firmware resets its power-saving idle timer. After sending, it schedules
// a delayed re-drain to pick up the queueStatus response.
connectionScope.launch { heartbeatSender.sendHeartbeat() }
}
/** Closes the connection to the device. */
override fun close() {
Logger.i { "[$address] Disconnecting. ${formatSessionStats()}" }
connectionScope.cancel("close() called")
// GATT cleanup must outlive serviceScope cancellation — GlobalScope is intentional.
// SharedRadioInterfaceService cancels serviceScope immediately after close(), so a
// GATT cleanup must outlive scope cancellation — GlobalScope is intentional.
// SharedRadioInterfaceService cancels the scope immediately after close(), so a
// coroutine launched there may never run, leaking BluetoothGatt (causes GATT 133).
@OptIn(DelicateCoroutinesApi::class)
GlobalScope.launch {
@ -493,12 +419,12 @@ class BleRadioInterface(
"[$address] Dispatching packet #$packetsReceived " +
"(${packet.size} bytes, total RX: $bytesReceived bytes)"
}
service.handleFromRadio(packet)
callback.handleFromRadio(packet)
}
private fun handleFailure(throwable: Throwable) {
val (isPermanent, msg) = throwable.toDisconnectReason()
service.onDisconnect(isPermanent, errorMessage = msg)
callback.onDisconnect(isPermanent, errorMessage = msg)
}
/** Formats a one-line session statistics summary for logging. */

View file

@ -0,0 +1,170 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
* Encapsulates the BLE reconnection policy with exponential backoff.
*
* The policy tracks consecutive failures and decides whether to retry, signal a transient disconnect (DeviceSleep), or
* give up permanently.
*
* @param maxFailures maximum consecutive failures before giving up permanently
* @param failureThreshold after this many consecutive failures, signal a transient disconnect
* @param settleDelay delay before each connection attempt to let the BLE stack settle
* @param minStableConnection minimum time a connection must stay up to be considered "stable"
* @param backoffStrategy computes the backoff delay for a given failure count
*/
class BleReconnectPolicy(
private val maxFailures: Int = DEFAULT_MAX_FAILURES,
private val failureThreshold: Int = DEFAULT_FAILURE_THRESHOLD,
private val settleDelay: Duration = DEFAULT_SETTLE_DELAY,
/** Minimum time a connection must stay up to be considered "stable". Exposed for callers to compare uptime. */
val minStableConnection: Duration = DEFAULT_MIN_STABLE_CONNECTION,
private val backoffStrategy: (attempt: Int) -> Duration = ::computeReconnectBackoff,
) {
/** Outcome of a single reconnect iteration. */
sealed interface Outcome {
/** Connection attempt succeeded and then eventually disconnected. */
data class Disconnected(val wasStable: Boolean, val wasIntentional: Boolean) : Outcome
/** Connection attempt failed with an exception. */
data class Failed(val error: Throwable) : Outcome
}
/** Action the caller should take after the policy processes an outcome. */
sealed interface Action {
/** Retry the connection after the specified backoff delay. */
data class Retry(val backoff: Duration) : Action
/** Signal a transient disconnect to higher layers. */
data class SignalTransient(val backoff: Duration) : Action
/** Give up permanently. */
data object GiveUp : Action
/** Continue immediately (e.g. after an intentional disconnect). */
data object Continue : Action
}
internal var consecutiveFailures: Int = 0
private set
/** Processes the outcome of a connection attempt and returns the action the caller should take. */
fun processOutcome(outcome: Outcome): Action = when (outcome) {
is Outcome.Disconnected -> {
if (outcome.wasIntentional) {
consecutiveFailures = 0
Action.Continue
} else if (outcome.wasStable) {
consecutiveFailures = 0
Action.Continue
} else {
consecutiveFailures++
Logger.w { "Unstable connection (consecutive failures: $consecutiveFailures)" }
evaluateFailure()
}
}
is Outcome.Failed -> {
consecutiveFailures++
Logger.w { "Connection failed (consecutive failures: $consecutiveFailures)" }
evaluateFailure()
}
}
private fun evaluateFailure(): Action {
if (consecutiveFailures >= maxFailures) {
return Action.GiveUp
}
val backoff = backoffStrategy(consecutiveFailures)
return if (consecutiveFailures >= failureThreshold) {
Action.SignalTransient(backoff)
} else {
Action.Retry(backoff)
}
}
/**
* Runs the reconnect loop, calling [attempt] for each iteration.
*
* The [attempt] lambda should perform a single connect-and-wait cycle and return the [Outcome] when the connection
* drops or an error occurs.
*
* @param attempt performs a single connection attempt and returns the outcome
* @param onTransientDisconnect called when the policy decides to signal a transient disconnect
* @param onPermanentDisconnect called when the policy gives up permanently
*/
suspend fun execute(
attempt: suspend () -> Outcome,
onTransientDisconnect: suspend (Throwable?) -> Unit,
onPermanentDisconnect: suspend (Throwable?) -> Unit,
) {
while (coroutineContext.isActive) {
delay(settleDelay)
val outcome = attempt()
val lastError = (outcome as? Outcome.Failed)?.error
when (val action = processOutcome(outcome)) {
is Action.Continue -> continue
is Action.Retry -> {
Logger.d { "Retrying in ${action.backoff} (failure #$consecutiveFailures)" }
delay(action.backoff)
}
is Action.SignalTransient -> {
onTransientDisconnect(lastError)
Logger.d { "Retrying in ${action.backoff} (failure #$consecutiveFailures)" }
delay(action.backoff)
}
is Action.GiveUp -> {
Logger.e { "Giving up after $consecutiveFailures consecutive failures" }
onPermanentDisconnect(lastError)
return
}
}
}
}
companion object {
const val DEFAULT_MAX_FAILURES = 10
const val DEFAULT_FAILURE_THRESHOLD = 3
val DEFAULT_SETTLE_DELAY = 1.seconds
val DEFAULT_MIN_STABLE_CONNECTION = 5.seconds
internal val RECONNECT_BASE_DELAY = 5.seconds
internal val RECONNECT_MAX_DELAY = 60.seconds
internal const val BACKOFF_MAX_EXPONENT = 4
}
}
/**
* Returns the reconnect backoff delay for a given consecutive failure count.
*
* Backoff schedule: 1 failure 5 s, 2 failures 10 s, 3 failures 20 s, 4 failures 40 s, 5+ failures 60 s
* (capped).
*/
internal fun computeReconnectBackoff(consecutiveFailures: Int): Duration {
if (consecutiveFailures <= 0) return BleReconnectPolicy.RECONNECT_BASE_DELAY
val multiplier = 1 shl (consecutiveFailures - 1).coerceAtMost(BleReconnectPolicy.BACKOFF_MAX_EXPONENT)
return minOf(BleReconnectPolicy.RECONNECT_BASE_DELAY * multiplier, BleReconnectPolicy.RECONNECT_MAX_DELAY)
}

View file

@ -1,28 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
/** This interface defines the contract that all radio backend implementations must adhere to. */
interface InterfaceSpec<T : RadioTransport> {
fun createInterface(rest: String, service: RadioInterfaceService): T
/** Return true if this address is still acceptable. For BLE that means, still bonded */
fun addressValid(rest: String): Boolean = true
}

View file

@ -1,26 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.repository.RadioInterfaceService
/** Factory for creating `MockInterface` instances. */
@Single
class MockInterfaceFactory {
fun create(rest: String, service: RadioInterfaceService): MockInterface = MockInterface(service, rest)
}

View file

@ -1,30 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.repository.RadioInterfaceService
/** Mock interface backend implementation. */
@Single
class MockInterfaceSpec(private val factory: MockInterfaceFactory) : InterfaceSpec<MockInterface> {
override fun createInterface(rest: String, service: RadioInterfaceService): MockInterface =
factory.create(rest, service)
/** Return true if this address is still acceptable. For BLE that means, still bonded */
override fun addressValid(rest: String): Boolean = true
}

View file

@ -17,6 +17,7 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import okio.ByteString.Companion.encodeUtf8
import okio.ByteString.Companion.toByteString
@ -25,8 +26,8 @@ import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.Channel
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.getInitials
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.core.repository.RadioTransportCallback
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Config
import org.meshtastic.proto.Data
@ -55,9 +56,13 @@ private val defaultLoRaConfig = Config.LoRaConfig(use_preset = true, region = Co
private val defaultChannel = ProtoChannel(settings = Channel.default.settings, role = ProtoChannel.Role.PRIMARY)
/** A simulated interface that is used for testing in the simulator */
/** A simulated transport that is used for testing in the simulator. */
@Suppress("detekt:TooManyFunctions", "detekt:MagicNumber")
class MockInterface(private val service: RadioInterfaceService, val address: String) : RadioTransport {
class MockRadioTransport(
private val callback: RadioTransportCallback,
private val scope: CoroutineScope,
val address: String,
) : RadioTransport {
companion object {
private const val MY_NODE = 0x42424242
@ -68,13 +73,22 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
// an infinite sequence of ints
private val packetIdSequence = generateSequence { currentPacketId++ }.iterator()
init {
Logger.i { "Starting the mock interface" }
service.onConnect() // Tell clients they can use the API
override fun start() {
Logger.i { "Starting the mock transport" }
callback.onConnect() // Tell clients they can use the API
}
override fun handleSendToRadio(p: ByteArray) {
val pr = ToRadio.ADAPTER.decode(p)
// Intercept want_config handshake — send config response only when requested,
// mirroring the behaviour of real firmware which waits for want_config_id.
val wantConfigId = pr.want_config_id ?: 0
if (wantConfigId != 0) {
sendConfigResponse(wantConfigId)
return
}
val packet = pr.packet
if (packet != null) {
sendQueueStatus(packet.id)
@ -83,11 +97,10 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
val data = packet?.decoded
when {
(pr.want_config_id ?: 0) != 0 -> sendConfigResponse(pr.want_config_id ?: 0)
data != null && data.portnum == PortNum.ADMIN_APP ->
handleAdminPacket(pr, AdminMessage.ADAPTER.decode(data.payload))
packet != null && packet.want_ack == true -> sendFakeAck(pr)
else -> Logger.i { "Ignoring data sent to mock interface $pr" }
else -> Logger.i { "Ignoring data sent to mock transport $pr" }
}
}
@ -127,12 +140,12 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
)
}
else -> Logger.i { "Ignoring admin sent to mock interface $d" }
else -> Logger.i { "Ignoring admin sent to mock transport $d" }
}
}
override fun close() {
Logger.i { "Closing the mock interface" }
Logger.i { "Closing the mock transport" }
}
// / Generate a fake text message from a node
@ -279,7 +292,7 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
Data(portnum = PortNum.ROUTING_APP, payload = Routing().encode().toByteString(), request_id = msgId),
)
private fun sendQueueStatus(msgId: Int) = service.handleFromRadio(
private fun sendQueueStatus(msgId: Int) = callback.handleFromRadio(
FromRadio(queueStatus = QueueStatus(res = 0, free = 16, mesh_packet_id = msgId)).encode(),
)
@ -291,14 +304,14 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
toIn,
Data(portnum = PortNum.ADMIN_APP, payload = adminMsg.encode().toByteString(), request_id = reqId),
)
service.handleFromRadio(p.encode())
callback.handleFromRadio(p.encode())
}
// / Send a fake ack packet back if the sender asked for want_ack
private fun sendFakeAck(pr: ToRadio) = service.serviceScope.handledLaunch {
private fun sendFakeAck(pr: ToRadio) = scope.handledLaunch {
val packet = pr.packet ?: return@handledLaunch
delay(2000)
service.handleFromRadio(makeAck(MY_NODE + 1, packet.from, packet.id).encode())
callback.handleFromRadio(makeAck(MY_NODE + 1, packet.from, packet.id).encode())
}
private fun sendConfigResponse(configId: Int) {
@ -353,6 +366,6 @@ class MockInterface(private val service: RadioInterfaceService, val address: Str
makeNodeStatus(MY_NODE + 1),
)
packets.forEach { p -> service.handleFromRadio(p.encode()) }
packets.forEach { p -> callback.handleFromRadio(p.encode()) }
}
}

View file

@ -1,25 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
/** Factory for creating `NopInterface` instances. */
@Single
class NopInterfaceFactory {
fun create(rest: String): NopInterface = NopInterface(rest)
}

View file

@ -1,26 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import org.koin.core.annotation.Single
import org.meshtastic.core.repository.RadioInterfaceService
/** No-op interface backend implementation. */
@Single
class NopInterfaceSpec(private val factory: NopInterfaceFactory) : InterfaceSpec<NopInterface> {
override fun createInterface(rest: String, service: RadioInterfaceService): NopInterface = factory.create(rest)
}

View file

@ -18,7 +18,14 @@ package org.meshtastic.core.network.radio
import org.meshtastic.core.repository.RadioTransport
class NopInterface(val address: String) : RadioTransport {
/**
* An intentionally inert [RadioTransport] that silently discards all operations.
*
* Used as a safe default when no valid device address is configured or when the requested transport type is
* unsupported. All method calls are no-ops it never connects, never sends data, and never signals lifecycle events to
* the service layer.
*/
class NopRadioTransport(val address: String) : RadioTransport {
override fun handleSendToRadio(p: ByteArray) {
// No-op
}

View file

@ -17,10 +17,11 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.network.transport.StreamFrameCodec
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.core.repository.RadioTransportCallback
/**
* An interface that assumes we are talking to a meshtastic device over some sort of stream connection (serial or TCP
@ -28,9 +29,11 @@ import org.meshtastic.core.repository.RadioTransport
*
* Delegates framing logic to [StreamFrameCodec] from `core:network`.
*/
abstract class StreamInterface(protected val service: RadioInterfaceService) : RadioTransport {
abstract class StreamTransport(protected val callback: RadioTransportCallback, protected val scope: CoroutineScope) :
RadioTransport {
private val codec = StreamFrameCodec(onPacketReceived = { service.handleFromRadio(it) }, logTag = "StreamInterface")
private val codec =
StreamFrameCodec(onPacketReceived = { callback.handleFromRadio(it) }, logTag = "StreamTransport")
override fun close() {
Logger.d { "Closing stream for good" }
@ -38,33 +41,34 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : R
}
/**
* Tell MeshService our device has gone away, but wait for it to come back
* Notify the transport callback that our device has gone away, but wait for it to come back.
*
* @param waitForStopped if true we should wait for the manager to finish - must be false if called from inside the
* manager callbacks
* @param waitForStopped if true we should wait for the transport to finish - must be false if called from inside
* transport callbacks
* @param isPermanent true if the device is definitely gone (e.g. USB unplugged), false if it may come back (e.g.
* TCP transient disconnect). Defaults to true for serial subclasses like [TCPInterface] override with false.
* TCP transient disconnect). Defaults to true for serial subclasses may override with false.
*/
protected open fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean = true) {
service.onDisconnect(isPermanent = isPermanent)
callback.onDisconnect(isPermanent = isPermanent)
}
protected open fun connect() {
// Before telling mesh service, send a few START1s to wake a sleeping device
// Before connecting, send a few START1s to wake a sleeping device
sendBytes(StreamFrameCodec.WAKE_BYTES)
// Now tell clients they can (finally use the api)
service.onConnect()
callback.onConnect()
}
/** Writes raw bytes to the underlying stream (serial port, TCP socket, etc.). */
abstract fun sendBytes(p: ByteArray)
// If subclasses need to flush at the end of a packet they can implement
/** Flushes buffered bytes to the underlying stream. No-op by default. */
open fun flushBytes() {}
override fun handleSendToRadio(p: ByteArray) {
// This method is called from a continuation and it might show up late, so check for uart being null
service.serviceScope.handledLaunch { codec.frameAndSend(p, ::sendBytes, ::flushBytes) }
scope.handledLaunch { codec.frameAndSend(p, ::sendBytes, ::flushBytes) }
}
/** Process a single incoming byte through the stream framing state machine. */

View file

@ -18,12 +18,15 @@ package org.meshtastic.core.network.repository
import co.touchlab.kermit.Logger
import io.github.davidepianca98.MQTTClient
import io.github.davidepianca98.mqtt.MQTTException
import io.github.davidepianca98.mqtt.MQTTVersion
import io.github.davidepianca98.mqtt.Subscription
import io.github.davidepianca98.mqtt.packets.Qos
import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode
import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
import io.github.davidepianca98.socket.IOException
import io.github.davidepianca98.socket.tls.TLSClientSettings
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
@ -36,9 +39,12 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.SerializationException
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonDecodingException
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.MqttJsonPayload
import org.meshtastic.core.model.util.subscribeList
import org.meshtastic.core.repository.NodeRepository
@ -50,7 +56,7 @@ import kotlin.concurrent.Volatile
class MQTTRepositoryImpl(
private val radioConfigRepository: RadioConfigRepository,
private val nodeRepository: NodeRepository,
dispatchers: org.meshtastic.core.di.CoroutineDispatchers,
dispatchers: CoroutineDispatchers,
) : MQTTRepository {
companion object {
@ -78,14 +84,15 @@ class MQTTRepositoryImpl(
@Suppress("TooGenericExceptionCaught")
override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
val c = client
client = null // Null first to prevent re-entrant disconnect
try {
client?.disconnect(ReasonCode.SUCCESS)
c?.disconnect(ReasonCode.SUCCESS)
} catch (e: Exception) {
Logger.w(e) { "MQTT clean disconnect failed" }
}
clientJob?.cancel()
clientJob = null
client = null
}
@OptIn(ExperimentalUnsignedTypes::class)
@ -123,10 +130,10 @@ class MQTTRepositoryImpl(
Logger.d { "MQTT parsed JSON payload successfully" }
trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain))
} catch (e: kotlinx.serialization.json.JsonDecodingException) {
} catch (e: JsonDecodingException) {
@OptIn(ExperimentalSerializationApi::class)
Logger.e(e) { "Failed to parse MQTT JSON: ${e.shortMessage} (path: ${e.path})" }
} catch (e: kotlinx.serialization.SerializationException) {
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
@ -180,11 +187,11 @@ class MQTTRepositoryImpl(
// Reset backoff so the next reconnect starts with the minimum delay.
reconnectDelay = INITIAL_RECONNECT_DELAY_MS
Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" }
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
} catch (e: MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" }
} catch (e: io.github.davidepianca98.socket.IOException) {
} catch (e: IOException) {
Logger.e(e) { "MQTT Client loop error (IO), reconnecting in ${reconnectDelay}ms" }
} catch (e: kotlinx.coroutines.CancellationException) {
} catch (e: CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
}

View file

@ -0,0 +1,57 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.transport
import co.touchlab.kermit.Logger
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import kotlin.concurrent.atomics.AtomicInt
import kotlin.concurrent.atomics.ExperimentalAtomicApi
/**
* Shared heartbeat sender for Meshtastic radio transports.
*
* Constructs and sends a `ToRadio(heartbeat = Heartbeat(nonce = ...))` message to keep the firmware's idle timer from
* expiring. Each call uses a monotonically increasing nonce to prevent the firmware's per-connection duplicate-write
* filter from silently dropping it.
*
* @param sendToRadio callback to transmit the encoded heartbeat bytes to the radio
* @param afterHeartbeat optional suspend callback invoked after sending (e.g. to schedule a drain)
* @param logTag tag for log messages
*/
class HeartbeatSender(
private val sendToRadio: (ByteArray) -> Unit,
private val afterHeartbeat: (suspend () -> Unit)? = null,
private val logTag: String = "HeartbeatSender",
) {
@OptIn(ExperimentalAtomicApi::class)
private val nonce = AtomicInt(0)
/**
* Sends a heartbeat to the radio.
*
* The firmware responds to heartbeats by queuing a `queueStatus` FromRadio packet, proving the link is alive and
* keeping the local node's lastHeard timestamp current.
*/
@OptIn(ExperimentalAtomicApi::class)
suspend fun sendHeartbeat() {
val n = nonce.fetchAndAdd(1)
Logger.v { "[$logTag] Sending ToRadio heartbeat (nonce=$n)" }
sendToRadio(ToRadio(heartbeat = Heartbeat(nonce = n)).encode())
afterHeartbeat?.invoke()
}
}

View file

@ -36,10 +36,9 @@ import org.meshtastic.core.testing.FakeBluetoothRepository
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
class BleRadioInterfaceTest {
class BleRadioTransportTest {
private val testScope = TestScope()
private val scanner = FakeBleScanner()
@ -56,66 +55,69 @@ class BleRadioInterfaceTest {
}
@Test
fun `connect attempts to scan and connect via init`() = runTest {
fun `connect attempts to scan and connect via start`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Device")
scanner.emitDevice(device)
val bleInterface =
BleRadioInterface(
serviceScope = testScope,
val bleTransport =
BleRadioTransport(
scope = testScope,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
callback = service,
address = address,
)
bleTransport.start()
// init starts connect() which is async
// start() begins connect() which is async
// In a real test we'd verify the connection state,
// but for now this confirms it works with the fakes.
assertEquals(address, bleInterface.address)
assertEquals(address, bleTransport.address)
}
@Test
fun `address returns correct value`() {
val bleInterface =
BleRadioInterface(
serviceScope = testScope,
val bleTransport =
BleRadioTransport(
scope = testScope,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
callback = service,
address = address,
)
assertEquals(address, bleInterface.address)
assertEquals(address, bleTransport.address)
}
/**
* After [RECONNECT_FAILURE_THRESHOLD] consecutive connection failures, [RadioInterfaceService.onDisconnect] must be
* called so the higher layers can react (e.g. start the device-sleep timeout in [MeshConnectionManagerImpl]).
* After [BleReconnectPolicy.DEFAULT_FAILURE_THRESHOLD] consecutive connection failures,
* [RadioInterfaceService.onDisconnect] must be called so the higher layers can react (e.g. start the device-sleep
* timeout in [MeshConnectionManagerImpl]).
*
* Virtual-time breakdown (RECONNECT_FAILURE_THRESHOLD = 3): t = 1 000 ms iteration 1 settle delay elapses,
* Virtual-time breakdown (DEFAULT_FAILURE_THRESHOLD = 3): t = 1 000 ms iteration 1 settle delay elapses,
* connectAndAwait throws, backoff 5 s starts t = 6 000 ms backoff ends t = 7 000 ms iteration 2 settle delay
* elapses, connectAndAwait throws, backoff 10 s starts t = 17 000 ms backoff ends t = 18 000 ms iteration 3
* settle delay elapses, connectAndAwait throws onDisconnect called
*/
@Test
fun `onDisconnect is called after RECONNECT_FAILURE_THRESHOLD consecutive failures`() = runTest {
fun `onDisconnect is called after DEFAULT_FAILURE_THRESHOLD consecutive failures`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Device")
bluetoothRepository.bond(device) // skip BLE scan — device is already bonded
// Make every connectAndAwait call throw so each iteration counts as one failure.
connection.connectException = RadioNotConnectedException("simulated failure")
val bleInterface =
BleRadioInterface(
serviceScope = this,
val bleTransport =
BleRadioTransport(
scope = this,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
callback = service,
address = address,
)
bleTransport.start()
// Advance through exactly 3 failure iterations (≈18 001 ms virtual time).
// The 4th iteration's backoff hasn't elapsed yet, so the coroutine is suspended
@ -125,12 +127,12 @@ class BleRadioInterfaceTest {
verify { service.onDisconnect(any(), any()) }
// Cancel the reconnect loop so runTest can complete.
bleInterface.close()
bleTransport.close()
}
/**
* After [RECONNECT_MAX_FAILURES] (10) consecutive failures, the reconnect loop should stop and signal a permanent
* disconnect. This prevents infinite battery drain when the device is genuinely offline.
* After [BleReconnectPolicy.DEFAULT_MAX_FAILURES] (10) consecutive failures, the reconnect loop should stop and
* signal a permanent disconnect. This prevents infinite battery drain when the device is genuinely offline.
*
* Time budget for 10 failures with bonded device (no scan): Each iteration = 1s settle + connectAndAwait throw +
* backoff Backoffs: 5s, 10s, 20s, 40s, 60s, 60s, 60s, 60s, 60s, (exit at failure 10 before backoff) Total 10×1s
@ -138,22 +140,23 @@ class BleRadioInterfaceTest {
* variance.
*/
@Test
fun `reconnect loop stops after RECONNECT_MAX_FAILURES with permanent disconnect`() = runTest {
fun `reconnect loop stops after DEFAULT_MAX_FAILURES with permanent disconnect`() = runTest {
val device = FakeBleDevice(address = address, name = "Test Device")
bluetoothRepository.bond(device)
connection.connectException = RadioNotConnectedException("simulated failure")
every { service.onDisconnect(any(), any()) } returns Unit
val bleInterface =
BleRadioInterface(
serviceScope = this,
val bleTransport =
BleRadioTransport(
scope = this,
scanner = scanner,
bluetoothRepository = bluetoothRepository,
connectionFactory = connectionFactory,
service = service,
callback = service,
address = address,
)
bleTransport.start()
// Advance enough time for all 10 failures to occur.
advanceTimeBy(400_001L)
@ -161,18 +164,6 @@ class BleRadioInterfaceTest {
// Should have been called with isPermanent=true at least once (the final call).
verify { service.onDisconnect(isPermanent = true, errorMessage = any()) }
bleInterface.close()
}
@Test
fun `computeReconnectBackoff returns correct backoff values`() {
assertEquals(5.seconds, computeReconnectBackoff(0))
assertEquals(5.seconds, computeReconnectBackoff(1))
assertEquals(10.seconds, computeReconnectBackoff(2))
assertEquals(20.seconds, computeReconnectBackoff(3))
assertEquals(40.seconds, computeReconnectBackoff(4))
assertEquals(60.seconds, computeReconnectBackoff(5))
assertEquals(60.seconds, computeReconnectBackoff(10))
assertEquals(60.seconds, computeReconnectBackoff(100))
bleTransport.close()
}
}

View file

@ -0,0 +1,277 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.radio
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
class BleReconnectPolicyTest {
@Test
fun `stable disconnect resets failures and returns Continue`() {
val policy = BleReconnectPolicy()
// Simulate one prior failure
policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
assertEquals(1, policy.consecutiveFailures)
// Now a stable disconnect should reset
val action =
policy.processOutcome(BleReconnectPolicy.Outcome.Disconnected(wasStable = true, wasIntentional = false))
assertEquals(BleReconnectPolicy.Action.Continue, action)
assertEquals(0, policy.consecutiveFailures)
}
@Test
fun `intentional disconnect resets failures and returns Continue`() {
val policy = BleReconnectPolicy()
policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
val action =
policy.processOutcome(BleReconnectPolicy.Outcome.Disconnected(wasStable = false, wasIntentional = true))
assertEquals(BleReconnectPolicy.Action.Continue, action)
assertEquals(0, policy.consecutiveFailures)
}
@Test
fun `unstable disconnect increments failures`() {
val policy = BleReconnectPolicy()
val action =
policy.processOutcome(BleReconnectPolicy.Outcome.Disconnected(wasStable = false, wasIntentional = false))
assertEquals(1, policy.consecutiveFailures)
assertTrue(action is BleReconnectPolicy.Action.Retry)
}
@Test
fun `failure at threshold signals transient disconnect`() {
val policy = BleReconnectPolicy(failureThreshold = 3)
// Accumulate failures up to threshold
repeat(2) { policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test"))) }
val action = policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
assertEquals(3, policy.consecutiveFailures)
assertTrue(action is BleReconnectPolicy.Action.SignalTransient)
}
@Test
fun `failure at max gives up permanently`() {
val policy = BleReconnectPolicy(maxFailures = 3)
repeat(2) { policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test"))) }
val action = policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
assertEquals(BleReconnectPolicy.Action.GiveUp, action)
}
@Test
fun `backoff increases with consecutive failures`() {
val policy = BleReconnectPolicy()
val backoffs =
(1..5).map { i ->
val action = policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
when (action) {
is BleReconnectPolicy.Action.Retry -> action.backoff
is BleReconnectPolicy.Action.SignalTransient -> action.backoff
else -> error("Unexpected action: $action")
}
}
// Verify backoffs are non-decreasing
for (i in 0 until backoffs.size - 1) {
assertTrue(backoffs[i] <= backoffs[i + 1], "Expected ${backoffs[i]} <= ${backoffs[i + 1]}")
}
}
@Test
fun `custom backoff strategy is used`() {
val customBackoff = 42.seconds
val policy = BleReconnectPolicy(backoffStrategy = { customBackoff })
val action = policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
assertTrue(action is BleReconnectPolicy.Action.Retry)
assertEquals(customBackoff, action.backoff)
}
@Test
fun `maxFailures equal to failureThreshold gives up without signalling transient`() {
val policy = BleReconnectPolicy(maxFailures = 3, failureThreshold = 3)
repeat(2) { policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test"))) }
val action = policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
// GiveUp takes priority over SignalTransient when both thresholds are the same
assertEquals(BleReconnectPolicy.Action.GiveUp, action)
}
@Test
fun `failure count resets after stable disconnect then re-increments`() {
val policy = BleReconnectPolicy()
// Accumulate two failures
repeat(2) { policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test"))) }
assertEquals(2, policy.consecutiveFailures)
// Stable disconnect resets
policy.processOutcome(BleReconnectPolicy.Outcome.Disconnected(wasStable = true, wasIntentional = false))
assertEquals(0, policy.consecutiveFailures)
// New failure starts from 1
policy.processOutcome(BleReconnectPolicy.Outcome.Failed(RuntimeException("test")))
assertEquals(1, policy.consecutiveFailures)
}
// region execute() loop tests
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `execute gives up after maxFailures and calls onPermanentDisconnect`() = runTest {
val policy =
BleReconnectPolicy(maxFailures = 3, settleDelay = 1.milliseconds, backoffStrategy = { 1.milliseconds })
var permanentError: Throwable? = null
var permanentCalled = false
var transientCalled = false
policy.execute(
attempt = { BleReconnectPolicy.Outcome.Failed(RuntimeException("connection failed")) },
onTransientDisconnect = { transientCalled = true },
onPermanentDisconnect = { error ->
permanentCalled = true
permanentError = error
},
)
assertTrue(permanentCalled, "onPermanentDisconnect should have been called")
assertNotNull(permanentError, "error should be passed to onPermanentDisconnect")
assertEquals("connection failed", permanentError?.message)
assertEquals(3, policy.consecutiveFailures)
// failureThreshold defaults to 3, same as maxFailures here, so GiveUp takes priority
assertTrue(!transientCalled, "onTransientDisconnect should not be called when GiveUp fires first")
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `execute calls onTransientDisconnect at threshold then continues retrying`() = runTest {
var attemptCount = 0
val policy =
BleReconnectPolicy(
maxFailures = 5,
failureThreshold = 2,
settleDelay = 1.milliseconds,
backoffStrategy = { 1.milliseconds },
)
var transientCount = 0
policy.execute(
attempt = {
attemptCount++
BleReconnectPolicy.Outcome.Failed(RuntimeException("fail #$attemptCount"))
},
onTransientDisconnect = { transientCount++ },
onPermanentDisconnect = {},
)
assertEquals(5, attemptCount, "should attempt exactly maxFailures times")
// Transient is signalled for failures 2, 3, 4 (at or above threshold, below maxFailures)
assertEquals(3, transientCount, "should signal transient for each failure at or above threshold")
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `execute continues immediately after stable disconnect`() = runTest {
var attemptCount = 0
val policy =
BleReconnectPolicy(maxFailures = 5, settleDelay = 1.milliseconds, backoffStrategy = { 1.milliseconds })
policy.execute(
attempt = {
attemptCount++
if (attemptCount <= 2) {
// First two attempts connect briefly and disconnect stably
BleReconnectPolicy.Outcome.Disconnected(wasStable = true, wasIntentional = false)
} else {
// Then fail until maxFailures
BleReconnectPolicy.Outcome.Failed(RuntimeException("fail"))
}
},
onTransientDisconnect = {},
onPermanentDisconnect = {},
)
// 2 stable disconnects + 5 failures (counter resets after each stable, so needs 5 more to hit max)
assertEquals(7, attemptCount)
assertEquals(5, policy.consecutiveFailures)
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `execute passes null error for unstable disconnect at threshold`() = runTest {
val policy =
BleReconnectPolicy(
maxFailures = 5,
failureThreshold = 2,
settleDelay = 1.milliseconds,
backoffStrategy = { 1.milliseconds },
)
val transientErrors = mutableListOf<Throwable?>()
var attemptCount = 0
policy.execute(
attempt = {
attemptCount++
// Use unstable disconnects (not Failed) so lastError is null
BleReconnectPolicy.Outcome.Disconnected(wasStable = false, wasIntentional = false)
},
onTransientDisconnect = { error -> transientErrors.add(error) },
onPermanentDisconnect = {},
)
// Disconnected outcomes don't have errors, so all transient callbacks get null
assertTrue(transientErrors.all { it == null }, "Disconnected outcomes should pass null error")
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `execute stops when coroutine is cancelled`() = runTest {
var attemptCount = 0
val policy =
BleReconnectPolicy(maxFailures = 100, settleDelay = 1.milliseconds, backoffStrategy = { 1.milliseconds })
val job =
backgroundScope.launch {
policy.execute(
attempt = {
attemptCount++
// Always succeed stably — loop should run until cancelled
BleReconnectPolicy.Outcome.Disconnected(wasStable = true, wasIntentional = false)
},
onTransientDisconnect = {},
onPermanentDisconnect = {},
)
}
// Let a few iterations run, then cancel
advanceTimeBy(50)
job.cancel()
advanceUntilIdle()
// Should have made some attempts but not reached maxFailures
assertTrue(attemptCount > 0, "should have attempted at least once")
assertTrue(attemptCount < 100, "should not have exhausted all failures — was cancelled")
}
// endregion
}

View file

@ -22,7 +22,7 @@ import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds
/**
* Tests the exponential backoff schedule used by [BleRadioInterface] when consecutive connection attempts fail. The
* Tests the exponential backoff schedule used by [BleRadioTransport] when consecutive connection attempts fail. The
* schedule is: failure #1 5 s failure #2 10 s failure #3 20 s failure #4 40 s failure #5+ 60 s (capped)
*/
class ReconnectBackoffTest {

View file

@ -17,8 +17,6 @@
package org.meshtastic.core.network.radio
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.mock
import dev.mokkery.verify
import io.kotest.property.Arb
@ -29,17 +27,16 @@ import io.kotest.property.checkAll
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.network.transport.StreamFrameCodec
import org.meshtastic.core.repository.RadioInterfaceService
import kotlin.test.BeforeTest
import org.meshtastic.core.repository.RadioTransportCallback
import kotlin.test.Test
import kotlin.test.assertTrue
class StreamInterfaceTest {
class StreamTransportTest {
private val radioService: RadioInterfaceService = mock(MockMode.autofill)
private lateinit var fakeStream: FakeStreamInterface
private val callback: RadioTransportCallback = mock(MockMode.autofill)
private lateinit var fakeStream: FakeStreamTransport
class FakeStreamInterface(service: RadioInterfaceService) : StreamInterface(service) {
class FakeStreamTransport(callback: RadioTransportCallback, scope: TestScope) : StreamTransport(callback, scope) {
val sentBytes = mutableListOf<ByteArray>()
override fun sendBytes(p: ByteArray) {
@ -59,21 +56,18 @@ class StreamInterfaceTest {
public override fun connect() = super.connect()
}
@BeforeTest
fun setUp() {
every { radioService.serviceScope } returns TestScope()
}
private val testScope = TestScope()
@Test
fun `handleSendToRadio property test`() = runTest {
fakeStream = FakeStreamInterface(radioService)
fakeStream = FakeStreamTransport(callback, testScope)
checkAll(Arb.byteArray(Arb.int(0, 512), Arb.byte())) { payload -> fakeStream.handleSendToRadio(payload) }
}
@Test
fun `readChar property test`() = runTest {
fakeStream = FakeStreamInterface(radioService)
fakeStream = FakeStreamTransport(callback, testScope)
checkAll(Arb.byteArray(Arb.int(0, 100), Arb.byte())) { data ->
data.forEach { fakeStream.feed(it) }
@ -83,11 +77,11 @@ class StreamInterfaceTest {
@Test
fun `connect sends wake bytes`() {
fakeStream = FakeStreamInterface(radioService)
fakeStream = FakeStreamTransport(callback, testScope)
fakeStream.connect()
assertTrue(fakeStream.sentBytes.isNotEmpty())
assertTrue(fakeStream.sentBytes[0].contentEquals(StreamFrameCodec.WAKE_BYTES))
verify { radioService.onConnect() }
verify { callback.onConnect() }
}
}

View file

@ -17,76 +17,76 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.network.transport.StreamFrameCodec
import org.meshtastic.core.network.transport.TcpTransport
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
import org.meshtastic.core.repository.RadioTransportCallback
import kotlin.concurrent.Volatile
/**
* Android TCP radio interface thin adapter over the shared [TcpTransport] from `core:network`.
* TCP radio transport thin adapter over the shared [TcpTransport] from `core:network`.
*
* Manages the mapping between the Android-specific [StreamInterface]/[RadioTransport] contract and the shared transport
* layer.
* Implements [RadioTransport] directly via composition over [TcpTransport], delegating send/receive to the transport
* and calling [RadioTransportCallback] for lifecycle events. This avoids the previous inheritance from
* [StreamTransport] which created a dead [StreamFrameCodec] and required overriding `sendBytes` as a no-op.
*/
open class TCPInterface(
service: RadioInterfaceService,
open class TcpRadioTransport(
private val callback: RadioTransportCallback,
private val scope: CoroutineScope,
private val dispatchers: CoroutineDispatchers,
private val address: String,
) : StreamInterface(service) {
) : RadioTransport {
companion object {
const val SERVICE_PORT = StreamFrameCodec.DEFAULT_TCP_PORT
}
/** Guards against a double [RadioTransportCallback.onDisconnect] when [close] triggers [TcpTransport.stop]. */
@Volatile private var closing = false
private val transport =
TcpTransport(
dispatchers = dispatchers,
scope = service.serviceScope,
scope = scope,
listener =
object : TcpTransport.Listener {
override fun onConnected() {
super@TCPInterface.connect()
callback.onConnect()
}
override fun onDisconnected() {
// Transport already performed teardown; only propagate lifecycle to StreamInterface.
if (closing) return // close() will fire the permanent disconnect itself
// TCP disconnects are transient (not permanent) — the transport will auto-reconnect.
super@TCPInterface.onDeviceDisconnect(false, isPermanent = false)
callback.onDisconnect(isPermanent = false)
}
override fun onPacketReceived(bytes: ByteArray) {
service.handleFromRadio(bytes)
callback.handleFromRadio(bytes)
}
},
logTag = "TCPInterface[$address]",
logTag = "TcpRadioTransport[$address]",
)
init {
connect()
}
override fun sendBytes(p: ByteArray) {
// Direct byte sending is handled by the transport; this is used by StreamInterface for serial compat
Logger.d { "[$address] TCPInterface.sendBytes delegated to transport" }
}
override fun onDeviceDisconnect(waitForStopped: Boolean, isPermanent: Boolean) {
transport.stop()
super.onDeviceDisconnect(waitForStopped, isPermanent = false)
}
override fun connect() {
override fun start() {
transport.start(address)
}
override fun close() {
Logger.d { "[$address] Closing TCP transport" }
closing = true
transport.stop()
callback.onDisconnect(isPermanent = true)
}
override fun keepAlive() {
Logger.d { "[$address] TCP keepAlive" }
service.serviceScope.handledLaunch { transport.sendHeartbeat() }
scope.handledLaunch { transport.sendHeartbeat() }
}
override fun handleSendToRadio(p: ByteArray) {
service.serviceScope.handledLaunch { transport.sendPacket(p) }
scope.handledLaunch { transport.sendPacket(p) }
}
}

View file

@ -24,7 +24,6 @@ import kotlinx.coroutines.withContext
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
@ -34,13 +33,14 @@ import java.net.InetAddress
import java.net.Socket
import java.net.SocketTimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
/**
* Shared JVM TCP transport for Meshtastic radios.
*
* Manages the TCP socket lifecycle (connect, read loop, reconnect with backoff) and uses [StreamFrameCodec] for the
* START1/START2 stream framing protocol. Heartbeat scheduling is owned by [SharedRadioInterfaceService]; this class
* only exposes [sendHeartbeat] for external callers.
* START1/START2 stream framing protocol. [sendHeartbeat] sends a heartbeat with a monotonically-increasing nonce so the
* firmware's per-connection duplicate-write filter does not silently drop it.
*
* Used by Android and Desktop via the shared `SharedRadioInterfaceService`.
*/
@ -109,6 +109,8 @@ class TcpTransport(
@Volatile private var timeoutEvents: Int = 0
private val heartbeatNonce = AtomicInteger(0)
/** Whether the transport is currently connected. */
val isConnected: Boolean
get() {
@ -146,9 +148,10 @@ class TcpTransport(
bytesSent += payload.size
}
/** Send a heartbeat packet to keep the connection alive. */
/** Send a heartbeat packet with a monotonically-increasing nonce to keep the connection alive. */
suspend fun sendHeartbeat() {
val heartbeat = ToRadio(heartbeat = Heartbeat())
val nonce = heartbeatNonce.getAndIncrement()
val heartbeat = ToRadio(heartbeat = org.meshtastic.proto.Heartbeat(nonce = nonce))
sendPacket(heartbeat.encode())
}

View file

@ -19,18 +19,19 @@ package org.meshtastic.core.network
import co.touchlab.kermit.Logger
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortTimeoutException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.network.radio.StreamInterface
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import org.meshtastic.core.network.radio.StreamTransport
import org.meshtastic.core.network.transport.HeartbeatSender
import org.meshtastic.core.repository.RadioTransportCallback
import java.io.File
/**
* JVM-specific implementation of [RadioTransport] using jSerialComm. Uses [StreamInterface] for START1/START2 packet
* JVM-specific implementation of [RadioTransport] using jSerialComm. Uses [StreamTransport] for START1/START2 packet
* framing.
*
* Use the [open] factory method instead of the constructor directly to ensure the serial port is opened and the read
@ -40,12 +41,15 @@ class SerialTransport
private constructor(
private val portName: String,
private val baudRate: Int = DEFAULT_BAUD_RATE,
service: RadioInterfaceService,
callback: RadioTransportCallback,
scope: CoroutineScope,
private val dispatchers: CoroutineDispatchers,
) : StreamInterface(service) {
) : StreamTransport(callback, scope) {
private var serialPort: SerialPort? = null
private var readJob: Job? = null
private val heartbeatSender = HeartbeatSender(sendToRadio = ::handleSendToRadio, logTag = "Serial[$portName]")
/** Attempts to open the serial port and starts the read loop. Returns true if successful, false otherwise. */
private fun startConnection(): Boolean {
return try {
@ -57,7 +61,7 @@ private constructor(
port.setDTR()
port.setRTS()
Logger.i { "[$portName] Serial port opened (baud=$baudRate)" }
super.connect() // Sends WAKE_BYTES and signals service.onConnect()
super.connect() // Sends WAKE_BYTES and signals callback.onConnect()
startReadLoop(port)
true
} else {
@ -74,7 +78,7 @@ private constructor(
private fun startReadLoop(port: SerialPort) {
Logger.d { "[$portName] Starting serial read loop" }
readJob =
service.serviceScope.launch(dispatchers.io) {
scope.launch(dispatchers.io) {
val input = port.inputStream
val buffer = ByteArray(READ_BUFFER_SIZE)
try {
@ -91,7 +95,7 @@ private constructor(
}
} catch (_: SerialPortTimeoutException) {
// Expected timeout when no data is available
} catch (e: kotlinx.coroutines.CancellationException) {
} catch (e: CancellationException) {
throw e
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
if (isActive) {
@ -102,7 +106,7 @@ private constructor(
reading = false
}
}
} catch (e: kotlinx.coroutines.CancellationException) {
} catch (e: CancellationException) {
throw e
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
if (isActive) {
@ -140,11 +144,9 @@ private constructor(
}
override fun keepAlive() {
// Send a ToRadio heartbeat so the firmware resets its idle timer and responds with
// a FromRadio queueStatus — proving the serial link is alive. Without this, the
// serial transport has no way to detect a silently dead device.
Logger.d { "[$portName] Serial keepAlive — sending heartbeat" }
handleSendToRadio(ToRadio(heartbeat = Heartbeat()).encode())
// Delegate to HeartbeatSender which sends a ToRadio heartbeat to prove the
// serial link is alive.
scope.launch { heartbeatSender.sendHeartbeat() }
}
private fun closePortResources() {
@ -168,19 +170,20 @@ private constructor(
/**
* Creates and opens a [SerialTransport]. If the port cannot be opened, the transport signals a permanent
* disconnect to the [service] and returns the (non-connected) instance.
* disconnect to the [callback] and returns the (non-connected) instance.
*/
fun open(
portName: String,
baudRate: Int = DEFAULT_BAUD_RATE,
service: RadioInterfaceService,
callback: RadioTransportCallback,
scope: CoroutineScope,
dispatchers: CoroutineDispatchers,
): SerialTransport {
val transport = SerialTransport(portName, baudRate, service, dispatchers)
val transport = SerialTransport(portName, baudRate, callback, scope, dispatchers)
if (!transport.startConnection()) {
val errorMessage = diagnoseOpenFailure(portName)
Logger.w { "[$portName] Serial port could not be opened; signalling disconnect. $errorMessage" }
service.onDisconnect(isPermanent = true, errorMessage = errorMessage)
callback.onDisconnect(isPermanent = true, errorMessage = errorMessage)
}
return transport
}