mirror of
https://github.com/meshtastic/Meshtastic-Apple.git
synced 2026-04-20 22:13:56 +00:00
874 lines
34 KiB
Swift
874 lines
34 KiB
Swift
|
|
//
|
||
|
|
// MessageRetryQueueManager.swift
|
||
|
|
// Meshtastic
|
||
|
|
//
|
||
|
|
// Retry queue manager using Swift actors and Task scheduling
|
||
|
|
//
|
||
|
|
|
||
|
|
import Foundation
|
||
|
|
import CoreData
|
||
|
|
import MeshtasticProtobufs
|
||
|
|
import OSLog
|
||
|
|
import Combine
|
||
|
|
|
||
|
|
enum MessageType: String, Codable, CaseIterable {
|
||
|
|
case text = "text"
|
||
|
|
case position = "position"
|
||
|
|
case waypoint = "waypoint"
|
||
|
|
case admin = "admin"
|
||
|
|
case traceroute = "traceroute"
|
||
|
|
case nodeInfo = "nodeInfo"
|
||
|
|
case unknown = "unknown"
|
||
|
|
}
|
||
|
|
|
||
|
|
enum RetryState: String, Codable {
|
||
|
|
case pending = "pending"
|
||
|
|
case sending = "sending"
|
||
|
|
case waitingForAck = "waitingForAck"
|
||
|
|
case completed = "completed"
|
||
|
|
case failed = "failed"
|
||
|
|
case cancelled = "cancelled"
|
||
|
|
}
|
||
|
|
|
||
|
|
struct RetryQueueItem: Identifiable, Hashable {
|
||
|
|
let id: UUID
|
||
|
|
let originalMessageId: Int64
|
||
|
|
let messageType: MessageType
|
||
|
|
let serializedPacket: Data? // Full MeshPacket serialized for retry
|
||
|
|
let createdAt: Date
|
||
|
|
|
||
|
|
var retryCount: Int
|
||
|
|
var state: RetryState
|
||
|
|
var nextRetryDate: Date
|
||
|
|
var lastError: String?
|
||
|
|
var currentPacketId: UInt32? // Track the current packet ID being sent for ACK lookup
|
||
|
|
var packetIdHistory: [UInt32] // Track all packet IDs associated with this retry chain
|
||
|
|
|
||
|
|
func hash(into hasher: inout Hasher) {
|
||
|
|
hasher.combine(id)
|
||
|
|
}
|
||
|
|
|
||
|
|
init(
|
||
|
|
id: UUID = UUID(),
|
||
|
|
originalMessageId: Int64,
|
||
|
|
messageType: MessageType,
|
||
|
|
serializedPacket: Data? = nil
|
||
|
|
) {
|
||
|
|
self.id = id
|
||
|
|
self.originalMessageId = originalMessageId
|
||
|
|
self.messageType = messageType
|
||
|
|
self.serializedPacket = serializedPacket
|
||
|
|
self.createdAt = Date()
|
||
|
|
|
||
|
|
// If an item is in the retry queue, we're scheduling at least the first retry.
|
||
|
|
// retryCount is 1-based: 1 = first retry (attempt 2/3), 2 = second retry (attempt 3/3)
|
||
|
|
self.retryCount = 1
|
||
|
|
self.state = .pending
|
||
|
|
self.nextRetryDate = Date().addingTimeInterval(messageType == .traceroute ? 30 : 10)
|
||
|
|
self.currentPacketId = nil
|
||
|
|
self.packetIdHistory = [UInt32(truncatingIfNeeded: originalMessageId)]
|
||
|
|
}
|
||
|
|
|
||
|
|
// Convenience initializers for backwards compatibility
|
||
|
|
init(
|
||
|
|
id: UUID = UUID(),
|
||
|
|
originalMessageId: Int64,
|
||
|
|
messageType: MessageType,
|
||
|
|
payload: Data,
|
||
|
|
portNum: PortNum,
|
||
|
|
toUserNum: Int64,
|
||
|
|
channel: Int32,
|
||
|
|
isEmoji: Bool = false,
|
||
|
|
replyID: Int64 = 0,
|
||
|
|
pkiEncrypted: Bool = false,
|
||
|
|
publicKey: Data? = nil,
|
||
|
|
originalPayload: String? = nil,
|
||
|
|
hopLimit: UInt32? = nil
|
||
|
|
) {
|
||
|
|
self.id = id
|
||
|
|
self.originalMessageId = originalMessageId
|
||
|
|
self.messageType = messageType
|
||
|
|
self.serializedPacket = nil
|
||
|
|
self.createdAt = Date()
|
||
|
|
|
||
|
|
self.retryCount = 1
|
||
|
|
self.state = .pending
|
||
|
|
self.nextRetryDate = Date().addingTimeInterval(messageType == .traceroute ? 30 : 10)
|
||
|
|
self.currentPacketId = nil
|
||
|
|
self.packetIdHistory = [UInt32(truncatingIfNeeded: originalMessageId)]
|
||
|
|
}
|
||
|
|
|
||
|
|
var normalizedRetryCount: Int { max(1, retryCount) }
|
||
|
|
|
||
|
|
// Display attempt number: original send = 1, first retry = 2, second retry = 3
|
||
|
|
var displayAttemptNumber: Int {
|
||
|
|
normalizedRetryCount + 1
|
||
|
|
}
|
||
|
|
|
||
|
|
func matchesPacketId(_ packetId: UInt32) -> Bool {
|
||
|
|
currentPacketId == packetId || packetIdHistory.contains(packetId)
|
||
|
|
}
|
||
|
|
|
||
|
|
static func == (lhs: RetryQueueItem, rhs: RetryQueueItem) -> Bool {
|
||
|
|
lhs.id == rhs.id
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
actor MessageRetryQueueManager {
|
||
|
|
static let shared = MessageRetryQueueManager()
|
||
|
|
|
||
|
|
// Posted whenever queue state changes (for UI refresh)
|
||
|
|
nonisolated static let didUpdateNotification = Foundation.Notification.Name("MessageRetryQueueManager.didUpdate")
|
||
|
|
|
||
|
|
private var queue: [RetryQueueItem] = []
|
||
|
|
private var failedMessageIds: Set<Int64> = [] // Track messages that have exhausted retries
|
||
|
|
private var processingTask: Task<Void, Never>?
|
||
|
|
private var cancellables = Set<AnyCancellable>()
|
||
|
|
|
||
|
|
private(set) var pendingCount: Int = 0
|
||
|
|
|
||
|
|
let maxRetries = 2
|
||
|
|
private let retryDelays: [TimeInterval] = [10, 20] // First retry at 10s, second at 20s
|
||
|
|
private let tracerouteRetryDelays: [TimeInterval] = [30, 60] // Traceroute retries with 30s and 60s delays
|
||
|
|
private let minimumRetrySpacing: TimeInterval = 10
|
||
|
|
private let queueProcessingInterval: TimeInterval = 1.0
|
||
|
|
private let tracerouteCooldown: TimeInterval = 30.0 // Traceroute has 30s rate limit
|
||
|
|
|
||
|
|
private var lastTracerouteRetryTime: Date?
|
||
|
|
|
||
|
|
private func setCurrentPacketId(for itemId: UUID, packetId: UInt32) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.id == itemId }) {
|
||
|
|
queue[index].currentPacketId = packetId
|
||
|
|
if !queue[index].packetIdHistory.contains(packetId) {
|
||
|
|
queue[index].packetIdHistory.append(packetId)
|
||
|
|
}
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func notifyUpdate() {
|
||
|
|
Task { @MainActor in
|
||
|
|
NotificationCenter.default.post(name: MessageRetryQueueManager.didUpdateNotification, object: nil)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private init() {
|
||
|
|
Task { [weak self] in
|
||
|
|
guard let self = self else { return }
|
||
|
|
await self.startQueueProcessor()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func startQueueProcessor() {
|
||
|
|
guard processingTask == nil else { return }
|
||
|
|
|
||
|
|
processingTask = Task { [weak self] in
|
||
|
|
guard let self = self else { return }
|
||
|
|
|
||
|
|
while !Task.isCancelled {
|
||
|
|
await self.processQueue()
|
||
|
|
try? await Task.sleep(nanoseconds: UInt64(self.queueProcessingInterval * 1_000_000_000))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
Logger.mesh.info("📬 Message retry queue processor started")
|
||
|
|
}
|
||
|
|
|
||
|
|
func stopQueueProcessor() {
|
||
|
|
processingTask?.cancel()
|
||
|
|
processingTask = nil
|
||
|
|
Logger.mesh.info("📬 Message retry queue processor stopped")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func processQueue() async {
|
||
|
|
guard await AccessoryManager.shared.isConnected else {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
let now = Date()
|
||
|
|
var itemsToProcess: [RetryQueueItem] = []
|
||
|
|
|
||
|
|
for item in queue where item.state == .pending && item.nextRetryDate <= now {
|
||
|
|
// Check traceroute cooldown
|
||
|
|
if item.messageType == .traceroute, let lastTime = lastTracerouteRetryTime {
|
||
|
|
let timeSinceLastTraceroute = now.timeIntervalSince(lastTime)
|
||
|
|
if timeSinceLastTraceroute < tracerouteCooldown {
|
||
|
|
// Skip this traceroute retry, will be picked up in next processing cycle
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
}
|
||
|
|
itemsToProcess.append(item)
|
||
|
|
}
|
||
|
|
|
||
|
|
for item in itemsToProcess {
|
||
|
|
guard !Task.isCancelled else { break }
|
||
|
|
|
||
|
|
// Update traceroute cooldown tracker
|
||
|
|
if item.messageType == .traceroute {
|
||
|
|
lastTracerouteRetryTime = Date()
|
||
|
|
}
|
||
|
|
|
||
|
|
await processItem(item)
|
||
|
|
}
|
||
|
|
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
}
|
||
|
|
|
||
|
|
private func processItem(_ item: RetryQueueItem) async {
|
||
|
|
guard item.state == .pending else { return }
|
||
|
|
|
||
|
|
updateItemState(item.id, state: .sending)
|
||
|
|
|
||
|
|
do {
|
||
|
|
switch item.messageType {
|
||
|
|
case .text:
|
||
|
|
if item.serializedPacket != nil {
|
||
|
|
try await resendFromSerializedPacket(item)
|
||
|
|
} else {
|
||
|
|
try await resendTextMessage(item)
|
||
|
|
}
|
||
|
|
case .position:
|
||
|
|
try await resendPositionMessage(item)
|
||
|
|
case .waypoint:
|
||
|
|
if item.serializedPacket != nil {
|
||
|
|
try await resendFromSerializedPacket(item)
|
||
|
|
} else {
|
||
|
|
try await resendWaypointMessage(item)
|
||
|
|
}
|
||
|
|
case .admin:
|
||
|
|
if item.serializedPacket != nil {
|
||
|
|
try await resendFromSerializedPacket(item)
|
||
|
|
} else {
|
||
|
|
Logger.mesh.warning("Admin message retry with no payload")
|
||
|
|
updateItemState(item.id, state: .failed)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
case .traceroute:
|
||
|
|
try await resendTracerouteMessage(item)
|
||
|
|
case .nodeInfo:
|
||
|
|
if item.serializedPacket != nil {
|
||
|
|
try await resendFromSerializedPacket(item)
|
||
|
|
} else {
|
||
|
|
Logger.mesh.warning("Node info retry with no payload")
|
||
|
|
updateItemState(item.id, state: .failed)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
case .unknown:
|
||
|
|
if item.serializedPacket != nil {
|
||
|
|
try await resendFromSerializedPacket(item)
|
||
|
|
} else {
|
||
|
|
Logger.mesh.warning("Unknown message retry with no payload")
|
||
|
|
updateItemState(item.id, state: .failed)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
updateItemState(item.id, state: .waitingForAck)
|
||
|
|
|
||
|
|
Logger.mesh.info("📬 Message \(item.originalMessageId) attempt \(item.displayAttemptNumber)/\(self.maxRetries + 1) sent successfully")
|
||
|
|
|
||
|
|
} catch {
|
||
|
|
Logger.mesh.error("📬 Failed to retry message \(item.originalMessageId): \(error.localizedDescription, privacy: .public)")
|
||
|
|
|
||
|
|
let newRetryCount = item.normalizedRetryCount + 1
|
||
|
|
if newRetryCount > maxRetries {
|
||
|
|
updateItemState(item.id, state: .failed)
|
||
|
|
updateItemError(item.id, error: error.localizedDescription)
|
||
|
|
// Track that this message has exhausted its retries
|
||
|
|
failedMessageIds.insert(item.originalMessageId)
|
||
|
|
} else {
|
||
|
|
let delay = retryDelay(for: item.messageType, retryCount: newRetryCount)
|
||
|
|
updateItemRetry(item.id, retryCount: newRetryCount, nextRetryDate: Date().addingTimeInterval(delay))
|
||
|
|
updateItemState(item.id, state: .pending)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func retryDelay(for messageType: MessageType, retryCount: Int) -> TimeInterval {
|
||
|
|
switch messageType {
|
||
|
|
case .traceroute:
|
||
|
|
return tracerouteRetryDelays[safe: retryCount - 1] ?? tracerouteCooldown
|
||
|
|
default:
|
||
|
|
return retryDelays[safe: retryCount - 1] ?? 20
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func resendFromSerializedPacket(_ item: RetryQueueItem) async throws {
|
||
|
|
guard let serializedData = item.serializedPacket else {
|
||
|
|
throw AccessoryError.appError("No serialized packet for retry")
|
||
|
|
}
|
||
|
|
|
||
|
|
var meshPacket = try MeshPacket(serializedData: serializedData)
|
||
|
|
let newMessageId = UInt32.random(in: UInt32(UInt8.max)..<UInt32.max)
|
||
|
|
meshPacket.id = newMessageId
|
||
|
|
|
||
|
|
setCurrentPacketId(for: item.id, packetId: newMessageId)
|
||
|
|
|
||
|
|
var toRadio = ToRadio()
|
||
|
|
toRadio.packet = meshPacket
|
||
|
|
|
||
|
|
try await AccessoryManager.shared.send(toRadio, debugDescription: "Retry \(item.messageType) for message \(item.originalMessageId)")
|
||
|
|
}
|
||
|
|
|
||
|
|
private func resendTextMessage(_ item: RetryQueueItem) async throws {
|
||
|
|
// Try to get the original message from the database
|
||
|
|
let context = PersistenceController.shared.container.viewContext
|
||
|
|
let fetchRequest = MessageEntity.fetchRequest()
|
||
|
|
fetchRequest.predicate = NSPredicate(format: "messageId == %lld", item.originalMessageId)
|
||
|
|
|
||
|
|
var originalPayload: String?
|
||
|
|
var channel: Int32 = 0
|
||
|
|
var isEmoji: Bool = false
|
||
|
|
var replyID: Int64 = 0
|
||
|
|
var toUserNum: Int64 = 0
|
||
|
|
|
||
|
|
do {
|
||
|
|
let fetchedMessages = try context.fetch(fetchRequest)
|
||
|
|
if let message = fetchedMessages.first {
|
||
|
|
originalPayload = message.messagePayload
|
||
|
|
channel = message.channel
|
||
|
|
isEmoji = message.isEmoji
|
||
|
|
replyID = message.replyID
|
||
|
|
toUserNum = message.toUser?.num ?? 0
|
||
|
|
}
|
||
|
|
} catch {
|
||
|
|
Logger.mesh.error("📬 Failed to fetch message for retry: \(error.localizedDescription, privacy: .public)")
|
||
|
|
}
|
||
|
|
|
||
|
|
guard originalPayload != nil else {
|
||
|
|
throw AccessoryError.appError("Missing message payload")
|
||
|
|
}
|
||
|
|
|
||
|
|
let newMessageId = UInt32.random(in: UInt32(UInt8.max)..<UInt32.max)
|
||
|
|
|
||
|
|
setCurrentPacketId(for: item.id, packetId: newMessageId)
|
||
|
|
|
||
|
|
var meshPacket = MeshPacket()
|
||
|
|
meshPacket.id = newMessageId
|
||
|
|
|
||
|
|
if toUserNum > 0 {
|
||
|
|
meshPacket.to = UInt32(toUserNum)
|
||
|
|
} else {
|
||
|
|
meshPacket.to = Constants.maximumNodeNum
|
||
|
|
}
|
||
|
|
|
||
|
|
meshPacket.channel = UInt32(channel)
|
||
|
|
meshPacket.from = UInt32(await AccessoryManager.shared.activeDeviceNum ?? 0)
|
||
|
|
meshPacket.wantAck = true
|
||
|
|
|
||
|
|
var dataMessage = DataMessage()
|
||
|
|
if let payloadData = originalPayload?.data(using: .utf8) {
|
||
|
|
dataMessage.payload = payloadData
|
||
|
|
}
|
||
|
|
dataMessage.portnum = .textMessageApp
|
||
|
|
dataMessage.emoji = isEmoji ? 1 : 0
|
||
|
|
if replyID > 0 {
|
||
|
|
dataMessage.replyID = UInt32(replyID)
|
||
|
|
}
|
||
|
|
|
||
|
|
meshPacket.decoded = dataMessage
|
||
|
|
|
||
|
|
var toRadio = ToRadio()
|
||
|
|
toRadio.packet = meshPacket
|
||
|
|
try await AccessoryManager.shared.send(toRadio, debugDescription: "Retry message \(item.originalMessageId) -> \(newMessageId)")
|
||
|
|
}
|
||
|
|
|
||
|
|
private func resendPositionMessage(_ item: RetryQueueItem) async throws {
|
||
|
|
guard let fromNodeNum = await AccessoryManager.shared.activeConnection?.device.num else {
|
||
|
|
throw AccessoryError.ioFailed("Not connected to any device")
|
||
|
|
}
|
||
|
|
|
||
|
|
guard let positionPacket = try await AccessoryManager.shared.getPositionFromPhoneGPS(destNum: fromNodeNum, fixedPosition: false) else {
|
||
|
|
throw AccessoryError.appError("Unable to get position data")
|
||
|
|
}
|
||
|
|
|
||
|
|
var meshPacket = MeshPacket()
|
||
|
|
let newMessageId = UInt32.random(in: UInt32(UInt8.max)..<UInt32.max)
|
||
|
|
meshPacket.id = newMessageId
|
||
|
|
meshPacket.to = Constants.maximumNodeNum
|
||
|
|
meshPacket.channel = 0
|
||
|
|
meshPacket.from = UInt32(fromNodeNum)
|
||
|
|
meshPacket.wantAck = true
|
||
|
|
|
||
|
|
setCurrentPacketId(for: item.id, packetId: newMessageId)
|
||
|
|
|
||
|
|
var dataMessage = DataMessage()
|
||
|
|
if let serializedData = try? positionPacket.serializedData() {
|
||
|
|
dataMessage.payload = serializedData
|
||
|
|
dataMessage.portnum = PortNum.positionApp
|
||
|
|
meshPacket.decoded = dataMessage
|
||
|
|
} else {
|
||
|
|
throw AccessoryError.ioFailed("Failed to serialize position packet")
|
||
|
|
}
|
||
|
|
|
||
|
|
var toRadio = ToRadio()
|
||
|
|
toRadio.packet = meshPacket
|
||
|
|
try await AccessoryManager.shared.send(toRadio, debugDescription: "Retry position for message \(item.originalMessageId)")
|
||
|
|
}
|
||
|
|
|
||
|
|
private func resendWaypointMessage(_ item: RetryQueueItem) async throws {
|
||
|
|
Logger.mesh.warning("📬 Waypoint retry - requires serialized packet for full implementation")
|
||
|
|
|
||
|
|
guard let serializedData = item.serializedPacket else {
|
||
|
|
throw AccessoryError.appError("No serialized packet for waypoint retry")
|
||
|
|
}
|
||
|
|
|
||
|
|
var meshPacket = try MeshPacket(serializedData: serializedData)
|
||
|
|
let newMessageId = UInt32.random(in: UInt32(UInt8.max)..<UInt32.max)
|
||
|
|
meshPacket.id = newMessageId
|
||
|
|
|
||
|
|
setCurrentPacketId(for: item.id, packetId: newMessageId)
|
||
|
|
|
||
|
|
var toRadio = ToRadio()
|
||
|
|
toRadio.packet = meshPacket
|
||
|
|
try await AccessoryManager.shared.send(toRadio, debugDescription: "Retry waypoint for message \(item.originalMessageId)")
|
||
|
|
}
|
||
|
|
|
||
|
|
private func resendTracerouteMessage(_ item: RetryQueueItem) async throws {
|
||
|
|
guard let fromNodeNum = await AccessoryManager.shared.activeConnection?.device.num else {
|
||
|
|
throw AccessoryError.ioFailed("Not connected to any device")
|
||
|
|
}
|
||
|
|
|
||
|
|
let routePacket = RouteDiscovery()
|
||
|
|
var meshPacket = MeshPacket()
|
||
|
|
let newMessageId = UInt32.random(in: UInt32(UInt8.max)..<UInt32.max)
|
||
|
|
meshPacket.id = newMessageId
|
||
|
|
|
||
|
|
meshPacket.to = Constants.maximumNodeNum
|
||
|
|
meshPacket.from = UInt32(fromNodeNum)
|
||
|
|
meshPacket.channel = 0
|
||
|
|
meshPacket.wantAck = true
|
||
|
|
|
||
|
|
var dataMessage = DataMessage()
|
||
|
|
if let serializedData = try? routePacket.serializedData() {
|
||
|
|
dataMessage.payload = serializedData
|
||
|
|
dataMessage.portnum = PortNum.tracerouteApp
|
||
|
|
dataMessage.wantResponse = true
|
||
|
|
meshPacket.decoded = dataMessage
|
||
|
|
} else {
|
||
|
|
throw AccessoryError.ioFailed("Failed to serialize traceroute packet")
|
||
|
|
}
|
||
|
|
|
||
|
|
setCurrentPacketId(for: item.id, packetId: newMessageId)
|
||
|
|
|
||
|
|
var toRadio = ToRadio()
|
||
|
|
toRadio.packet = meshPacket
|
||
|
|
|
||
|
|
try await AccessoryManager.shared.send(toRadio, debugDescription: "Retry traceroute for message \(item.originalMessageId)")
|
||
|
|
|
||
|
|
// Update TraceRouteEntity with the new packet ID
|
||
|
|
let context = PersistenceController.shared.container.viewContext
|
||
|
|
let traceRequest = TraceRouteEntity.fetchRequest()
|
||
|
|
traceRequest.predicate = NSPredicate(format: "id == %lld", item.originalMessageId)
|
||
|
|
|
||
|
|
do {
|
||
|
|
let fetchedRoutes = try context.fetch(traceRequest)
|
||
|
|
var targetNodeNum: Int64 = 0
|
||
|
|
for route in fetchedRoutes {
|
||
|
|
targetNodeNum = route.node?.num ?? 0
|
||
|
|
context.delete(route)
|
||
|
|
}
|
||
|
|
|
||
|
|
let newTraceRoute = TraceRouteEntity(context: context)
|
||
|
|
newTraceRoute.id = Int64(newMessageId)
|
||
|
|
newTraceRoute.time = Date()
|
||
|
|
newTraceRoute.sent = true
|
||
|
|
|
||
|
|
if targetNodeNum > 0 {
|
||
|
|
let nodesRequest = NodeInfoEntity.fetchRequest()
|
||
|
|
nodesRequest.predicate = NSPredicate(format: "num == %lld", targetNodeNum)
|
||
|
|
if let nodes = try? context.fetch(nodesRequest), let node = nodes.first {
|
||
|
|
newTraceRoute.node = node
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
try context.save()
|
||
|
|
Logger.mesh.info("📬 Created replacement TraceRouteEntity with new ID \(newMessageId) for retry of \(item.originalMessageId)")
|
||
|
|
} catch {
|
||
|
|
Logger.mesh.error("📬 Failed to update TraceRouteEntity for retry: \(error.localizedDescription, privacy: .public)")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// MARK: - Queue Management
|
||
|
|
|
||
|
|
func addToQueue(
|
||
|
|
originalMessageId: Int64,
|
||
|
|
messageType: MessageType,
|
||
|
|
payload: Data,
|
||
|
|
portNum: PortNum,
|
||
|
|
toUserNum: Int64,
|
||
|
|
channel: Int32,
|
||
|
|
isEmoji: Bool = false,
|
||
|
|
replyID: Int64 = 0,
|
||
|
|
pkiEncrypted: Bool = false,
|
||
|
|
publicKey: Data? = nil,
|
||
|
|
originalPayload: String? = nil,
|
||
|
|
hopLimit: UInt32? = nil
|
||
|
|
) {
|
||
|
|
// Don't add if this message has already exhausted its retries
|
||
|
|
if failedMessageIds.contains(originalMessageId) {
|
||
|
|
Logger.mesh.info("📬 Message \(originalMessageId) has exhausted retries, not re-adding to queue")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
let originalPacketId = UInt32(truncatingIfNeeded: originalMessageId)
|
||
|
|
// Check if already in queue by original message ID / any known packet ID
|
||
|
|
if queue.contains(where: { $0.originalMessageId == originalMessageId || $0.matchesPacketId(originalPacketId) }) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Check if we have an existing item with this packet ID (late routing for a prior retry)
|
||
|
|
if let existingItem = queue.first(where: { $0.matchesPacketId(originalPacketId) }) {
|
||
|
|
Logger.mesh.info("📬 Message with packet ID \(originalMessageId) is already being tracked (original: \(existingItem.originalMessageId))")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: originalMessageId,
|
||
|
|
messageType: messageType,
|
||
|
|
payload: payload,
|
||
|
|
portNum: portNum,
|
||
|
|
toUserNum: toUserNum,
|
||
|
|
channel: channel,
|
||
|
|
isEmoji: isEmoji,
|
||
|
|
replyID: replyID,
|
||
|
|
pkiEncrypted: pkiEncrypted,
|
||
|
|
publicKey: publicKey,
|
||
|
|
originalPayload: originalPayload,
|
||
|
|
hopLimit: hopLimit
|
||
|
|
)
|
||
|
|
|
||
|
|
queue.append(item)
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Added message \(originalMessageId) to retry queue (retry 1/\(self.maxRetries + 1) in 10s)")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func cancelRetry(for messageId: Int64) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.originalMessageId == messageId }) {
|
||
|
|
queue[index].state = .cancelled
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Cancelled retry for message \(messageId)")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func cancelRetry(forItemId itemId: UUID) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.id == itemId }) {
|
||
|
|
queue[index].state = .cancelled
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Cancelled retry for item \(itemId)")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func clearAllRetries() {
|
||
|
|
for index in queue.indices {
|
||
|
|
queue[index].state = .cancelled
|
||
|
|
}
|
||
|
|
self.pendingCount = 0
|
||
|
|
Logger.mesh.info("📬 Cleared all pending retries")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func markCompleted(for messageId: Int64) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.originalMessageId == messageId }) {
|
||
|
|
queue[index].state = .completed
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Marked message \(messageId) as completed")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func markFailed(for messageId: Int64, error: String? = nil) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.originalMessageId == messageId }) {
|
||
|
|
queue[index].state = .failed
|
||
|
|
if let error = error {
|
||
|
|
queue[index].lastError = error
|
||
|
|
}
|
||
|
|
// Track that this message has exhausted its retries
|
||
|
|
failedMessageIds.insert(messageId)
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Marked message \(messageId) as failed")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func removeCompleted() {
|
||
|
|
queue.removeAll { $0.state == .completed || $0.state == .failed || $0.state == .cancelled }
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func clearAll() {
|
||
|
|
queue.removeAll()
|
||
|
|
self.pendingCount = 0
|
||
|
|
Logger.mesh.info("📬 Cleared all pending retries")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func getQueue() -> [RetryQueueItem] {
|
||
|
|
return queue
|
||
|
|
}
|
||
|
|
|
||
|
|
func getPendingItems() -> [RetryQueueItem] {
|
||
|
|
return queue.filter { $0.state == .pending || $0.state == .waitingForAck }
|
||
|
|
}
|
||
|
|
|
||
|
|
func getStatus(for messageId: Int64) -> RetryState? {
|
||
|
|
if let item = queue.first(where: { $0.originalMessageId == messageId }) {
|
||
|
|
return item.state
|
||
|
|
}
|
||
|
|
let packetId = UInt32(truncatingIfNeeded: messageId)
|
||
|
|
if let item = queue.first(where: { $0.matchesPacketId(packetId) }) {
|
||
|
|
return item.state
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func addToRetryQueue(_ item: RetryQueueItem) {
|
||
|
|
queue.append(item)
|
||
|
|
pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Added message \(item.originalMessageId) to retry queue (retry 1/\(self.maxRetries + 1) in 10s)")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
func getRetryStatus(for messageId: Int64) -> (current: Int, max: Int, state: RetryState)? {
|
||
|
|
if let item = queue.first(where: { $0.originalMessageId == messageId }) {
|
||
|
|
return (item.displayAttemptNumber, maxRetries + 1, item.state)
|
||
|
|
}
|
||
|
|
let packetId = UInt32(truncatingIfNeeded: messageId)
|
||
|
|
if let item = queue.first(where: { $0.matchesPacketId(packetId) }) {
|
||
|
|
return (item.displayAttemptNumber, maxRetries + 1, item.state)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func originalMessageId(forPacketId packetId: UInt32) -> Int64? {
|
||
|
|
queue.first(where: { $0.matchesPacketId(packetId) })?.originalMessageId
|
||
|
|
}
|
||
|
|
|
||
|
|
func markCompletedByPacketId(_ packetId: UInt32) {
|
||
|
|
// Check both original message ID and current packet ID
|
||
|
|
if let index = queue.firstIndex(where: { $0.matchesPacketId(packetId) }) {
|
||
|
|
queue[index].state = .completed
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.info("📬 Marked message with packet ID \(packetId) as completed (retry)")
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func canRetry(_ messageId: Int64) -> Bool {
|
||
|
|
// Check if message has already exhausted its retries
|
||
|
|
if failedMessageIds.contains(messageId) {
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
|
// Check by original message ID
|
||
|
|
if let item = queue.first(where: { $0.originalMessageId == messageId }) {
|
||
|
|
return item.state == .pending || item.state == .waitingForAck || item.state == .sending
|
||
|
|
}
|
||
|
|
|
||
|
|
// Also check by any known packet ID (for retries that created new packet IDs)
|
||
|
|
let packetId = UInt32(truncatingIfNeeded: messageId)
|
||
|
|
if let item = queue.first(where: { $0.matchesPacketId(packetId) }) {
|
||
|
|
return item.state == .pending || item.state == .waitingForAck || item.state == .sending
|
||
|
|
}
|
||
|
|
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Handle a NACK for a packet - finds existing item and increments retry count
|
||
|
|
func handleNack(for packetId: Int64) {
|
||
|
|
// Check if message has already exhausted its retries
|
||
|
|
if failedMessageIds.contains(packetId) {
|
||
|
|
Logger.mesh.info("📬 Message \(packetId) has exhausted retries, ignoring NACK")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
let pid = UInt32(truncatingIfNeeded: packetId)
|
||
|
|
|
||
|
|
// Prefer matching by current packet ID / history (covers late routing for older retry packets)
|
||
|
|
if let index = queue.firstIndex(where: { $0.matchesPacketId(pid) }) {
|
||
|
|
let item = queue[index]
|
||
|
|
// If this NACK is for an older packet ID, but we're currently waiting on a newer packet,
|
||
|
|
// ignore it so we don't flip the UI to failed while the latest attempt is in-flight.
|
||
|
|
if item.currentPacketId != nil, item.currentPacketId != pid,
|
||
|
|
(item.state == .sending || item.state == .waitingForAck) {
|
||
|
|
Logger.mesh.info("📬 Ignoring stale NACK for packet \(packetId) (current: \(String(describing: item.currentPacketId)))")
|
||
|
|
return
|
||
|
|
}
|
||
|
|
handleNackForItem(at: index)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Message not in queue - this is the first NACK, add it to queue
|
||
|
|
Logger.mesh.info("📬 First NACK for packet \(packetId), adding to retry queue")
|
||
|
|
addNewRetryForPacket(packetId)
|
||
|
|
}
|
||
|
|
|
||
|
|
private func addNewRetryForPacket(_ packetId: Int64) {
|
||
|
|
// Try to find message data to create a proper retry item
|
||
|
|
let context = PersistenceController.shared.container.viewContext
|
||
|
|
|
||
|
|
// Try to find a MessageEntity with this ID (text messages)
|
||
|
|
let fetchRequest = MessageEntity.fetchRequest()
|
||
|
|
fetchRequest.predicate = NSPredicate(format: "messageId == %lld", packetId)
|
||
|
|
|
||
|
|
do {
|
||
|
|
let fetchedMessages = try context.fetch(fetchRequest)
|
||
|
|
if let message = fetchedMessages.first {
|
||
|
|
// Clear error status so UI shows retry state
|
||
|
|
message.ackError = 0
|
||
|
|
message.receivedACK = false
|
||
|
|
message.ackTimestamp = 0
|
||
|
|
try context.save()
|
||
|
|
|
||
|
|
// Found message entity, create retry with full data
|
||
|
|
let payloadData = message.messagePayload?.data(using: .utf8) ?? Data()
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: packetId,
|
||
|
|
messageType: .text,
|
||
|
|
payload: payloadData,
|
||
|
|
portNum: .textMessageApp,
|
||
|
|
toUserNum: message.toUser?.num ?? 0,
|
||
|
|
channel: message.channel,
|
||
|
|
isEmoji: message.isEmoji,
|
||
|
|
replyID: message.replyID,
|
||
|
|
pkiEncrypted: message.pkiEncrypted,
|
||
|
|
publicKey: message.publicKey,
|
||
|
|
originalPayload: message.messagePayload
|
||
|
|
)
|
||
|
|
queue.append(item)
|
||
|
|
Logger.mesh.info("📬 Added text message \(packetId) to retry queue (retry 1/\(self.maxRetries + 1) in 10s)")
|
||
|
|
} else {
|
||
|
|
// Check if it's a traceroute message
|
||
|
|
let traceRequest = TraceRouteEntity.fetchRequest()
|
||
|
|
traceRequest.predicate = NSPredicate(format: "id == %lld", packetId)
|
||
|
|
do {
|
||
|
|
let fetchedRoutes = try context.fetch(traceRequest)
|
||
|
|
if let traceRoute = fetchedRoutes.first {
|
||
|
|
// Clear error status
|
||
|
|
traceRoute.sent = true
|
||
|
|
try context.save()
|
||
|
|
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: packetId,
|
||
|
|
messageType: .traceroute,
|
||
|
|
payload: Data(),
|
||
|
|
portNum: .tracerouteApp,
|
||
|
|
toUserNum: traceRoute.node?.num ?? 0,
|
||
|
|
channel: 0
|
||
|
|
)
|
||
|
|
queue.append(item)
|
||
|
|
Logger.mesh.info("📬 Added traceroute \(packetId) to retry queue (retry 1/\(self.maxRetries + 1) in 30s)")
|
||
|
|
} else {
|
||
|
|
// No entity found - create basic unknown retry
|
||
|
|
// The actual resend will fail gracefully if no serialized packet
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: packetId,
|
||
|
|
messageType: .unknown
|
||
|
|
)
|
||
|
|
queue.append(item)
|
||
|
|
Logger.mesh.info("📬 Added unknown packet \(packetId) to retry queue (basic, retry 1/\(self.maxRetries + 1) in 10s)")
|
||
|
|
}
|
||
|
|
} catch {
|
||
|
|
// No traceroute found, create basic unknown retry
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: packetId,
|
||
|
|
messageType: .unknown
|
||
|
|
)
|
||
|
|
queue.append(item)
|
||
|
|
Logger.mesh.info("📬 Added packet \(packetId) to retry queue (fallback, retry 1/\(self.maxRetries + 1) in 10s)")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
} catch {
|
||
|
|
// Even on error, add basic unknown item
|
||
|
|
let item = RetryQueueItem(
|
||
|
|
originalMessageId: packetId,
|
||
|
|
messageType: .unknown
|
||
|
|
)
|
||
|
|
queue.append(item)
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
Logger.mesh.error("📬 Failed to fetch message for retry, added basic item: \(error.localizedDescription, privacy: .public)")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func handleNackForItem(at index: Int) {
|
||
|
|
let item = queue[index]
|
||
|
|
let newRetryCount = item.normalizedRetryCount + 1
|
||
|
|
|
||
|
|
if newRetryCount > self.maxRetries {
|
||
|
|
// Exhausted retries
|
||
|
|
queue[index].state = .failed
|
||
|
|
failedMessageIds.insert(item.originalMessageId)
|
||
|
|
Logger.mesh.info("📬 Message \(item.originalMessageId) exhausted \(self.maxRetries) retries, marking as failed")
|
||
|
|
} else {
|
||
|
|
// Increment retry count and reschedule
|
||
|
|
let delay = retryDelay(for: item.messageType, retryCount: newRetryCount)
|
||
|
|
queue[index].retryCount = newRetryCount
|
||
|
|
queue[index].nextRetryDate = Date().addingTimeInterval(delay)
|
||
|
|
queue[index].state = .pending
|
||
|
|
queue[index].currentPacketId = nil // Will be set when retry is sent
|
||
|
|
|
||
|
|
// Clear the error in the database so UI shows retry state instead of error
|
||
|
|
clearMessageError(for: item.originalMessageId)
|
||
|
|
|
||
|
|
Logger.mesh.info("📬 Message \(item.originalMessageId) NACK received, retry \(newRetryCount)/\(self.maxRetries + 1) scheduled in \(Int(delay))s")
|
||
|
|
}
|
||
|
|
|
||
|
|
self.pendingCount = queue.filter { $0.state == .pending || $0.state == .waitingForAck || $0.state == .sending }.count
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
|
||
|
|
private func clearMessageError(for messageId: Int64) {
|
||
|
|
let context = PersistenceController.shared.container.viewContext
|
||
|
|
let fetchRequest = MessageEntity.fetchRequest()
|
||
|
|
fetchRequest.predicate = NSPredicate(format: "messageId == %lld", messageId)
|
||
|
|
|
||
|
|
do {
|
||
|
|
let messages = try context.fetch(fetchRequest)
|
||
|
|
for message in messages {
|
||
|
|
message.ackError = 0
|
||
|
|
message.receivedACK = false
|
||
|
|
message.ackTimestamp = 0
|
||
|
|
}
|
||
|
|
try context.save()
|
||
|
|
Logger.mesh.info("📬 Cleared error status for message \(messageId) during retry")
|
||
|
|
} catch {
|
||
|
|
Logger.mesh.error("📬 Failed to clear message error: \(error.localizedDescription, privacy: .public)")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// MARK: - Private Helpers
|
||
|
|
|
||
|
|
private func updateItemState(_ itemId: UUID, state: RetryState) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.id == itemId }) {
|
||
|
|
queue[index].state = state
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func updateItemRetry(_ itemId: UUID, retryCount: Int, nextRetryDate: Date) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.id == itemId }) {
|
||
|
|
queue[index].retryCount = retryCount
|
||
|
|
queue[index].nextRetryDate = nextRetryDate
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private func updateItemError(_ itemId: UUID, error: String) {
|
||
|
|
if let index = queue.firstIndex(where: { $0.id == itemId }) {
|
||
|
|
queue[index].lastError = error
|
||
|
|
notifyUpdate()
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
extension Array {
|
||
|
|
subscript(safe index: Index) -> Element? {
|
||
|
|
indices.contains(index) ? self[index] : nil
|
||
|
|
}
|
||
|
|
}
|