/* * Copyright (c) 2025-2026 Meshtastic LLC * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package com.geeksville.mesh.service import co.touchlab.kermit.Logger import com.geeksville.mesh.concurrent.handledLaunch import com.geeksville.mesh.repository.radio.RadioInterfaceService import dagger.Lazy import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.delay import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull import org.meshtastic.core.data.repository.MeshLogRepository import org.meshtastic.core.data.repository.PacketRepository import org.meshtastic.core.database.entity.MeshLog import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.MessageStatus import org.meshtastic.core.model.util.toOneLineString import org.meshtastic.core.model.util.toPIIString import org.meshtastic.core.service.ConnectionState import org.meshtastic.proto.MeshProtos import org.meshtastic.proto.MeshProtos.MeshPacket import org.meshtastic.proto.MeshProtos.ToRadio import org.meshtastic.proto.fromRadio import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue import javax.inject.Inject import javax.inject.Singleton @Suppress("TooManyFunctions") @Singleton class PacketHandler @Inject constructor( private val packetRepository: Lazy, private val serviceBroadcasts: MeshServiceBroadcasts, private val radioInterfaceService: RadioInterfaceService, private val meshLogRepository: Lazy, private val connectionStateHolder: ConnectionStateHandler, ) { companion object { private const val TIMEOUT_MS = 5000L // Increased from 250ms to be more tolerant } private var queueJob: Job? = null private var scope: CoroutineScope = CoroutineScope(Dispatchers.IO) private val queuedPackets = ConcurrentLinkedQueue() private val queueResponse = ConcurrentHashMap>() fun start(scope: CoroutineScope) { this.scope = scope } /** * Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully * bound to the RadioInterfaceService */ fun sendToRadio(p: ToRadio.Builder) { val built = p.build() Logger.d { "Sending to radio ${built.toPIIString()}" } val b = built.toByteArray() radioInterfaceService.sendToRadio(b) changeStatus(p.packet.id, MessageStatus.ENROUTE) if (p.packet.hasDecoded()) { val packetToSave = MeshLog( uuid = UUID.randomUUID().toString(), message_type = "Packet", received_date = System.currentTimeMillis(), raw_message = p.packet.toString(), fromNum = p.packet.from, portNum = p.packet.decoded.portnumValue, fromRadio = fromRadio { packet = p.packet }, ) insertMeshLog(packetToSave) } } /** * Send a mesh packet to the radio, if the radio is not currently connected this function will throw * NotConnectedException */ fun sendToRadio(packet: MeshPacket) { queuedPackets.add(packet) startPacketQueue() } fun stopPacketQueue() { if (queueJob?.isActive == true) { Logger.i { "Stopping packet queueJob" } queueJob?.cancel() queueJob = null queuedPackets.clear() queueResponse.entries.lastOrNull { !it.value.isCompleted }?.value?.complete(false) queueResponse.clear() } } fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) { Logger.d { "[queueStatus] ${queueStatus.toOneLineString()}" } val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) } if (success && isFull) return // Queue is full, wait for free != 0 if (requestId != 0) { queueResponse.remove(requestId)?.complete(success) } else { // This is slightly suboptimal but matches legacy behavior for packets without IDs queueResponse.values.firstOrNull { !it.isCompleted }?.complete(success) } } fun removeResponse(dataRequestId: Int, complete: Boolean) { queueResponse.remove(dataRequestId)?.complete(complete) } @Suppress("TooGenericExceptionCaught", "SwallowedException") private fun startPacketQueue() { if (queueJob?.isActive == true) return queueJob = scope.handledLaunch { Logger.d { "packet queueJob started" } while (connectionStateHolder.connectionState.value == ConnectionState.Connected) { // take the first packet from the queue head val packet = queuedPackets.poll() ?: break try { // send packet to the radio and wait for response val response = sendPacket(packet) Logger.d { "queueJob packet id=${packet.id.toUInt()} waiting" } val success = withTimeout(TIMEOUT_MS) { response.await() } Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" } } catch (e: TimeoutCancellationException) { Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" } } catch (e: Exception) { Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" } } finally { queueResponse.remove(packet.id) } } } } /** Change the status on a DataPacket and update watchers */ private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch { if (packetId != 0) { getDataPacketById(packetId)?.let { p -> if (p.status == m) return@handledLaunch packetRepository.get().updateMessageStatus(p, m) serviceBroadcasts.broadcastMessageStatus(packetId, m) } } } @Suppress("MagicNumber") private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) { var dataPacket: DataPacket? = null while (dataPacket == null) { dataPacket = packetRepository.get().getPacketById(packetId)?.data if (dataPacket == null) delay(100) } dataPacket } @Suppress("TooGenericExceptionCaught") private fun sendPacket(packet: MeshPacket): CompletableDeferred { // send the packet to the radio and return a CompletableDeferred that will be completed with // the result val deferred = CompletableDeferred() queueResponse[packet.id] = deferred try { if (connectionStateHolder.connectionState.value != ConnectionState.Connected) { throw RadioNotConnectedException() } sendToRadio(ToRadio.newBuilder().apply { this.packet = packet }) } catch (ex: Exception) { Logger.e(ex) { "sendToRadio error: ${ex.message}" } deferred.complete(false) } return deferred } private fun insertMeshLog(packetToSave: MeshLog) { scope.handledLaunch { // Do not log, because might contain PII // info("insert: ${packetToSave.message_type} = // ${packetToSave.raw_message.toOneLineString()}") meshLogRepository.get().insert(packetToSave) } } }