Align CoroutineDispatchers usage (#3481)

This commit is contained in:
Phil Oliver 2025-10-16 12:12:20 -04:00 committed by GitHub
parent 50b02efcee
commit 88ba0aa449
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 55 additions and 117 deletions

View file

@ -17,7 +17,6 @@
package org.meshtastic.core.data.repository
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
@ -25,7 +24,7 @@ import kotlinx.coroutines.withContext
import kotlinx.serialization.SerializationException
import kotlinx.serialization.json.Json
import org.meshtastic.core.data.model.CustomTileProviderConfig
import org.meshtastic.core.di.annotation.IoDispatcher
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.prefs.map.MapTileProviderPrefs
import timber.log.Timber
import javax.inject.Inject
@ -49,7 +48,7 @@ class CustomTileProviderRepositoryImpl
@Inject
constructor(
private val json: Json,
@IoDispatcher private val ioDispatcher: CoroutineDispatcher,
private val dispatchers: CoroutineDispatchers,
private val mapTileProviderPrefs: MapTileProviderPrefs,
) : CustomTileProviderRepository {
@ -98,7 +97,7 @@ constructor(
}
private suspend fun saveDataToPrefs(providers: List<CustomTileProviderConfig>) {
withContext(ioDispatcher) {
withContext(dispatchers.io) {
try {
val jsonString = json.encodeToString(providers)
mapTileProviderPrefs.customTileProviders = jsonString

View file

@ -18,7 +18,6 @@
package org.meshtastic.core.data.repository
import dagger.Lazy
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.distinctUntilChanged
@ -27,7 +26,7 @@ import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.withContext
import org.meshtastic.core.database.dao.MeshLogDao
import org.meshtastic.core.database.entity.MeshLog
import org.meshtastic.core.di.annotation.IoDispatcher
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.Portnums
@ -39,15 +38,15 @@ class MeshLogRepository
@Inject
constructor(
private val meshLogDaoLazy: Lazy<MeshLogDao>,
@IoDispatcher private val ioDispatcher: CoroutineDispatcher,
private val dispatchers: CoroutineDispatchers,
) {
private val meshLogDao by lazy { meshLogDaoLazy.get() }
fun getAllLogs(maxItems: Int = MAX_ITEMS): Flow<List<MeshLog>> =
meshLogDao.getAllLogs(maxItems).flowOn(ioDispatcher).conflate()
meshLogDao.getAllLogs(maxItems).flowOn(dispatchers.io).conflate()
fun getAllLogsInReceiveOrder(maxItems: Int = MAX_ITEMS): Flow<List<MeshLog>> =
meshLogDao.getAllLogsInReceiveOrder(maxItems).flowOn(ioDispatcher).conflate()
meshLogDao.getAllLogsInReceiveOrder(maxItems).flowOn(dispatchers.io).conflate()
private fun parseTelemetryLog(log: MeshLog): Telemetry? = runCatching {
Telemetry.parseFrom(log.fromRadio.packet.decoded.payload)
@ -111,34 +110,34 @@ constructor(
.getLogsFrom(nodeNum, Portnums.PortNum.TELEMETRY_APP_VALUE, MAX_MESH_PACKETS)
.distinctUntilChanged()
.mapLatest { list -> list.mapNotNull(::parseTelemetryLog) }
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
fun getLogsFrom(
nodeNum: Int,
portNum: Int = Portnums.PortNum.UNKNOWN_APP_VALUE,
maxItem: Int = MAX_MESH_PACKETS,
): Flow<List<MeshLog>> =
meshLogDao.getLogsFrom(nodeNum, portNum, maxItem).distinctUntilChanged().flowOn(ioDispatcher)
meshLogDao.getLogsFrom(nodeNum, portNum, maxItem).distinctUntilChanged().flowOn(dispatchers.io)
/*
* Retrieves MeshPackets matching 'nodeNum' and 'portNum'.
* If 'portNum' is not specified, returns all MeshPackets. Otherwise, filters by 'portNum'.
*/
fun getMeshPacketsFrom(nodeNum: Int, portNum: Int = Portnums.PortNum.UNKNOWN_APP_VALUE): Flow<List<MeshPacket>> =
getLogsFrom(nodeNum, portNum).mapLatest { list -> list.map { it.fromRadio.packet } }.flowOn(ioDispatcher)
getLogsFrom(nodeNum, portNum).mapLatest { list -> list.map { it.fromRadio.packet } }.flowOn(dispatchers.io)
fun getMyNodeInfo(): Flow<MeshProtos.MyNodeInfo?> = getLogsFrom(0, 0)
.mapLatest { list -> list.firstOrNull { it.myNodeInfo != null }?.myNodeInfo }
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
suspend fun insert(log: MeshLog) = withContext(ioDispatcher) { meshLogDao.insert(log) }
suspend fun insert(log: MeshLog) = withContext(dispatchers.io) { meshLogDao.insert(log) }
suspend fun deleteAll() = withContext(ioDispatcher) { meshLogDao.deleteAll() }
suspend fun deleteAll() = withContext(dispatchers.io) { meshLogDao.deleteAll() }
suspend fun deleteLog(uuid: String) = withContext(ioDispatcher) { meshLogDao.deleteLog(uuid) }
suspend fun deleteLog(uuid: String) = withContext(dispatchers.io) { meshLogDao.deleteLog(uuid) }
suspend fun deleteLogs(nodeNum: Int, portNum: Int) =
withContext(ioDispatcher) { meshLogDao.deleteLogs(nodeNum, portNum) }
withContext(dispatchers.io) { meshLogDao.deleteLogs(nodeNum, portNum) }
companion object {
private const val MAX_ITEMS = 500

View file

@ -19,7 +19,6 @@ package org.meshtastic.core.data.repository
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.coroutineScope
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
@ -37,7 +36,7 @@ import org.meshtastic.core.database.entity.MyNodeEntity
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.database.model.Node
import org.meshtastic.core.database.model.NodeSortOption
import org.meshtastic.core.di.annotation.IoDispatcher
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.onlineTimeThreshold
import org.meshtastic.proto.MeshProtos
@ -52,13 +51,13 @@ class NodeRepository
constructor(
processLifecycle: Lifecycle,
private val nodeInfoDao: NodeInfoDao,
@IoDispatcher private val ioDispatcher: CoroutineDispatcher,
private val dispatchers: CoroutineDispatchers,
) {
// hardware info about our local device (can be null)
val myNodeInfo: StateFlow<MyNodeEntity?> =
nodeInfoDao
.getMyNodeInfo()
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
.stateIn(processLifecycle.coroutineScope, SharingStarted.Eagerly, null)
// our node info
@ -83,7 +82,7 @@ constructor(
_ourNodeInfo.value = ourNodeInfo
_myId.value = ourNodeInfo?.user?.id
}
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
.conflate()
.stateIn(processLifecycle.coroutineScope, SharingStarted.Eagerly, emptyMap())
@ -115,43 +114,43 @@ constructor(
lastHeardMin = if (onlyOnline) onlineTimeThreshold() else -1,
)
.mapLatest { list -> list.map { it.toModel() } }
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
.conflate()
suspend fun upsert(node: NodeEntity) = withContext(ioDispatcher) { nodeInfoDao.upsert(node) }
suspend fun upsert(node: NodeEntity) = withContext(dispatchers.io) { nodeInfoDao.upsert(node) }
suspend fun installConfig(mi: MyNodeEntity, nodes: List<NodeEntity>) =
withContext(ioDispatcher) { nodeInfoDao.installConfig(mi, nodes) }
withContext(dispatchers.io) { nodeInfoDao.installConfig(mi, nodes) }
suspend fun clearNodeDB() = withContext(ioDispatcher) { nodeInfoDao.clearNodeInfo() }
suspend fun clearNodeDB() = withContext(dispatchers.io) { nodeInfoDao.clearNodeInfo() }
suspend fun deleteNode(num: Int) = withContext(ioDispatcher) {
suspend fun deleteNode(num: Int) = withContext(dispatchers.io) {
nodeInfoDao.deleteNode(num)
nodeInfoDao.deleteMetadata(num)
}
suspend fun deleteNodes(nodeNums: List<Int>) = withContext(ioDispatcher) {
suspend fun deleteNodes(nodeNums: List<Int>) = withContext(dispatchers.io) {
nodeInfoDao.deleteNodes(nodeNums)
nodeNums.forEach { nodeInfoDao.deleteMetadata(it) }
}
suspend fun getNodesOlderThan(lastHeard: Int): List<NodeEntity> =
withContext(ioDispatcher) { nodeInfoDao.getNodesOlderThan(lastHeard) }
withContext(dispatchers.io) { nodeInfoDao.getNodesOlderThan(lastHeard) }
suspend fun getUnknownNodes(): List<NodeEntity> = withContext(ioDispatcher) { nodeInfoDao.getUnknownNodes() }
suspend fun getUnknownNodes(): List<NodeEntity> = withContext(dispatchers.io) { nodeInfoDao.getUnknownNodes() }
suspend fun insertMetadata(metadata: MetadataEntity) = withContext(ioDispatcher) { nodeInfoDao.upsert(metadata) }
suspend fun insertMetadata(metadata: MetadataEntity) = withContext(dispatchers.io) { nodeInfoDao.upsert(metadata) }
val onlineNodeCount: Flow<Int> =
nodeInfoDao
.nodeDBbyNum()
.mapLatest { map -> map.values.count { it.node.lastHeard > onlineTimeThreshold() } }
.flowOn(ioDispatcher)
.flowOn(dispatchers.io)
.conflate()
val totalNodeCount: Flow<Int> =
nodeInfoDao.nodeDBbyNum().mapLatest { map -> map.values.count() }.flowOn(ioDispatcher).conflate()
nodeInfoDao.nodeDBbyNum().mapLatest { map -> map.values.count() }.flowOn(dispatchers.io).conflate()
suspend fun setNodeNotes(num: Int, notes: String) =
withContext(ioDispatcher) { nodeInfoDao.setNodeNotes(num, notes) }
withContext(dispatchers.io) { nodeInfoDao.setNodeNotes(num, notes) }
}

View file

@ -18,30 +18,29 @@
package org.meshtastic.core.data.repository
import dagger.Lazy
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.withContext
import org.meshtastic.core.database.dao.QuickChatActionDao
import org.meshtastic.core.database.entity.QuickChatAction
import org.meshtastic.core.di.annotation.IoDispatcher
import org.meshtastic.core.di.CoroutineDispatchers
import javax.inject.Inject
class QuickChatActionRepository
@Inject
constructor(
private val quickChatDaoLazy: Lazy<QuickChatActionDao>,
@IoDispatcher private val ioDispatcher: CoroutineDispatcher,
private val dispatchers: CoroutineDispatchers,
) {
private val quickChatActionDao by lazy { quickChatDaoLazy.get() }
fun getAllActions() = quickChatActionDao.getAll().flowOn(ioDispatcher)
fun getAllActions() = quickChatActionDao.getAll().flowOn(dispatchers.io)
suspend fun upsert(action: QuickChatAction) = withContext(ioDispatcher) { quickChatActionDao.upsert(action) }
suspend fun upsert(action: QuickChatAction) = withContext(dispatchers.io) { quickChatActionDao.upsert(action) }
suspend fun deleteAll() = withContext(ioDispatcher) { quickChatActionDao.deleteAll() }
suspend fun deleteAll() = withContext(dispatchers.io) { quickChatActionDao.deleteAll() }
suspend fun delete(action: QuickChatAction) = withContext(ioDispatcher) { quickChatActionDao.delete(action) }
suspend fun delete(action: QuickChatAction) = withContext(dispatchers.io) { quickChatActionDao.delete(action) }
suspend fun setItemPosition(uuid: Long, newPos: Int) =
withContext(ioDispatcher) { quickChatActionDao.updateActionPosition(uuid, newPos) }
withContext(dispatchers.io) { quickChatActionDao.updateActionPosition(uuid, newPos) }
}