feat: mqtt (#4841)

This commit is contained in:
James Rich 2026-03-18 13:39:20 -05:00 committed by GitHub
parent eae5a6bdac
commit d314ee2d8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 371 additions and 200 deletions

View file

@ -0,0 +1,34 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.model
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
@Serializable
data class MqttJsonPayload(
val type: String,
val from: Long,
val to: Long? = null,
val channel: Int? = null,
val payload: String? = null,
@SerialName("hop_limit") val hopLimit: Int? = null,
val id: Long? = null,
val time: Long? = null,
val sender: String? = null,
// Add other fields as needed for position/telemetry
)

View file

@ -41,6 +41,8 @@ kotlin {
implementation(projects.core.proto)
implementation(libs.okio)
implementation(libs.kmqtt.client)
implementation(libs.kmqtt.common)
implementation(libs.kotlinx.serialization.json)
implementation(libs.ktor.client.core)
implementation(libs.ktor.client.content.negotiation)
@ -58,7 +60,6 @@ kotlin {
androidMain.dependencies {
implementation(projects.core.ble)
implementation(projects.core.prefs)
implementation(libs.org.eclipse.paho.client.mqttv3)
implementation(libs.usb.serial.android)
implementation(libs.coil.network.okhttp)
implementation(libs.coil.svg)

View file

@ -1,178 +0,0 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.repository
import co.touchlab.kermit.Logger
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import okio.ByteString.Companion.toByteString
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
import org.eclipse.paho.client.mqttv3.MqttAsyncClient.generateClientId
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.ignoreException
import org.meshtastic.core.model.util.subscribeList
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.proto.MqttClientProxyMessage
import java.net.URI
import java.security.SecureRandom
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManager
@Single
class MQTTRepositoryImpl
constructor(
private val radioConfigRepository: RadioConfigRepository,
private val nodeRepository: NodeRepository,
) : MQTTRepository {
companion object {
/**
* Quality of Service (QoS) levels in MQTT:
* - QoS 0: "at most once". Packets are sent once without validation if it has been received.
* - QoS 1: "at least once". Packets are sent and stored until the client receives confirmation from the server.
* MQTT ensures delivery, but duplicates may occur.
* - QoS 2: "exactly once". Similar to QoS 1, but with no duplicates.
*/
private const val DEFAULT_QOS = 1
private const val DEFAULT_TOPIC_ROOT = "msh"
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
}
private var mqttClient: MqttAsyncClient? = null
override fun disconnect() {
Logger.i { "MQTT Disconnected" }
mqttClient?.apply {
if (isConnected) {
ignoreException { disconnect() }
}
ignoreException { close(true) }
}
mqttClient = null
}
override val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: generateClientId()}"
val channelSet = radioConfigRepository.channelSetFlow.first()
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
val sslContext = SSLContext.getInstance("TLS")
// Create a custom SSLContext that trusts all certificates
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT }
val connectOptions =
MqttConnectOptions().apply {
userName = mqttConfig?.username
password = mqttConfig?.password?.toCharArray()
isAutomaticReconnect = true
if (mqttConfig?.tls_enabled == true) {
socketFactory = sslContext.socketFactory
}
}
@Suppress("MagicNumber")
val bufferOptions =
DisconnectedBufferOptions().apply {
isBufferEnabled = true
bufferSize = 512
isPersistBuffer = false
isDeleteOldestMessages = true
}
val callback =
object : MqttCallbackExtended {
override fun connectComplete(reconnect: Boolean, serverURI: String) {
Logger.i { "MQTT connectComplete: $serverURI reconnect: $reconnect" }
channelSet.subscribeList
.ifEmpty {
return
}
.forEach { globalId ->
subscribe("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+")
if (mqttConfig?.json_enabled == true) subscribe("$rootTopic$JSON_TOPIC_LEVEL$globalId/+")
}
subscribe("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+")
}
override fun connectionLost(cause: Throwable) {
Logger.i { "MQTT connectionLost cause: $cause" }
if (cause is IllegalArgumentException) close(cause)
}
override fun messageArrived(topic: String, message: MqttMessage) {
trySend(
MqttClientProxyMessage(
topic = topic,
data_ = message.payload.toByteString(),
retained = message.isRetained,
),
)
}
override fun deliveryComplete(token: IMqttDeliveryToken?) {
Logger.i { "MQTT deliveryComplete messageId: ${token?.messageId}" }
}
}
val scheme = if (mqttConfig?.tls_enabled == true) "ssl" else "tcp"
val (host, port) =
(mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
it[0] to (it.getOrNull(1)?.toIntOrNull() ?: -1)
}
mqttClient =
MqttAsyncClient(URI(scheme, null, host, port, "", "", "").toString(), ownerId, MemoryPersistence()).apply {
setCallback(callback)
setBufferOpts(bufferOptions)
connect(connectOptions)
}
awaitClose { disconnect() }
}
private fun subscribe(topic: String) {
mqttClient?.subscribe(topic, DEFAULT_QOS)
Logger.i { "MQTT Subscribed to topic: $topic" }
}
@Suppress("TooGenericExceptionCaught")
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
try {
val token = mqttClient?.publish(topic, data, DEFAULT_QOS, retained)
Logger.i { "MQTT Publish messageId: ${token?.messageId}" }
} catch (ex: Exception) {
if (ex.message?.contains("Client is disconnected") == true) {
Logger.w { "MQTT Publish skipped: Client is disconnected" }
} else {
Logger.e(ex) { "MQTT Publish error: ${ex.message}" }
}
}
}
}

View file

@ -0,0 +1,167 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.repository
import co.touchlab.kermit.Logger
import io.github.davidepianca98.MQTTClient
import io.github.davidepianca98.mqtt.MQTTVersion
import io.github.davidepianca98.mqtt.Subscription
import io.github.davidepianca98.mqtt.packets.Qos
import io.github.davidepianca98.mqtt.packets.mqttv5.SubscriptionOptions
import io.github.davidepianca98.socket.tls.TLSClientSettings
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.model.MqttJsonPayload
import org.meshtastic.core.model.util.subscribeList
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.proto.MqttClientProxyMessage
@Single(binds = [MQTTRepository::class])
class MQTTRepositoryImpl(
private val radioConfigRepository: RadioConfigRepository,
private val nodeRepository: NodeRepository,
) : MQTTRepository {
companion object {
private const val DEFAULT_TOPIC_ROOT = "msh"
private const val DEFAULT_TOPIC_LEVEL = "/2/e/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
}
private var client: MQTTClient? = null
private val json = Json { ignoreUnknownKeys = true }
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private var clientJob: Job? = null
override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
clientJob?.cancel()
clientJob = null
client = null
}
@OptIn(ExperimentalUnsignedTypes::class)
override val proxyMessageFlow: Flow<MqttClientProxyMessage> = callbackFlow {
val ownerId = "MeshtasticAndroidMqttProxy-${nodeRepository.myId.value ?: "unknown"}"
val channelSet = radioConfigRepository.channelSetFlow.first()
val mqttConfig = radioConfigRepository.moduleConfigFlow.first().mqtt
val rootTopic = mqttConfig?.root?.ifEmpty { DEFAULT_TOPIC_ROOT } ?: DEFAULT_TOPIC_ROOT
val (host, port) =
(mqttConfig?.address ?: DEFAULT_SERVER_ADDRESS).split(":", limit = 2).let {
it[0] to (it.getOrNull(1)?.toIntOrNull() ?: if (mqttConfig?.tls_enabled == true) 8883 else 1883)
}
val newClient =
MQTTClient(
mqttVersion = MQTTVersion.MQTT5,
address = host,
port = port,
tls = if (mqttConfig?.tls_enabled == true) TLSClientSettings() else null,
userName = mqttConfig?.username,
password = mqttConfig?.password?.encodeToByteArray()?.toUByteArray(),
clientId = ownerId,
publishReceived = { packet ->
val topic = packet.topicName
val payload = packet.payload?.toByteArray()
Logger.d { "MQTT received message on topic $topic (size: ${payload?.size ?: 0} bytes)" }
if (topic.contains("/json/")) {
try {
val jsonStr = payload?.decodeToString() ?: ""
// Validate JSON by parsing it
json.decodeFromString<MqttJsonPayload>(jsonStr)
Logger.d { "MQTT parsed JSON payload successfully" }
trySend(MqttClientProxyMessage(topic = topic, text = jsonStr, retained = packet.retain))
} catch (e: kotlinx.serialization.SerializationException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
} catch (e: IllegalArgumentException) {
Logger.e(e) { "Failed to parse MQTT JSON: ${e.message}" }
}
} else {
trySend(
MqttClientProxyMessage(
topic = topic,
data_ = payload?.toByteString() ?: okio.ByteString.EMPTY,
retained = packet.retain,
),
)
}
},
)
client = newClient
clientJob =
scope.launch {
try {
Logger.i { "MQTT Starting client loop for $host:$port" }
newClient.runSuspend()
} catch (e: io.github.davidepianca98.mqtt.MQTTException) {
Logger.e(e) { "MQTT Client loop error (MQTT)" }
close(e)
} catch (e: io.github.davidepianca98.socket.IOException) {
Logger.e(e) { "MQTT Client loop error (IO)" }
close(e)
} catch (e: kotlinx.coroutines.CancellationException) {
Logger.i { "MQTT Client loop cancelled" }
throw e
}
}
// Subscriptions
val subscriptions = mutableListOf<Subscription>()
channelSet.subscribeList.forEach { globalId ->
subscriptions.add(
Subscription("$rootTopic$DEFAULT_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
)
if (mqttConfig?.json_enabled == true) {
subscriptions.add(
Subscription("$rootTopic$JSON_TOPIC_LEVEL$globalId/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)),
)
}
}
subscriptions.add(Subscription("$rootTopic${DEFAULT_TOPIC_LEVEL}PKI/+", SubscriptionOptions(Qos.AT_LEAST_ONCE)))
if (subscriptions.isNotEmpty()) {
Logger.d { "MQTT subscribing to ${subscriptions.size} topics" }
newClient.subscribe(subscriptions)
}
awaitClose { disconnect() }
}
@OptIn(ExperimentalUnsignedTypes::class)
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
client?.publish(retain = retained, qos = Qos.AT_LEAST_ONCE, topic = topic, payload = data.toUByteArray())
}
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.repository
import kotlinx.serialization.json.Json
import org.meshtastic.core.model.MqttJsonPayload
import kotlin.test.Test
import kotlin.test.assertEquals
class MQTTRepositoryImplTest {
@Test
fun `test address parsing logic`() {
val address1 = "mqtt.example.com:1883"
val (host1, port1) = address1.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
assertEquals("mqtt.example.com", host1)
assertEquals(1883, port1)
val address2 = "mqtt.example.com"
val (host2, port2) = address2.split(":", limit = 2).let { it[0] to (it.getOrNull(1)?.toIntOrNull() ?: 1883) }
assertEquals("mqtt.example.com", host2)
assertEquals(1883, port2)
}
@Test
fun `test json payload parsing`() {
val jsonStr =
"""{"type":"text","from":12345678,"to":4294967295,"payload":"Hello World","hop_limit":3,"id":123,"time":1600000000}"""
val json = Json { ignoreUnknownKeys = true }
val payload = json.decodeFromString<MqttJsonPayload>(jsonStr)
assertEquals("text", payload.type)
assertEquals(12345678L, payload.from)
assertEquals(4294967295L, payload.to)
assertEquals("Hello World", payload.payload)
assertEquals(3, payload.hopLimit)
assertEquals(123L, payload.id)
assertEquals(1600000000L, payload.time)
}
@Test
fun `test json payload serialization`() {
val payload =
MqttJsonPayload(
type = "text",
from = 12345678,
to = 4294967295,
payload = "Hello World",
hopLimit = 3,
id = 123,
time = 1600000000,
)
val json = Json { ignoreUnknownKeys = true }
val jsonStr = json.encodeToString(MqttJsonPayload.serializer(), payload)
assert(jsonStr.contains("\"type\":\"text\""))
assert(jsonStr.contains("\"from\":12345678"))
assert(jsonStr.contains("\"payload\":\"Hello World\""))
}
}