fix: harden reliability, clean up KMP compliance, and improve code quality (#5023)

This commit is contained in:
James Rich 2026-04-09 13:21:46 -05:00 committed by GitHub
parent 537029a71c
commit 14b381c1eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
53 changed files with 370 additions and 409 deletions

View file

@ -26,6 +26,7 @@ import org.koin.core.annotation.Single
class CoreNetworkModule {
@Single
fun provideJson(): Json = Json {
isLenient = true
ignoreUnknownKeys = true
coerceInputValues = true
}

View file

@ -433,7 +433,7 @@ class BleRadioInterface(
}
}
private var radioService: org.meshtastic.core.ble.MeshtasticRadioProfile? = null
@Volatile private var radioService: org.meshtastic.core.ble.MeshtasticRadioProfile? = null
// --- RadioTransport Implementation ---

View file

@ -17,7 +17,7 @@
package org.meshtastic.core.network.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.launch
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.network.transport.StreamFrameCodec
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.core.repository.RadioTransport
@ -64,7 +64,7 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : R
override fun handleSendToRadio(p: ByteArray) {
// This method is called from a continuation and it might show up late, so check for uart being null
service.serviceScope.launch { codec.frameAndSend(p, ::sendBytes, ::flushBytes) }
service.serviceScope.handledLaunch { codec.frameAndSend(p, ::sendBytes, ::flushBytes) }
}
/** Process a single incoming byte through the stream framing state machine. */

View file

@ -21,12 +21,14 @@ import io.github.davidepianca98.MQTTClient
import io.github.davidepianca98.mqtt.MQTTVersion
import io.github.davidepianca98.mqtt.Subscription
import io.github.davidepianca98.mqtt.packets.Qos
import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode
import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
import io.github.davidepianca98.socket.tls.TLSClientSettings
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
@ -54,6 +56,9 @@ class MQTTRepositoryImpl(
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
private const val INITIAL_RECONNECT_DELAY_MS = 1000L
private const val MAX_RECONNECT_DELAY_MS = 30_000L
private const val RECONNECT_BACKOFF_MULTIPLIER = 2
}
private var client: MQTTClient? = null
@ -62,8 +67,14 @@ class MQTTRepositoryImpl(
private var clientJob: Job? = null
private val publishSemaphore = Semaphore(20)
@Suppress("TooGenericExceptionCaught")
override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
try {
client?.disconnect(ReasonCode.SUCCESS)
} catch (e: Exception) {
Logger.w(e) { "MQTT clean disconnect failed" }
}
clientJob?.cancel()
clientJob = null
client = null
@ -123,23 +134,39 @@ class MQTTRepositoryImpl(
client = newClient
clientJob = scope.launch {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
newClient.runSuspend()
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT)" }
close(e)
} catch (e: io.github.davidepianca98.socket.IOException) {
Logger.e(e) { "MQTT Client loop error (IO)" }
close(e)
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
clientJob =
scope.launch {
var reconnectDelay = INITIAL_RECONNECT_DELAY_MS
while (true) {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
// Reset backoff on each successful connection establishment. If the broker
// disconnects cleanly after hours of operation, the next reconnect should
// start with the minimum delay rather than whatever was accumulated.
reconnectDelay = INITIAL_RECONNECT_DELAY_MS
newClient.runSuspend()
// runSuspend returned normally — broker closed connection. Retry.
Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" }
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" }
} catch (e: io.github.davidepianca98.socket.IOException) {
Logger.e(e) { "MQTT Client loop error (IO), reconnecting in ${reconnectDelay}ms" }
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
}
delay(reconnectDelay)
reconnectDelay =
(reconnectDelay * RECONNECT_BACKOFF_MULTIPLIER).coerceAtMost(MAX_RECONNECT_DELAY_MS)
}
}
}
// Subscriptions
// Subscriptions: placed after runSuspend is launched and has had time to establish
// the TCP connection. KMQTT's subscribe() queues internally, but subscribing before
// the connection is ready may silently drop subscriptions depending on the version.
// A brief yield gives runSuspend() time to connect before we subscribe.
kotlinx.coroutines.yield()
val subscriptions = mutableListOf<Subscription>()
channelSet.subscribeList.forEach { globalId ->
subscriptions.add(

View file

@ -23,13 +23,22 @@ import org.koin.core.annotation.Single
import org.meshtastic.core.model.NetworkDeviceHardware
import org.meshtastic.core.model.NetworkFirmwareReleases
/** Client for the Meshtastic public API (device hardware catalog and firmware releases). */
interface ApiService {
/** Fetches the device hardware catalog from the Meshtastic API. */
suspend fun getDeviceHardware(): List<NetworkDeviceHardware>
/** Fetches the list of available firmware releases from the Meshtastic API. */
suspend fun getFirmwareReleases(): NetworkFirmwareReleases
}
@Single
/**
* Ktor-based [ApiService] implementation.
*
* Registered with `binds = []` to prevent Koin from auto-binding to [ApiService]; host modules (`app`, `desktop`)
* provide their own explicit `ApiService` binding to allow platform-specific `HttpClient` engines.
*/
@Single(binds = [])
class ApiServiceImpl(private val client: HttpClient) : ApiService {
override suspend fun getDeviceHardware(): List<NetworkDeviceHardware> =
client.get("https://api.meshtastic.org/resource/deviceHardware").body()

View file

@ -99,7 +99,10 @@ class TcpTransport(
/** Whether the transport is currently connected. */
val isConnected: Boolean
get() = socket?.isConnected == true && !socket!!.isClosed
get() {
val s = socket ?: return false
return s.isConnected && !s.isClosed
}
/**
* Start a TCP connection to the given address with automatic reconnect.
@ -127,6 +130,8 @@ class TcpTransport(
*/
suspend fun sendPacket(payload: ByteArray) {
codec.frameAndSend(payload = payload, sendBytes = ::sendBytesRaw, flush = ::flushBytes)
packetsSent++
bytesSent += payload.size
}
/** Send a heartbeat packet to keep the connection alive. */
@ -283,8 +288,6 @@ class TcpTransport(
Logger.w { "$logTag: [$currentAddress] Cannot send ${p.size} bytes: not connected" }
return
}
packetsSent++
bytesSent += p.size
try {
stream.write(p)
} catch (ex: IOException) {

View file

@ -19,10 +19,10 @@ package org.meshtastic.core.network
import co.touchlab.kermit.Logger
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortTimeoutException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.network.radio.StreamInterface
import org.meshtastic.core.repository.RadioInterfaceService
import org.meshtastic.proto.Heartbeat
@ -41,6 +41,7 @@ private constructor(
private val portName: String,
private val baudRate: Int = DEFAULT_BAUD_RATE,
service: RadioInterfaceService,
private val dispatchers: CoroutineDispatchers,
) : StreamInterface(service) {
private var serialPort: SerialPort? = null
private var readJob: Job? = null
@ -73,7 +74,7 @@ private constructor(
private fun startReadLoop(port: SerialPort) {
Logger.d { "[$portName] Starting serial read loop" }
readJob =
service.serviceScope.launch(Dispatchers.IO) {
service.serviceScope.launch(dispatchers.io) {
val input = port.inputStream
val buffer = ByteArray(READ_BUFFER_SIZE)
try {
@ -169,8 +170,13 @@ private constructor(
* Creates and opens a [SerialTransport]. If the port cannot be opened, the transport signals a permanent
* disconnect to the [service] and returns the (non-connected) instance.
*/
fun open(portName: String, baudRate: Int = DEFAULT_BAUD_RATE, service: RadioInterfaceService): SerialTransport {
val transport = SerialTransport(portName, baudRate, service)
fun open(
portName: String,
baudRate: Int = DEFAULT_BAUD_RATE,
service: RadioInterfaceService,
dispatchers: CoroutineDispatchers,
): SerialTransport {
val transport = SerialTransport(portName, baudRate, service, dispatchers)
if (!transport.startConnection()) {
val errorMessage = diagnoseOpenFailure(portName)
Logger.w { "[$portName] Serial port could not be opened; signalling disconnect. $errorMessage" }