fix(node): Correct owner ID and local user detection (#4256)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-01-18 21:20:40 -06:00 committed by GitHub
parent f760feffe2
commit 3b0dda4491
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 183 additions and 133 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 Meshtastic LLC
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.repository.radio
import android.annotation.SuppressLint
@ -401,6 +400,10 @@ constructor(
} ?: Logger.w { "[$address] toRadio characteristic unavailable, can't send data" }
}
override fun keepAlive() {
Logger.d { "[$address] BLE keepAlive" }
}
/** Closes the connection to the device. */
override fun close() {
runBlocking {

View file

@ -33,6 +33,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
@ -225,10 +226,6 @@ constructor(
}
}
if (radioIf is SerialInterface) {
keepAlive(System.currentTimeMillis())
}
// ignoreException { Logger.d { "FromRadio: ${MeshProtos.FromRadio.parseFrom(p }}" } }
try {
@ -277,10 +274,24 @@ constructor(
}
radioIf = interfaceFactory.createInterface(address)
startHeartbeat()
}
}
}
private var heartbeatJob: kotlinx.coroutines.Job? = null
private fun startHeartbeat() {
heartbeatJob?.cancel()
heartbeatJob =
serviceScope.launch {
while (true) {
delay(HEARTBEAT_INTERVAL_MILLIS)
keepAlive()
}
}
}
private fun stopInterface() {
val r = radioIf
Logger.i { "stopping interface $r" }

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 Meshtastic LLC
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.repository.radio
import co.touchlab.kermit.Logger
@ -115,6 +114,10 @@ constructor(
}
}
override fun keepAlive() {
Logger.d { "[$address] Serial keepAlive" }
}
override fun sendBytes(p: ByteArray) {
val conn = connRef.get()
if (conn != null) {

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 Meshtastic LLC
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,10 +14,12 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.repository.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* An interface that assumes we are talking to a meshtastic device over some sort of stream connection (serial or TCP
@ -32,6 +34,8 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : I
private val debugLineBuf = kotlin.text.StringBuilder()
private val writeMutex = Mutex()
/** The index of the next byte we are hoping to receive */
private var ptr = 0
@ -74,15 +78,19 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : I
override fun handleSendToRadio(p: ByteArray) {
// This method is called from a continuation and it might show up late, so check for uart being null
val header = ByteArray(4)
header[0] = START1
header[1] = START2
header[2] = (p.size shr 8).toByte()
header[3] = (p.size and 0xff).toByte()
service.serviceScope.launch {
writeMutex.withLock {
val header = ByteArray(4)
header[0] = START1
header[1] = START2
header[2] = (p.size shr 8).toByte()
header[3] = (p.size and 0xff).toByte()
sendBytes(header)
sendBytes(p)
flushBytes()
sendBytes(header)
sendBytes(p)
flushBytes()
}
}
}
/** Print device serial debug output somewhere */

View file

@ -146,6 +146,15 @@ constructor(
}
}
override fun keepAlive() {
Logger.d { "[$address] TCP keepAlive" }
val heartbeat =
org.meshtastic.proto.MeshProtos.ToRadio.newBuilder()
.setHeartbeat(org.meshtastic.proto.MeshProtos.Heartbeat.getDefaultInstance())
.build()
handleSendToRadio(heartbeat.toByteArray())
}
// Create a socket to make the connection with the server
private suspend fun startConnect() = withContext(dispatchers.io) {
val attemptStart = System.currentTimeMillis()
@ -154,73 +163,68 @@ constructor(
val parts = address.split(":", limit = 2)
val host = parts[0]
val port = parts.getOrNull(1)?.toIntOrNull() ?: SERVICE_PORT
Logger.i { "[$address] Parsed address. Host: $host, Port: $port" }
Logger.d { "[$address] Resolving host '$host' and connecting to port $port..." }
try {
Socket(InetAddress.getByName(host), port).use { socket ->
socket.tcpNoDelay = true
socket.soTimeout = SOCKET_TIMEOUT
this@TCPInterface.socket = socket
Socket(InetAddress.getByName(host), port).use { socket ->
socket.tcpNoDelay = true
socket.keepAlive = true
socket.soTimeout = SOCKET_TIMEOUT
this@TCPInterface.socket = socket
val connectTime = System.currentTimeMillis() - attemptStart
connectionStartTime = System.currentTimeMillis()
Logger.i {
"[$address] TCP socket connected in ${connectTime}ms - " +
"Local: ${socket.localSocketAddress}, Remote: ${socket.remoteSocketAddress}"
}
val connectTime = System.currentTimeMillis() - attemptStart
connectionStartTime = System.currentTimeMillis()
Logger.i {
"[$address] TCP socket connected in ${connectTime}ms - " +
"Local: ${socket.localSocketAddress}, Remote: ${socket.remoteSocketAddress}"
}
BufferedOutputStream(socket.getOutputStream()).use { outputStream ->
outStream = outputStream
BufferedOutputStream(socket.getOutputStream()).use { outputStream ->
outStream = outputStream
BufferedInputStream(socket.getInputStream()).use { inputStream ->
super.connect()
BufferedInputStream(socket.getInputStream()).use { inputStream ->
super.connect()
retryCount = 1
backoffDelay = MIN_BACKOFF_MILLIS
retryCount = 1
backoffDelay = MIN_BACKOFF_MILLIS
var timeoutCount = 0
while (timeoutCount < SOCKET_RETRIES) {
try { // close after 90s of inactivity
val c = inputStream.read()
if (c == -1) {
Logger.w {
"[$address] TCP got EOF on stream after $packetsReceived packets received"
}
break
} else {
timeoutCount = 0
packetsReceived++
bytesReceived++
readChar(c.toByte())
var timeoutCount = 0
while (timeoutCount < SOCKET_RETRIES) {
try { // close after 90s of inactivity
val c = inputStream.read()
if (c == -1) {
Logger.w {
"[$address] TCP got EOF on stream after $packetsReceived packets received"
}
} catch (ex: SocketTimeoutException) {
timeoutCount++
timeoutEvents++
if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
Logger.d {
"[$address] TCP socket timeout count: $timeoutCount/$SOCKET_RETRIES " +
"(total timeouts: $timeoutEvents)"
}
}
// Ignore and start another read
break
} else {
timeoutCount = 0
packetsReceived++
bytesReceived++
readChar(c.toByte())
}
} catch (ex: SocketTimeoutException) {
timeoutCount++
timeoutEvents++
if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
Logger.d {
"[$address] TCP socket timeout count: $timeoutCount/$SOCKET_RETRIES " +
"(total timeouts: $timeoutEvents)"
}
}
// Ignore and start another read
}
if (timeoutCount >= SOCKET_RETRIES) {
val inactivityMs = SOCKET_RETRIES * SOCKET_TIMEOUT
Logger.w {
"[$address] TCP closing connection due to $SOCKET_RETRIES consecutive timeouts " +
"(${inactivityMs}ms of inactivity)"
}
}
if (timeoutCount >= SOCKET_RETRIES) {
val inactivityMs = SOCKET_RETRIES * SOCKET_TIMEOUT
Logger.w {
"[$address] TCP closing connection due to $SOCKET_RETRIES consecutive timeouts " +
"(${inactivityMs}ms of inactivity)"
}
}
}
onDeviceDisconnect(false)
}
} catch (e: IOException) {
Logger.e(e) { "[$address] Error connecting to $host:$port" }
throw e
onDeviceDisconnect(false)
}
}
}

View file

@ -33,6 +33,8 @@ import org.jetbrains.compose.resources.getString
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.data.repository.PacketRepository
import org.meshtastic.core.data.repository.RadioConfigRepository
import org.meshtastic.core.database.entity.ContactSettings
import org.meshtastic.core.database.entity.MyNodeEntity
import org.meshtastic.core.database.entity.Packet
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.getChannel
@ -41,6 +43,7 @@ import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.channel_name
import org.meshtastic.core.ui.viewmodel.stateInWhileSubscribed
import org.meshtastic.proto.AppOnlyProtos
import org.meshtastic.proto.channelSet
import javax.inject.Inject
import kotlin.collections.map as collectionsMap
@ -60,6 +63,10 @@ constructor(
val channels = radioConfigRepository.channelSetFlow.stateInWhileSubscribed(initialValue = channelSet {})
// Combine node info and myId to reduce argument count in subsequent combines
private val identityFlow: Flow<Pair<MyNodeEntity?, String?>> =
combine(nodeRepository.myNodeInfo, nodeRepository.myId) { info, id -> Pair(info, id) }
/**
* Non-paginated contact list.
*
@ -69,12 +76,13 @@ constructor(
* @see contactListPaged for the paginated version used in ContactsScreen
*/
val contactList =
combine(
nodeRepository.myNodeInfo,
packetRepository.getContacts(),
channels,
packetRepository.getContactSettings(),
) { myNodeInfo, contacts, channelSet, settings ->
combine(identityFlow, packetRepository.getContacts(), channels, packetRepository.getContactSettings()) {
identity,
contacts,
channelSet,
settings,
->
val (myNodeInfo, myId) = identity
val myNodeNum = myNodeInfo?.myNodeNum ?: return@combine emptyList()
// Add empty channel placeholders (always show Broadcast contacts, even when empty)
val placeholder =
@ -89,7 +97,7 @@ constructor(
val contactKey = packet.contact_key
// Determine if this is my message (originated on this device)
val fromLocal = data.from == DataPacket.ID_LOCAL
val fromLocal = data.from == DataPacket.ID_LOCAL || (myId != null && data.from == myId)
val toBroadcast = data.to == DataPacket.ID_BROADCAST
// grab usernames from NodeInfo
@ -126,21 +134,23 @@ constructor(
.stateInWhileSubscribed(initialValue = emptyList())
val contactListPaged: Flow<PagingData<Contact>> =
combine(nodeRepository.myNodeInfo, channels, packetRepository.getContactSettings()) {
myNodeInfo,
channelSet,
settings,
->
Triple(myNodeInfo?.myNodeNum, channelSet, settings)
combine(identityFlow, channels, packetRepository.getContactSettings()) { identity, channelSet, settings ->
val (myNodeInfo, myId) = identity
ContactsPagedParams(myNodeInfo?.myNodeNum, channelSet, settings, myId)
}
.flatMapLatest { (myNodeNum, channelSet, settings) ->
.flatMapLatest { params ->
val myNodeNum = params.myNodeNum
val channelSet = params.channelSet
val settings = params.settings
val myId = params.myId
packetRepository.getContactsPaged().map { pagingData ->
pagingData.map { packet ->
val data = packet.data
val contactKey = packet.contact_key
// Determine if this is my message (originated on this device)
val fromLocal = data.from == DataPacket.ID_LOCAL
val fromLocal = data.from == DataPacket.ID_LOCAL || (myId != null && data.from == myId)
val toBroadcast = data.to == DataPacket.ID_BROADCAST
// grab usernames from NodeInfo
@ -198,4 +208,11 @@ constructor(
}
private fun getUser(userId: String?) = nodeRepository.getUser(userId ?: DataPacket.ID_BROADCAST)
private data class ContactsPagedParams(
val myNodeNum: Int?,
val channelSet: AppOnlyProtos.ChannelSet,
val settings: Map<String, ContactSettings>,
val myId: String?,
)
}

View file

@ -23,8 +23,10 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
@ -56,15 +58,6 @@ constructor(
private val nodeInfoWriteDataSource: NodeInfoWriteDataSource,
private val dispatchers: CoroutineDispatchers,
) {
init {
// Backfill denormalized name columns for existing nodes on startup
processLifecycle.coroutineScope.launch {
processLifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
withContext(dispatchers.io) { nodeInfoWriteDataSource.backfillDenormalizedNames() }
}
}
}
// hardware info about our local device (can be null)
val myNodeInfo: StateFlow<MyNodeEntity?> =
nodeInfoReadDataSource
@ -82,23 +75,35 @@ constructor(
val myId: StateFlow<String?>
get() = _myId
fun getNodeDBbyNum() =
nodeInfoReadDataSource.nodeDBbyNumFlow().map { map -> map.mapValues { (_, it) -> it.toEntity() } }
// A map from nodeNum to Node
val nodeDBbyNum: StateFlow<Map<Int, Node>> =
nodeInfoReadDataSource
.nodeDBbyNumFlow()
.mapLatest { map -> map.mapValues { (_, it) -> it.toModel() } }
.onEach {
val ourNodeInfo = it.values.firstOrNull()
_ourNodeInfo.value = ourNodeInfo
_myId.value = ourNodeInfo?.user?.id
}
.flowOn(dispatchers.io)
.conflate()
.stateIn(processLifecycle.coroutineScope, SharingStarted.Eagerly, emptyMap())
init {
// Backfill denormalized name columns for existing nodes on startup
processLifecycle.coroutineScope.launch {
processLifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
withContext(dispatchers.io) { nodeInfoWriteDataSource.backfillDenormalizedNames() }
}
}
// Keep ourNodeInfo and myId correctly updated based on current connection and node DB
combine(nodeDBbyNum, myNodeInfo) { db, info -> info?.myNodeNum?.let { db[it] } }
.onEach { node ->
_ourNodeInfo.value = node
_myId.value = node?.user?.id
}
.launchIn(processLifecycle.coroutineScope)
}
fun getNodeDBbyNum() =
nodeInfoReadDataSource.nodeDBbyNumFlow().map { map -> map.mapValues { (_, it) -> it.toEntity() } }
fun getNode(userId: String): Node = nodeDBbyNum.value.values.find { it.user.id == userId }
?: Node(num = DataPacket.idToDefaultNodeNum(userId) ?: 0, user = getUser(userId))

View file

@ -178,22 +178,6 @@ private fun MapView.updateMarkers(
nodeClusterer.invalidate()
}
// private fun addWeatherLayer() {
// if (map.tileProvider.tileSource.name()
// .equals(CustomTileSource.getTileSource("ESRI World TOPO").name())
// ) {
// val layer = TilesOverlay(
// MapTileProviderBasic(
// activity,
// CustomTileSource.OPENWEATHER_RADAR
// ), context
// )
// layer.loadingBackgroundColor = Color.TRANSPARENT
// layer.loadingLineColor = Color.TRANSPARENT
// map.overlayManager.add(layer)
// }
// }
private fun cacheManagerCallback(onTaskComplete: () -> Unit, onTaskFailed: (Int) -> Unit) =
object : CacheManager.CacheManagerCallback {
override fun onTaskComplete() {
@ -225,7 +209,7 @@ private fun cacheManagerCallback(onTaskComplete: () -> Unit, onTaskFailed: (Int)
* @param navigateToNodeDetails Callback to navigate to the details screen of a selected node.
*/
@OptIn(ExperimentalPermissionsApi::class) // Added for Accompanist
@Suppress("CyclomaticComplexMethod", "LongMethod")
@Suppress("CyclomaticComplexMethod", "LongParameterList", "LongMethod")
@Composable
fun MapView(
mapViewModel: MapViewModel = hiltViewModel(),
@ -336,6 +320,7 @@ fun MapView(
val nodes by mapViewModel.nodes.collectAsStateWithLifecycle()
val waypoints by mapViewModel.waypoints.collectAsStateWithLifecycle(emptyMap())
val selectedWaypointId by mapViewModel.selectedWaypointId.collectAsStateWithLifecycle()
val myId by mapViewModel.myId.collectAsStateWithLifecycle()
LaunchedEffect(selectedWaypointId, waypoints) {
if (selectedWaypointId != null && waypoints.containsKey(selectedWaypointId)) {
@ -506,7 +491,7 @@ fun MapView(
}
}
fun getUsername(id: String?) = if (id == DataPacket.ID_LOCAL) {
fun getUsername(id: String?) = if (id == DataPacket.ID_LOCAL || (myId != null && id == myId)) {
com.meshtastic.core.strings.getString(Res.string.you)
} else {
mapViewModel.getUser(id).longName

View file

@ -65,5 +65,5 @@ constructor(
val applicationId = buildConfigProvider.applicationId
fun getUser(userId: String?) = nodeRepository.getUser(userId ?: DataPacket.ID_BROADCAST)
override fun getUser(userId: String?) = nodeRepository.getUser(userId ?: DataPacket.ID_BROADCAST)
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 Meshtastic LLC
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.feature.map
import android.os.RemoteException
@ -79,6 +78,8 @@ abstract class BaseMapViewModel(
val myNodeNum
get() = myNodeInfo.value?.myNodeNum
val myId = nodeRepository.myId
val nodes: StateFlow<List<Node>> =
nodeRepository
.getNodes()
@ -121,6 +122,8 @@ abstract class BaseMapViewModel(
fun getNodeByNum(nodeNum: Int): Node? = nodeRepository.nodeDBbyNum.value[nodeNum]
open fun getUser(userId: String?): MeshProtos.User = nodeRepository.getUser(userId ?: DataPacket.ID_BROADCAST)
fun getUser(nodeNum: Int): MeshProtos.User = nodeRepository.getUser(nodeNum)
fun getNodeOrFallback(nodeNum: Int): Node = getNodeByNum(nodeNum) ?: Node(num = nodeNum, user = getUser(nodeNum))

View file

@ -495,7 +495,7 @@ private fun BoxScope.ScrollToBottomFab(coroutineScope: CoroutineScope, listState
private fun ReplySnippet(originalMessage: Message?, onClearReply: () -> Unit, ourNode: Node?) {
AnimatedVisibility(visible = originalMessage != null) {
originalMessage?.let { message ->
val isFromLocalUser = message.node.user.id == DataPacket.ID_LOCAL
val isFromLocalUser = message.fromLocal
val replyingToNodeUser = if (isFromLocalUser) ourNode?.user else message.node.user
val unknownUserText = stringResource(Res.string.unknown)

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 Meshtastic LLC
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.feature.settings.radio
import android.Manifest
@ -216,9 +215,10 @@ constructor(
requestAction(service, packetId, destNum)
requestIds.update { it.apply { add(packetId) } }
_radioConfigState.update { state ->
if (state.responseState is ResponseState.Loading) {
val total = maxOf(requestIds.value.size, state.responseState.total)
state.copy(responseState = state.responseState.copy(total = total))
val currentState = state.responseState
if (currentState is ResponseState.Loading) {
val total = requestIds.value.size.coerceAtLeast(1)
state.copy(responseState = currentState.copy(total = total))
} else {
state.copy(
route = "", // setter (response is PortNum.ROUTING_APP)
@ -233,7 +233,17 @@ constructor(
}
fun setOwner(user: MeshProtos.User) {
setRemoteOwner(destNode.value?.num ?: return, user)
val targetNode = destNode.value ?: return
// Ensure we are setting the owner for the intended target node
// This prevents accidentally updating the local node if the user object has the wrong ID
val fixedUser =
if (targetNode.user.id.isNotEmpty() && targetNode.user.id != user.id) {
Logger.w { "Fixing user ID mismatch in setOwner: form=${user.id} target=${targetNode.user.id}" }
user.toBuilder().setId(targetNode.user.id).build()
} else {
user
}
setRemoteOwner(targetNode.num, fixedUser)
}
private fun setRemoteOwner(destNum: Int, user: MeshProtos.User) = request(
@ -604,10 +614,11 @@ constructor(
mapConsentPrefs.setShouldReportLocation(nodeNum, shouldReportLocation)
}
private fun setResponseStateTotal(total: Int) {
private fun setResponseStateTotal(newTotal: Int) {
_radioConfigState.update { state ->
if (state.responseState is ResponseState.Loading) {
state.copy(responseState = state.responseState.copy(total = total))
val currentState = state.responseState
if (currentState is ResponseState.Loading) {
state.copy(responseState = currentState.copy(total = newTotal))
} else {
state // Return the unchanged state for other response states
}