refactor: null safety, update date/time libraries, and migrate tests (#4900)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-03-23 18:17:50 -05:00 committed by GitHub
parent f826cac6c8
commit 664ebf218e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
163 changed files with 503 additions and 4993 deletions

View file

@ -119,11 +119,10 @@ class MeshConfigFlowManagerImpl(
private fun handleNodeInfoComplete() {
Logger.i { "NodeInfo complete (Stage 2)" }
val entities =
newNodes.map { info ->
nodeManager.installNodeInfo(info, withBroadcast = false)
nodeManager.nodeDBbyNodeNum[info.num]!!
}
val entities = newNodes.map { info ->
nodeManager.installNodeInfo(info, withBroadcast = false)
nodeManager.nodeDBbyNodeNum[info.num]!!
}
newNodes.clear()
scope.handledLaunch {

View file

@ -188,20 +188,19 @@ class MeshConnectionManagerImpl(
private fun startHandshakeStallGuard(stage: Int, action: () -> Unit) {
handshakeTimeout?.cancel()
handshakeTimeout =
scope.handledLaunch {
handshakeTimeout = scope.handledLaunch {
delay(HANDSHAKE_TIMEOUT)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
Logger.w { "Handshake stall detected! Retrying Stage $stage." }
action()
// Recursive timeout for one more try
delay(HANDSHAKE_TIMEOUT)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
Logger.w { "Handshake stall detected! Retrying Stage $stage." }
action()
// Recursive timeout for one more try
delay(HANDSHAKE_TIMEOUT)
if (serviceRepository.connectionState.value is ConnectionState.Connecting) {
Logger.e { "Handshake still stalled after retry. Resetting connection." }
onConnectionChanged(ConnectionState.Disconnected)
}
Logger.e { "Handshake still stalled after retry. Resetting connection." }
onConnectionChanged(ConnectionState.Disconnected)
}
}
}
}
private fun handleDeviceSleep() {
@ -220,19 +219,18 @@ class MeshConnectionManagerImpl(
)
}
sleepTimeout =
scope.handledLaunch {
try {
val localConfig = radioConfigRepository.localConfigFlow.first()
val timeout = (localConfig.power?.ls_secs ?: 0) + DEVICE_SLEEP_TIMEOUT_SECONDS
Logger.d { "Waiting for sleeping device, timeout=$timeout secs" }
delay(timeout.seconds)
Logger.w { "Device timeout out, setting disconnected" }
onConnectionChanged(ConnectionState.Disconnected)
} catch (_: CancellationException) {
Logger.d { "device sleep timeout cancelled" }
}
sleepTimeout = scope.handledLaunch {
try {
val localConfig = radioConfigRepository.localConfigFlow.first()
val timeout = (localConfig.power?.ls_secs ?: 0) + DEVICE_SLEEP_TIMEOUT_SECONDS
Logger.d { "Waiting for sleeping device, timeout=$timeout secs" }
delay(timeout.seconds)
Logger.w { "Device timeout out, setting disconnected" }
onConnectionChanged(ConnectionState.Disconnected)
} catch (_: CancellationException) {
Logger.d { "device sleep timeout cancelled" }
}
}
serviceBroadcasts.broadcastConnection()
}

View file

@ -630,8 +630,7 @@ class MeshDataHandlerImpl(
// Find the original packet to get the contactKey
packetRepository.value.getPacketByPacketId(decoded.reply_id)?.let { originalPacket ->
// Skip notification if the original message was filtered
val targetId =
if (originalPacket.from == DataPacket.ID_LOCAL) originalPacket.to else originalPacket.from
val targetId = if (originalPacket.from == DataPacket.ID_LOCAL) originalPacket.to else originalPacket.from
val contactKey = "${originalPacket.channel}$targetId"
val conversationMuted = packetRepository.value.getContactSettings(contactKey).isMuted
val nodeMuted = nodeManager.nodeDBbyID[fromId]?.isMuted == true
@ -640,11 +639,7 @@ class MeshDataHandlerImpl(
if (!isSilent) {
val channelName =
if (originalPacket.to == DataPacket.ID_BROADCAST) {
radioConfigRepository.channelSetFlow
.first()
.settings
.getOrNull(originalPacket.channel)
?.name
radioConfigRepository.channelSetFlow.first().settings.getOrNull(originalPacket.channel)?.name
} else {
null
}

View file

@ -162,13 +162,12 @@ class MeshMessageProcessorImpl(
private fun flushEarlyReceivedPackets(reason: String) {
scope.launch {
val packets =
earlyMutex.withLock {
if (earlyReceivedPackets.isEmpty()) return@withLock emptyList<MeshPacket>()
val list = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
list
}
val packets = earlyMutex.withLock {
if (earlyReceivedPackets.isEmpty()) return@withLock emptyList<MeshPacket>()
val list = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
list
}
if (packets.isEmpty()) return@launch
Logger.d { "replayEarlyPackets reason=$reason count=${packets.size}" }

View file

@ -271,8 +271,9 @@ class NodeManagerImpl(
if (shouldPreserveExistingUser(node.user, user)) {
// keep existing names
} else {
var newUser =
user.let { if (it.is_licensed == true) it.copy(public_key = ByteString.EMPTY) else it }
var newUser = user.let {
if (it.is_licensed == true) it.copy(public_key = ByteString.EMPTY) else it
}
if (info.via_mqtt) {
newUser = newUser.copy(long_name = "${newUser.long_name} (MQTT)")
}

View file

@ -146,32 +146,31 @@ class PacketHandlerImpl(
private fun startPacketQueue() {
if (queueJob?.isActive == true) return
queueJob =
scope.handledLaunch {
try {
while (serviceRepository.connectionState.value == ConnectionState.Connected) {
val packet = queueMutex.withLock { queuedPackets.removeFirstOrNull() } ?: break
@Suppress("TooGenericExceptionCaught", "SwallowedException")
try {
val response = sendPacket(packet)
Logger.d { "queueJob packet id=${packet.id.toUInt()} waiting" }
val success = withTimeout(TIMEOUT) { response.await() }
Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" }
} catch (e: TimeoutCancellationException) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" }
} catch (e: Exception) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" }
} finally {
responseMutex.withLock { queueResponse.remove(packet.id) }
}
}
} finally {
queueJob = null
if (queueMutex.withLock { queuedPackets.isNotEmpty() }) {
startPacketQueue()
queueJob = scope.handledLaunch {
try {
while (serviceRepository.connectionState.value == ConnectionState.Connected) {
val packet = queueMutex.withLock { queuedPackets.removeFirstOrNull() } ?: break
@Suppress("TooGenericExceptionCaught", "SwallowedException")
try {
val response = sendPacket(packet)
Logger.d { "queueJob packet id=${packet.id.toUInt()} waiting" }
val success = withTimeout(TIMEOUT) { response.await() }
Logger.d { "queueJob packet id=${packet.id.toUInt()} success $success" }
} catch (e: TimeoutCancellationException) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} timeout" }
} catch (e: Exception) {
Logger.d { "queueJob packet id=${packet.id.toUInt()} failed" }
} finally {
responseMutex.withLock { queueResponse.remove(packet.id) }
}
}
} finally {
queueJob = null
if (queueMutex.withLock { queuedPackets.isNotEmpty() }) {
startPacketQueue()
}
}
}
}
private fun changeStatus(packetId: Int, m: MessageStatus) = scope.handledLaunch {

View file

@ -48,15 +48,14 @@ class TracerouteSnapshotRepositoryImpl(
val dao = dbManager.currentDb.value.tracerouteNodePositionDao()
dao.deleteByLogUuid(logUuid)
if (positions.isEmpty()) return@withContext
val entities =
positions.map { (nodeNum, position) ->
TracerouteNodePositionEntity(
logUuid = logUuid,
requestId = requestId,
nodeNum = nodeNum,
position = position,
)
}
val entities = positions.map { (nodeNum, position) ->
TracerouteNodePositionEntity(
logUuid = logUuid,
requestId = requestId,
nodeNum = nodeNum,
position = position,
)
}
dao.insertAll(entities)
}
}

View file

@ -490,8 +490,10 @@ class MeshDataHandlerTest {
MeshPacket(
id = 42,
from = 456,
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
)
val dataPacket =
DataPacket(
@ -508,8 +510,7 @@ class MeshDataHandlerTest {
// Provide sender node so getSenderName() doesn't fall back to getString (requires Skiko)
every { nodeManager.nodeDBbyID } returns
mapOf(
"!remote" to
Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
"!remote" to Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
)
handler.handleReceivedData(packet, 123)
@ -524,8 +525,10 @@ class MeshDataHandlerTest {
MeshPacket(
id = 42,
from = 456,
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
)
val dataPacket =
DataPacket(
@ -597,8 +600,7 @@ class MeshDataHandlerTest {
MeshPacket(
id = 55,
from = 456,
decoded =
Data(portnum = PortNum.RANGE_TEST_APP, payload = "test".encodeToByteArray().toByteString()),
decoded = Data(portnum = PortNum.RANGE_TEST_APP, payload = "test".encodeToByteArray().toByteString()),
)
val dataPacket =
DataPacket(
@ -614,8 +616,7 @@ class MeshDataHandlerTest {
every { messageFilter.shouldFilter(any(), any()) } returns false
every { nodeManager.nodeDBbyID } returns
mapOf(
"!remote" to
Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
"!remote" to Node(num = 456, user = User(id = "!remote", long_name = "Remote User", short_name = "RU")),
)
handler.handleReceivedData(packet, 123)
@ -687,8 +688,10 @@ class MeshDataHandlerTest {
MeshPacket(
id = 88,
from = 456,
decoded =
Data(portnum = PortNum.TEXT_MESSAGE_APP, payload = "hello".encodeToByteArray().toByteString()),
decoded = Data(
portnum = PortNum.TEXT_MESSAGE_APP,
payload = "hello".encodeToByteArray().toByteString(),
),
)
val dataPacket =
DataPacket(