feat(ble): Add support for FromRadioSync characteristic (#4609)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-02-20 18:02:00 -06:00 committed by GitHub
parent 5d2336c092
commit 96d4027f74
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 396 additions and 108 deletions

View file

@ -33,6 +33,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import no.nordicsemi.kotlin.ble.core.android.AndroidEnvironment
import org.meshtastic.core.database.DatabaseManager
import org.meshtastic.core.prefs.mesh.MeshPrefs
import org.meshtastic.core.prefs.meshlog.MeshLogPrefs
@ -71,6 +72,7 @@ open class MeshUtilApplication :
// Shutdown managers (useful for Robolectric tests)
val entryPoint = EntryPointAccessors.fromApplication(this, AppEntryPoint::class.java)
entryPoint.databaseManager().close()
entryPoint.androidEnvironment().close()
applicationScope.cancel()
super.onTerminate()
}
@ -99,6 +101,8 @@ interface AppEntryPoint {
fun meshPrefs(): MeshPrefs
fun meshLogPrefs(): MeshLogPrefs
fun androidEnvironment(): AndroidEnvironment
}
fun logAssert(executeReliableWrite: Boolean) {

View file

@ -51,6 +51,7 @@ import org.meshtastic.core.ble.BleConnection
import org.meshtastic.core.ble.BleError
import org.meshtastic.core.ble.BleScanner
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMNUM_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIOSYNC_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.LOGRADIO_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.SERVICE_UUID
@ -113,6 +114,7 @@ constructor(
private var fromNumCharacteristic: RemoteCharacteristic? = null
private var fromRadioCharacteristic: RemoteCharacteristic? = null
private var logRadioCharacteristic: RemoteCharacteristic? = null
private var fromRadioSyncCharacteristic: RemoteCharacteristic? = null
init {
connect()
@ -257,13 +259,15 @@ constructor(
try {
val chars =
bleConnection.discoverCharacteristics(
SERVICE_UUID,
serviceUuid = SERVICE_UUID,
requiredUuids =
listOf(
TORADIO_CHARACTERISTIC,
FROMNUM_CHARACTERISTIC,
FROMRADIO_CHARACTERISTIC,
LOGRADIO_CHARACTERISTIC,
),
optionalUuids = listOf(FROMRADIOSYNC_CHARACTERISTIC),
)
if (chars != null) {
@ -271,6 +275,7 @@ constructor(
fromNumCharacteristic = chars[FROMNUM_CHARACTERISTIC]
fromRadioCharacteristic = chars[FROMRADIO_CHARACTERISTIC]
logRadioCharacteristic = chars[LOGRADIO_CHARACTERISTIC]
fromRadioSyncCharacteristic = chars[FROMRADIOSYNC_CHARACTERISTIC]
Logger.d { "[$address] Characteristics discovered successfully" }
setupNotifications()
@ -288,25 +293,48 @@ constructor(
// --- Notification Setup ---
@Suppress("LongMethod")
private suspend fun setupNotifications() {
val fromNumReady = CompletableDeferred<Unit>()
val fromRadioReady = CompletableDeferred<Unit>()
val logRadioReady = CompletableDeferred<Unit>()
fromNumCharacteristic
?.subscribe {
Logger.d { "[$address] FromNum subscription active" }
fromNumReady.complete(Unit)
}
?.onEach { notifyBytes ->
Logger.d { "[$address] FromNum Notification (${notifyBytes.size} bytes), draining queue" }
connectionScope.launch { drainPacketQueueAndDispatch() }
}
?.catch { e ->
if (!fromNumReady.isCompleted) fromNumReady.completeExceptionally(e)
Logger.w(e) { "[$address] Error in fromNumCharacteristic subscription" }
service.onDisconnect(BleError.from(e))
}
?.launchIn(connectionScope) ?: fromNumReady.complete(Unit)
// 1. Prefer FromRadioSync (Indicate) if available
if (fromRadioSyncCharacteristic != null) {
Logger.i { "[$address] Using FromRadioSync for packet reception" }
fromRadioSyncCharacteristic
?.subscribe {
Logger.d { "[$address] FromRadioSync subscription active" }
fromRadioReady.complete(Unit)
}
?.onEach { payload ->
Logger.d { "[$address] FromRadioSync Indication (${payload.size} bytes)" }
dispatchPacket(payload)
}
?.catch { e ->
if (!fromRadioReady.isCompleted) fromRadioReady.completeExceptionally(e)
Logger.w(e) { "[$address] Error in fromRadioSyncCharacteristic subscription" }
service.onDisconnect(BleError.from(e))
}
?.launchIn(connectionScope) ?: fromRadioReady.complete(Unit)
} else {
// 2. Fallback to legacy FromNum (Notify) + FromRadio (Read)
Logger.i { "[$address] Using legacy FromNum/FromRadio for packet reception" }
fromNumCharacteristic
?.subscribe {
Logger.d { "[$address] FromNum subscription active" }
fromRadioReady.complete(Unit)
}
?.onEach { notifyBytes ->
Logger.d { "[$address] FromNum Notification (${notifyBytes.size} bytes), draining queue" }
connectionScope.launch { drainPacketQueueAndDispatch() }
}
?.catch { e ->
if (!fromRadioReady.isCompleted) fromRadioReady.completeExceptionally(e)
Logger.w(e) { "[$address] Error in fromNumCharacteristic subscription" }
service.onDisconnect(BleError.from(e))
}
?.launchIn(connectionScope) ?: fromRadioReady.complete(Unit)
}
logRadioCharacteristic
?.subscribe {
@ -326,7 +354,7 @@ constructor(
try {
withTimeout(CONNECTION_TIMEOUT_MS) {
fromNumReady.await()
fromRadioReady.await()
logRadioReady.await()
}
Logger.d { "[$address] All notifications successfully subscribed" }
@ -364,7 +392,11 @@ constructor(
"to toRadioCharacteristic with $writeType - " +
"${p.size} bytes (Total TX: $bytesSent bytes)"
}
drainPacketQueueAndDispatch()
// Only manually drain if we are using the legacy FromNum/FromRadio flow
if (fromRadioSyncCharacteristic == null) {
drainPacketQueueAndDispatch()
}
} catch (e: InvalidAttributeException) {
Logger.w(e) { "[$address] Attribute invalidated during write, clearing characteristics" }
handleInvalidAttribute(e)
@ -415,5 +447,6 @@ constructor(
fromNumCharacteristic = null
fromRadioCharacteristic = null
logRadioCharacteristic = null
fromRadioSyncCharacteristic = null
}
}

View file

@ -22,8 +22,8 @@ import io.mockk.clearMocks
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.mock.mock
@ -139,7 +139,7 @@ class NordicBleInterfaceRetryTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(100.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -150,7 +150,7 @@ class NordicBleInterfaceRetryTest {
)
// Wait for connection and stable state
delay(2000.milliseconds)
advanceUntilIdle()
verify(timeout = 5000) { service.onConnect() }
// Clear initial discovery errors if any (sometimes mock emits empty list initially)
@ -160,8 +160,8 @@ class NordicBleInterfaceRetryTest {
val dataToSend = byteArrayOf(0x01, 0x02, 0x03)
nordicInterface.handleSendToRadio(dataToSend)
// Give it time to process retries (500ms delay per retry in code)
delay(1500.milliseconds)
// Give it time to process retries
advanceUntilIdle()
assert(writeAttempts == 2) { "Should have attempted write twice, but was $writeAttempts" }
assert(writtenValue != null) { "Value should have been eventually written" }
@ -244,7 +244,7 @@ class NordicBleInterfaceRetryTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(100.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -255,7 +255,7 @@ class NordicBleInterfaceRetryTest {
)
// Wait for connection
delay(2000.milliseconds)
advanceUntilIdle()
verify(timeout = 5000) { service.onConnect() }
// Clear initial discovery errors
@ -264,8 +264,8 @@ class NordicBleInterfaceRetryTest {
// Trigger write which will fail repeatedly
nordicInterface.handleSendToRadio(byteArrayOf(0x01))
// Wait for all 3 attempts + delays (500ms * 2)
delay(2500.milliseconds)
// Wait for all attempts
advanceUntilIdle()
assert(writeAttempts == 3) {
"Should have attempted write 3 times (initial + 2 retries), but was $writeAttempts"

View file

@ -21,8 +21,8 @@ import co.touchlab.kermit.Severity
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import no.nordicsemi.kotlin.ble.client.android.CentralManager
import no.nordicsemi.kotlin.ble.client.android.mock.mock
@ -42,6 +42,7 @@ import org.junit.Before
import org.junit.Test
import org.meshtastic.core.ble.BleError
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMNUM_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIOSYNC_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.FROMRADIO_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.LOGRADIO_CHARACTERISTIC
import org.meshtastic.core.ble.MeshtasticBleConstants.SERVICE_UUID
@ -146,7 +147,7 @@ class NordicBleInterfaceTest {
centralManager.getBondedPeripherals().forEach { println("Found bonded peripheral: ${it.address}") }
// Give it a moment to stabilize
delay(100.milliseconds)
advanceUntilIdle()
// Create the interface
println("Creating NordicBleInterface")
@ -160,7 +161,7 @@ class NordicBleInterfaceTest {
// Wait for connection and discovery
println("Waiting for connection...")
delay(2000.milliseconds)
advanceUntilIdle()
println("Verifying onConnect...")
verify(timeout = 5000) { service.onConnect() }
@ -173,13 +174,13 @@ class NordicBleInterfaceTest {
otaPeripheral.simulateValueUpdate(fromNumHandle, byteArrayOf(0x01))
// Wait for drain to start
delay(500.milliseconds)
advanceUntilIdle()
// Simulate a log radio notification
val logData = "test log".toByteArray()
otaPeripheral.simulateValueUpdate(logRadioHandle, logData)
delay(500.milliseconds)
advanceUntilIdle()
// Explicitly stub handleFromRadio just in case relaxed mock fails
io.mockk.every { service.handleFromRadio(any()) } returns Unit
@ -281,7 +282,7 @@ class NordicBleInterfaceTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(100.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -292,7 +293,7 @@ class NordicBleInterfaceTest {
)
// Wait for connection
delay(1000.milliseconds)
advanceUntilIdle()
verify(timeout = 2000) { service.onConnect() }
// Test writing
@ -300,7 +301,7 @@ class NordicBleInterfaceTest {
nordicInterface.handleSendToRadio(dataToSend)
// Give it time to process
delay(500.milliseconds)
advanceUntilIdle()
assert(writtenValue != null) { "Value should have been written" }
assert(writtenValue!!.contentEquals(dataToSend)) {
@ -374,7 +375,7 @@ class NordicBleInterfaceTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(100.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -385,7 +386,7 @@ class NordicBleInterfaceTest {
)
// Wait for connection
delay(1000.milliseconds)
advanceUntilIdle()
verify(timeout = 2000) { service.onConnect() }
// Find the connected peripheral from CentralManager to trigger disconnect
@ -395,7 +396,7 @@ class NordicBleInterfaceTest {
connectedPeripheral.disconnect()
// Wait for disconnect event propagation
delay(1000.milliseconds)
advanceUntilIdle()
// Verify onDisconnect was called on the service
// NordicBleInterface calls onDisconnect(BleError.Disconnected)
@ -465,7 +466,7 @@ class NordicBleInterfaceTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(100.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -476,7 +477,7 @@ class NordicBleInterfaceTest {
)
// Wait for connection and eventual failure
delay(1000.milliseconds)
advanceUntilIdle()
// Verify that discovery failed
verify { service.onDisconnect(any<BleError.DiscoveryFailed>()) }
@ -551,7 +552,7 @@ class NordicBleInterfaceTest {
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
delay(1000.milliseconds)
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
@ -562,7 +563,7 @@ class NordicBleInterfaceTest {
)
// Wait for connection
delay(1000.milliseconds)
advanceUntilIdle()
verify(timeout = 2000) { service.onConnect() }
// Trigger write which will fail
@ -570,11 +571,99 @@ class NordicBleInterfaceTest {
// Wait for error propagation (retries take time!)
// 3 attempts with 500ms delay between them = ~1000ms+
delay(2500.milliseconds)
advanceUntilIdle()
// Verify onDisconnect was called with error
verify { service.onDisconnect(any<BleError>()) }
nordicInterface.close()
}
@Test
fun `fromRadioSync flow prefers Indicate characteristic`() = runTest(testDispatcher) {
val mockEnvironment = MockAndroidEnvironment.Api31(isBluetoothEnabled = true)
val centralManager = CentralManager.mock(mockEnvironment, scope = backgroundScope)
val service = mockk<RadioInterfaceService>(relaxed = true)
var syncCharHandle: Int = -1
val payload = byteArrayOf(0xDE.toByte(), 0xAD.toByte())
val eventHandler =
object : PeripheralSpecEventHandler {
override fun onConnectionRequest(preferredPhy: List<no.nordicsemi.kotlin.ble.core.Phy>) =
ConnectionResult.Accept
override fun onReadRequest(characteristic: MockRemoteCharacteristic): ReadResponse =
ReadResponse.Success(byteArrayOf())
}
val peripheralSpec =
PeripheralSpec.simulatePeripheral(identifier = address, proximity = Proximity.IMMEDIATE) {
advertising(
parameters = LegacyAdvertisingSetParameters(connectable = true, interval = 100.milliseconds),
) {
CompleteLocalName("Meshtastic_Sync")
}
connectable(
name = "Meshtastic_Sync",
isBonded = true,
eventHandler = eventHandler,
cachedServices = {
Service(uuid = SERVICE_UUID) {
Characteristic(
uuid = TORADIO_CHARACTERISTIC,
properties = setOf(CharacteristicProperty.WRITE),
permission = Permission.WRITE,
)
Characteristic(
uuid = FROMNUM_CHARACTERISTIC,
properties = setOf(CharacteristicProperty.NOTIFY),
permission = Permission.READ,
)
Characteristic(
uuid = FROMRADIO_CHARACTERISTIC,
properties = setOf(CharacteristicProperty.READ),
permission = Permission.READ,
)
Characteristic(
uuid = LOGRADIO_CHARACTERISTIC,
properties = setOf(CharacteristicProperty.NOTIFY),
permission = Permission.READ,
)
// NEW: Provide the Sync characteristic
syncCharHandle =
Characteristic(
uuid = FROMRADIOSYNC_CHARACTERISTIC,
properties = setOf(CharacteristicProperty.INDICATE),
permission = Permission.READ,
)
}
},
)
}
centralManager.simulatePeripherals(listOf(peripheralSpec))
advanceUntilIdle()
val nordicInterface =
NordicBleInterface(
serviceScope = this,
centralManager = centralManager,
service = service,
address = address,
)
// Wait for connection and discovery
advanceUntilIdle()
verify(timeout = 2000) { service.onConnect() }
// Simulate an indication from FROMRADIOSYNC
peripheralSpec.simulateValueUpdate(syncCharHandle, payload)
advanceUntilIdle()
// Verify handleFromRadio was called directly with the payload
verify(timeout = 2000) { service.handleFromRadio(p = payload) }
nordicInterface.close()
}
}