From c26d76f60b35de1893c0bc677c9f5a14b40f56ff Mon Sep 17 00:00:00 2001 From: Phil Oliver <3497406+poliver@users.noreply.github.com> Date: Sun, 24 Aug 2025 08:15:32 -0400 Subject: [PATCH] `MeshService`: extract basic packet handling (#2813) --- .../geeksville/mesh/service/MeshService.kt | 313 ++++++++---------- .../geeksville/mesh/service/PacketHandler.kt | 192 +++++++++++ 2 files changed, 328 insertions(+), 177 deletions(-) create mode 100644 app/src/main/java/com/geeksville/mesh/service/PacketHandler.kt diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index fd1eab833..b9fb3f889 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -91,7 +91,6 @@ import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException import dagger.Lazy import dagger.hilt.android.AndroidEntryPoint -import java8.util.concurrent.CompletableFuture import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers @@ -103,13 +102,9 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.withTimeoutOrNull import java.util.Random import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException import javax.inject.Inject import kotlin.math.absoluteValue @@ -216,6 +211,8 @@ class MeshService : MeshServiceBroadcasts(this, clientPackages) { connectionState.also { radioConfigRepository.setConnectionState(it) } } + private lateinit var packetHandler: PacketHandler + private val serviceJob = Job() private val serviceScope = CoroutineScope(Dispatchers.IO + serviceJob) private var connectionState = ConnectionState.DISCONNECTED @@ -289,42 +286,6 @@ class MeshService : } } - /** - * Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully - * bound to the RadioInterfaceService - */ - private fun sendToRadio(p: ToRadio.Builder) { - val built = p.build() - debug("Sending to radio ${built.toPIIString()}") - val b = built.toByteArray() - - radioInterfaceService.sendToRadio(b) - changeStatus(p.packet.id, MessageStatus.ENROUTE) - - if (p.packet.hasDecoded()) { - val packetToSave = - MeshLog( - uuid = UUID.randomUUID().toString(), - message_type = "Packet", - received_date = System.currentTimeMillis(), - raw_message = p.packet.toString(), - fromNum = p.packet.from, - portNum = p.packet.decoded.portnumValue, - fromRadio = fromRadio { packet = p.packet }, - ) - insertMeshLog(packetToSave) - } - } - - /** - * Send a mesh packet to the radio, if the radio is not currently connected this function will throw - * NotConnectedException - */ - private fun sendToRadio(packet: MeshPacket) { - queuedPackets.add(packet) - startPacketQueue() - } - private fun showAlertNotification(contactKey: String, dataPacket: DataPacket) { serviceNotifications.showAlertNotification( contactKey, @@ -369,6 +330,14 @@ class MeshService : loadSettings() // Load our last known node DB + packetHandler = + PacketHandler( + packetRepository = packetRepository, + serviceBroadcasts = serviceBroadcasts, + radioInterfaceService = radioInterfaceService, + meshLogRepository = meshLogRepository, + ) + // the rest of our init will happen once we are in radioConnection.onServiceConnected } @@ -848,7 +817,7 @@ class MeshService : } handleAckNak(data.requestId, fromId, u.errorReasonValue) - queueResponse.remove(data.requestId)?.complete(true) + packetHandler.removeResponse(data.requestId, complete = true) } Portnums.PortNum.ADMIN_APP_VALUE -> { @@ -1121,64 +1090,11 @@ class MeshService : } } - private val queuedPackets = ConcurrentLinkedQueue() - private val queueResponse = mutableMapOf>() - private var queueJob: Job? = null - - private fun sendPacket(packet: MeshPacket): CompletableFuture { - // send the packet to the radio and return a CompletableFuture that will be completed with - // the result - val future = CompletableFuture() - queueResponse[packet.id] = future - try { - if (connectionState != ConnectionState.CONNECTED) throw RadioNotConnectedException() - sendToRadio(ToRadio.newBuilder().apply { this.packet = packet }) - } catch (ex: Exception) { - errormsg("sendToRadio error:", ex) - future.complete(false) - } - return future - } - - private fun startPacketQueue() { - if (queueJob?.isActive == true) return - queueJob = - serviceScope.handledLaunch { - debug("packet queueJob started") - while (connectionState == ConnectionState.CONNECTED) { - // take the first packet from the queue head - val packet = queuedPackets.poll() ?: break - try { - // send packet to the radio and wait for response - val response = sendPacket(packet) - debug("queueJob packet id=${packet.id.toUInt()} waiting") - val success = response.get(2, TimeUnit.MINUTES) - debug("queueJob packet id=${packet.id.toUInt()} success $success") - } catch (e: TimeoutException) { - debug("queueJob packet id=${packet.id.toUInt()} timeout") - } catch (e: Exception) { - debug("queueJob packet id=${packet.id.toUInt()} failed") - } - } - } - } - - private fun stopPacketQueue() { - if (queueJob?.isActive == true) { - info("Stopping packet queueJob") - queueJob?.cancel() - queueJob = null - queuedPackets.clear() - queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false) - queueResponse.clear() - } - } - private fun sendNow(p: DataPacket) { val packet = toMeshPacket(p) p.time = System.currentTimeMillis() // update time to the actual time we started sending // debug("Sending to radio: ${packet.toPIIString()}") - sendToRadio(packet) + packetHandler.sendToRadio(packet) { connectionState } } private fun processQueuedPackets() { @@ -1194,26 +1110,6 @@ class MeshService : offlineSentPackets.removeAll(sentPackets) } - private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) { - var dataPacket: DataPacket? = null - while (dataPacket == null) { - dataPacket = packetRepository.get().getPacketById(packetId)?.data - if (dataPacket == null) delay(100) - } - dataPacket - } - - /** Change the status on a DataPacket and update watchers */ - private fun changeStatus(packetId: Int, m: MessageStatus) = serviceScope.handledLaunch { - if (packetId != 0) { - getDataPacketById(packetId)?.let { p -> - if (p.status == m) return@handledLaunch - packetRepository.get().updateMessageStatus(p, m) - serviceBroadcasts.broadcastMessageStatus(packetId, m) - } - } - } - /** Handle an ack/nak packet by updating sent message status */ private fun handleAckNak(requestId: Int, fromId: String, routingError: Int) { serviceScope.handledLaunch { @@ -1349,7 +1245,7 @@ class MeshService : // Perform all the steps needed once we start waiting for device sleep to complete fun startDeviceSleep() { - stopPacketQueue() + packetHandler.stopPacketQueue() stopLocationRequests() stopMqttClientProxy() @@ -1382,7 +1278,7 @@ class MeshService : } fun startDisconnect() { - stopPacketQueue() + packetHandler.stopPacketQueue() stopLocationRequests() stopMqttClientProxy() @@ -1491,7 +1387,7 @@ class MeshService : handleModuleConfig(proto.moduleConfig) } PayloadVariantCase.QUEUESTATUS -> { proto: MeshProtos.FromRadio -> - handleQueueStatus(proto.queueStatus) + packetHandler.handleQueueStatus((proto.queueStatus)) } PayloadVariantCase.METADATA -> { proto: MeshProtos.FromRadio -> handleMetadata(proto.metadata) } PayloadVariantCase.MQTTCLIENTPROXYMESSAGE -> { proto: MeshProtos.FromRadio -> @@ -1569,17 +1465,6 @@ class MeshService : radioConfigRepository.setStatusMessage("Module config ($moduleCount / $moduleTotal)") } - private fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) { - debug("queueStatus ${queueStatus.toOneLineString()}") - val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) } - if (success && isFull) return // Queue is full, wait for free != 0 - if (requestId != 0) { - queueResponse.remove(requestId)?.complete(success) - } else { - queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success) - } - } - private fun handleChannel(ch: ChannelProtos.Channel) { debug("Received channel ${ch.index}") val packetToSave = @@ -1770,7 +1655,7 @@ class MeshService : serviceNotifications.showClientNotification(notification) // if the future for the originating request is still in the queue, complete as unsuccessful // for now - queueResponse.remove(notification.replyId)?.complete(false) + packetHandler.removeResponse(notification.replyId, complete = false) } private fun handleFileInfo(fileInfo: MeshProtos.FileInfo) { @@ -1844,7 +1729,9 @@ class MeshService : if (moduleConfig.mqtt.enabled && moduleConfig.mqtt.proxyToClientEnabled) { mqttMessageFlow = mqttRepository.proxyMessageFlow - .onEach { message -> sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message }) } + .onEach { message -> + packetHandler.sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message }) + } .catch { throwable -> radioConfigRepository.setErrorMessage("MqttClientProxy failed: $throwable") } .launchIn(serviceScope) } @@ -1865,7 +1752,9 @@ class MeshService : processQueuedPackets() // send any packets that were queued up startMqttClientProxy() serviceBroadcasts.broadcastConnection() - sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() }) + packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() }) { + connectionState + } sendAnalytics() reportConnection() } @@ -1946,7 +1835,7 @@ class MeshService : debug("Starting config only nonce=$CONFIG_ONLY_NONCE") - sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = CONFIG_ONLY_NONCE }) + packetHandler.sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = CONFIG_ONLY_NONCE }) } private fun startNodeInfoOnly() { @@ -1954,7 +1843,7 @@ class MeshService : debug("Starting node info nonce=$NODE_INFO_ONLY_NONCE") - sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = NODE_INFO_ONLY_NONCE }) + packetHandler.sendToRadio(ToRadio.newBuilder().apply { this.wantConfigId = NODE_INFO_ONLY_NONCE }) } /** Send a position (typically from our built in GPS) into the mesh. */ @@ -1971,7 +1860,7 @@ class MeshService : handleReceivedPosition(mi.myNodeNum, position) } - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(idNum).buildMeshPacket( channel = if (destNum == null) { @@ -1985,7 +1874,9 @@ class MeshService : payload = position.toByteString() this.wantResponse = wantResponse }, - ) + ) { + connectionState + } } } catch (ex: BLEException) { warn("Ignoring disconnected radio during gps location update") @@ -2012,7 +1903,11 @@ class MeshService : handleReceivedUser(dest.num, user) // encapsulate our payload in the proper protobuf and fire it off - sendToRadio(newMeshPacketTo(dest.num).buildAdminPacket(id = packetId) { setOwner = user }) + packetHandler.sendToRadio( + newMeshPacketTo(dest.num).buildAdminPacket(id = packetId) { setOwner = user }, + ) { + connectionState + } } } @@ -2051,16 +1946,22 @@ class MeshService : } private fun importContact(contact: AdminProtos.SharedContact) { - sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { addContact = contact }) + packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { addContact = contact }) { + connectionState + } handleReceivedUser(contact.nodeNum, contact.user) } private fun getDeviceMetadata(destNum: Int) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(wantResponse = true) { getDeviceMetadataRequest = true }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket(wantResponse = true) { getDeviceMetadataRequest = true }, + ) { + connectionState + } } private fun favoriteNode(node: Node) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(myNodeNum).buildAdminPacket { if (node.isFavorite) { debug("removing node ${node.num} from favorite list") @@ -2070,12 +1971,14 @@ class MeshService : setFavoriteNode = node.num } }, - ) + ) { + connectionState + } updateNodeInfo(node.num) { it.isFavorite = !node.isFavorite } } private fun ignoreNode(node: Node) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(myNodeNum).buildAdminPacket { if (node.isIgnored) { debug("removing node ${node.num} from ignore list") @@ -2085,7 +1988,9 @@ class MeshService : setIgnoredNode = node.num } }, - ) + ) { + connectionState + } updateNodeInfo(node.num) { it.isIgnored = !node.isIgnored } } @@ -2101,7 +2006,7 @@ class MeshService : portnumValue = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE payload = ByteString.copyFrom(reaction.emoji.encodeToByteArray()) } - sendToRadio(packet) + packetHandler.sendToRadio(packet) { connectionState } rememberReaction(packet.copy { from = myNodeNum }) } @@ -2197,9 +2102,11 @@ class MeshService : } override fun getRemoteOwner(id: Int, destNum: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getOwnerRequest = true }, - ) + ) { + connectionState + } } override fun send(p: DataPacket) { @@ -2259,12 +2166,14 @@ class MeshService : override fun setRemoteConfig(id: Int, num: Int, payload: ByteArray) = toRemoteExceptions { debug("Setting new radio config!") val config = ConfigProtos.Config.parseFrom(payload) - sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setConfig = config }) + packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setConfig = config }) { + connectionState + } if (num == myNodeNum) setLocalConfig(config) // Update our local copy } override fun getRemoteConfig(id: Int, destNum: Int, config: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { if (config == AdminProtos.AdminMessage.ConfigType.SESSIONKEY_CONFIG_VALUE) { getDeviceMetadataRequest = true @@ -2272,47 +2181,63 @@ class MeshService : getConfigRequestValue = config } }, - ) + ) { + connectionState + } } /** Send our current module config to the device */ override fun setModuleConfig(id: Int, num: Int, payload: ByteArray) = toRemoteExceptions { debug("Setting new module config!") val config = ModuleConfigProtos.ModuleConfig.parseFrom(payload) - sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setModuleConfig = config }) + packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setModuleConfig = config }) { + connectionState + } if (num == myNodeNum) setLocalModuleConfig(config) // Update our local copy } override fun getModuleConfig(id: Int, destNum: Int, config: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getModuleConfigRequestValue = config }, - ) + ) { + connectionState + } } override fun setRingtone(destNum: Int, ringtone: String) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setRingtoneMessage = ringtone }) + packetHandler.sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setRingtoneMessage = ringtone }) { + connectionState + } } override fun getRingtone(id: Int, destNum: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getRingtoneRequest = true }, - ) + ) { + connectionState + } } override fun setCannedMessages(destNum: Int, messages: String) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket { setCannedMessageModuleMessages = messages }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket { setCannedMessageModuleMessages = messages }, + ) { + connectionState + } } override fun getCannedMessages(id: Int, destNum: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getCannedMessageModuleMessagesRequest = true }, - ) + ) { + connectionState + } } override fun setChannel(payload: ByteArray?) = toRemoteExceptions { @@ -2321,23 +2246,31 @@ class MeshService : override fun setRemoteChannel(id: Int, num: Int, payload: ByteArray?) = toRemoteExceptions { val channel = ChannelProtos.Channel.parseFrom(payload) - sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setChannel = channel }) + packetHandler.sendToRadio(newMeshPacketTo(num).buildAdminPacket(id = id) { setChannel = channel }) { + connectionState + } } override fun getRemoteChannel(id: Int, destNum: Int, index: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket(id = id, wantResponse = true) { getChannelRequest = index + 1 }, - ) + ) { + connectionState + } } override fun beginEditSettings() = toRemoteExceptions { - sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { beginEditSettings = true }) + packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { beginEditSettings = true }) { + connectionState + } } override fun commitEditSettings() = toRemoteExceptions { - sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { commitEditSettings = true }) + packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { commitEditSettings = true }) { + connectionState + } } override fun getChannelSet(): ByteArray = toRemoteExceptions { this@MeshService.channelSet.toByteArray() } @@ -2361,18 +2294,22 @@ class MeshService : override fun removeByNodenum(requestId: Int, nodeNum: Int) = toRemoteExceptions { nodeDBbyNodeNum.remove(nodeNum) - sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { removeByNodenum = nodeNum }) + packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { removeByNodenum = nodeNum }) { + connectionState + } } override fun requestUserInfo(destNum: Int) = toRemoteExceptions { if (destNum != myNodeNum) { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildMeshPacket(channel = nodeDBbyNodeNum[destNum]?.channel ?: 0) { portnumValue = Portnums.PortNum.NODEINFO_APP_VALUE wantResponse = true payload = nodeDBbyNodeNum[myNodeNum]!!.user.toByteString() }, - ) + ) { + connectionState + } } } @@ -2403,7 +2340,7 @@ class MeshService : time = currentSecond() } - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildMeshPacket( channel = nodeDBbyNodeNum[destNum]?.channel ?: 0, priority = MeshPacket.Priority.BACKGROUND, @@ -2412,7 +2349,9 @@ class MeshService : payload = meshPosition.toByteString() wantResponse = true }, - ) + ) { + connectionState + } } } @@ -2422,7 +2361,7 @@ class MeshService : longitudeI = Position.degI(position.longitude) altitude = position.altitude } - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildAdminPacket { if (position != Position(0.0, 0.0, 0)) { setFixedPosition = pos @@ -2430,12 +2369,14 @@ class MeshService : removeFixedPosition = true } }, - ) + ) { + connectionState + } updateNodeInfo(destNum) { it.setPosition(pos, currentSecond()) } } override fun requestTraceroute(requestId: Int, destNum: Int) = toRemoteExceptions { - sendToRadio( + packetHandler.sendToRadio( newMeshPacketTo(destNum).buildMeshPacket( wantAck = true, id = requestId, @@ -2444,23 +2385,41 @@ class MeshService : portnumValue = Portnums.PortNum.TRACEROUTE_APP_VALUE wantResponse = true }, - ) + ) { + connectionState + } } override fun requestShutdown(requestId: Int, destNum: Int) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { shutdownSeconds = 5 }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { shutdownSeconds = 5 }, + ) { + connectionState + } } override fun requestReboot(requestId: Int, destNum: Int) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { rebootSeconds = 5 }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { rebootSeconds = 5 }, + ) { + connectionState + } } override fun requestFactoryReset(requestId: Int, destNum: Int) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { factoryResetDevice = 1 }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { factoryResetDevice = 1 }, + ) { + connectionState + } } override fun requestNodedbReset(requestId: Int, destNum: Int) = toRemoteExceptions { - sendToRadio(newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { nodedbReset = 1 }) + packetHandler.sendToRadio( + newMeshPacketTo(destNum).buildAdminPacket(id = requestId) { nodedbReset = 1 }, + ) { + connectionState + } } } } diff --git a/app/src/main/java/com/geeksville/mesh/service/PacketHandler.kt b/app/src/main/java/com/geeksville/mesh/service/PacketHandler.kt new file mode 100644 index 000000000..02c5b2b51 --- /dev/null +++ b/app/src/main/java/com/geeksville/mesh/service/PacketHandler.kt @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2025 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.geeksville.mesh.service + +import com.geeksville.mesh.DataPacket +import com.geeksville.mesh.MeshProtos +import com.geeksville.mesh.MeshProtos.MeshPacket +import com.geeksville.mesh.MeshProtos.ToRadio +import com.geeksville.mesh.MessageStatus +import com.geeksville.mesh.android.BuildUtils.debug +import com.geeksville.mesh.android.BuildUtils.errormsg +import com.geeksville.mesh.android.BuildUtils.info +import com.geeksville.mesh.concurrent.handledLaunch +import com.geeksville.mesh.database.MeshLogRepository +import com.geeksville.mesh.database.PacketRepository +import com.geeksville.mesh.database.entity.MeshLog +import com.geeksville.mesh.fromRadio +import com.geeksville.mesh.repository.radio.RadioInterfaceService +import com.geeksville.mesh.util.toOneLineString +import com.geeksville.mesh.util.toPIIString +import dagger.Lazy +import java8.util.concurrent.CompletableFuture +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.withTimeoutOrNull +import java.util.UUID +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException + +class PacketHandler( + private val packetRepository: Lazy, + private val serviceBroadcasts: MeshServiceBroadcasts, + private val radioInterfaceService: RadioInterfaceService, + private val meshLogRepository: Lazy, +) { + + private var queueJob: Job? = null + private val scope = CoroutineScope(Dispatchers.IO) + + private val queuedPackets = ConcurrentLinkedQueue() + private val queueResponse = mutableMapOf>() + + /** + * Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully + * bound to the RadioInterfaceService + */ + fun sendToRadio(p: ToRadio.Builder) { + val built = p.build() + debug("Sending to radio ${built.toPIIString()}") + val b = built.toByteArray() + + radioInterfaceService.sendToRadio(b) + changeStatus(p.packet.id, MessageStatus.ENROUTE) + + if (p.packet.hasDecoded()) { + val packetToSave = + MeshLog( + uuid = UUID.randomUUID().toString(), + message_type = "Packet", + received_date = System.currentTimeMillis(), + raw_message = p.packet.toString(), + fromNum = p.packet.from, + portNum = p.packet.decoded.portnumValue, + fromRadio = fromRadio { packet = p.packet }, + ) + insertMeshLog(packetToSave) + } + } + + /** + * Send a mesh packet to the radio, if the radio is not currently connected this function will throw + * NotConnectedException + */ + fun sendToRadio(packet: MeshPacket, getConnectionState: () -> ConnectionState) { + queuedPackets.add(packet) + startPacketQueue(getConnectionState) + } + + fun stopPacketQueue() { + if (queueJob?.isActive == true) { + info("Stopping packet queueJob") + queueJob?.cancel() + queueJob = null + queuedPackets.clear() + queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false) + queueResponse.clear() + } + } + + fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) { + debug("queueStatus ${queueStatus.toOneLineString()}") + val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) } + if (success && isFull) return // Queue is full, wait for free != 0 + if (requestId != 0) { + queueResponse.remove(requestId)?.complete(success) + } else { + queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success) + } + } + + fun removeResponse(dataRequestId: Int, complete: Boolean) { + queueResponse.remove(dataRequestId)?.complete(complete) + } + + @Suppress("TooGenericExceptionCaught", "SwallowedException") + private fun startPacketQueue(getConnectionState: () -> ConnectionState) { + if (queueJob?.isActive == true) return + queueJob = + scope.handledLaunch { + debug("packet queueJob started") + while (getConnectionState() == ConnectionState.CONNECTED) { + // take the first packet from the queue head + val packet = queuedPackets.poll() ?: break + try { + // send packet to the radio and wait for response + val response = sendPacket(packet, getConnectionState) + debug("queueJob packet id=${packet.id.toUInt()} waiting") + val success = response.get(2, TimeUnit.MINUTES) + debug("queueJob packet id=${packet.id.toUInt()} success $success") + } catch (e: TimeoutException) { + debug("queueJob packet id=${packet.id.toUInt()} timeout") + } catch (e: Exception) { + debug("queueJob packet id=${packet.id.toUInt()} failed") + } + } + } + } + + /** Change the status on a DataPacket and update watchers */ + private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch { + if (packetId != 0) { + getDataPacketById(packetId)?.let { p -> + if (p.status == m) return@handledLaunch + packetRepository.get().updateMessageStatus(p, m) + serviceBroadcasts.broadcastMessageStatus(packetId, m) + } + } + } + + @Suppress("MagicNumber") + private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) { + var dataPacket: DataPacket? = null + while (dataPacket == null) { + dataPacket = packetRepository.get().getPacketById(packetId)?.data + if (dataPacket == null) delay(100) + } + dataPacket + } + + @Suppress("TooGenericExceptionCaught") + private fun sendPacket(packet: MeshPacket, getConnectionState: () -> ConnectionState): CompletableFuture { + // send the packet to the radio and return a CompletableFuture that will be completed with + // the result + val future = CompletableFuture() + queueResponse[packet.id] = future + try { + if (getConnectionState() != ConnectionState.CONNECTED) throw RadioNotConnectedException() + sendToRadio(ToRadio.newBuilder().apply { this.packet = packet }) + } catch (ex: Exception) { + errormsg("sendToRadio error:", ex) + future.complete(false) + } + return future + } + + private fun insertMeshLog(packetToSave: MeshLog) { + scope.handledLaunch { + // Do not log, because might contain PII + // info("insert: ${packetToSave.message_type} = + // ${packetToSave.raw_message.toOneLineString()}") + meshLogRepository.get().insert(packetToSave) + } + } +}