feat(connections): Connecting state refactor (#3722)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2025-11-17 15:15:22 -06:00 committed by GitHub
parent 12ccb34553
commit 73d933fe14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 379 additions and 263 deletions

View file

@ -94,6 +94,7 @@ import org.meshtastic.core.service.ServiceAction
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.connected_count
import org.meshtastic.core.strings.connecting
import org.meshtastic.core.strings.critical_alert
import org.meshtastic.core.strings.device_sleeping
import org.meshtastic.core.strings.disconnected
@ -1426,98 +1427,110 @@ class MeshService : Service() {
// Called when we gain/lose connection to our radio
@Suppress("CyclomaticComplexMethod")
private fun onConnectionChanged(c: ConnectionState) {
Timber.d("onConnectionChanged: ${connectionStateHolder.getState()} -> $c")
// Perform all the steps needed once we start waiting for device sleep to complete
fun startDeviceSleep() {
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
if (connectTimeMsec != 0L) {
val now = System.currentTimeMillis()
connectTimeMsec = 0L
analytics.track("connected_seconds", DataPair("connected_seconds", (now - connectTimeMsec) / 1000.0))
}
// Have our timeout fire in the appropriate number of seconds
sleepTimeout =
serviceScope.handledLaunch {
try {
// If we have a valid timeout, wait that long (+30 seconds) otherwise, just
// wait 30 seconds
val timeout = (localConfig.power?.lsSecs ?: 0) + 30
Timber.d("Waiting for sleeping device, timeout=$timeout secs")
delay(timeout * 1000L)
Timber.w("Device timeout out, setting disconnected")
onConnectionChanged(ConnectionState.DISCONNECTED)
} catch (_: CancellationException) {
Timber.d("device sleep timeout cancelled")
}
}
// broadcast an intent with our new connection state
serviceBroadcasts.broadcastConnection()
}
fun startDisconnect() {
Timber.d("Starting disconnect")
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
analytics.track("mesh_disconnect", DataPair("num_nodes", numNodes), DataPair("num_online", numOnlineNodes))
analytics.track("num_nodes", DataPair("num_nodes", numNodes))
// broadcast an intent with our new connection state
serviceBroadcasts.broadcastConnection()
}
fun startConnect() {
Timber.d("Starting connect")
historyLog {
val address = meshPrefs.deviceAddress ?: "null"
"onReconnect transport=${currentTransport()} node=$address"
}
try {
connectTimeMsec = System.currentTimeMillis()
startConfigOnly()
} catch (ex: InvalidProtocolBufferException) {
Timber.e(ex, "Invalid protocol buffer sent by device - update device software and try again")
} catch (ex: RadioNotConnectedException) {
Timber.e("Lost connection to radio during init - waiting for reconnect ${ex.message}")
} catch (ex: RemoteException) {
connectionStateHolder.setState(ConnectionState.DEVICE_SLEEP)
startDeviceSleep()
throw ex
}
}
if (connectionStateHolder.connectionState.value == c && c !is ConnectionState.Connected) return
Timber.d("onConnectionChanged: ${connectionStateHolder.connectionState.value} -> $c")
// Cancel any existing timeouts
sleepTimeout?.let {
it.cancel()
sleepTimeout = null
}
sleepTimeout?.cancel()
sleepTimeout = null
connectionStateHolder.setState(c)
when (c) {
ConnectionState.CONNECTED -> startConnect()
ConnectionState.DEVICE_SLEEP -> startDeviceSleep()
ConnectionState.DISCONNECTED -> startDisconnect()
is ConnectionState.Connecting -> {
connectionStateHolder.setState(ConnectionState.Connecting)
}
is ConnectionState.Connected -> {
handleConnected()
}
is ConnectionState.DeviceSleep -> {
handleDeviceSleep()
}
is ConnectionState.Disconnected -> {
handleDisconnected()
}
}
updateServiceStatusNotification()
}
private fun handleDisconnected() {
connectionStateHolder.setState(ConnectionState.Disconnected)
Timber.d("Starting disconnect")
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
analytics.track("mesh_disconnect", DataPair("num_nodes", numNodes), DataPair("num_online", numOnlineNodes))
analytics.track("num_nodes", DataPair("num_nodes", numNodes))
// broadcast an intent with our new connection state
serviceBroadcasts.broadcastConnection()
}
private fun handleDeviceSleep() {
connectionStateHolder.setState(ConnectionState.DeviceSleep)
packetHandler.stopPacketQueue()
stopLocationRequests()
stopMqttClientProxy()
if (connectTimeMsec != 0L) {
val now = System.currentTimeMillis()
connectTimeMsec = 0L
analytics.track("connected_seconds", DataPair("connected_seconds", (now - connectTimeMsec) / 1000.0))
}
updateServiceStatusNotification()
// Have our timeout fire in the appropriate number of seconds
sleepTimeout =
serviceScope.handledLaunch {
try {
// If we have a valid timeout, wait that long (+30 seconds) otherwise, just
// wait 30 seconds
val timeout = (localConfig.power?.lsSecs ?: 0) + 30
Timber.d("Waiting for sleeping device, timeout=$timeout secs")
delay(timeout * 1000L)
Timber.w("Device timeout out, setting disconnected")
onConnectionChanged(ConnectionState.Disconnected)
} catch (_: CancellationException) {
Timber.d("device sleep timeout cancelled")
}
}
// broadcast an intent with our new connection state
serviceBroadcasts.broadcastConnection()
}
private fun handleConnected() {
connectionStateHolder.setState(ConnectionState.Connecting)
serviceBroadcasts.broadcastConnection()
Timber.d("Starting connect")
historyLog {
val address = meshPrefs.deviceAddress ?: "null"
"onReconnect transport=${currentTransport()} node=$address"
}
try {
connectTimeMsec = System.currentTimeMillis()
startConfigOnly()
} catch (ex: InvalidProtocolBufferException) {
Timber.e(ex, "Invalid protocol buffer sent by device - update device software and try again")
} catch (ex: RadioNotConnectedException) {
Timber.e("Lost connection to radio during init - waiting for reconnect ${ex.message}")
} catch (ex: RemoteException) {
onConnectionChanged(ConnectionState.DeviceSleep)
throw ex
}
}
private fun updateServiceStatusNotification(telemetry: TelemetryProtos.Telemetry? = null): Notification {
val notificationSummary =
when (connectionStateHolder.getState()) {
ConnectionState.CONNECTED -> getString(Res.string.connected_count).format(numOnlineNodes)
when (connectionStateHolder.connectionState.value) {
is ConnectionState.Connected -> getString(Res.string.connected_count).format(numOnlineNodes)
ConnectionState.DISCONNECTED -> getString(Res.string.disconnected)
ConnectionState.DEVICE_SLEEP -> getString(Res.string.device_sleeping)
is ConnectionState.Disconnected -> getString(Res.string.disconnected)
is ConnectionState.DeviceSleep -> getString(Res.string.device_sleeping)
is ConnectionState.Connecting -> getString(Res.string.connecting)
}
return serviceNotifications.updateServiceStateNotification(
summaryString = notificationSummary,
@ -1533,15 +1546,16 @@ class MeshService : Service() {
val effectiveState =
when (newState) {
ConnectionState.CONNECTED -> ConnectionState.CONNECTED
ConnectionState.DEVICE_SLEEP ->
is ConnectionState.Connected -> ConnectionState.Connected
is ConnectionState.DeviceSleep ->
if (lsEnabled) {
ConnectionState.DEVICE_SLEEP
ConnectionState.DeviceSleep
} else {
ConnectionState.DISCONNECTED
ConnectionState.Disconnected
}
ConnectionState.DISCONNECTED -> ConnectionState.DISCONNECTED
is ConnectionState.Connecting -> ConnectionState.Connecting
is ConnectionState.Disconnected -> ConnectionState.Disconnected
}
onConnectionChanged(effectiveState)
}
@ -1972,7 +1986,6 @@ class MeshService : Service() {
private fun onHasSettings() {
processQueuedPackets()
startMqttClientProxy()
serviceBroadcasts.broadcastConnection()
sendAnalytics()
reportConnection()
historyLog {
@ -2055,6 +2068,9 @@ class MeshService : Service() {
flushEarlyReceivedPackets("node_info_complete")
sendAnalytics()
onHasSettings()
connectionStateHolder.setState(ConnectionState.Connected)
serviceBroadcasts.broadcastConnection()
updateServiceStatusNotification()
}
}
@ -2323,7 +2339,7 @@ class MeshService : Service() {
if (p.id == 0) p.id = generatePacketId()
val bytes = p.bytes!!
Timber.i(
"sendData dest=${p.to}, id=${p.id} <- ${bytes.size} bytes (connectionState=${connectionStateHolder.getState()})",
"sendData dest=${p.to}, id=${p.id} <- ${bytes.size} bytes (connectionState=${connectionStateHolder.connectionState.value})",
)
if (p.dataType == 0) throw Exception("Port numbers must be non-zero!")
if (bytes.size >= MeshProtos.Constants.DATA_PAYLOAD_LEN.number) {
@ -2332,7 +2348,7 @@ class MeshService : Service() {
} else {
p.status = MessageStatus.QUEUED
}
if (connectionStateHolder.getState() == ConnectionState.CONNECTED) {
if (connectionStateHolder.connectionState.value == ConnectionState.Connected) {
try {
sendNow(p)
} catch (ex: Exception) {
@ -2450,7 +2466,7 @@ class MeshService : Service() {
}
override fun connectionState(): String = toRemoteExceptions {
val r = connectionStateHolder.getState()
val r = connectionStateHolder.connectionState.value
Timber.i("in connectionState=$r")
r.toString()
}

View file

@ -75,7 +75,7 @@ constructor(
/** Broadcast our current connection status */
fun broadcastConnection() {
val connectionState = connectionStateHolder.getState()
val connectionState = connectionStateHolder.connectionState.value
val intent = Intent(MeshService.ACTION_MESH_CONNECTED).putExtra(EXTRA_CONNECTED, connectionState.toString())
serviceRepository.setConnectionState(connectionState)
explicitBroadcast(intent)

View file

@ -17,17 +17,18 @@
package com.geeksville.mesh.service
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import org.meshtastic.core.service.ConnectionState
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshServiceConnectionStateHolder @Inject constructor() {
private var connectionState = ConnectionState.DISCONNECTED
private val _connectionState = MutableStateFlow<ConnectionState>(ConnectionState.Disconnected)
val connectionState = _connectionState.asStateFlow()
fun setState(state: ConnectionState) {
connectionState = state
_connectionState.value = state
}
fun getState() = connectionState
}

View file

@ -135,7 +135,7 @@ constructor(
queueJob =
scope.handledLaunch {
Timber.d("packet queueJob started")
while (connectionStateHolder.getState() == ConnectionState.CONNECTED) {
while (connectionStateHolder.connectionState.value == ConnectionState.Connected) {
// take the first packet from the queue head
val packet = queuedPackets.poll() ?: break
try {
@ -181,7 +181,9 @@ constructor(
val future = CompletableFuture<Boolean>()
queueResponse[packet.id] = future
try {
if (connectionStateHolder.getState() != ConnectionState.CONNECTED) throw RadioNotConnectedException()
if (connectionStateHolder.connectionState.value != ConnectionState.Connected) {
throw RadioNotConnectedException()
}
sendToRadio(ToRadio.newBuilder().apply { this.packet = packet })
} catch (ex: Exception) {
Timber.e(ex, "sendToRadio error: ${ex.message}")