refactor(service): harden KMP service layer — database init, connection reliability, handler decomposition (#4992)

This commit is contained in:
James Rich 2026-04-04 13:07:44 -05:00 committed by GitHub
parent e111b61e4e
commit 6af3ad6f0c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
62 changed files with 3808 additions and 735 deletions

View file

@ -0,0 +1,86 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import org.koin.core.annotation.Single
import org.meshtastic.core.repository.AdminPacketHandler
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.MeshConfigFlowManager
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.MeshPacket
/**
* Implementation of [AdminPacketHandler] that processes admin messages, including session passkeys, device/module
* configuration, and metadata.
*/
@Single
class AdminPacketHandlerImpl(
private val nodeManager: NodeManager,
private val configHandler: Lazy<MeshConfigHandler>,
private val configFlowManager: Lazy<MeshConfigFlowManager>,
private val commandSender: CommandSender,
) : AdminPacketHandler {
override fun handleAdminMessage(packet: MeshPacket, myNodeNum: Int) {
val payload = packet.decoded?.payload ?: return
val u = AdminMessage.ADAPTER.decode(payload)
Logger.d { "Admin message from=${packet.from} fields=${u.summarize()}" }
// Guard against clearing a valid passkey: firmware always embeds the key in every
// admin response, but a missing (default-empty) field must not reset the stored value.
val incomingPasskey = u.session_passkey
if (incomingPasskey.size > 0) {
Logger.d { "Session passkey updated (${incomingPasskey.size} bytes)" }
commandSender.setSessionPasskey(incomingPasskey)
}
val fromNum = packet.from
u.get_module_config_response?.let {
if (fromNum == myNodeNum) {
configHandler.value.handleModuleConfig(it)
} else {
it.statusmessage?.node_status?.let { nodeManager.updateNodeStatus(fromNum, it) }
}
}
if (fromNum == myNodeNum) {
u.get_config_response?.let { configHandler.value.handleDeviceConfig(it) }
u.get_channel_response?.let { configHandler.value.handleChannel(it) }
}
u.get_device_metadata_response?.let {
if (fromNum == myNodeNum) {
configFlowManager.value.handleLocalMetadata(it)
} else {
nodeManager.insertMetadata(fromNum, it)
}
}
}
}
/** Returns a short summary of the non-null admin message fields for logging. */
private fun AdminMessage.summarize(): String = buildList {
get_config_response?.let { add("get_config_response") }
get_module_config_response?.let { add("get_module_config_response") }
get_channel_response?.let { add("get_channel_response") }
get_device_metadata_response?.let { add("get_device_metadata_response") }
if (session_passkey.size > 0) add("session_passkey")
}
.joinToString()
.ifEmpty { "empty" }

View file

@ -19,14 +19,12 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
@ -62,7 +60,7 @@ class CommandSenderImpl(
private val tracerouteHandler: TracerouteHandler,
private val neighborInfoHandler: NeighborInfoHandler,
) : CommandSender {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val currentPacketId = atomic(Random(nowMillis).nextLong().absoluteValue)
private val sessionPasskey = atomic(ByteString.EMPTY)
@ -98,7 +96,7 @@ class CommandSenderImpl(
private fun computeHopLimit(): Int = (localConfig.value.lora?.hop_limit ?: 0).takeIf { it > 0 } ?: DEFAULT_HOP_LIMIT
private fun getAdminChannelIndex(toNum: Int): Int {
val myNum = nodeManager.myNodeNum ?: return 0
val myNum = nodeManager.myNodeNum.value ?: return 0
val myNode = nodeManager.nodeDBbyNodeNum[myNum]
val destNode = nodeManager.nodeDBbyNodeNum[toNum]
@ -169,8 +167,20 @@ class CommandSenderImpl(
packetHandler.sendToRadio(packet)
}
override suspend fun sendAdminAwait(
destNum: Int,
requestId: Int,
wantResponse: Boolean,
initFn: () -> AdminMessage,
): Boolean {
val adminMsg = initFn().copy(session_passkey = sessionPasskey.value)
val packet =
buildAdminPacket(to = destNum, id = requestId, wantResponse = wantResponse, adminMessage = adminMsg)
return packetHandler.sendToRadioAndAwait(packet)
}
override fun sendPosition(pos: org.meshtastic.proto.Position, destNum: Int?, wantResponse: Boolean) {
val myNum = nodeManager.myNodeNum ?: return
val myNum = nodeManager.myNodeNum.value ?: return
val idNum = destNum ?: myNum
Logger.d { "Sending our position/time to=$idNum $pos" }
@ -230,11 +240,11 @@ class CommandSenderImpl(
AdminMessage(remove_fixed_position = true)
}
}
nodeManager.handleReceivedPosition(destNum, nodeManager.myNodeNum ?: 0, meshPos, nowMillis)
nodeManager.handleReceivedPosition(destNum, nodeManager.myNodeNum.value ?: 0, meshPos, nowMillis)
}
override fun requestUserInfo(destNum: Int) {
val myNum = nodeManager.myNodeNum ?: return
val myNum = nodeManager.myNodeNum.value ?: return
val myNode = nodeManager.nodeDBbyNodeNum[myNum] ?: return
packetHandler.sendToRadio(
buildMeshPacket(
@ -303,7 +313,7 @@ class CommandSenderImpl(
override fun requestNeighborInfo(requestId: Int, destNum: Int) {
neighborInfoHandler.recordStartTime(requestId)
val myNum = nodeManager.myNodeNum ?: 0
val myNum = nodeManager.myNodeNum.value ?: 0
if (destNum == myNum) {
val neighborInfoToSend =
neighborInfoHandler.lastNeighborInfo
@ -392,7 +402,7 @@ class CommandSenderImpl(
}
return MeshPacket(
from = nodeManager.myNodeNum ?: 0,
from = nodeManager.myNodeNum.value ?: 0,
to = to,
id = id,
want_ack = wantAck,

View file

@ -127,7 +127,6 @@ class FromRadioPacketHandlerImpl(
notificationManager.dispatch(
Notification(title = title, type = type, message = cn.message, category = Notification.Category.Alert),
)
packetHandler.removeResponse(0, complete = false)
}
}
}

View file

@ -16,14 +16,13 @@
*/
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import okio.ByteString.Companion.toByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ignoreException
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.ignoreExceptionSuspend
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MeshUser
@ -66,7 +65,7 @@ class MeshActionHandlerImpl(
private val messageProcessor: Lazy<MeshMessageProcessor>,
private val radioConfigRepository: RadioConfigRepository,
) : MeshActionHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
override fun start(scope: CoroutineScope) {
this.scope = scope
@ -77,9 +76,10 @@ class MeshActionHandlerImpl(
private const val EMOJI_INDICATOR = 1
}
override fun onServiceAction(action: ServiceAction) {
ignoreException {
val myNodeNum = nodeManager.myNodeNum ?: return@ignoreException
override suspend fun onServiceAction(action: ServiceAction) {
Logger.d { "ServiceAction dispatched: ${action::class.simpleName}" }
ignoreExceptionSuspend {
val myNodeNum = nodeManager.myNodeNum.value ?: return@ignoreExceptionSuspend
when (action) {
is ServiceAction.Favorite -> handleFavorite(action, myNodeNum)
is ServiceAction.Ignore -> handleIgnore(action, myNodeNum)
@ -87,7 +87,12 @@ class MeshActionHandlerImpl(
is ServiceAction.Reaction -> handleReaction(action, myNodeNum)
is ServiceAction.ImportContact -> handleImportContact(action, myNodeNum)
is ServiceAction.SendContact -> {
commandSender.sendAdmin(myNodeNum) { AdminMessage(add_contact = action.contact) }
val accepted =
runCatching {
commandSender.sendAdminAwait(myNodeNum) { AdminMessage(add_contact = action.contact) }
}
.getOrDefault(false)
action.result.complete(accepted)
}
is ServiceAction.GetDeviceMetadata -> {
commandSender.sendAdmin(action.destNum, wantResponse = true) {
@ -180,6 +185,7 @@ class MeshActionHandlerImpl(
}
override fun handleSetOwner(u: MeshUser, myNodeNum: Int) {
Logger.d { "Setting owner: longName=${u.longName}, shortName=${u.shortName}" }
val newUser = User(id = u.id, long_name = u.longName, short_name = u.shortName, is_licensed = u.isLicensed)
commandSender.sendAdmin(myNodeNum) { AdminMessage(set_owner = newUser) }
nodeManager.handleReceivedUser(myNodeNum, newUser)
@ -253,7 +259,7 @@ class MeshActionHandlerImpl(
c.statusmessage?.let { sm -> nodeManager.updateNodeStatus(destNum, sm.node_status) }
// Optimistically persist module config locally so the UI reflects the
// new values immediately instead of waiting for the next want_config handshake.
if (destNum == nodeManager.myNodeNum) {
if (destNum == nodeManager.myNodeNum.value) {
scope.handledLaunch { radioConfigRepository.setLocalModuleConfig(c) }
}
}
@ -329,6 +335,7 @@ class MeshActionHandlerImpl(
}
override fun handleRequestReboot(requestId: Int, destNum: Int) {
Logger.i { "Reboot requested for node $destNum" }
commandSender.sendAdmin(destNum, requestId) { AdminMessage(reboot_seconds = DEFAULT_REBOOT_DELAY) }
}
@ -340,6 +347,7 @@ class MeshActionHandlerImpl(
}
override fun handleRequestFactoryReset(requestId: Int, destNum: Int) {
Logger.i { "Factory reset requested for node $destNum" }
commandSender.sendAdmin(destNum, requestId) { AdminMessage(factory_reset_device = 1) }
}
@ -356,6 +364,7 @@ class MeshActionHandlerImpl(
override fun handleUpdateLastAddress(deviceAddr: String?) {
val currentAddr = meshPrefs.deviceAddress.value
if (deviceAddr != currentAddr) {
Logger.i { "Device address changed, switching database and clearing node DB" }
meshPrefs.setDeviceAddress(deviceAddr)
scope.handledLaunch {
nodeManager.clear()

View file

@ -18,12 +18,10 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import okio.IOException
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.HandshakeConstants
@ -58,47 +56,91 @@ class MeshConfigFlowManagerImpl(
private val commandSender: CommandSender,
private val packetHandler: PacketHandler,
) : MeshConfigFlowManager {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val wantConfigDelay = 100L
override fun start(scope: CoroutineScope) {
this.scope = scope
}
private val newNodes = mutableListOf<NodeInfo>()
override val newNodeCount: Int
get() = newNodes.size
/**
* Type-safe handshake state machine. Each state carries exactly the data that is valid during that phase,
* eliminating the possibility of accessing stale or uninitialized fields.
*
* Guards [handleConfigComplete] so that duplicate or out-of-order `config_complete_id` signals from the firmware
* cannot trigger the wrong stage handler or drive the state machine backward.
*/
private sealed class HandshakeState {
/** No handshake in progress. */
data object Idle : HandshakeState()
private var rawMyNodeInfo: ProtoMyNodeInfo? = null
private var lastMetadata: DeviceMetadata? = null
private var newMyNodeInfo: SharedMyNodeInfo? = null
private var myNodeInfo: SharedMyNodeInfo? = null
/**
* Stage 1: receiving device config, module config, channels, and metadata.
*
* [rawMyNodeInfo] arrives first (my_info packet); [metadata] may arrive shortly after. Both are consumed
* together by [buildMyNodeInfo] at Stage 1 completion.
*/
data class ReceivingConfig(val rawMyNodeInfo: ProtoMyNodeInfo, var metadata: DeviceMetadata? = null) :
HandshakeState()
/**
* Stage 2: receiving node-info packets from the firmware.
*
* [myNodeInfo] was committed at the Stage 12 transition. [nodes] accumulates [NodeInfo] packets until
* `config_complete_id` arrives.
*/
data class ReceivingNodeInfo(
val myNodeInfo: SharedMyNodeInfo,
val nodes: MutableList<NodeInfo> = mutableListOf(),
) : HandshakeState()
/** Both stages finished. The app is fully connected. */
data class Complete(val myNodeInfo: SharedMyNodeInfo) : HandshakeState()
}
private var handshakeState: HandshakeState = HandshakeState.Idle
override val newNodeCount: Int
get() = (handshakeState as? HandshakeState.ReceivingNodeInfo)?.nodes?.size ?: 0
override fun handleConfigComplete(configCompleteId: Int) {
val state = handshakeState
when (configCompleteId) {
HandshakeConstants.CONFIG_NONCE -> handleConfigOnlyComplete()
HandshakeConstants.NODE_INFO_NONCE -> handleNodeInfoComplete()
HandshakeConstants.CONFIG_NONCE -> {
if (state !is HandshakeState.ReceivingConfig) {
Logger.w { "Ignoring Stage 1 config_complete in state=$state" }
return
}
handleConfigOnlyComplete(state)
}
HandshakeConstants.NODE_INFO_NONCE -> {
if (state !is HandshakeState.ReceivingNodeInfo) {
Logger.w { "Ignoring Stage 2 config_complete in state=$state" }
return
}
handleNodeInfoComplete(state)
}
else -> Logger.w { "Config complete id mismatch: $configCompleteId" }
}
}
private fun handleConfigOnlyComplete() {
private fun handleConfigOnlyComplete(state: HandshakeState.ReceivingConfig) {
Logger.i { "Config-only complete (Stage 1)" }
if (newMyNodeInfo == null) {
Logger.w {
"newMyNodeInfo is still null at Stage 1 complete, attempting final regen with last known metadata"
val finalizedInfo = buildMyNodeInfo(state.rawMyNodeInfo, state.metadata)
if (finalizedInfo == null) {
Logger.w { "Stage 1 failed: could not build MyNodeInfo, retrying Stage 1" }
handshakeState = HandshakeState.Idle
scope.handledLaunch {
delay(wantConfigDelay)
connectionManager.value.startConfigOnly()
}
regenMyNodeInfo(lastMetadata)
return
}
val finalizedInfo = newMyNodeInfo
if (finalizedInfo == null) {
Logger.e { "Handshake stall: Did not receive a valid MyNodeInfo before Stage 1 complete" }
} else {
myNodeInfo = finalizedInfo
Logger.i { "myNodeInfo committed successfully (nodeNum=${finalizedInfo.myNodeNum})" }
connectionManager.value.onRadioConfigLoaded()
}
handshakeState = HandshakeState.ReceivingNodeInfo(myNodeInfo = finalizedInfo)
Logger.i { "myNodeInfo committed (nodeNum=${finalizedInfo.myNodeNum})" }
connectionManager.value.onRadioConfigLoaded()
scope.handledLaunch {
delay(wantConfigDelay)
@ -118,19 +160,34 @@ class MeshConfigFlowManagerImpl(
}
}
private fun handleNodeInfoComplete() {
private fun handleNodeInfoComplete(state: HandshakeState.ReceivingNodeInfo) {
Logger.i { "NodeInfo complete (Stage 2)" }
val entities = newNodes.map { info ->
nodeManager.installNodeInfo(info, withBroadcast = false)
nodeManager.nodeDBbyNodeNum[info.num]!!
}
newNodes.clear()
val info = state.myNodeInfo
// Transition state immediately (synchronously) to prevent duplicate handling.
// The async work below (DB writes, broadcasts) proceeds without the guard.
handshakeState = HandshakeState.Complete(myNodeInfo = info)
// Snapshot and clear immediately so that a concurrent stall-guard retry (which
// resends want_config_id and causes the firmware to restart the node_info burst)
// starts accumulating into a fresh list rather than doubling this batch.
val nodesToProcess = state.nodes.toList()
state.nodes.clear()
val entities =
nodesToProcess.mapNotNull { nodeInfo ->
nodeManager.installNodeInfo(nodeInfo, withBroadcast = false)
nodeManager.nodeDBbyNodeNum[nodeInfo.num]
?: run {
Logger.w { "Node ${nodeInfo.num} missing from DB after installNodeInfo; skipping" }
null
}
}
scope.handledLaunch {
myNodeInfo?.let {
nodeRepository.installConfig(it, entities)
sendAnalytics(it)
}
nodeRepository.installConfig(info, entities)
analytics.setDeviceAttributes(info.firmwareVersion ?: "unknown", info.model ?: "unknown")
nodeManager.setNodeDbReady(true)
nodeManager.setAllowNodeDbWrites(true)
serviceRepository.setConnectionState(ConnectionState.Connected)
@ -139,16 +196,18 @@ class MeshConfigFlowManagerImpl(
}
}
private fun sendAnalytics(mi: SharedMyNodeInfo) {
analytics.setDeviceAttributes(mi.firmwareVersion ?: "unknown", mi.model ?: "unknown")
}
override fun handleMyInfo(myInfo: ProtoMyNodeInfo) {
Logger.i { "MyNodeInfo received: ${myInfo.my_node_num}" }
rawMyNodeInfo = myInfo
nodeManager.myNodeNum = myInfo.my_node_num
regenMyNodeInfo(lastMetadata)
// Transition to Stage 1, discarding any stale data from a prior interrupted handshake.
handshakeState = HandshakeState.ReceivingConfig(rawMyNodeInfo = myInfo)
nodeManager.setMyNodeNum(myInfo.my_node_num)
// Clear persisted radio config so the new handshake starts from a clean slate.
// DataStore serializes its own writes, so the clear will precede subsequent
// setLocalConfig / updateChannelSettings calls dispatched by later packets in this
// session (handleFromRadio processes packets sequentially, so later dispatches always
// occur after this one returns).
scope.handledLaunch {
radioConfigRepository.clearChannelSet()
radioConfigRepository.clearLocalConfig()
@ -160,12 +219,26 @@ class MeshConfigFlowManagerImpl(
override fun handleLocalMetadata(metadata: DeviceMetadata) {
Logger.i { "Local Metadata received: ${metadata.firmware_version}" }
lastMetadata = metadata
regenMyNodeInfo(metadata)
val state = handshakeState
if (state is HandshakeState.ReceivingConfig) {
state.metadata = metadata
// Persist the metadata immediately — buildMyNodeInfo() reads it at Stage 1 complete,
// but the DB write does not need to wait until then.
if (metadata != DeviceMetadata()) {
scope.handledLaunch { nodeRepository.insertMetadata(state.rawMyNodeInfo.my_node_num, metadata) }
}
} else {
Logger.w { "Ignoring metadata outside Stage 1 (state=$state)" }
}
}
override fun handleNodeInfo(info: NodeInfo) {
newNodes.add(info)
val state = handshakeState
if (state is HandshakeState.ReceivingNodeInfo) {
state.nodes.add(info)
} else {
Logger.w { "Ignoring NodeInfo outside Stage 2 (state=$state)" }
}
}
override fun handleFileInfo(info: FileInfo) {
@ -177,46 +250,38 @@ class MeshConfigFlowManagerImpl(
connectionManager.value.startConfigOnly()
}
private fun regenMyNodeInfo(metadata: DeviceMetadata? = null) {
val myInfo = rawMyNodeInfo
if (myInfo != null) {
try {
val mi =
with(myInfo) {
SharedMyNodeInfo(
myNodeNum = my_node_num,
hasGPS = false,
model =
when (val hwModel = metadata?.hw_model) {
null,
HardwareModel.UNSET,
-> null
else -> hwModel.name.replace('_', '-').replace('p', '.').lowercase()
},
firmwareVersion = metadata?.firmware_version?.takeIf { it.isNotBlank() },
couldUpdate = false,
shouldUpdate = false,
currentPacketId = commandSender.getCurrentPacketId() and 0xffffffffL,
messageTimeoutMsec = 300000,
minAppVersion = min_app_version,
maxChannels = 8,
hasWifi = metadata?.hasWifi == true,
channelUtilization = 0f,
airUtilTx = 0f,
deviceId = device_id.utf8(),
pioEnv = myInfo.pio_env.ifEmpty { null },
)
}
if (metadata != null && metadata != DeviceMetadata()) {
scope.handledLaunch { nodeRepository.insertMetadata(mi.myNodeNum, metadata) }
}
newMyNodeInfo = mi
Logger.d { "newMyNodeInfo updated: nodeNum=${mi.myNodeNum} model=${mi.model} fw=${mi.firmwareVersion}" }
} catch (@Suppress("TooGenericExceptionCaught") ex: Exception) {
Logger.e(ex) { "Failed to regenMyNodeInfo" }
}
} else {
Logger.v { "regenMyNodeInfo skipped: rawMyNodeInfo is null" }
/**
* Builds a [SharedMyNodeInfo] from the raw proto and optional firmware metadata. Pure function no side effects.
* Returns null only if construction throws.
*/
private fun buildMyNodeInfo(raw: ProtoMyNodeInfo, metadata: DeviceMetadata?): SharedMyNodeInfo? = try {
with(raw) {
SharedMyNodeInfo(
myNodeNum = my_node_num,
hasGPS = false,
model =
when (val hwModel = metadata?.hw_model) {
null,
HardwareModel.UNSET,
-> null
else -> hwModel.name.replace('_', '-').replace('p', '.').lowercase()
},
firmwareVersion = metadata?.firmware_version?.takeIf { it.isNotBlank() },
couldUpdate = false,
shouldUpdate = false,
currentPacketId = commandSender.getCurrentPacketId() and 0xffffffffL,
messageTimeoutMsec = 300000,
minAppVersion = min_app_version,
maxChannels = 8,
hasWifi = metadata?.hasWifi == true,
channelUtilization = 0f,
airUtilTx = 0f,
deviceId = device_id.utf8(),
pioEnv = pio_env.ifEmpty { null },
)
}
} catch (@Suppress("TooGenericExceptionCaught") ex: Exception) {
Logger.e(ex) { "Failed to build MyNodeInfo" }
null
}
}

View file

@ -16,15 +16,14 @@
*/
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.RadioConfigRepository
@ -42,7 +41,7 @@ class MeshConfigHandlerImpl(
private val serviceRepository: ServiceRepository,
private val nodeManager: NodeManager,
) : MeshConfigHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val _localConfig = MutableStateFlow(LocalConfig())
override val localConfig = _localConfig.asStateFlow()
@ -57,16 +56,18 @@ class MeshConfigHandlerImpl(
}
override fun handleDeviceConfig(config: Config) {
Logger.d { "Device config received: ${config.summarize()}" }
scope.handledLaunch { radioConfigRepository.setLocalConfig(config) }
serviceRepository.setConnectionProgress("Device config received")
}
override fun handleModuleConfig(config: ModuleConfig) {
Logger.d { "Module config received: ${config.summarize()}" }
scope.handledLaunch { radioConfigRepository.setLocalModuleConfig(config) }
serviceRepository.setConnectionProgress("Module config received")
config.statusmessage?.let { sm ->
nodeManager.myNodeNum?.let { num -> nodeManager.updateNodeStatus(num, sm.node_status) }
nodeManager.myNodeNum.value?.let { num -> nodeManager.updateNodeStatus(num, sm.node_status) }
}
}
@ -85,6 +86,40 @@ class MeshConfigHandlerImpl(
}
override fun handleDeviceUIConfig(config: DeviceUIConfig) {
Logger.d { "DeviceUI config received" }
scope.handledLaunch { radioConfigRepository.setDeviceUIConfig(config) }
}
}
/** Returns a short summary of which Config variant is set. */
private fun Config.summarize(): String = when {
device != null -> "device"
position != null -> "position"
power != null -> "power"
network != null -> "network"
display != null -> "display"
lora != null -> "lora"
bluetooth != null -> "bluetooth"
security != null -> "security"
else -> "unknown"
}
/** Returns a short summary of which ModuleConfig variant is set. */
@Suppress("CyclomaticComplexMethod")
private fun ModuleConfig.summarize(): String = when {
mqtt != null -> "mqtt"
serial != null -> "serial"
external_notification != null -> "external_notification"
store_forward != null -> "store_forward"
range_test != null -> "range_test"
telemetry != null -> "telemetry"
canned_message != null -> "canned_message"
audio != null -> "audio"
remote_hardware != null -> "remote_hardware"
neighbor_info != null -> "neighbor_info"
ambient_lighting != null -> "ambient_lighting"
detection_sensor != null -> "detection_sensor"
paxcounter != null -> "paxcounter"
statusmessage != null -> "statusmessage"
else -> "unknown"
}

View file

@ -21,7 +21,6 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
@ -29,7 +28,6 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.ConnectionState
@ -84,7 +82,7 @@ class MeshConnectionManagerImpl(
private val workerManager: MeshWorkerManager,
private val appWidgetUpdater: AppWidgetUpdater,
) : MeshConnectionManager {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private var sleepTimeout: Job? = null
private var locationRequestsJob: Job? = null
private var handshakeTimeout: Job? = null
@ -127,22 +125,20 @@ class MeshConnectionManagerImpl(
.launchIn(scope)
}
private fun onRadioConnectionState(newState: ConnectionState) {
scope.handledLaunch {
val localConfig = radioConfigRepository.localConfigFlow.first()
val isRouter = localConfig.device?.role == Config.DeviceConfig.Role.ROUTER
val lsEnabled = localConfig.power?.is_power_saving == true || isRouter
private suspend fun onRadioConnectionState(newState: ConnectionState) {
val localConfig = radioConfigRepository.localConfigFlow.first()
val isRouter = localConfig.device?.role == Config.DeviceConfig.Role.ROUTER
val lsEnabled = localConfig.power?.is_power_saving == true || isRouter
val effectiveState =
when (newState) {
is ConnectionState.Connected -> ConnectionState.Connected
is ConnectionState.DeviceSleep ->
if (lsEnabled) ConnectionState.DeviceSleep else ConnectionState.Disconnected
is ConnectionState.Connecting -> ConnectionState.Connecting
is ConnectionState.Disconnected -> ConnectionState.Disconnected
}
onConnectionChanged(effectiveState)
}
val effectiveState =
when (newState) {
is ConnectionState.Connected -> ConnectionState.Connected
is ConnectionState.DeviceSleep ->
if (lsEnabled) ConnectionState.DeviceSleep else ConnectionState.Disconnected
is ConnectionState.Connecting -> ConnectionState.Connecting
is ConnectionState.Disconnected -> ConnectionState.Disconnected
}
onConnectionChanged(effectiveState)
}
private fun onConnectionChanged(c: ConnectionState) {
@ -195,23 +191,27 @@ class MeshConnectionManagerImpl(
// the stall is on our side, the retry will be dropped and the reconnect below
// will trigger instead — which is the right recovery in that case.
Logger.w {
"Handshake stall detected at Stage $stage — retrying, then reconnecting if still stalled."
"Handshake stall detected at Stage $stage — retrying, then reconnecting if still stalled"
}
action()
delay(HANDSHAKE_RETRY_TIMEOUT)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
Logger.e { "Handshake still stalled after retry. Forcing reconnect." }
Logger.e { "Handshake still stalled after retry, forcing reconnect" }
onConnectionChanged(ConnectionState.Disconnected)
}
}
}
}
private fun handleDeviceSleep() {
serviceRepository.setConnectionState(ConnectionState.DeviceSleep)
private fun tearDownConnection() {
packetHandler.stopPacketQueue()
locationManager.stop()
mqttManager.stop()
}
private fun handleDeviceSleep() {
serviceRepository.setConnectionState(ConnectionState.DeviceSleep)
tearDownConnection()
if (connectTimeMsec != 0L) {
val now = nowMillis
@ -230,7 +230,7 @@ class MeshConnectionManagerImpl(
val timeout = (localConfig.power?.ls_secs ?: 0) + DEVICE_SLEEP_TIMEOUT_SECONDS
Logger.d { "Waiting for sleeping device, timeout=$timeout secs" }
delay(timeout.seconds)
Logger.w { "Device timeout out, setting disconnected" }
Logger.w { "Device timed out, setting disconnected" }
onConnectionChanged(ConnectionState.Disconnected)
} catch (_: CancellationException) {
Logger.d { "device sleep timeout cancelled" }
@ -242,9 +242,7 @@ class MeshConnectionManagerImpl(
private fun handleDisconnected() {
serviceRepository.setConnectionState(ConnectionState.Disconnected)
packetHandler.stopPacketQueue()
locationManager.stop()
mqttManager.stop()
tearDownConnection()
analytics.track(
EVENT_MESH_DISCONNECT,
@ -285,7 +283,7 @@ class MeshConnectionManagerImpl(
handshakeTimeout?.cancel()
handshakeTimeout = null
val myNodeNum = nodeManager.myNodeNum ?: 0
val myNodeNum = nodeManager.myNodeNum.value ?: 0
// Set device time now that the full node picture is ready. Sending this during Stage 1
// (onRadioConfigLoaded) introduced GATT write contention with the Stage 2 node-info burst.

View file

@ -20,14 +20,10 @@ import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.DataPacket
@ -37,11 +33,8 @@ import org.meshtastic.core.model.Reaction
import org.meshtastic.core.model.util.MeshDataMapper
import org.meshtastic.core.model.util.decodeOrNull
import org.meshtastic.core.model.util.toOneLiner
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.AdminPacketHandler
import org.meshtastic.core.repository.DataPair
import org.meshtastic.core.repository.MeshConfigFlowManager
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.MeshDataHandler
import org.meshtastic.core.repository.MeshServiceNotifications
import org.meshtastic.core.repository.MessageFilter
@ -56,38 +49,33 @@ import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.StoreForwardPacketHandler
import org.meshtastic.core.repository.TelemetryPacketHandler
import org.meshtastic.core.repository.TracerouteHandler
import org.meshtastic.core.resources.Res
import org.meshtastic.core.resources.critical_alert
import org.meshtastic.core.resources.error_duty_cycle
import org.meshtastic.core.resources.getStringSuspend
import org.meshtastic.core.resources.low_battery_message
import org.meshtastic.core.resources.low_battery_title
import org.meshtastic.core.resources.unknown_username
import org.meshtastic.core.resources.waypoint_received
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.Paxcount
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.Position
import org.meshtastic.proto.Routing
import org.meshtastic.proto.StatusMessage
import org.meshtastic.proto.Telemetry
import org.meshtastic.proto.User
import org.meshtastic.proto.Waypoint
import kotlin.time.Duration.Companion.milliseconds
/**
* Implementation of [MeshDataHandler] that decodes and routes incoming mesh data packets.
*
* This class handles the complexity of:
* 1. Mapping raw [MeshPacket] objects to domain-friendly [DataPacket] objects.
* 2. Routing packets to specialized handlers (e.g., Traceroute, NeighborInfo, SFPP).
* 2. Routing packets to specialized handlers (e.g., Traceroute, NeighborInfo, Telemetry, Admin, SFPP).
* 3. Managing message history and persistence.
* 4. Triggering notifications for various packet types (Text, Waypoints, Battery).
* 5. Tracking received telemetry for node updates.
* 4. Triggering notifications for various packet types (Text, Waypoints).
*/
@Suppress("LongParameterList", "TooManyFunctions", "LargeClass", "CyclomaticComplexMethod")
@Suppress("LongParameterList", "TooManyFunctions", "CyclomaticComplexMethod")
@Single
class MeshDataHandlerImpl(
private val nodeManager: NodeManager,
@ -99,24 +87,20 @@ class MeshDataHandlerImpl(
private val serviceNotifications: MeshServiceNotifications,
private val analytics: PlatformAnalytics,
private val dataMapper: MeshDataMapper,
private val configHandler: Lazy<MeshConfigHandler>,
private val configFlowManager: Lazy<MeshConfigFlowManager>,
private val commandSender: CommandSender,
private val connectionManager: Lazy<MeshConnectionManager>,
private val tracerouteHandler: TracerouteHandler,
private val neighborInfoHandler: NeighborInfoHandler,
private val radioConfigRepository: RadioConfigRepository,
private val messageFilter: MessageFilter,
private val storeForwardHandler: StoreForwardPacketHandler,
private val telemetryHandler: TelemetryPacketHandler,
private val adminPacketHandler: AdminPacketHandler,
) : MeshDataHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private val batteryMutex = Mutex()
private val batteryPercentCooldowns = mutableMapOf<Int, Long>()
private lateinit var scope: CoroutineScope
override fun start(scope: CoroutineScope) {
this.scope = scope
storeForwardHandler.start(scope)
telemetryHandler.start(scope)
}
private val rememberDataType =
@ -157,7 +141,7 @@ class MeshDataHandlerImpl(
PortNum.WAYPOINT_APP -> handleWaypoint(packet, dataPacket, myNodeNum)
PortNum.POSITION_APP -> handlePosition(packet, dataPacket, myNodeNum)
PortNum.NODEINFO_APP -> if (!fromUs) handleNodeInfo(packet)
PortNum.TELEMETRY_APP -> handleTelemetry(packet, dataPacket, myNodeNum)
PortNum.TELEMETRY_APP -> telemetryHandler.handleTelemetry(packet, dataPacket, myNodeNum)
else ->
shouldBroadcast =
handleSpecializedDataPacket(packet, dataPacket, myNodeNum, fromUs, logUuid, logInsertJob)
@ -198,7 +182,7 @@ class MeshDataHandlerImpl(
}
PortNum.ADMIN_APP -> {
handleAdminMessage(packet, myNodeNum)
adminPacketHandler.handleAdminMessage(packet, myNodeNum)
}
PortNum.NEIGHBORINFO_APP -> {
@ -255,37 +239,6 @@ class MeshDataHandlerImpl(
rememberDataPacket(dataPacket, myNodeNum, updateNotification = u.expire > currentSecond)
}
private fun handleAdminMessage(packet: MeshPacket, myNodeNum: Int) {
val payload = packet.decoded?.payload ?: return
val u = AdminMessage.ADAPTER.decode(payload)
// Guard against clearing a valid passkey: firmware always embeds the key in every
// admin response, but a missing (default-empty) field must not reset the stored value.
val incomingPasskey = u.session_passkey
if (incomingPasskey.size > 0) commandSender.setSessionPasskey(incomingPasskey)
val fromNum = packet.from
u.get_module_config_response?.let {
if (fromNum == myNodeNum) {
configHandler.value.handleModuleConfig(it)
} else {
it.statusmessage?.node_status?.let { nodeManager.updateNodeStatus(fromNum, it) }
}
}
if (fromNum == myNodeNum) {
u.get_config_response?.let { configHandler.value.handleDeviceConfig(it) }
u.get_channel_response?.let { configHandler.value.handleChannel(it) }
}
u.get_device_metadata_response?.let {
if (fromNum == myNodeNum) {
configFlowManager.value.handleLocalMetadata(it)
} else {
nodeManager.insertMetadata(fromNum, it)
}
}
}
private fun handleTextMessage(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val decoded = packet.decoded ?: return
if (decoded.reply_id != 0 && decoded.emoji != 0) {
@ -311,107 +264,6 @@ class MeshDataHandlerImpl(
rememberDataPacket(dataPacket, myNodeNum)
}
@Suppress("LongMethod")
private fun handleTelemetry(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val payload = packet.decoded?.payload ?: return
val t =
(Telemetry.ADAPTER.decodeOrNull(payload, Logger) ?: return).let {
if (it.time == 0) it.copy(time = (dataPacket.time.milliseconds.inWholeSeconds).toInt()) else it
}
Logger.d { "Telemetry from ${packet.from}: ${Telemetry.ADAPTER.toOneLiner(t)}" }
val fromNum = packet.from
val isRemote = (fromNum != myNodeNum)
if (!isRemote) {
connectionManager.value.updateTelemetry(t)
}
nodeManager.updateNode(fromNum) { node: Node ->
val metrics = t.device_metrics
val environment = t.environment_metrics
val power = t.power_metrics
var nextNode = node
when {
metrics != null -> {
nextNode = nextNode.copy(deviceMetrics = metrics)
if (fromNum == myNodeNum || (isRemote && node.isFavorite)) {
if (
(metrics.voltage ?: 0f) > BATTERY_PERCENT_UNSUPPORTED &&
(metrics.battery_level ?: 0) <= BATTERY_PERCENT_LOW_THRESHOLD
) {
scope.launch {
if (shouldBatteryNotificationShow(fromNum, t, myNodeNum)) {
notificationManager.dispatch(
Notification(
title =
getStringSuspend(
Res.string.low_battery_title,
nextNode.user.short_name,
),
message =
getStringSuspend(
Res.string.low_battery_message,
nextNode.user.long_name,
nextNode.deviceMetrics.battery_level ?: 0,
),
category = Notification.Category.Battery,
),
)
}
}
} else {
scope.launch {
batteryMutex.withLock {
if (batteryPercentCooldowns.containsKey(fromNum)) {
batteryPercentCooldowns.remove(fromNum)
}
}
notificationManager.cancel(nextNode.num)
}
}
}
}
environment != null -> nextNode = nextNode.copy(environmentMetrics = environment)
power != null -> nextNode = nextNode.copy(powerMetrics = power)
}
val telemetryTime = if (t.time != 0) t.time else nextNode.lastHeard
val newLastHeard = maxOf(nextNode.lastHeard, telemetryTime)
nextNode.copy(lastHeard = newLastHeard)
}
}
@Suppress("ReturnCount")
private suspend fun shouldBatteryNotificationShow(fromNum: Int, t: Telemetry, myNodeNum: Int): Boolean {
val isRemote = (fromNum != myNodeNum)
var shouldDisplay = false
var forceDisplay = false
val metrics = t.device_metrics ?: return false
val batteryLevel = metrics.battery_level ?: 0
when {
batteryLevel <= BATTERY_PERCENT_CRITICAL_THRESHOLD -> {
shouldDisplay = true
forceDisplay = true
}
batteryLevel == BATTERY_PERCENT_LOW_THRESHOLD -> shouldDisplay = true
batteryLevel.mod(BATTERY_PERCENT_LOW_DIVISOR) == 0 && !isRemote -> shouldDisplay = true
isRemote -> shouldDisplay = true
}
if (shouldDisplay) {
val now = nowSeconds
batteryMutex.withLock {
if (!batteryPercentCooldowns.containsKey(fromNum)) batteryPercentCooldowns[fromNum] = 0L
if ((now - batteryPercentCooldowns[fromNum]!!) >= BATTERY_PERCENT_COOLDOWN_SECONDS || forceDisplay) {
batteryPercentCooldowns[fromNum] = now
return true
}
}
}
return false
}
private fun handleRouting(packet: MeshPacket, dataPacket: DataPacket) {
val payload = packet.decoded?.payload ?: return
val r = Routing.ADAPTER.decodeOrNull(payload, Logger) ?: return
@ -628,12 +480,13 @@ class MeshDataHandlerImpl(
return@handledLaunch
}
packetRepository.value.insertReaction(reaction, nodeManager.myNodeNum ?: 0)
packetRepository.value.insertReaction(reaction, nodeManager.myNodeNum.value ?: 0)
// Find the original packet to get the contactKey
packetRepository.value.getPacketByPacketId(decoded.reply_id)?.let { originalPacket ->
// Skip notification if the original message was filtered
val targetId = if (originalPacket.from == DataPacket.ID_LOCAL) originalPacket.to else originalPacket.from
val targetId =
if (originalPacket.from == DataPacket.ID_LOCAL) originalPacket.to else originalPacket.from
val contactKey = "${originalPacket.channel}$targetId"
val conversationMuted = packetRepository.value.getContactSettings(contactKey).isMuted
val nodeMuted = nodeManager.nodeDBbyID[fromId]?.isMuted == true
@ -642,7 +495,11 @@ class MeshDataHandlerImpl(
if (!isSilent) {
val channelName =
if (originalPacket.to == DataPacket.ID_BROADCAST) {
radioConfigRepository.channelSetFlow.first().settings.getOrNull(originalPacket.channel)?.name
radioConfigRepository.channelSetFlow
.first()
.settings
.getOrNull(originalPacket.channel)
?.name
} else {
null
}
@ -660,11 +517,5 @@ class MeshDataHandlerImpl(
companion object {
private const val HOPS_AWAY_UNAVAILABLE = -1
private const val BATTERY_PERCENT_UNSUPPORTED = 0.0
private const val BATTERY_PERCENT_LOW_THRESHOLD = 20
private const val BATTERY_PERCENT_LOW_DIVISOR = 5
private const val BATTERY_PERCENT_CRITICAL_THRESHOLD = 5
private const val BATTERY_PERCENT_COOLDOWN_SECONDS = 1500
}
}

View file

@ -19,7 +19,6 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@ -27,7 +26,6 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.MeshLog
@ -55,7 +53,7 @@ class MeshMessageProcessorImpl(
private val router: Lazy<MeshRouter>,
private val fromRadioDispatcher: FromRadioPacketHandler,
) : MeshMessageProcessor {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val mapsMutex = Mutex()
private val logUuidByPacketId = mutableMapOf<Int, String>()
@ -152,6 +150,7 @@ class MeshMessageProcessorImpl(
earlyMutex.withLock {
val queueSize = earlyReceivedPackets.size
if (queueSize >= maxEarlyPacketBuffer) {
Logger.w { "Early packet buffer full ($queueSize), dropping oldest packet" }
earlyReceivedPackets.removeFirstOrNull()
}
earlyReceivedPackets.addLast(preparedPacket)
@ -162,16 +161,17 @@ class MeshMessageProcessorImpl(
private fun flushEarlyReceivedPackets(reason: String) {
scope.launch {
val packets = earlyMutex.withLock {
if (earlyReceivedPackets.isEmpty()) return@withLock emptyList<MeshPacket>()
val list = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
list
}
val packets =
earlyMutex.withLock {
if (earlyReceivedPackets.isEmpty()) return@withLock emptyList<MeshPacket>()
val list = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
list
}
if (packets.isEmpty()) return@launch
Logger.d { "replayEarlyPackets reason=$reason count=${packets.size}" }
val myNodeNum = nodeManager.myNodeNum
val myNodeNum = nodeManager.myNodeNum.value
packets.forEach { processReceivedMeshPacket(it, myNodeNum) }
}
}

View file

@ -20,12 +20,10 @@ import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.network.repository.MQTTRepository
import org.meshtastic.core.repository.MqttManager
import org.meshtastic.core.repository.PacketHandler
@ -39,7 +37,7 @@ class MqttManagerImpl(
private val packetHandler: PacketHandler,
private val serviceRepository: ServiceRepository,
) : MqttManager {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private var mqttMessageFlow: Job? = null
override fun start(scope: CoroutineScope, enabled: Boolean, proxyToClientEnabled: Boolean) {

View file

@ -21,10 +21,8 @@ import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.NumberFormatter
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.repository.NeighborInfoHandler
import org.meshtastic.core.repository.NodeManager
@ -39,7 +37,7 @@ class NeighborInfoHandlerImpl(
private val serviceRepository: ServiceRepository,
private val serviceBroadcasts: ServiceBroadcasts,
) : NeighborInfoHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val startTimes = atomic(persistentMapOf<Int, Long>())
@ -59,7 +57,7 @@ class NeighborInfoHandlerImpl(
// Store the last neighbor info from our connected radio
val from = packet.from
if (from == nodeManager.myNodeNum) {
if (from == nodeManager.myNodeNum.value) {
lastNeighborInfo = ni
Logger.d { "Stored last neighbor info from connected radio" }
}

View file

@ -21,13 +21,11 @@ import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import okio.ByteString
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.DeviceMetrics
import org.meshtastic.core.model.EnvironmentMetrics
@ -62,7 +60,7 @@ class NodeManagerImpl(
private val serviceBroadcasts: ServiceBroadcasts,
private val notificationManager: NotificationManager,
) : NodeManager {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val _nodeDBbyNodeNum = atomic(persistentMapOf<Int, Node>())
private val _nodeDBbyID = atomic(persistentMapOf<String, Node>())
@ -84,7 +82,11 @@ class NodeManagerImpl(
allowNodeDbWrites.value = allowed
}
override var myNodeNum: Int? = null
override val myNodeNum = MutableStateFlow<Int?>(null)
override fun setMyNodeNum(num: Int?) {
myNodeNum.value = num
}
override fun start(scope: CoroutineScope) {
this.scope = scope
@ -101,7 +103,7 @@ class NodeManagerImpl(
val byId = mutableMapOf<String, Node>()
nodes.values.forEach { byId[it.user.id] = it }
_nodeDBbyID.value = persistentMapOf<String, Node>().putAll(byId)
myNodeNum = nodeRepository.myNodeInfo.value?.myNodeNum
myNodeNum.value = nodeRepository.myNodeInfo.value?.myNodeNum
}
}
@ -110,7 +112,7 @@ class NodeManagerImpl(
_nodeDBbyID.value = persistentMapOf()
isNodeDbReady.value = false
allowNodeDbWrites.value = false
myNodeNum = null
myNodeNum.value = null
}
override fun getMyNodeInfo(): MyNodeInfo? {
@ -135,7 +137,7 @@ class NodeManagerImpl(
}
override fun getMyId(): String {
val num = myNodeNum ?: nodeRepository.myNodeInfo.value?.myNodeNum ?: return ""
val num = myNodeNum.value ?: nodeRepository.myNodeInfo.value?.myNodeNum ?: return ""
return _nodeDBbyNodeNum.value[num]?.user?.id ?: ""
}
@ -271,9 +273,8 @@ class NodeManagerImpl(
if (shouldPreserveExistingUser(node.user, user)) {
// keep existing names
} else {
var newUser = user.let {
if (it.is_licensed == true) it.copy(public_key = ByteString.EMPTY) else it
}
var newUser =
user.let { if (it.is_licensed == true) it.copy(public_key = ByteString.EMPTY) else it }
if (info.via_mqtt) {
newUser = newUser.copy(long_name = "${newUser.long_name} (MQTT)")
}

View file

@ -17,6 +17,7 @@
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@ -29,7 +30,6 @@ import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.ConnectionState
import org.meshtastic.core.model.DataPacket
@ -67,16 +67,21 @@ class PacketHandlerImpl(
}
private var queueJob: Job? = null
private var scope: CoroutineScope = CoroutineScope(ioDispatcher)
private lateinit var scope: CoroutineScope
private val queueMutex = Mutex()
private val queuedPackets = mutableListOf<MeshPacket>()
// Set to true by stopPacketQueue() under queueMutex. Checked by startPacketQueueLocked()
// and the queue processor's finally block to prevent restarting a stopped queue.
private var queueStopped = false
private val responseMutex = Mutex()
private val queueResponse = mutableMapOf<Int, CompletableDeferred<Boolean>>()
override fun start(scope: CoroutineScope) {
this.scope = scope
queueStopped = false // Safe: called before any concurrent operations on this scope.
}
override fun sendToRadio(p: ToRadio) {
@ -104,22 +109,52 @@ class PacketHandlerImpl(
override fun sendToRadio(packet: MeshPacket) {
scope.launch {
queueMutex.withLock { queuedPackets.add(packet) }
startPacketQueue()
queueMutex.withLock {
queuedPackets.add(packet)
startPacketQueueLocked()
}
}
}
@Suppress("TooGenericExceptionCaught", "SwallowedException")
override suspend fun sendToRadioAndAwait(packet: MeshPacket): Boolean {
// Pre-register the deferred so the queue processor and QueueStatus handler
// can find it immediately — no polling required.
val deferred = CompletableDeferred<Boolean>()
responseMutex.withLock { queueResponse[packet.id] = deferred }
queueMutex.withLock {
queuedPackets.add(packet)
startPacketQueueLocked()
}
return try {
withTimeout(TIMEOUT) { deferred.await() }
} catch (e: TimeoutCancellationException) {
Logger.d { "sendToRadioAndAwait packet id=${packet.id.toUInt()} timeout" }
false
} catch (e: CancellationException) {
throw e // Preserve structured concurrency cancellation propagation.
} catch (e: Exception) {
Logger.d { "sendToRadioAndAwait packet id=${packet.id.toUInt()} failed: ${e.message}" }
false
} finally {
responseMutex.withLock { queueResponse.remove(packet.id) }
}
}
override fun stopPacketQueue() {
if (queueJob?.isActive == true) {
// Run async so callers (non-suspend) don't block, but all mutations are
// serialized under the same mutexes used by the queue processor and senders.
scope.launch {
Logger.i { "Stopping packet queueJob" }
queueJob?.cancel()
queueJob = null
scope.launch {
queueMutex.withLock { queuedPackets.clear() }
responseMutex.withLock {
queueResponse.values.lastOrNull { !it.isCompleted }?.complete(false)
queueResponse.clear()
}
queueMutex.withLock {
queueStopped = true
queueJob?.cancel()
queueJob = null
queuedPackets.clear()
}
responseMutex.withLock {
queueResponse.values.forEach { if (!it.isCompleted) it.complete(false) }
queueResponse.clear()
}
}
}
@ -144,33 +179,47 @@ class PacketHandlerImpl(
scope.launch { responseMutex.withLock { queueResponse.remove(dataRequestId)?.complete(complete) } }
}
private fun startPacketQueue() {
/**
* Starts the packet queue processor. Must be called while holding [queueMutex] to ensure the check-then-start is
* atomic preventing two concurrent callers from launching duplicate processors.
*/
private fun startPacketQueueLocked() {
if (queueStopped) return
if (queueJob?.isActive == true) return
queueJob = scope.handledLaunch {
try {
while (serviceRepository.connectionState.value == ConnectionState.Connected) {
val packet = queueMutex.withLock { queuedPackets.removeFirstOrNull() } ?: break
@Suppress("TooGenericExceptionCaught", "SwallowedException")
try {
val response = sendPacket(packet)
Logger.d { "queueJob packet id=${packet.id.toUInt()} waiting" }
val success = withTimeout(TIMEOUT) { response.await() }
Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" }
} catch (e: TimeoutCancellationException) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" }
} catch (e: Exception) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" }
} finally {
responseMutex.withLock { queueResponse.remove(packet.id) }
queueJob =
scope.handledLaunch {
try {
while (serviceRepository.connectionState.value == ConnectionState.Connected) {
val packet = queueMutex.withLock { queuedPackets.removeFirstOrNull() } ?: break
@Suppress("TooGenericExceptionCaught", "SwallowedException")
try {
val response = sendPacket(packet)
Logger.d { "queueJob packet id=${packet.id.toUInt()} waiting" }
val success = withTimeout(TIMEOUT) { response.await() }
Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" }
} catch (e: TimeoutCancellationException) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" }
} catch (e: CancellationException) {
throw e // Preserve structured concurrency cancellation propagation.
} catch (e: Exception) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" }
}
// Do NOT remove from queueResponse here. Removal is owned by:
// - handleQueueStatus (normal completion path)
// - sendToRadioAndAwait's finally block (for await-style callers)
// - stopPacketQueue (bulk cleanup on disconnect)
}
} finally {
// Hold queueMutex so that clearing queueJob and the restart decision are
// atomic with respect to new senders calling startPacketQueueLocked().
queueMutex.withLock {
queueJob = null
if (!queueStopped && queuedPackets.isNotEmpty()) {
startPacketQueueLocked()
}
}
}
} finally {
queueJob = null
if (queueMutex.withLock { queuedPackets.isNotEmpty() }) {
startPacketQueue()
}
}
}
}
private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch {
@ -194,8 +243,8 @@ class PacketHandlerImpl(
@Suppress("TooGenericExceptionCaught")
private suspend fun sendPacket(packet: MeshPacket): CompletableDeferred<Boolean> {
val deferred = CompletableDeferred<Boolean>()
responseMutex.withLock { queueResponse[packet.id] = deferred }
// Reuse a deferred pre-registered by sendToRadioAndAwait, or create a new one.
val deferred = responseMutex.withLock { queueResponse.getOrPut(packet.id) { CompletableDeferred() } }
try {
if (serviceRepository.connectionState.value != ConnectionState.Connected) {
throw RadioNotConnectedException()

View file

@ -18,12 +18,10 @@ package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import okio.ByteString.Companion.toByteString
import okio.IOException
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.model.util.SfppHasher
@ -48,7 +46,7 @@ class StoreForwardPacketHandlerImpl(
private val historyManager: HistoryManager,
private val dataHandler: Lazy<MeshDataHandler>,
) : StoreForwardPacketHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
override fun start(scope: CoroutineScope) {
this.scope = scope
@ -116,7 +114,7 @@ class StoreForwardPacketHandlerImpl(
Logger.d {
"SFPP updateStatus: packetId=${sfpp.encapsulated_id} from=${sfpp.encapsulated_from} " +
"to=${sfpp.encapsulated_to} myNodeNum=${nodeManager.myNodeNum} status=$status"
"to=${sfpp.encapsulated_to} myNodeNum=${nodeManager.myNodeNum.value} status=$status"
}
scope.handledLaunch {
packetRepository.value.updateSFPPStatus(
@ -126,7 +124,7 @@ class StoreForwardPacketHandlerImpl(
hash = hash,
status = status,
rxTime = sfpp.encapsulated_rxtime.toLong() and 0xFFFFFFFFL,
myNodeNum = nodeManager.myNodeNum ?: 0,
myNodeNum = nodeManager.myNodeNum.value ?: 0,
)
serviceBroadcasts.broadcastMessageStatus(sfpp.encapsulated_id, status)
}
@ -145,10 +143,8 @@ class StoreForwardPacketHandlerImpl(
}
private fun handleReceivedStoreAndForward(dataPacket: DataPacket, s: StoreAndForward, myNodeNum: Int) {
Logger.d { "StoreAndForward: variant from ${dataPacket.from}" }
val h = s.history
val lastRequest = h?.last_request ?: 0
Logger.d { "rxStoreForward from=${dataPacket.from} lastRequest=$lastRequest" }
val lastRequest = s.history?.last_request ?: 0
Logger.d { "StoreAndForward from=${dataPacket.from} lastRequest=$lastRequest" }
when {
s.stats != null -> {
val text = s.stats.toString()
@ -159,7 +155,8 @@ class StoreForwardPacketHandlerImpl(
)
dataHandler.value.rememberDataPacket(u, myNodeNum)
}
h != null -> {
s.history != null -> {
val h = s.history!!
val text =
"Total messages: ${h.history_messages}\n" +
"History window: ${h.window.milliseconds.inWholeMinutes} min\n" +

View file

@ -0,0 +1,170 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.util.decodeOrNull
import org.meshtastic.core.model.util.toOneLiner
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.Notification
import org.meshtastic.core.repository.NotificationManager
import org.meshtastic.core.repository.TelemetryPacketHandler
import org.meshtastic.core.resources.Res
import org.meshtastic.core.resources.getStringSuspend
import org.meshtastic.core.resources.low_battery_message
import org.meshtastic.core.resources.low_battery_title
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.Telemetry
import kotlin.time.Duration.Companion.milliseconds
/**
* Implementation of [TelemetryPacketHandler] that processes telemetry packets and manages battery-level notifications
* with cooldown logic.
*/
@Single
class TelemetryPacketHandlerImpl(
private val nodeManager: NodeManager,
private val connectionManager: Lazy<MeshConnectionManager>,
private val notificationManager: NotificationManager,
) : TelemetryPacketHandler {
private lateinit var scope: CoroutineScope
private val batteryMutex = Mutex()
private val batteryPercentCooldowns = mutableMapOf<Int, Long>()
override fun start(scope: CoroutineScope) {
this.scope = scope
}
@Suppress("LongMethod", "CyclomaticComplexMethod")
override fun handleTelemetry(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val payload = packet.decoded?.payload ?: return
val t =
(Telemetry.ADAPTER.decodeOrNull(payload, Logger) ?: return).let {
if (it.time == 0) it.copy(time = (dataPacket.time.milliseconds.inWholeSeconds).toInt()) else it
}
Logger.d { "Telemetry from ${packet.from}: ${Telemetry.ADAPTER.toOneLiner(t)}" }
val fromNum = packet.from
val isRemote = (fromNum != myNodeNum)
if (!isRemote) {
connectionManager.value.updateTelemetry(t)
}
nodeManager.updateNode(fromNum) { node: Node ->
val metrics = t.device_metrics
val environment = t.environment_metrics
val power = t.power_metrics
var nextNode = node
when {
metrics != null -> {
nextNode = nextNode.copy(deviceMetrics = metrics)
if (fromNum == myNodeNum || (isRemote && node.isFavorite)) {
if (
(metrics.voltage ?: 0f) > BATTERY_PERCENT_UNSUPPORTED &&
(metrics.battery_level ?: 0) <= BATTERY_PERCENT_LOW_THRESHOLD
) {
scope.launch {
if (shouldBatteryNotificationShow(fromNum, t, myNodeNum)) {
notificationManager.dispatch(
Notification(
title =
getStringSuspend(
Res.string.low_battery_title,
nextNode.user.short_name,
),
message =
getStringSuspend(
Res.string.low_battery_message,
nextNode.user.long_name,
nextNode.deviceMetrics.battery_level ?: 0,
),
category = Notification.Category.Battery,
),
)
}
}
} else {
scope.launch {
batteryMutex.withLock {
if (batteryPercentCooldowns.containsKey(fromNum)) {
batteryPercentCooldowns.remove(fromNum)
}
}
notificationManager.cancel(nextNode.num)
}
}
}
}
environment != null -> nextNode = nextNode.copy(environmentMetrics = environment)
power != null -> nextNode = nextNode.copy(powerMetrics = power)
}
val telemetryTime = if (t.time != 0) t.time else nextNode.lastHeard
val newLastHeard = maxOf(nextNode.lastHeard, telemetryTime)
nextNode.copy(lastHeard = newLastHeard)
}
}
@Suppress("ReturnCount")
private suspend fun shouldBatteryNotificationShow(fromNum: Int, t: Telemetry, myNodeNum: Int): Boolean {
val isRemote = (fromNum != myNodeNum)
var shouldDisplay = false
var forceDisplay = false
val metrics = t.device_metrics ?: return false
val batteryLevel = metrics.battery_level ?: 0
when {
batteryLevel <= BATTERY_PERCENT_CRITICAL_THRESHOLD -> {
shouldDisplay = true
forceDisplay = true
}
batteryLevel == BATTERY_PERCENT_LOW_THRESHOLD -> shouldDisplay = true
batteryLevel.mod(BATTERY_PERCENT_LOW_DIVISOR) == 0 && !isRemote -> shouldDisplay = true
isRemote -> shouldDisplay = true
}
if (shouldDisplay) {
val now = nowSeconds
batteryMutex.withLock {
if (!batteryPercentCooldowns.containsKey(fromNum)) batteryPercentCooldowns[fromNum] = 0L
if ((now - batteryPercentCooldowns[fromNum]!!) >= BATTERY_PERCENT_COOLDOWN_SECONDS || forceDisplay) {
batteryPercentCooldowns[fromNum] = now
return true
}
}
}
return false
}
companion object {
private const val BATTERY_PERCENT_UNSUPPORTED = 0.0
private const val BATTERY_PERCENT_LOW_THRESHOLD = 20
private const val BATTERY_PERCENT_LOW_DIVISOR = 5
private const val BATTERY_PERCENT_CRITICAL_THRESHOLD = 5
private const val BATTERY_PERCENT_COOLDOWN_SECONDS = 1500
}
}

View file

@ -22,11 +22,9 @@ import kotlinx.atomicfu.update
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.NumberFormatter
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.model.fullRouteDiscovery
import org.meshtastic.core.model.getTracerouteResponse
@ -45,7 +43,7 @@ class TracerouteHandlerImpl(
private val tracerouteSnapshotRepository: TracerouteSnapshotRepository,
private val nodeRepository: NodeRepository,
) : TracerouteHandler {
private var scope: CoroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())
private lateinit var scope: CoroutineScope
private val startTimes = atomic(persistentMapOf<Int, Long>())

View file

@ -0,0 +1,224 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.mock
import dev.mokkery.verify
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.MeshConfigFlowManager
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Channel
import org.meshtastic.proto.Config
import org.meshtastic.proto.Data
import org.meshtastic.proto.DeviceMetadata
import org.meshtastic.proto.HardwareModel
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.ModuleConfig
import org.meshtastic.proto.PortNum
import kotlin.test.BeforeTest
import kotlin.test.Test
class AdminPacketHandlerImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val configHandler = mock<MeshConfigHandler>(MockMode.autofill)
private val configFlowManager = mock<MeshConfigFlowManager>(MockMode.autofill)
private val commandSender = mock<CommandSender>(MockMode.autofill)
private lateinit var handler: AdminPacketHandlerImpl
private val myNodeNum = 12345
@BeforeTest
fun setUp() {
handler =
AdminPacketHandlerImpl(
nodeManager = nodeManager,
configHandler = lazy { configHandler },
configFlowManager = lazy { configFlowManager },
commandSender = commandSender,
)
}
private fun makePacket(from: Int, adminMessage: AdminMessage): MeshPacket {
val payload = AdminMessage.ADAPTER.encode(adminMessage).toByteString()
return MeshPacket(from = from, decoded = Data(portnum = PortNum.ADMIN_APP, payload = payload))
}
// ---------- Session passkey ----------
@Test
fun `session passkey is updated when present`() {
val passkey = ByteString.of(1, 2, 3, 4)
val adminMsg = AdminMessage(session_passkey = passkey)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { commandSender.setSessionPasskey(passkey) }
}
@Test
fun `empty session passkey does not clear existing passkey`() {
val adminMsg = AdminMessage(session_passkey = ByteString.EMPTY)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
// setSessionPasskey should NOT be called for empty passkey
}
// ---------- get_config_response ----------
@Test
fun `get_config_response from own node delegates to configHandler`() {
val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
val adminMsg = AdminMessage(get_config_response = config)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { configHandler.handleDeviceConfig(config) }
}
@Test
fun `get_config_response from remote node is ignored`() {
val config = Config(device = Config.DeviceConfig())
val adminMsg = AdminMessage(get_config_response = config)
val packet = makePacket(99999, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
// configHandler.handleDeviceConfig should NOT be called
}
// ---------- get_module_config_response ----------
@Test
fun `get_module_config_response from own node delegates to configHandler`() {
val moduleConfig = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
val adminMsg = AdminMessage(get_module_config_response = moduleConfig)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { configHandler.handleModuleConfig(moduleConfig) }
}
@Test
fun `get_module_config_response from remote node updates node status`() {
val moduleConfig = ModuleConfig(statusmessage = ModuleConfig.StatusMessageConfig(node_status = "Battery Low"))
val adminMsg = AdminMessage(get_module_config_response = moduleConfig)
val remoteNode = 99999
val packet = makePacket(remoteNode, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { nodeManager.updateNodeStatus(remoteNode, "Battery Low") }
}
@Test
fun `get_module_config_response from remote without status message does not crash`() {
val moduleConfig = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
val adminMsg = AdminMessage(get_module_config_response = moduleConfig)
val packet = makePacket(99999, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
// No crash, no updateNodeStatus call
}
// ---------- get_channel_response ----------
@Test
fun `get_channel_response from own node delegates to configHandler`() {
val channel = Channel(index = 0)
val adminMsg = AdminMessage(get_channel_response = channel)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { configHandler.handleChannel(channel) }
}
@Test
fun `get_channel_response from remote node is ignored`() {
val channel = Channel(index = 0)
val adminMsg = AdminMessage(get_channel_response = channel)
val packet = makePacket(99999, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
// configHandler.handleChannel should NOT be called
}
// ---------- get_device_metadata_response ----------
@Test
fun `device metadata from own node delegates to configFlowManager`() {
val metadata = DeviceMetadata(firmware_version = "2.6.0", hw_model = HardwareModel.HELTEC_V3)
val adminMsg = AdminMessage(get_device_metadata_response = metadata)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { configFlowManager.handleLocalMetadata(metadata) }
}
@Test
fun `device metadata from remote node delegates to nodeManager`() {
val metadata = DeviceMetadata(firmware_version = "2.5.0", hw_model = HardwareModel.TBEAM)
val adminMsg = AdminMessage(get_device_metadata_response = metadata)
val remoteNode = 99999
val packet = makePacket(remoteNode, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { nodeManager.insertMetadata(remoteNode, metadata) }
}
// ---------- Edge cases ----------
@Test
fun `packet with null decoded payload is ignored`() {
val packet = MeshPacket(from = myNodeNum, decoded = null)
handler.handleAdminMessage(packet, myNodeNum)
// No crash
}
@Test
fun `packet with empty payload bytes is ignored`() {
val packet =
MeshPacket(from = myNodeNum, decoded = Data(portnum = PortNum.ADMIN_APP, payload = ByteString.EMPTY))
handler.handleAdminMessage(packet, myNodeNum)
// No crash — decodes as default AdminMessage with no fields set
}
@Test
fun `combined admin message with passkey and config response`() {
val passkey = ByteString.of(5, 6, 7, 8)
val config = Config(lora = Config.LoRaConfig())
val adminMsg = AdminMessage(session_passkey = passkey, get_config_response = config)
val packet = makePacket(myNodeNum, adminMsg)
handler.handleAdminMessage(packet, myNodeNum)
verify { commandSender.setSessionPasskey(passkey) }
verify { configHandler.handleDeviceConfig(config) }
}
}

View file

@ -0,0 +1,583 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.everySuspend
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verify.VerifyMode.Companion.not
import dev.mokkery.verifySuspend
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.common.database.DatabaseManager
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MeshUser
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.Position
import org.meshtastic.core.model.service.ServiceAction
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.MeshDataHandler
import org.meshtastic.core.repository.MeshMessageProcessor
import org.meshtastic.core.repository.MeshPrefs
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.NotificationManager
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.PlatformAnalytics
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Channel
import org.meshtastic.proto.Config
import org.meshtastic.proto.HardwareModel
import org.meshtastic.proto.ModuleConfig
import org.meshtastic.proto.SharedContact
import org.meshtastic.proto.User
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class MeshActionHandlerImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val commandSender = mock<CommandSender>(MockMode.autofill)
private val packetRepository = mock<PacketRepository>(MockMode.autofill)
private val serviceBroadcasts = mock<ServiceBroadcasts>(MockMode.autofill)
private val dataHandler = mock<MeshDataHandler>(MockMode.autofill)
private val analytics = mock<PlatformAnalytics>(MockMode.autofill)
private val meshPrefs = mock<MeshPrefs>(MockMode.autofill)
private val databaseManager = mock<DatabaseManager>(MockMode.autofill)
private val notificationManager = mock<NotificationManager>(MockMode.autofill)
private val messageProcessor = mock<MeshMessageProcessor>(MockMode.autofill)
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val myNodeNumFlow = MutableStateFlow<Int?>(MY_NODE_NUM)
private lateinit var handler: MeshActionHandlerImpl
private val testDispatcher = UnconfinedTestDispatcher()
private val testScope = TestScope(testDispatcher)
companion object {
private const val MY_NODE_NUM = 12345
private const val REMOTE_NODE_NUM = 67890
}
@BeforeTest
fun setUp() {
every { nodeManager.myNodeNum } returns myNodeNumFlow
every { nodeManager.getMyId() } returns "!12345678"
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
handler =
MeshActionHandlerImpl(
nodeManager = nodeManager,
commandSender = commandSender,
packetRepository = lazy { packetRepository },
serviceBroadcasts = serviceBroadcasts,
dataHandler = lazy { dataHandler },
analytics = analytics,
meshPrefs = meshPrefs,
databaseManager = databaseManager,
notificationManager = notificationManager,
messageProcessor = lazy { messageProcessor },
radioConfigRepository = radioConfigRepository,
)
}
// ---- handleUpdateLastAddress (device-switch path — P0 critical) ----
@Test
fun handleUpdateLastAddress_differentAddress_switchesDatabaseAndClearsState() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { meshPrefs.deviceAddress } returns MutableStateFlow("old_addr")
everySuspend { databaseManager.switchActiveDatabase(any()) } returns Unit
handler.handleUpdateLastAddress("new_addr")
advanceUntilIdle()
verify { meshPrefs.setDeviceAddress("new_addr") }
verify { nodeManager.clear() }
verifySuspend { messageProcessor.clearEarlyPackets() }
verifySuspend { databaseManager.switchActiveDatabase("new_addr") }
verify { notificationManager.cancelAll() }
verify { nodeManager.loadCachedNodeDB() }
}
@Test
fun handleUpdateLastAddress_sameAddress_noOp() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { meshPrefs.deviceAddress } returns MutableStateFlow("same_addr")
handler.handleUpdateLastAddress("same_addr")
advanceUntilIdle()
verify(not) { meshPrefs.setDeviceAddress(any()) }
verify(not) { nodeManager.clear() }
}
@Test
fun handleUpdateLastAddress_nullAddress_switchesIfDifferent() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { meshPrefs.deviceAddress } returns MutableStateFlow("old_addr")
everySuspend { databaseManager.switchActiveDatabase(any()) } returns Unit
handler.handleUpdateLastAddress(null)
advanceUntilIdle()
verify { meshPrefs.setDeviceAddress(null) }
verify { nodeManager.clear() }
verifySuspend { databaseManager.switchActiveDatabase(null) }
}
@Test
fun handleUpdateLastAddress_nullToNull_noOp() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { meshPrefs.deviceAddress } returns MutableStateFlow(null)
handler.handleUpdateLastAddress(null)
advanceUntilIdle()
verify(not) { meshPrefs.setDeviceAddress(any()) }
}
@Test
fun handleUpdateLastAddress_executesStepsInOrder() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { meshPrefs.deviceAddress } returns MutableStateFlow("old")
everySuspend { databaseManager.switchActiveDatabase(any()) } returns Unit
handler.handleUpdateLastAddress("new")
advanceUntilIdle()
// Verify critical sequence: clear -> switchDB -> cancelNotifications -> loadCachedNodeDB
verify { nodeManager.clear() }
verifySuspend { databaseManager.switchActiveDatabase("new") }
verify { notificationManager.cancelAll() }
verify { nodeManager.loadCachedNodeDB() }
}
// ---- onServiceAction: null myNodeNum early-return ----
@Test
fun onServiceAction_nullMyNodeNum_doesNothing() = runTest(testDispatcher) {
handler.start(backgroundScope)
myNodeNumFlow.value = null
val node = createTestNode(REMOTE_NODE_NUM)
handler.onServiceAction(ServiceAction.Favorite(node))
advanceUntilIdle()
verify(not) { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- onServiceAction: Favorite ----
@Test
fun onServiceAction_favorite_sendsSetFavoriteWhenNotFavorite() = runTest(testDispatcher) {
handler.start(backgroundScope)
val node = createTestNode(REMOTE_NODE_NUM, isFavorite = false)
handler.onServiceAction(ServiceAction.Favorite(node))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.updateNode(any(), any(), any(), any()) }
}
@Test
fun onServiceAction_favorite_sendsRemoveFavoriteWhenAlreadyFavorite() = runTest(testDispatcher) {
handler.start(backgroundScope)
val node = createTestNode(REMOTE_NODE_NUM, isFavorite = true)
handler.onServiceAction(ServiceAction.Favorite(node))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.updateNode(any(), any(), any(), any()) }
}
// ---- onServiceAction: Ignore ----
@Test
fun onServiceAction_ignore_togglesAndUpdatesFilteredBySender() = runTest(testDispatcher) {
handler.start(backgroundScope)
val node = createTestNode(REMOTE_NODE_NUM, isIgnored = false)
handler.onServiceAction(ServiceAction.Ignore(node))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.updateNode(any(), any(), any(), any()) }
verifySuspend { packetRepository.updateFilteredBySender(any(), any()) }
}
// ---- onServiceAction: Mute ----
@Test
fun onServiceAction_mute_togglesMutedState() = runTest(testDispatcher) {
handler.start(backgroundScope)
val node = createTestNode(REMOTE_NODE_NUM, isMuted = false)
handler.onServiceAction(ServiceAction.Mute(node))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.updateNode(any(), any(), any(), any()) }
}
// ---- onServiceAction: GetDeviceMetadata ----
@Test
fun onServiceAction_getDeviceMetadata_sendsAdminRequest() = runTest(testDispatcher) {
handler.start(backgroundScope)
handler.onServiceAction(ServiceAction.GetDeviceMetadata(REMOTE_NODE_NUM))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- onServiceAction: SendContact ----
@Test
fun onServiceAction_sendContact_completesWithTrueOnSuccess() = runTest(testDispatcher) {
handler.start(backgroundScope)
everySuspend { commandSender.sendAdminAwait(any(), any(), any(), any()) } returns true
val action = ServiceAction.SendContact(SharedContact())
handler.onServiceAction(action)
advanceUntilIdle()
assertTrue(action.result.isCompleted)
assertTrue(action.result.await())
}
@Test
fun onServiceAction_sendContact_completesWithFalseOnFailure() = runTest(testDispatcher) {
handler.start(backgroundScope)
everySuspend { commandSender.sendAdminAwait(any(), any(), any(), any()) } returns false
val action = ServiceAction.SendContact(SharedContact())
handler.onServiceAction(action)
advanceUntilIdle()
assertTrue(action.result.isCompleted)
assertFalse(action.result.await())
}
// ---- onServiceAction: ImportContact ----
@Test
fun onServiceAction_importContact_sendsAdminAndUpdatesNode() = runTest(testDispatcher) {
handler.start(backgroundScope)
val contact =
SharedContact(node_num = REMOTE_NODE_NUM, user = User(id = "!abcdef12", long_name = "TestUser"))
handler.onServiceAction(ServiceAction.ImportContact(contact))
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.handleReceivedUser(any(), any(), any(), any()) }
}
// ---- handleSetOwner ----
@Test
fun handleSetOwner_sendsAdminAndUpdatesLocalNode() {
handler.start(testScope)
val meshUser =
MeshUser(
id = "!12345678",
longName = "Test Long",
shortName = "TL",
hwModel = HardwareModel.UNSET,
isLicensed = false,
)
handler.handleSetOwner(meshUser, MY_NODE_NUM)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.handleReceivedUser(any(), any(), any(), any()) }
}
// ---- handleSend ----
@Test
fun handleSend_sendsDataAndBroadcastsStatus() {
handler.start(testScope)
val packet = DataPacket(to = "!deadbeef", dataType = 1, bytes = null, channel = 0)
handler.handleSend(packet, MY_NODE_NUM)
verify { commandSender.sendData(any()) }
verify { serviceBroadcasts.broadcastMessageStatus(any(), any()) }
verify { dataHandler.rememberDataPacket(any(), any(), any()) }
}
// ---- handleRequestPosition: 3 branches ----
@Test
fun handleRequestPosition_sameNode_doesNothing() {
handler.start(testScope)
handler.handleRequestPosition(MY_NODE_NUM, Position(0.0, 0.0, 0), MY_NODE_NUM)
verify(not) { commandSender.requestPosition(any(), any()) }
}
@Test
fun handleRequestPosition_provideLocation_validPosition_usesGivenPosition() {
handler.start(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
val validPosition = Position(37.7749, -122.4194, 10)
handler.handleRequestPosition(REMOTE_NODE_NUM, validPosition, MY_NODE_NUM)
verify { commandSender.requestPosition(REMOTE_NODE_NUM, validPosition) }
}
@Test
fun handleRequestPosition_provideLocation_invalidPosition_fallsBackToNodeDB() {
handler.start(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(true)
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
val invalidPosition = Position(0.0, 0.0, 0)
handler.handleRequestPosition(REMOTE_NODE_NUM, invalidPosition, MY_NODE_NUM)
// Falls back to Position(0.0, 0.0, 0) when node has no position in DB
verify { commandSender.requestPosition(any(), any()) }
}
@Test
fun handleRequestPosition_doNotProvide_sendsZeroPosition() {
handler.start(testScope)
every { meshPrefs.shouldProvideNodeLocation(MY_NODE_NUM) } returns MutableStateFlow(false)
val validPosition = Position(37.7749, -122.4194, 10)
handler.handleRequestPosition(REMOTE_NODE_NUM, validPosition, MY_NODE_NUM)
// Should send zero position regardless of valid input
verify { commandSender.requestPosition(any(), any()) }
}
// ---- handleSetConfig: optimistic persist ----
@Test
fun handleSetConfig_decodesAndSendsAdmin_thenPersistsLocally() = runTest(testDispatcher) {
handler.start(backgroundScope)
everySuspend { radioConfigRepository.setLocalConfig(any()) } returns Unit
val config = Config(lora = Config.LoRaConfig(hop_limit = 5))
val payload = Config.ADAPTER.encode(config)
handler.handleSetConfig(payload, MY_NODE_NUM)
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verifySuspend { radioConfigRepository.setLocalConfig(any()) }
}
// ---- handleSetModuleConfig: conditional persist ----
@Test
fun handleSetModuleConfig_ownNode_persistsLocally() = runTest(testDispatcher) {
handler.start(backgroundScope)
myNodeNumFlow.value = MY_NODE_NUM
everySuspend { radioConfigRepository.setLocalModuleConfig(any()) } returns Unit
val moduleConfig = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
val payload = ModuleConfig.ADAPTER.encode(moduleConfig)
handler.handleSetModuleConfig(0, MY_NODE_NUM, payload)
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verifySuspend { radioConfigRepository.setLocalModuleConfig(any()) }
}
@Test
fun handleSetModuleConfig_remoteNode_doesNotPersistLocally() = runTest(testDispatcher) {
handler.start(backgroundScope)
myNodeNumFlow.value = MY_NODE_NUM
val moduleConfig = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
val payload = ModuleConfig.ADAPTER.encode(moduleConfig)
handler.handleSetModuleConfig(0, REMOTE_NODE_NUM, payload)
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verifySuspend(not) { radioConfigRepository.setLocalModuleConfig(any()) }
}
// ---- handleSetChannel: null payload guard ----
@Test
fun handleSetChannel_nonNullPayload_decodesAndPersists() = runTest(testDispatcher) {
handler.start(backgroundScope)
everySuspend { radioConfigRepository.updateChannelSettings(any()) } returns Unit
val channel = Channel(index = 1)
val payload = Channel.ADAPTER.encode(channel)
handler.handleSetChannel(payload, MY_NODE_NUM)
advanceUntilIdle()
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verifySuspend { radioConfigRepository.updateChannelSettings(any()) }
}
@Test
fun handleSetChannel_nullPayload_doesNothing() {
handler.start(testScope)
handler.handleSetChannel(null, MY_NODE_NUM)
verify(not) { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- handleRemoveByNodenum ----
@Test
fun handleRemoveByNodenum_removesAndSendsAdmin() {
handler.start(testScope)
handler.handleRemoveByNodenum(REMOTE_NODE_NUM, 99, MY_NODE_NUM)
verify { nodeManager.removeByNodenum(REMOTE_NODE_NUM) }
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- handleSetRemoteOwner ----
@Test
fun handleSetRemoteOwner_decodesAndSendsAdmin() {
handler.start(testScope)
val user = User(id = "!remote01", long_name = "Remote", short_name = "RM")
val payload = User.ADAPTER.encode(user)
handler.handleSetRemoteOwner(1, REMOTE_NODE_NUM, payload)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
verify { nodeManager.handleReceivedUser(any(), any(), any(), any()) }
}
// ---- handleGetRemoteConfig: sessionkey vs regular ----
@Test
fun handleGetRemoteConfig_sessionkeyConfig_sendsDeviceMetadataRequest() {
handler.start(testScope)
handler.handleGetRemoteConfig(1, REMOTE_NODE_NUM, AdminMessage.ConfigType.SESSIONKEY_CONFIG.value)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
@Test
fun handleGetRemoteConfig_regularConfig_sendsConfigRequest() {
handler.start(testScope)
handler.handleGetRemoteConfig(1, REMOTE_NODE_NUM, AdminMessage.ConfigType.LORA_CONFIG.value)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- handleSetRemoteChannel: null payload guard ----
@Test
fun handleSetRemoteChannel_nullPayload_doesNothing() {
handler.start(testScope)
handler.handleSetRemoteChannel(1, REMOTE_NODE_NUM, null)
verify(not) { commandSender.sendAdmin(any(), any(), any(), any()) }
}
@Test
fun handleSetRemoteChannel_nonNullPayload_decodesAndSendsAdmin() {
handler.start(testScope)
val channel = Channel(index = 2)
val payload = Channel.ADAPTER.encode(channel)
handler.handleSetRemoteChannel(1, REMOTE_NODE_NUM, payload)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- handleRequestRebootOta: null hash ----
@Test
fun handleRequestRebootOta_withNullHash_sendsAdmin() {
handler.start(testScope)
handler.handleRequestRebootOta(1, REMOTE_NODE_NUM, 0, null)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
@Test
fun handleRequestRebootOta_withHash_sendsAdmin() {
handler.start(testScope)
val hash = byteArrayOf(0x01, 0x02, 0x03)
handler.handleRequestRebootOta(1, REMOTE_NODE_NUM, 1, hash)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- handleRequestNodedbReset ----
@Test
fun handleRequestNodedbReset_sendsAdminWithPreserveFavorites() {
handler.start(testScope)
handler.handleRequestNodedbReset(1, REMOTE_NODE_NUM, preserveFavorites = true)
verify { commandSender.sendAdmin(any(), any(), any(), any()) }
}
// ---- Helper ----
private fun createTestNode(
num: Int,
isFavorite: Boolean = false,
isIgnored: Boolean = false,
isMuted: Boolean = false,
): Node = Node(
num = num,
user = User(id = "!${num.toString(16).padStart(8, '0')}", long_name = "Node $num", short_name = "N$num"),
isFavorite = isFavorite,
isIgnored = isIgnored,
isMuted = isMuted,
)
}

View file

@ -0,0 +1,377 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verifySuspend
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import okio.ByteString.Companion.encodeUtf8
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.HandshakeConstants
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.PacketHandler
import org.meshtastic.core.repository.PlatformAnalytics
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.proto.DeviceMetadata
import org.meshtastic.proto.FileInfo
import org.meshtastic.proto.HardwareModel
import org.meshtastic.proto.NodeInfo
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import org.meshtastic.proto.MyNodeInfo as ProtoMyNodeInfo
@OptIn(ExperimentalCoroutinesApi::class)
class MeshConfigFlowManagerImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val connectionManager = mock<MeshConnectionManager>(MockMode.autofill)
private val nodeRepository = mock<NodeRepository>(MockMode.autofill)
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val serviceRepository = mock<ServiceRepository>(MockMode.autofill)
private val serviceBroadcasts = mock<ServiceBroadcasts>(MockMode.autofill)
private val analytics = mock<PlatformAnalytics>(MockMode.autofill)
private val commandSender = mock<CommandSender>(MockMode.autofill)
private val packetHandler = mock<PacketHandler>(MockMode.autofill)
private val testDispatcher = StandardTestDispatcher()
private val testScope = TestScope(testDispatcher)
private lateinit var manager: MeshConfigFlowManagerImpl
private val myNodeNum = 12345
private val protoMyNodeInfo =
ProtoMyNodeInfo(
my_node_num = myNodeNum,
min_app_version = 30000,
device_id = "test-device".encodeUtf8(),
pio_env = "",
)
private val metadata =
DeviceMetadata(firmware_version = "2.6.0", hw_model = HardwareModel.HELTEC_V3, hasWifi = false)
@BeforeTest
fun setUp() {
every { commandSender.getCurrentPacketId() } returns 100
every { packetHandler.sendToRadio(any<org.meshtastic.proto.ToRadio>()) } returns Unit
every { nodeManager.nodeDBbyNodeNum } returns emptyMap()
every { nodeManager.myNodeNum } returns MutableStateFlow(null)
manager =
MeshConfigFlowManagerImpl(
nodeManager = nodeManager,
connectionManager = lazy { connectionManager },
nodeRepository = nodeRepository,
radioConfigRepository = radioConfigRepository,
serviceRepository = serviceRepository,
serviceBroadcasts = serviceBroadcasts,
analytics = analytics,
commandSender = commandSender,
packetHandler = packetHandler,
)
manager.start(testScope)
}
// ---------- handleMyInfo ----------
@Test
fun `handleMyInfo transitions to ReceivingConfig and sets myNodeNum`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
verify { nodeManager.setMyNodeNum(myNodeNum) }
}
@Test
fun `handleMyInfo clears persisted radio config`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
verifySuspend { radioConfigRepository.clearChannelSet() }
verifySuspend { radioConfigRepository.clearLocalConfig() }
verifySuspend { radioConfigRepository.clearLocalModuleConfig() }
verifySuspend { radioConfigRepository.clearDeviceUIConfig() }
verifySuspend { radioConfigRepository.clearFileManifest() }
}
// ---------- handleLocalMetadata ----------
@Test
fun `handleLocalMetadata persists metadata when in ReceivingConfig state`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
verifySuspend { nodeRepository.insertMetadata(myNodeNum, metadata) }
}
@Test
fun `handleLocalMetadata skips empty metadata`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
// Default/empty DeviceMetadata should not trigger insertMetadata
manager.handleLocalMetadata(DeviceMetadata())
advanceUntilIdle()
// insertMetadata should only have been called zero times for default metadata
// (we just verify no crash occurs)
}
@Test
fun `handleLocalMetadata ignored outside ReceivingConfig state`() = testScope.runTest {
// State is Idle — handleLocalMetadata should be a no-op
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
// No crash, no insertMetadata call
}
// ---------- handleConfigComplete Stage 1 ----------
@Test
fun `Stage 1 complete builds MyNodeInfo and transitions to ReceivingNodeInfo`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
verify { connectionManager.onRadioConfigLoaded() }
verify { connectionManager.startNodeInfoOnly() }
}
@Test
fun `Stage 1 complete without metadata still succeeds with null firmware version`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
// No metadata provided
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
verify { connectionManager.onRadioConfigLoaded() }
}
@Test
fun `Stage 1 complete id ignored when not in ReceivingConfig state`() = testScope.runTest {
// State is Idle — should be a no-op
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
// No crash, no onRadioConfigLoaded
}
@Test
fun `Duplicate Stage 1 config_complete does not re-trigger`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
// Now in ReceivingNodeInfo — a second Stage 1 complete should be ignored
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
}
// ---------- handleNodeInfo ----------
@Test
fun `handleNodeInfo accumulates nodes during Stage 2`() = testScope.runTest {
// Transition to Stage 2
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
// Now in ReceivingNodeInfo
manager.handleNodeInfo(NodeInfo(num = 100))
manager.handleNodeInfo(NodeInfo(num = 200))
assertEquals(2, manager.newNodeCount)
}
@Test
fun `handleNodeInfo ignored outside Stage 2`() = testScope.runTest {
// State is Idle
manager.handleNodeInfo(NodeInfo(num = 999))
assertEquals(0, manager.newNodeCount)
}
// ---------- handleConfigComplete Stage 2 ----------
@Test
fun `Stage 2 complete processes nodes and sets Connected state`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
// Full handshake: MyInfo -> metadata -> Stage 1 complete -> nodes -> Stage 2 complete
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
manager.handleNodeInfo(NodeInfo(num = 100))
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { nodeManager.installNodeInfo(any(), withBroadcast = false) }
verify { nodeManager.setNodeDbReady(true) }
verify { nodeManager.setAllowNodeDbWrites(true) }
verify { serviceBroadcasts.broadcastConnection() }
verify { connectionManager.onNodeDbReady() }
}
@Test
fun `Stage 2 complete id ignored when not in ReceivingNodeInfo state`() = testScope.runTest {
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
// No crash
}
@Test
fun `Stage 2 complete with no nodes still transitions to Connected`() = testScope.runTest {
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
// No handleNodeInfo calls — empty node list
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { nodeManager.setNodeDbReady(true) }
verify { connectionManager.onNodeDbReady() }
}
// ---------- Unknown config_complete_id ----------
@Test
fun `Unknown config_complete_id is ignored`() = testScope.runTest {
manager.handleConfigComplete(99999)
advanceUntilIdle()
// No crash
}
// ---------- newNodeCount ----------
@Test
fun `newNodeCount returns 0 when not in ReceivingNodeInfo state`() {
assertEquals(0, manager.newNodeCount)
}
// ---------- handleFileInfo ----------
@Test
fun `handleFileInfo delegates to radioConfigRepository`() = testScope.runTest {
val fileInfo = FileInfo(file_name = "firmware.bin", size_bytes = 1024)
manager.handleFileInfo(fileInfo)
advanceUntilIdle()
verifySuspend { radioConfigRepository.addFileInfo(fileInfo) }
}
// ---------- triggerWantConfig ----------
@Test
fun `triggerWantConfig delegates to connectionManager startConfigOnly`() {
manager.triggerWantConfig()
verify { connectionManager.startConfigOnly() }
}
// ---------- Full handshake flow ----------
@Test
fun `Full handshake from Idle to Complete`() = testScope.runTest {
val testNode = org.meshtastic.core.testing.TestDataFactory.createTestNode(num = 100)
every { nodeManager.nodeDBbyNodeNum } returns mapOf(100 to testNode)
// Stage 0: Idle -> handleMyInfo
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
verify { nodeManager.setMyNodeNum(myNodeNum) }
// Receive metadata during Stage 1
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
// Stage 1 complete
manager.handleConfigComplete(HandshakeConstants.CONFIG_NONCE)
advanceUntilIdle()
verify { connectionManager.onRadioConfigLoaded() }
// Receive NodeInfo during Stage 2
manager.handleNodeInfo(NodeInfo(num = 100))
assertEquals(1, manager.newNodeCount)
// Stage 2 complete
manager.handleConfigComplete(HandshakeConstants.NODE_INFO_NONCE)
advanceUntilIdle()
verify { nodeManager.setNodeDbReady(true) }
verify { connectionManager.onNodeDbReady() }
// After complete, newNodeCount should be 0 (state is Complete)
assertEquals(0, manager.newNodeCount)
}
// ---------- Interrupted handshake ----------
@Test
fun `handleMyInfo resets stale handshake state`() = testScope.runTest {
// Start first handshake
manager.handleMyInfo(protoMyNodeInfo)
advanceUntilIdle()
manager.handleLocalMetadata(metadata)
advanceUntilIdle()
// Before Stage 1 completes, a new handleMyInfo arrives (device rebooted)
val newMyInfo = protoMyNodeInfo.copy(my_node_num = 99999)
manager.handleMyInfo(newMyInfo)
advanceUntilIdle()
verify { nodeManager.setMyNodeNum(99999) }
}
}

View file

@ -0,0 +1,230 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verifySuspend
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.model.MyNodeInfo
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.proto.Channel
import org.meshtastic.proto.Config
import org.meshtastic.proto.DeviceUIConfig
import org.meshtastic.proto.LocalConfig
import org.meshtastic.proto.LocalModuleConfig
import org.meshtastic.proto.ModuleConfig
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class)
class MeshConfigHandlerImplTest {
private val radioConfigRepository = mock<RadioConfigRepository>(MockMode.autofill)
private val serviceRepository = mock<ServiceRepository>(MockMode.autofill)
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val localConfigFlow = MutableStateFlow(LocalConfig())
private val moduleConfigFlow = MutableStateFlow(LocalModuleConfig())
private val testDispatcher = UnconfinedTestDispatcher()
private lateinit var handler: MeshConfigHandlerImpl
@BeforeTest
fun setUp() {
every { radioConfigRepository.localConfigFlow } returns localConfigFlow
every { radioConfigRepository.moduleConfigFlow } returns moduleConfigFlow
handler =
MeshConfigHandlerImpl(
radioConfigRepository = radioConfigRepository,
serviceRepository = serviceRepository,
nodeManager = nodeManager,
)
}
// ---------- start and flow wiring ----------
@Test
fun `start wires localConfig flow from repository`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val config = LocalConfig(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.ROUTER))
localConfigFlow.value = config
advanceUntilIdle()
assertEquals(config, handler.localConfig.value)
}
@Test
fun `start wires moduleConfig flow from repository`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val config = LocalModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
moduleConfigFlow.value = config
advanceUntilIdle()
assertEquals(config, handler.moduleConfig.value)
}
// ---------- handleDeviceConfig ----------
@Test
fun `handleDeviceConfig persists config and updates progress`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
handler.handleDeviceConfig(config)
advanceUntilIdle()
verifySuspend { radioConfigRepository.setLocalConfig(config) }
verify { serviceRepository.setConnectionProgress("Device config received") }
}
@Test
fun `handleDeviceConfig handles all config variants`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val configs =
listOf(
Config(position = Config.PositionConfig()),
Config(power = Config.PowerConfig()),
Config(network = Config.NetworkConfig()),
Config(display = Config.DisplayConfig()),
Config(lora = Config.LoRaConfig()),
Config(bluetooth = Config.BluetoothConfig()),
Config(security = Config.SecurityConfig()),
)
for (config in configs) {
handler.handleDeviceConfig(config)
advanceUntilIdle()
}
// All should have been persisted (7 configs)
verifySuspend { radioConfigRepository.setLocalConfig(any()) }
}
// ---------- handleModuleConfig ----------
@Test
fun `handleModuleConfig persists config and updates progress`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val config = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
handler.handleModuleConfig(config)
advanceUntilIdle()
verifySuspend { radioConfigRepository.setLocalModuleConfig(config) }
verify { serviceRepository.setConnectionProgress("Module config received") }
}
@Test
fun `handleModuleConfig with statusmessage updates node status`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val myNum = 123
every { nodeManager.myNodeNum } returns MutableStateFlow<Int?>(myNum)
val config = ModuleConfig(statusmessage = ModuleConfig.StatusMessageConfig(node_status = "Active"))
handler.handleModuleConfig(config)
advanceUntilIdle()
verify { nodeManager.updateNodeStatus(myNum, "Active") }
}
@Test
fun `handleModuleConfig with statusmessage skipped when myNodeNum is null`() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { nodeManager.myNodeNum } returns MutableStateFlow<Int?>(null)
val config = ModuleConfig(statusmessage = ModuleConfig.StatusMessageConfig(node_status = "Active"))
handler.handleModuleConfig(config)
advanceUntilIdle()
// No crash — updateNodeStatus should not be called
}
// ---------- handleChannel ----------
@Test
fun `handleChannel persists channel settings`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val channel = Channel(index = 0)
handler.handleChannel(channel)
advanceUntilIdle()
verifySuspend { radioConfigRepository.updateChannelSettings(channel) }
}
@Test
fun `handleChannel shows progress with max channels when myNodeInfo available`() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { nodeManager.getMyNodeInfo() } returns
MyNodeInfo(
myNodeNum = 123,
hasGPS = false,
model = null,
firmwareVersion = null,
couldUpdate = false,
shouldUpdate = false,
currentPacketId = 0L,
messageTimeoutMsec = 0,
minAppVersion = 0,
maxChannels = 8,
hasWifi = false,
channelUtilization = 0f,
airUtilTx = 0f,
deviceId = null,
)
val channel = Channel(index = 2)
handler.handleChannel(channel)
advanceUntilIdle()
verify { serviceRepository.setConnectionProgress("Channels (3 / 8)") }
}
@Test
fun `handleChannel shows progress without max channels when myNodeInfo unavailable`() = runTest(testDispatcher) {
handler.start(backgroundScope)
every { nodeManager.getMyNodeInfo() } returns null
val channel = Channel(index = 0)
handler.handleChannel(channel)
advanceUntilIdle()
verify { serviceRepository.setConnectionProgress("Channels (1)") }
}
// ---------- handleDeviceUIConfig ----------
@Test
fun `handleDeviceUIConfig persists config`() = runTest(testDispatcher) {
handler.start(backgroundScope)
val config = DeviceUIConfig()
handler.handleDeviceUIConfig(config)
advanceUntilIdle()
verifySuspend { radioConfigRepository.setDeviceUIConfig(config) }
}
}

View file

@ -255,7 +255,7 @@ class MeshConnectionManagerImplTest {
)
moduleConfigFlow.value = moduleConfig
every { commandSender.requestTelemetry(any(), any(), any()) } returns Unit
every { nodeManager.myNodeNum } returns 123
every { nodeManager.myNodeNum } returns MutableStateFlow(123)
every { mqttManager.start(any(), any(), any()) } returns Unit
every { historyManager.requestHistoryReplay(any(), any(), any(), any()) } returns Unit
every { nodeManager.getMyNodeInfo() } returns null

View file

@ -35,10 +35,7 @@ import org.meshtastic.core.model.ContactSettings
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.util.MeshDataMapper
import org.meshtastic.core.repository.CommandSender
import org.meshtastic.core.repository.MeshConfigFlowManager
import org.meshtastic.core.repository.MeshConfigHandler
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.AdminPacketHandler
import org.meshtastic.core.repository.MeshServiceNotifications
import org.meshtastic.core.repository.MessageFilter
import org.meshtastic.core.repository.NeighborInfoHandler
@ -51,6 +48,7 @@ import org.meshtastic.core.repository.RadioConfigRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.StoreForwardPacketHandler
import org.meshtastic.core.repository.TelemetryPacketHandler
import org.meshtastic.core.repository.TracerouteHandler
import org.meshtastic.proto.ChannelSet
import org.meshtastic.proto.Data
@ -79,15 +77,13 @@ class MeshDataHandlerTest {
private val serviceNotifications: MeshServiceNotifications = mock(MockMode.autofill)
private val analytics: PlatformAnalytics = mock(MockMode.autofill)
private val dataMapper: MeshDataMapper = mock(MockMode.autofill)
private val configHandler: MeshConfigHandler = mock(MockMode.autofill)
private val configFlowManager: MeshConfigFlowManager = mock(MockMode.autofill)
private val commandSender: CommandSender = mock(MockMode.autofill)
private val connectionManager: MeshConnectionManager = mock(MockMode.autofill)
private val tracerouteHandler: TracerouteHandler = mock(MockMode.autofill)
private val neighborInfoHandler: NeighborInfoHandler = mock(MockMode.autofill)
private val radioConfigRepository: RadioConfigRepository = mock(MockMode.autofill)
private val messageFilter: MessageFilter = mock(MockMode.autofill)
private val storeForwardHandler: StoreForwardPacketHandler = mock(MockMode.autofill)
private val telemetryHandler: TelemetryPacketHandler = mock(MockMode.autofill)
private val adminPacketHandler: AdminPacketHandler = mock(MockMode.autofill)
private val testDispatcher = StandardTestDispatcher()
private val testScope = TestScope(testDispatcher)
@ -105,15 +101,13 @@ class MeshDataHandlerTest {
serviceNotifications = serviceNotifications,
analytics = analytics,
dataMapper = dataMapper,
configHandler = lazy { configHandler },
configFlowManager = lazy { configFlowManager },
commandSender = commandSender,
connectionManager = lazy { connectionManager },
tracerouteHandler = tracerouteHandler,
neighborInfoHandler = neighborInfoHandler,
radioConfigRepository = radioConfigRepository,
messageFilter = messageFilter,
storeForwardHandler = storeForwardHandler,
telemetryHandler = telemetryHandler,
adminPacketHandler = adminPacketHandler,
)
handler.start(testScope)
@ -428,7 +422,7 @@ class MeshDataHandlerTest {
// --- Telemetry handling ---
@Test
fun `telemetry packet updates node via nodeManager`() {
fun `telemetry packet delegates to telemetryHandler`() {
val telemetry =
Telemetry(
time = 2000,
@ -451,11 +445,11 @@ class MeshDataHandlerTest {
handler.handleReceivedData(packet, 123)
verify { nodeManager.updateNode(456, any(), any(), any()) }
verify { telemetryHandler.handleTelemetry(packet, any(), 123) }
}
@Test
fun `telemetry from local node also updates connectionManager`() {
fun `telemetry from local node delegates to telemetryHandler`() {
val myNodeNum = 123
val telemetry =
Telemetry(
@ -479,7 +473,7 @@ class MeshDataHandlerTest {
handler.handleReceivedData(packet, myNodeNum)
verify { connectionManager.updateTelemetry(any()) }
verify { telemetryHandler.handleTelemetry(packet, any(), myNodeNum) }
}
// --- Text message handling ---
@ -490,10 +484,8 @@ class MeshDataHandlerTest {
MeshPacket(
id = 42,
from = 456,
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
)
val dataPacket =
DataPacket(
@ -510,7 +502,8 @@ class MeshDataHandlerTest {
// Provide sender node so getSenderName() doesn't fall back to getString (requires Skiko)
every { nodeManager.nodeDBbyID } returns
mapOf(
"!remote" to Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
"!remote" to
Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
)
handler.handleReceivedData(packet, 123)
@ -525,10 +518,8 @@ class MeshDataHandlerTest {
MeshPacket(
id = 42,
from = 456,
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
)
val dataPacket =
DataPacket(
@ -583,7 +574,7 @@ class MeshDataHandlerTest {
123 to Node(num = 123, user = User(id = "!local")),
)
everySuspend { packetRepository.findReactionsWithId(99) } returns emptyList()
every { nodeManager.myNodeNum } returns 123
every { nodeManager.myNodeNum } returns MutableStateFlow(123)
everySuspend { packetRepository.getPacketByPacketId(42) } returns null
handler.handleReceivedData(packet, 123)
@ -600,7 +591,8 @@ class MeshDataHandlerTest {
MeshPacket(
id = 55,
from = 456,
decoded = Data(portnum = PortNum.RANGE_TEST_APP, payload = "test".encodeToByteArray().toByteString()),
decoded =
Data(portnum = PortNum.RANGE_TEST_APP, payload = "test".encodeToByteArray().toByteString()),
)
val dataPacket =
DataPacket(
@ -616,7 +608,8 @@ class MeshDataHandlerTest {
every { messageFilter.shouldFilter(any(), any()) } returns false
every { nodeManager.nodeDBbyID } returns
mapOf(
"!remote" to Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
"!remote" to
Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
)
handler.handleReceivedData(packet, 123)
@ -629,7 +622,7 @@ class MeshDataHandlerTest {
// --- Admin message handling ---
@Test
fun `admin message sets session passkey`() {
fun `admin message delegates to adminPacketHandler`() {
val admin = org.meshtastic.proto.AdminMessage(session_passkey = okio.ByteString.of(1, 2, 3))
val packet =
MeshPacket(from = 123, decoded = Data(portnum = PortNum.ADMIN_APP, payload = admin.encode().toByteString()))
@ -644,7 +637,7 @@ class MeshDataHandlerTest {
handler.handleReceivedData(packet, 123)
verify { commandSender.setSessionPasskey(any()) }
verify { adminPacketHandler.handleAdminMessage(packet, 123) }
}
// --- Message filtering ---
@ -688,10 +681,8 @@ class MeshDataHandlerTest {
MeshPacket(
id = 88,
from = 456,
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
)
val dataPacket =
DataPacket(

View file

@ -0,0 +1,355 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verifySuspend
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import okio.ByteString
import org.meshtastic.core.repository.FromRadioPacketHandler
import org.meshtastic.core.repository.MeshDataHandler
import org.meshtastic.core.repository.MeshLogRepository
import org.meshtastic.core.repository.MeshRouter
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.proto.Data
import org.meshtastic.proto.FromRadio
import org.meshtastic.proto.LogRecord
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import kotlin.test.BeforeTest
import kotlin.test.Test
@OptIn(ExperimentalCoroutinesApi::class)
class MeshMessageProcessorImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val serviceRepository = mock<ServiceRepository>(MockMode.autofill)
private val meshLogRepository = mock<MeshLogRepository>(MockMode.autofill)
private val router = mock<MeshRouter>(MockMode.autofill)
private val fromRadioDispatcher = mock<FromRadioPacketHandler>(MockMode.autofill)
private val dataHandler = mock<MeshDataHandler>(MockMode.autofill)
private val testDispatcher = UnconfinedTestDispatcher()
private lateinit var processor: MeshMessageProcessorImpl
private val myNodeNum = 12345
private val isNodeDbReady = MutableStateFlow(false)
@BeforeTest
fun setUp() {
every { nodeManager.isNodeDbReady } returns isNodeDbReady
every { nodeManager.myNodeNum } returns MutableStateFlow<Int?>(myNodeNum)
every { router.dataHandler } returns dataHandler
processor =
MeshMessageProcessorImpl(
nodeManager = nodeManager,
serviceRepository = serviceRepository,
meshLogRepository = lazy { meshLogRepository },
router = lazy { router },
fromRadioDispatcher = fromRadioDispatcher,
)
}
// ---------- handleFromRadio: non-packet variants ----------
@Test
fun `handleFromRadio dispatches non-packet variants to fromRadioDispatcher`() = runTest(testDispatcher) {
processor.start(backgroundScope)
val logRecord = LogRecord(message = "test log")
val fromRadio = FromRadio(log_record = logRecord)
val bytes = FromRadio.ADAPTER.encode(fromRadio)
processor.handleFromRadio(bytes, myNodeNum)
advanceUntilIdle()
verify { fromRadioDispatcher.handleFromRadio(any()) }
}
@Test
fun `handleFromRadio falls back to LogRecord parsing when FromRadio fails`() = runTest(testDispatcher) {
processor.start(backgroundScope)
// Encode a raw LogRecord (not wrapped in FromRadio) — first decode as FromRadio fails,
// fallback decode as LogRecord succeeds
val logRecord = LogRecord(message = "fallback log")
val bytes = LogRecord.ADAPTER.encode(logRecord)
processor.handleFromRadio(bytes, myNodeNum)
advanceUntilIdle()
// Should have been dispatched as a FromRadio with log_record set
verify { fromRadioDispatcher.handleFromRadio(any()) }
}
@Test
fun `handleFromRadio with completely invalid bytes does not crash`() = runTest(testDispatcher) {
processor.start(backgroundScope)
// Invalid protobuf bytes — both parses should fail
val garbage = byteArrayOf(0xFF.toByte(), 0xFE.toByte(), 0xFD.toByte())
processor.handleFromRadio(garbage, myNodeNum)
advanceUntilIdle()
// No crash
}
// ---------- handleReceivedMeshPacket: early buffering ----------
@Test
fun `packets are buffered when node DB is not ready`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = false
val packet =
MeshPacket(
id = 1,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1000,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
// Packet should be buffered, not processed
// (no emitMeshPacket call since DB is not ready)
}
@Test
fun `buffered packets are flushed when node DB becomes ready`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = false
val packet =
MeshPacket(
id = 1,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1000,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
// Now make DB ready
isNodeDbReady.value = true
advanceUntilIdle()
// Buffered packet should have been flushed and processed
verifySuspend { serviceRepository.emitMeshPacket(any()) }
}
@Test
fun `early buffer overflow drops oldest packet`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = false
// The maxEarlyPacketBuffer is 10240 — we won't actually fill it in this test,
// but we test the boundary behavior conceptually. Instead, test that multiple
// packets are accumulated properly.
repeat(5) { i ->
val packet =
MeshPacket(
id = i,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1000 + i,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
}
advanceUntilIdle()
// Flush
isNodeDbReady.value = true
advanceUntilIdle()
// All 5 packets should have been processed
verifySuspend { serviceRepository.emitMeshPacket(any()) }
}
// ---------- handleReceivedMeshPacket: rx_time normalization ----------
@Test
fun `packets with rx_time 0 get current time`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val packet =
MeshPacket(
id = 1,
from = myNodeNum,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 0, // should be replaced with current time
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
verifySuspend { serviceRepository.emitMeshPacket(any()) }
}
@Test
fun `packets with non-zero rx_time keep their time`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val packet =
MeshPacket(
id = 2,
from = myNodeNum,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1700000000,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
verifySuspend { serviceRepository.emitMeshPacket(any()) }
}
// ---------- handleReceivedMeshPacket: node updates ----------
@Test
fun `processReceivedMeshPacket updates myNode lastHeard`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val packet =
MeshPacket(
id = 10,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1700000000,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
// Should have called updateNode for myNodeNum (lastHeard update)
verify { nodeManager.updateNode(myNodeNum, withBroadcast = true, any(), any()) }
}
@Test
fun `processReceivedMeshPacket updates sender node`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val senderNode = 999
val packet =
MeshPacket(
id = 10,
from = senderNode,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1700000000,
channel = 1,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
// Should have called updateNode for the sender
verify { nodeManager.updateNode(senderNode, withBroadcast = false, any(), any()) }
}
// ---------- handleReceivedMeshPacket: null decoded ----------
@Test
fun `packet with null decoded is skipped`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val packet = MeshPacket(id = 1, from = 999, decoded = null)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
// No crash, no emitMeshPacket call (decoded is null so processReceivedMeshPacket returns early)
}
// ---------- handleReceivedMeshPacket: null myNodeNum ----------
@Test
fun `processReceivedMeshPacket with null myNodeNum skips node updates`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = true
val packet =
MeshPacket(
id = 10,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1700000000,
)
processor.handleReceivedMeshPacket(packet, null)
advanceUntilIdle()
// emitMeshPacket should still be called, but node updates should be skipped
verifySuspend { serviceRepository.emitMeshPacket(any()) }
}
// ---------- clearEarlyPackets ----------
@Test
fun `clearEarlyPackets empties the buffer`() = runTest(testDispatcher) {
processor.start(backgroundScope)
isNodeDbReady.value = false
val packet =
MeshPacket(
id = 1,
from = 999,
decoded = Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = ByteString.EMPTY),
rx_time = 1000,
)
processor.handleReceivedMeshPacket(packet, myNodeNum)
advanceUntilIdle()
processor.clearEarlyPackets()
advanceUntilIdle()
// Now make DB ready — the buffer should be empty, nothing to flush
isNodeDbReady.value = true
advanceUntilIdle()
// emitMeshPacket should NOT have been called (buffer was cleared)
}
// ---------- logVariant ----------
@Test
fun `FromRadio log_record variant is logged as MeshLog`() = runTest(testDispatcher) {
processor.start(backgroundScope)
val logRecord = LogRecord(message = "device log")
val fromRadio = FromRadio(log_record = logRecord)
val bytes = FromRadio.ADAPTER.encode(fromRadio)
processor.handleFromRadio(bytes, myNodeNum)
advanceUntilIdle()
verifySuspend { meshLogRepository.insert(any()) }
}
}

View file

@ -188,7 +188,7 @@ class NodeManagerImplTest {
assertTrue(nodeManager.nodeDBbyNodeNum.isEmpty())
assertTrue(nodeManager.nodeDBbyID.isEmpty())
assertNull(nodeManager.myNodeNum)
assertNull(nodeManager.myNodeNum.value)
}
@Test

View file

@ -0,0 +1,341 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import dev.mokkery.verifySuspend
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.repository.HistoryManager
import org.meshtastic.core.repository.MeshDataHandler
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.ServiceBroadcasts
import org.meshtastic.proto.Data
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.StoreAndForward
import org.meshtastic.proto.StoreForwardPlusPlus
import kotlin.test.BeforeTest
import kotlin.test.Test
@OptIn(ExperimentalCoroutinesApi::class)
class StoreForwardPacketHandlerImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val packetRepository = mock<PacketRepository>(MockMode.autofill)
private val serviceBroadcasts = mock<ServiceBroadcasts>(MockMode.autofill)
private val historyManager = mock<HistoryManager>(MockMode.autofill)
private val dataHandler = mock<MeshDataHandler>(MockMode.autofill)
private val testDispatcher = StandardTestDispatcher()
private val testScope = TestScope(testDispatcher)
private lateinit var handler: StoreForwardPacketHandlerImpl
private val myNodeNum = 12345
@BeforeTest
fun setUp() {
every { nodeManager.myNodeNum } returns MutableStateFlow<Int?>(myNodeNum)
handler =
StoreForwardPacketHandlerImpl(
nodeManager = nodeManager,
packetRepository = lazy { packetRepository },
serviceBroadcasts = serviceBroadcasts,
historyManager = historyManager,
dataHandler = lazy { dataHandler },
)
handler.start(testScope)
}
private fun makeSfPacket(from: Int, sf: StoreAndForward): MeshPacket {
val payload = StoreAndForward.ADAPTER.encode(sf).toByteString()
return MeshPacket(from = from, decoded = Data(portnum = PortNum.STORE_FORWARD_APP, payload = payload))
}
private fun makeSfppPacket(from: Int, sfpp: StoreForwardPlusPlus): MeshPacket {
val payload = StoreForwardPlusPlus.ADAPTER.encode(sfpp).toByteString()
return MeshPacket(from = from, decoded = Data(portnum = PortNum.STORE_FORWARD_APP, payload = payload))
}
private fun makeDataPacket(from: Int): DataPacket = DataPacket(
id = 1,
time = 1700000000000L,
to = DataPacket.ID_BROADCAST,
from = DataPacket.nodeNumToDefaultId(from),
bytes = null,
dataType = PortNum.STORE_FORWARD_APP.value,
)
// ---------- Legacy S&F: stats ----------
@Test
fun `handleStoreAndForward stats creates text data packet`() = testScope.runTest {
val sf =
StoreAndForward(
stats = StoreAndForward.Statistics(messages_total = 100, messages_saved = 50, messages_max = 200),
)
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { dataHandler.rememberDataPacket(any(), myNodeNum) }
}
// ---------- Legacy S&F: history ----------
@Test
fun `handleStoreAndForward history creates text packet and updates last request`() = testScope.runTest {
val sf =
StoreAndForward(
history =
StoreAndForward.History(history_messages = 42, window = 3600000, last_request = 1700000000),
)
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { dataHandler.rememberDataPacket(any(), myNodeNum) }
verify { historyManager.updateStoreForwardLastRequest("router_history", 1700000000, "Unknown") }
}
// ---------- Legacy S&F: heartbeat ----------
@Test
fun `handleStoreAndForward heartbeat does not crash`() = testScope.runTest {
val sf = StoreAndForward(heartbeat = StoreAndForward.Heartbeat(period = 900, secondary = 1))
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No crash, just logs
}
// ---------- Legacy S&F: text ----------
@Test
fun `handleStoreAndForward text with broadcast rr sets to broadcast`() = testScope.runTest {
val sf =
StoreAndForward(
text = "Hello from router".encodeToByteArray().toByteString(),
rr = StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST,
)
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { dataHandler.rememberDataPacket(any(), myNodeNum) }
}
@Test
fun `handleStoreAndForward text without broadcast rr preserves destination`() = testScope.runTest {
val sf =
StoreAndForward(
text = "Direct message".encodeToByteArray().toByteString(),
rr = StoreAndForward.RequestResponse.ROUTER_TEXT_DIRECT,
)
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { dataHandler.rememberDataPacket(any(), myNodeNum) }
}
// ---------- Legacy S&F: null payload ----------
@Test
fun `handleStoreAndForward with null payload returns early`() = testScope.runTest {
val packet = MeshPacket(from = 999, decoded = null)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No crash
}
// ---------- Legacy S&F: empty message ----------
@Test
fun `handleStoreAndForward with no fields set does not crash`() = testScope.runTest {
val sf = StoreAndForward()
val packet = makeSfPacket(999, sf)
val dataPacket = makeDataPacket(999)
handler.handleStoreAndForward(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No crash — falls through to else branch
}
// ---------- SF++: LINK_PROVIDE ----------
@Test
fun `handleStoreForwardPlusPlus LINK_PROVIDE with message_hash updates status`() = testScope.runTest {
val sfpp =
StoreForwardPlusPlus(
sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE,
encapsulated_id = 42,
encapsulated_from = 1000,
encapsulated_to = 2000,
message_hash = ByteString.of(0x01, 0x02, 0x03, 0x04),
commit_hash = ByteString.EMPTY,
)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
verifySuspend { packetRepository.updateSFPPStatus(any(), any(), any(), any(), any(), any(), any()) }
verify { serviceBroadcasts.broadcastMessageStatus(42, any()) }
}
// ---------- SF++: CANON_ANNOUNCE ----------
@Test
fun `handleStoreForwardPlusPlus CANON_ANNOUNCE updates status by hash`() = testScope.runTest {
val sfpp =
StoreForwardPlusPlus(
sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.CANON_ANNOUNCE,
message_hash = ByteString.of(0xAA.toByte(), 0xBB.toByte()),
encapsulated_rxtime = 1700000000,
)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
verifySuspend { packetRepository.updateSFPPStatusByHash(any(), any(), any()) }
}
// ---------- SF++: CHAIN_QUERY ----------
@Test
fun `handleStoreForwardPlusPlus CHAIN_QUERY logs info without crash`() = testScope.runTest {
val sfpp = StoreForwardPlusPlus(sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.CHAIN_QUERY)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
// No crash, just logs
}
// ---------- SF++: LINK_REQUEST ----------
@Test
fun `handleStoreForwardPlusPlus LINK_REQUEST logs info without crash`() = testScope.runTest {
val sfpp = StoreForwardPlusPlus(sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.LINK_REQUEST)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
// No crash, just logs
}
// ---------- SF++: invalid payload ----------
@Test
fun `handleStoreForwardPlusPlus with null payload returns early`() = testScope.runTest {
val packet = MeshPacket(from = 999, decoded = null)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
// No crash
}
// ---------- SF++: fragment types ----------
@Test
fun `handleStoreForwardPlusPlus LINK_PROVIDE_FIRSTHALF handled as link provide`() = testScope.runTest {
val sfpp =
StoreForwardPlusPlus(
sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_FIRSTHALF,
encapsulated_id = 55,
encapsulated_from = 1000,
encapsulated_to = 2000,
message_hash = ByteString.of(0x01, 0x02),
commit_hash = ByteString.EMPTY,
)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
verifySuspend { packetRepository.updateSFPPStatus(any(), any(), any(), any(), any(), any(), any()) }
}
@Test
fun `handleStoreForwardPlusPlus LINK_PROVIDE_SECONDHALF handled as link provide`() = testScope.runTest {
val sfpp =
StoreForwardPlusPlus(
sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_SECONDHALF,
encapsulated_id = 56,
encapsulated_from = 1000,
encapsulated_to = 2000,
message_hash = ByteString.of(0x03, 0x04),
commit_hash = ByteString.EMPTY,
)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
verifySuspend { packetRepository.updateSFPPStatus(any(), any(), any(), any(), any(), any(), any()) }
}
// ---------- SF++: commit_hash present changes status ----------
@Test
fun `handleStoreForwardPlusPlus LINK_PROVIDE with commit_hash sets SFPP_CONFIRMED`() = testScope.runTest {
val sfpp =
StoreForwardPlusPlus(
sfpp_message_type = StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE,
encapsulated_id = 77,
encapsulated_from = 1000,
encapsulated_to = 2000,
message_hash = ByteString.of(0x01, 0x02),
commit_hash = ByteString.of(0xAA.toByte()), // non-empty
)
val packet = makeSfppPacket(999, sfpp)
handler.handleStoreForwardPlusPlus(packet)
advanceUntilIdle()
verifySuspend { packetRepository.updateSFPPStatus(any(), any(), any(), any(), any(), any(), any()) }
}
}

View file

@ -0,0 +1,204 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.manager
import dev.mokkery.MockMode
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.repository.MeshConnectionManager
import org.meshtastic.core.repository.NodeManager
import org.meshtastic.core.repository.NotificationManager
import org.meshtastic.proto.Data
import org.meshtastic.proto.DeviceMetrics
import org.meshtastic.proto.EnvironmentMetrics
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.PowerMetrics
import org.meshtastic.proto.Telemetry
import kotlin.test.BeforeTest
import kotlin.test.Test
@OptIn(ExperimentalCoroutinesApi::class)
class TelemetryPacketHandlerImplTest {
private val nodeManager = mock<NodeManager>(MockMode.autofill)
private val connectionManager = mock<MeshConnectionManager>(MockMode.autofill)
private val notificationManager = mock<NotificationManager>(MockMode.autofill)
private val testDispatcher = StandardTestDispatcher()
private val testScope = TestScope(testDispatcher)
private lateinit var handler: TelemetryPacketHandlerImpl
private val myNodeNum = 12345
private val remoteNodeNum = 99999
@BeforeTest
fun setUp() {
handler =
TelemetryPacketHandlerImpl(
nodeManager = nodeManager,
connectionManager = lazy { connectionManager },
notificationManager = notificationManager,
)
handler.start(testScope)
}
private fun makeTelemetryPacket(from: Int, telemetry: Telemetry): MeshPacket {
val payload = Telemetry.ADAPTER.encode(telemetry).toByteString()
return MeshPacket(
from = from,
decoded = Data(portnum = PortNum.TELEMETRY_APP, payload = payload),
rx_time = 1700000000,
)
}
private fun makeDataPacket(from: Int): DataPacket = DataPacket(
id = 1,
time = 1700000000000L,
to = DataPacket.ID_BROADCAST,
from = DataPacket.nodeNumToDefaultId(from),
bytes = null,
dataType = PortNum.TELEMETRY_APP.value,
)
// ---------- Device metrics from local node ----------
@Test
fun `local device metrics updates telemetry on connectionManager`() = testScope.runTest {
val telemetry =
Telemetry(time = 1700000000, device_metrics = DeviceMetrics(battery_level = 80, voltage = 4.1f))
val packet = makeTelemetryPacket(myNodeNum, telemetry)
val dataPacket = makeDataPacket(myNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { connectionManager.updateTelemetry(any()) }
verify { nodeManager.updateNode(myNodeNum, any(), any(), any()) }
}
// ---------- Device metrics from remote node ----------
@Test
fun `remote device metrics updates node but not connectionManager`() = testScope.runTest {
val telemetry =
Telemetry(time = 1700000000, device_metrics = DeviceMetrics(battery_level = 90, voltage = 4.2f))
val packet = makeTelemetryPacket(remoteNodeNum, telemetry)
val dataPacket = makeDataPacket(remoteNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { nodeManager.updateNode(remoteNodeNum, any(), any(), any()) }
}
// ---------- Environment metrics ----------
@Test
fun `environment metrics updates node with environment data`() = testScope.runTest {
val telemetry =
Telemetry(
time = 1700000000,
environment_metrics = EnvironmentMetrics(temperature = 25.5f, relative_humidity = 60.0f),
)
val packet = makeTelemetryPacket(remoteNodeNum, telemetry)
val dataPacket = makeDataPacket(remoteNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { nodeManager.updateNode(remoteNodeNum, any(), any(), any()) }
}
// ---------- Power metrics ----------
@Test
fun `power metrics updates node with power data`() = testScope.runTest {
val telemetry = Telemetry(time = 1700000000, power_metrics = PowerMetrics(ch1_voltage = 3.3f))
val packet = makeTelemetryPacket(remoteNodeNum, telemetry)
val dataPacket = makeDataPacket(remoteNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { nodeManager.updateNode(remoteNodeNum, any(), any(), any()) }
}
// ---------- Telemetry time handling ----------
@Test
fun `telemetry with time 0 gets time from dataPacket`() = testScope.runTest {
val telemetry = Telemetry(time = 0, device_metrics = DeviceMetrics(battery_level = 50, voltage = 3.8f))
val packet = makeTelemetryPacket(myNodeNum, telemetry)
val dataPacket = makeDataPacket(myNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
verify { nodeManager.updateNode(myNodeNum, any(), any(), any()) }
}
// ---------- Null payload ----------
@Test
fun `handleTelemetry with null decoded payload returns early`() = testScope.runTest {
val packet = MeshPacket(from = myNodeNum, decoded = null)
val dataPacket = makeDataPacket(myNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No crash
}
@Test
fun `handleTelemetry with empty payload bytes returns early`() = testScope.runTest {
val packet =
MeshPacket(
from = myNodeNum,
decoded = Data(portnum = PortNum.TELEMETRY_APP, payload = okio.ByteString.EMPTY),
)
val dataPacket = makeDataPacket(myNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No crash — decodeOrNull returns null for empty payload
}
// ---------- Battery notification: healthy battery does NOT trigger ----------
@Test
fun `healthy battery level does not trigger low battery notification`() = testScope.runTest {
val telemetry =
Telemetry(time = 1700000000, device_metrics = DeviceMetrics(battery_level = 80, voltage = 4.0f))
val packet = makeTelemetryPacket(myNodeNum, telemetry)
val dataPacket = makeDataPacket(myNodeNum)
handler.handleTelemetry(packet, dataPacket, myNodeNum)
advanceUntilIdle()
// No dispatch call — battery is healthy
}
}