From 3b0dda4491427c695729faf75b41dd14d236c4fb Mon Sep 17 00:00:00 2001
From: James Rich <2199651+jamesarich@users.noreply.github.com>
Date: Sun, 18 Jan 2026 21:20:40 -0600
Subject: [PATCH] fix(node): Correct owner ID and local user detection (#4256)
Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
---
.../repository/radio/NordicBleInterface.kt | 7 +-
.../repository/radio/RadioInterfaceService.kt | 19 ++-
.../mesh/repository/radio/SerialInterface.kt | 7 +-
.../mesh/repository/radio/StreamInterface.kt | 28 +++--
.../mesh/repository/radio/TCPInterface.kt | 108 +++++++++---------
.../mesh/ui/contact/ContactsViewModel.kt | 47 +++++---
.../core/data/repository/NodeRepository.kt | 39 ++++---
.../org/meshtastic/feature/map/MapView.kt | 21 +---
.../meshtastic/feature/map/MapViewModel.kt | 2 +-
.../feature/map/BaseMapViewModel.kt | 7 +-
.../meshtastic/feature/messaging/Message.kt | 2 +-
.../settings/radio/RadioConfigViewModel.kt | 29 +++--
12 files changed, 183 insertions(+), 133 deletions(-)
diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt
index e88508b6d..fd0be1261 100644
--- a/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt
+++ b/app/src/main/java/com/geeksville/mesh/repository/radio/NordicBleInterface.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2025 Meshtastic LLC
+ * Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
-
package com.geeksville.mesh.repository.radio
import android.annotation.SuppressLint
@@ -401,6 +400,10 @@ constructor(
} ?: Logger.w { "[$address] toRadio characteristic unavailable, can't send data" }
}
+ override fun keepAlive() {
+ Logger.d { "[$address] BLE keepAlive" }
+ }
+
/** Closes the connection to the device. */
override fun close() {
runBlocking {
diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt
index 2d86d46f7..d26e1177b 100644
--- a/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt
+++ b/app/src/main/java/com/geeksville/mesh/repository/radio/RadioInterfaceService.kt
@@ -33,6 +33,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
+import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
@@ -225,10 +226,6 @@ constructor(
}
}
- if (radioIf is SerialInterface) {
- keepAlive(System.currentTimeMillis())
- }
-
// ignoreException { Logger.d { "FromRadio: ${MeshProtos.FromRadio.parseFrom(p }}" } }
try {
@@ -277,10 +274,24 @@ constructor(
}
radioIf = interfaceFactory.createInterface(address)
+ startHeartbeat()
}
}
}
+ private var heartbeatJob: kotlinx.coroutines.Job? = null
+
+ private fun startHeartbeat() {
+ heartbeatJob?.cancel()
+ heartbeatJob =
+ serviceScope.launch {
+ while (true) {
+ delay(HEARTBEAT_INTERVAL_MILLIS)
+ keepAlive()
+ }
+ }
+ }
+
private fun stopInterface() {
val r = radioIf
Logger.i { "stopping interface $r" }
diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/SerialInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/SerialInterface.kt
index d015621a5..c1154a5a0 100644
--- a/app/src/main/java/com/geeksville/mesh/repository/radio/SerialInterface.kt
+++ b/app/src/main/java/com/geeksville/mesh/repository/radio/SerialInterface.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2025 Meshtastic LLC
+ * Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -14,7 +14,6 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
-
package com.geeksville.mesh.repository.radio
import co.touchlab.kermit.Logger
@@ -115,6 +114,10 @@ constructor(
}
}
+ override fun keepAlive() {
+ Logger.d { "[$address] Serial keepAlive" }
+ }
+
override fun sendBytes(p: ByteArray) {
val conn = connRef.get()
if (conn != null) {
diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/StreamInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/StreamInterface.kt
index 3dd50cea0..538f4088a 100644
--- a/app/src/main/java/com/geeksville/mesh/repository/radio/StreamInterface.kt
+++ b/app/src/main/java/com/geeksville/mesh/repository/radio/StreamInterface.kt
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2025 Meshtastic LLC
+ * Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -14,10 +14,12 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
-
package com.geeksville.mesh.repository.radio
import co.touchlab.kermit.Logger
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
/**
* An interface that assumes we are talking to a meshtastic device over some sort of stream connection (serial or TCP
@@ -32,6 +34,8 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : I
private val debugLineBuf = kotlin.text.StringBuilder()
+ private val writeMutex = Mutex()
+
/** The index of the next byte we are hoping to receive */
private var ptr = 0
@@ -74,15 +78,19 @@ abstract class StreamInterface(protected val service: RadioInterfaceService) : I
override fun handleSendToRadio(p: ByteArray) {
// This method is called from a continuation and it might show up late, so check for uart being null
- val header = ByteArray(4)
- header[0] = START1
- header[1] = START2
- header[2] = (p.size shr 8).toByte()
- header[3] = (p.size and 0xff).toByte()
+ service.serviceScope.launch {
+ writeMutex.withLock {
+ val header = ByteArray(4)
+ header[0] = START1
+ header[1] = START2
+ header[2] = (p.size shr 8).toByte()
+ header[3] = (p.size and 0xff).toByte()
- sendBytes(header)
- sendBytes(p)
- flushBytes()
+ sendBytes(header)
+ sendBytes(p)
+ flushBytes()
+ }
+ }
}
/** Print device serial debug output somewhere */
diff --git a/app/src/main/java/com/geeksville/mesh/repository/radio/TCPInterface.kt b/app/src/main/java/com/geeksville/mesh/repository/radio/TCPInterface.kt
index 9fbb9c69f..d37e73908 100644
--- a/app/src/main/java/com/geeksville/mesh/repository/radio/TCPInterface.kt
+++ b/app/src/main/java/com/geeksville/mesh/repository/radio/TCPInterface.kt
@@ -146,6 +146,15 @@ constructor(
}
}
+ override fun keepAlive() {
+ Logger.d { "[$address] TCP keepAlive" }
+ val heartbeat =
+ org.meshtastic.proto.MeshProtos.ToRadio.newBuilder()
+ .setHeartbeat(org.meshtastic.proto.MeshProtos.Heartbeat.getDefaultInstance())
+ .build()
+ handleSendToRadio(heartbeat.toByteArray())
+ }
+
// Create a socket to make the connection with the server
private suspend fun startConnect() = withContext(dispatchers.io) {
val attemptStart = System.currentTimeMillis()
@@ -154,73 +163,68 @@ constructor(
val parts = address.split(":", limit = 2)
val host = parts[0]
val port = parts.getOrNull(1)?.toIntOrNull() ?: SERVICE_PORT
- Logger.i { "[$address] Parsed address. Host: $host, Port: $port" }
Logger.d { "[$address] Resolving host '$host' and connecting to port $port..." }
- try {
- Socket(InetAddress.getByName(host), port).use { socket ->
- socket.tcpNoDelay = true
- socket.soTimeout = SOCKET_TIMEOUT
- this@TCPInterface.socket = socket
+ Socket(InetAddress.getByName(host), port).use { socket ->
+ socket.tcpNoDelay = true
+ socket.keepAlive = true
+ socket.soTimeout = SOCKET_TIMEOUT
+ this@TCPInterface.socket = socket
- val connectTime = System.currentTimeMillis() - attemptStart
- connectionStartTime = System.currentTimeMillis()
- Logger.i {
- "[$address] TCP socket connected in ${connectTime}ms - " +
- "Local: ${socket.localSocketAddress}, Remote: ${socket.remoteSocketAddress}"
- }
+ val connectTime = System.currentTimeMillis() - attemptStart
+ connectionStartTime = System.currentTimeMillis()
+ Logger.i {
+ "[$address] TCP socket connected in ${connectTime}ms - " +
+ "Local: ${socket.localSocketAddress}, Remote: ${socket.remoteSocketAddress}"
+ }
- BufferedOutputStream(socket.getOutputStream()).use { outputStream ->
- outStream = outputStream
+ BufferedOutputStream(socket.getOutputStream()).use { outputStream ->
+ outStream = outputStream
- BufferedInputStream(socket.getInputStream()).use { inputStream ->
- super.connect()
+ BufferedInputStream(socket.getInputStream()).use { inputStream ->
+ super.connect()
- retryCount = 1
- backoffDelay = MIN_BACKOFF_MILLIS
+ retryCount = 1
+ backoffDelay = MIN_BACKOFF_MILLIS
- var timeoutCount = 0
- while (timeoutCount < SOCKET_RETRIES) {
- try { // close after 90s of inactivity
- val c = inputStream.read()
- if (c == -1) {
- Logger.w {
- "[$address] TCP got EOF on stream after $packetsReceived packets received"
- }
- break
- } else {
- timeoutCount = 0
- packetsReceived++
- bytesReceived++
- readChar(c.toByte())
+ var timeoutCount = 0
+ while (timeoutCount < SOCKET_RETRIES) {
+ try { // close after 90s of inactivity
+ val c = inputStream.read()
+ if (c == -1) {
+ Logger.w {
+ "[$address] TCP got EOF on stream after $packetsReceived packets received"
}
- } catch (ex: SocketTimeoutException) {
- timeoutCount++
- timeoutEvents++
- if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
- Logger.d {
- "[$address] TCP socket timeout count: $timeoutCount/$SOCKET_RETRIES " +
- "(total timeouts: $timeoutEvents)"
- }
- }
- // Ignore and start another read
+ break
+ } else {
+ timeoutCount = 0
+ packetsReceived++
+ bytesReceived++
+ readChar(c.toByte())
}
+ } catch (ex: SocketTimeoutException) {
+ timeoutCount++
+ timeoutEvents++
+ if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
+ Logger.d {
+ "[$address] TCP socket timeout count: $timeoutCount/$SOCKET_RETRIES " +
+ "(total timeouts: $timeoutEvents)"
+ }
+ }
+ // Ignore and start another read
}
- if (timeoutCount >= SOCKET_RETRIES) {
- val inactivityMs = SOCKET_RETRIES * SOCKET_TIMEOUT
- Logger.w {
- "[$address] TCP closing connection due to $SOCKET_RETRIES consecutive timeouts " +
- "(${inactivityMs}ms of inactivity)"
- }
+ }
+ if (timeoutCount >= SOCKET_RETRIES) {
+ val inactivityMs = SOCKET_RETRIES * SOCKET_TIMEOUT
+ Logger.w {
+ "[$address] TCP closing connection due to $SOCKET_RETRIES consecutive timeouts " +
+ "(${inactivityMs}ms of inactivity)"
}
}
}
- onDeviceDisconnect(false)
}
- } catch (e: IOException) {
- Logger.e(e) { "[$address] Error connecting to $host:$port" }
- throw e
+ onDeviceDisconnect(false)
}
}
}
diff --git a/app/src/main/java/com/geeksville/mesh/ui/contact/ContactsViewModel.kt b/app/src/main/java/com/geeksville/mesh/ui/contact/ContactsViewModel.kt
index 1806b6dde..b1057558a 100644
--- a/app/src/main/java/com/geeksville/mesh/ui/contact/ContactsViewModel.kt
+++ b/app/src/main/java/com/geeksville/mesh/ui/contact/ContactsViewModel.kt
@@ -33,6 +33,8 @@ import org.jetbrains.compose.resources.getString
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.data.repository.PacketRepository
import org.meshtastic.core.data.repository.RadioConfigRepository
+import org.meshtastic.core.database.entity.ContactSettings
+import org.meshtastic.core.database.entity.MyNodeEntity
import org.meshtastic.core.database.entity.Packet
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.getChannel
@@ -41,6 +43,7 @@ import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.channel_name
import org.meshtastic.core.ui.viewmodel.stateInWhileSubscribed
+import org.meshtastic.proto.AppOnlyProtos
import org.meshtastic.proto.channelSet
import javax.inject.Inject
import kotlin.collections.map as collectionsMap
@@ -60,6 +63,10 @@ constructor(
val channels = radioConfigRepository.channelSetFlow.stateInWhileSubscribed(initialValue = channelSet {})
+ // Combine node info and myId to reduce argument count in subsequent combines
+ private val identityFlow: Flow> =
+ combine(nodeRepository.myNodeInfo, nodeRepository.myId) { info, id -> Pair(info, id) }
+
/**
* Non-paginated contact list.
*
@@ -69,12 +76,13 @@ constructor(
* @see contactListPaged for the paginated version used in ContactsScreen
*/
val contactList =
- combine(
- nodeRepository.myNodeInfo,
- packetRepository.getContacts(),
- channels,
- packetRepository.getContactSettings(),
- ) { myNodeInfo, contacts, channelSet, settings ->
+ combine(identityFlow, packetRepository.getContacts(), channels, packetRepository.getContactSettings()) {
+ identity,
+ contacts,
+ channelSet,
+ settings,
+ ->
+ val (myNodeInfo, myId) = identity
val myNodeNum = myNodeInfo?.myNodeNum ?: return@combine emptyList()
// Add empty channel placeholders (always show Broadcast contacts, even when empty)
val placeholder =
@@ -89,7 +97,7 @@ constructor(
val contactKey = packet.contact_key
// Determine if this is my message (originated on this device)
- val fromLocal = data.from == DataPacket.ID_LOCAL
+ val fromLocal = data.from == DataPacket.ID_LOCAL || (myId != null && data.from == myId)
val toBroadcast = data.to == DataPacket.ID_BROADCAST
// grab usernames from NodeInfo
@@ -126,21 +134,23 @@ constructor(
.stateInWhileSubscribed(initialValue = emptyList())
val contactListPaged: Flow> =
- combine(nodeRepository.myNodeInfo, channels, packetRepository.getContactSettings()) {
- myNodeInfo,
- channelSet,
- settings,
- ->
- Triple(myNodeInfo?.myNodeNum, channelSet, settings)
+ combine(identityFlow, channels, packetRepository.getContactSettings()) { identity, channelSet, settings ->
+ val (myNodeInfo, myId) = identity
+ ContactsPagedParams(myNodeInfo?.myNodeNum, channelSet, settings, myId)
}
- .flatMapLatest { (myNodeNum, channelSet, settings) ->
+ .flatMapLatest { params ->
+ val myNodeNum = params.myNodeNum
+ val channelSet = params.channelSet
+ val settings = params.settings
+ val myId = params.myId
+
packetRepository.getContactsPaged().map { pagingData ->
pagingData.map { packet ->
val data = packet.data
val contactKey = packet.contact_key
// Determine if this is my message (originated on this device)
- val fromLocal = data.from == DataPacket.ID_LOCAL
+ val fromLocal = data.from == DataPacket.ID_LOCAL || (myId != null && data.from == myId)
val toBroadcast = data.to == DataPacket.ID_BROADCAST
// grab usernames from NodeInfo
@@ -198,4 +208,11 @@ constructor(
}
private fun getUser(userId: String?) = nodeRepository.getUser(userId ?: DataPacket.ID_BROADCAST)
+
+ private data class ContactsPagedParams(
+ val myNodeNum: Int?,
+ val channelSet: AppOnlyProtos.ChannelSet,
+ val settings: Map,
+ val myId: String?,
+ )
}
diff --git a/core/data/src/main/kotlin/org/meshtastic/core/data/repository/NodeRepository.kt b/core/data/src/main/kotlin/org/meshtastic/core/data/repository/NodeRepository.kt
index 8618ebd0f..27186a5a8 100644
--- a/core/data/src/main/kotlin/org/meshtastic/core/data/repository/NodeRepository.kt
+++ b/core/data/src/main/kotlin/org/meshtastic/core/data/repository/NodeRepository.kt
@@ -23,8 +23,10 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flowOn
+import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
@@ -56,15 +58,6 @@ constructor(
private val nodeInfoWriteDataSource: NodeInfoWriteDataSource,
private val dispatchers: CoroutineDispatchers,
) {
- init {
- // Backfill denormalized name columns for existing nodes on startup
- processLifecycle.coroutineScope.launch {
- processLifecycle.repeatOnLifecycle(Lifecycle.State.CREATED) {
- withContext(dispatchers.io) { nodeInfoWriteDataSource.backfillDenormalizedNames() }
- }
- }
- }
-
// hardware info about our local device (can be null)
val myNodeInfo: StateFlow =
nodeInfoReadDataSource
@@ -82,23 +75,35 @@ constructor(
val myId: StateFlow
get() = _myId
- fun getNodeDBbyNum() =
- nodeInfoReadDataSource.nodeDBbyNumFlow().map { map -> map.mapValues { (_, it) -> it.toEntity() } }
-
// A map from nodeNum to Node
val nodeDBbyNum: StateFlow