refactor(ble): Refactor NordicBleInterface for clarity and stability (#3653)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich 2025-11-10 19:11:29 -06:00 committed by GitHub
parent 28590bfcdf
commit 3a6834329a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 187 additions and 165 deletions

View file

@ -24,20 +24,34 @@ import android.content.Context
import dagger.Module
import dagger.Provides
import dagger.hilt.InstallIn
import dagger.hilt.android.qualifiers.ApplicationContext
import dagger.hilt.components.SingletonComponent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.native
import javax.inject.Singleton
@Module
@InstallIn(SingletonComponent::class)
interface BluetoothRepositoryModule {
companion object {
@Provides
fun provideBluetoothManager(application: Application): BluetoothManager? {
return application.getSystemService(Context.BLUETOOTH_SERVICE) as BluetoothManager?
}
fun provideBluetoothManager(application: Application): BluetoothManager? =
application.getSystemService(Context.BLUETOOTH_SERVICE) as BluetoothManager?
@Provides fun provideBluetoothAdapter(service: BluetoothManager?): BluetoothAdapter? = service?.adapter
@Provides
fun provideBluetoothAdapter(service: BluetoothManager?): BluetoothAdapter? {
return service?.adapter
}
@Singleton
fun provideCentralManager(
@ApplicationContext context: Context,
coroutineScope: CoroutineScope,
): CentralManager = CentralManager.native(context, coroutineScope)
@Provides
@Singleton
fun provideSingletonCoroutineScope(): CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
}
}
}

View file

@ -18,28 +18,33 @@
package com.geeksville.mesh.repository.radio
import android.annotation.SuppressLint
import android.app.Application
import com.geeksville.mesh.repository.radio.BleUuidConstants.BTM_FROMNUM_CHARACTER
import com.geeksville.mesh.repository.radio.BleUuidConstants.BTM_FROMRADIO_CHARACTER
import com.geeksville.mesh.repository.radio.BleUuidConstants.BTM_SERVICE_UUID
import com.geeksville.mesh.repository.radio.BleUuidConstants.BTM_TORADIO_CHARACTER
import com.geeksville.mesh.service.RadioNotConnectedException
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import no.nordicsemi.kotlin.ble.client.RemoteCharacteristic
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.ConnectionPriority
import no.nordicsemi.kotlin.ble.client.android.Peripheral
import no.nordicsemi.kotlin.ble.client.android.native
import no.nordicsemi.kotlin.ble.core.WriteType
import timber.log.Timber
import java.util.UUID
@ -52,7 +57,8 @@ import kotlin.uuid.toKotlinUuid
*
* This class is responsible for connecting to and communicating with a Meshtastic device over BLE.
*
* @param context The application context.
* @param serviceScope The coroutine scope to use for launching coroutines.
* @param centralManager The central manager provided by Nordic BLE Library.
* @param service The [RadioInterfaceService] to use for handling radio events.
* @param address The BLE address of the device to connect to.
*/
@ -60,102 +66,86 @@ import kotlin.uuid.toKotlinUuid
class NordicBleInterface
@AssistedInject
constructor(
private val context: Application,
private val serviceScope: CoroutineScope,
private val centralManager: CentralManager,
private val service: RadioInterfaceService,
@Assisted val address: String,
) : IRadioInterface {
private var peripheral: Peripheral? = null
private val localScope: CoroutineScope
get() = service.serviceScope
private val connectionScope = CoroutineScope(serviceScope.coroutineContext + SupervisorJob())
private val drainMutex = Mutex()
private lateinit var centralManager: CentralManager
private var peripheral: Peripheral? = null
private var toRadioCharacteristic: RemoteCharacteristic? = null
private var fromNumCharacteristic: RemoteCharacteristic? = null
private var fromRadioCharacteristic: RemoteCharacteristic? = null
private fun packetQueueFlow(): Flow<ByteArray> = channelFlow {
while (isActive) {
val packet = fromRadioCharacteristic?.read()
if (packet == null || packet.isEmpty()) {
break
}
send(packet)
delay(INTER_READ_DELAY_MS)
}
}
private fun drainPacketQueueAndDispatch(source: String) {
var drainedCount = 0
packetQueueFlow()
.onEach { packet ->
drainedCount++
Timber.d(
"[$source] Read packet queue returned ${packet.size} bytes: ${
packet.joinToString(
prefix = "[",
postfix = "]",
) { b ->
String.format("0x%02x", b)
}
} - dispatching to service.handleFromRadio()",
)
dispatchPacket(packet, source)
}
.catch { ex -> Timber.w(ex, "Exception while draining packet queue (source=$source)") }
.onCompletion {
if (drainedCount > 0) {
Timber.d("[$source] Drained $drainedCount packets from packet queue")
}
}
.launchIn(localScope)
}
private fun dispatchPacket(packet: ByteArray, source: String) {
try {
if (service.serviceScope.coroutineContext[Job]?.isActive == true) {
service.serviceScope.launch { service.handleFromRadio(p = packet) }
} else {
Timber.w(
"service.serviceScope not active while dispatching from packet queue (source=$source); using localScope as fallback",
)
localScope.launch { service.handleFromRadio(p = packet) }
}
} catch (t: Throwable) {
Timber.e(t, "Failed to schedule service.handleFromRadio (source=$source)")
}
}
companion object {
val BTM_SERVICE_UUID: UUID = UUID.fromString("6ba1b218-15a8-461f-9fa8-5dcae273eafd")
val BTM_TORADIO_CHARACTER: UUID = UUID.fromString("f75c76d2-129e-4dad-a1dd-7866124401e7")
val BTM_FROMNUM_CHARACTER: UUID = UUID.fromString("ed9da18c-a800-4f66-a670-aa7547e34453")
val BTM_FROMRADIO_CHARACTER: UUID = UUID.fromString("2c55e69e-4993-11ed-b878-0242ac120002")
private const val INTER_READ_DELAY_MS: Long = 5L
private const val POST_WRITE_DELAY_MS: Long = 25L
}
init {
connect()
}
// --- Packet Flow Management ---
private fun packetQueueFlow(): Flow<ByteArray> = channelFlow {
while (isActive) {
// Use safe call and Elvis operator for cleaner loop termination if read fails or returns empty
val packet =
fromRadioCharacteristic?.read()?.takeIf { it.isNotEmpty() }
?: run {
Timber.d("Packet queue drain complete (empty or null read)")
break
}
send(packet)
}
}
private fun dispatchPacket(packet: ByteArray, source: String) {
connectionScope.launch {
try {
service.handleFromRadio(p = packet)
} catch (t: Throwable) {
Timber.e(t, "Failed to schedule service.handleFromRadio (source=$source)")
}
}
}
private suspend fun drainPacketQueueAndDispatch(source: String) {
drainMutex.withLock {
var drainedCount = 0
packetQueueFlow()
.onEach { packet ->
drainedCount++
logPacketRead(source, packet)
dispatchPacket(packet, source)
}
.catch { ex -> Timber.w(ex, "Exception while draining packet queue (source=$source)") }
.onCompletion {
if (drainedCount > 0) {
Timber.d("[$source] Drained $drainedCount packets from packet queue")
}
}
.collect()
}
}
// --- Connection & Discovery Logic ---
private suspend fun findPeripheral(): Peripheral =
centralManager.scan().mapNotNull { it.peripheral }.firstOrNull { it.address == address }
?: throw RadioNotConnectedException("Device not found")
centralManager.getBondedPeripherals().firstOrNull { it.address == address }
?: throw RadioNotConnectedException("Device not found at address $address")
private fun connect() {
localScope.launch {
connectionScope.launch {
try {
centralManager = CentralManager.native(context, localScope)
peripheral = findAndConnectPeripheral()
peripheral?.let {
onConnected()
observePeripheralChanges()
discoverServicesAndSetupCharacteristics(it)
}
} catch (e: Exception) {
Timber.e(e, "Error during connection setup")
Timber.e(e, "Failed to connect to peripheral $address")
service.onDisconnect(false)
}
}
@ -171,30 +161,48 @@ constructor(
return p
}
private suspend fun onConnected() {
try {
peripheral?.let { p ->
val rssi = p.readRssi()
Timber.d("Peripheral $address: RSSI: $rssi dBm")
val phyInUse = p.readPhy()
Timber.d("Peripheral $address: PHY in use: $phyInUse")
}
} catch (e: Exception) {
Timber.w(e, "Failed to read initial connection properties for $address")
}
}
private fun observePeripheralChanges() {
peripheral?.let { p ->
p.phy.onEach { phy -> Timber.d("PHY changed to $phy") }.launchIn(localScope)
p.connectionParameters.onEach { Timber.d("Connection parameters changed to $it") }.launchIn(localScope)
p.phy.onEach { phy -> Timber.d("Peripheral $address: PHY changed to $phy") }.launchIn(connectionScope)
p.connectionParameters
.onEach { Timber.d("Peripheral $address: Connection parameters changed to $it") }
.launchIn(connectionScope)
p.state
.onEach { state ->
Timber.d("Peripheral state changed to $state")
Timber.d("Peripheral $address: State changed to $state")
if (!state.isConnected) {
toRadioCharacteristic = null
service.onDisconnect(false)
}
}
.launchIn(localScope)
.launchIn(connectionScope)
}
centralManager.state.onEach { state -> Timber.d("CentralManager state changed to $state") }.launchIn(localScope)
centralManager.state
.onEach { state -> Timber.d("CentralManager state changed to $state") }
.launchIn(connectionScope)
}
@OptIn(ExperimentalUuidApi::class)
private fun discoverServicesAndSetupCharacteristics(peripheral: Peripheral) {
localScope.launch {
connectionScope.launch {
peripheral
.services(listOf(BTM_SERVICE_UUID.toKotlinUuid()))
.onEach { services ->
val meshtasticService = services?.find { it.uuid == BTM_SERVICE_UUID.toKotlinUuid() }
if (meshtasticService != null) {
toRadioCharacteristic =
meshtasticService.characteristics.find { it.uuid == BTM_TORADIO_CHARACTER.toKotlinUuid() }
@ -204,109 +212,105 @@ constructor(
meshtasticService.characteristics.find { it.uuid == BTM_FROMRADIO_CHARACTER.toKotlinUuid() }
if (
toRadioCharacteristic == null ||
fromNumCharacteristic == null ||
fromRadioCharacteristic == null
listOf(toRadioCharacteristic, fromNumCharacteristic, fromRadioCharacteristic).all {
it != null
}
) {
Timber.e("Critical: Meshtastic characteristics not found! Cannot connect.")
service.onDisconnect(false)
} else {
logCharacteristicInfo()
setupNotifications()
service.onConnect()
} else {
Timber.w("One or more characteristics not found on peripheral $address")
service.onDisconnect(false)
}
} else {
Timber.w("Meshtastic service not found on peripheral $address")
service.onDisconnect(false)
}
}
.launchIn(localScope)
.launchIn(connectionScope)
}
}
@OptIn(ExperimentalUuidApi::class)
private fun logCharacteristicInfo() {
try {
Timber.d(
"toRadioCharacteristic discovered: uuid=${toRadioCharacteristic?.uuid} instanceId=${toRadioCharacteristic?.instanceId}",
)
} catch (_: Throwable) {
Timber.d("toRadioCharacteristic discovered (minimal info)")
}
try {
Timber.d(
"fromNumCharacteristic discovered: uuid=${fromNumCharacteristic?.uuid} instanceId=${fromNumCharacteristic?.instanceId}",
)
Timber.d(
"fromRadioCharacteristic discovered (packet queue): uuid=${fromRadioCharacteristic?.uuid} instanceId=${fromRadioCharacteristic?.instanceId}",
)
} catch (_: Throwable) {
Timber.d("fromRadioCharacteristic discovered (minimal info)")
}
}
// --- Notification Setup ---
@OptIn(ExperimentalUuidApi::class)
private suspend fun setupNotifications() {
fromNumCharacteristic
?.subscribe()
?.onEach { notifyBytes ->
try {
Timber.d(
"FROMNUM notify, ${notifyBytes.size} bytes: ${
notifyBytes.joinToString(
prefix = "[",
postfix = "]",
) { b -> String.format("0x%02x", b) }
} - reading packet queue",
)
drainPacketQueueAndDispatch("notify")
} catch (ex: Exception) {
Timber.e(ex, "Error handling incoming FROMNUM notify")
}
logFromNumNotification(notifyBytes)
connectionScope.launch { drainPacketQueueAndDispatch("notify") }
}
?.catch { e -> Timber.e(e, "Error in subscribe flow for fromNumCharacteristic") }
?.onCompletion { cause -> Timber.d("fromNum subscribe flow completed, cause=$cause") }
?.launchIn(scope = localScope)
service.onConnect()
?.launchIn(scope = connectionScope)
}
// --- IRadioInterface Implementation ---
/**
* Sends a packet to the radio.
*
* @param p The packet to send.
*/
override fun handleSendToRadio(p: ByteArray) {
val characteristic = toRadioCharacteristic
if (peripheral == null || characteristic == null) {
return
}
toRadioCharacteristic?.let { characteristic ->
if (peripheral == null) return@let
localScope.launch {
try {
characteristic.write(p, writeType = WriteType.WITHOUT_RESPONSE)
localScope.launch {
delay(POST_WRITE_DELAY_MS)
connectionScope.launch {
try {
characteristic.write(p, writeType = WriteType.WITHOUT_RESPONSE)
// Post-write action initiation
drainPacketQueueAndDispatch("post-write")
} catch (e: Exception) {
Timber.e(e, "Failed to write packet to $address")
}
} catch (e: Exception) {
Timber.e(e, "Error writing to characteristic")
}
}
} ?: Timber.w("toRadioCharacteristic not available when attempting to send data to $address")
}
/** Closes the connection to the device. */
override fun close() {
val fn = fromNumCharacteristic
localScope.launch {
try {
fn?.setNotifying(false)
} catch (ex: Exception) {
Timber.w(ex, "Error disabling notifications on close")
}
try {
peripheral?.disconnect()
} catch (ex: Exception) {
Timber.w(ex, "Error while closing NordicBleInterface")
}
runBlocking {
connectionScope.cancel()
peripheral?.disconnect()
service.onDisconnect(true)
}
toRadioCharacteristic = null
fromNumCharacteristic = null
fromRadioCharacteristic = null
}
// --- Logging Helpers ---
@OptIn(ExperimentalUuidApi::class)
private fun logCharacteristicInfo() {
Timber.d(
"toRadioCharacteristic discovered: uuid=${toRadioCharacteristic?.uuid} instanceId=${toRadioCharacteristic?.instanceId}",
)
Timber.d(
"fromNumCharacteristic discovered: uuid=${fromNumCharacteristic?.uuid} instanceId=${fromNumCharacteristic?.instanceId}",
)
Timber.d(
"fromRadioCharacteristic discovered (packet queue): uuid=${fromRadioCharacteristic?.uuid} instanceId=${fromRadioCharacteristic?.instanceId}",
)
}
private fun logPacketRead(source: String, packet: ByteArray) {
val hexString = packet.joinToString(prefix = "[", postfix = "]") { b -> String.format("0x%02x", b) }
Timber.d(
"[$source] Read packet queue returned ${packet.size}" +
" bytes: $hexString - dispatching to service.handleFromRadio()",
)
}
private fun logFromNumNotification(notifyBytes: ByteArray) {
val hexString = notifyBytes.joinToString(prefix = "[", postfix = "]") { b -> String.format("0x%02x", b) }
Timber.d("FROMNUM notify, ${notifyBytes.size} bytes: $hexString - reading packet queue")
}
}
object BleUuidConstants {
val BTM_SERVICE_UUID: UUID = UUID.fromString("6ba1b218-15a8-461f-9fa8-5dcae273eafd")
val BTM_TORADIO_CHARACTER: UUID = UUID.fromString("f75c76d2-129e-4dad-a1dd-7866124401e7")
val BTM_FROMNUM_CHARACTER: UUID = UUID.fromString("ed9da18c-a800-4f66-a670-aa7547e34453")
val BTM_FROMRADIO_CHARACTER: UUID = UUID.fromString("2c55e69e-4993-11ed-b878-0242ac120002")
}

View file

@ -220,7 +220,7 @@ class MeshService : Service() {
private const val DEFAULT_CONFIG_ONLY_NONCE = 69420
private const val DEFAULT_NODE_INFO_NONCE = 69421
private const val HEARTBEAT_INTERVAL = 25L
private const val WANT_CONFIG_DELAY = 50L
}
private val serviceJob = Job()
@ -1792,9 +1792,9 @@ class MeshService : Service() {
}
// Keep BLE awake and allow the firmware to settle before the node-info stage.
serviceScope.handledLaunch {
delay(HEARTBEAT_INTERVAL)
delay(WANT_CONFIG_DELAY)
sendHeartbeat()
delay(HEARTBEAT_INTERVAL)
delay(WANT_CONFIG_DELAY)
startNodeInfoOnly()
}
}

View file

@ -57,6 +57,10 @@ constructor(
private val connectionStateHolder: MeshServiceConnectionStateHolder,
) {
companion object {
private const val TIMEOUT_MS = 250L
}
private var queueJob: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
@ -138,7 +142,7 @@ constructor(
// send packet to the radio and wait for response
val response = sendPacket(packet)
Timber.d("queueJob packet id=${packet.id.toUInt()} waiting")
val success = response.get(2, TimeUnit.MINUTES)
val success = response.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)
Timber.d("queueJob packet id=${packet.id.toUInt()} success $success")
} catch (e: TimeoutException) {
Timber.d("queueJob packet id=${packet.id.toUInt()} timeout")