/* * Copyright (c) 2025 Meshtastic LLC * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package com.geeksville.mesh.service import com.geeksville.mesh.MeshProtos import com.geeksville.mesh.MeshProtos.MeshPacket import com.geeksville.mesh.MeshProtos.ToRadio import com.geeksville.mesh.concurrent.handledLaunch import com.geeksville.mesh.fromRadio import com.geeksville.mesh.repository.radio.RadioInterfaceService import dagger.Lazy import java8.util.concurrent.CompletableFuture import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.withTimeoutOrNull import org.meshtastic.core.data.repository.MeshLogRepository import org.meshtastic.core.data.repository.PacketRepository import org.meshtastic.core.database.entity.MeshLog import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.MessageStatus import org.meshtastic.core.model.util.toOneLineString import org.meshtastic.core.model.util.toPIIString import org.meshtastic.core.service.ConnectionState import timber.log.Timber import java.util.UUID import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import javax.inject.Inject import javax.inject.Singleton @Singleton class PacketHandler @Inject constructor( private val packetRepository: Lazy, private val serviceBroadcasts: MeshServiceBroadcasts, private val radioInterfaceService: RadioInterfaceService, private val meshLogRepository: Lazy, private val connectionStateHolder: MeshServiceConnectionStateHolder, ) { private var queueJob: Job? = null private val scope = CoroutineScope(Dispatchers.IO) private val queuedPackets = ConcurrentLinkedQueue() private val queueResponse = mutableMapOf>() /** * Send a command/packet to our radio. But cope with the possibility that we might start up before we are fully * bound to the RadioInterfaceService */ fun sendToRadio(p: ToRadio.Builder) { val built = p.build() Timber.d("Sending to radio ${built.toPIIString()}") val b = built.toByteArray() radioInterfaceService.sendToRadio(b) changeStatus(p.packet.id, MessageStatus.ENROUTE) if (p.packet.hasDecoded()) { val packetToSave = MeshLog( uuid = UUID.randomUUID().toString(), message_type = "Packet", received_date = System.currentTimeMillis(), raw_message = p.packet.toString(), fromNum = p.packet.from, portNum = p.packet.decoded.portnumValue, fromRadio = fromRadio { packet = p.packet }, ) insertMeshLog(packetToSave) } } /** * Send a mesh packet to the radio, if the radio is not currently connected this function will throw * NotConnectedException */ fun sendToRadio(packet: MeshPacket) { queuedPackets.add(packet) startPacketQueue() } fun stopPacketQueue() { if (queueJob?.isActive == true) { Timber.i("Stopping packet queueJob") queueJob?.cancel() queueJob = null queuedPackets.clear() queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(false) queueResponse.clear() } } fun handleQueueStatus(queueStatus: MeshProtos.QueueStatus) { Timber.d("queueStatus ${queueStatus.toOneLineString()}") val (success, isFull, requestId) = with(queueStatus) { Triple(res == 0, free == 0, meshPacketId) } if (success && isFull) return // Queue is full, wait for free != 0 if (requestId != 0) { queueResponse.remove(requestId)?.complete(success) } else { queueResponse.entries.lastOrNull { !it.value.isDone }?.value?.complete(success) } } fun removeResponse(dataRequestId: Int, complete: Boolean) { queueResponse.remove(dataRequestId)?.complete(complete) } @Suppress("TooGenericExceptionCaught", "SwallowedException") private fun startPacketQueue() { if (queueJob?.isActive == true) return queueJob = scope.handledLaunch { Timber.d("packet queueJob started") while (connectionStateHolder.getState() == ConnectionState.CONNECTED) { // take the first packet from the queue head val packet = queuedPackets.poll() ?: break try { // send packet to the radio and wait for response val response = sendPacket(packet) Timber.d("queueJob packet id=${packet.id.toUInt()} waiting") val success = response.get(2, TimeUnit.MINUTES) Timber.d("queueJob packet id=${packet.id.toUInt()} success $success") } catch (e: TimeoutException) { Timber.d("queueJob packet id=${packet.id.toUInt()} timeout") } catch (e: Exception) { Timber.d("queueJob packet id=${packet.id.toUInt()} failed") } } } } /** Change the status on a DataPacket and update watchers */ private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch { if (packetId != 0) { getDataPacketById(packetId)?.let { p -> if (p.status == m) return@handledLaunch packetRepository.get().updateMessageStatus(p, m) serviceBroadcasts.broadcastMessageStatus(packetId, m) } } } @Suppress("MagicNumber") private suspend fun getDataPacketById(packetId: Int): DataPacket? = withTimeoutOrNull(1000) { var dataPacket: DataPacket? = null while (dataPacket == null) { dataPacket = packetRepository.get().getPacketById(packetId)?.data if (dataPacket == null) delay(100) } dataPacket } @Suppress("TooGenericExceptionCaught") private fun sendPacket(packet: MeshPacket): CompletableFuture { // send the packet to the radio and return a CompletableFuture that will be completed with // the result val future = CompletableFuture() queueResponse[packet.id] = future try { if (connectionStateHolder.getState() != ConnectionState.CONNECTED) throw RadioNotConnectedException() sendToRadio(ToRadio.newBuilder().apply { this.packet = packet }) } catch (ex: Exception) { Timber.e("sendToRadio error:", ex) future.complete(false) } return future } private fun insertMeshLog(packetToSave: MeshLog) { scope.handledLaunch { // Do not log, because might contain PII // info("insert: ${packetToSave.message_type} = // ${packetToSave.raw_message.toOneLineString()}") meshLogRepository.get().insert(packetToSave) } } }