chore: review-cleanup fleet (audit + fix + hardening) (#5158)

This commit is contained in:
James Rich 2026-04-16 19:02:59 -05:00 committed by GitHub
parent 872c566ef1
commit 17e69c6d4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
68 changed files with 784 additions and 459 deletions

View file

@ -95,3 +95,18 @@ inline fun <T, R> T.safeCatching(block: T.() -> R): Result<R> = try {
} catch (e: Exception) {
Result.failure(e)
}
/**
 * Runs [block] and wraps its outcome in a [Result], treating every [Throwable] as a failure.
 *
 * Unlike [safeCatching], this variant also captures JVM [Error]s (e.g. an [ExceptionInInitializerError] raised by
 * compose-resources' lazy skiko initialization on the desktop JVM test classpath). The single exception is
 * [CancellationException], which is always re-thrown so structured concurrency keeps working.
 *
 * Intended for best-effort call sites where a static-initializer failure should degrade to a fallback value rather
 * than crash the caller.
 *
 * @param block the computation to attempt.
 * @return [Result.success] with the value of [block], or [Result.failure] holding whatever was thrown.
 */
@Suppress("TooGenericExceptionCaught")
inline fun <T> safeCatchingAll(block: () -> T): Result<T> {
    return try {
        Result.success(block())
    } catch (cancellation: CancellationException) {
        // Cancellation is control flow, not a failure: propagate it untouched.
        throw cancellation
    } catch (failure: Throwable) {
        Result.failure(failure)
    }
}

View file

@ -45,6 +45,7 @@ import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.PlatformAnalytics
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Channel
import org.meshtastic.proto.Config
@ -63,6 +64,7 @@ class MeshActionHandlerImpl(
private val dataHandler: Lazy<MeshDataHandler>,
private val analytics: PlatformAnalytics,
private val meshPrefs: MeshPrefs,
private val uiPrefs: UiPrefs,
private val databaseManager: DatabaseManager,
private val notificationManager: NotificationManager,
private val messageProcessor: Lazy<MeshMessageProcessor>,
@ -207,7 +209,7 @@ class MeshActionHandlerImpl(
override fun handleRequestPosition(destNum: Int, position: Position, myNodeNum: Int) {
if (destNum != myNodeNum) {
val provideLocation = meshPrefs.shouldProvideNodeLocation(myNodeNum).value
val provideLocation = uiPrefs.shouldProvideNodeLocation(myNodeNum).value
val currentPosition =
when {
provideLocation && position.isValid() -> position

View file

@ -22,7 +22,9 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -73,6 +75,11 @@ class PacketHandlerImpl(
private val queueMutex = Mutex()
private val queuedPackets = mutableListOf<MeshPacket>()
// Unbounded channel preserves FIFO ordering of fire-and-forget sendToRadio(MeshPacket)
// calls. The non-suspend entry point does trySend (always succeeds for UNLIMITED) and
// a single consumer coroutine enqueues packets under queueMutex in arrival order.
private val outboundChannel = Channel<MeshPacket>(Channel.UNLIMITED)
// Set to true by stopPacketQueue() under queueMutex. Checked by startPacketQueueLocked()
// and the queue processor's finally block to prevent restarting a stopped queue.
private var queueStopped = false
@ -80,6 +87,20 @@ class PacketHandlerImpl(
private val responseMutex = Mutex()
private val queueResponse = mutableMapOf<Int, CompletableDeferred<Boolean>>()
init {
// Start the one consumer of outboundChannel for the lifetime of this handler.
// Single consumer serializes enqueues from the non-suspend sendToRadio(MeshPacket)
// entry point, preserving FIFO across rapid concurrent callers.
scope.launch {
// NOTE(review): per the kotlinx.coroutines docs, a flow from consumeAsFlow() can be collected only
// once and cancels the channel if this collector is cancelled. If `scope` is cancelled, later
// trySend calls on outboundChannel would fail — confirm that is acceptable.
outboundChannel.consumeAsFlow().collect { packet ->
// Queue state (queueStopped, queuedPackets) is only mutated while holding queueMutex.
queueMutex.withLock {
queueStopped = false // Allow queue to resume after a disconnect/reconnect cycle.
queuedPackets.add(packet)
// NOTE(review): presumably starts the queue processor if it is not already running —
// startPacketQueueLocked() is not visible here; confirm.
startPacketQueueLocked()
}
}
}
}
override fun sendToRadio(p: ToRadio) {
Logger.d { "Sending to radio ${p.toPIIString()}" }
val b = p.encode()
@ -104,13 +125,9 @@ class PacketHandlerImpl(
}
override fun sendToRadio(packet: MeshPacket) {
scope.launch {
queueMutex.withLock {
queueStopped = false // Allow queue to resume after a disconnect/reconnect cycle.
queuedPackets.add(packet)
startPacketQueueLocked()
}
}
// Non-suspend entry point — order-preserving via unbounded channel drained by
// a single consumer coroutine. trySend on UNLIMITED never fails for capacity.
outboundChannel.trySend(packet)
}
@Suppress("TooGenericExceptionCaught", "SwallowedException")

View file

@ -28,6 +28,7 @@ import kotlinx.coroutines.withContext
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.database.DatabaseProvider
import org.meshtastic.core.database.dao.NodeInfoDao
import org.meshtastic.core.database.entity.PacketEntity
import org.meshtastic.core.database.entity.toReaction
import org.meshtastic.core.di.CoroutineDispatchers
@ -242,7 +243,10 @@ class PacketRepositoryImpl(private val dbManager: DatabaseProvider, private val
emptyMap()
} else {
withContext(dispatchers.io) {
dbManager.currentDb.value.packetDao().getPacketsByPacketIds(ids).associateBy { it.packet.packetId }
val dao = dbManager.currentDb.value.packetDao()
ids.chunked(NodeInfoDao.MAX_BIND_PARAMS)
.flatMap { dao.getPacketsByPacketIds(it) }
.associateBy { it.packet.packetId }
}
}

View file

@ -47,6 +47,7 @@ import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.PlatformAnalytics
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Channel
import org.meshtastic.proto.Config
@ -68,6 +69,7 @@ class MeshActionHandlerImplTest {
private val dataHandler = mock<MeshDataHandler>(MockMode.autofill)
private val analytics = mock<PlatformAnalytics>(MockMode.autofill)
private val meshPrefs = mock<MeshPrefs>(MockMode.autofill)
private val uiPrefs = mock<UiPrefs>(MockMode.autofill)
private val databaseManager = mock<DatabaseManager>(MockMode.autofill)
private val notificationManager = mock<NotificationManager>(MockMode.autofill)
private val messageProcessor = mock<MeshMessageProcessor>(MockMode.autofill)
@ -100,6 +102,7 @@ class MeshActionHandlerImplTest {
dataHandler = lazy { dataHandler },
analytics = analytics,
meshPrefs = meshPrefs,
uiPrefs = uiPrefs,
databaseManager = databaseManager,
notificationManager = notificationManager,
messageProcessor = lazy { messageProcessor },
@ -356,7 +359,7 @@ class MeshActionHandlerImplTest {
@Test
fun handleRequestPosition_provideLocation_validPosition_usesGivenPosition() {
handler = createHandler(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
every { uiPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
val validPosition = Position(37.7749, -122.4194, 10)
handler.handleRequestPosition(REMOTE_NODE_NUM, validPosition, MY_NODE_NUM)
@ -367,7 +370,7 @@ class MeshActionHandlerImplTest {
@Test
fun handleRequestPosition_provideLocation_invalidPosition_fallsBackToNodeDB() {
handler = createHandler(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
every { uiPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
val invalidPosition = Position(0.0, 0.0, 0)
@ -380,7 +383,7 @@ class MeshActionHandlerImplTest {
@Test
fun handleRequestPosition_doNotProvide_sendsZeroPosition() {
handler = createHandler(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(false)
every { uiPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(false)
val validPosition = Position(37.7749, -122.4194, 10)
handler.handleRequestPosition(REMOTE_NODE_NUM, validPosition, MY_NODE_NUM)

View file

@ -271,6 +271,42 @@ abstract class CommonPacketDaoTest {
assertFalse(excludingFiltered.any { it.packet.filtered })
}
@Test
fun testGetPacketsByPacketIdsChunked() = runTest {
    // Regression guard for the SQLITE_MAX_VARIABLE_NUMBER (999) bind-parameter limit: with more
    // than 999 stored packets, a by-id lookup must not throw. Callers are expected to split the
    // id list into chunks, and every chunk must yield exactly the rows that were asked for.
    val expectedCount = 2000
    val idsUnderTest = (1..expectedCount).toList()
    val insertedAt = nowMillis

    for (id in idsUnderTest) {
        val payload =
            DataPacket(
                to = DataPacket.ID_BROADCAST,
                bytes = "Chunk $id".encodeToByteArray().toByteString(),
                dataType = PortNum.TEXT_MESSAGE_APP.value,
            )
        val row =
            Packet(
                uuid = 0L,
                myNodeNum = myNodeNum,
                port_num = PortNum.TEXT_MESSAGE_APP.value,
                contact_key = "chunk-test",
                received_time = insertedAt + id,
                read = false,
                data = payload,
                packetId = id,
            )
        packetDao.insert(row)
    }

    // Query one chunk at a time, exactly as production callers do, then stitch the results together.
    val idChunks = idsUnderTest.chunked(NodeInfoDao.MAX_BIND_PARAMS)
    val fetched = idChunks.flatMap { chunk -> packetDao.getPacketsByPacketIds(chunk) }

    assertEquals(expectedCount, fetched.size)
    val fetchedIds = fetched.map { it.packet.packetId }.toSet()
    assertEquals(idsUnderTest.toSet(), fetchedIds)
}
companion object {
private const val SAMPLE_SIZE = 10
}

View file

@ -58,6 +58,8 @@ kotlin {
implementation(libs.androidx.test.runner)
}
}
commonTest.dependencies { implementation(projects.core.testing) }
}
}

View file

@ -32,5 +32,7 @@ kotlin {
implementation(libs.jetbrains.navigation3.ui)
implementation(libs.kermit)
}
commonTest.dependencies { implementation(projects.core.testing) }
}
}

View file

@ -1,31 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.repository
import android.annotation.SuppressLint
import java.security.cert.X509Certificate
import javax.net.ssl.X509TrustManager
@SuppressLint("CustomX509TrustManager", "TrustAllX509TrustManager")
@Suppress("EmptyFunctionBlock")
class TrustAllX509TrustManager : X509TrustManager {
override fun checkClientTrusted(chain: Array<X509Certificate>?, authType: String?) {}
override fun checkServerTrusted(chain: Array<X509Certificate>?, authType: String?) {}
override fun getAcceptedIssuers(): Array<X509Certificate> = arrayOf()
}

View file

@ -22,9 +22,8 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
@ -37,6 +36,7 @@ import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import org.meshtastic.core.ble.BleConnection
import org.meshtastic.core.ble.BleConnectionFactory
@ -396,14 +396,14 @@ class BleRadioTransport(
}
/** Closes the connection to the device. */
override fun close() {
override suspend fun close() {
Logger.i { "[$address] Disconnecting. ${formatSessionStats()}" }
connectionScope.cancel("close() called")
// GATT cleanup must outlive scope cancellation — GlobalScope is intentional.
// SharedRadioInterfaceService cancels the scope immediately after close(), so a
// coroutine launched there may never run, leaking BluetoothGatt (causes GATT 133).
@OptIn(DelicateCoroutinesApi::class)
GlobalScope.launch {
// GATT cleanup must run under NonCancellable so a cancelled caller cannot skip it,
// which would leak BluetoothGatt and trigger status 133 on the next reconnect.
// Using withContext (not runBlocking) keeps the caller's thread free — this is
// critical when close() is invoked from the main thread during process shutdown.
withContext(NonCancellable) {
try {
withTimeoutOrNull(GATT_CLEANUP_TIMEOUT) { bleConnection.disconnect() }
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {

View file

@ -144,7 +144,7 @@ class MockRadioTransport(
}
}
override fun close() {
override suspend fun close() {
Logger.i { "Closing the mock transport" }
}

View file

@ -30,7 +30,7 @@ class NopRadioTransport(val address: String) : RadioTransport {
// No-op
}
override fun close() {
override suspend fun close() {
// No-op
}
}

View file

@ -35,7 +35,7 @@ abstract class StreamTransport(protected val callback: RadioTransportCallback, p
private val codec =
StreamFrameCodec(onPacketReceived = { callback.handleFromRadio(it) }, logTag = "StreamTransport")
override fun close() {
override suspend fun close() {
Logger.d { "Closing stream for good" }
onDeviceDisconnect(true)
}

View file

@ -74,7 +74,7 @@ open class TcpRadioTransport(
transport.start(address)
}
override fun close() {
override suspend fun close() {
Logger.d { "[$address] Closing TCP transport" }
closing = true
transport.stop()

View file

@ -154,7 +154,7 @@ private constructor(
serialPort = null
}
override fun close() {
override suspend fun close() {
Logger.d { "[$portName] Closing serial transport" }
readJob?.cancel()
readJob = null

View file

@ -19,19 +19,31 @@ package org.meshtastic.core.prefs
import kotlinx.atomicfu.AtomicRef
import kotlinx.collections.immutable.PersistentMap
internal inline fun <K, V> cachedFlow(cache: AtomicRef<PersistentMap<K, V>>, key: K, build: () -> V): V {
var resolved = cache.value[key]
if (resolved == null) {
val newValue = build()
while (resolved == null) {
val current = cache.value
val currentValue = current[key]
if (currentValue != null) {
resolved = currentValue
} else if (cache.compareAndSet(current, current.put(key, newValue))) {
resolved = newValue
}
/**
* Look up [key] in [cache]; if absent, construct a value via [build] and insert it atomically.
*
* [build] is wrapped in a [Lazy] before being published to [cache], so concurrent first-access of the same key never
* invokes [build] more than once — only the winner of the CAS has its [Lazy] evaluated, and all readers share that same
* result. This matters when [build] eagerly launches a coroutine (e.g. `Flow.stateIn(scope, Eagerly, …)`): the naive
* approach would leak the losing coroutine into a never-cancelled scope.
*/
@Suppress("ReturnCount")
internal inline fun <K, V> cachedFlow(
cache: AtomicRef<PersistentMap<K, Lazy<V>>>,
key: K,
crossinline build: () -> V,
): V {
cache.value[key]?.let {
return it.value
}
val newLazy = lazy(LazyThreadSafetyMode.SYNCHRONIZED) { build() }
while (true) {
val current = cache.value
current[key]?.let {
return it.value
}
if (cache.compareAndSet(current, current.put(key, newLazy))) {
return newLazy.value
}
}
return checkNotNull(resolved)
}

View file

@ -42,7 +42,7 @@ class MapConsentPrefsImpl(
) : MapConsentPrefs {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.default)
private val consentFlows = atomic(persistentMapOf<Int?, StateFlow<Boolean>>())
private val consentFlows = atomic(persistentMapOf<Int?, Lazy<StateFlow<Boolean>>>())
override fun shouldReportLocation(nodeNum: Int?): StateFlow<Boolean> = cachedFlow(consentFlows, nodeNum) {
val key = booleanPreferencesKey(nodeNum.toString())

View file

@ -18,7 +18,6 @@ package org.meshtastic.core.prefs.mesh
import androidx.datastore.core.DataStore
import androidx.datastore.preferences.core.Preferences
import androidx.datastore.preferences.core.booleanPreferencesKey
import androidx.datastore.preferences.core.edit
import androidx.datastore.preferences.core.intPreferencesKey
import androidx.datastore.preferences.core.stringPreferencesKey
@ -45,8 +44,7 @@ class MeshPrefsImpl(
) : MeshPrefs {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.default)
private val locationFlows = atomic(persistentMapOf<Int?, StateFlow<Boolean>>())
private val storeForwardFlows = atomic(persistentMapOf<String?, StateFlow<Int>>())
private val storeForwardFlows = atomic(persistentMapOf<String?, Lazy<StateFlow<Int>>>())
override val deviceAddress: StateFlow<String?> =
dataStore.data
@ -65,15 +63,6 @@ class MeshPrefsImpl(
}
}
override fun shouldProvideNodeLocation(nodeNum: Int?): StateFlow<Boolean> = cachedFlow(locationFlows, nodeNum) {
val key = booleanPreferencesKey(provideLocationKey(nodeNum))
dataStore.data.map { it[key] ?: false }.stateIn(scope, SharingStarted.Eagerly, false)
}
override fun setShouldProvideNodeLocation(nodeNum: Int?, provide: Boolean) {
scope.launch { dataStore.edit { prefs -> prefs[booleanPreferencesKey(provideLocationKey(nodeNum))] = provide } }
}
override fun getStoreForwardLastRequest(address: String?): StateFlow<Int> = cachedFlow(storeForwardFlows, address) {
val key = intPreferencesKey(storeForwardKey(address))
dataStore.data.map { it[key] ?: 0 }.stateIn(scope, SharingStarted.Eagerly, 0)
@ -92,8 +81,6 @@ class MeshPrefsImpl(
}
}
private fun provideLocationKey(nodeNum: Int?) = "provide-location-$nodeNum"
private fun storeForwardKey(address: String?): String = "store-forward-last-request-${normalizeAddress(address)}"
companion object {

View file

@ -46,7 +46,7 @@ class UiPrefsImpl(
private val scope = CoroutineScope(SupervisorJob() + dispatchers.default)
// Maps nodeNum to a flow for the "provide-location-nodeNum" pref
private val provideNodeLocationFlows = atomic(persistentMapOf<Int, StateFlow<Boolean>>())
private val provideNodeLocationFlows = atomic(persistentMapOf<Int, Lazy<StateFlow<Boolean>>>())
override val appIntroCompleted: StateFlow<Boolean> =
dataStore.data.map { it[KEY_APP_INTRO_COMPLETED] ?: false }.stateIn(scope, SharingStarted.Eagerly, false)

View file

@ -213,10 +213,6 @@ interface MeshPrefs {
fun setDeviceAddress(address: String?)
fun shouldProvideNodeLocation(nodeNum: Int?): StateFlow<Boolean>
fun setShouldProvideNodeLocation(nodeNum: Int?, provide: Boolean)
fun getStoreForwardLastRequest(address: String?): StateFlow<Int>
fun setStoreForwardLastRequest(address: String?, timestamp: Int)

View file

@ -17,6 +17,7 @@
package org.meshtastic.core.repository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import org.meshtastic.core.model.ConnectionState
@ -68,12 +69,26 @@ interface RadioInterfaceService : RadioTransportCallback {
/** Whether we are currently using a mock transport. */
fun isMockTransport(): Boolean
/** Flow of raw data received from the radio. */
val receivedData: SharedFlow<ByteArray>
/**
* Flow of raw data received from the radio.
*
* Emissions preserve the order in which bytes arrived from the hardware — this is required because the firmware
* handshake (initial config packet ordering) depends on strict FIFO delivery. Implementations MUST guarantee
* ordering; do not swap in a [SharedFlow] without preserving order.
*/
val receivedData: Flow<ByteArray>
/** Flow of radio activity events. */
val meshActivity: SharedFlow<MeshActivity>
/**
* Drains any bytes currently buffered in [receivedData] without emitting them to collectors.
*
* Callers invoke this before attaching a fresh collector after a stop/start cycle so stale bytes buffered while no
* collector was attached do not get replayed ahead of the next session's handshake.
*/
fun resetReceivedBuffer()
/** Sends a raw byte array to the radio. */
fun sendToRadio(bytes: ByteArray)

View file

@ -16,13 +16,11 @@
*/
package org.meshtastic.core.repository
import okio.Closeable
/**
* Interface for hardware transports (BLE, Serial, TCP, etc.) that handles raw byte communication. This is the
* KMP-compatible replacement for the legacy Android-specific IRadioInterface.
*/
interface RadioTransport : Closeable {
interface RadioTransport {
/** Sends a raw byte array to the radio hardware. */
fun handleSendToRadio(p: ByteArray)
@ -39,4 +37,13 @@ interface RadioTransport : Closeable {
* function can be implemented by transports to see if we are really connected.
*/
fun keepAlive() {}
/**
* Closes the connection to the device.
*
* Implementations that perform potentially-blocking teardown (e.g. BLE GATT disconnect) MUST run that work inside
* `withContext(NonCancellable)` so a cancelled caller cannot skip cleanup, leaving the underlying resource leaked.
* Callers must invoke this from a coroutine — it must never be called from a blocking context (no `runBlocking`).
*/
suspend fun close()
}

View file

@ -16,13 +16,14 @@
*/
package org.meshtastic.core.repository
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertTrue
class RadioTransportTest {
@Test
fun `RadioTransport can be implemented`() {
fun `RadioTransport can be implemented`() = runTest {
var sentData: ByteArray? = null
var closed = false
var keepAliveCalled = false
@ -37,7 +38,7 @@ class RadioTransportTest {
keepAliveCalled = true
}
override fun close() {
override suspend fun close() {
closed = true
}
}

View file

@ -1261,4 +1261,8 @@
<string name="wifi_provision_ssid_placeholder">Enter or select a network</string>
<string name="wifi_provision_status_applied">WiFi configured successfully!</string>
<string name="wifi_provision_status_failed">Failed to apply WiFi configuration</string>
<string name="desktop_tray_tooltip">Meshtastic Desktop</string>
<string name="desktop_tray_show">Show Meshtastic</string>
<string name="desktop_tray_quit">Quit</string>
<string name="desktop_notification_title">Meshtastic</string>
</resources>

View file

@ -19,13 +19,15 @@ package org.meshtastic.core.service
import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.koin.core.annotation.Named
import kotlinx.coroutines.isActive
import org.koin.core.annotation.Single
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.MeshMessageProcessor
import org.meshtastic.core.repository.MeshRouter
@ -59,18 +61,15 @@ class MeshServiceOrchestrator(
private val takPrefs: TakPrefs,
private val databaseManager: DatabaseManager,
private val connectionManager: MeshConnectionManager,
@Named("ServiceScope") private val scope: CoroutineScope,
private val dispatchers: CoroutineDispatchers,
) {
private var serviceJob: Job? = null
private var takJob: Job? = null
/** The coroutine scope for the service. */
val serviceScope: CoroutineScope
get() = scope
// Per-start coroutine scope. A fresh scope is created on each start() and cancelled on stop(), so all collectors
// launched from start() are torn down cleanly and do not accumulate across start/stop/start cycles.
private var scope: CoroutineScope? = null
/** Whether the orchestrator is currently running. */
val isRunning: Boolean
get() = serviceJob?.isActive == true
get() = scope?.isActive == true
/**
* Starts the mesh service components and wires up data flows.
@ -85,27 +84,31 @@ class MeshServiceOrchestrator(
}
Logger.i { "Starting mesh service orchestrator" }
val job = Job()
serviceJob = job
val newScope = CoroutineScope(SupervisorJob() + dispatchers.default)
scope = newScope
// Drop any bytes that piled up in the service's receivedData channel since the last stop(). The channel
// outlives the orchestrator's per-start scope, so without this drain a stop/start cycle would replay stale
// packets ahead of the fresh session's firmware handshake.
radioInterfaceService.resetReceivedBuffer()
serviceNotifications.initChannels()
connectionManager.updateStatusNotification()
// Observe TAK server pref to start/stop
takJob =
takPrefs.isTakServerEnabled
.onEach { isEnabled ->
if (isEnabled && !takServerManager.isRunning.value) {
Logger.i { "TAK Server enabled by preference, starting integration" }
takMeshIntegration.start(scope)
} else if (!isEnabled && takServerManager.isRunning.value) {
Logger.i { "TAK Server disabled by preference, stopping integration" }
takMeshIntegration.stop()
}
takPrefs.isTakServerEnabled
.onEach { isEnabled ->
if (isEnabled && !takServerManager.isRunning.value) {
Logger.i { "TAK Server enabled by preference, starting integration" }
takMeshIntegration.start(newScope)
} else if (!isEnabled && takServerManager.isRunning.value) {
Logger.i { "TAK Server disabled by preference, stopping integration" }
takMeshIntegration.stop()
}
.launchIn(scope)
}
.launchIn(newScope)
scope.handledLaunch {
newScope.handledLaunch {
// Ensure the per-device database is active before the radio connects.
// On Android this is handled by MeshUtilApplication.init(); on Desktop (and any
// future KMP host) the orchestrator is the first entry point, so it must initialize
@ -119,18 +122,18 @@ class MeshServiceOrchestrator(
radioInterfaceService.receivedData
.onEach { bytes -> messageProcessor.handleFromRadio(bytes, nodeManager.myNodeNum.value) }
.launchIn(scope)
.launchIn(newScope)
radioInterfaceService.connectionError
.onEach { errorMessage -> serviceRepository.setErrorMessage(errorMessage, Severity.Warn) }
.launchIn(scope)
.launchIn(newScope)
// Each action is dispatched in its own supervised coroutine so that a failure in one
// action (e.g. a timeout in sendAdminAwait) cannot terminate the collector and silently
// drop all subsequent service actions for the rest of the session.
serviceRepository.serviceAction
.onEach { action -> scope.handledLaunch { router.actionHandler.onServiceAction(action) } }
.launchIn(scope)
.onEach { action -> newScope.handledLaunch { router.actionHandler.onServiceAction(action) } }
.launchIn(newScope)
nodeManager.loadCachedNodeDB()
}
@ -142,13 +145,11 @@ class MeshServiceOrchestrator(
*/
fun stop() {
Logger.i { "Stopping mesh service orchestrator" }
takJob?.cancel()
takJob = null
// Guard stop() so we don't emit a spurious "stopped" log when TAK was never started
if (takServerManager.isRunning.value) {
takMeshIntegration.stop()
}
serviceJob?.cancel()
serviceJob = null
scope?.cancel()
scope = null
}
}

View file

@ -25,7 +25,9 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
@ -35,6 +37,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -42,7 +45,7 @@ import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.ble.BluetoothRepository
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ignoreException
import org.meshtastic.core.common.util.ignoreExceptionSuspend
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.ConnectionState
@ -95,8 +98,13 @@ class SharedRadioInterfaceService(
private val _currentDeviceAddressFlow = MutableStateFlow<String?>(radioPrefs.devAddr.value)
override val currentDeviceAddressFlow: StateFlow<String?> = _currentDeviceAddressFlow.asStateFlow()
private val _receivedData = MutableSharedFlow<ByteArray>(extraBufferCapacity = 64)
override val receivedData: SharedFlow<ByteArray> = _receivedData
// Unbounded Channel preserves strict FIFO delivery of incoming radio bytes, which the
// firmware handshake depends on (initial config packet ordering). A SharedFlow with
// `launch { emit() }` per packet reorders under concurrent dispatch and breaks config load.
// trySend on an UNLIMITED channel never suspends and never drops, so handleFromRadio can
// remain a non-suspend synchronous callback.
private val _receivedData = Channel<ByteArray>(Channel.UNLIMITED)
override val receivedData: Flow<ByteArray> = _receivedData.receiveAsFlow()
private val _meshActivity =
MutableSharedFlow<MeshActivity>(extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
@ -148,6 +156,7 @@ class SharedRadioInterfaceService(
}
}
}
.catch { Logger.e(it) { "devAddr flow crashed" } }
.launchIn(processLifecycle.coroutineScope)
bluetoothRepository.state
@ -216,7 +225,7 @@ class SharedRadioInterfaceService(
processLifecycle.coroutineScope.launch {
transportMutex.withLock {
ignoreException { stopTransportLocked() }
ignoreExceptionSuspend { stopTransportLocked() }
startTransportLocked()
}
}
@ -245,7 +254,7 @@ class SharedRadioInterfaceService(
}
/** Must be called under [transportMutex]. */
private fun stopTransportLocked() {
private suspend fun stopTransportLocked() {
val currentTransport = radioTransport
Logger.i { "Stopping transport $currentTransport" }
isStarted = false
@ -322,13 +331,28 @@ class SharedRadioInterfaceService(
override fun handleFromRadio(bytes: ByteArray) {
try {
lastDataReceivedMillis = nowMillis
processLifecycle.coroutineScope.launch(dispatchers.io) { _receivedData.emit(bytes) }
// trySend synchronously onto the unbounded Channel so packet order matches arrival
// order. The previous `launch { emit() }` pattern dispatched each packet onto a
// fresh coroutine, letting the scheduler reorder them — which broke the firmware
// config handshake (see PhoneAPI.cpp initial-handshake sequence).
val result = _receivedData.trySend(bytes)
if (result.isFailure) {
Logger.e(result.exceptionOrNull()) { "Failed to enqueue ${bytes.size} received bytes; dropping packet" }
}
_meshActivity.tryEmit(MeshActivity.Receive)
} catch (t: Throwable) {
Logger.e(t) { "handleFromRadio failed while emitting data" }
}
}
override fun resetReceivedBuffer() {
    // Discard everything currently sitting in the receive channel. The channel outlives the
    // orchestrator's per-start scope, so bytes that arrived while no collector was attached would
    // otherwise be replayed ahead of the next session's firmware handshake after a stop/start cycle.
    do {
        // tryReceive never suspends; it reports failure once the buffer is empty.
        val drained = _receivedData.tryReceive()
    } while (drained.isSuccess)
}
override fun onConnect() {
// MutableStateFlow.value is thread-safe (backed by atomics) — assign directly rather than
// launching a coroutine. The async launch pattern introduced a window where a concurrent

View file

@ -23,13 +23,15 @@ import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verify.VerifyMode.Companion.atLeast
import dev.mokkery.verify.VerifyMode.Companion.exactly
import dev.mokkery.verifySuspend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.service.ServiceAction
import org.meshtastic.core.repository.CommandSender
@ -70,8 +72,11 @@ class MeshServiceOrchestratorTest {
private val databaseManager: DatabaseManager = mock(MockMode.autofill)
private val connectionManager: MeshConnectionManager = mock(MockMode.autofill)
@OptIn(ExperimentalCoroutinesApi::class)
private val testDispatcher = UnconfinedTestDispatcher()
private val testScope = CoroutineScope(testDispatcher)
@OptIn(ExperimentalCoroutinesApi::class)
private val dispatchers = CoroutineDispatchers(io = testDispatcher, main = testDispatcher, default = testDispatcher)
/** Stubs the shared flow dependencies used by every test and returns an orchestrator. */
private fun createOrchestrator(
@ -114,7 +119,7 @@ class MeshServiceOrchestratorTest {
takPrefs = takPrefs,
databaseManager = databaseManager,
connectionManager = connectionManager,
scope = testScope,
dispatchers = dispatchers,
)
}
@ -217,4 +222,79 @@ class MeshServiceOrchestratorTest {
orchestrator.stop()
assertFalse(orchestrator.isRunning)
}
/**
 * Guards against FromRadio collectors surviving `stop()`.
 *
 * The original bug: collectors were attached to an injected process-wide ServiceScope instead of a per-start
 * scope, so a `start() -> stop() -> start()` sequence stacked collectors and each FromRadio packet ended up being
 * handled twice (then three times, and so on).
 */
@Test
fun testFromRadioCollectorsTornDownOnStopAndRestartedCleanlyOnStart() {
    val inbound = MutableSharedFlow<ByteArray>(extraBufferCapacity = 8)
    val orchestrator = createOrchestrator(receivedData = inbound)
    every { nodeManager.myNodeNum } returns MutableStateFlow(null)

    // First session: a packet emitted while running is handled exactly once.
    orchestrator.start()
    val duringFirstRun = byteArrayOf(1, 2, 3).also { inbound.tryEmit(it) }
    verifySuspend(exactly(1)) { messageProcessor.handleFromRadio(duringFirstRun, null) }

    // Stopped: no collector may remain, so this packet must never reach the handler.
    orchestrator.stop()
    val whileStopped = byteArrayOf(4, 5, 6).also { inbound.tryEmit(it) }
    verifySuspend(exactly(0)) { messageProcessor.handleFromRadio(whileStopped, null) }

    // Second session: exactly one fresh collector, so the packet is handled once and not twice.
    orchestrator.start()
    val duringSecondRun = byteArrayOf(7, 8, 9).also { inbound.tryEmit(it) }
    verifySuspend(exactly(1)) { messageProcessor.handleFromRadio(duringSecondRun, null) }

    orchestrator.stop()
}
/**
 * Guards against the channel-buffer-replay bug.
 *
 * The production [RadioInterfaceService] buffers inbound bytes in a process-lifetime `Channel(UNLIMITED)`. Bytes
 * that arrive between `stop()` and the next `start()` stay in that channel and would be replayed to the fresh
 * collector, prepending stale packets to the next session's firmware handshake. To prevent that, `start()` must
 * call [RadioInterfaceService.resetReceivedBuffer] before it attaches the collector.
 */
@Test
fun testStartDrainsReceivedBufferBeforeAttachingCollector() {
    val orchestrator = createOrchestrator()
    every { nodeManager.myNodeNum } returns MutableStateFlow(null)

    // Two start() calls separated by a stop().
    with(orchestrator) {
        start()
        stop()
        start()
    }

    // Each start() has to reset the buffer at least once, so two starts give a minimum of two resets.
    verify(atLeast(2)) { radioInterfaceService.resetReceivedBuffer() }

    orchestrator.stop()
}
/** Stress variant of the teardown regression: many start/stop cycles must still leave a single live collector. */
@Test
fun testRepeatedStartStopDoesNotAccumulateCollectors() {
    val inbound = MutableSharedFlow<ByteArray>(extraBufferCapacity = 8)
    val orchestrator = createOrchestrator(receivedData = inbound)
    every { nodeManager.myNodeNum } returns MutableStateFlow(null)

    // Five complete start/stop cycles before the session that is actually asserted on.
    val warmUpCycles = 5
    repeat(warmUpCycles) {
        with(orchestrator) {
            start()
            stop()
        }
    }

    // Sixth start() overall: only the collector from this call may be live.
    orchestrator.start()
    val probe = byteArrayOf(42).also { inbound.tryEmit(it) }
    verifySuspend(exactly(1)) { messageProcessor.handleFromRadio(probe, null) }

    orchestrator.stop()
}
}

View file

@ -18,12 +18,15 @@ package org.meshtastic.core.takserver
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -58,6 +61,12 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager
private val _inboundMessages = MutableSharedFlow<CoTMessage>()
override val inboundMessages: SharedFlow<CoTMessage> = _inboundMessages.asSharedFlow()
// Unbounded channel preserves FIFO ordering of inbound CoT messages under load.
// onMessage is a non-suspend callback, so we trySend (which never fails for lack of capacity on an
// UNLIMITED channel; it can only fail once the channel has been closed) and a single consumer
// coroutine drains into _inboundMessages in order.
private var inboundChannel: Channel<CoTMessage>? = null
private var inboundDrainJob: Job? = null
private var lastBroadcastPositions = mutableMapOf<Int, Int>()
override fun start(scope: CoroutineScope) {
@ -68,8 +77,11 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager
}
scope.launch {
// Wire up inbound message handler BEFORE starting so no messages are lost
takServer.onMessage = { cotMessage -> scope.launch { _inboundMessages.emit(cotMessage) } }
// Wire up inbound message handler BEFORE starting so no messages are lost.
val channel = Channel<CoTMessage>(Channel.UNLIMITED)
inboundChannel = channel
inboundDrainJob = scope.launch { channel.consumeAsFlow().collect { _inboundMessages.emit(it) } }
takServer.onMessage = { cotMessage -> channel.trySend(cotMessage) }
val result = takServer.start(scope)
if (result.isSuccess) {
@ -79,6 +91,10 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager
Logger.e(result.exceptionOrNull()) { "Failed to start TAK Server" }
// Clear onMessage if start failed so we don't hold a reference unnecessarily
takServer.onMessage = null
inboundDrainJob?.cancel()
inboundDrainJob = null
channel.close()
inboundChannel = null
}
}
}
@ -86,6 +102,10 @@ class TAKServerManagerImpl(private val takServer: TAKServer) : TAKServerManager
override fun stop() {
takServer.stop()
takServer.onMessage = null
inboundChannel?.close()
inboundChannel = null
inboundDrainJob?.cancel()
inboundDrainJob = null
_isRunning.value = false
scope = null
Logger.i { "TAK Server stopped" }

View file

@ -32,8 +32,8 @@ kotlin {
// Heavy modules (database, data, domain) should depend on core:testing, not vice versa.
api(projects.core.model)
api(projects.core.repository)
api(projects.core.database)
api(projects.core.ble)
implementation(projects.core.database)
implementation(projects.core.ble)
implementation(projects.core.datastore)
implementation(libs.androidx.room.runtime)
api(libs.kermit)

View file

@ -237,15 +237,6 @@ class FakeMeshPrefs : MeshPrefs {
deviceAddress.value = address
}
private val provideLocation = mutableMapOf<Int?, MutableStateFlow<Boolean>>()
override fun shouldProvideNodeLocation(nodeNum: Int?): StateFlow<Boolean> =
provideLocation.getOrPut(nodeNum) { MutableStateFlow(true) }
override fun setShouldProvideNodeLocation(nodeNum: Int?, provide: Boolean) {
provideLocation.getOrPut(nodeNum) { MutableStateFlow(provide) }.value = provide
}
private val lastRequest = mutableMapOf<String?, MutableStateFlow<Int>>()
override fun getStoreForwardLastRequest(address: String?): StateFlow<Int> =

View file

@ -18,10 +18,13 @@ package org.meshtastic.core.testing
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.receiveAsFlow
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DeviceType
import org.meshtastic.core.model.InterfaceId
@ -48,8 +51,10 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main
private val _currentDeviceAddressFlow = MutableStateFlow<String?>(null)
override val currentDeviceAddressFlow: StateFlow<String?> = _currentDeviceAddressFlow
private val _receivedData = MutableSharedFlow<ByteArray>()
override val receivedData: SharedFlow<ByteArray> = _receivedData
// Use an unbounded Channel to mirror SharedRadioInterfaceService semantics. A MutableSharedFlow would
// hide the stop/start backlog bug that motivated the resetReceivedBuffer() API.
private val _receivedData = Channel<ByteArray>(Channel.UNLIMITED)
override val receivedData: Flow<ByteArray> = _receivedData.receiveAsFlow()
private val _meshActivity = MutableSharedFlow<MeshActivity>()
override val meshActivity: SharedFlow<MeshActivity> = _meshActivity
@ -88,13 +93,18 @@ class FakeRadioInterfaceService(override val serviceScope: CoroutineScope = Main
}
override fun handleFromRadio(bytes: ByteArray) {
// Enqueue the bytes on the channel behind receivedData without suspending; the trySend result is ignored.
_receivedData.trySend(bytes)
}
override fun resetReceivedBuffer() {
    // Keep pulling buffered elements off the channel until a receive attempt no longer succeeds.
    do {
        val discarded = _receivedData.tryReceive()
    } while (discarded.isSuccess)
}
// --- Helper methods for testing ---
suspend fun emitFromRadio(bytes: ByteArray) {
_receivedData.emit(bytes)
fun emitFromRadio(bytes: ByteArray) {
_receivedData.trySend(bytes)
}
fun setConnectionState(state: ConnectionState) {

View file

@ -32,7 +32,7 @@ class FakeRadioTransport : RadioTransport {
keepAliveCalled = true
}
override fun close() {
override suspend fun close() {
closeCalled = true
}
}