Changes to OTA Protocol

This commit is contained in:
Jake-B 2025-12-23 14:58:08 -05:00
parent 10ffdf9e10
commit 5a150513aa
2 changed files with 160 additions and 109 deletions

View file

@ -9,6 +9,7 @@ import Foundation
import CoreBluetooth
import OSLog
import UIKit
import CryptoKit
private let meshtasticOTAServiceId = CBUUID(string: "4FAFC201-1FB5-459E-8FCC-C5C9C331914B")
private let statusCharacteristicId = CBUUID(string: "62EC0272-3EC5-11EB-B378-0242AC130003") // ESP32 pTxCharacteristic ESP send (notifying)
@ -54,8 +55,21 @@ final class ESP32BLEOTAViewModel2: ObservableObject {
// Start transfer
let data = try Data(contentsOf: binURL)
let sizeMsg = "OTA_SIZE:\(data.count)"
try await ble.writeValue(Data(sizeMsg.utf8), for: otaChar, type: .withoutResponse, on: peripheral)
// Get hash of the file
let sha256Digest = SHA256.hash(data: data)
let fileHash = sha256Digest.map { String(format: "%02hhx", $0) }.joined()
Logger.services.info("Firmware SHA-256 is \(fileHash)")
let fileSize = data.count
// Using the espota.py derrived start message, same as the WiFi/TCP code.
// Start of OTA Message: (cmd) (port) (size) (hash)
// Normally cmd is FLASH = 0, SPIFFS = 100, AUTH = 200, but only FLASH Implemented.
// Port is not used for BLE, ignored by OTA Loader
let message = "0 0 \(fileSize) \(fileHash)"
try await ble.writeValue(Data(message.utf8), for: otaChar, type: .withoutResponse, on: peripheral)
var buffer = data

View file

@ -24,20 +24,14 @@ class ESP32WifiOTAViewModel: ObservableObject {
// MARK: - Public Interface
/// Starts the OTA update process with the given host, firmware URL, and optional password.
func startUpdate(host: String, firmwareUrl: URL, password: String? = nil) async {
guard self.otaState == .idle else { return }
guard otaState == .idle else { return }
self.progress = 0.0
self.errorMessage = nil
self.statusMessage = "Connecting..."
self.otaState = .waitingForConnection
var listener: NWListener?
defer {
listener?.cancel()
self.otaState = .idle
}
progress = 0.0
errorMessage = nil
statusMessage = "Connecting..."
otaState = .waitingForConnection
do {
let firmwareData = try Data(contentsOf: firmwareUrl)
@ -47,45 +41,62 @@ class ESP32WifiOTAViewModel: ObservableObject {
}
Logger.services.info("[ESP OTA] Starting local TCP Listener...")
let (setupListener, localPort) = try await setupListener(sending: firmwareData)
listener = setupListener
let (listener, localPort) = try await setupListener(sending: firmwareData)
Logger.services.info("[ESP OTA] Listening on port \(localPort)")
self.statusMessage = "Waiting for device. This can take a while..."
// Ensure listener is cancelled on exit (success or failure)
defer { listener.cancel() }
statusMessage = "Waiting for device. This can take a while..."
Logger.services.info("[ESP OTA] Starting Handshake loop...")
try await performHandshake(host: host, localPort: localPort, data: firmwareData, password: password)
try await performHandshake(host: host,
localPort: localPort,
data: firmwareData,
password: password)
self.otaState = .connected
otaState = .connected
for try await _ in transferStream { break }
self.statusMessage = "Success!"
self.otaState = .completed
statusMessage = "Success!"
otaState = .completed
Logger.services.info("[ESP OTA] Update Complete")
} catch {
Logger.services.error("[ESP OTA] Error: \(error.localizedDescription)")
self.errorMessage = error.localizedDescription
self.statusMessage = "Failed"
self.otaState = .error
self.transferContinuation?.finish(throwing: error)
self.transferContinuation = nil
errorMessage = error.localizedDescription
statusMessage = "Failed"
otaState = .error
transferContinuation?.finish(throwing: error)
transferContinuation = nil
}
}
// MARK: - Phase 2: Handshake Logic
// MARK: - Handshake Logic
private actor HandshakeState {
var currentPayload: Data
init(initialPayload: Data) { self.currentPayload = initialPayload }
func updatePayload(_ data: Data) { self.currentPayload = data }
func getPayload() -> Data { return currentPayload }
var okReceived: Bool = false
init(initialPayload: Data) {
self.currentPayload = initialPayload
}
func updatePayload(_ data: Data) {
currentPayload = data
}
func getPayload() -> Data {
currentPayload
}
func markOkReceived() {
okReceived = true
}
func isOkReceived() -> Bool {
okReceived
}
}
/// Performs the UDP handshake with the ESP32 device.
private func performHandshake(host: String, localPort: UInt16, data: Data, password: String?) async throws {
let initialPayload = try generateInvitationPayload(localPort: localPort, data: data, password: password, authNonce: nil)
let state = HandshakeState(initialPayload: initialPayload)
@ -99,46 +110,44 @@ class ESP32WifiOTAViewModel: ObservableObject {
Logger.services.info("[ESP OTA] UDP Connection Ready. Starting broadcast/listen loop.")
var okReceived = false
try await withThrowingTaskGroup(of: Void.self) { group in
// Task A: Broadcaster
group.addTask {
while !Task.isCancelled, !okReceived {
var okReceived = await state.isOkReceived()
while !Task.isCancelled && !okReceived {
let payload = await state.getPayload()
connection.send(content: payload, completion: .contentProcessed { _ in })
try await connection.sendAsync(data: payload)
Logger.services.debug("[ESP OTA] Sent invitation packet")
try await Task.sleep(nanoseconds: UInt64(self.retryDelay * 1_000_000_000))
try await Task.sleep(for: .seconds(self.retryDelay))
okReceived = await state.isOkReceived()
}
}
// Task B: Listener
// Task B: Listener (using async stream for messages)
group.addTask {
while !Task.isCancelled {
let response = try await self.receiveNextMessage(connection: connection)
let messageStream = self.receiveMessageStream(from: connection)
for try await response in messageStream {
if Task.isCancelled { break }
if response == "OK" {
Logger.services.info("[ESP OTA] Handshake OK received!")
okReceived = true
await state.markOkReceived()
return
}
// THIS IS UNTESTED
if response.hasPrefix("AUTH") {
Logger.services.info("[ESP OTA] Auth challenge received: \(response)")
let components = response.components(separatedBy: " ")
if components.count > 1 {
let nonce = components[1]
let newPayload = try self.generateInvitationPayload(localPort: localPort,
data: data,
password: password,
authNonce: nonce)
let newPayload = try self.generateInvitationPayload(localPort: localPort, data: data, password: password, authNonce: nonce)
await state.updatePayload(newPayload)
}
}
if response == "ERASE" {
Logger.services.info("[ESP OTA] Device is erasing the flash partition.")
Task { @MainActor in
await self.updateUI { // Safe MainActor update
self.otaState = .preparing
self.statusMessage = "Preparing flash partition..."
}
@ -148,41 +157,58 @@ class ESP32WifiOTAViewModel: ObservableObject {
// Task C: Timeout
group.addTask {
try await Task.sleep(nanoseconds: UInt64(self.handshakeTotalTimeout * 1_000_000_000))
try await Task.sleep(for: .seconds(self.handshakeTotalTimeout))
throw OTAError.timeout
}
try await group.next()
group.cancelAll()
try await group.next() // Wait for first completion (success or error)
group.cancelAll() // Cancel remaining tasks
}
}
// MARK: - UDP Helpers (Nonisolated)
// MARK: - UDP Helpers
nonisolated private func receiveNextMessage(connection: NWConnection) async throws -> String {
return try await withCheckedThrowingContinuation { continuation in
connection.receiveMessage { content, _, _, error in
if let error = error {
continuation.resume(throwing: error)
} else if let data = content, let str = String(data: data, encoding: .utf8) {
continuation.resume(returning: str.trimmingCharacters(in: .whitespacesAndNewlines))
} else {
continuation.resume(returning: "")
/// Creates an async stream of incoming UDP messages.
nonisolated private func receiveMessageStream(from connection: NWConnection) -> AsyncThrowingStream<String, Error> {
AsyncThrowingStream<String, Error> { continuation in
func receiveNext() {
connection.receiveMessage { content, _, _, error in
if let error = error {
continuation.finish(throwing: error)
return
}
if let data = content, let str = String(data: data, encoding: .utf8) {
continuation.yield(str.trimmingCharacters(in: .whitespacesAndNewlines))
}
if !Task.isCancelled {
receiveNext() // Recurse to continue streaming
} else {
continuation.finish()
}
}
}
receiveNext()
continuation.onTermination = { _ in
connection.cancel()
}
}
}
nonisolated private func generateInvitationPayload(localPort: UInt16, data: Data, password: String?, authNonce: String?) throws -> Data {
let fileMD5 = Insecure.MD5.hash(data: data).map { String(format: "%02hhx", $0) }.joined()
Logger.services.info("Firmware MD5 is \(fileMD5)")
let sha256Digest = SHA256.hash(data: data)
let fileHash = sha256Digest.map { String(format: "%02hhx", $0) }.joined()
Logger.services.info("Firmware SHA-256 is \(fileHash)")
let fileSize = data.count
var message = "0 \(localPort) \(fileSize) \(fileMD5)"
var message = "0 \(localPort) \(fileSize) \(fileHash)"
if let nonce = authNonce, let pass = password {
let authInput = pass + nonce
if let authData = authInput.data(using: .utf8) {
let authHash = Insecure.MD5.hash(data: authData).map { String(format: "%02hhx", $0) }.joined()
let authSha256 = SHA256.hash(data: authData)
let authFirst16 = Data(authSha256.prefix(16))
let authHash = authFirst16.map { String(format: "%02hhx", $0) }.joined()
message += " " + authHash
}
}
@ -191,36 +217,36 @@ class ESP32WifiOTAViewModel: ObservableObject {
return payload
}
/// Uses OSAllocatedUnfairLock to safely ensure resume is called exactly once
/// Waits for the connection to become ready using a continuation with lock for safety.
nonisolated private func waitForConnectionReady(_ connection: NWConnection) async throws {
return try await withCheckedThrowingContinuation { continuation in
let stateLock = OSAllocatedUnfairLock(initialState: false) // The Idiomatic Swift 6 Lock
try await withCheckedThrowingContinuation { continuation in
// Ensure we only resume the continuation once
var didResume = false
connection.stateUpdateHandler = { state in
// We lock, check if we already resumed, set to true, and perform logic
stateLock.withLock { hasResumed in
if hasResumed { return }
switch state {
case .ready:
hasResumed = true
continuation.resume()
case .failed(let err):
hasResumed = true
continuation.resume(throwing: err)
case .cancelled:
hasResumed = true
continuation.resume(throwing: CancellationError())
default:
break
}
// Guard against multiple resumes due to multiple state updates
if didResume { return }
switch state {
case .ready:
didResume = true
continuation.resume()
case .failed(let err):
didResume = true
continuation.resume(throwing: err)
case .cancelled:
didResume = true
continuation.resume(throwing: CancellationError())
default:
break
}
}
}
}
// MARK: - Phase 1 & 4 (Listener & Transfer)
// MARK: - Listener & Transfer Logic
/// Sets up the TCP listener and returns it along with the assigned port.
private func setupListener(sending firmware: Data) async throws -> (NWListener, UInt16) {
let parameters = NWParameters(tls: nil)
parameters.includePeerToPeer = true
@ -228,31 +254,36 @@ class ESP32WifiOTAViewModel: ObservableObject {
let listener = try NWListener(using: parameters, on: .init(integerLiteral: 0))
return try await withCheckedThrowingContinuation { continuation in
let stateLock = OSAllocatedUnfairLock(initialState: false)
return try await withCheckedThrowingContinuation<(NWListener, UInt16)> { continuation in
let stateLock = OSAllocatedUnfairLock<Bool>(initialState: false)
// Set newConnectionHandler before starting (matches original to avoid timing issues)
listener.newConnectionHandler = { newConnection in
Logger.services.info("[ESP OTA] Accepted connection from \(String(describing: newConnection.endpoint))")
Task { @MainActor in
self.handleIncomingConnection(connection: newConnection, data: firmware)
newConnection.start(queue: .global())
newConnection.start(queue: .global()) // Start here as in original
}
}
listener.stateUpdateHandler = { state in
stateLock.withLock { hasResumed in
if hasResumed { return }
stateLock.withLock { isHandled in
if isHandled { return }
isHandled = true
switch state {
case .ready:
Logger.services.debug("[ESP OTA] Listener ready with port: \(String(describing: listener.port?.rawValue ?? 0))")
if let port = listener.port {
hasResumed = true
continuation.resume(returning: (listener, port.rawValue))
} else {
continuation.resume(throwing: OTAError.connectionFailed)
}
case .failed(let error):
hasResumed = true
Logger.services.error("[ESP OTA] Listener failed: \(error)")
continuation.resume(throwing: error)
default:
Logger.services.debug("[ESP OTA] Listener state: \(String(describing: state))")
break
}
}
@ -261,6 +292,7 @@ class ESP32WifiOTAViewModel: ObservableObject {
}
}
/// Handles an incoming TCP connection for firmware transfer.
private func handleIncomingConnection(connection: NWConnection, data: Data) {
connection.stateUpdateHandler = { state in
switch state {
@ -269,17 +301,12 @@ class ESP32WifiOTAViewModel: ObservableObject {
self.otaState = .transferring
do {
try await self.performChunkedTransfer(connection: connection, data: data)
await MainActor.run {
self.transferContinuation?.yield()
self.transferContinuation?.finish()
self.transferContinuation = nil
}
self.transferContinuation?.yield()
self.transferContinuation?.finish()
} catch {
await MainActor.run {
self.transferContinuation?.finish(throwing: error)
self.transferContinuation = nil
}
self.transferContinuation?.finish(throwing: error)
}
self.transferContinuation = nil
connection.cancel()
}
case .failed(let error):
@ -292,11 +319,12 @@ class ESP32WifiOTAViewModel: ObservableObject {
}
}
/// Performs the chunked TCP transfer of firmware data.
nonisolated private func performChunkedTransfer(connection: NWConnection, data: Data) async throws {
var offset = 0
let totalSize = data.count
while offset < totalSize {
while offset < totalSize && !Task.isCancelled {
let endIndex = min(offset + chunkSize, totalSize)
let chunk = data[offset..<endIndex]
try await connection.sendAsync(data: chunk)
@ -304,20 +332,28 @@ class ESP32WifiOTAViewModel: ObservableObject {
offset += chunk.count
let percent = Double(offset) / Double(totalSize)
// Batch progress updates every 10 chunks to avoid excessive MainActor hops
if offset % (chunkSize * 10) == 0 {
await MainActor.run {
await updateUI {
self.progress = percent
self.statusMessage = "Please stay on this screen while update completes..."
}
}
}
await MainActor.run {
await updateUI {
self.progress = 1.0
self.statusMessage = "Done..."
}
try await Task.sleep(nanoseconds: 3_000_000_000)
try await Task.sleep(for: .seconds(3))
}
// MARK: - UI Update Helper
/// Safely updates UI properties on MainActor.
private func updateUI(_ block: @MainActor @Sendable () -> Void) async {
await MainActor.run { block() }
}
}
@ -342,8 +378,8 @@ enum OTAError: Error, LocalizedError {
extension NWConnection {
func sendAsync(data: Data) async throws {
return try await withCheckedThrowingContinuation { continuation in
self.send(content: data, completion: .contentProcessed { error in
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
send(content: data, completion: .contentProcessed { error in
if let error = error {
continuation.resume(throwing: error)
} else {
@ -353,3 +389,4 @@ extension NWConnection {
}
}
}