diff --git a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt index 2b0a40189..2a8dca5a1 100644 --- a/app/src/main/java/com/geeksville/mesh/service/MeshService.kt +++ b/app/src/main/java/com/geeksville/mesh/service/MeshService.kt @@ -26,12 +26,15 @@ import android.content.pm.ServiceInfo import android.os.Build import android.os.IBinder import android.os.RemoteException +import android.util.Log +import androidx.annotation.VisibleForTesting import androidx.core.app.ServiceCompat import androidx.core.location.LocationCompat import com.geeksville.mesh.BuildConfig import com.geeksville.mesh.concurrent.handledLaunch import com.geeksville.mesh.model.NO_DEVICE_SELECTED import com.geeksville.mesh.repository.network.MQTTRepository +import com.geeksville.mesh.repository.radio.InterfaceId import com.geeksville.mesh.repository.radio.RadioInterfaceService import com.geeksville.mesh.util.ignoreException import com.geeksville.mesh.util.toRemoteExceptions @@ -50,7 +53,9 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import org.meshtastic.core.analytics.DataPair import org.meshtastic.core.analytics.platform.PlatformAnalytics import org.meshtastic.core.common.hasLocationPermission @@ -118,6 +123,8 @@ import org.meshtastic.proto.position import org.meshtastic.proto.telemetry import org.meshtastic.proto.user import timber.log.Timber +import java.util.ArrayDeque +import java.util.Locale import java.util.UUID import java.util.concurrent.ConcurrentHashMap import javax.inject.Inject @@ -220,12 +227,66 @@ class MeshService : Service() { private const val DEFAULT_CONFIG_ONLY_NONCE = 69420 private const val DEFAULT_NODE_INFO_NONCE = 69421 - private const val WANT_CONFIG_DELAY = 250L + private const val WANT_CONFIG_DELAY = 50L + private const val HISTORY_TAG = "HistoryReplay" + private const val DEFAULT_HISTORY_RETURN_WINDOW_MINUTES = 60 * 24 + private const val DEFAULT_HISTORY_RETURN_MAX_MESSAGES = 100 + private const val MAX_EARLY_PACKET_BUFFER = 128 + + @VisibleForTesting + internal fun buildStoreForwardHistoryRequest( + lastRequest: Int, + historyReturnWindow: Int, + historyReturnMax: Int, + ): StoreAndForwardProtos.StoreAndForward { + val historyBuilder = StoreAndForwardProtos.StoreAndForward.History.newBuilder() + if (lastRequest > 0) historyBuilder.lastRequest = lastRequest + if (historyReturnWindow > 0) historyBuilder.window = historyReturnWindow + if (historyReturnMax > 0) historyBuilder.historyMessages = historyReturnMax + return StoreAndForwardProtos.StoreAndForward.newBuilder() + .setRr(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY) + .setHistory(historyBuilder) + .build() + } + + @VisibleForTesting + internal fun resolveHistoryRequestParameters(window: Int, max: Int): Pair { + val resolvedWindow = if (window > 0) window else DEFAULT_HISTORY_RETURN_WINDOW_MINUTES + val resolvedMax = if (max > 0) max else DEFAULT_HISTORY_RETURN_MAX_MESSAGES + return resolvedWindow to resolvedMax + } } private val serviceJob = Job() private val serviceScope = CoroutineScope(Dispatchers.IO + serviceJob) + private inline fun historyLog( + priority: Int = Log.INFO, + throwable: Throwable? = null, + crossinline message: () -> String, + ) { + if (!BuildConfig.DEBUG) return + val timber = Timber.tag(HISTORY_TAG) + val msg = message() + if (throwable != null) { + timber.log(priority, throwable, msg) + } else { + timber.log(priority, msg) + } + } + + private fun activeDeviceAddress(): String? = + meshPrefs.deviceAddress?.takeIf { !it.equals(NO_DEVICE_SELECTED, ignoreCase = true) && it.isNotBlank() } + + private fun currentTransport(address: String? = meshPrefs.deviceAddress): String = when (address?.firstOrNull()) { + InterfaceId.BLUETOOTH.id -> "BLE" + InterfaceId.TCP.id -> "TCP" + InterfaceId.SERIAL.id -> "Serial" + InterfaceId.MOCK.id -> "Mock" + InterfaceId.NOP.id -> "NOP" + else -> "Unknown" + } + private var locationFlow: Job? = null private var mqttMessageFlow: Job? = null @@ -312,7 +373,17 @@ class MeshService : Service() { // Switch to the IO thread serviceScope.handledLaunch { radioInterfaceService.connect() } radioInterfaceService.connectionState.onEach(::onRadioConnectionState).launchIn(serviceScope) - radioInterfaceService.receivedData.onEach(::onReceiveFromRadio).launchIn(serviceScope) + radioInterfaceService.receivedData + .onStart { + historyLog { "rxCollector START transport=${currentTransport()} scope=${serviceScope.hashCode()}" } + } + .onCompletion { cause -> + historyLog(Log.WARN) { + "rxCollector STOP transport=${currentTransport()} cause=${cause?.message ?: "completed"}" + } + } + .onEach(::onReceiveFromRadio) + .launchIn(serviceScope) radioInterfaceService.connectionError .onEach { error -> Timber.e("BLE Connection Error: ${error.message}") } .launchIn(serviceScope) @@ -424,6 +495,7 @@ class MeshService : Service() { myNodeInfo = null nodeDBbyNodeNum.clear() haveNodeDB = false + earlyReceivedPackets.clear() } private var myNodeInfo: MyNodeEntity? = null @@ -1040,7 +1112,79 @@ class MeshService : Service() { updateNodeInfo(fromNum) { it.paxcounter = p } } + /** + * Ask the connected radio to replay any packets it buffered while the client was offline. + * + * Radios deliver history via the Store & Forward protocol regardless of transport, so we piggyback on that + * mechanism after BLE/Wi‑Fi reconnects. + */ + private fun requestHistoryReplay(trigger: String) { + val address = activeDeviceAddress() + val failure = + when { + address == null -> "no_active_address" + myNodeNum == null -> "no_my_node" + else -> null + } + if (failure != null) { + historyLog { "requestHistory skipped trigger=$trigger reason=$failure" } + return + } + + val safeAddress = address!! + val myNum = myNodeNum!! + val storeForwardConfig = moduleConfig.storeForward + val lastRequest = meshPrefs.getStoreForwardLastRequest(safeAddress) + val (window, max) = + resolveHistoryRequestParameters(storeForwardConfig.historyReturnWindow, storeForwardConfig.historyReturnMax) + val windowSource = if (storeForwardConfig.historyReturnWindow > 0) "config" else "default" + val maxSource = if (storeForwardConfig.historyReturnMax > 0) "config" else "default" + val sourceSummary = "window=$window($windowSource) max=$max($maxSource)" + val request = + buildStoreForwardHistoryRequest( + lastRequest = lastRequest, + historyReturnWindow = window, + historyReturnMax = max, + ) + val logContext = "trigger=$trigger transport=${currentTransport(safeAddress)} addr=$safeAddress" + historyLog { "requestHistory $logContext lastRequest=$lastRequest $sourceSummary" } + + runCatching { + packetHandler.sendToRadio( + newMeshPacketTo(myNum).buildMeshPacket(priority = MeshPacket.Priority.BACKGROUND) { + portnumValue = Portnums.PortNum.STORE_FORWARD_APP_VALUE + payload = ByteString.copyFrom(request.toByteArray()) + }, + ) + } + .onFailure { ex -> historyLog(Log.WARN, ex) { "requestHistory failed $logContext" } } + } + + private fun updateStoreForwardLastRequest(source: String, lastRequest: Int) { + if (lastRequest <= 0) return + val address = activeDeviceAddress() ?: return + val current = meshPrefs.getStoreForwardLastRequest(address) + val transport = currentTransport(address) + val logContext = "source=$source transport=$transport address=$address" + if (lastRequest != current) { + meshPrefs.setStoreForwardLastRequest(address, lastRequest) + historyLog { "historyMarker updated $logContext from=$current to=$lastRequest" } + } else { + historyLog(Log.DEBUG) { "historyMarker unchanged $logContext value=$lastRequest" } + } + } + private fun handleReceivedStoreAndForward(dataPacket: DataPacket, s: StoreAndForwardProtos.StoreAndForward) { + Timber.d("StoreAndForward: ${s.variantCase} ${s.rr} from ${dataPacket.from}") + val transport = currentTransport() + val lastRequest = + if (s.variantCase == StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY) { + s.history.lastRequest + } else { + 0 + } + val baseContext = "transport=$transport from=${dataPacket.from}" + historyLog { "rxStoreForward $baseContext variant=${s.variantCase} rr=${s.rr} lastRequest=$lastRequest" } when (s.variantCase) { StoreAndForwardProtos.StoreAndForward.VariantCase.STATS -> { val u = @@ -1052,6 +1196,11 @@ class MeshService : Service() { } StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY -> { + val history = s.history + val historySummary = + "routerHistory $baseContext messages=${history.historyMessages} " + + "window=${history.window} lastRequest=${history.lastRequest}" + historyLog(Log.DEBUG) { historySummary } val text = """ Total messages: ${s.history.historyMessages} @@ -1065,12 +1214,17 @@ class MeshService : Service() { dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, ) rememberDataPacket(u) + updateStoreForwardLastRequest("router_history", s.history.lastRequest) } StoreAndForwardProtos.StoreAndForward.VariantCase.TEXT -> { if (s.rr == StoreAndForwardProtos.StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) { dataPacket.to = DataPacket.ID_BROADCAST } + val textLog = + "rxText $baseContext id=${dataPacket.id} ts=${dataPacket.time} " + + "to=${dataPacket.to} decision=remember" + historyLog(Log.DEBUG) { textLog } val u = dataPacket.copy(bytes = s.text.toByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE) rememberDataPacket(u) @@ -1080,29 +1234,59 @@ class MeshService : Service() { } } + private val earlyReceivedPackets = ArrayDeque() + // If apps try to send packets when our radio is sleeping, we queue them here instead private val offlineSentPackets = mutableListOf() // Update our model and resend as needed for a MeshPacket we just received from the radio private fun handleReceivedMeshPacket(packet: MeshPacket) { + val preparedPacket = + packet + .toBuilder() + .apply { + // If the rxTime was not set by the device, update with current time + if (packet.rxTime == 0) setRxTime(currentSecond()) + } + .build() Timber.d("[packet]: ${packet.toOneLineString()}") if (haveNodeDB) { - processReceivedMeshPacket( - packet - .toBuilder() - .apply { - // If the rxTime was not set by the device, update with current time - if (packet.rxTime == 0) setRxTime(currentSecond()) - } - .build(), - ) - } else { - Timber.w("Ignoring early received packet: ${packet.toOneLineString()}") - // earlyReceivedPackets.add(packet) - // logAssert(earlyReceivedPackets.size < 128) // The max should normally be about 32, - // but if the device is - // messed up it might try to send forever + processReceivedMeshPacket(preparedPacket) + return } + + val queueSize = earlyReceivedPackets.size + if (queueSize >= MAX_EARLY_PACKET_BUFFER) { + val dropped = earlyReceivedPackets.removeFirst() + historyLog(Log.WARN) { + val portLabel = + if (dropped.hasDecoded()) { + Portnums.PortNum.forNumber(dropped.decoded.portnumValue)?.name + ?: dropped.decoded.portnumValue.toString() + } else { + "unknown" + } + "dropEarlyPacket bufferFull size=$queueSize id=${dropped.id} port=$portLabel" + } + } + + earlyReceivedPackets.addLast(preparedPacket) + val portLabel = + if (preparedPacket.hasDecoded()) { + Portnums.PortNum.forNumber(preparedPacket.decoded.portnumValue)?.name + ?: preparedPacket.decoded.portnumValue.toString() + } else { + "unknown" + } + historyLog { "queueEarlyPacket size=${earlyReceivedPackets.size} id=${preparedPacket.id} port=$portLabel" } + } + + private fun flushEarlyReceivedPackets(reason: String) { + if (earlyReceivedPackets.isEmpty()) return + val packets = earlyReceivedPackets.toList() + earlyReceivedPackets.clear() + historyLog { "replayEarlyPackets reason=$reason count=${packets.size}" } + packets.forEach(::processReceivedMeshPacket) } private fun sendNow(p: DataPacket) { @@ -1240,6 +1424,7 @@ class MeshService : Service() { private var connectTimeMsec = 0L // Called when we gain/lose connection to our radio + @Suppress("CyclomaticComplexMethod") private fun onConnectionChanged(c: ConnectionState) { Timber.d("onConnectionChanged: ${connectionStateHolder.getState()} -> $c") @@ -1292,6 +1477,10 @@ class MeshService : Service() { fun startConnect() { Timber.d("Starting connect") + historyLog { + val address = meshPrefs.deviceAddress ?: "null" + "onReconnect transport=${currentTransport()} node=$address" + } try { connectTimeMsec = System.currentTimeMillis() startConfigOnly() @@ -1426,24 +1615,32 @@ class MeshService : Service() { */ private fun onReceiveFromRadio(bytes: ByteArray) { runCatching { MeshProtos.FromRadio.parseFrom(bytes) } - .onSuccess { proto -> proto.route() } + .onSuccess { proto -> + if (proto.payloadVariantCase == PayloadVariantCase.PAYLOADVARIANT_NOT_SET) { + Timber.w( + "Received FromRadio with PAYLOADVARIANT_NOT_SET. rawBytes=${bytes.toHexString()} proto=$proto", + ) + } + proto.route() + } .onFailure { primaryException -> runCatching { val logRecord = MeshProtos.LogRecord.parseFrom(bytes) handleLogRecord(logRecord) } .onFailure { _ -> - val packet = bytes.toHexString() Timber.e( primaryException, - "Failed to parse radio packet (len=${bytes.size} contents=$packet). Not a valid FromRadio or LogRecord.", + "Failed to parse radio packet (len=${bytes.size} contents=${bytes.toHexString()}). " + + "Not a valid FromRadio or LogRecord.", ) } } } /** Extension function to convert a ByteArray to a hex string for logging. Example output: "0x0a,0x1f,0x..." */ - private fun ByteArray.toHexString(): String = this.joinToString(",") { byte -> String.format("0x%02x", byte) } + private fun ByteArray.toHexString(): String = + this.joinToString(",") { byte -> String.format(Locale.US, "0x%02x", byte) } // A provisional MyNodeInfo that we will install if all of our node config downloads go okay private var newMyNodeInfo: MyNodeEntity? = null @@ -1778,6 +1975,12 @@ class MeshService : Service() { serviceBroadcasts.broadcastConnection() sendAnalytics() reportConnection() + historyLog { + val ports = + rememberDataType.joinToString(",") { port -> Portnums.PortNum.forNumber(port)?.name ?: port.toString() } + "subscribePorts afterReconnect ports=$ports" + } + requestHistoryReplay("onHasSettings") packetHandler.sendToRadio(newMeshPacketTo(myNodeNum).buildAdminPacket { setTimeOnly = currentSecond() }) } @@ -1849,6 +2052,7 @@ class MeshService : Service() { newNodes.clear() serviceScope.handledLaunch { nodeRepository.installConfig(myNodeInfo!!, nodeDBbyNodeNum.values.toList()) } haveNodeDB = true + flushEarlyReceivedPackets("node_info_complete") sendAnalytics() onHasSettings() } @@ -2038,12 +2242,24 @@ class MeshService : Service() { Timber.d( "SetDeviceAddress: Device address changed from ${currentAddr.anonymize} to ${deviceAddr.anonymize}", ) + val currentLabel = currentAddr ?: "null" + val nextLabel = deviceAddr ?: "null" + val nextTransport = currentTransport(deviceAddr) + historyLog { "dbSwitch request current=$currentLabel next=$nextLabel transportNext=$nextTransport" } meshPrefs.deviceAddress = deviceAddr serviceScope.handledLaunch { // Clear only in-memory caches to avoid cross-device bleed discardNodeDB() // Switch active on-disk DB to device-specific database databaseManager.switchActiveDatabase(deviceAddr) + val activeAddress = databaseManager.currentAddress.value + val activeLabel = activeAddress ?: "null" + val transportLabel = currentTransport() + val meshAddress = meshPrefs.deviceAddress ?: "null" + val nodeId = myNodeInfo?.myNodeNum?.toString() ?: "unknown" + val dbSummary = + "dbSwitch activeAddress=$activeLabel nodeId=$nodeId transport=$transportLabel addr=$meshAddress" + historyLog { dbSummary } // Do not clear packet DB here; messages are per-device and should persist clearNotifications() } diff --git a/app/src/test/java/com/geeksville/mesh/service/StoreForwardHistoryRequestTest.kt b/app/src/test/java/com/geeksville/mesh/service/StoreForwardHistoryRequestTest.kt new file mode 100644 index 000000000..52f6b1670 --- /dev/null +++ b/app/src/test/java/com/geeksville/mesh/service/StoreForwardHistoryRequestTest.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2025 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package com.geeksville.mesh.service + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.meshtastic.proto.StoreAndForwardProtos + +class StoreForwardHistoryRequestTest { + + @Test + fun `buildStoreForwardHistoryRequest copies positive parameters`() { + val request = + MeshService.buildStoreForwardHistoryRequest( + lastRequest = 42, + historyReturnWindow = 15, + historyReturnMax = 25, + ) + + assertEquals(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY, request.rr) + assertEquals(42, request.history.lastRequest) + assertEquals(15, request.history.window) + assertEquals(25, request.history.historyMessages) + } + + @Test + fun `buildStoreForwardHistoryRequest omits non-positive parameters`() { + val request = + MeshService.buildStoreForwardHistoryRequest(lastRequest = 0, historyReturnWindow = -1, historyReturnMax = 0) + + assertEquals(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY, request.rr) + assertEquals(0, request.history.lastRequest) + assertEquals(0, request.history.window) + assertEquals(0, request.history.historyMessages) + } + + @Test + fun `resolveHistoryRequestParameters uses config values when positive`() { + val (window, max) = MeshService.resolveHistoryRequestParameters(window = 30, max = 10) + + assertEquals(30, window) + assertEquals(10, max) + } + + @Test + fun `resolveHistoryRequestParameters falls back to defaults when non-positive`() { + val (window, max) = MeshService.resolveHistoryRequestParameters(window = 0, max = -5) + + assertEquals(1440, window) + assertEquals(100, max) + } +} diff --git a/core/prefs/src/main/kotlin/org/meshtastic/core/prefs/mesh/MeshPrefs.kt b/core/prefs/src/main/kotlin/org/meshtastic/core/prefs/mesh/MeshPrefs.kt index a0499a7e6..fb121a692 100644 --- a/core/prefs/src/main/kotlin/org/meshtastic/core/prefs/mesh/MeshPrefs.kt +++ b/core/prefs/src/main/kotlin/org/meshtastic/core/prefs/mesh/MeshPrefs.kt @@ -21,6 +21,7 @@ import android.content.SharedPreferences import androidx.core.content.edit import org.meshtastic.core.prefs.NullableStringPrefDelegate import org.meshtastic.core.prefs.di.MeshSharedPreferences +import java.util.Locale import javax.inject.Inject import javax.inject.Singleton @@ -30,6 +31,10 @@ interface MeshPrefs { fun shouldProvideNodeLocation(nodeNum: Int?): Boolean fun setShouldProvideNodeLocation(nodeNum: Int?, value: Boolean) + + fun getStoreForwardLastRequest(address: String?): Int + + fun setStoreForwardLastRequest(address: String?, value: Int) } @Singleton @@ -43,7 +48,30 @@ class MeshPrefsImpl @Inject constructor(@MeshSharedPreferences private val prefs prefs.edit { putBoolean(provideLocationKey(nodeNum), value) } } + override fun getStoreForwardLastRequest(address: String?): Int = prefs.getInt(storeForwardKey(address), 0) + + override fun setStoreForwardLastRequest(address: String?, value: Int) { + prefs.edit { + if (value <= 0) { + remove(storeForwardKey(address)) + } else { + putInt(storeForwardKey(address), value) + } + } + } + private fun provideLocationKey(nodeNum: Int?) = "provide-location-$nodeNum" + + private fun storeForwardKey(address: String?): String = "store-forward-last-request-${normalizeAddress(address)}" + + private fun normalizeAddress(address: String?): String { + val raw = address?.trim()?.takeIf { it.isNotEmpty() } + return when { + raw == null -> "DEFAULT" + raw.equals(NO_DEVICE_SELECTED, ignoreCase = true) -> "DEFAULT" + else -> raw.uppercase(Locale.US).replace(":", "") + } + } } private const val NO_DEVICE_SELECTED = "n"