MeshPackets.swift into an actor and make async

This commit is contained in:
Jake-B 2026-01-22 18:00:51 -05:00
parent beb990e6ef
commit 140c1ab734
5 changed files with 1191 additions and 1142 deletions

View file

@ -1,5 +1,5 @@
{
"originHash" : "2569905853aec088d5bac6b540eac77f78963f88b406e8dd95a88c40623cc8b4",
"originHash" : "8c5f88cc59cef0f9e938df2478c1365017c6f2330f0482d58d917ab2eb144fda",
"pins" : [
{
"identity" : "cocoamqtt",
@ -15,8 +15,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/DataDog/dd-sdk-ios.git",
"state" : {
"revision" : "d0a42d8067665cb6ee86af51251ccc071f62bd54",
"version" : "2.29.0"
"revision" : "8d67e973ff4a958cb536263cb816646ee904c508",
"version" : "3.3.0"
}
},
{
@ -60,8 +60,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-protobuf.git",
"state" : {
"revision" : "102a647b573f60f73afdce5613a51d71349fe507",
"version" : "1.30.0"
"revision" : "c169a5744230951031770e27e475ff6eefe51f9d",
"version" : "1.33.3"
}
}
],

View file

@ -61,7 +61,7 @@ extension AccessoryManager {
self.updateState(.communicating)
self.connectionEventTask = Task {
for await event in eventStream {
self.didReceive(event)
await self.didReceive(event)
}
Logger.transport.info("[Accessory] Event stream closed")
}

View file

@ -65,7 +65,7 @@ extension AccessoryManager {
Logger.services.error("⚠️ Client Notification: \(clientNotification.message, privacy: .public)")
}
func handleMyInfo(_ myNodeInfo: MyNodeInfo) {
func handleMyInfo(_ myNodeInfo: MyNodeInfo) async {
// TODO: this works for connections like BLE that have a uniqueId, but what about ones like serial?
guard let connectedDeviceId = activeConnection?.device.id.uuidString else {
Logger.services.error("⚠️ Failed to decode MyInfo, no connected device ID")
@ -75,7 +75,8 @@ extension AccessoryManager {
updateDevice(key: \.num, value: Int64(myNodeInfo.myNodeNum))
if let myInfo = myInfoPacket(myInfo: myNodeInfo, peripheralId: connectedDeviceId, context: context) {
if let myInfoId = await MeshPackets.shared.myInfoPacket(myInfo: myNodeInfo, peripheralId: connectedDeviceId),
let myInfo = try? context.existingObject(with: myInfoId) as? MyInfoEntity {
if let bleName = myInfo.bleName {
updateDevice(key: \.name, value: bleName)
updateDevice(key: \.longName, value: bleName)
@ -95,7 +96,7 @@ extension AccessoryManager {
}
func handleNodeInfo(_ nodeInfo: NodeInfo) {
func handleNodeInfo(_ nodeInfo: NodeInfo) async {
if let continuation = self.firstDatabaseNodeInfoContinuation {
continuation.resume()
self.firstDatabaseNodeInfoContinuation = nil
@ -107,10 +108,13 @@ extension AccessoryManager {
}
// Check if we're in database retrieval mode to defer saves for performance
let isRetrievingDatabase = if case .retrievingDatabase = self.state { true } else { false }
// Commented out: No need to defer save when nodeInfoPacket is now happening off the main thread
// let isRetrievingDatabase = if case .retrievingDatabase = self.state { true } else { false }
// TODO: nodeInfoPacket's channel: parameter is not used
if let nodeInfo = nodeInfoPacket(nodeInfo: nodeInfo, channel: 0, context: context, deferSave: isRetrievingDatabase) {
// deferSave hard coded: No need to defer save when nodeInfoPacket is now happening off the main thread
if let nodeInfoId = await MeshPackets.shared.nodeInfoPacket(nodeInfo: nodeInfo, channel: 0, deferSave: false),
let nodeInfo = try? context.existingObject(with: nodeInfoId) as? NodeInfoEntity {
if let activeDevice = activeConnection?.device, activeDevice.num == nodeInfo.num {
if let user = nodeInfo.user {
updateDevice(deviceId: activeDevice.id, key: \.shortName, value: user.shortName ?? "?")
@ -136,24 +140,24 @@ extension AccessoryManager {
}
func handleChannel(_ channel: Channel) {
func handleChannel(_ channel: Channel) async {
guard let deviceNum = activeConnection?.device.num else {
Logger.data.error("Attempt to process channel information when no connected device.")
return
}
channelPacket(channel: channel, fromNum: Int64(truncatingIfNeeded: deviceNum), context: context)
await MeshPackets.shared.channelPacket(channel: channel, fromNum: Int64(truncatingIfNeeded: deviceNum))
}
func handleConfig(_ config: Config) {
func handleConfig(_ config: Config) async {
guard let device = activeConnection?.device, let deviceNum = device.num, let longName = device.longName else {
Logger.data.error("Attempt to process channel information when no connected device.")
return
}
// Local config parses out the variants. Should we do that here maybe?
localConfig(config: config, context: context, nodeNum: Int64(truncatingIfNeeded: deviceNum), nodeLongName: longName)
await MeshPackets.shared.localConfig(config: config, nodeNum: Int64(truncatingIfNeeded: deviceNum), nodeLongName: longName)
// Handle Timezone
if config.payloadVariant == Config.OneOf_PayloadVariant.device(config.device) {
@ -167,12 +171,12 @@ extension AccessoryManager {
}
}
func handleModuleConfig(_ moduleConfigPacket: ModuleConfig) {
func handleModuleConfig(_ moduleConfigPacket: ModuleConfig) async {
guard let device = activeConnection?.device, let deviceNum = device.num, let longName = device.longName else {
Logger.services.error("Attempt to process channel information when no connected device.")
return
}
moduleConfig(config: moduleConfigPacket, context: context, nodeNum: Int64(truncatingIfNeeded: deviceNum), nodeLongName: longName)
await MeshPackets.shared.moduleConfig(config: moduleConfigPacket, nodeNum: Int64(truncatingIfNeeded: deviceNum), nodeLongName: longName)
// Get Canned Message Message List if the Module is Canned Messages
if moduleConfigPacket.payloadVariant == ModuleConfig.OneOf_PayloadVariant.cannedMessage(moduleConfigPacket.cannedMessage) {
try? getCannedMessageModuleMessages(destNum: deviceNum, wantResponse: true)
@ -183,7 +187,7 @@ extension AccessoryManager {
}
}
func handleDeviceMetadata(_ metadata: DeviceMetadata) {
func handleDeviceMetadata(_ metadata: DeviceMetadata) async {
// Note: moved firmware version check to be inline with connection process
guard let device = activeConnection?.device, let deviceNum = device.num else {
Logger.services.error("Attempt to process device metadata information when no connected device.")
@ -194,7 +198,7 @@ extension AccessoryManager {
updateDevice(key: \.firmwareVersion, value: metadata.firmwareVersion)
deviceMetadataPacket(metadata: metadata, fromNum: deviceNum, context: context)
await MeshPackets.shared.deviceMetadataPacket(metadata: metadata, fromNum: deviceNum)
}
internal func tryClearExistingChannels() {
@ -225,17 +229,16 @@ extension AccessoryManager {
}
func handleTextMessageAppPacket(_ packet: MeshPacket) {
func handleTextMessageAppPacket(_ packet: MeshPacket) async {
guard let device = activeConnection?.device, let deviceNum = device.num else {
Logger.services.error("Attempt to handle text message when no connected device.")
return
}
textMessageAppPacket(
await MeshPackets.shared.textMessageAppPacket(
packet: packet,
wantRangeTestPackets: wantRangeTestPackets,
connectedNode: deviceNum,
context: context,
appState: appState
)
@ -320,25 +323,27 @@ extension AccessoryManager {
case .UNRECOGNIZED:
Logger.mesh.info("\("📮 Store and Forward \(storeAndForwardMessage.rr) message received from \(packet.from.toHex())")")
case .routerTextDirect:
Logger.mesh.info("\("💬 Store and Forward \(storeAndForwardMessage.rr) message received from \(packet.from.toHex())")")
textMessageAppPacket(
packet: packet,
wantRangeTestPackets: false,
connectedNode: connectedNodeNum,
storeForward: true,
context: context,
appState: appState
)
Task {
Logger.mesh.info("\("💬 Store and Forward \(storeAndForwardMessage.rr) message received from \(packet.from.toHex())")")
await MeshPackets.shared.textMessageAppPacket(
packet: packet,
wantRangeTestPackets: false,
connectedNode: connectedNodeNum,
storeForward: true,
appState: appState
)
}
case .routerTextBroadcast:
Logger.mesh.info("\("✉️ Store and Forward \(storeAndForwardMessage.rr) message received from \(packet.from.toHex())")")
textMessageAppPacket(
packet: packet,
wantRangeTestPackets: false,
connectedNode: connectedNodeNum,
storeForward: true,
context: context,
appState: appState
)
Task {
Logger.mesh.info("\("✉️ Store and Forward \(storeAndForwardMessage.rr) message received from \(packet.from.toHex())")")
await MeshPackets.shared.textMessageAppPacket(
packet: packet,
wantRangeTestPackets: false,
connectedNode: connectedNodeNum,
storeForward: true,
appState: appState
)
}
}
}
}

View file

@ -369,13 +369,13 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
}
}
func didReceive(_ event: ConnectionEvent) {
func didReceive(_ event: ConnectionEvent) async {
packetsReceived += 1
switch event {
case .data(let fromRadio):
// Logger.transport.info(" [Accessory] didReceive: \(fromRadio.payloadVariant.debugDescription)")
self.processFromRadio(fromRadio)
await self.processFromRadio(fromRadio)
Task {
await self.heartbeatResponseTimer?.cancel(withReason: "Data packet received")
await self.heartbeatTimer?.reset(delay: .seconds(15.0))
@ -483,7 +483,7 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
}
}
private func processFromRadio(_ decodedInfo: FromRadio) {
private func processFromRadio(_ decodedInfo: FromRadio) async {
switch decodedInfo.payloadVariant {
case .mqttClientProxyMessage(let mqttClientProxyMessage):
handleMqttClientProxyMessage(mqttClientProxyMessage)
@ -492,7 +492,7 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
handleClientNotification(clientNotification)
case .myInfo(let myNodeInfo):
handleMyInfo(myNodeInfo)
await handleMyInfo(myNodeInfo)
case .packet(let packet):
// All received packets get passed through updateAnyPacketFrom to update lastHeard, rxSnr, etc. (like firmware's NodeDB::updateFrom).
@ -506,13 +506,13 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
if case let .decoded(data) = packet.payloadVariant {
switch data.portnum {
case .textMessageApp, .detectionSensorApp, .alertApp:
handleTextMessageAppPacket(packet)
await handleTextMessageAppPacket(packet)
case .remoteHardwareApp:
Logger.mesh.info("🕸️ MESH PACKET received for Remote Hardware App UNHANDLED \((try? decodedInfo.packet.jsonString()) ?? "JSON Decode Failure", privacy: .public)")
case .positionApp:
upsertPositionPacket(packet: packet, context: context)
case .waypointApp:
waypointPacket(packet: packet, context: context)
await MeshPackets.shared.waypointPacket(packet: packet)
case .nodeinfoApp:
guard let connectedNodeNum = self.activeDeviceNum else {
Logger.mesh.error("🕸️ Unable to determine connectedNodeNum for node info upsert.")
@ -528,16 +528,16 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
Logger.mesh.error("🕸️ No active connection. Unable to determine connectedNodeNum for routingPacket.")
return
}
routingPacket(packet: packet, connectedNodeNum: deviceNum, context: context)
await MeshPackets.shared.routingPacket(packet: packet, connectedNodeNum: deviceNum)
case .adminApp:
adminAppPacket(packet: packet, context: context)
await MeshPackets.shared.adminAppPacket(packet: packet)
case .replyApp:
Logger.mesh.info("🕸️ MESH PACKET received for Reply App handling as a text message")
guard let deviceNum = activeConnection?.device.num else {
Logger.mesh.error("🕸️ No active connection. Unable to determine connectedNodeNum for replyApp.")
return
}
textMessageAppPacket(packet: packet, wantRangeTestPackets: wantRangeTestPackets, connectedNode: deviceNum, context: context, appState: appState)
await MeshPackets.shared.textMessageAppPacket(packet: packet, wantRangeTestPackets: wantRangeTestPackets, connectedNode: deviceNum, appState: appState)
case .ipTunnelApp:
Logger.mesh.info("🕸️ MESH PACKET received for IP Tunnel App UNHANDLED UNHANDLED")
case .serialApp:
@ -554,11 +554,10 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
return
}
if wantRangeTestPackets {
textMessageAppPacket(
await MeshPackets.shared.textMessageAppPacket(
packet: packet,
wantRangeTestPackets: true,
connectedNode: deviceNum,
context: context,
appState: appState
)
} else {
@ -569,7 +568,7 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
Logger.mesh.error("🕸️ No active connection. Unable to determine connectedNodeNum for telemetryApp.")
return
}
telemetryPacket(packet: packet, connectedNode: deviceNum, context: context)
await MeshPackets.shared.telemetryPacket(packet: packet, connectedNode: deviceNum)
case .textMessageCompressedApp:
Logger.mesh.info("🕸️ MESH PACKET received for Text Message Compressed App UNHANDLED")
case .zpsApp:
@ -591,7 +590,7 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
Logger.mesh.info("🕸️ MESH PACKET received for Neighbor Info App UNHANDLED \((try? neighborInfo.jsonString()) ?? "JSON Decode Failure", privacy: .public)")
}
case .paxcounterApp:
paxCounterPacket(packet: decodedInfo.packet, context: context)
await MeshPackets.shared.paxCounterPacket(packet: decodedInfo.packet)
case .mapReportApp:
Logger.mesh.info("🕸️ MESH PACKET received Map Report App UNHANDLED \((try? decodedInfo.packet.jsonString()) ?? "JSON Decode Failure", privacy: .public)")
case .UNRECOGNIZED:
@ -614,19 +613,19 @@ class AccessoryManager: ObservableObject, MqttClientProxyManagerDelegate {
}
case .nodeInfo(let nodeInfo):
handleNodeInfo(nodeInfo)
await handleNodeInfo(nodeInfo)
case .channel(let channel):
handleChannel(channel)
await handleChannel(channel)
case .config(let config):
handleConfig(config)
await handleConfig(config)
case .moduleConfig(let moduleConfig):
handleModuleConfig(moduleConfig)
await handleModuleConfig(moduleConfig)
case .metadata(let metadata):
handleDeviceMetadata(metadata)
await handleDeviceMetadata(metadata)
case .deviceuiConfig:
#if DEBUG

File diff suppressed because it is too large Load diff