feat: introduce Desktop target and expand Kotlin Multiplatform (KMP) architecture (#4761)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-03-12 16:14:49 -05:00 committed by GitHub
parent f4364cff9a
commit ac6bb5479b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
386 changed files with 17089 additions and 4590 deletions

View file

@ -0,0 +1,147 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.transport
import co.touchlab.kermit.Logger
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
* Meshtastic stream framing codec pure Kotlin, no platform dependencies.
*
* Implements the START1/START2 + 2-byte-length + payload framing protocol used for serial and TCP communication with
* Meshtastic radios.
*
* Shared between Android (`StreamInterface`/`TCPInterface`) and Desktop (`DesktopRadioInterfaceService`).
*/
@Suppress("MagicNumber")
class StreamFrameCodec(
/** Called when a complete packet has been decoded from the byte stream. */
private val onPacketReceived: (ByteArray) -> Unit,
/** Optional log tag for debug output. */
private val logTag: String = "StreamCodec",
) {
companion object {
const val START1: Byte = 0x94.toByte()
const val START2: Byte = 0xc3.toByte()
const val MAX_TO_FROM_RADIO_SIZE = 512
const val HEADER_SIZE = 4
/** Default Meshtastic TCP service port. */
const val DEFAULT_TCP_PORT = 4403
/** Wake bytes to send before connecting to rouse a sleeping device. */
val WAKE_BYTES = byteArrayOf(START1, START1, START1, START1)
}
private val writeMutex = Mutex()
// Framing state machine
private var ptr = 0
private var msb = 0
private var lsb = 0
private var packetLen = 0
private val rxPacket = ByteArray(MAX_TO_FROM_RADIO_SIZE)
private val debugLineBuf = StringBuilder()
/**
* Process a single incoming byte through the stream framing state machine.
*
* Call this repeatedly with bytes from the transport (serial, TCP, etc). When a complete packet is decoded,
* [onPacketReceived] is invoked.
*/
fun processInputByte(c: Byte) {
var nextPtr = ptr + 1
fun lostSync() {
Logger.e { "$logTag: Lost protocol sync" }
nextPtr = 0
}
fun deliverPacket() {
val buf = rxPacket.copyOf(packetLen)
onPacketReceived(buf)
nextPtr = 0
}
when (ptr) {
0 ->
if (c != START1) {
debugOut(c)
nextPtr = 0
}
1 -> if (c != START2) lostSync()
2 -> msb = c.toInt() and 0xff
3 -> {
lsb = c.toInt() and 0xff
packetLen = (msb shl 8) or lsb
if (packetLen > MAX_TO_FROM_RADIO_SIZE) {
lostSync()
} else if (packetLen == 0) {
deliverPacket()
}
}
else -> {
rxPacket[ptr - HEADER_SIZE] = c
if (ptr - HEADER_SIZE + 1 == packetLen) {
deliverPacket()
}
}
}
ptr = nextPtr
}
/**
* Frames a payload into the Meshtastic stream protocol format: [START1][START2][MSB len][LSB len][payload].
*
* Thread-safe via an internal mutex multiple callers can call this concurrently.
*/
suspend fun frameAndSend(payload: ByteArray, sendBytes: (ByteArray) -> Unit, flush: () -> Unit = {}) {
writeMutex.withLock {
val header = ByteArray(HEADER_SIZE)
header[0] = START1
header[1] = START2
header[2] = (payload.size shr 8).toByte()
header[3] = (payload.size and 0xff).toByte()
sendBytes(header)
sendBytes(payload)
flush()
}
}
/** Resets the framing state machine. Call when reconnecting. */
fun reset() {
ptr = 0
msb = 0
lsb = 0
packetLen = 0
debugLineBuf.clear()
}
/** Print device serial debug output to the logger. */
private fun debugOut(b: Byte) {
when (val c = b.toInt().toChar()) {
'\r' -> {}
'\n' -> {
Logger.d { "$logTag DeviceLog: $debugLineBuf" }
debugLineBuf.clear()
}
else -> debugLineBuf.append(c)
}
}
}

View file

@ -0,0 +1,134 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.transport
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class StreamFrameCodecTest {
private val receivedPackets = mutableListOf<ByteArray>()
private val codec = StreamFrameCodec(onPacketReceived = { receivedPackets.add(it) }, logTag = "Test")
@Test
fun `processInputByte delivers a 1-byte packet`() {
val packet = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x01, 0x42)
packet.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertEquals(listOf(0x42.toByte()), receivedPackets[0].toList())
}
@Test
fun `processInputByte handles zero length packet`() {
val packet = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x00)
packet.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertTrue(receivedPackets[0].isEmpty())
}
@Test
fun `processInputByte loses sync on invalid START2`() {
// START1, wrong START2, START1, START2, LenMSB=0, LenLSB=1, payload
val data = byteArrayOf(0x94.toByte(), 0x00, 0x94.toByte(), 0xc3.toByte(), 0x00, 0x01, 0x55)
data.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertEquals(listOf(0x55.toByte()), receivedPackets[0].toList())
}
@Test
fun `processInputByte handles multiple packets sequentially`() {
val packet1 = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x01, 0x11)
val packet2 = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x01, 0x22)
packet1.forEach { codec.processInputByte(it) }
packet2.forEach { codec.processInputByte(it) }
assertEquals(2, receivedPackets.size)
assertEquals(listOf(0x11.toByte()), receivedPackets[0].toList())
assertEquals(listOf(0x22.toByte()), receivedPackets[1].toList())
}
@Test
fun `processInputByte handles large packet up to MAX_TO_FROM_RADIO_SIZE`() {
val size = 512
val payload = ByteArray(size) { it.toByte() }
val header = byteArrayOf(0x94.toByte(), 0xc3.toByte(), (size shr 8).toByte(), (size and 0xff).toByte())
header.forEach { codec.processInputByte(it) }
payload.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertEquals(payload.toList(), receivedPackets[0].toList())
}
@Test
fun `processInputByte loses sync on overly large packet length`() {
// 513 bytes is > 512
val header = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x02, 0x01)
header.forEach { codec.processInputByte(it) }
assertTrue(receivedPackets.isEmpty())
}
@Test
fun `processInputByte handles multi-byte payload`() {
val payload = byteArrayOf(0x01, 0x02, 0x03, 0x04, 0x05)
val header = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x05)
header.forEach { codec.processInputByte(it) }
payload.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertEquals(payload.toList(), receivedPackets[0].toList())
}
@Test
fun `reset clears framing state`() {
// Feed partial header
codec.processInputByte(0x94.toByte())
codec.processInputByte(0xc3.toByte())
// Reset mid-stream
codec.reset()
// Now feed a complete packet — should work from scratch
val packet = byteArrayOf(0x94.toByte(), 0xc3.toByte(), 0x00, 0x01, 0xAA.toByte())
packet.forEach { codec.processInputByte(it) }
assertEquals(1, receivedPackets.size)
assertEquals(listOf(0xAA.toByte()), receivedPackets[0].toList())
}
@Test
fun `WAKE_BYTES is four START1 bytes`() {
assertEquals(4, StreamFrameCodec.WAKE_BYTES.size)
StreamFrameCodec.WAKE_BYTES.forEach { assertEquals(0x94.toByte(), it) }
}
@Test
fun `DEFAULT_TCP_PORT is 4403`() {
assertEquals(4403, StreamFrameCodec.DEFAULT_TCP_PORT)
}
}

View file

@ -0,0 +1,310 @@
/*
* Copyright (c) 2025-2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.network.transport
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.proto.Heartbeat
import org.meshtastic.proto.ToRadio
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.IOException
import java.io.OutputStream
import java.net.InetAddress
import java.net.Socket
import java.net.SocketTimeoutException
/**
* Shared JVM TCP transport for Meshtastic radios.
*
* Manages the TCP socket lifecycle (connect, read loop, reconnect with backoff, heartbeat) and uses [StreamFrameCodec]
* for the START1/START2 stream framing protocol.
*
* Used by both Android's `TCPInterface` and Desktop's `DesktopRadioInterfaceService`.
*/
@Suppress("TooManyFunctions", "MagicNumber")
class TcpTransport(
private val dispatchers: CoroutineDispatchers,
private val scope: CoroutineScope,
private val listener: Listener,
private val logTag: String = "TcpTransport",
) {
/** Callbacks from the transport to the owning radio interface. */
interface Listener {
/** Called when the TCP connection is established and wake bytes have been sent. */
fun onConnected()
/** Called when the TCP connection is lost. */
fun onDisconnected()
/** Called when a decoded Meshtastic packet arrives. */
fun onPacketReceived(bytes: ByteArray)
}
companion object {
const val MAX_RECONNECT_RETRIES = Int.MAX_VALUE
const val MIN_BACKOFF_MILLIS = 1_000L
const val MAX_BACKOFF_MILLIS = 5 * 60 * 1_000L
const val SOCKET_TIMEOUT_MS = 5_000
const val SOCKET_RETRIES = 18 // 18 * 5s = 90s inactivity before disconnect
const val HEARTBEAT_INTERVAL_MILLIS = 30_000L
const val TIMEOUT_LOG_INTERVAL = 5
private const val MILLIS_PER_SECOND = 1_000L
}
private val codec = StreamFrameCodec(onPacketReceived = { listener.onPacketReceived(it) }, logTag = logTag)
// TCP socket state
private var socket: Socket? = null
private var outStream: OutputStream? = null
private var connectionJob: Job? = null
private var heartbeatJob: Job? = null
// Metrics
private var connectionStartTime: Long = 0
private var packetsReceived: Int = 0
private var packetsSent: Int = 0
private var bytesReceived: Long = 0
private var bytesSent: Long = 0
private var timeoutEvents: Int = 0
/** Whether the transport is currently connected. */
val isConnected: Boolean
get() = socket?.isConnected == true && !socket!!.isClosed
/**
* Start a TCP connection to the given address with automatic reconnect.
*
* @param address host or host:port string
*/
fun start(address: String) {
stop()
connectionJob = scope.handledLaunch { connectWithRetry(address) }
}
/** Stop the transport and close the socket. */
fun stop() {
connectionJob?.cancel()
connectionJob = null
disconnectSocket()
}
/**
* Send a raw framed Meshtastic packet.
*
* The payload is wrapped with the START1/START2 header by the codec.
*/
suspend fun sendPacket(payload: ByteArray) {
codec.frameAndSend(payload = payload, sendBytes = ::sendBytesRaw, flush = ::flushBytes)
}
/** Send a heartbeat packet to keep the connection alive. */
suspend fun sendHeartbeat() {
val heartbeat = ToRadio(heartbeat = Heartbeat())
sendPacket(heartbeat.encode())
}
// region Connection lifecycle
@Suppress("NestedBlockDepth")
private suspend fun connectWithRetry(address: String) {
var retryCount = 1
var backoff = MIN_BACKOFF_MILLIS
while (retryCount <= MAX_RECONNECT_RETRIES) {
try {
connectAndRead(address)
} catch (ex: IOException) {
Logger.w { "$logTag: [$address] TCP connection error - ${ex.message}" }
disconnectSocket()
} catch (@Suppress("TooGenericExceptionCaught") ex: Throwable) {
Logger.e(ex) { "$logTag: [$address] TCP exception - ${ex.message}" }
disconnectSocket()
}
val delaySec = backoff / MILLIS_PER_SECOND
Logger.i { "$logTag: [$address] Reconnect #$retryCount in ${delaySec}s" }
delay(backoff)
retryCount++
backoff = minOf(backoff * 2, MAX_BACKOFF_MILLIS)
}
}
@Suppress("NestedBlockDepth")
private suspend fun connectAndRead(address: String) = withContext(dispatchers.io) {
val parts = address.split(":", limit = 2)
val host = parts[0]
val port = parts.getOrNull(1)?.toIntOrNull() ?: StreamFrameCodec.DEFAULT_TCP_PORT
Logger.i { "$logTag: [$address] Connecting to $host:$port..." }
val attemptStart = nowMillis
Socket(InetAddress.getByName(host), port).use { sock ->
sock.tcpNoDelay = true
sock.keepAlive = true
sock.soTimeout = SOCKET_TIMEOUT_MS
socket = sock
val connectTime = nowMillis - attemptStart
connectionStartTime = nowMillis
resetMetrics()
codec.reset()
Logger.i { "$logTag: [$address] Socket connected in ${connectTime}ms" }
BufferedOutputStream(sock.getOutputStream()).use { output ->
outStream = output
BufferedInputStream(sock.getInputStream()).use { input ->
// Send wake bytes and signal connected
sendBytesRaw(StreamFrameCodec.WAKE_BYTES)
listener.onConnected()
startHeartbeat(address)
// Read loop
var timeoutCount = 0
while (timeoutCount < SOCKET_RETRIES) {
try {
val c = input.read()
if (c == -1) {
Logger.w { "$logTag: [$address] EOF after $packetsReceived packets" }
break
}
timeoutCount = 0
bytesReceived++
codec.processInputByte(c.toByte())
} catch (_: SocketTimeoutException) {
timeoutCount++
timeoutEvents++
if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
Logger.d { "$logTag: [$address] Timeout $timeoutCount/$SOCKET_RETRIES" }
}
}
}
if (timeoutCount >= SOCKET_RETRIES) {
Logger.w { "$logTag: [$address] Closing after $SOCKET_RETRIES consecutive timeouts" }
}
}
}
disconnectSocket()
}
}
// Guards against recursive disconnects triggered by listener callbacks.
private var isDisconnecting: Boolean = false
private fun disconnectSocket() {
if (isDisconnecting) return
isDisconnecting = true
try {
heartbeatJob?.cancel()
heartbeatJob = null
val s = socket
val hadConnection = s != null || outStream != null
if (s != null) {
val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0
Logger.i {
"$logTag: Disconnecting - Uptime: ${uptime}ms, " +
"RX: $packetsReceived ($bytesReceived bytes), " +
"TX: $packetsSent ($bytesSent bytes)"
}
try {
s.close()
} catch (_: IOException) {
// Ignore close errors
}
}
socket = null
outStream = null
if (hadConnection) {
listener.onDisconnected()
}
} finally {
isDisconnecting = false
}
}
// endregion
// region Byte I/O
private fun sendBytesRaw(p: ByteArray) {
val stream =
outStream
?: run {
Logger.w { "$logTag: Cannot send ${p.size} bytes: not connected" }
return
}
packetsSent++
bytesSent += p.size
try {
stream.write(p)
} catch (ex: IOException) {
Logger.w(ex) { "$logTag: TCP write error: ${ex.message}" }
disconnectSocket()
}
}
private fun flushBytes() {
val stream = outStream ?: return
try {
stream.flush()
} catch (ex: IOException) {
Logger.w(ex) { "$logTag: TCP flush error: ${ex.message}" }
disconnectSocket()
}
}
// endregion
// region Heartbeat
private fun startHeartbeat(address: String) {
heartbeatJob?.cancel()
heartbeatJob =
scope.launch {
while (true) {
delay(HEARTBEAT_INTERVAL_MILLIS)
Logger.d { "$logTag: [$address] Sending heartbeat" }
sendHeartbeat()
}
}
}
// endregion
private fun resetMetrics() {
packetsReceived = 0
packetsSent = 0
bytesReceived = 0
bytesSent = 0
timeoutEvents = 0
}
}