MeshService: extract basic packet handling (#2813)

This commit is contained in:
Phil Oliver 2025-08-24 08:15:32 -04:00 committed by GitHub
parent ce54f42988
commit c26d76f60b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 328 additions and 177 deletions

View file

@ -91,7 +91,6 @@ import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
import dagger.Lazy
import dagger.hilt.android.AndroidEntryPoint
import java8.util.concurrent.CompletableFuture
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
@ -103,13 +102,9 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.withTimeoutOrNull
import java.util.Random
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.inject.Inject
import kotlin.math.absoluteValue
@ -216,6 +211,8 @@ class MeshService :
MeshServiceBroadcasts(this, clientPackages) {
connectionState.also { radioConfigRepository.setConnectionState(it) }
}
private lateinit var packetHandler: PacketHandler
private val serviceJob = Job()
private val serviceScope = CoroutineScope(Dispatchers.IO + serviceJob)
private var connectionState = ConnectionState.DISCONNECTED
@ -289,42 +286,6 @@ class MeshService :
}
}
/**
* Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully
* bound to the RadioInterfaceService
*/
private fun sendToRadio(p: ToRadio.Builder) {
val built = p.build()
debug("Sending to radio ${built.toPIIString()}")
val b = built.toByteArray()
radioInterfaceService.sendToRadio(b)
changeStatus(p.packet.id, MessageStatus.ENROUTE)
if (p.packet.hasDecoded()) {
val packetToSave =
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "Packet",
received_date = System.currentTimeMillis(),
raw_message = p.packet.toString(),
fromNum = p.packet.from,
portNum = p.packet.decoded.portnumValue,
fromRadio = fromRadio { packet = p.packet },
)
insertMeshLog(packetToSave)
}
}
/**
* Send a mesh packet to the radio, if the radio is not currently connected this function will throw
* NotConnectedException
*/
private fun sendToRadio(packet: MeshPacket) {
queuedPackets.add(packet)
startPacketQueue()
}
private fun showAlertNotification(contactKey: String, dataPacket: DataPacket) {
serviceNotifications.showAlertNotification(
contactKey,
@ -369,6 +330,14 @@ class MeshService :
loadSettings() // Load our last known node DB
packetHandler =
PacketHandler(
packetRepository = packetRepository,
serviceBroadcasts = serviceBroadcasts,
radioInterfaceService = radioInterfaceService,
meshLogRepository = meshLogRepository,
)
// the rest of our init will happen once we are in radioConnection.onServiceConnected
}
@ -848,7 +817,7 @@ class MeshService :
}
handleAckNak(data.requestId, fromId, u.errorReasonValue)
queueResponse.remove(data.requestId)?.complete(true)
packetHandler.removeResponse(data.requestId, complete = true)
}
Portnums.PortNum.ADMIN_APP_VALUE -> {
@ -1121,64 +1090,11 @@ class MeshService :
}
}
private val queuedPackets = ConcurrentLinkedQueue<MeshPacket>()
private val queueResponse = mutableMapOf<Int, CompletableFuture<Boolean>>()
private var queueJob: Job? = null
private fun sendPacket(packet: MeshPacket): CompletableFuture<Boolean> {
// send the packet to the radio and return a CompletableFuture that will be completed with
// the result
val future = CompletableFuture<Boolean>()
queueResponse[packet.id] = future
try {
if (connectionState != ConnectionState.CONNECTED) throw RadioNotConnectedException()
sendToRadio(ToRadio.newBuilder().apply { this.packet = packet })
} catch (ex: Exception) {
errormsg("sendToRadio error:", ex)
future.complete(false)
}
return future
}
private fun startPacketQueue() {
if (queueJob?.isActive == true) return
queueJob =
serviceScope.handledLaunch {
debug("packet queueJob started")
while (connectionState == ConnectionState.CONNECTED) {
// take the first packet from the queue head
val packet = queuedPackets.poll() ?: break
try {
// send packet to the radio and wait for response
val response = sendPacket(packet)
debug("queueJob packet id=${packet.id.toUInt()} waiting")
val success = response.get(2, TimeUnit.MINUTES)
debug("queueJob packet id=${packet.id.toUInt()} success $success")
} catch (e: TimeoutException) {
debug("queueJob packet id=${packet.id.toUInt()} timeout")
} catch (e: Exception) {
debug("queueJob packet id=${packet.id.toUInt()} failed")
}
}
}
}
private fun stopPacketQueue() {
if (queueJob?.isActive == true) {
info("Stopping packet queueJob")
queueJob?.cancel()
queueJob = null
queuedPackets.clear()
queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false)
queueResponse.clear()
}
}
private fun sendNow(p: DataPacket) {
val packet = toMeshPacket(p)
p.time = System.currentTimeMillis() // update time to the actual time we started sending
// debug("Sending to radio: ${packet.toPIIString()}")
sendToRadio(packet)
packetHandler.sendToRadio(packet) { connectionState }
}
private fun processQueuedPackets() {
@ -1194,26 +1110,6 @@ class MeshService :
offlineSentPackets.removeAll(sentPackets)
}
private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) {
var dataPacket: DataPacket? = null
while (dataPacket == null) {
dataPacket = packetRepository.get().getPacketById(packetId)?.data
if (dataPacket == null) delay(100)
}
dataPacket
}
/** Change the status on a DataPacket and update watchers */
private fun changeStatus(packetId: Int, m: MessageStatus) = serviceScope.handledLaunch {
if (packetId != 0) {
getDataPacketById(packetId)?.let { p ->
if (p.status == m) return@handledLaunch
packetRepository.get().updateMessageStatus(p, m)
serviceBroadcasts.broadcastMessageStatus(packetId, m)
}
}
}
/** Handle an ack/nak packet by updating sent message status */
private fun handleAckNak(requestId: Int, fromId: String, routingError: Int) {
serviceScope.handledLaunch {
@ -1349,7 +1245,7 @@ class MeshService :
// Perform all the steps needed once we start waiting for device sleep to complete
fun startDeviceSleep() {
stopPacketQueue()
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
@ -1382,7 +1278,7 @@ class MeshService :
}
fun startDisconnect() {
stopPacketQueue()
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
@ -1491,7 +1387,7 @@ class MeshService :
handleModuleConfig(proto.moduleConfig)
}
PayloadVariantCase.QUEUESTATUS -> { proto: MeshProtos.FromRadio ->
handleQueueStatus(proto.queueStatus)
packetHandler.handleQueueStatus((proto.queueStatus))
}
PayloadVariantCase.METADATA -> { proto: MeshProtos.FromRadio -> handleMetadata(proto.metadata) }
PayloadVariantCase.MQTTCLIENTPROXYMESSAGE -> { proto: MeshProtos.FromRadio ->
@ -1569,17 +1465,6 @@ class MeshService :
radioConfigRepository.setStatusMessage("Module config ($moduleCount / $moduleTotal)")
}
private fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) {
debug("queueStatus ${queueStatus.toOneLineString()}")
val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) }
if (success && isFull) return // Queue is full, wait for free != 0
if (requestId != 0) {
queueResponse.remove(requestId)?.complete(success)
} else {
queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success)
}
}
private fun handleChannel(ch: ChannelProtos.Channel) {
debug("Received channel ${ch.index}")
val packetToSave =
@ -1770,7 +1655,7 @@ class MeshService :
serviceNotifications.showClientNotification(notification)
// if the future for the originating request is still in the queue, complete as unsuccessful
// for now
queueResponse.remove(notification.replyId)?.complete(false)
packetHandler.removeResponse(notification.replyId, complete = false)
}
private fun handleFileInfo(fileInfo: MeshProtos.FileInfo) {
@ -1844,7 +1729,9 @@ class MeshService :
if (moduleConfig.mqtt.enabled && moduleConfig.mqtt.proxyToClientEnabled) {
mqttMessageFlow =
mqttRepository.proxyMessageFlow
.onEach { message -> sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message }) }
.onEach { message ->
packetHandler.sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message })
}
.catch { throwable -> radioConfigRepository.setErrorMessage("MqttClientProxy failed: $throwable") }
.launchIn(serviceScope)
}
@ -1865,7 +1752,9 @@ class MeshService :
processQueuedPackets() // send any packets that were queued up
startMqttClientProxy()
serviceBroadcasts.broadcastConnection()
sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() })
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() }) {
connectionState
}
sendAnalytics()
reportConnection()
}
@ -1946,7 +1835,7 @@ class MeshService :
debug("Starting config only nonce=$CONFIG_ONLY_NONCE")
sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = CONFIG_ONLY_NONCE })
packetHandler.sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = CONFIG_ONLY_NONCE })
}
private fun startNodeInfoOnly() {
@ -1954,7 +1843,7 @@ class MeshService :
debug("Starting node info nonce=$NODE_INFO_ONLY_NONCE")
sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = NODE_INFO_ONLY_NONCE })
packetHandler.sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = NODE_INFO_ONLY_NONCE })
}
/** Send a position (typically from our built in GPS) into the mesh. */
@ -1971,7 +1860,7 @@ class MeshService :
handleReceivedPosition(mi.myNodeNum, position)
}
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(idNum).buildMeshPacket(
channel =
if (destNum == null) {
@ -1985,7 +1874,9 @@ class MeshService :
payload = position.toByteString()
this.wantResponse = wantResponse
},
)
) {
connectionState
}
}
} catch (ex: BLEException) {
warn("Ignoring disconnected radio during gps location update")
@ -2012,7 +1903,11 @@ class MeshService :
handleReceivedUser(dest.num, user)
// encapsulate our payload in the proper protobuf and fire it off
sendToRadio(newMeshPacketTo(dest.num).buildAdminPacket(id = packetId) { setOwner = user })
packetHandler.sendToRadio(
newMeshPacketTo(dest.num).buildAdminPacket(id = packetId) { setOwner = user },
) {
connectionState
}
}
}
@ -2051,16 +1946,22 @@ class MeshService :
}
private fun importContact(contact: AdminProtos.SharedContact) {
sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { addContact = contact })
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { addContact = contact }) {
connectionState
}
handleReceivedUser(contact.nodeNum, contact.user)
}
private fun getDeviceMetadata(destNum: Int) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(wantResponse = true) { getDeviceMetadataRequest = true })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(wantResponse = true) { getDeviceMetadataRequest = true },
) {
connectionState
}
}
private fun favoriteNode(node: Node) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(myNodeNum).buildAdminPacket {
if (node.isFavorite) {
debug("removing node ${node.num} from favorite list")
@ -2070,12 +1971,14 @@ class MeshService :
setFavoriteNode = node.num
}
},
)
) {
connectionState
}
updateNodeInfo(node.num) { it.isFavorite = !node.isFavorite }
}
private fun ignoreNode(node: Node) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(myNodeNum).buildAdminPacket {
if (node.isIgnored) {
debug("removing node ${node.num} from ignore list")
@ -2085,7 +1988,9 @@ class MeshService :
setIgnoredNode = node.num
}
},
)
) {
connectionState
}
updateNodeInfo(node.num) { it.isIgnored = !node.isIgnored }
}
@ -2101,7 +2006,7 @@ class MeshService :
portnumValue = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE
payload = ByteString.copyFrom(reaction.emoji.encodeToByteArray())
}
sendToRadio(packet)
packetHandler.sendToRadio(packet) { connectionState }
rememberReaction(packet.copy { from = myNodeNum })
}
@ -2197,9 +2102,11 @@ class MeshService :
}
override fun getRemoteOwner(id: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getOwnerRequest = true },
)
) {
connectionState
}
}
override fun send(p: DataPacket) {
@ -2259,12 +2166,14 @@ class MeshService :
override fun setRemoteConfig(id: Int, num: Int, payload: ByteArray) = toRemoteExceptions {
debug("Setting new radio config!")
val config = ConfigProtos.Config.parseFrom(payload)
sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setConfig = config })
packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setConfig = config }) {
connectionState
}
if (num == myNodeNum) setLocalConfig(config) // Update our local copy
}
override fun getRemoteConfig(id: Int, destNum: Int, config: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) {
if (config == AdminProtos.AdminMessage.ConfigType.SESSIONKEY_CONFIG_VALUE) {
getDeviceMetadataRequest = true
@ -2272,47 +2181,63 @@ class MeshService :
getConfigRequestValue = config
}
},
)
) {
connectionState
}
}
/** Send our current module config to the device */
override fun setModuleConfig(id: Int, num: Int, payload: ByteArray) = toRemoteExceptions {
debug("Setting new module config!")
val config = ModuleConfigProtos.ModuleConfig.parseFrom(payload)
sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setModuleConfig = config })
packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setModuleConfig = config }) {
connectionState
}
if (num == myNodeNum) setLocalModuleConfig(config) // Update our local copy
}
override fun getModuleConfig(id: Int, destNum: Int, config: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) {
getModuleConfigRequestValue = config
},
)
) {
connectionState
}
}
override fun setRingtone(destNum: Int, ringtone: String) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setRingtoneMessage = ringtone })
packetHandler.sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setRingtoneMessage = ringtone }) {
connectionState
}
}
override fun getRingtone(id: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) {
getRingtoneRequest = true
},
)
) {
connectionState
}
}
override fun setCannedMessages(destNum: Int, messages: String) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setCannedMessageModuleMessages = messages })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket { setCannedMessageModuleMessages = messages },
) {
connectionState
}
}
override fun getCannedMessages(id: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) {
getCannedMessageModuleMessagesRequest = true
},
)
) {
connectionState
}
}
override fun setChannel(payload: ByteArray?) = toRemoteExceptions {
@ -2321,23 +2246,31 @@ class MeshService :
override fun setRemoteChannel(id: Int, num: Int, payload: ByteArray?) = toRemoteExceptions {
val channel = ChannelProtos.Channel.parseFrom(payload)
sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setChannel = channel })
packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setChannel = channel }) {
connectionState
}
}
override fun getRemoteChannel(id: Int, destNum: Int, index: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) {
getChannelRequest = index + 1
},
)
) {
connectionState
}
}
override fun beginEditSettings() = toRemoteExceptions {
sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { beginEditSettings = true })
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { beginEditSettings = true }) {
connectionState
}
}
override fun commitEditSettings() = toRemoteExceptions {
sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { commitEditSettings = true })
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { commitEditSettings = true }) {
connectionState
}
}
override fun getChannelSet(): ByteArray = toRemoteExceptions { this@MeshService.channelSet.toByteArray() }
@ -2361,18 +2294,22 @@ class MeshService :
override fun removeByNodenum(requestId: Int, nodeNum: Int) = toRemoteExceptions {
nodeDBbyNodeNum.remove(nodeNum)
sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { removeByNodenum = nodeNum })
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { removeByNodenum = nodeNum }) {
connectionState
}
}
override fun requestUserInfo(destNum: Int) = toRemoteExceptions {
if (destNum != myNodeNum) {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(channel = nodeDBbyNodeNum[destNum]?.channel ?: 0) {
portnumValue = Portnums.PortNum.NODEINFO_APP_VALUE
wantResponse = true
payload = nodeDBbyNodeNum[myNodeNum]!!.user.toByteString()
},
)
) {
connectionState
}
}
}
@ -2403,7 +2340,7 @@ class MeshService :
time = currentSecond()
}
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(
channel = nodeDBbyNodeNum[destNum]?.channel ?: 0,
priority = MeshPacket.Priority.BACKGROUND,
@ -2412,7 +2349,9 @@ class MeshService :
payload = meshPosition.toByteString()
wantResponse = true
},
)
) {
connectionState
}
}
}
@ -2422,7 +2361,7 @@ class MeshService :
longitudeI = Position.degI(position.longitude)
altitude = position.altitude
}
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket {
if (position != Position(0.0, 0.0, 0)) {
setFixedPosition = pos
@ -2430,12 +2369,14 @@ class MeshService :
removeFixedPosition = true
}
},
)
) {
connectionState
}
updateNodeInfo(destNum) { it.setPosition(pos, currentSecond()) }
}
override fun requestTraceroute(requestId: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(
wantAck = true,
id = requestId,
@ -2444,23 +2385,41 @@ class MeshService :
portnumValue = Portnums.PortNum.TRACEROUTE_APP_VALUE
wantResponse = true
},
)
) {
connectionState
}
}
override fun requestShutdown(requestId: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { shutdownSeconds = 5 })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { shutdownSeconds = 5 },
) {
connectionState
}
}
override fun requestReboot(requestId: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { rebootSeconds = 5 })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { rebootSeconds = 5 },
) {
connectionState
}
}
override fun requestFactoryReset(requestId: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { factoryResetDevice = 1 })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { factoryResetDevice = 1 },
) {
connectionState
}
}
override fun requestNodedbReset(requestId: Int, destNum: Int) = toRemoteExceptions {
sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { nodedbReset = 1 })
packetHandler.sendToRadio(
newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { nodedbReset = 1 },
) {
connectionState
}
}
}
}

View file

@ -0,0 +1,192 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.DataPacket
import com.geeksville.mesh.MeshProtos
import com.geeksville.mesh.MeshProtos.MeshPacket
import com.geeksville.mesh.MeshProtos.ToRadio
import com.geeksville.mesh.MessageStatus
import com.geeksville.mesh.android.BuildUtils.debug
import com.geeksville.mesh.android.BuildUtils.errormsg
import com.geeksville.mesh.android.BuildUtils.info
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.database.MeshLogRepository
import com.geeksville.mesh.database.PacketRepository
import com.geeksville.mesh.database.entity.MeshLog
import com.geeksville.mesh.fromRadio
import com.geeksville.mesh.repository.radio.RadioInterfaceService
import com.geeksville.mesh.util.toOneLineString
import com.geeksville.mesh.util.toPIIString
import dagger.Lazy
import java8.util.concurrent.CompletableFuture
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class PacketHandler(
private val packetRepository: Lazy<PacketRepository>,
private val serviceBroadcasts: MeshServiceBroadcasts,
private val radioInterfaceService: RadioInterfaceService,
private val meshLogRepository: Lazy<MeshLogRepository>,
) {
private var queueJob: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private val queuedPackets = ConcurrentLinkedQueue<MeshPacket>()
private val queueResponse = mutableMapOf<Int, CompletableFuture<Boolean>>()
/**
* Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully
* bound to the RadioInterfaceService
*/
fun sendToRadio(p: ToRadio.Builder) {
val built = p.build()
debug("Sending to radio ${built.toPIIString()}")
val b = built.toByteArray()
radioInterfaceService.sendToRadio(b)
changeStatus(p.packet.id, MessageStatus.ENROUTE)
if (p.packet.hasDecoded()) {
val packetToSave =
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "Packet",
received_date = System.currentTimeMillis(),
raw_message = p.packet.toString(),
fromNum = p.packet.from,
portNum = p.packet.decoded.portnumValue,
fromRadio = fromRadio { packet = p.packet },
)
insertMeshLog(packetToSave)
}
}
/**
* Send a mesh packet to the radio, if the radio is not currently connected this function will throw
* NotConnectedException
*/
fun sendToRadio(packet: MeshPacket, getConnectionState: () -> ConnectionState) {
queuedPackets.add(packet)
startPacketQueue(getConnectionState)
}
fun stopPacketQueue() {
if (queueJob?.isActive == true) {
info("Stopping packet queueJob")
queueJob?.cancel()
queueJob = null
queuedPackets.clear()
queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false)
queueResponse.clear()
}
}
fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) {
debug("queueStatus ${queueStatus.toOneLineString()}")
val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) }
if (success && isFull) return // Queue is full, wait for free != 0
if (requestId != 0) {
queueResponse.remove(requestId)?.complete(success)
} else {
queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success)
}
}
fun removeResponse(dataRequestId: Int, complete: Boolean) {
queueResponse.remove(dataRequestId)?.complete(complete)
}
@Suppress("TooGenericExceptionCaught", "SwallowedException")
private fun startPacketQueue(getConnectionState: () -> ConnectionState) {
if (queueJob?.isActive == true) return
queueJob =
scope.handledLaunch {
debug("packet queueJob started")
while (getConnectionState() == ConnectionState.CONNECTED) {
// take the first packet from the queue head
val packet = queuedPackets.poll() ?: break
try {
// send packet to the radio and wait for response
val response = sendPacket(packet, getConnectionState)
debug("queueJob packet id=${packet.id.toUInt()} waiting")
val success = response.get(2, TimeUnit.MINUTES)
debug("queueJob packet id=${packet.id.toUInt()} success $success")
} catch (e: TimeoutException) {
debug("queueJob packet id=${packet.id.toUInt()} timeout")
} catch (e: Exception) {
debug("queueJob packet id=${packet.id.toUInt()} failed")
}
}
}
}
/** Change the status on a DataPacket and update watchers */
private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch {
if (packetId != 0) {
getDataPacketById(packetId)?.let { p ->
if (p.status == m) return@handledLaunch
packetRepository.get().updateMessageStatus(p, m)
serviceBroadcasts.broadcastMessageStatus(packetId, m)
}
}
}
@Suppress("MagicNumber")
private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) {
var dataPacket: DataPacket? = null
while (dataPacket == null) {
dataPacket = packetRepository.get().getPacketById(packetId)?.data
if (dataPacket == null) delay(100)
}
dataPacket
}
@Suppress("TooGenericExceptionCaught")
private fun sendPacket(packet: MeshPacket, getConnectionState: () -> ConnectionState): CompletableFuture<Boolean> {
// send the packet to the radio and return a CompletableFuture that will be completed with
// the result
val future = CompletableFuture<Boolean>()
queueResponse[packet.id] = future
try {
if (getConnectionState() != ConnectionState.CONNECTED) throw RadioNotConnectedException()
sendToRadio(ToRadio.newBuilder().apply { this.packet = packet })
} catch (ex: Exception) {
errormsg("sendToRadio error:", ex)
future.complete(false)
}
return future
}
private fun insertMeshLog(packetToSave: MeshLog) {
scope.handledLaunch {
// Do not log, because might contain PII
// info("insert: ${packetToSave.message_type} =
// ${packetToSave.raw_message.toOneLineString()}")
meshLogRepository.get().insert(packetToSave)
}
}
}