/* * Copyright (c) 2025-2026 Meshtastic LLC * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package com.geeksville.mesh.service import android.util.Log import co.touchlab.kermit.Logger import com.geeksville.mesh.BuildConfig import com.geeksville.mesh.concurrent.handledLaunch import com.geeksville.mesh.repository.radio.InterfaceId import com.meshtastic.core.strings.getString import dagger.Lazy import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.first import org.meshtastic.core.analytics.DataPair import org.meshtastic.core.analytics.platform.PlatformAnalytics import org.meshtastic.core.data.repository.PacketRepository import org.meshtastic.core.data.repository.RadioConfigRepository import org.meshtastic.core.database.entity.Packet import org.meshtastic.core.database.entity.ReactionEntity import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.MessageStatus import org.meshtastic.core.model.util.SfppHasher import org.meshtastic.core.prefs.mesh.MeshPrefs import org.meshtastic.core.service.MeshServiceNotifications import org.meshtastic.core.service.RetryEvent import org.meshtastic.core.service.ServiceRepository import org.meshtastic.core.service.filter.MessageFilterService import org.meshtastic.core.strings.Res import org.meshtastic.core.strings.critical_alert import org.meshtastic.core.strings.error_duty_cycle import org.meshtastic.core.strings.unknown_username import org.meshtastic.core.strings.waypoint_received import org.meshtastic.proto.AdminProtos import org.meshtastic.proto.MeshProtos import org.meshtastic.proto.MeshProtos.MeshPacket import org.meshtastic.proto.PaxcountProtos import org.meshtastic.proto.Portnums import org.meshtastic.proto.StoreAndForwardProtos import org.meshtastic.proto.TelemetryProtos import org.meshtastic.proto.copy import java.util.concurrent.ConcurrentHashMap import javax.inject.Inject import javax.inject.Singleton import kotlin.time.Duration.Companion.milliseconds @Suppress("LongParameterList", "TooManyFunctions", "LargeClass") @Singleton class MeshDataHandler @Inject constructor( private val nodeManager: MeshNodeManager, private val packetHandler: PacketHandler, private val serviceRepository: ServiceRepository, private val packetRepository: Lazy, private val serviceBroadcasts: MeshServiceBroadcasts, private val serviceNotifications: MeshServiceNotifications, private val analytics: PlatformAnalytics, private val dataMapper: MeshDataMapper, private val configHandler: MeshConfigHandler, private val configFlowManager: MeshConfigFlowManager, private val commandSender: MeshCommandSender, private val historyManager: MeshHistoryManager, private val meshPrefs: MeshPrefs, private val connectionManager: MeshConnectionManager, private val tracerouteHandler: MeshTracerouteHandler, private val neighborInfoHandler: MeshNeighborInfoHandler, private val radioConfigRepository: RadioConfigRepository, private val messageFilterService: MessageFilterService, ) { private var scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) fun start(scope: CoroutineScope) { this.scope = scope } private val rememberDataType = setOf( Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, Portnums.PortNum.ALERT_APP_VALUE, Portnums.PortNum.WAYPOINT_APP_VALUE, ) fun handleReceivedData(packet: MeshPacket, myNodeNum: Int, logUuid: String? = null, logInsertJob: Job? = null) { val dataPacket = dataMapper.toDataPacket(packet) ?: return val fromUs = myNodeNum == packet.from dataPacket.status = MessageStatus.RECEIVED val shouldBroadcast = handleDataPacket(packet, dataPacket, myNodeNum, fromUs, logUuid, logInsertJob) if (shouldBroadcast) { serviceBroadcasts.broadcastReceivedData(dataPacket) } analytics.track("num_data_receive", DataPair("num_data_receive", 1)) } private fun handleDataPacket( packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int, fromUs: Boolean, logUuid: String?, logInsertJob: Job?, ): Boolean { var shouldBroadcast = !fromUs when (packet.decoded.portnumValue) { Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> handleTextMessage(packet, dataPacket, myNodeNum) Portnums.PortNum.ALERT_APP_VALUE -> rememberDataPacket(dataPacket, myNodeNum) Portnums.PortNum.WAYPOINT_APP_VALUE -> handleWaypoint(packet, dataPacket, myNodeNum) Portnums.PortNum.POSITION_APP_VALUE -> handlePosition(packet, dataPacket, myNodeNum) Portnums.PortNum.NODEINFO_APP_VALUE -> if (!fromUs) handleNodeInfo(packet) Portnums.PortNum.TELEMETRY_APP_VALUE -> handleTelemetry(packet, dataPacket, myNodeNum) else -> shouldBroadcast = handleSpecializedDataPacket(packet, dataPacket, myNodeNum, logUuid, logInsertJob) } return shouldBroadcast } private fun handleSpecializedDataPacket( packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int, logUuid: String?, logInsertJob: Job?, ): Boolean { var shouldBroadcast = false when (packet.decoded.portnumValue) { Portnums.PortNum.TRACEROUTE_APP_VALUE -> { tracerouteHandler.handleTraceroute(packet, logUuid, logInsertJob) shouldBroadcast = false } Portnums.PortNum.ROUTING_APP_VALUE -> { handleRouting(packet, dataPacket) shouldBroadcast = true } Portnums.PortNum.PAXCOUNTER_APP_VALUE -> { handlePaxCounter(packet) shouldBroadcast = false } Portnums.PortNum.STORE_FORWARD_APP_VALUE -> { handleStoreAndForward(packet, dataPacket, myNodeNum) shouldBroadcast = false } Portnums.PortNum.STORE_FORWARD_PLUSPLUS_APP_VALUE -> { handleStoreForwardPlusPlus(packet) shouldBroadcast = false } Portnums.PortNum.ADMIN_APP_VALUE -> { handleAdminMessage(packet, myNodeNum) shouldBroadcast = false } Portnums.PortNum.NEIGHBORINFO_APP_VALUE -> { neighborInfoHandler.handleNeighborInfo(packet) shouldBroadcast = true } Portnums.PortNum.RANGE_TEST_APP_VALUE, Portnums.PortNum.DETECTION_SENSOR_APP_VALUE, -> { handleRangeTest(dataPacket, myNodeNum) shouldBroadcast = false } } return shouldBroadcast } private fun handleRangeTest(dataPacket: DataPacket, myNodeNum: Int) { val u = dataPacket.copy(dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE) rememberDataPacket(u, myNodeNum) } private fun handleStoreAndForward(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) { val u = StoreAndForwardProtos.StoreAndForward.parseFrom(packet.decoded.payload) handleReceivedStoreAndForward(dataPacket, u, myNodeNum) } @Suppress("LongMethod") private fun handleStoreForwardPlusPlus(packet: MeshPacket) { val sfpp = MeshProtos.StoreForwardPlusPlus.parseFrom(packet.decoded.payload) Logger.d { "Received StoreForwardPlusPlus packet: $sfpp" } when (sfpp.sfppMessageType) { MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE, MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_FIRSTHALF, MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_SECONDHALF, -> { val isFragment = sfpp.sfppMessageType != MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE // If it has a commit hash, it's already on the chain (Confirmed) // Otherwise it's still being routed via SF++ (Routing) val status = if (sfpp.commitHash.isEmpty) MessageStatus.SFPP_ROUTING else MessageStatus.SFPP_CONFIRMED // Prefer a full 16-byte hash calculated from the message bytes if available // But only if it's NOT a fragment, otherwise the calculated hash would be wrong val hash = when { !sfpp.messageHash.isEmpty -> sfpp.messageHash.toByteArray() !isFragment && !sfpp.message.isEmpty -> { SfppHasher.computeMessageHash( encryptedPayload = sfpp.message.toByteArray(), // Map 0 back to NODENUM_BROADCAST to match firmware hash calculation to = if (sfpp.encapsulatedTo == 0) DataPacket.NODENUM_BROADCAST else sfpp.encapsulatedTo, from = sfpp.encapsulatedFrom, id = sfpp.encapsulatedId, ) } else -> null } ?: return Logger.d { "SFPP updateStatus: packetId=${sfpp.encapsulatedId} from=${sfpp.encapsulatedFrom} " + "to=${sfpp.encapsulatedTo} myNodeNum=${nodeManager.myNodeNum} status=$status" } scope.handledLaunch { packetRepository .get() .updateSFPPStatus( packetId = sfpp.encapsulatedId, from = sfpp.encapsulatedFrom, to = sfpp.encapsulatedTo, hash = hash, status = status, rxTime = sfpp.encapsulatedRxtime.toLong() and 0xFFFFFFFFL, myNodeNum = nodeManager.myNodeNum, ) serviceBroadcasts.broadcastMessageStatus(sfpp.encapsulatedId, status) } } MeshProtos.StoreForwardPlusPlus.SFPP_message_type.CANON_ANNOUNCE -> { scope.handledLaunch { packetRepository .get() .updateSFPPStatusByHash( hash = sfpp.messageHash.toByteArray(), status = MessageStatus.SFPP_CONFIRMED, rxTime = sfpp.encapsulatedRxtime.toLong() and 0xFFFFFFFFL, ) } } MeshProtos.StoreForwardPlusPlus.SFPP_message_type.CHAIN_QUERY -> { Logger.i { "SF++: Node ${packet.from} is querying chain status" } } MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_REQUEST -> { Logger.i { "SF++: Node ${packet.from} is requesting links" } } else -> {} } } private fun handlePaxCounter(packet: MeshPacket) { val p = PaxcountProtos.Paxcount.parseFrom(packet.decoded.payload) nodeManager.handleReceivedPaxcounter(packet.from, p) } private fun handlePosition(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) { val p = MeshProtos.Position.parseFrom(packet.decoded.payload) nodeManager.handleReceivedPosition(packet.from, myNodeNum, p, dataPacket.time) } private fun handleWaypoint(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) { val u = MeshProtos.Waypoint.parseFrom(packet.decoded.payload) if (u.lockedTo != 0 && u.lockedTo != packet.from) return val currentSecond = (System.currentTimeMillis() / MILLISECONDS_IN_SECOND).toInt() rememberDataPacket(dataPacket, myNodeNum, updateNotification = u.expire > currentSecond) } private fun handleAdminMessage(packet: MeshPacket, myNodeNum: Int) { val u = AdminProtos.AdminMessage.parseFrom(packet.decoded.payload) commandSender.setSessionPasskey(u.sessionPasskey) if (packet.from == myNodeNum) { when (u.payloadVariantCase) { AdminProtos.AdminMessage.PayloadVariantCase.GET_CONFIG_RESPONSE -> configHandler.handleDeviceConfig(u.getConfigResponse) AdminProtos.AdminMessage.PayloadVariantCase.GET_CHANNEL_RESPONSE -> configHandler.handleChannel(u.getChannelResponse) else -> {} } } if (u.payloadVariantCase == AdminProtos.AdminMessage.PayloadVariantCase.GET_DEVICE_METADATA_RESPONSE) { if (packet.from == myNodeNum) { configFlowManager.handleLocalMetadata(u.getDeviceMetadataResponse) } else { nodeManager.insertMetadata(packet.from, u.getDeviceMetadataResponse) } } } private fun handleTextMessage(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) { if (packet.decoded.replyId != 0 && packet.decoded.emoji != 0) { rememberReaction(packet) } else { rememberDataPacket(dataPacket, myNodeNum) } } private fun handleNodeInfo(packet: MeshPacket) { val u = MeshProtos.User.parseFrom(packet.decoded.payload).copy { if (isLicensed) clearPublicKey() if (packet.viaMqtt) longName = "$longName (MQTT)" } nodeManager.handleReceivedUser(packet.from, u, packet.channel) } private fun handleTelemetry(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) { val t = TelemetryProtos.Telemetry.parseFrom(packet.decoded.payload).copy { if (time == 0) time = (dataPacket.time.milliseconds.inWholeSeconds).toInt() } val fromNum = packet.from val isRemote = (fromNum != myNodeNum) if (!isRemote) { connectionManager.updateTelemetry(t) } nodeManager.updateNodeInfo(fromNum) { nodeEntity -> when { t.hasDeviceMetrics() -> { nodeEntity.deviceTelemetry = t if (fromNum == myNodeNum || (isRemote && nodeEntity.isFavorite)) { if ( t.deviceMetrics.voltage > BATTERY_PERCENT_UNSUPPORTED && t.deviceMetrics.batteryLevel <= BATTERY_PERCENT_LOW_THRESHOLD ) { if (shouldBatteryNotificationShow(fromNum, t, myNodeNum)) { serviceNotifications.showOrUpdateLowBatteryNotification(nodeEntity, isRemote) } } else { if (batteryPercentCooldowns.containsKey(fromNum)) { batteryPercentCooldowns.remove(fromNum) } serviceNotifications.cancelLowBatteryNotification(nodeEntity) } } } t.hasEnvironmentMetrics() -> nodeEntity.environmentTelemetry = t t.hasPowerMetrics() -> nodeEntity.powerTelemetry = t } } } private fun shouldBatteryNotificationShow(fromNum: Int, t: TelemetryProtos.Telemetry, myNodeNum: Int): Boolean { val isRemote = (fromNum != myNodeNum) var shouldDisplay = false var forceDisplay = false when { t.deviceMetrics.batteryLevel <= BATTERY_PERCENT_CRITICAL_THRESHOLD -> { shouldDisplay = true forceDisplay = true } t.deviceMetrics.batteryLevel == BATTERY_PERCENT_LOW_THRESHOLD -> shouldDisplay = true t.deviceMetrics.batteryLevel.mod(BATTERY_PERCENT_LOW_DIVISOR) == 0 && !isRemote -> shouldDisplay = true isRemote -> shouldDisplay = true } if (shouldDisplay) { val now = System.currentTimeMillis() / MILLISECONDS_IN_SECOND if (!batteryPercentCooldowns.containsKey(fromNum)) batteryPercentCooldowns[fromNum] = 0 if ((now - batteryPercentCooldowns[fromNum]!!) >= BATTERY_PERCENT_COOLDOWN_SECONDS || forceDisplay) { batteryPercentCooldowns[fromNum] = now return true } } return false } private fun handleRouting(packet: MeshPacket, dataPacket: DataPacket) { val r = MeshProtos.Routing.parseFrom(packet.decoded.payload) if (r.errorReason == MeshProtos.Routing.Error.DUTY_CYCLE_LIMIT) { serviceRepository.setErrorMessage(getString(Res.string.error_duty_cycle)) } handleAckNak( packet.decoded.requestId, dataMapper.toNodeID(packet.from), r.errorReasonValue, dataPacket.relayNode, ) packetHandler.removeResponse(packet.decoded.requestId, complete = true) } @Suppress("CyclomaticComplexMethod", "LongMethod") private fun handleAckNak(requestId: Int, fromId: String, routingError: Int, relayNode: Int?) { scope.handledLaunch { val isAck = routingError == MeshProtos.Routing.Error.NONE_VALUE val p = packetRepository.get().getPacketById(requestId) val reaction = packetRepository.get().getReactionByPacketId(requestId) val isMaxRetransmit = routingError == MeshProtos.Routing.Error.MAX_RETRANSMIT_VALUE val shouldRetry = isMaxRetransmit && p != null && p.port_num == Portnums.PortNum.TEXT_MESSAGE_APP_VALUE && (p.data.from == DataPacket.ID_LOCAL || p.data.from == nodeManager.getMyId()) && p.data.retryCount < MAX_RETRY_ATTEMPTS val shouldRetryReaction = isMaxRetransmit && reaction != null && (reaction.userId == DataPacket.ID_LOCAL || reaction.userId == nodeManager.getMyId()) && reaction.retryCount < MAX_RETRY_ATTEMPTS && reaction.to != null @Suppress("MaxLineLength") Logger.d { val retryInfo = "packetId=${p?.packetId ?: reaction?.packetId} dataId=${p?.data?.id} retry=${p?.data?.retryCount ?: reaction?.retryCount}" val statusInfo = "status=${p?.data?.status ?: reaction?.status}" "[ackNak] req=$requestId routeErr=$routingError isAck=$isAck " + "maxRetransmit=$isMaxRetransmit shouldRetry=$shouldRetry reaction=$shouldRetryReaction $retryInfo $statusInfo" } if (shouldRetry) { val newRetryCount = p.data.retryCount + 1 // Emit retry event to UI and wait for user response val retryEvent = RetryEvent.MessageRetry( packetId = requestId, text = p.data.text ?: "", attemptNumber = newRetryCount, maxAttempts = MAX_RETRY_ATTEMPTS + 1, // +1 for initial attempt ) Logger.w { "[ackNak] requesting retry for req=$requestId retry=$newRetryCount" } Log.d("MeshDataHandler", "[ackNak] Emitting retry event for req=$requestId retry=$newRetryCount") val shouldProceed = serviceRepository.requestRetry(retryEvent, RETRY_DELAY_MS) Log.d("MeshDataHandler", "[ackNak] Retry response for req=$requestId: shouldProceed=$shouldProceed") if (shouldProceed) { val newId = commandSender.generatePacketId() val updatedData = p.data.copy( id = newId, status = MessageStatus.QUEUED, retryCount = newRetryCount, relayNode = null, ) val updatedPacket = p.copy(packetId = newId, data = updatedData, routingError = MeshProtos.Routing.Error.NONE_VALUE) packetRepository.get().update(updatedPacket) Logger.w { "[ackNak] retrying req=$requestId newId=$newId retry=$newRetryCount" } commandSender.sendData(updatedData) } else { // User cancelled retry - mark as ERROR Logger.w { "[ackNak] retry cancelled by user for req=$requestId" } p.data.status = MessageStatus.ERROR packetRepository.get().update(p) } return@handledLaunch } if (shouldRetryReaction && reaction != null) { val newRetryCount = reaction.retryCount + 1 // Emit retry event to UI and wait for user response val retryEvent = RetryEvent.ReactionRetry( packetId = requestId, emoji = reaction.emoji, attemptNumber = newRetryCount, maxAttempts = MAX_RETRY_ATTEMPTS + 1, // +1 for initial attempt ) Logger.w { "[ackNak] requesting retry for reaction req=$requestId retry=$newRetryCount" } val shouldProceed = serviceRepository.requestRetry(retryEvent, RETRY_DELAY_MS) if (shouldProceed) { val newId = commandSender.generatePacketId() val reactionPacket = DataPacket( to = reaction.to, channel = reaction.channel, bytes = reaction.emoji.toByteArray(Charsets.UTF_8), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, replyId = reaction.replyId, wantAck = true, emoji = reaction.emoji.codePointAt(0), id = newId, retryCount = newRetryCount, ) val updatedReaction = reaction.copy( packetId = newId, status = MessageStatus.QUEUED, retryCount = newRetryCount, relayNode = null, routingError = MeshProtos.Routing.Error.NONE_VALUE, ) packetRepository.get().updateReaction(updatedReaction) Logger.w { "[ackNak] retrying reaction req=$requestId newId=$newId retry=$newRetryCount" } commandSender.sendData(reactionPacket) } else { // User cancelled retry - mark as ERROR Logger.w { "[ackNak] retry cancelled by user for reaction req=$requestId" } val errorReaction = reaction.copy(status = MessageStatus.ERROR, routingError = routingError) packetRepository.get().updateReaction(errorReaction) } return@handledLaunch } val m = when { isAck && (fromId == p?.data?.to || fromId == reaction?.to) -> MessageStatus.RECEIVED isAck -> MessageStatus.DELIVERED else -> MessageStatus.ERROR } if (p != null && p.data.status != MessageStatus.RECEIVED) { p.data.status = m p.routingError = routingError if (isAck) { p.data.relays += 1 } p.data.relayNode = relayNode packetRepository.get().update(p) } reaction?.let { r -> if (r.status != MessageStatus.RECEIVED) { var updated = r.copy(status = m, routingError = routingError, relayNode = relayNode) if (isAck) { updated = updated.copy(relays = updated.relays + 1) } packetRepository.get().updateReaction(updated) } } serviceBroadcasts.broadcastMessageStatus(requestId, m) } } private fun handleReceivedStoreAndForward( dataPacket: DataPacket, s: StoreAndForwardProtos.StoreAndForward, myNodeNum: Int, ) { Logger.d { "StoreAndForward: ${s.variantCase} ${s.rr} from ${dataPacket.from}" } val transport = currentTransport() val isHistory = s.variantCase == StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY val lastRequest = if (isHistory) s.history.lastRequest else 0 val baseContext = "transport=$transport from=${dataPacket.from}" historyLog { "rxStoreForward $baseContext variant=${s.variantCase} rr=${s.rr} lastRequest=$lastRequest" } when (s.variantCase) { StoreAndForwardProtos.StoreAndForward.VariantCase.STATS -> { val text = s.stats.toString() val u = dataPacket.copy( bytes = text.encodeToByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, ) rememberDataPacket(u, myNodeNum) } StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY -> { val h = s.history @Suppress("MaxLineLength") historyLog(Log.DEBUG) { "routerHistory $baseContext messages=${h.historyMessages} window=${h.window} lastReq=${h.lastRequest}" } val text = "Total messages: ${h.historyMessages}\n" + "History window: ${h.window.milliseconds.inWholeMinutes} min\n" + "Last request: ${h.lastRequest}" val u = dataPacket.copy( bytes = text.encodeToByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, ) rememberDataPacket(u, myNodeNum) historyManager.updateStoreForwardLastRequest("router_history", h.lastRequest, transport) } StoreAndForwardProtos.StoreAndForward.VariantCase.HEARTBEAT -> { val hb = s.heartbeat historyLog { "rxHeartbeat $baseContext period=${hb.period} secondary=${hb.secondary}" } } StoreAndForwardProtos.StoreAndForward.VariantCase.TEXT -> { if (s.rr == StoreAndForwardProtos.StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) { dataPacket.to = DataPacket.ID_BROADCAST } @Suppress("MaxLineLength") historyLog(Log.DEBUG) { "rxText $baseContext id=${dataPacket.id} ts=${dataPacket.time} to=${dataPacket.to} decision=remember" } val u = dataPacket.copy(bytes = s.text.toByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE) rememberDataPacket(u, myNodeNum) } else -> {} } } fun rememberDataPacket(dataPacket: DataPacket, myNodeNum: Int, updateNotification: Boolean = true) { if (dataPacket.dataType !in rememberDataType) return val fromLocal = dataPacket.from == DataPacket.ID_LOCAL || dataPacket.from == DataPacket.nodeNumToDefaultId(myNodeNum) val toBroadcast = dataPacket.to == DataPacket.ID_BROADCAST val contactId = if (fromLocal || toBroadcast) dataPacket.to else dataPacket.from // contactKey: unique contact key filter (channel)+(nodeId) val contactKey = "${dataPacket.channel}$contactId" scope.handledLaunch { packetRepository.get().apply { // Check for duplicates before inserting val existingPackets = findPacketsWithId(dataPacket.id) if (existingPackets.isNotEmpty()) { Logger.d { "Skipping duplicate packet: packetId=${dataPacket.id} from=${dataPacket.from} " + "to=${dataPacket.to} contactKey=$contactKey" + " (already have ${existingPackets.size} packet(s))" } return@handledLaunch } // Check if message should be filtered val isFiltered = shouldFilterMessage(dataPacket, contactKey) val packetToSave = Packet( uuid = 0L, myNodeNum = myNodeNum, packetId = dataPacket.id, port_num = dataPacket.dataType, contact_key = contactKey, received_time = System.currentTimeMillis(), read = fromLocal || isFiltered, data = dataPacket, snr = dataPacket.snr, rssi = dataPacket.rssi, hopsAway = dataPacket.hopsAway, filtered = isFiltered, ) insert(packetToSave) if (!isFiltered) { handlePacketNotification(packetToSave, dataPacket, contactKey, updateNotification) } } } } private suspend fun PacketRepository.shouldFilterMessage(dataPacket: DataPacket, contactKey: String): Boolean { if (dataPacket.dataType != Portnums.PortNum.TEXT_MESSAGE_APP_VALUE) return false val isFilteringDisabled = getContactSettings(contactKey).filteringDisabled return messageFilterService.shouldFilter(dataPacket.text.orEmpty(), isFilteringDisabled) } private suspend fun handlePacketNotification( packet: Packet, dataPacket: DataPacket, contactKey: String, updateNotification: Boolean, ) { val conversationMuted = packetRepository.get().getContactSettings(contactKey).isMuted val nodeMuted = nodeManager.nodeDBbyID[dataPacket.from]?.isMuted == true val isSilent = conversationMuted || nodeMuted if (packet.port_num == Portnums.PortNum.ALERT_APP_VALUE && !isSilent) { serviceNotifications.showAlertNotification( contactKey, getSenderName(dataPacket), dataPacket.alert ?: getString(Res.string.critical_alert), ) } else if (updateNotification) { scope.handledLaunch { updateNotification(contactKey, dataPacket, isSilent) } } } private fun getSenderName(packet: DataPacket): String { if (packet.from == DataPacket.ID_LOCAL) { val myId = nodeManager.getMyId() return nodeManager.nodeDBbyID[myId]?.user?.longName ?: getString(Res.string.unknown_username) } return nodeManager.nodeDBbyID[packet.from]?.user?.longName ?: getString(Res.string.unknown_username) } private suspend fun updateNotification(contactKey: String, dataPacket: DataPacket, isSilent: Boolean) { when (dataPacket.dataType) { Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> { val message = dataPacket.text!! val channelName = if (dataPacket.to == DataPacket.ID_BROADCAST) { radioConfigRepository.channelSetFlow.first().settingsList.getOrNull(dataPacket.channel)?.name } else { null } serviceNotifications.updateMessageNotification( contactKey, getSenderName(dataPacket), message, dataPacket.to == DataPacket.ID_BROADCAST, channelName, isSilent, ) } Portnums.PortNum.WAYPOINT_APP_VALUE -> { val message = getString(Res.string.waypoint_received, dataPacket.waypoint!!.name) serviceNotifications.updateWaypointNotification( contactKey, getSenderName(dataPacket), message, dataPacket.waypoint!!.id, isSilent, ) } else -> return } } private fun rememberReaction(packet: MeshPacket) = scope.handledLaunch { val emoji = packet.decoded.payload.toByteArray().decodeToString() val fromId = dataMapper.toNodeID(packet.from) val toId = dataMapper.toNodeID(packet.to) val reaction = ReactionEntity( myNodeNum = nodeManager.myNodeNum ?: 0, replyId = packet.decoded.replyId, userId = fromId, emoji = emoji, timestamp = System.currentTimeMillis(), snr = packet.rxSnr, rssi = packet.rxRssi, hopsAway = if (packet.hopStart == 0 || packet.hopLimit > packet.hopStart) { HOPS_AWAY_UNAVAILABLE } else { packet.hopStart - packet.hopLimit }, packetId = packet.id, status = MessageStatus.RECEIVED, to = toId, channel = packet.channel, ) // Check for duplicates before inserting val existingReactions = packetRepository.get().findReactionsWithId(packet.id) if (existingReactions.isNotEmpty()) { Logger.d { "Skipping duplicate reaction: packetId=${packet.id} replyId=${packet.decoded.replyId} " + "from=$fromId emoji=$emoji (already have ${existingReactions.size} reaction(s))" } return@handledLaunch } packetRepository.get().insertReaction(reaction) // Find the original packet to get the contactKey packetRepository.get().getPacketByPacketId(packet.decoded.replyId)?.let { original -> // Skip notification if the original message was filtered if (original.packet.filtered) return@let val contactKey = original.packet.contact_key val conversationMuted = packetRepository.get().getContactSettings(contactKey).isMuted val nodeMuted = nodeManager.nodeDBbyID[fromId]?.isMuted == true val isSilent = conversationMuted || nodeMuted val channelName = if (original.packet.data.to == DataPacket.ID_BROADCAST) { radioConfigRepository.channelSetFlow .first() .settingsList .getOrNull(original.packet.data.channel) ?.name } else { null } serviceNotifications.updateReactionNotification( contactKey, getSenderName(dataMapper.toDataPacket(packet)!!), emoji, original.packet.data.to == DataPacket.ID_BROADCAST, channelName, isSilent, ) } } private fun currentTransport(address: String? = meshPrefs.deviceAddress): String = when (address?.firstOrNull()) { InterfaceId.BLUETOOTH.id -> "BLE" InterfaceId.TCP.id -> "TCP" InterfaceId.SERIAL.id -> "Serial" InterfaceId.MOCK.id -> "Mock" InterfaceId.NOP.id -> "NOP" else -> "Unknown" } private inline fun historyLog( priority: Int = Log.INFO, throwable: Throwable? = null, crossinline message: () -> String, ) { if (!BuildConfig.DEBUG) return val logger = Logger.withTag("HistoryReplay") val msg = message() when (priority) { Log.VERBOSE -> logger.v(throwable) { msg } Log.DEBUG -> logger.d(throwable) { msg } Log.INFO -> logger.i(throwable) { msg } Log.WARN -> logger.w(throwable) { msg } Log.ERROR -> logger.e(throwable) { msg } else -> logger.i(throwable) { msg } } } companion object { private const val MAX_RETRY_ATTEMPTS = 2 private const val RETRY_DELAY_MS = 5_000L private const val MILLISECONDS_IN_SECOND = 1000L private const val HOPS_AWAY_UNAVAILABLE = -1 private const val BATTERY_PERCENT_UNSUPPORTED = 0.0 private const val BATTERY_PERCENT_LOW_THRESHOLD = 20 private const val BATTERY_PERCENT_LOW_DIVISOR = 5 private const val BATTERY_PERCENT_CRITICAL_THRESHOLD = 5 private const val BATTERY_PERCENT_COOLDOWN_SECONDS = 1500 private val batteryPercentCooldowns = ConcurrentHashMap() } }