From 47bc9218dc6685432b6dc19028861f3c3a8df684 Mon Sep 17 00:00:00 2001 From: andrekir Date: Mon, 16 Oct 2023 17:40:37 -0300 Subject: [PATCH] refactor: subscribe MQTT channels with `downlink_enabled` --- .../com/geeksville/mesh/model/ChannelSet.kt | 6 +++++ .../mesh/repository/network/MQTTRepository.kt | 26 ++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/app/src/main/java/com/geeksville/mesh/model/ChannelSet.kt b/app/src/main/java/com/geeksville/mesh/model/ChannelSet.kt index e936bc713..5254f7d88 100644 --- a/app/src/main/java/com/geeksville/mesh/model/ChannelSet.kt +++ b/app/src/main/java/com/geeksville/mesh/model/ChannelSet.kt @@ -30,6 +30,12 @@ fun Uri.toChannelSet(): ChannelSet { return ChannelSet.parseFrom(bytes) } +/** + * @return A list of globally unique channel IDs usable with MQTT subscribe() + */ +val ChannelSet.subscribeList: List + get() = settingsList.filter { it.downlinkEnabled }.map { Channel(it, loraConfig).name } + /** * Return the primary channel info */ diff --git a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt index f96e6a6bd..c4c4c1188 100644 --- a/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt +++ b/app/src/main/java/com/geeksville/mesh/repository/network/MQTTRepository.kt @@ -1,9 +1,10 @@ package com.geeksville.mesh.repository.network import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage -import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig.MQTTConfig import com.geeksville.mesh.android.Logging +import com.geeksville.mesh.model.subscribeList import com.geeksville.mesh.mqttClientProxyMessage +import com.geeksville.mesh.repository.datastore.ChannelSetRepository import com.geeksville.mesh.repository.datastore.ModuleConfigRepository import com.geeksville.mesh.util.ignoreException import com.google.protobuf.ByteString @@ -27,6 +28,7 @@ import javax.net.ssl.TrustManager @Singleton class MQTTRepository @Inject constructor( + private val channelSetRepository: ChannelSetRepository, private val moduleConfigRepository: ModuleConfigRepository, ) : Logging { @@ -39,19 +41,23 @@ class MQTTRepository @Inject constructor( */ private const val DEFAULT_QOS = 1 private const val DEFAULT_TOPIC_ROOT = "msh" - private const val VERSION_TOPIC_LEVEL = "/2/c/#" + private const val STAT_TOPIC_LEVEL = "/2/stat/" + private const val DEFAULT_TOPIC_LEVEL = "/2/c/" + private const val JSON_TOPIC_LEVEL = "/2/json/" private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org" } private var mqttClient: MqttAsyncClient? = null suspend fun connect(callback: MqttCallbackExtended) { - val mqttConfig: MQTTConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt + val channelSet = channelSetRepository.fetchInitialChannelSet() ?: return + val mqttConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt val sslContext = SSLContext.getInstance("TLS") // Create a custom SSLContext that trusts all certificates sslContext.init(null, arrayOf(TrustAllX509TrustManager()), SecureRandom()) + // val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId val connectOptions = MqttConnectOptions().apply { userName = mqttConfig.username password = mqttConfig.password.toCharArray() @@ -60,11 +66,12 @@ class MQTTRepository @Inject constructor( if (mqttConfig.tlsEnabled) { socketFactory = sslContext.socketFactory } + // setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true) } val bufferOptions = DisconnectedBufferOptions().apply { isBufferEnabled = true - bufferSize = 100 + bufferSize = 512 isPersistBuffer = false isDeleteOldestMessages = true } @@ -75,7 +82,8 @@ class MQTTRepository @Inject constructor( val serverURI: String = URI(scheme, null, host, port, "", "", "").toString() - val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + VERSION_TOPIC_LEVEL + val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + + DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL mqttClient = MqttAsyncClient( serverURI, @@ -86,8 +94,12 @@ class MQTTRepository @Inject constructor( setCallback(callback) setBufferOpts(bufferOptions) connect(connectOptions).waitForCompletion() - subscribe(topic, DEFAULT_QOS).waitForCompletion() - info("MQTT Subscribed to topic: $topic") + + channelSet.subscribeList.forEach { globalId -> + val topicFilter = "$topic$globalId/#" + subscribe(topicFilter, DEFAULT_QOS).waitForCompletion() + info("MQTT Subscribed to topic: $topicFilter") + } } }