feat(mqtt): adopt mqttastic-client-kmp 0.2.0 — disconnect reasons + Test Connection (#5181)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: jamesarich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-04-17 21:33:55 -05:00 committed by GitHub
parent 5c870028d4
commit 84e70d01a3
12 changed files with 425 additions and 55 deletions

View file

@ -31,12 +31,17 @@ import kotlinx.coroutines.flow.stateIn
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.model.MqttConnectionState
import org.meshtastic.core.model.MqttProbeStatus
import org.meshtastic.core.network.repository.MQTTRepository
import org.meshtastic.core.network.repository.resolveEndpoint
import org.meshtastic.core.repository.MqttManager
import org.meshtastic.core.repository.PacketHandler
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.mqtt.ConnectionState
import org.meshtastic.mqtt.MqttClient
import org.meshtastic.mqtt.MqttException
import org.meshtastic.mqtt.ProbeResult
import org.meshtastic.mqtt.probe
import org.meshtastic.proto.MqttClientProxyMessage
import org.meshtastic.proto.ToRadio
@ -52,9 +57,9 @@ class MqttManagerImpl(
override val mqttConnectionState: StateFlow<MqttConnectionState> =
combine(proxyActive, mqttRepository.connectionState) { active, libState ->
if (!active) MqttConnectionState.INACTIVE else libState.toAppState()
if (!active) MqttConnectionState.Inactive else libState.toAppState()
}
.stateIn(scope, SharingStarted.Eagerly, MqttConnectionState.INACTIVE)
.stateIn(scope, SharingStarted.Eagerly, MqttConnectionState.Inactive)
override fun startProxy(enabled: Boolean, proxyToClientEnabled: Boolean) {
if (mqttMessageFlow?.isActive == true) return
@ -102,9 +107,55 @@ class MqttManagerImpl(
}
private fun ConnectionState.toAppState(): MqttConnectionState = when (this) {
ConnectionState.DISCONNECTED -> MqttConnectionState.DISCONNECTED
ConnectionState.CONNECTING -> MqttConnectionState.CONNECTING
ConnectionState.CONNECTED -> MqttConnectionState.CONNECTED
ConnectionState.RECONNECTING -> MqttConnectionState.RECONNECTING
is ConnectionState.Connecting -> MqttConnectionState.Connecting
is ConnectionState.Connected -> MqttConnectionState.Connected
is ConnectionState.Reconnecting ->
MqttConnectionState.Reconnecting(attempt = attempt, lastError = lastError?.message)
is ConnectionState.Disconnected ->
reason?.let { MqttConnectionState.Disconnected(reason = it.message) }
?: MqttConnectionState.Disconnected.Idle
}
override suspend fun probe(
address: String,
tlsEnabled: Boolean,
username: String?,
password: String?,
): MqttProbeStatus {
val endpoint = resolveEndpoint(address, tlsEnabled)
val result =
MqttClient.probe(endpoint = endpoint) {
val user = username?.takeUnless { it.isEmpty() }
val pass = password?.takeUnless { it.isEmpty() }
if (user != null) this.username = user
if (pass != null) password(pass)
}
return result.toAppStatus()
}
private fun ProbeResult.toAppStatus(): MqttProbeStatus = when (this) {
is ProbeResult.Success -> {
val info = serverInfo
val summary =
buildList {
info.assignedClientIdentifier?.let { add("client=$it") }
info.maximumQosOrdinal?.let { add("maxQoS=$it") }
info.serverKeepAliveSeconds?.let { add("keepalive=${it}s") }
}
.joinToString(", ")
.ifEmpty { null }
MqttProbeStatus.Success(serverInfo = summary)
}
is ProbeResult.Rejected ->
MqttProbeStatus.Rejected(
reasonCode = reasonCode.value,
reason = message,
serverReference = serverReference,
)
is ProbeResult.DnsFailure -> MqttProbeStatus.DnsFailure(message = cause.message)
is ProbeResult.TcpFailure -> MqttProbeStatus.TcpFailure(message = cause.message)
is ProbeResult.TlsFailure -> MqttProbeStatus.TlsFailure(message = cause.message)
is ProbeResult.Timeout -> MqttProbeStatus.Timeout(timeoutMs = durationMs)
is ProbeResult.Other -> MqttProbeStatus.Other(message = cause.message)
}
}

View file

@ -16,20 +16,41 @@
*/
package org.meshtastic.core.model
/** App-level MQTT proxy connection state, decoupled from the MQTT library's internal type. */
enum class MqttConnectionState {
/**
* App-level MQTT proxy connection state, decoupled from the MQTT library's internal type.
*
* Modeled as a sealed class so disconnect / reconnect events can carry diagnostic context the user-facing reason for
* an unexpected disconnect, or the most recent reconnect attempt failure without requiring downstream consumers to
* depend on the MQTT library's exception types.
*/
sealed class MqttConnectionState {
/** The MQTT proxy has not been started (disabled or not yet initialized). */
INACTIVE,
/** The MQTT client is not connected to the broker. */
DISCONNECTED,
data object Inactive : MqttConnectionState()
/** The MQTT client is actively connecting to the broker. */
CONNECTING,
data object Connecting : MqttConnectionState()
/** The MQTT client is connected and subscribed to topics. */
CONNECTED,
data object Connected : MqttConnectionState()
/** The MQTT client lost connection and is attempting to reconnect. */
RECONNECTING,
/**
* The MQTT client lost connection and is attempting to reconnect.
*
* @property attempt 1-based attempt counter for the current reconnect loop.
* @property lastError Localized message from the most recent reconnect failure, if any.
*/
data class Reconnecting(val attempt: Int = 0, val lastError: String? = null) : MqttConnectionState()
/**
* The MQTT client is not connected to the broker.
*
* @property reason Localized failure message for an unexpected disconnect, or `null` for the idle / initial /
* intentional-close case (use [Idle]).
*/
data class Disconnected(val reason: String? = null) : MqttConnectionState() {
companion object {
/** Singleton for the idle / no-reason disconnected state. */
val Idle: Disconnected = Disconnected(reason = null)
}
}
}

View file

@ -0,0 +1,52 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.model
/**
* UI-friendly outcome of a one-shot MQTT broker reachability probe.
*
* Mirrors the failure shapes of `org.meshtastic.mqtt.ProbeResult` but stays in the model module so feature/UI code can
* consume the result without depending on the MQTT library.
*/
sealed class MqttProbeStatus {
/** Probe is currently in flight. */
data object Probing : MqttProbeStatus()
/**
* Broker accepted the connection. [serverInfo] is a short human-readable summary of any CONNACK properties that are
* useful to surface to the user.
*/
data class Success(val serverInfo: String?) : MqttProbeStatus()
/** Broker rejected the connection (CONNACK with non-zero reason code). */
data class Rejected(val reasonCode: Int, val reason: String?, val serverReference: String?) : MqttProbeStatus()
/** DNS lookup failed. */
data class DnsFailure(val message: String?) : MqttProbeStatus()
/** TCP socket could not be opened. */
data class TcpFailure(val message: String?) : MqttProbeStatus()
/** TLS handshake failed. */
data class TlsFailure(val message: String?) : MqttProbeStatus()
/** Probe exceeded its timeout. */
data class Timeout(val timeoutMs: Long) : MqttProbeStatus()
/** Any other / unclassified failure. */
data class Other(val message: String?) : MqttProbeStatus()
}

View file

@ -65,7 +65,6 @@ class MQTTRepositoryImpl(
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
private const val WEBSOCKET_PATH = "/mqtt"
private const val KEEPALIVE_SECONDS = 30
private const val INITIAL_RECONNECT_DELAY_MS = 1000L
private const val MAX_RECONNECT_DELAY_MS = 30_000L
@ -74,7 +73,7 @@ class MQTTRepositoryImpl(
@Volatile private var client: MqttClient? = null
private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED)
private val _connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected.Idle)
override val connectionState: StateFlow<ConnectionState> = _connectionState.asStateFlow()
@OptIn(ExperimentalSerializationApi::class)
@ -89,7 +88,7 @@ class MQTTRepositoryImpl(
Logger.i { "MQTT Disconnecting" }
val c = client
client = null
_connectionState.value = ConnectionState.DISCONNECTED
_connectionState.value = ConnectionState.Disconnected.Idle
scope.launch { safeCatching { c?.close() }.onFailure { e -> Logger.w(e) { "MQTT clean disconnect failed" } } }
}
@ -102,14 +101,7 @@ class MQTTRepositoryImpl(
val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT
val rawAddress = mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS
val endpoint =
if (rawAddress.contains("://")) {
MqttEndpoint.parse(rawAddress)
} else {
// Use WebSocket transport on all platforms for firewall/CDN compatibility.
val scheme = if (mqttConfig?.tls_enabled == true) "wss" else "ws"
MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH")
}
val endpoint = resolveEndpoint(rawAddress, mqttConfig?.tls_enabled == true)
val newClient =
MqttClient(ownerId) {
@ -226,3 +218,26 @@ class MQTTRepositoryImpl(
}
}
}
/**
* Resolve a user-supplied broker address into an [MqttEndpoint].
*
* Address resolution rules:
* - If [rawAddress] already contains a URI scheme (`scheme://…`), parse it directly via [MqttEndpoint.parse] and
* respect whatever transport / port the user encoded.
* - Otherwise wrap it as a WebSocket endpoint (`ws[s]://host${WEBSOCKET_PATH}`) so the proxy works over CDNs and
* firewall-restricted networks where raw 1883/8883 may be blocked. The scheme is `wss` when [tlsEnabled] is `true`,
* `ws` otherwise.
*
* Extracted as a top-level function so [MQTTRepositoryImplTest] can exercise every branch without spinning up the full
* repository, and so `MqttManagerImpl` (in `:core:data`) can reuse the same parsing rules for the probe API. Visibility
* is `public` because Kotlin's `internal` is scoped per Gradle module.
*/
fun resolveEndpoint(rawAddress: String, tlsEnabled: Boolean): MqttEndpoint = if (rawAddress.contains("://")) {
MqttEndpoint.parse(rawAddress)
} else {
val scheme = if (tlsEnabled) "wss" else "ws"
MqttEndpoint.parse("$scheme://$rawAddress$WEBSOCKET_PATH")
}
private const val WEBSOCKET_PATH = "/mqtt"

View file

@ -18,25 +18,82 @@ package org.meshtastic.core.network.repository
import kotlinx.serialization.json.Json
import org.meshtastic.core.model.MqttJsonPayload
import org.meshtastic.mqtt.MqttEndpoint
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertTrue
class MQTTRepositoryImplTest {
@Test
fun `test address parsing logic`() {
val address1 = "mqtt.example.com:1883"
val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
assertEquals("mqtt.example.com", host1)
assertEquals(1883, port1)
// region resolveEndpoint — every behavioral branch of address parsing.
val address2 = "mqtt.example.com"
val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
assertEquals("mqtt.example.com", host2)
assertEquals(1883, port2)
@Test
fun `bare host without scheme is wrapped as ws WebSocket on the standard port`() {
val endpoint = resolveEndpoint(rawAddress = "broker.example.com", tlsEnabled = false)
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
assertEquals("ws://broker.example.com/mqtt", ws.url)
}
@Test
fun `bare host with TLS enabled is upgraded to wss`() {
val endpoint = resolveEndpoint(rawAddress = "broker.example.com", tlsEnabled = true)
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
assertEquals("wss://broker.example.com/mqtt", ws.url)
}
@Test
fun `host with explicit port is preserved when wrapped`() {
val endpoint = resolveEndpoint(rawAddress = "broker.example.com:9001", tlsEnabled = false)
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
assertEquals("ws://broker.example.com:9001/mqtt", ws.url)
}
@Test
fun `address with ws scheme is parsed as-is and tls flag is ignored`() {
// tlsEnabled is intentionally true here — when the user supplies a full URL we
// must honor whatever scheme they provided, not silently upgrade it.
val endpoint = resolveEndpoint(rawAddress = "ws://broker.example.com:8080/custom-path", tlsEnabled = true)
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
assertEquals("ws://broker.example.com:8080/custom-path", ws.url)
}
@Test
fun `address with wss scheme is parsed as-is`() {
val endpoint = resolveEndpoint(rawAddress = "wss://broker.example.com/secure-mqtt", tlsEnabled = false)
val ws = assertIs<MqttEndpoint.WebSocket>(endpoint)
assertEquals("wss://broker.example.com/secure-mqtt", ws.url)
}
@Test
fun `address with mqtt tcp scheme is parsed as Tcp endpoint`() {
val endpoint = resolveEndpoint(rawAddress = "mqtt://broker.example.com:1883", tlsEnabled = false)
val tcp = assertIs<MqttEndpoint.Tcp>(endpoint)
assertEquals("broker.example.com", tcp.host)
assertEquals(1883, tcp.port)
assertEquals(false, tcp.tls)
}
@Test
fun `address with mqtts tcp scheme is parsed as Tcp endpoint with tls true`() {
val endpoint = resolveEndpoint(rawAddress = "mqtts://broker.example.com:8883", tlsEnabled = false)
val tcp = assertIs<MqttEndpoint.Tcp>(endpoint)
assertEquals("broker.example.com", tcp.host)
assertEquals(8883, tcp.port)
assertEquals(true, tcp.tls)
}
// endregion
// region MqttJsonPayload — keep the existing JSON contract tests.
@Test
fun `test json payload parsing`() {
val jsonStr =
@ -72,4 +129,6 @@ class MQTTRepositoryImplTest {
assertTrue(jsonStr.contains("\"from\":12345678"))
assertTrue(jsonStr.contains("\"payload\":\"Hello World\""))
}
// endregion
}

View file

@ -18,6 +18,7 @@ package org.meshtastic.core.repository
import kotlinx.coroutines.flow.StateFlow
import org.meshtastic.core.model.MqttConnectionState
import org.meshtastic.core.model.MqttProbeStatus
import org.meshtastic.proto.MqttClientProxyMessage
/** Interface for managing MQTT proxy communication. */
@ -33,4 +34,15 @@ interface MqttManager {
/** Handles an MQTT proxy message from the radio. */
fun handleMqttProxyMessage(message: MqttClientProxyMessage)
/**
* Probe an MQTT broker to verify connectivity and credentials without joining the proxy lifecycle. Intended for UI
* "Test Connection" affordances.
*
* @param address Raw broker address as the user would type it (host, host:port, or full URL).
* @param tlsEnabled `true` to upgrade bare addresses to `wss://` (ignored when [address] already has a scheme).
* @param username Optional MQTT username.
* @param password Optional MQTT password.
*/
suspend fun probe(address: String, tlsEnabled: Boolean, username: String?, password: String?): MqttProbeStatus
}

View file

@ -636,9 +636,21 @@
<string name="mqtt_config">MQTT Config</string>
<string name="mqtt_status_inactive">Inactive</string>
<string name="mqtt_status_disconnected">Disconnected</string>
<string name="mqtt_status_disconnected_with_reason">Disconnected — %1$s</string>
<string name="mqtt_status_connecting">Connecting…</string>
<string name="mqtt_status_connected">Connected</string>
<string name="mqtt_status_reconnecting">Reconnecting…</string>
<string name="mqtt_status_reconnecting_with_attempt">Reconnecting (attempt %1$d) — %2$s</string>
<string name="mqtt_test_connection">Test connection</string>
<string name="mqtt_probe_running">Probing broker…</string>
<string name="mqtt_probe_success">Reachable. Broker accepted credentials.</string>
<string name="mqtt_probe_success_with_info">Reachable (%1$s)</string>
<string name="mqtt_probe_rejected">Broker rejected: %1$s</string>
<string name="mqtt_probe_dns_failure">Host not found</string>
<string name="mqtt_probe_tcp_failure">Cannot reach broker (TCP)</string>
<string name="mqtt_probe_tls_failure">TLS handshake failed</string>
<string name="mqtt_probe_timeout">Timed out after %1$d ms</string>
<string name="mqtt_probe_other_failure">Connection failed</string>
<string name="mqtt_enabled">MQTT enabled</string>
<string name="address">Address</string>
<string name="username">Username</string>