feat(wire): migrate from protobuf -> wire (#4401)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich 2026-02-03 18:01:12 -06:00 committed by GitHub
parent 9dbc8b7fbf
commit 25657e8f8f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
239 changed files with 7149 additions and 6144 deletions

View file

@ -21,7 +21,6 @@ import co.touchlab.kermit.Logger
import com.geeksville.mesh.BuildConfig
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.repository.radio.InterfaceId
import com.google.protobuf.InvalidProtocolBufferException
import com.meshtastic.core.strings.getString
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
@ -29,6 +28,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.first
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.analytics.DataPair
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.data.repository.PacketRepository
@ -38,6 +38,8 @@ import org.meshtastic.core.database.entity.ReactionEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.model.util.SfppHasher
import org.meshtastic.core.model.util.decodeOrNull
import org.meshtastic.core.model.util.toOneLiner
import org.meshtastic.core.prefs.mesh.MeshPrefs
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.core.service.RetryEvent
@ -48,20 +50,25 @@ import org.meshtastic.core.strings.critical_alert
import org.meshtastic.core.strings.error_duty_cycle
import org.meshtastic.core.strings.unknown_username
import org.meshtastic.core.strings.waypoint_received
import org.meshtastic.proto.AdminProtos
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.PaxcountProtos
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.StoreAndForwardProtos
import org.meshtastic.proto.TelemetryProtos
import org.meshtastic.proto.copy
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.Paxcount
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.Position
import org.meshtastic.proto.Routing
import org.meshtastic.proto.StatusMessage
import org.meshtastic.proto.StoreAndForward
import org.meshtastic.proto.StoreForwardPlusPlus
import org.meshtastic.proto.Telemetry
import org.meshtastic.proto.User
import org.meshtastic.proto.Waypoint
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Duration.Companion.milliseconds
@Suppress("LongParameterList", "TooManyFunctions", "LargeClass")
@Suppress("LongParameterList", "TooManyFunctions", "LargeClass", "CyclomaticComplexMethod")
@Singleton
class MeshDataHandler
@Inject
@ -93,10 +100,10 @@ constructor(
private val rememberDataType =
setOf(
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
Portnums.PortNum.ALERT_APP_VALUE,
Portnums.PortNum.WAYPOINT_APP_VALUE,
Portnums.PortNum.NODE_STATUS_APP_VALUE,
PortNum.TEXT_MESSAGE_APP.value,
PortNum.ALERT_APP.value,
PortNum.WAYPOINT_APP.value,
PortNum.NODE_STATUS_APP.value,
)
fun handleReceivedData(packet: MeshPacket, myNodeNum: Int, logUuid: String? = null, logInsertJob: Job? = null) {
@ -121,14 +128,15 @@ constructor(
logInsertJob: Job?,
): Boolean {
var shouldBroadcast = !fromUs
when (packet.decoded.portnumValue) {
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> handleTextMessage(packet, dataPacket, myNodeNum)
Portnums.PortNum.NODE_STATUS_APP_VALUE -> handleNodeStatus(packet, dataPacket, myNodeNum)
Portnums.PortNum.ALERT_APP_VALUE -> rememberDataPacket(dataPacket, myNodeNum)
Portnums.PortNum.WAYPOINT_APP_VALUE -> handleWaypoint(packet, dataPacket, myNodeNum)
Portnums.PortNum.POSITION_APP_VALUE -> handlePosition(packet, dataPacket, myNodeNum)
Portnums.PortNum.NODEINFO_APP_VALUE -> if (!fromUs) handleNodeInfo(packet)
Portnums.PortNum.TELEMETRY_APP_VALUE -> handleTelemetry(packet, dataPacket, myNodeNum)
val decoded = packet.decoded ?: return shouldBroadcast
when (decoded.portnum) {
PortNum.TEXT_MESSAGE_APP -> handleTextMessage(packet, dataPacket, myNodeNum)
PortNum.NODE_STATUS_APP -> handleNodeStatus(packet, dataPacket, myNodeNum)
PortNum.ALERT_APP -> rememberDataPacket(dataPacket, myNodeNum)
PortNum.WAYPOINT_APP -> handleWaypoint(packet, dataPacket, myNodeNum)
PortNum.POSITION_APP -> handlePosition(packet, dataPacket, myNodeNum)
PortNum.NODEINFO_APP -> if (!fromUs) handleNodeInfo(packet)
PortNum.TELEMETRY_APP -> handleTelemetry(packet, dataPacket, myNodeNum)
else -> shouldBroadcast = handleSpecializedDataPacket(packet, dataPacket, myNodeNum, logUuid, logInsertJob)
}
return shouldBroadcast
@ -142,189 +150,192 @@ constructor(
logInsertJob: Job?,
): Boolean {
var shouldBroadcast = false
when (packet.decoded.portnumValue) {
Portnums.PortNum.TRACEROUTE_APP_VALUE -> {
val decoded = packet.decoded ?: return shouldBroadcast
when (decoded.portnum) {
PortNum.TRACEROUTE_APP -> {
tracerouteHandler.handleTraceroute(packet, logUuid, logInsertJob)
shouldBroadcast = false
}
Portnums.PortNum.ROUTING_APP_VALUE -> {
PortNum.ROUTING_APP -> {
handleRouting(packet, dataPacket)
shouldBroadcast = true
}
Portnums.PortNum.PAXCOUNTER_APP_VALUE -> {
PortNum.PAXCOUNTER_APP -> {
handlePaxCounter(packet)
shouldBroadcast = false
}
Portnums.PortNum.STORE_FORWARD_APP_VALUE -> {
PortNum.STORE_FORWARD_APP -> {
handleStoreAndForward(packet, dataPacket, myNodeNum)
shouldBroadcast = false
}
Portnums.PortNum.STORE_FORWARD_PLUSPLUS_APP_VALUE -> {
PortNum.STORE_FORWARD_PLUSPLUS_APP -> {
handleStoreForwardPlusPlus(packet)
shouldBroadcast = false
}
Portnums.PortNum.ADMIN_APP_VALUE -> {
PortNum.ADMIN_APP -> {
handleAdminMessage(packet, myNodeNum)
shouldBroadcast = false
}
Portnums.PortNum.NEIGHBORINFO_APP_VALUE -> {
PortNum.NEIGHBORINFO_APP -> {
neighborInfoHandler.handleNeighborInfo(packet)
shouldBroadcast = true
}
Portnums.PortNum.RANGE_TEST_APP_VALUE,
Portnums.PortNum.DETECTION_SENSOR_APP_VALUE,
PortNum.RANGE_TEST_APP,
PortNum.DETECTION_SENSOR_APP,
-> {
handleRangeTest(dataPacket, myNodeNum)
shouldBroadcast = false
}
else -> {}
}
return shouldBroadcast
}
private fun handleRangeTest(dataPacket: DataPacket, myNodeNum: Int) {
val u = dataPacket.copy(dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE)
val u = dataPacket.copy(dataType = PortNum.TEXT_MESSAGE_APP.value)
rememberDataPacket(u, myNodeNum)
}
private fun handleStoreAndForward(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val u = StoreAndForwardProtos.StoreAndForward.parseFrom(packet.decoded.payload)
val payload = packet.decoded?.payload ?: return
val u = StoreAndForward.ADAPTER.decode(payload)
handleReceivedStoreAndForward(dataPacket, u, myNodeNum)
}
@Suppress("LongMethod")
private fun handleStoreForwardPlusPlus(packet: MeshPacket) {
val payload = packet.decoded?.payload ?: return
val sfpp =
try {
MeshProtos.StoreForwardPlusPlus.parseFrom(packet.decoded.payload)
} catch (e: InvalidProtocolBufferException) {
StoreForwardPlusPlus.ADAPTER.decode(payload)
} catch (e: IOException) {
Logger.e(e) { "Failed to parse StoreForwardPlusPlus packet" }
return
}
Logger.d { "Received StoreForwardPlusPlus packet: $sfpp" }
when (sfpp.sfppMessageType) {
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE,
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_FIRSTHALF,
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_SECONDHALF,
when (sfpp.sfpp_message_type) {
StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE,
StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_FIRSTHALF,
StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE_SECONDHALF,
-> {
val isFragment = sfpp.sfppMessageType != MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE
val isFragment = sfpp.sfpp_message_type != StoreForwardPlusPlus.SFPP_message_type.LINK_PROVIDE
// If it has a commit hash, it's already on the chain (Confirmed)
// Otherwise it's still being routed via SF++ (Routing)
val status = if (sfpp.commitHash.isEmpty) MessageStatus.SFPP_ROUTING else MessageStatus.SFPP_CONFIRMED
val status =
if (sfpp.commit_hash.size == 0) MessageStatus.SFPP_ROUTING else MessageStatus.SFPP_CONFIRMED
// Prefer a full 16-byte hash calculated from the message bytes if available
// But only if it's NOT a fragment, otherwise the calculated hash would be wrong
val hash =
when {
!sfpp.messageHash.isEmpty -> sfpp.messageHash.toByteArray()
!isFragment && !sfpp.message.isEmpty -> {
sfpp.message_hash.size != 0 -> sfpp.message_hash.toByteArray()
!isFragment && sfpp.message.size != 0 -> {
SfppHasher.computeMessageHash(
encryptedPayload = sfpp.message.toByteArray(),
// Map 0 back to NODENUM_BROADCAST to match firmware hash calculation
to =
if (sfpp.encapsulatedTo == 0) DataPacket.NODENUM_BROADCAST else sfpp.encapsulatedTo,
from = sfpp.encapsulatedFrom,
id = sfpp.encapsulatedId,
if (sfpp.encapsulated_to == 0) {
DataPacket.NODENUM_BROADCAST
} else {
sfpp.encapsulated_to
},
from = sfpp.encapsulated_from,
id = sfpp.encapsulated_id,
)
}
else -> null
} ?: return
Logger.d {
"SFPP updateStatus: packetId=${sfpp.encapsulatedId} from=${sfpp.encapsulatedFrom} " +
"to=${sfpp.encapsulatedTo} myNodeNum=${nodeManager.myNodeNum} status=$status"
"SFPP updateStatus: packetId=${sfpp.encapsulated_id} from=${sfpp.encapsulated_from} " +
"to=${sfpp.encapsulated_to} myNodeNum=${nodeManager.myNodeNum} status=$status"
}
scope.handledLaunch {
packetRepository
.get()
.updateSFPPStatus(
packetId = sfpp.encapsulatedId,
from = sfpp.encapsulatedFrom,
to = sfpp.encapsulatedTo,
packetId = sfpp.encapsulated_id,
from = sfpp.encapsulated_from,
to = sfpp.encapsulated_to,
hash = hash,
status = status,
rxTime = sfpp.encapsulatedRxtime.toLong() and 0xFFFFFFFFL,
myNodeNum = nodeManager.myNodeNum,
rxTime = sfpp.encapsulated_rxtime.toLong() and 0xFFFFFFFFL,
myNodeNum = nodeManager.myNodeNum ?: 0,
)
serviceBroadcasts.broadcastMessageStatus(sfpp.encapsulatedId, status)
serviceBroadcasts.broadcastMessageStatus(sfpp.encapsulated_id, status)
}
}
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.CANON_ANNOUNCE -> {
StoreForwardPlusPlus.SFPP_message_type.CANON_ANNOUNCE -> {
scope.handledLaunch {
packetRepository
.get()
.updateSFPPStatusByHash(
hash = sfpp.messageHash.toByteArray(),
status = MessageStatus.SFPP_CONFIRMED,
rxTime = sfpp.encapsulatedRxtime.toLong() and 0xFFFFFFFFL,
)
sfpp.message_hash.let {
packetRepository
.get()
.updateSFPPStatusByHash(
hash = it.toByteArray(),
status = MessageStatus.SFPP_CONFIRMED,
rxTime = sfpp.encapsulated_rxtime.toLong() and 0xFFFFFFFFL,
)
}
}
}
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.CHAIN_QUERY -> {
StoreForwardPlusPlus.SFPP_message_type.CHAIN_QUERY -> {
Logger.i { "SF++: Node ${packet.from} is querying chain status" }
}
MeshProtos.StoreForwardPlusPlus.SFPP_message_type.LINK_REQUEST -> {
StoreForwardPlusPlus.SFPP_message_type.LINK_REQUEST -> {
Logger.i { "SF++: Node ${packet.from} is requesting links" }
}
else -> {}
}
}
private fun handlePaxCounter(packet: MeshPacket) {
val p = PaxcountProtos.Paxcount.parseFrom(packet.decoded.payload)
val payload = packet.decoded?.payload ?: return
val p = Paxcount.ADAPTER.decodeOrNull(payload, Logger) ?: return
nodeManager.handleReceivedPaxcounter(packet.from, p)
}
private fun handlePosition(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val p = MeshProtos.Position.parseFrom(packet.decoded.payload)
val payload = packet.decoded?.payload ?: return
val p = Position.ADAPTER.decodeOrNull(payload, Logger) ?: return
Logger.d { "Position from ${packet.from}: ${Position.ADAPTER.toOneLiner(p)}" }
nodeManager.handleReceivedPosition(packet.from, myNodeNum, p, dataPacket.time)
}
private fun handleWaypoint(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val u = MeshProtos.Waypoint.parseFrom(packet.decoded.payload)
if (u.lockedTo != 0 && u.lockedTo != packet.from) return
val payload = packet.decoded?.payload ?: return
val u = Waypoint.ADAPTER.decode(payload)
if (u.locked_to != 0 && u.locked_to != packet.from) return
val currentSecond = (System.currentTimeMillis() / MILLISECONDS_IN_SECOND).toInt()
rememberDataPacket(dataPacket, myNodeNum, updateNotification = u.expire > currentSecond)
}
private fun handleAdminMessage(packet: MeshPacket, myNodeNum: Int) {
val u = AdminProtos.AdminMessage.parseFrom(packet.decoded.payload)
commandSender.setSessionPasskey(u.sessionPasskey)
val payload = packet.decoded?.payload ?: return
val u = AdminMessage.ADAPTER.decode(payload)
u.session_passkey.let { commandSender.setSessionPasskey(it) }
if (packet.from == myNodeNum) {
when (u.payloadVariantCase) {
AdminProtos.AdminMessage.PayloadVariantCase.GET_CONFIG_RESPONSE ->
configHandler.handleDeviceConfig(u.getConfigResponse)
AdminProtos.AdminMessage.PayloadVariantCase.GET_CHANNEL_RESPONSE ->
configHandler.handleChannel(u.getChannelResponse)
else -> {}
}
val fromNum = packet.from
if (fromNum == myNodeNum) {
u.get_config_response?.let { configHandler.handleDeviceConfig(it) }
u.get_channel_response?.let { configHandler.handleChannel(it) }
}
if (u.payloadVariantCase == AdminProtos.AdminMessage.PayloadVariantCase.GET_DEVICE_METADATA_RESPONSE) {
if (packet.from == myNodeNum) {
configFlowManager.handleLocalMetadata(u.getDeviceMetadataResponse)
u.get_device_metadata_response?.let { metadata ->
if (fromNum == myNodeNum) {
configFlowManager.handleLocalMetadata(metadata)
} else {
nodeManager.insertMetadata(packet.from, u.getDeviceMetadataResponse)
nodeManager.insertMetadata(fromNum, metadata)
}
}
}
private fun handleTextMessage(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
if (packet.decoded.replyId != 0 && packet.decoded.emoji != 0) {
val decoded = packet.decoded ?: return
if (decoded.reply_id != 0 && decoded.emoji != 0) {
rememberReaction(packet)
} else {
rememberDataPacket(dataPacket, myNodeNum)
@ -332,25 +343,28 @@ constructor(
}
private fun handleNodeInfo(packet: MeshPacket) {
val payload = packet.decoded?.payload ?: return
val u =
MeshProtos.User.parseFrom(packet.decoded.payload).copy {
if (isLicensed) clearPublicKey()
if (packet.viaMqtt) longName = "$longName (MQTT)"
}
User.ADAPTER.decode(payload)
.let { if (it.is_licensed == true) it.copy(public_key = okio.ByteString.EMPTY) else it }
.let { if (packet.via_mqtt == true) it.copy(long_name = "${it.long_name} (MQTT)") else it }
nodeManager.handleReceivedUser(packet.from, u, packet.channel)
}
private fun handleNodeStatus(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val s = MeshProtos.StatusMessage.parseFrom(packet.decoded.payload)
val payload = packet.decoded?.payload ?: return
val s = StatusMessage.ADAPTER.decodeOrNull(payload, Logger) ?: return
nodeManager.handleReceivedNodeStatus(packet.from, s)
rememberDataPacket(dataPacket, myNodeNum)
}
private fun handleTelemetry(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val payload = packet.decoded?.payload ?: return
val t =
TelemetryProtos.Telemetry.parseFrom(packet.decoded.payload).copy {
if (time == 0) time = (dataPacket.time.milliseconds.inWholeSeconds).toInt()
(Telemetry.ADAPTER.decodeOrNull(payload, Logger) ?: return).let {
if (it.time == 0) it.copy(time = (dataPacket.time.milliseconds.inWholeSeconds).toInt()) else it
}
Logger.d { "Telemetry from ${packet.from}: ${Telemetry.ADAPTER.toOneLiner(t)}" }
val fromNum = packet.from
val isRemote = (fromNum != myNodeNum)
if (!isRemote) {
@ -358,13 +372,16 @@ constructor(
}
nodeManager.updateNodeInfo(fromNum) { nodeEntity ->
val metrics = t.device_metrics
val environment = t.environment_metrics
val power = t.power_metrics
when {
t.hasDeviceMetrics() -> {
metrics != null -> {
nodeEntity.deviceTelemetry = t
if (fromNum == myNodeNum || (isRemote && nodeEntity.isFavorite)) {
if (
t.deviceMetrics.voltage > BATTERY_PERCENT_UNSUPPORTED &&
t.deviceMetrics.batteryLevel <= BATTERY_PERCENT_LOW_THRESHOLD
(metrics.voltage ?: 0f) > BATTERY_PERCENT_UNSUPPORTED &&
(metrics.battery_level ?: 0) <= BATTERY_PERCENT_LOW_THRESHOLD
) {
if (shouldBatteryNotificationShow(fromNum, t, myNodeNum)) {
serviceNotifications.showOrUpdateLowBatteryNotification(nodeEntity, isRemote)
@ -378,24 +395,26 @@ constructor(
}
}
t.hasEnvironmentMetrics() -> nodeEntity.environmentTelemetry = t
t.hasPowerMetrics() -> nodeEntity.powerTelemetry = t
environment != null -> nodeEntity.environmentTelemetry = t
power != null -> nodeEntity.powerTelemetry = t
}
}
}
private fun shouldBatteryNotificationShow(fromNum: Int, t: TelemetryProtos.Telemetry, myNodeNum: Int): Boolean {
private fun shouldBatteryNotificationShow(fromNum: Int, t: Telemetry, myNodeNum: Int): Boolean {
val isRemote = (fromNum != myNodeNum)
var shouldDisplay = false
var forceDisplay = false
val metrics = t.device_metrics ?: return false
val batteryLevel = metrics.battery_level ?: 0
when {
t.deviceMetrics.batteryLevel <= BATTERY_PERCENT_CRITICAL_THRESHOLD -> {
batteryLevel <= BATTERY_PERCENT_CRITICAL_THRESHOLD -> {
shouldDisplay = true
forceDisplay = true
}
t.deviceMetrics.batteryLevel == BATTERY_PERCENT_LOW_THRESHOLD -> shouldDisplay = true
t.deviceMetrics.batteryLevel.mod(BATTERY_PERCENT_LOW_DIVISOR) == 0 && !isRemote -> shouldDisplay = true
batteryLevel == BATTERY_PERCENT_LOW_THRESHOLD -> shouldDisplay = true
batteryLevel.mod(BATTERY_PERCENT_LOW_DIVISOR) == 0 && !isRemote -> shouldDisplay = true
isRemote -> shouldDisplay = true
}
@ -411,31 +430,32 @@ constructor(
}
private fun handleRouting(packet: MeshPacket, dataPacket: DataPacket) {
val r = MeshProtos.Routing.parseFrom(packet.decoded.payload)
if (r.errorReason == MeshProtos.Routing.Error.DUTY_CYCLE_LIMIT) {
val payload = packet.decoded?.payload ?: return
val r = Routing.ADAPTER.decodeOrNull(payload, Logger) ?: return
if (r.error_reason == Routing.Error.DUTY_CYCLE_LIMIT) {
serviceRepository.setErrorMessage(getString(Res.string.error_duty_cycle))
}
handleAckNak(
packet.decoded.requestId,
packet.decoded?.request_id ?: 0,
dataMapper.toNodeID(packet.from),
r.errorReasonValue,
r.error_reason?.value ?: 0,
dataPacket.relayNode,
)
packetHandler.removeResponse(packet.decoded.requestId, complete = true)
packet.decoded?.request_id?.let { packetHandler.removeResponse(it, complete = true) }
}
@Suppress("CyclomaticComplexMethod", "LongMethod")
private fun handleAckNak(requestId: Int, fromId: String, routingError: Int, relayNode: Int?) {
scope.handledLaunch {
val isAck = routingError == MeshProtos.Routing.Error.NONE_VALUE
val isAck = routingError == Routing.Error.NONE.value
val p = packetRepository.get().getPacketById(requestId)
val reaction = packetRepository.get().getReactionByPacketId(requestId)
val isMaxRetransmit = routingError == MeshProtos.Routing.Error.MAX_RETRANSMIT_VALUE
val isMaxRetransmit = routingError == Routing.Error.MAX_RETRANSMIT.value
val shouldRetry =
isMaxRetransmit &&
p != null &&
p.port_num == Portnums.PortNum.TEXT_MESSAGE_APP_VALUE &&
p.port_num == PortNum.TEXT_MESSAGE_APP.value &&
(p.data.from == DataPacket.ID_LOCAL || p.data.from == nodeManager.getMyId()) &&
p.data.retryCount < MAX_RETRY_ATTEMPTS
@ -482,7 +502,7 @@ constructor(
relayNode = null,
)
val updatedPacket =
p.copy(packetId = newId, data = updatedData, routingError = MeshProtos.Routing.Error.NONE_VALUE)
p.copy(packetId = newId, data = updatedData, routingError = Routing.Error.NONE.value)
packetRepository.get().update(updatedPacket)
Logger.w { "[ackNak] retrying req=$requestId newId=$newId retry=$newRetryCount" }
@ -496,7 +516,7 @@ constructor(
return@handledLaunch
}
if (shouldRetryReaction && reaction != null) {
if (shouldRetryReaction) {
val newRetryCount = reaction.retryCount + 1
// Emit retry event to UI and wait for user response
@ -519,8 +539,8 @@ constructor(
DataPacket(
to = reaction.to,
channel = reaction.channel,
bytes = reaction.emoji.toByteArray(Charsets.UTF_8),
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
bytes = reaction.emoji.encodeToByteArray().toByteString(),
dataType = PortNum.TEXT_MESSAGE_APP.value,
replyId = reaction.replyId,
wantAck = true,
emoji = reaction.emoji.codePointAt(0),
@ -534,7 +554,7 @@ constructor(
status = MessageStatus.QUEUED,
retryCount = newRetryCount,
relayNode = null,
routingError = MeshProtos.Routing.Error.NONE_VALUE,
routingError = Routing.Error.NONE.value,
)
packetRepository.get().updateReaction(updatedReaction)
@ -579,59 +599,53 @@ constructor(
}
}
private fun handleReceivedStoreAndForward(
dataPacket: DataPacket,
s: StoreAndForwardProtos.StoreAndForward,
myNodeNum: Int,
) {
Logger.d { "StoreAndForward: ${s.variantCase} ${s.rr} from ${dataPacket.from}" }
private fun handleReceivedStoreAndForward(dataPacket: DataPacket, s: StoreAndForward, myNodeNum: Int) {
Logger.d { "StoreAndForward: variant from ${dataPacket.from}" }
val transport = currentTransport()
val isHistory = s.variantCase == StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY
val lastRequest = if (isHistory) s.history.lastRequest else 0
val h = s.history
val lastRequest = h?.last_request ?: 0
val baseContext = "transport=$transport from=${dataPacket.from}"
historyLog { "rxStoreForward $baseContext variant=${s.variantCase} rr=${s.rr} lastRequest=$lastRequest" }
when (s.variantCase) {
StoreAndForwardProtos.StoreAndForward.VariantCase.STATS -> {
historyLog { "rxStoreForward $baseContext lastRequest=$lastRequest" }
when {
s.stats != null -> {
val text = s.stats.toString()
val u =
dataPacket.copy(
bytes = text.encodeToByteArray(),
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
bytes = text.encodeToByteArray().toByteString(),
dataType = PortNum.TEXT_MESSAGE_APP.value,
)
rememberDataPacket(u, myNodeNum)
}
StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY -> {
val h = s.history
h != null -> {
@Suppress("MaxLineLength")
historyLog(Log.DEBUG) {
"routerHistory $baseContext messages=${h.historyMessages} window=${h.window} lastReq=${h.lastRequest}"
"routerHistory $baseContext messages=${h.history_messages} window=${h.window} lastReq=${h.last_request}"
}
val text =
"Total messages: ${h.historyMessages}\n" +
"Total messages: ${h.history_messages}\n" +
"History window: ${h.window.milliseconds.inWholeMinutes} min\n" +
"Last request: ${h.lastRequest}"
"Last request: ${h.last_request}"
val u =
dataPacket.copy(
bytes = text.encodeToByteArray(),
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
bytes = text.encodeToByteArray().toByteString(),
dataType = PortNum.TEXT_MESSAGE_APP.value,
)
rememberDataPacket(u, myNodeNum)
historyManager.updateStoreForwardLastRequest("router_history", h.lastRequest, transport)
historyManager.updateStoreForwardLastRequest("router_history", h.last_request, transport)
}
StoreAndForwardProtos.StoreAndForward.VariantCase.HEARTBEAT -> {
val hb = s.heartbeat
s.heartbeat != null -> {
val hb = s.heartbeat!!
historyLog { "rxHeartbeat $baseContext period=${hb.period} secondary=${hb.secondary}" }
}
StoreAndForwardProtos.StoreAndForward.VariantCase.TEXT -> {
if (s.rr == StoreAndForwardProtos.StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) {
s.text != null -> {
if (s.rr == StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) {
dataPacket.to = DataPacket.ID_BROADCAST
}
@Suppress("MaxLineLength")
historyLog(Log.DEBUG) {
"rxText $baseContext id=${dataPacket.id} ts=${dataPacket.time} to=${dataPacket.to} decision=remember"
}
val u =
dataPacket.copy(bytes = s.text.toByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE)
val u = dataPacket.copy(bytes = s.text, dataType = PortNum.TEXT_MESSAGE_APP.value)
rememberDataPacket(u, myNodeNum)
}
else -> {}
@ -689,7 +703,7 @@ constructor(
}
private suspend fun PacketRepository.shouldFilterMessage(dataPacket: DataPacket, contactKey: String): Boolean {
if (dataPacket.dataType != Portnums.PortNum.TEXT_MESSAGE_APP_VALUE) return false
if (dataPacket.dataType != PortNum.TEXT_MESSAGE_APP.value) return false
val isFilteringDisabled = getContactSettings(contactKey).filteringDisabled
return messageFilterService.shouldFilter(dataPacket.text.orEmpty(), isFilteringDisabled)
}
@ -703,7 +717,7 @@ constructor(
val conversationMuted = packetRepository.get().getContactSettings(contactKey).isMuted
val nodeMuted = nodeManager.nodeDBbyID[dataPacket.from]?.isMuted == true
val isSilent = conversationMuted || nodeMuted
if (packet.port_num == Portnums.PortNum.ALERT_APP_VALUE && !isSilent) {
if (packet.port_num == PortNum.ALERT_APP.value && !isSilent) {
serviceNotifications.showAlertNotification(
contactKey,
getSenderName(dataPacket),
@ -717,18 +731,18 @@ constructor(
private fun getSenderName(packet: DataPacket): String {
if (packet.from == DataPacket.ID_LOCAL) {
val myId = nodeManager.getMyId()
return nodeManager.nodeDBbyID[myId]?.user?.longName ?: getString(Res.string.unknown_username)
return nodeManager.nodeDBbyID[myId]?.user?.long_name ?: getString(Res.string.unknown_username)
}
return nodeManager.nodeDBbyID[packet.from]?.user?.longName ?: getString(Res.string.unknown_username)
return nodeManager.nodeDBbyID[packet.from]?.user?.long_name ?: getString(Res.string.unknown_username)
}
private suspend fun updateNotification(contactKey: String, dataPacket: DataPacket, isSilent: Boolean) {
when (dataPacket.dataType) {
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> {
PortNum.TEXT_MESSAGE_APP.value -> {
val message = dataPacket.text!!
val channelName =
if (dataPacket.to == DataPacket.ID_BROADCAST) {
radioConfigRepository.channelSetFlow.first().settingsList.getOrNull(dataPacket.channel)?.name
radioConfigRepository.channelSetFlow.first().settings.getOrNull(dataPacket.channel)?.name
} else {
null
}
@ -742,7 +756,7 @@ constructor(
)
}
Portnums.PortNum.WAYPOINT_APP_VALUE -> {
PortNum.WAYPOINT_APP.value -> {
val message = getString(Res.string.waypoint_received, dataPacket.waypoint!!.name)
serviceNotifications.updateWaypointNotification(
contactKey,
@ -759,24 +773,25 @@ constructor(
@Suppress("LongMethod", "KotlinConstantConditions")
private fun rememberReaction(packet: MeshPacket) = scope.handledLaunch {
val emoji = packet.decoded.payload.toByteArray().decodeToString()
val decoded = packet.decoded ?: return@handledLaunch
val emoji = decoded.payload.toByteArray().decodeToString()
val fromId = dataMapper.toNodeID(packet.from)
val toId = dataMapper.toNodeID(packet.to)
val reaction =
ReactionEntity(
myNodeNum = nodeManager.myNodeNum ?: 0,
replyId = packet.decoded.replyId,
replyId = decoded.reply_id,
userId = fromId,
emoji = emoji,
timestamp = System.currentTimeMillis(),
snr = packet.rxSnr,
rssi = packet.rxRssi,
snr = packet.rx_snr,
rssi = packet.rx_rssi,
hopsAway =
if (packet.hopStart == 0 || packet.hopLimit > packet.hopStart) {
if (packet.hop_start == 0 || packet.hop_limit > packet.hop_start) {
HOPS_AWAY_UNAVAILABLE
} else {
packet.hopStart - packet.hopLimit
packet.hop_start - packet.hop_limit
},
packetId = packet.id,
status = MessageStatus.RECEIVED,
@ -788,7 +803,7 @@ constructor(
val existingReactions = packetRepository.get().findReactionsWithId(packet.id)
if (existingReactions.isNotEmpty()) {
Logger.d {
"Skipping duplicate reaction: packetId=${packet.id} replyId=${packet.decoded.replyId} " +
"Skipping duplicate reaction: packetId=${packet.id} replyId=${decoded.reply_id} " +
"from=$fromId emoji=$emoji (already have ${existingReactions.size} reaction(s))"
}
return@handledLaunch
@ -797,7 +812,7 @@ constructor(
packetRepository.get().insertReaction(reaction)
// Find the original packet to get the contactKey
packetRepository.get().getPacketByPacketId(packet.decoded.replyId)?.let { original ->
packetRepository.get().getPacketByPacketId(decoded.reply_id)?.let { original ->
// Skip notification if the original message was filtered
if (original.packet.filtered) return@let
@ -811,7 +826,7 @@ constructor(
if (original.packet.data.to == DataPacket.ID_BROADCAST) {
radioConfigRepository.channelSetFlow
.first()
.settingsList
.settings
.getOrNull(original.packet.data.channel)
?.name
} else {