feat(mqtt): migrate to MQTTastic-Client-KMP (#5165)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: jamesarich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-04-17 10:19:08 -05:00 committed by GitHub
parent b828a1271c
commit 305a487dd7
12 changed files with 271 additions and 131 deletions

View file

@ -17,6 +17,8 @@
package org.meshtastic.core.network.repository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import org.meshtastic.mqtt.ConnectionState
import org.meshtastic.proto.MqttClientProxyMessage
/** Interface defining the MQTT interactions used for proxying messages to and from the mesh. */
@ -38,4 +40,7 @@ interface MQTTRepository {
* @param retained Whether the message should be retained by the broker.
*/
fun publish(topic: String, data: ByteArray, retained: Boolean)
/** Observable MQTT connection lifecycle state (DISCONNECTED → CONNECTING → CONNECTED → RECONNECTING). */
val connectionState: StateFlow<ConnectionState>
}

View file

@ -17,22 +17,15 @@
package org.meshtastic.core.network.repository
import co.touchlab.kermit.Logger
import io.github.davidepianca98.MQTTClient
import io.github.davidepianca98.mqtt.MQTTException
import io.github.davidepianca98.mqtt.MQTTVersion
import io.github.davidepianca98.mqtt.Subscription
import io.github.davidepianca98.mqtt.packets.Qos
import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode
import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
import io.github.davidepianca98.socket.IOException
import io.github.davidepianca98.socket.tls.TLSClientSettings
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
@ -44,11 +37,19 @@ import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonDecodingException
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.MqttJsonPayload
import org.meshtastic.core.model.util.subscribeList
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.mqtt.ConnectionState
import org.meshtastic.mqtt.MqttClient
import org.meshtastic.mqtt.MqttEndpoint
import org.meshtastic.mqtt.MqttException
import org.meshtastic.mqtt.MqttMessage
import org.meshtastic.mqtt.QoS
import org.meshtastic.mqtt.packet.Subscription
import org.meshtastic.proto.MqttClientProxyMessage
import kotlin.concurrent.Volatile
@ -64,12 +65,17 @@ class MQTTRepositoryImpl(
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
private const val WEBSOCKET_PATH = "/mqtt"
private const val KEEPALIVE_SECONDS = 30
private const val INITIAL_RECONNECT_DELAY_MS = 1000L
private const val MAX_RECONNECT_DELAY_MS = 30_000L
private const val RECONNECT_BACKOFF_MULTIPLIER = 2
}
@Volatile private var client: MQTTClient? = null
@Volatile private var client: MqttClient? = null
private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED)
override val connectionState: StateFlow<ConnectionState> = _connectionState.asStateFlow()
@OptIn(ExperimentalSerializationApi::class)
private val json = Json {
@ -77,25 +83,17 @@ class MQTTRepositoryImpl(
exceptionsWithDebugInfo = false
}
private val scope = CoroutineScope(dispatchers.default + SupervisorJob())
@Volatile private var clientJob: Job? = null
private val publishSemaphore = Semaphore(20)
@Suppress("TooGenericExceptionCaught")
override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
val c = client
client = null // Null first to prevent re-entrant disconnect
try {
c?.disconnect(ReasonCode.SUCCESS)
} catch (e: Exception) {
Logger.w(e) { "MQTT clean disconnect failed" }
}
clientJob?.cancel()
clientJob = null
client = null
_connectionState.value = ConnectionState.DISCONNECTED
scope.launch { safeCatching { c?.close() }.onFailure { e -> Logger.w(e) { "MQTT clean disconnect failed" } } }
}
@OptIn(ExperimentalUnsignedTypes::class)
@OptIn(ExperimentalSerializationApi::class)
override val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}"
val channelSet = radioConfigRepository.channelSetFlow.first()
@ -103,108 +101,112 @@ class MQTTRepositoryImpl(
val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT
val (host, port) =
(mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883)
val rawAddress = mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS
val endpoint =
if (rawAddress.contains("://")) {
MqttEndpoint.parse(rawAddress)
} else {
// Use WebSocket transport on all platforms for firewall/CDN compatibility.
val scheme = if (mqttConfig?.tls_enabled == true) "wss" else "ws"
MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH")
}
val newClient =
MQTTClient(
mqttVersion = MQTTVersion.MQTT5,
address = host,
port = port,
tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null,
userName = mqttConfig?.username,
password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
clientId = ownerId,
publishReceived = { packet ->
val topic = packet.topicName
val payload = packet.payload?.toByteArray()
Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" }
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain))
} catch (e: JsonDecodingException) {
@OptIn(ExperimentalSerializationApi::class)
Logger.e(e) { "Failed to parse MQTT JSON: ${e.shortMessage} (path: ${e.path})" }
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
trySend(
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: okio.ByteString.EMPTY,
retained = packet.retain,
),
)
}
},
)
MqttClient(ownerId) {
keepAliveSeconds = KEEPALIVE_SECONDS
autoReconnect = true
username = mqttConfig?.username
mqttConfig?.password?.let { password(it) }
}
client = newClient
// Subscribe before starting the event loop. KMQTT's subscribe() calls send(),
// which queues the SUBSCRIBE packet in pendingSendMessages while connackReceived
// is false. Once the event loop receives CONNACK, it flushes the queue — so
// subscriptions are guaranteed to be sent immediately after the connection is
// established, with no timing races. This replaces a previous yield()-based
// approach that was unreliable on lightly loaded dispatchers.
val subscriptions = mutableListOf<Subscription>()
channelSet.subscribeList.forEach { globalId ->
subscriptions.add(
Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
)
if (mqttConfig?.json_enabled == true) {
subscriptions.add(
Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
val subscriptions: List<Subscription> = buildList {
channelSet.subscribeList.forEach { globalId ->
add(
Subscription(
"$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+",
maxQos = QoS.AT_LEAST_ONCE,
noLocal = true,
),
)
}
}
subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))
if (subscriptions.isNotEmpty()) {
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
newClient.subscribe(subscriptions)
}
clientJob =
scope.launch {
var reconnectDelay = INITIAL_RECONNECT_DELAY_MS
while (true) {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
newClient.runSuspend()
// runSuspend returned normally — broker closed connection cleanly.
// Reset backoff so the next reconnect starts with the minimum delay.
reconnectDelay = INITIAL_RECONNECT_DELAY_MS
Logger.w { "MQTT client loop ended normally, reconnecting in ${reconnectDelay}ms" }
} catch (e: MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT), reconnecting in ${reconnectDelay}ms" }
} catch (e: IOException) {
Logger.e(e) { "MQTT Client loop error (IO), reconnecting in ${reconnectDelay}ms" }
} catch (e: CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
}
delay(reconnectDelay)
reconnectDelay =
(reconnectDelay * RECONNECT_BACKOFF_MULTIPLIER).coerceAtMost(MAX_RECONNECT_DELAY_MS)
if (mqttConfig?.json_enabled == true) {
add(
Subscription(
"$rootTopic$JSON_TOPIC_LEVEL$globalId/+",
maxQos = QoS.AT_LEAST_ONCE,
noLocal = true,
),
)
}
}
add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", maxQos = QoS.AT_LEAST_ONCE, noLocal = true))
}
// Collect from the SharedFlow before connecting to avoid missing retained messages
// that arrive immediately after SUBSCRIBE.
launch { newClient.messages.collect { msg -> processMessage(msg) } }
// Forward the client's connection state to the repo-level StateFlow for UI observation.
launch { newClient.connectionState.collect { _connectionState.value = it } }
// Retry the initial connect with exponential backoff. Once established,
// autoReconnect handles subsequent drops and re-subscribes internally.
launch {
var reconnectDelay = INITIAL_RECONNECT_DELAY_MS
while (true) {
val result = safeCatching {
Logger.i { "MQTT Connecting to $endpoint" }
newClient.connect(endpoint)
if (subscriptions.isNotEmpty()) {
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
newClient.subscribe(subscriptions)
}
Logger.i { "MQTT connected and subscribed" }
}
when {
result.isSuccess -> return@launch
result.exceptionOrNull() is MqttException.ConnectionRejected -> {
Logger.e(result.exceptionOrNull()) { "MQTT connection rejected (unrecoverable), stopping" }
close(result.exceptionOrNull()!!)
return@launch
}
else -> {
Logger.e(result.exceptionOrNull()) { "MQTT connect failed, retrying in ${reconnectDelay}ms" }
delay(reconnectDelay)
reconnectDelay =
(reconnectDelay * RECONNECT_BACKOFF_MULTIPLIER).coerceAtMost(MAX_RECONNECT_DELAY_MS)
}
}
}
}
awaitClose { disconnect() }
}
@OptIn(ExperimentalUnsignedTypes::class)
@OptIn(ExperimentalSerializationApi::class)
private fun ProducerScope<MqttClientProxyMessage>.processMessage(msg: MqttMessage) {
val topic = msg.topic
val payload = msg.payload.toByteArray()
Logger.d { "MQTT received message on topic $topic (size: ${payload.size} bytes)" }
if (topic.contains("/json/")) {
try {
val jsonStr = payload.decodeToString()
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = msg.retain))
} catch (e: JsonDecodingException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.shortMessage} (path: ${e.path})" }
} catch (e: SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
trySend(MqttClientProxyMessage(topic = topic, data_ = payload.toByteString(), retained = msg.retain))
}
}
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
val currentClient = client
if (currentClient == null) {
@ -214,17 +216,12 @@ class MQTTRepositoryImpl(
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
scope.launch {
publishSemaphore.withPermit {
@Suppress("TooGenericExceptionCaught")
try {
safeCatching {
currentClient.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
MqttMessage(topic = topic, payload = data, qos = QoS.AT_LEAST_ONCE, retain = retained),
)
} catch (e: Exception) {
Logger.w(e) { "MQTT publish to $topic failed" }
}
.onFailure { e -> Logger.w(e) { "MQTT publish to $topic failed" } }
}
}
}