fix: Update messaging feature with contact item keys and MQTT limits (#4871)

This commit is contained in:
James Rich 2026-03-21 09:25:23 -05:00 committed by GitHub
parent d61c0c9a67
commit 88d11aafec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 17 additions and 3 deletions

View file

@ -32,6 +32,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.serialization.json.Json
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
@ -58,6 +60,7 @@ class MQTTRepositoryImpl(
private val json = Json { ignoreUnknownKeys = true }
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private var clientJob: Job? = null
private val publishSemaphore = Semaphore(20)
override fun disconnect() {
Logger.i { "MQTT Disconnecting" }
@ -162,6 +165,15 @@ class MQTTRepositoryImpl(
@OptIn(ExperimentalUnsignedTypes::class)
override fun publish(topic: String, data: ByteArray, retained: Boolean) {
Logger.d { "MQTT publishing message to topic $topic (size: ${data.size} bytes, retained: $retained)" }
client?.publish(retain = retained, qos = Qos.AT_LEAST_ONCE, topic = topic, payload = data.toUByteArray())
scope.launch {
publishSemaphore.withPermit {
client?.publish(
retain = retained,
qos = Qos.AT_LEAST_ONCE,
topic = topic,
payload = data.toUByteArray(),
)
}
}
}
}