mirror of
https://github.com/meshtastic/Meshtastic-Android.git
synced 2026-04-20 22:23:37 +00:00
refactor: subscribe MQTT channels with downlink_enabled
This commit is contained in:
parent
5ece09b4ce
commit
47bc9218dc
2 changed files with 25 additions and 7 deletions
|
|
@ -30,6 +30,12 @@ fun Uri.toChannelSet(): ChannelSet {
|
|||
return ChannelSet.parseFrom(bytes)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A list of globally unique channel IDs usable with MQTT subscribe()
|
||||
*/
|
||||
val ChannelSet.subscribeList: List<String>
|
||||
get() = settingsList.filter { it.downlinkEnabled }.map { Channel(it, loraConfig).name }
|
||||
|
||||
/**
|
||||
* Return the primary channel info
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
package com.geeksville.mesh.repository.network
|
||||
|
||||
import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage
|
||||
import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig.MQTTConfig
|
||||
import com.geeksville.mesh.android.Logging
|
||||
import com.geeksville.mesh.model.subscribeList
|
||||
import com.geeksville.mesh.mqttClientProxyMessage
|
||||
import com.geeksville.mesh.repository.datastore.ChannelSetRepository
|
||||
import com.geeksville.mesh.repository.datastore.ModuleConfigRepository
|
||||
import com.geeksville.mesh.util.ignoreException
|
||||
import com.google.protobuf.ByteString
|
||||
|
|
@ -27,6 +28,7 @@ import javax.net.ssl.TrustManager
|
|||
|
||||
@Singleton
|
||||
class MQTTRepository @Inject constructor(
|
||||
private val channelSetRepository: ChannelSetRepository,
|
||||
private val moduleConfigRepository: ModuleConfigRepository,
|
||||
) : Logging {
|
||||
|
||||
|
|
@ -39,19 +41,23 @@ class MQTTRepository @Inject constructor(
|
|||
*/
|
||||
private const val DEFAULT_QOS = 1
|
||||
private const val DEFAULT_TOPIC_ROOT = "msh"
|
||||
private const val VERSION_TOPIC_LEVEL = "/2/c/#"
|
||||
private const val STAT_TOPIC_LEVEL = "/2/stat/"
|
||||
private const val DEFAULT_TOPIC_LEVEL = "/2/c/"
|
||||
private const val JSON_TOPIC_LEVEL = "/2/json/"
|
||||
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
|
||||
}
|
||||
|
||||
private var mqttClient: MqttAsyncClient? = null
|
||||
|
||||
suspend fun connect(callback: MqttCallbackExtended) {
|
||||
val mqttConfig: MQTTConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt
|
||||
val channelSet = channelSetRepository.fetchInitialChannelSet() ?: return
|
||||
val mqttConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt
|
||||
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
// Create a custom SSLContext that trusts all certificates
|
||||
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())
|
||||
|
||||
// val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
|
||||
val connectOptions = MqttConnectOptions().apply {
|
||||
userName = mqttConfig.username
|
||||
password = mqttConfig.password.toCharArray()
|
||||
|
|
@ -60,11 +66,12 @@ class MQTTRepository @Inject constructor(
|
|||
if (mqttConfig.tlsEnabled) {
|
||||
socketFactory = sslContext.socketFactory
|
||||
}
|
||||
// setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
|
||||
}
|
||||
|
||||
val bufferOptions = DisconnectedBufferOptions().apply {
|
||||
isBufferEnabled = true
|
||||
bufferSize = 100
|
||||
bufferSize = 512
|
||||
isPersistBuffer = false
|
||||
isDeleteOldestMessages = true
|
||||
}
|
||||
|
|
@ -75,7 +82,8 @@ class MQTTRepository @Inject constructor(
|
|||
|
||||
val serverURI: String = URI(scheme, null, host, port, "", "", "").toString()
|
||||
|
||||
val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + VERSION_TOPIC_LEVEL
|
||||
val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } +
|
||||
DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL
|
||||
|
||||
mqttClient = MqttAsyncClient(
|
||||
serverURI,
|
||||
|
|
@ -86,8 +94,12 @@ class MQTTRepository @Inject constructor(
|
|||
setCallback(callback)
|
||||
setBufferOpts(bufferOptions)
|
||||
connect(connectOptions).waitForCompletion()
|
||||
subscribe(topic, DEFAULT_QOS).waitForCompletion()
|
||||
info("MQTT Subscribed to topic: $topic")
|
||||
|
||||
channelSet.subscribeList.forEach { globalId ->
|
||||
val topicFilter = "$topic$globalId/#"
|
||||
subscribe(topicFilter, DEFAULT_QOS).waitForCompletion()
|
||||
info("MQTT Subscribed to topic: $topicFilter")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue