refactor(service): improve state management and concurrency in MeshSe… (#2678)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2025-08-09 22:48:45 -05:00 committed by GitHub
parent ed30cbdb18
commit 4dfa71155b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -199,6 +199,7 @@ class MeshService :
private const val CONFIG_ONLY_NONCE = 69420
private const val NODE_INFO_ONLY_NONCE = 69421
private const val CONFIG_WAIT_MS = 250L
}
private var previousSummary: String? = null
@ -213,6 +214,7 @@ class MeshService :
private val serviceJob = Job()
private val serviceScope = CoroutineScope(Dispatchers.IO + serviceJob)
private val locationJobLock = Any()
private var locationFlow: Job? = null
private var mqttMessageFlow: Job? = null
@ -248,39 +250,43 @@ class MeshService :
/** Starts location requests if permissions are granted and not already active. */
@RequiresPermission(allOf = [Manifest.permission.ACCESS_FINE_LOCATION, Manifest.permission.ACCESS_COARSE_LOCATION])
private fun startLocationRequests() {
if (locationFlow?.isActive == true) return
synchronized(locationJobLock) {
if (locationFlow?.isActive == true) return
if (hasLocationPermission()) {
locationFlow =
locationRepository
.getLocations()
.onEach { location ->
val positionBuilder = position {
latitudeI = Position.degI(location.latitude)
longitudeI = Position.degI(location.longitude)
if (LocationCompat.hasMslAltitude(location)) {
altitude = LocationCompat.getMslAltitudeMeters(location).toInt()
if (hasLocationPermission()) {
locationFlow =
locationRepository
.getLocations()
.onEach { location ->
val positionBuilder = position {
latitudeI = Position.degI(location.latitude)
longitudeI = Position.degI(location.longitude)
if (LocationCompat.hasMslAltitude(location)) {
altitude = LocationCompat.getMslAltitudeMeters(location).toInt()
}
altitudeHae = location.altitude.toInt()
time = (location.time / 1000).toInt()
groundSpeed = location.speed.toInt()
groundTrack = location.bearing.toInt()
locationSource = MeshProtos.Position.LocSource.LOC_EXTERNAL
}
altitudeHae = location.altitude.toInt()
time = (location.time / 1000).toInt()
groundSpeed = location.speed.toInt()
groundTrack = location.bearing.toInt()
locationSource = MeshProtos.Position.LocSource.LOC_EXTERNAL
sendPosition(positionBuilder)
}
sendPosition(positionBuilder)
}
.launchIn(serviceScope)
.launchIn(serviceScope)
}
}
}
private fun stopLocationRequests() {
locationFlow
?.takeIf { it.isActive }
?.let {
info("Stopping location requests")
it.cancel()
locationFlow = null
}
synchronized(locationJobLock) {
locationFlow
?.takeIf { it.isActive }
?.let {
info("Stopping location requests")
it.cancel()
locationFlow = null
}
}
}
private fun sendToRadio(toRadioBuilder: ToRadio.Builder) {
@ -364,8 +370,6 @@ class MeshService :
radioConfigRepository.moduleConfigFlow.onEach { moduleConfig = it }.launchIn(serviceScope)
radioConfigRepository.channelSetFlow.onEach { channelSet = it }.launchIn(serviceScope)
radioConfigRepository.serviceAction.onEach(::onServiceAction).launchIn(serviceScope)
loadSettings()
}
override fun onBind(intent: Intent?): IBinder = binder
@ -417,40 +421,34 @@ class MeshService :
connectionRouter.stop()
}
// Node Database and Model Management
private fun loadSettings() = serviceScope.handledLaunch {
resetState() // Clear previous state
myNodeInfo = radioConfigRepository.myNodeInfo.value
val nodesFromDb = radioConfigRepository.getNodeDBbyNum()
nodeDBbyNodeNum.putAll(nodesFromDb)
nodesFromDb.values.forEach { nodeEntity ->
if (nodeEntity.user.id.isNotEmpty()) {
_nodeDBbyID[nodeEntity.user.id] = nodeEntity
}
}
}
/**
* Resets all relevant service state variables to their defaults or clears collections. This is crucial when
* switching to a new device connection to prevent state from a previous session from affecting the new one. It
* ensures a clean slate for node information, configurations, pending operations, and cached data.
* Resets in-memory app state variables. This is crucial when switching to a new device connection to prevent state
* from a previous session from affecting the new one. It ensures a clean slate for node information,
* configurations, pending operations, and cached data.
*
* It does *not* clear persisted DataStore settings, only the Nodes DB from Room.
*/
private fun resetState() = serviceScope.handledLaunch {
private suspend fun resetState() {
debug("Discarding NodeDB and resetting all service state for new device connection")
clearDatabases()
// Core Node and Config data
// Clear only the node database, not persisted configs
radioConfigRepository.clearNodeDB()
// Core Node and Config data (in-memory only)
myNodeInfo = null
rawMyNodeInfo = null
nodeDBbyNodeNum.clear()
_nodeDBbyID.clear()
localConfig = LocalConfig.getDefaultInstance()
moduleConfig = LocalModuleConfig.getDefaultInstance()
channelSet = AppOnlyProtos.ChannelSet.getDefaultInstance()
localStatsTelemetry = null
sessionPasskey = ByteString.EMPTY
// Pending operations and cached data
currentPacketId = Random(System.currentTimeMillis()).nextLong().absoluteValue
packetIdGenerator.set(Random(System.currentTimeMillis()).nextLong().absoluteValue)
offlineSentPackets.clear()
stopPacketQueue()
@ -464,11 +462,7 @@ class MeshService :
batteryPercentCooldowns.clear()
radioConfigRepository.clearChannelSet()
radioConfigRepository.clearLocalConfig()
radioConfigRepository.clearLocalModuleConfig()
info("MeshService state has been reset for a new device session.")
info("MeshService app state has been reset for a new device session.")
}
private var myNodeInfo: MyNodeEntity? = null
@ -1082,7 +1076,7 @@ class MeshService :
}
}
private val offlineSentPackets = mutableListOf<DataPacket>()
private val offlineSentPackets = ConcurrentLinkedQueue<DataPacket>()
private fun handleReceivedMeshPacket(packet: MeshPacket) {
val processedPacket =
@ -1098,6 +1092,7 @@ class MeshService :
private val queuedPackets = ConcurrentLinkedQueue<MeshPacket>()
private val queueResponse = ConcurrentHashMap<Int, CompletableFuture<Boolean>>()
private val queueJobLock = Any()
private var queueJob: Job? = null
private fun sendPacket(packet: MeshPacket): CompletableFuture<Boolean> {
@ -1117,41 +1112,46 @@ class MeshService :
}
private fun startPacketQueue() {
if (queueJob?.isActive == true) return
queueJob =
serviceScope.handledLaunch {
debug("Packet queueJob started")
while (
connectionRouter.connectionState.value == ConnectionState.CONNECTED && queuedPackets.isNotEmpty()
) {
val packet = queuedPackets.poll() ?: break // Should not be null if loop condition met
try {
debug("Queue: Sending packet id=${packet.id.toUInt()}")
val success = sendPacket(packet).get(2, TimeUnit.MINUTES)
debug("Queue: Packet id=${packet.id.toUInt()} sent, success=$success")
} catch (e: TimeoutException) {
debug("Queue: Packet id=${packet.id.toUInt()} timed out: ${e.message}")
queueResponse.remove(packet.id)?.complete(false)
} catch (e: Exception) {
debug("Queue: Packet id=${packet.id.toUInt()} failed: ${e.message}")
queueResponse.remove(packet.id)?.complete(false)
synchronized(queueJobLock) {
if (queueJob?.isActive == true) return
queueJob =
serviceScope.handledLaunch {
debug("Packet queueJob started")
while (
connectionRouter.connectionState.value == ConnectionState.CONNECTED &&
queuedPackets.isNotEmpty()
) {
val packet = queuedPackets.poll() ?: break // Should not be null if loop condition met
try {
debug("Queue: Sending packet id=${packet.id.toUInt()}")
val success = sendPacket(packet).get(2, TimeUnit.MINUTES)
debug("Queue: Packet id=${packet.id.toUInt()} sent, success=$success")
} catch (e: TimeoutException) {
debug("Queue: Packet id=${packet.id.toUInt()} timed out: ${e.message}")
queueResponse.remove(packet.id)?.complete(false)
} catch (e: Exception) {
debug("Queue: Packet id=${packet.id.toUInt()} failed: ${e.message}")
queueResponse.remove(packet.id)?.complete(false)
}
}
debug("Packet queueJob finished or radio disconnected")
}
debug("Packet queueJob finished or radio disconnected")
}
}
}
private fun stopPacketQueue() {
queueJob
?.takeIf { it.isActive }
?.let {
info("Stopping packet queueJob")
it.cancel()
queueJob = null
queuedPackets.clear()
queueResponse.values.forEach { future -> if (!future.isDone) future.complete(false) }
queueResponse.clear()
}
synchronized(queueJobLock) {
queueJob
?.takeIf { it.isActive }
?.let {
info("Stopping packet queueJob")
it.cancel()
queueJob = null
queuedPackets.clear()
queueResponse.values.forEach { future -> if (!future.isDone) future.complete(false) }
queueResponse.clear()
}
}
}
private fun sendNow(dataPacket: DataPacket) {
@ -1161,10 +1161,8 @@ class MeshService :
}
private fun processQueuedPackets() {
val packetsToSend = ArrayList(offlineSentPackets) // Avoid ConcurrentModificationException
offlineSentPackets.clear()
packetsToSend.forEach { p ->
while (offlineSentPackets.isNotEmpty()) {
val p = offlineSentPackets.poll() ?: continue
try {
sendNow(p)
} catch (ex: Exception) {
@ -1669,8 +1667,12 @@ class MeshService :
)
// we have recieved the response to our ConfigOnly request
// send a heartbeat, then request NodeInfoOnly to get the nodeDb from the radio
serviceScope.handledLaunch { radioInterfaceService.keepAlive() }
sendNodeInfoOnlyRequest()
serviceScope.handledLaunch {
delay(CONFIG_WAIT_MS)
radioInterfaceService.keepAlive()
delay(CONFIG_WAIT_MS)
sendNodeInfoOnlyRequest()
}
}
private fun handleNodeInfoNonceResponse() {
@ -1687,7 +1689,6 @@ class MeshService :
}
private fun sendConfigOnlyRequest() {
resetState()
debug("Starting config only with nonce=$CONFIG_ONLY_NONCE")
sendToRadio(ToRadio.newBuilder().setWantConfigId(CONFIG_ONLY_NONCE))
}
@ -1821,11 +1822,6 @@ class MeshService :
lateinit var sharedPreferences: SharedPreferences
fun clearDatabases() = serviceScope.handledLaunch {
debug("Clearing nodeDB")
radioConfigRepository.clearNodeDB()
}
private fun updateLastAddress(deviceAddr: String?) {
val currentAddr = lastAddress.value
debug("setDeviceAddress: New: ${deviceAddr.anonymize}, Old: ${currentAddr.anonymize}")
@ -1833,9 +1829,8 @@ class MeshService :
if (deviceAddr != currentAddr) {
_lastAddress.value = deviceAddr ?: NO_DEVICE_SELECTED
sharedPreferences.edit { putString(DEVICE_ADDRESS_KEY, deviceAddr) }
serviceScope.handledLaunch { resetState() }
clearNotifications()
clearDatabases()
resetState()
}
}