fix: address backfill issue on tcp connections; add logging (#3676)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
Co-authored-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
Mac DeCourcy 2025-11-14 11:22:48 -08:00 committed by GitHub
parent 2a081f3c1f
commit 7369a9bf5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 332 additions and 21 deletions

View file

@ -26,12 +26,15 @@ import android.content.pm.ServiceInfo
import android.os.Build
import android.os.IBinder
import android.os.RemoteException
import android.util.Log
import androidx.annotation.VisibleForTesting
import androidx.core.app.ServiceCompat
import androidx.core.location.LocationCompat
import com.geeksville.mesh.BuildConfig
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.model.NO_DEVICE_SELECTED
import com.geeksville.mesh.repository.network.MQTTRepository
import com.geeksville.mesh.repository.radio.InterfaceId
import com.geeksville.mesh.repository.radio.RadioInterfaceService
import com.geeksville.mesh.util.ignoreException
import com.geeksville.mesh.util.toRemoteExceptions
@ -50,7 +53,9 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import org.meshtastic.core.analytics.DataPair
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.common.hasLocationPermission
@ -118,6 +123,8 @@ import org.meshtastic.proto.position
import org.meshtastic.proto.telemetry
import org.meshtastic.proto.user
import timber.log.Timber
import java.util.ArrayDeque
import java.util.Locale
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
@ -220,12 +227,66 @@ class MeshService : Service() {
private const val DEFAULT_CONFIG_ONLY_NONCE = 69420
private const val DEFAULT_NODE_INFO_NONCE = 69421
private const val WANT_CONFIG_DELAY = 250L
private const val WANT_CONFIG_DELAY = 50L
private const val HISTORY_TAG = "HistoryReplay"
private const val DEFAULT_HISTORY_RETURN_WINDOW_MINUTES = 60 * 24
private const val DEFAULT_HISTORY_RETURN_MAX_MESSAGES = 100
private const val MAX_EARLY_PACKET_BUFFER = 128
@VisibleForTesting
internal fun buildStoreForwardHistoryRequest(
lastRequest: Int,
historyReturnWindow: Int,
historyReturnMax: Int,
): StoreAndForwardProtos.StoreAndForward {
val historyBuilder = StoreAndForwardProtos.StoreAndForward.History.newBuilder()
if (lastRequest > 0) historyBuilder.lastRequest = lastRequest
if (historyReturnWindow > 0) historyBuilder.window = historyReturnWindow
if (historyReturnMax > 0) historyBuilder.historyMessages = historyReturnMax
return StoreAndForwardProtos.StoreAndForward.newBuilder()
.setRr(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY)
.setHistory(historyBuilder)
.build()
}
@VisibleForTesting
internal fun resolveHistoryRequestParameters(window: Int, max: Int): Pair<Int, Int> {
val resolvedWindow = if (window > 0) window else DEFAULT_HISTORY_RETURN_WINDOW_MINUTES
val resolvedMax = if (max > 0) max else DEFAULT_HISTORY_RETURN_MAX_MESSAGES
return resolvedWindow to resolvedMax
}
}
private val serviceJob = Job()
private val serviceScope = CoroutineScope(Dispatchers.IO + serviceJob)
private inline fun historyLog(
priority: Int = Log.INFO,
throwable: Throwable? = null,
crossinline message: () -> String,
) {
if (!BuildConfig.DEBUG) return
val timber = Timber.tag(HISTORY_TAG)
val msg = message()
if (throwable != null) {
timber.log(priority, throwable, msg)
} else {
timber.log(priority, msg)
}
}
private fun activeDeviceAddress(): String? =
meshPrefs.deviceAddress?.takeIf { !it.equals(NO_DEVICE_SELECTED, ignoreCase = true) && it.isNotBlank() }
private fun currentTransport(address: String? = meshPrefs.deviceAddress): String = when (address?.firstOrNull()) {
InterfaceId.BLUETOOTH.id -> "BLE"
InterfaceId.TCP.id -> "TCP"
InterfaceId.SERIAL.id -> "Serial"
InterfaceId.MOCK.id -> "Mock"
InterfaceId.NOP.id -> "NOP"
else -> "Unknown"
}
private var locationFlow: Job? = null
private var mqttMessageFlow: Job? = null
@ -312,7 +373,17 @@ class MeshService : Service() {
// Switch to the IO thread
serviceScope.handledLaunch { radioInterfaceService.connect() }
radioInterfaceService.connectionState.onEach(::onRadioConnectionState).launchIn(serviceScope)
radioInterfaceService.receivedData.onEach(::onReceiveFromRadio).launchIn(serviceScope)
radioInterfaceService.receivedData
.onStart {
historyLog { "rxCollector START transport=${currentTransport()} scope=${serviceScope.hashCode()}" }
}
.onCompletion { cause ->
historyLog(Log.WARN) {
"rxCollector STOP transport=${currentTransport()} cause=${cause?.message ?: "completed"}"
}
}
.onEach(::onReceiveFromRadio)
.launchIn(serviceScope)
radioInterfaceService.connectionError
.onEach { error -> Timber.e("BLE Connection Error: ${error.message}") }
.launchIn(serviceScope)
@ -424,6 +495,7 @@ class MeshService : Service() {
myNodeInfo = null
nodeDBbyNodeNum.clear()
haveNodeDB = false
earlyReceivedPackets.clear()
}
private var myNodeInfo: MyNodeEntity? = null
@ -1040,7 +1112,79 @@ class MeshService : Service() {
updateNodeInfo(fromNum) { it.paxcounter = p }
}
/**
* Ask the connected radio to replay any packets it buffered while the client was offline.
*
* Radios deliver history via the Store & Forward protocol regardless of transport, so we piggyback on that
* mechanism after BLE/WiFi reconnects.
*/
private fun requestHistoryReplay(trigger: String) {
val address = activeDeviceAddress()
val failure =
when {
address == null -> "no_active_address"
myNodeNum == null -> "no_my_node"
else -> null
}
if (failure != null) {
historyLog { "requestHistory skipped trigger=$trigger reason=$failure" }
return
}
val safeAddress = address!!
val myNum = myNodeNum!!
val storeForwardConfig = moduleConfig.storeForward
val lastRequest = meshPrefs.getStoreForwardLastRequest(safeAddress)
val (window, max) =
resolveHistoryRequestParameters(storeForwardConfig.historyReturnWindow, storeForwardConfig.historyReturnMax)
val windowSource = if (storeForwardConfig.historyReturnWindow > 0) "config" else "default"
val maxSource = if (storeForwardConfig.historyReturnMax > 0) "config" else "default"
val sourceSummary = "window=$window($windowSource) max=$max($maxSource)"
val request =
buildStoreForwardHistoryRequest(
lastRequest = lastRequest,
historyReturnWindow = window,
historyReturnMax = max,
)
val logContext = "trigger=$trigger transport=${currentTransport(safeAddress)} addr=$safeAddress"
historyLog { "requestHistory $logContext lastRequest=$lastRequest $sourceSummary" }
runCatching {
packetHandler.sendToRadio(
newMeshPacketTo(myNum).buildMeshPacket(priority = MeshPacket.Priority.BACKGROUND) {
portnumValue = Portnums.PortNum.STORE_FORWARD_APP_VALUE
payload = ByteString.copyFrom(request.toByteArray())
},
)
}
.onFailure { ex -> historyLog(Log.WARN, ex) { "requestHistory failed $logContext" } }
}
private fun updateStoreForwardLastRequest(source: String, lastRequest: Int) {
if (lastRequest <= 0) return
val address = activeDeviceAddress() ?: return
val current = meshPrefs.getStoreForwardLastRequest(address)
val transport = currentTransport(address)
val logContext = "source=$source transport=$transport address=$address"
if (lastRequest != current) {
meshPrefs.setStoreForwardLastRequest(address, lastRequest)
historyLog { "historyMarker updated $logContext from=$current to=$lastRequest" }
} else {
historyLog(Log.DEBUG) { "historyMarker unchanged $logContext value=$lastRequest" }
}
}
private fun handleReceivedStoreAndForward(dataPacket: DataPacket, s: StoreAndForwardProtos.StoreAndForward) {
Timber.d("StoreAndForward: ${s.variantCase} ${s.rr} from ${dataPacket.from}")
val transport = currentTransport()
val lastRequest =
if (s.variantCase == StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY) {
s.history.lastRequest
} else {
0
}
val baseContext = "transport=$transport from=${dataPacket.from}"
historyLog { "rxStoreForward $baseContext variant=${s.variantCase} rr=${s.rr} lastRequest=$lastRequest" }
when (s.variantCase) {
StoreAndForwardProtos.StoreAndForward.VariantCase.STATS -> {
val u =
@ -1052,6 +1196,11 @@ class MeshService : Service() {
}
StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY -> {
val history = s.history
val historySummary =
"routerHistory $baseContext messages=${history.historyMessages} " +
"window=${history.window} lastRequest=${history.lastRequest}"
historyLog(Log.DEBUG) { historySummary }
val text =
"""
Total messages: ${s.history.historyMessages}
@ -1065,12 +1214,17 @@ class MeshService : Service() {
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
)
rememberDataPacket(u)
updateStoreForwardLastRequest("router_history", s.history.lastRequest)
}
StoreAndForwardProtos.StoreAndForward.VariantCase.TEXT -> {
if (s.rr == StoreAndForwardProtos.StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) {
dataPacket.to = DataPacket.ID_BROADCAST
}
val textLog =
"rxText $baseContext id=${dataPacket.id} ts=${dataPacket.time} " +
"to=${dataPacket.to} decision=remember"
historyLog(Log.DEBUG) { textLog }
val u =
dataPacket.copy(bytes = s.text.toByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE)
rememberDataPacket(u)
@ -1080,29 +1234,59 @@ class MeshService : Service() {
}
}
private val earlyReceivedPackets = ArrayDeque<MeshPacket>()
// If apps try to send packets when our radio is sleeping, we queue them here instead
private val offlineSentPackets = mutableListOf<DataPacket>()
// Update our model and resend as needed for a MeshPacket we just received from the radio
private fun handleReceivedMeshPacket(packet: MeshPacket) {
val preparedPacket =
packet
.toBuilder()
.apply {
// If the rxTime was not set by the device, update with current time
if (packet.rxTime == 0) setRxTime(currentSecond())
}
.build()
Timber.d("[packet]: ${packet.toOneLineString()}")
if (haveNodeDB) {
processReceivedMeshPacket(
packet
.toBuilder()
.apply {
// If the rxTime was not set by the device, update with current time
if (packet.rxTime == 0) setRxTime(currentSecond())
}
.build(),
)
} else {
Timber.w("Ignoring early received packet: ${packet.toOneLineString()}")
// earlyReceivedPackets.add(packet)
// logAssert(earlyReceivedPackets.size < 128) // The max should normally be about 32,
// but if the device is
// messed up it might try to send forever
processReceivedMeshPacket(preparedPacket)
return
}
val queueSize = earlyReceivedPackets.size
if (queueSize >= MAX_EARLY_PACKET_BUFFER) {
val dropped = earlyReceivedPackets.removeFirst()
historyLog(Log.WARN) {
val portLabel =
if (dropped.hasDecoded()) {
Portnums.PortNum.forNumber(dropped.decoded.portnumValue)?.name
?: dropped.decoded.portnumValue.toString()
} else {
"unknown"
}
"dropEarlyPacket bufferFull size=$queueSize id=${dropped.id} port=$portLabel"
}
}
earlyReceivedPackets.addLast(preparedPacket)
val portLabel =
if (preparedPacket.hasDecoded()) {
Portnums.PortNum.forNumber(preparedPacket.decoded.portnumValue)?.name
?: preparedPacket.decoded.portnumValue.toString()
} else {
"unknown"
}
historyLog { "queueEarlyPacket size=${earlyReceivedPackets.size} id=${preparedPacket.id} port=$portLabel" }
}
private fun flushEarlyReceivedPackets(reason: String) {
if (earlyReceivedPackets.isEmpty()) return
val packets = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
historyLog { "replayEarlyPackets reason=$reason count=${packets.size}" }
packets.forEach(::processReceivedMeshPacket)
}
private fun sendNow(p: DataPacket) {
@ -1240,6 +1424,7 @@ class MeshService : Service() {
private var connectTimeMsec = 0L
// Called when we gain/lose connection to our radio
@Suppress("CyclomaticComplexMethod")
private fun onConnectionChanged(c: ConnectionState) {
Timber.d("onConnectionChanged: ${connectionStateHolder.getState()} -> $c")
@ -1292,6 +1477,10 @@ class MeshService : Service() {
fun startConnect() {
Timber.d("Starting connect")
historyLog {
val address = meshPrefs.deviceAddress ?: "null"
"onReconnect transport=${currentTransport()} node=$address"
}
try {
connectTimeMsec = System.currentTimeMillis()
startConfigOnly()
@ -1426,24 +1615,32 @@ class MeshService : Service() {
*/
private fun onReceiveFromRadio(bytes: ByteArray) {
runCatching { MeshProtos.FromRadio.parseFrom(bytes) }
.onSuccess { proto -> proto.route() }
.onSuccess { proto ->
if (proto.payloadVariantCase == PayloadVariantCase.PAYLOADVARIANT_NOT_SET) {
Timber.w(
"Received FromRadio with PAYLOADVARIANT_NOT_SET. rawBytes=${bytes.toHexString()} proto=$proto",
)
}
proto.route()
}
.onFailure { primaryException ->
runCatching {
val logRecord = MeshProtos.LogRecord.parseFrom(bytes)
handleLogRecord(logRecord)
}
.onFailure { _ ->
val packet = bytes.toHexString()
Timber.e(
primaryException,
"Failed to parse radio packet (len=${bytes.size} contents=$packet). Not a valid FromRadio or LogRecord.",
"Failed to parse radio packet (len=${bytes.size} contents=${bytes.toHexString()}). " +
"Not a valid FromRadio or LogRecord.",
)
}
}
}
/** Extension function to convert a ByteArray to a hex string for logging. Example output: "0x0a,0x1f,0x..." */
private fun ByteArray.toHexString(): String = this.joinToString(",") { byte -> String.format("0x%02x", byte) }
private fun ByteArray.toHexString(): String =
this.joinToString(",") { byte -> String.format(Locale.US, "0x%02x", byte) }
// A provisional MyNodeInfo that we will install if all of our node config downloads go okay
private var newMyNodeInfo: MyNodeEntity? = null
@ -1778,6 +1975,12 @@ class MeshService : Service() {
serviceBroadcasts.broadcastConnection()
sendAnalytics()
reportConnection()
historyLog {
val ports =
rememberDataType.joinToString(",") { port -> Portnums.PortNum.forNumber(port)?.name ?: port.toString() }
"subscribePorts afterReconnect ports=$ports"
}
requestHistoryReplay("onHasSettings")
packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() })
}
@ -1849,6 +2052,7 @@ class MeshService : Service() {
newNodes.clear()
serviceScope.handledLaunch { nodeRepository.installConfig(myNodeInfo!!, nodeDBbyNodeNum.values.toList()) }
haveNodeDB = true
flushEarlyReceivedPackets("node_info_complete")
sendAnalytics()
onHasSettings()
}
@ -2038,12 +2242,24 @@ class MeshService : Service() {
Timber.d(
"SetDeviceAddress: Device address changed from ${currentAddr.anonymize} to ${deviceAddr.anonymize}",
)
val currentLabel = currentAddr ?: "null"
val nextLabel = deviceAddr ?: "null"
val nextTransport = currentTransport(deviceAddr)
historyLog { "dbSwitch request current=$currentLabel next=$nextLabel transportNext=$nextTransport" }
meshPrefs.deviceAddress = deviceAddr
serviceScope.handledLaunch {
// Clear only in-memory caches to avoid cross-device bleed
discardNodeDB()
// Switch active on-disk DB to device-specific database
databaseManager.switchActiveDatabase(deviceAddr)
val activeAddress = databaseManager.currentAddress.value
val activeLabel = activeAddress ?: "null"
val transportLabel = currentTransport()
val meshAddress = meshPrefs.deviceAddress ?: "null"
val nodeId = myNodeInfo?.myNodeNum?.toString() ?: "unknown"
val dbSummary =
"dbSwitch activeAddress=$activeLabel nodeId=$nodeId transport=$transportLabel addr=$meshAddress"
historyLog { dbSummary }
// Do not clear packet DB here; messages are per-device and should persist
clearNotifications()
}

View file

@ -0,0 +1,67 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import org.junit.Assert.assertEquals
import org.junit.Test
import org.meshtastic.proto.StoreAndForwardProtos
class StoreForwardHistoryRequestTest {
@Test
fun `buildStoreForwardHistoryRequest copies positive parameters`() {
val request =
MeshService.buildStoreForwardHistoryRequest(
lastRequest = 42,
historyReturnWindow = 15,
historyReturnMax = 25,
)
assertEquals(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY, request.rr)
assertEquals(42, request.history.lastRequest)
assertEquals(15, request.history.window)
assertEquals(25, request.history.historyMessages)
}
@Test
fun `buildStoreForwardHistoryRequest omits non-positive parameters`() {
val request =
MeshService.buildStoreForwardHistoryRequest(lastRequest = 0, historyReturnWindow = -1, historyReturnMax = 0)
assertEquals(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY, request.rr)
assertEquals(0, request.history.lastRequest)
assertEquals(0, request.history.window)
assertEquals(0, request.history.historyMessages)
}
@Test
fun `resolveHistoryRequestParameters uses config values when positive`() {
val (window, max) = MeshService.resolveHistoryRequestParameters(window = 30, max = 10)
assertEquals(30, window)
assertEquals(10, max)
}
@Test
fun `resolveHistoryRequestParameters falls back to defaults when non-positive`() {
val (window, max) = MeshService.resolveHistoryRequestParameters(window = 0, max = -5)
assertEquals(1440, window)
assertEquals(100, max)
}
}

View file

@ -21,6 +21,7 @@ import android.content.SharedPreferences
import androidx.core.content.edit
import org.meshtastic.core.prefs.NullableStringPrefDelegate
import org.meshtastic.core.prefs.di.MeshSharedPreferences
import java.util.Locale
import javax.inject.Inject
import javax.inject.Singleton
@ -30,6 +31,10 @@ interface MeshPrefs {
fun shouldProvideNodeLocation(nodeNum: Int?): Boolean
fun setShouldProvideNodeLocation(nodeNum: Int?, value: Boolean)
fun getStoreForwardLastRequest(address: String?): Int
fun setStoreForwardLastRequest(address: String?, value: Int)
}
@Singleton
@ -43,7 +48,30 @@ class MeshPrefsImpl @Inject constructor(@MeshSharedPreferences private val prefs
prefs.edit { putBoolean(provideLocationKey(nodeNum), value) }
}
override fun getStoreForwardLastRequest(address: String?): Int = prefs.getInt(storeForwardKey(address), 0)
override fun setStoreForwardLastRequest(address: String?, value: Int) {
prefs.edit {
if (value <= 0) {
remove(storeForwardKey(address))
} else {
putInt(storeForwardKey(address), value)
}
}
}
private fun provideLocationKey(nodeNum: Int?) = "provide-location-$nodeNum"
private fun storeForwardKey(address: String?): String = "store-forward-last-request-${normalizeAddress(address)}"
private fun normalizeAddress(address: String?): String {
val raw = address?.trim()?.takeIf { it.isNotEmpty() }
return when {
raw == null -> "DEFAULT"
raw.equals(NO_DEVICE_SELECTED, ignoreCase = true) -> "DEFAULT"
else -> raw.uppercase(Locale.US).replace(":", "")
}
}
}
private const val NO_DEVICE_SELECTED = "n"