diff --git a/conductor/tracks.md b/conductor/tracks.md index 431ed9884..eea4743a4 100644 --- a/conductor/tracks.md +++ b/conductor/tracks.md @@ -10,5 +10,5 @@ This file tracks all major tracks for the project. Each track has its own detail - [x] **Track: Extract DatabaseManager to KMP** *Link: [./tracks/extract_database_manager_kmp_20260320/](./tracks/extract_database_manager_kmp_20260320/)* -- [ ] **Track: Extract RadioInterfaceService to KMP** +- [x] **Track: Extract RadioInterfaceService to KMP** *Link: [./tracks/extract_radio_interface_kmp_20260320/](./tracks/extract_radio_interface_kmp_20260320/)* diff --git a/conductor/tracks/extract_radio_interface_kmp_20260320/metadata.json b/conductor/tracks/extract_radio_interface_kmp_20260320/metadata.json index b424ea588..736a106ca 100644 --- a/conductor/tracks/extract_radio_interface_kmp_20260320/metadata.json +++ b/conductor/tracks/extract_radio_interface_kmp_20260320/metadata.json @@ -2,7 +2,7 @@ "id": "extract_radio_interface_kmp_20260320", "name": "Extract RadioInterfaceService to KMP", "description": "Unify the connection orchestration lifecycle (TCP, Serial, BLE) into a shared multiplatform service.", - "status": "in_progress", + "status": "completed", "tags": ["core", "service", "kmp", "desktop", "radio", "connection"], "created_at": "2026-03-20T12:00:00Z" } diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioInterfaceService.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioInterfaceService.kt deleted file mode 100644 index c90ae08d0..000000000 --- a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioInterfaceService.kt +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Copyright (c) 2025-2026 Meshtastic LLC - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.meshtastic.core.network.radio - -import android.app.Application -import android.provider.Settings -import androidx.lifecycle.Lifecycle -import androidx.lifecycle.coroutineScope -import co.touchlab.kermit.Logger -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import org.koin.core.annotation.Named -import org.koin.core.annotation.Single -import org.meshtastic.core.ble.BluetoothRepository -import org.meshtastic.core.common.BuildConfigProvider -import org.meshtastic.core.common.util.BinaryLogFile -import org.meshtastic.core.common.util.handledLaunch -import org.meshtastic.core.common.util.ignoreException -import org.meshtastic.core.common.util.nowMillis -import org.meshtastic.core.common.util.toRemoteExceptions -import org.meshtastic.core.di.CoroutineDispatchers -import org.meshtastic.core.model.ConnectionState -import org.meshtastic.core.model.InterfaceId -import org.meshtastic.core.model.MeshActivity -import org.meshtastic.core.model.util.anonymize -import org.meshtastic.core.network.repository.NetworkRepository -import org.meshtastic.core.repository.PlatformAnalytics -import org.meshtastic.core.repository.RadioInterfaceService -import org.meshtastic.core.repository.RadioPrefs -import org.meshtastic.core.repository.RadioTransport -import org.meshtastic.proto.Heartbeat -import org.meshtastic.proto.ToRadio - -/** - * Handles the bluetooth link with a mesh radio device. Does not cache any device state, just does bluetooth comms - * etc... - * - * This service is not exposed outside of this process. - * - * Note - this class intentionally dumb. It doesn't understand protobuf framing etc... It is designed to be simple so it - * can be stubbed out with a simulated version as needed. - */ -@Suppress("LongParameterList", "TooManyFunctions") -@Single -class AndroidRadioInterfaceService( - private val context: Application, - private val dispatchers: CoroutineDispatchers, - private val bluetoothRepository: BluetoothRepository, - private val networkRepository: NetworkRepository, - private val buildConfigProvider: BuildConfigProvider, - @Named("ProcessLifecycle") private val processLifecycle: Lifecycle, - private val radioPrefs: RadioPrefs, - private val interfaceFactory: Lazy, - private val analytics: PlatformAnalytics, -) : RadioInterfaceService { - - private val _connectionState = MutableStateFlow(ConnectionState.Disconnected) - override val connectionState: StateFlow = _connectionState.asStateFlow() - - override val supportedDeviceTypes: List = - listOf( - org.meshtastic.core.model.DeviceType.BLE, - org.meshtastic.core.model.DeviceType.TCP, - org.meshtastic.core.model.DeviceType.USB, - ) - - private val _receivedData = MutableSharedFlow(extraBufferCapacity = 64) - override val receivedData: SharedFlow = _receivedData - - private val _connectionError = MutableSharedFlow(extraBufferCapacity = 64) - val connectionError: SharedFlow = _connectionError.asSharedFlow() - - // Thread-safe StateFlow for tracking device address changes - private val _currentDeviceAddressFlow = MutableStateFlow(radioPrefs.devAddr.value) - override val currentDeviceAddressFlow: StateFlow = _currentDeviceAddressFlow.asStateFlow() - - private val logSends = false - private val logReceives = false - private lateinit var sentPacketsLog: BinaryLogFile - private lateinit var receivedPacketsLog: BinaryLogFile - - val mockInterfaceAddress: String by lazy { toInterfaceAddress(InterfaceId.MOCK, "") } - - override val serviceScope: CoroutineScope - get() = _serviceScope - - /** We recreate this scope each time we stop an interface */ - private var _serviceScope = CoroutineScope(dispatchers.io + SupervisorJob()) - - private var radioIf: RadioTransport = NopInterface("") - - /** - * true if we have started our interface - * - * Note: an interface may be started without necessarily yet having a connection - */ - private var isStarted = false - - @Volatile private var listenersInitialized = false - - private fun initStateListeners() { - if (listenersInitialized) return - synchronized(this) { - if (listenersInitialized) return - listenersInitialized = true - - radioPrefs.devAddr - .onEach { addr -> - if (_currentDeviceAddressFlow.value != addr) { - _currentDeviceAddressFlow.value = addr - startInterface() - } - } - .launchIn(processLifecycle.coroutineScope) - - bluetoothRepository.state - .onEach { state -> - if (state.enabled) { - startInterface() - } else if (radioIf is BleRadioInterface) { - stopInterface() - } - } - .catch { Logger.e(it) { "bluetoothRepository.state flow crashed!" } } - .launchIn(processLifecycle.coroutineScope) - - networkRepository.networkAvailable - .onEach { state -> - if (state) { - startInterface() - } else if (radioIf is TCPInterface) { - stopInterface() - } - } - .catch { Logger.e(it) { "networkRepository.networkAvailable flow crashed!" } } - .launchIn(processLifecycle.coroutineScope) - } - } - - companion object { - private const val HEARTBEAT_INTERVAL_MILLIS = 30 * 1000L - } - - private var lastHeartbeatMillis = 0L - - fun keepAlive(now: Long = nowMillis) { - if (now - lastHeartbeatMillis > HEARTBEAT_INTERVAL_MILLIS) { - if (radioIf is SerialInterface) { - Logger.i { "Sending ToRadio heartbeat" } - val heartbeat = ToRadio(heartbeat = Heartbeat()) - handleSendToRadio(heartbeat.encode()) - } else { - // For BLE and TCP this will check if the connection is still alive - radioIf.keepAlive() - } - lastHeartbeatMillis = now - } - } - - /** Constructs a full radio address for the specific interface type. */ - override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = - interfaceFactory.value.toInterfaceAddress(interfaceId, rest) - - override fun isMockInterface(): Boolean = - buildConfigProvider.isDebug || Settings.System.getString(context.contentResolver, "firebase.test.lab") == "true" - - override fun getDeviceAddress(): String? { - // If the user has unpaired our device, treat things as if we don't have one - return _currentDeviceAddressFlow.value - } - - /** - * Like getDeviceAddress, but filtered to return only devices we are currently bonded with - * - * at - * - * where a is either x for bluetooth or s for serial and t is an interface specific address (macaddr or a device - * path) - */ - fun getBondedDeviceAddress(): String? { - // If the user has unpaired our device, treat things as if we don't have one - val address = getDeviceAddress() - return if (interfaceFactory.value.addressValid(address)) { - address - } else { - null - } - } - - private fun broadcastConnectionChanged(newState: ConnectionState) { - Logger.d { "Broadcasting connection state change to $newState" } - processLifecycle.coroutineScope.launch(dispatchers.default) { _connectionState.emit(newState) } - } - - // Send a packet/command out the radio link, this routine can block if it needs to - private fun handleSendToRadio(p: ByteArray) { - radioIf.handleSendToRadio(p) - emitSendActivity() - } - - // Handle an incoming packet from the radio, broadcasts it as an android intent - @Suppress("TooGenericExceptionCaught") - override fun handleFromRadio(bytes: ByteArray) { - if (logReceives) { - try { - receivedPacketsLog.write(bytes) - receivedPacketsLog.flush() - } catch (t: Throwable) { - Logger.w(t) { "Failed to write receive log in handleFromRadio" } - } - } - - try { - processLifecycle.coroutineScope.launch(dispatchers.io) { _receivedData.emit(bytes) } - emitReceiveActivity() - } catch (t: Throwable) { - Logger.e(t) { "RadioInterfaceService.handleFromRadio failed while emitting data" } - } - } - - override fun onConnect() { - if (_connectionState.value != ConnectionState.Connected) { - broadcastConnectionChanged(ConnectionState.Connected) - } - } - - override fun onDisconnect(isPermanent: Boolean, errorMessage: String?) { - if (errorMessage != null) { - processLifecycle.coroutineScope.launch(dispatchers.default) { _connectionError.emit(errorMessage) } - } - val newTargetState = if (isPermanent) ConnectionState.Disconnected else ConnectionState.DeviceSleep - if (_connectionState.value != newTargetState) { - broadcastConnectionChanged(newTargetState) - } - } - - /** Start our configured interface (if it isn't already running) */ - private fun startInterface() { - if (radioIf !is NopInterface) { - // Already running - return - } - - val isTestLab = Settings.System.getString(context.contentResolver, "firebase.test.lab") == "true" - val address = - getBondedDeviceAddress() - ?: if (isTestLab) { - mockInterfaceAddress - } else { - null - } - - if (address == null) { - Logger.w { "No bonded mesh radio, can't start interface" } - } else { - Logger.i { "Starting radio ${address.anonymize}" } - isStarted = true - - if (logSends) { - sentPacketsLog = BinaryLogFile(context, "sent_log.pb") - } - if (logReceives) { - receivedPacketsLog = BinaryLogFile(context, "receive_log.pb") - } - - radioIf = interfaceFactory.value.createInterface(address, this) - startHeartbeat() - } - } - - private var heartbeatJob: kotlinx.coroutines.Job? = null - - private fun startHeartbeat() { - heartbeatJob?.cancel() - heartbeatJob = - serviceScope.launch { - while (true) { - delay(HEARTBEAT_INTERVAL_MILLIS) - keepAlive() - } - } - } - - private fun stopInterface() { - val r = radioIf - Logger.i { "stopping interface $r" } - isStarted = false - radioIf = interfaceFactory.value.nopInterface - r.close() - - // cancel any old jobs and get ready for the new ones - _serviceScope.cancel("stopping interface") - _serviceScope = CoroutineScope(dispatchers.io + SupervisorJob()) - - if (logSends) { - sentPacketsLog.close() - } - if (logReceives) { - receivedPacketsLog.close() - } - - // Don't broadcast disconnects if we were just using the nop device - if (r !is NopInterface) { - onDisconnect(isPermanent = true) // Tell any clients we are now offline - } - } - - /** - * Change to a new device - * - * @return true if the device changed, false if no change - */ - private fun setBondedDeviceAddress(address: String?): Boolean = - if (getBondedDeviceAddress() == address && isStarted && _connectionState.value == ConnectionState.Connected) { - Logger.w { "Ignoring setBondedDevice ${address.anonymize}, because we are already using that device" } - false - } else { - // Record that this use has configured a new radio - analytics.track("mesh_bond") - - // Ignore any errors that happen while closing old device - ignoreException { stopInterface() } - - // The device address "n" can be used to mean none - - Logger.d { "Setting bonded device to ${address.anonymize}" } - - // Stores the address if non-null, otherwise removes the pref - radioPrefs.setDevAddr(address) - _currentDeviceAddressFlow.value = address - - // Force the service to reconnect - startInterface() - true - } - - override fun setDeviceAddress(deviceAddr: String?): Boolean = toRemoteExceptions { - setBondedDeviceAddress(deviceAddr) - } - - /** - * If the service is not currently connected to the radio, try to connect now. At boot the radio interface service - * will not connect to a radio until this call is received. - */ - override fun connect() = toRemoteExceptions { - // We don't start actually talking to our device until MeshService binds to us - this prevents - // broadcasting connection events before MeshService is ready to receive them - startInterface() - initStateListeners() - } - - override fun sendToRadio(bytes: ByteArray) { - // Do this in the IO thread because it might take a while (and we don't care about the result code) - _serviceScope.handledLaunch { handleSendToRadio(bytes) } - } - - private val _meshActivity = - MutableSharedFlow( - extraBufferCapacity = 64, - onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST, - ) - override val meshActivity: SharedFlow = _meshActivity.asSharedFlow() - - private fun emitSendActivity() { - _meshActivity.tryEmit(MeshActivity.Send) - } - - private fun emitReceiveActivity() { - _meshActivity.tryEmit(MeshActivity.Receive) - } -} diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioTransportFactory.kt b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioTransportFactory.kt new file mode 100644 index 000000000..0d2d0b82a --- /dev/null +++ b/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/AndroidRadioTransportFactory.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.network.radio + +import android.content.Context +import android.provider.Settings +import org.koin.core.annotation.Single +import org.meshtastic.core.common.BuildConfigProvider +import org.meshtastic.core.model.DeviceType +import org.meshtastic.core.model.InterfaceId +import org.meshtastic.core.repository.RadioInterfaceService +import org.meshtastic.core.repository.RadioTransport +import org.meshtastic.core.repository.RadioTransportFactory + +/** + * Android implementation of [RadioTransportFactory] delegating to the legacy [InterfaceFactory]. + */ +@Single +class AndroidRadioTransportFactory( + private val context: Context, + private val interfaceFactory: Lazy, + private val buildConfigProvider: BuildConfigProvider, +) : RadioTransportFactory { + + override val supportedDeviceTypes: List = + listOf( + DeviceType.BLE, + DeviceType.TCP, + DeviceType.USB, + ) + + override fun isMockInterface(): Boolean = + buildConfigProvider.isDebug || Settings.System.getString(context.contentResolver, "firebase.test.lab") == "true" + + override fun createTransport(address: String, service: RadioInterfaceService): RadioTransport { + return interfaceFactory.value.createInterface(address, service) + } + + override fun isAddressValid(address: String?): Boolean { + return interfaceFactory.value.addressValid(address) + } + + override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String { + return interfaceFactory.value.toInterfaceAddress(interfaceId, rest) + } +} diff --git a/core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt b/core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt similarity index 100% rename from core/network/src/androidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt rename to core/network/src/jvmAndroidMain/kotlin/org/meshtastic/core/network/radio/TCPInterface.kt diff --git a/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioTransportFactory.kt b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioTransportFactory.kt new file mode 100644 index 000000000..918657e99 --- /dev/null +++ b/core/repository/src/commonMain/kotlin/org/meshtastic/core/repository/RadioTransportFactory.kt @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.repository + +import org.meshtastic.core.model.DeviceType +import org.meshtastic.core.model.InterfaceId + +/** + * Creates [RadioTransport] instances for specific device addresses. + * + * Implemented per-platform to provide the correct hardware transport (BLE, Serial, TCP). + */ +interface RadioTransportFactory { + /** The device types supported by this factory. */ + val supportedDeviceTypes: List + + /** Whether we are currently forced into using a mock interface (e.g., Firebase Test Lab). */ + fun isMockInterface(): Boolean + + /** Creates a transport for the given [address], or a NOP implementation if invalid/unsupported. */ + fun createTransport(address: String, service: RadioInterfaceService): RadioTransport + + /** Checks if the given [address] represents a valid, supported transport type. */ + fun isAddressValid(address: String?): Boolean + + /** Constructs a full radio address for the specific [interfaceId] and [rest] identifier. */ + fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String +} diff --git a/core/service/build.gradle.kts b/core/service/build.gradle.kts index 6d3eaf0be..2b8afbe91 100644 --- a/core/service/build.gradle.kts +++ b/core/service/build.gradle.kts @@ -32,14 +32,19 @@ kotlin { sourceSets { commonMain.dependencies { + api(projects.core.repository) implementation(projects.core.common) implementation(projects.core.data) implementation(projects.core.database) + implementation(projects.core.di) implementation(projects.core.model) implementation(projects.core.navigation) + implementation(projects.core.network) + implementation(projects.core.ble) implementation(projects.core.prefs) implementation(projects.core.proto) + implementation(libs.jetbrains.lifecycle.runtime) implementation(libs.kotlinx.coroutines.core) implementation(libs.kermit) } diff --git a/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt new file mode 100644 index 000000000..e83100048 --- /dev/null +++ b/core/service/src/commonMain/kotlin/org/meshtastic/core/service/SharedRadioInterfaceService.kt @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.service + +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.coroutineScope +import co.touchlab.kermit.Logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import org.koin.core.annotation.Named +import org.koin.core.annotation.Single +import org.meshtastic.core.ble.BluetoothRepository +import org.meshtastic.core.common.util.handledLaunch +import org.meshtastic.core.common.util.ignoreException +import org.meshtastic.core.common.util.nowMillis +import org.meshtastic.core.di.CoroutineDispatchers +import org.meshtastic.core.model.ConnectionState +import org.meshtastic.core.model.DeviceType +import org.meshtastic.core.model.InterfaceId +import org.meshtastic.core.model.MeshActivity +import org.meshtastic.core.model.util.anonymize +import org.meshtastic.core.network.repository.NetworkRepository +import org.meshtastic.core.repository.PlatformAnalytics +import org.meshtastic.core.repository.RadioInterfaceService +import org.meshtastic.core.repository.RadioPrefs +import org.meshtastic.core.repository.RadioTransport +import org.meshtastic.core.repository.RadioTransportFactory + +/** + * Shared multiplatform connection orchestrator for Meshtastic radios. + * + * Manages the connection lifecycle (connect, active, disconnect, reconnect loop), device address state flows, + * and hardware state observability (BLE/Network toggles). Delegates the actual raw byte transport mapping to + * a platform-specific [RadioTransportFactory]. + */ +@Suppress("LongParameterList", "TooManyFunctions") +@Single +class SharedRadioInterfaceService( + private val dispatchers: CoroutineDispatchers, + private val bluetoothRepository: BluetoothRepository, + private val networkRepository: NetworkRepository, + @Named("ProcessLifecycle") private val processLifecycle: Lifecycle, + private val radioPrefs: RadioPrefs, + private val transportFactory: RadioTransportFactory, + private val analytics: PlatformAnalytics, +) : RadioInterfaceService { + + override val supportedDeviceTypes: List + get() = transportFactory.supportedDeviceTypes + + private val _connectionState = MutableStateFlow(ConnectionState.Disconnected) + override val connectionState: StateFlow = _connectionState.asStateFlow() + + private val _currentDeviceAddressFlow = MutableStateFlow(radioPrefs.devAddr.value) + override val currentDeviceAddressFlow: StateFlow = _currentDeviceAddressFlow.asStateFlow() + + private val _receivedData = MutableSharedFlow(extraBufferCapacity = 64) + override val receivedData: SharedFlow = _receivedData + + private val _meshActivity = + MutableSharedFlow( + extraBufferCapacity = 64, + onBufferOverflow = kotlinx.coroutines.channels.BufferOverflow.DROP_OLDEST, + ) + override val meshActivity: SharedFlow = _meshActivity.asSharedFlow() + + private val _connectionError = MutableSharedFlow(extraBufferCapacity = 64) + val connectionError: SharedFlow = _connectionError.asSharedFlow() + + override val serviceScope: CoroutineScope + get() = _serviceScope + + private var _serviceScope = CoroutineScope(dispatchers.io + SupervisorJob()) + private var radioIf: RadioTransport? = null + private var isStarted = false + @Volatile private var listenersInitialized = false + private var heartbeatJob: kotlinx.coroutines.Job? = null + private var lastHeartbeatMillis = 0L + + companion object { + private const val HEARTBEAT_INTERVAL_MILLIS = 30 * 1000L + } + + private fun initStateListeners() { + if (listenersInitialized) return + synchronized(this) { + if (listenersInitialized) return + listenersInitialized = true + + radioPrefs.devAddr + .onEach { addr -> + if (_currentDeviceAddressFlow.value != addr) { + _currentDeviceAddressFlow.value = addr + startInterface() + } + } + .launchIn(processLifecycle.coroutineScope) + + bluetoothRepository.state + .onEach { state -> + if (state.enabled) { + startInterface() + } else if (getBondedDeviceAddress()?.startsWith(InterfaceId.BLUETOOTH.id) == true) { + stopInterface() + } + } + .catch { Logger.e(it) { "bluetoothRepository.state flow crashed!" } } + .launchIn(processLifecycle.coroutineScope) + + networkRepository.networkAvailable + .onEach { state -> + if (state) { + startInterface() + } else if (getBondedDeviceAddress()?.startsWith(InterfaceId.TCP.id) == true) { + stopInterface() + } + } + .catch { Logger.e(it) { "networkRepository.networkAvailable flow crashed!" } } + .launchIn(processLifecycle.coroutineScope) + } + } + + override fun connect() { + startInterface() + initStateListeners() + } + + override fun isMockInterface(): Boolean = transportFactory.isMockInterface() + + override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = + transportFactory.toInterfaceAddress(interfaceId, rest) + + override fun getDeviceAddress(): String? = _currentDeviceAddressFlow.value + + private fun getBondedDeviceAddress(): String? { + val address = getDeviceAddress() + return if (transportFactory.isAddressValid(address)) { + address + } else { + null + } + } + + override fun setDeviceAddress(deviceAddr: String?): Boolean { + val sanitized = if (deviceAddr == "n" || deviceAddr.isNullOrBlank()) null else deviceAddr + + if (getBondedDeviceAddress() == sanitized && isStarted && _connectionState.value == ConnectionState.Connected) { + Logger.w { "Ignoring setBondedDevice ${sanitized?.anonymize}, already using that device" } + return false + } + + analytics.track("mesh_bond") + ignoreException { stopInterface() } + + Logger.d { "Setting bonded device to ${sanitized?.anonymize}" } + radioPrefs.setDevAddr(sanitized) + _currentDeviceAddressFlow.value = sanitized + startInterface() + return true + } + + private fun startInterface() { + if (radioIf != null) return + + val address = getBondedDeviceAddress() + ?: if (isMockInterface()) transportFactory.toInterfaceAddress(InterfaceId.MOCK, "") else null + + if (address == null) { + Logger.w { "No valid address to connect to." } + return + } + + Logger.i { "Starting radio interface for ${address.anonymize}" } + isStarted = true + radioIf = transportFactory.createTransport(address, this) + startHeartbeat() + } + + private fun stopInterface() { + val currentIf = radioIf + Logger.i { "Stopping interface $currentIf" } + isStarted = false + radioIf = null + currentIf?.close() + + _serviceScope.cancel("stopping interface") + _serviceScope = CoroutineScope(dispatchers.io + SupervisorJob()) + + if (currentIf != null) { + onDisconnect(isPermanent = true) + } + } + + private fun startHeartbeat() { + heartbeatJob?.cancel() + heartbeatJob = serviceScope.launch { + while (true) { + delay(HEARTBEAT_INTERVAL_MILLIS) + keepAlive() + } + } + } + + fun keepAlive(now: Long = nowMillis) { + if (now - lastHeartbeatMillis > HEARTBEAT_INTERVAL_MILLIS) { + radioIf?.keepAlive() + lastHeartbeatMillis = now + } + } + + override fun sendToRadio(bytes: ByteArray) { + _serviceScope.handledLaunch { + radioIf?.handleSendToRadio(bytes) + _meshActivity.tryEmit(MeshActivity.Send) + } + } + + @Suppress("TooGenericExceptionCaught") + override fun handleFromRadio(bytes: ByteArray) { + try { + processLifecycle.coroutineScope.launch(dispatchers.io) { _receivedData.emit(bytes) } + _meshActivity.tryEmit(MeshActivity.Receive) + } catch (t: Throwable) { + Logger.e(t) { "RadioInterfaceService.handleFromRadio failed while emitting data" } + } + } + + override fun onConnect() { + if (_connectionState.value != ConnectionState.Connected) { + Logger.d { "Broadcasting connection state change to Connected" } + processLifecycle.coroutineScope.launch(dispatchers.default) { + _connectionState.emit(ConnectionState.Connected) + } + } + } + + override fun onDisconnect(isPermanent: Boolean, errorMessage: String?) { + if (errorMessage != null) { + processLifecycle.coroutineScope.launch(dispatchers.default) { _connectionError.emit(errorMessage) } + } + val newTargetState = if (isPermanent) ConnectionState.Disconnected else ConnectionState.DeviceSleep + if (_connectionState.value != newTargetState) { + Logger.d { "Broadcasting connection state change to $newTargetState" } + processLifecycle.coroutineScope.launch(dispatchers.default) { _connectionState.emit(newTargetState) } + } + } +} diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt index 2bc65cb0b..6b81b2c3d 100644 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopKoinModule.kt @@ -38,10 +38,10 @@ import org.meshtastic.core.repository.MeshServiceNotifications import org.meshtastic.core.repository.MeshWorkerManager import org.meshtastic.core.repository.MessageQueue import org.meshtastic.core.repository.PlatformAnalytics -import org.meshtastic.core.repository.RadioInterfaceService +import org.meshtastic.core.repository.RadioTransportFactory import org.meshtastic.core.repository.ServiceBroadcasts import org.meshtastic.core.repository.ServiceRepository -import org.meshtastic.desktop.radio.DesktopRadioInterfaceService +import org.meshtastic.desktop.radio.DesktopRadioTransportFactory import org.meshtastic.desktop.stub.NoopAppWidgetUpdater import org.meshtastic.desktop.stub.NoopCompassHeadingProvider import org.meshtastic.desktop.stub.NoopLocationRepository @@ -112,10 +112,9 @@ fun desktopModule() = module { @Suppress("LongMethod") private fun desktopPlatformStubsModule() = module { single { org.meshtastic.core.service.ServiceRepositoryImpl() } - single { - DesktopRadioInterfaceService( + single { + DesktopRadioTransportFactory( dispatchers = get(), - radioPrefs = get(), scanner = get(), bluetoothRepository = get(), connectionFactory = get(), diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopPlatformModule.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopPlatformModule.kt index 384a102ac..6b966f959 100644 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopPlatformModule.kt +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/di/DesktopPlatformModule.kt @@ -42,7 +42,6 @@ import org.meshtastic.proto.ChannelSet import org.meshtastic.proto.LocalConfig import org.meshtastic.proto.LocalModuleConfig import org.meshtastic.proto.LocalStats -import java.io.File /** * Resolves the desktop data directory for persistent storage (DataStore files, Room database). Defaults to diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioInterfaceService.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioInterfaceService.kt deleted file mode 100644 index c4defd7d1..000000000 --- a/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioInterfaceService.kt +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Copyright (c) 2025-2026 Meshtastic LLC - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.meshtastic.desktop.radio - -import co.touchlab.kermit.Logger -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import org.meshtastic.core.common.util.handledLaunch -import org.meshtastic.core.di.CoroutineDispatchers -import org.meshtastic.core.model.ConnectionState -import org.meshtastic.core.model.InterfaceId -import org.meshtastic.core.model.MeshActivity -import org.meshtastic.core.model.util.anonymize -import org.meshtastic.core.network.transport.TcpTransport -import org.meshtastic.core.repository.RadioInterfaceService -import org.meshtastic.core.repository.RadioPrefs - -/** - * Desktop implementation of [RadioInterfaceService] with real TCP transport. - * - * Delegates all TCP socket management, stream framing, reconnect logic, and heartbeat to the shared [TcpTransport] from - * `core:network`. Desktop supports TCP and BLE connections. - */ -@Suppress("TooManyFunctions") -class DesktopRadioInterfaceService( - private val dispatchers: CoroutineDispatchers, - private val radioPrefs: RadioPrefs, - private val scanner: org.meshtastic.core.ble.BleScanner, - private val bluetoothRepository: org.meshtastic.core.ble.BluetoothRepository, - private val connectionFactory: org.meshtastic.core.ble.BleConnectionFactory, -) : RadioInterfaceService { - - override val supportedDeviceTypes: List = - listOf( - org.meshtastic.core.model.DeviceType.TCP, - org.meshtastic.core.model.DeviceType.BLE, - org.meshtastic.core.model.DeviceType.USB, - ) - - private val _connectionState = MutableStateFlow(ConnectionState.Disconnected) - override val connectionState: StateFlow = _connectionState.asStateFlow() - - private val _currentDeviceAddressFlow = MutableStateFlow(radioPrefs.devAddr.value) - override val currentDeviceAddressFlow: StateFlow = _currentDeviceAddressFlow.asStateFlow() - - private val _receivedData = MutableSharedFlow(extraBufferCapacity = 64) - override val receivedData: SharedFlow = _receivedData - - private val _meshActivity = - MutableSharedFlow(extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST) - override val meshActivity: SharedFlow = _meshActivity.asSharedFlow() - - override var serviceScope: CoroutineScope = CoroutineScope(dispatchers.io + SupervisorJob()) - private set - - private var transport: TcpTransport? = null - private var bleTransport: DesktopBleInterface? = null - private var serialTransport: org.meshtastic.core.network.SerialTransport? = null - - init { - // Observe radioPrefs to handle asynchronous loads from DataStore - radioPrefs.devAddr - .onEach { addr -> - if (_currentDeviceAddressFlow.value != addr) { - _currentDeviceAddressFlow.value = addr - } - // Auto-connect if we have a valid address and are disconnected - if (addr != null && _connectionState.value == ConnectionState.Disconnected) { - Logger.i { "DesktopRadio: Auto-connecting to saved address ${addr.anonymize}" } - startConnection(addr) - } - } - .launchIn(serviceScope) - } - - override fun isMockInterface(): Boolean = false - - override fun getDeviceAddress(): String? = _currentDeviceAddressFlow.value - - // region RadioInterfaceService Implementation - - override fun connect() { - val address = getDeviceAddress() - if (address.isNullOrBlank() || address == "n") { - Logger.w { "DesktopRadio: No address configured, skipping connect" } - return - } - startConnection(address) - } - - override fun setDeviceAddress(deviceAddr: String?): Boolean { - val sanitized = if (deviceAddr == "n" || deviceAddr.isNullOrBlank()) null else deviceAddr - - if (_currentDeviceAddressFlow.value == sanitized && _connectionState.value == ConnectionState.Connected) { - Logger.w { "DesktopRadio: Already connected to ${sanitized?.anonymize}, ignoring" } - return false - } - - Logger.i { "DesktopRadio: Setting device address to ${sanitized?.anonymize}" } - - // Stop any existing connection - stopInterface() - - // Persist and update address - radioPrefs.setDevAddr(sanitized) - _currentDeviceAddressFlow.value = sanitized - - // Start connection if we have a valid address - if (sanitized != null && sanitized != "n") { - startConnection(sanitized) - } - return true - } - - override fun sendToRadio(bytes: ByteArray) { - serviceScope.handledLaunch { - transport?.sendPacket(bytes) - bleTransport?.handleSendToRadio(bytes) - serialTransport?.handleSendToRadio(bytes) - } - } - - override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest" - - override fun onConnect() { - if (_connectionState.value != ConnectionState.Connected) { - Logger.i { "DesktopRadio: Connected" } - _connectionState.value = ConnectionState.Connected - } - } - - override fun onDisconnect(isPermanent: Boolean, errorMessage: String?) { - val newState = if (isPermanent) ConnectionState.Disconnected else ConnectionState.DeviceSleep - if (_connectionState.value != newState) { - Logger.i { "DesktopRadio: Disconnected (permanent=$isPermanent, error=$errorMessage)" } - _connectionState.value = newState - } - } - - override fun handleFromRadio(bytes: ByteArray) { - serviceScope.launch(dispatchers.io) { - _receivedData.emit(bytes) - _meshActivity.tryEmit(MeshActivity.Receive) - } - } - - // endregion - - // region Connection Management - - private fun startConnection(address: String) { - if (address.startsWith("t")) { - startTcpConnection(address.removePrefix("t")) - } else if (address.startsWith("s")) { - startSerialConnection(address.removePrefix("s")) - } else if (address.startsWith("x")) { - startBleConnection(address.removePrefix("x")) - } else { - // Assume BLE if no prefix, or prefix is not supported - val stripped = if (address.startsWith("!")) address.removePrefix("!") else address - startBleConnection(stripped) - } - } - - private fun startSerialConnection(portName: String) { - transport?.stop() - bleTransport?.close() - serialTransport?.close() - - val serial = org.meshtastic.core.network.SerialTransport(portName = portName, service = this) - serialTransport = serial - if (!serial.startConnection()) { - onDisconnect(isPermanent = true, errorMessage = "Failed to connect to $portName") - } - } - - private fun startBleConnection(address: String) { - transport?.stop() - bleTransport?.close() - - bleTransport = - DesktopBleInterface( - serviceScope = serviceScope, - scanner = scanner, - bluetoothRepository = bluetoothRepository, - connectionFactory = connectionFactory, - service = this, - address = address, - ) - } - - private fun startTcpConnection(address: String) { - transport?.stop() - - val tcpTransport = - TcpTransport( - dispatchers = dispatchers, - scope = serviceScope, - listener = - object : TcpTransport.Listener { - override fun onConnected() { - onConnect() - } - - override fun onDisconnected() { - onDisconnect(isPermanent = true) - } - - override fun onPacketReceived(bytes: ByteArray) { - handleFromRadio(bytes) - } - }, - logTag = "DesktopRadio", - ) - transport = tcpTransport - tcpTransport.start(address) - } - - private fun stopInterface() { - transport?.stop() - transport = null - - bleTransport?.close() - bleTransport = null - - serialTransport?.close() - serialTransport = null - - // Recreate the service scope - serviceScope.cancel("stopping interface") - serviceScope = CoroutineScope(dispatchers.io + SupervisorJob()) - } - - // endregion -} diff --git a/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioTransportFactory.kt b/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioTransportFactory.kt new file mode 100644 index 000000000..2ef563504 --- /dev/null +++ b/desktop/src/main/kotlin/org/meshtastic/desktop/radio/DesktopRadioTransportFactory.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.desktop.radio + +import org.meshtastic.core.ble.BleConnectionFactory +import org.meshtastic.core.ble.BleScanner +import org.meshtastic.core.ble.BluetoothRepository +import org.meshtastic.core.di.CoroutineDispatchers +import org.meshtastic.core.model.DeviceType +import org.meshtastic.core.model.InterfaceId +import org.meshtastic.core.network.SerialTransport +import org.meshtastic.core.network.radio.TCPInterface +import org.meshtastic.core.repository.RadioInterfaceService +import org.meshtastic.core.repository.RadioTransport +import org.meshtastic.core.repository.RadioTransportFactory + +class DesktopRadioTransportFactory( + private val scanner: BleScanner, + private val bluetoothRepository: BluetoothRepository, + private val connectionFactory: BleConnectionFactory, + private val dispatchers: CoroutineDispatchers, +) : RadioTransportFactory { + + override val supportedDeviceTypes: List = listOf( + DeviceType.TCP, + DeviceType.BLE, + DeviceType.USB, + ) + + override fun isMockInterface(): Boolean = false + + override fun isAddressValid(address: String?): Boolean { + val spec = address?.getOrNull(0) ?: return false + return spec == InterfaceId.TCP.id || + spec == InterfaceId.SERIAL.id || + spec == InterfaceId.BLUETOOTH.id || + address.startsWith("!") + } + + override fun toInterfaceAddress(interfaceId: InterfaceId, rest: String): String = "${interfaceId.id}$rest" + + override fun createTransport(address: String, service: RadioInterfaceService): RadioTransport { + return if (address.startsWith(InterfaceId.TCP.id)) { + TCPInterface(service, dispatchers, address.removePrefix(InterfaceId.TCP.id.toString())) + } else if (address.startsWith(InterfaceId.SERIAL.id)) { + SerialTransport(address.removePrefix(InterfaceId.SERIAL.id.toString()), service) + } else if (address.startsWith(InterfaceId.BLUETOOTH.id)) { + DesktopBleInterface( + serviceScope = service.serviceScope, + scanner = scanner, + bluetoothRepository = bluetoothRepository, + connectionFactory = connectionFactory, + service = service, + address = address.removePrefix(InterfaceId.BLUETOOTH.id.toString()) + ) + } else { + val stripped = if (address.startsWith("!")) address.removePrefix("!") else address + DesktopBleInterface( + serviceScope = service.serviceScope, + scanner = scanner, + bluetoothRepository = bluetoothRepository, + connectionFactory = connectionFactory, + service = service, + address = stripped + ) + } + } +}