mirror of
https://github.com/agessaman/meshcore-packet-capture.git
synced 2026-04-20 23:23:37 +00:00
693 lines
No EOL
32 KiB
Python
693 lines
No EOL
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BLE Pairing Helper for MeshCore Packet Capture Installer
|
|
Checks pairing status and handles PIN-based pairing
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import json
|
|
import platform
|
|
import subprocess
|
|
|
|
# Wrap imports in try-except to handle import errors gracefully
|
|
try:
|
|
from meshcore import MeshCore
|
|
except ImportError as e:
|
|
print(json.dumps({
|
|
"status": "error",
|
|
"message": f"Failed to import meshcore: {str(e)}"
|
|
}), flush=True)
|
|
sys.exit(1)
|
|
|
|
def is_linux():
|
|
"""Check if running on Linux"""
|
|
return platform.system().lower() == 'linux'
|
|
|
|
def is_macos():
|
|
"""Check if running on macOS"""
|
|
return platform.system().lower() == 'darwin'
|
|
|
|
async def check_pairing_and_connect(address, name, pin=None):
|
|
"""Check if device is paired and handle pairing if needed"""
|
|
meshcore = None
|
|
try:
|
|
print(f"Checking pairing status for {name} ({address})...", file=sys.stderr, flush=True)
|
|
|
|
# Check if device is available/visible first
|
|
print(f"Checking if device {address} is available...", file=sys.stderr, flush=True)
|
|
|
|
# Quick scan to verify device is still visible
|
|
try:
|
|
from bleak import BleakScanner
|
|
print("Scanning for device availability...", file=sys.stderr, flush=True)
|
|
devices = await BleakScanner.discover(timeout=5.0)
|
|
device_found = any(device.address.upper() == address.upper() for device in devices)
|
|
if device_found:
|
|
print(f"Device {address} is visible and available", file=sys.stderr, flush=True)
|
|
else:
|
|
print(f"Device {address} not found in scan - may be busy or out of range", file=sys.stderr, flush=True)
|
|
except Exception as e:
|
|
print(f"Could not scan for device availability: {e}", file=sys.stderr, flush=True)
|
|
|
|
# Try to connect without PIN first (with timeout)
|
|
# On macOS, use a longer timeout to allow pairing dialog to appear
|
|
try:
|
|
print(f"Attempting to connect to {name} ({address}) without PIN...", file=sys.stderr, flush=True)
|
|
# On macOS, use longer timeout to allow system pairing dialog to appear and be completed
|
|
connection_timeout = 30.0 if is_macos() else 25.0
|
|
print(f"Connection timeout set to {connection_timeout} seconds...", file=sys.stderr, flush=True)
|
|
if is_macos():
|
|
print("If a pairing dialog appears, please enter the PIN from your device.", file=sys.stderr, flush=True)
|
|
|
|
# Create the connection with a reasonable timeout
|
|
# On macOS, longer timeout allows time for user to complete pairing dialog
|
|
meshcore = await asyncio.wait_for(
|
|
MeshCore.create_ble(address=address, debug=False), # Disable debug to reduce output
|
|
timeout=connection_timeout
|
|
)
|
|
|
|
# Wait a moment for connection to fully establish (especially important on macOS)
|
|
await asyncio.sleep(1.5)
|
|
|
|
# Verify connection is actually established and working
|
|
# On macOS, the connection object might be created but not actually connected if pairing is needed
|
|
if not hasattr(meshcore, 'is_connected') or not meshcore.is_connected:
|
|
print("Connection created but not actually connected - pairing may be required", file=sys.stderr, flush=True)
|
|
try:
|
|
await meshcore.disconnect()
|
|
except:
|
|
pass
|
|
print(json.dumps({
|
|
"status": "not_paired",
|
|
"message": "Device requires pairing"
|
|
}), flush=True)
|
|
return False
|
|
|
|
print("BLE connection established successfully", file=sys.stderr, flush=True)
|
|
print(f"Connection verified: is_connected={meshcore.is_connected}", file=sys.stderr, flush=True)
|
|
|
|
# Try to verify connection works by attempting a simple operation
|
|
# This helps ensure the connection is actually functional, not just created
|
|
# On macOS, sometimes the connection appears successful but isn't actually working
|
|
try:
|
|
# Wait a bit more to ensure connection is stable
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Check if we can access device properties (indicates connection is functional)
|
|
if hasattr(meshcore, 'self_info'):
|
|
device_info = meshcore.self_info
|
|
if device_info:
|
|
print(f"Device info retrieved successfully - connection is functional", file=sys.stderr, flush=True)
|
|
else:
|
|
print("Device info not available yet, but connection appears valid", file=sys.stderr, flush=True)
|
|
except Exception as e:
|
|
# If we can't access device info, the connection might not be fully established
|
|
error_msg = str(e)
|
|
print(f"Could not verify connection functionality: {error_msg}", file=sys.stderr, flush=True)
|
|
|
|
# Check if this is a pairing-related error
|
|
if any(keyword in error_msg.lower() for keyword in
|
|
['pair', 'auth', 'permission', 'not permitted', 'not authorized', 'eof']):
|
|
print("Connection verification suggests pairing is required", file=sys.stderr, flush=True)
|
|
try:
|
|
await meshcore.disconnect()
|
|
except:
|
|
pass
|
|
print(json.dumps({
|
|
"status": "not_paired",
|
|
"message": "Device requires pairing"
|
|
}), flush=True)
|
|
return False
|
|
# Otherwise, assume connection is valid even if we can't verify
|
|
print("Assuming connection is valid despite verification failure", file=sys.stderr, flush=True)
|
|
|
|
# Connection succeeded and is functional - device is paired
|
|
await meshcore.disconnect()
|
|
|
|
# Additional safety: Force disconnect on Linux using bluetoothctl if available
|
|
if is_linux():
|
|
try:
|
|
print("Ensuring already-paired device is fully disconnected...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not force disconnect already-paired device via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Device is already paired and ready to use"
|
|
}), flush=True)
|
|
return True
|
|
|
|
except EOFError as e:
|
|
# This is the key indicator that pairing is required
|
|
# Device connected but immediately disconnected - needs pairing
|
|
# On macOS, this often means the device is not paired
|
|
print("Device connected but immediately disconnected - pairing required", file=sys.stderr, flush=True)
|
|
print(f"EOFError details: {e}", file=sys.stderr, flush=True)
|
|
|
|
# On macOS, if we get EOFError, try one more time with a longer wait
|
|
# This gives macOS time to show the pairing dialog
|
|
if is_macos():
|
|
print("On macOS: Waiting a moment and retrying connection to allow pairing dialog...", file=sys.stderr, flush=True)
|
|
await asyncio.sleep(3) # Give time for pairing dialog to appear
|
|
try:
|
|
print("Retrying connection after pairing dialog may have appeared...", file=sys.stderr, flush=True)
|
|
meshcore_retry = await asyncio.wait_for(
|
|
MeshCore.create_ble(address=address, debug=False),
|
|
timeout=30.0
|
|
)
|
|
# If retry succeeds, device was paired via dialog
|
|
await asyncio.sleep(1.5)
|
|
if hasattr(meshcore_retry, 'is_connected') and meshcore_retry.is_connected:
|
|
print("Connection successful after pairing dialog!", file=sys.stderr, flush=True)
|
|
await meshcore_retry.disconnect()
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Device paired successfully via system dialog"
|
|
}), flush=True)
|
|
return True
|
|
await meshcore_retry.disconnect()
|
|
except Exception as retry_e:
|
|
print(f"Retry connection failed: {retry_e}", file=sys.stderr, flush=True)
|
|
|
|
print(json.dumps({
|
|
"status": "not_paired",
|
|
"message": "Device requires pairing"
|
|
}), flush=True)
|
|
return False
|
|
|
|
except asyncio.TimeoutError:
|
|
# Timeout could mean device is busy or needs pairing
|
|
print("Connection timed out", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "timeout",
|
|
"message": "Connection timed out - device may be busy or require pairing"
|
|
}), flush=True)
|
|
return False
|
|
|
|
except ConnectionError as e:
|
|
error_msg = str(e)
|
|
print(f"Connection error: {error_msg}", file=sys.stderr, flush=True)
|
|
|
|
# Check for pairing-related errors
|
|
if any(keyword in error_msg.lower() for keyword in
|
|
['pair', 'auth', 'permission', 'not permitted', 'not authorized']):
|
|
print("Device requires pairing", file=sys.stderr, flush=True)
|
|
print(json.dumps({"status": "not_paired", "message": "Device requires pairing"}), flush=True)
|
|
return False
|
|
else:
|
|
print(json.dumps({"status": "error", "message": error_msg}), flush=True)
|
|
return False
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
error_type = type(e).__name__
|
|
print(f"Connection attempt failed: {error_msg}", file=sys.stderr, flush=True)
|
|
print(f"Error type: {error_type}", file=sys.stderr, flush=True)
|
|
|
|
# Check if this is clearly a "device not found" error
|
|
if "No MeshCore device found" in error_msg or "not found" in error_msg.lower():
|
|
print("Device not found or not in range", file=sys.stderr, flush=True)
|
|
print(json.dumps({"status": "not_found", "message": "Device not found or not in range"}), flush=True)
|
|
return False
|
|
# Check for pairing errors
|
|
elif any(keyword in error_msg.lower() for keyword in
|
|
['pair', 'auth', 'permission', 'not permitted', 'not authorized']):
|
|
print("Device requires pairing", file=sys.stderr, flush=True)
|
|
print(json.dumps({"status": "not_paired", "message": "Device requires pairing"}), flush=True)
|
|
return False
|
|
else:
|
|
print(f"Connection error: {error_msg}", file=sys.stderr, flush=True)
|
|
print(json.dumps({"status": "error", "message": error_msg}), flush=True)
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"Unexpected error: {e}", file=sys.stderr, flush=True)
|
|
print(json.dumps({"status": "error", "message": str(e)}), flush=True)
|
|
return False
|
|
finally:
|
|
# Ensure we always disconnect
|
|
if meshcore:
|
|
try:
|
|
await meshcore.disconnect()
|
|
except:
|
|
pass
|
|
|
|
async def attempt_pairing(address, name, pin):
|
|
"""Attempt to pair with the device using the provided PIN"""
|
|
if is_linux():
|
|
return await attempt_pairing_linux(address, name, pin)
|
|
else:
|
|
return await attempt_pairing_meshcore(address, name, pin)
|
|
|
|
async def attempt_pairing_linux(address, name, pin):
|
|
"""Attempt to pair with the device using bluetoothctl on Linux"""
|
|
try:
|
|
print(f"Attempting to pair with {name} using PIN {pin} on Linux...", file=sys.stderr, flush=True)
|
|
print(f"Make sure {name} is in pairing mode and displaying the PIN code.", file=sys.stderr, flush=True)
|
|
|
|
# Remove any existing pairing
|
|
subprocess.run(['bluetoothctl', 'remove', address],
|
|
capture_output=True, timeout=5)
|
|
await asyncio.sleep(1)
|
|
|
|
# Use bluetoothctl with expect-style interaction
|
|
import pexpect
|
|
|
|
print("Starting bluetoothctl pairing process...", file=sys.stderr, flush=True)
|
|
|
|
child = pexpect.spawn('bluetoothctl', encoding='utf-8', timeout=30)
|
|
child.logfile = sys.stderr
|
|
|
|
# Set up agent
|
|
child.sendline('agent on')
|
|
child.expect('Agent registered')
|
|
child.sendline('default-agent')
|
|
|
|
# First, try to scan and discover the device
|
|
print("Scanning for device...", file=sys.stderr, flush=True)
|
|
child.sendline('scan on')
|
|
child.expect('Discovery started')
|
|
|
|
# Wait a bit for discovery
|
|
await asyncio.sleep(3)
|
|
|
|
# Check if device is available
|
|
child.sendline(f'info {address}')
|
|
try:
|
|
child.expect(['Device', 'not available'], timeout=5)
|
|
if 'not available' in child.before + child.after:
|
|
print("Device not available, trying to connect anyway...", file=sys.stderr, flush=True)
|
|
except pexpect.TIMEOUT:
|
|
print("Device info check timed out, proceeding with pairing...", file=sys.stderr, flush=True)
|
|
|
|
# Stop scanning
|
|
child.sendline('scan off')
|
|
child.expect('Discovery stopped')
|
|
|
|
# Initiate pairing
|
|
print(f"Initiating pairing with {address}...", file=sys.stderr, flush=True)
|
|
child.sendline(f'pair {address}')
|
|
|
|
# Wait for PIN/passkey request or confirmation
|
|
index = child.expect([
|
|
'Enter PIN code:',
|
|
'Enter passkey',
|
|
r'Confirm passkey.*\(yes/no\)',
|
|
'Pairing successful',
|
|
'Failed to pair',
|
|
'not available',
|
|
'not found',
|
|
pexpect.TIMEOUT
|
|
])
|
|
|
|
if index == 0: # PIN entry
|
|
print(f"Device is requesting PIN entry. Entering PIN {pin}...", file=sys.stderr, flush=True)
|
|
child.sendline(pin)
|
|
# Wait for result after entering PIN
|
|
result_index = child.expect(['Pairing successful', 'Failed to pair', 'Authentication Failed'], timeout=15)
|
|
if result_index == 0:
|
|
print("PIN accepted, pairing successful!", file=sys.stderr, flush=True)
|
|
elif result_index == 1:
|
|
print("Pairing failed after PIN entry", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing failed after PIN entry - PIN may be incorrect"
|
|
}), flush=True)
|
|
return False
|
|
else: # Authentication Failed
|
|
print("Authentication failed - PIN was incorrect", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Authentication failed - PIN was incorrect"
|
|
}), flush=True)
|
|
return False
|
|
|
|
elif index == 1: # Passkey entry
|
|
print(f"Device is requesting passkey entry. Entering passkey {pin}...", file=sys.stderr, flush=True)
|
|
child.sendline(pin)
|
|
# Wait for result after entering passkey
|
|
result_index = child.expect(['Pairing successful', 'Failed to pair', 'Authentication Failed'], timeout=15)
|
|
if result_index == 0:
|
|
print("Passkey accepted, pairing successful!", file=sys.stderr, flush=True)
|
|
elif result_index == 1:
|
|
print("Pairing failed after passkey entry", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing failed after passkey entry - passkey may be incorrect"
|
|
}), flush=True)
|
|
return False
|
|
else: # Authentication Failed
|
|
print("Authentication failed - passkey was incorrect", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Authentication failed - passkey was incorrect"
|
|
}), flush=True)
|
|
return False
|
|
|
|
elif index == 2: # Passkey confirmation
|
|
print(f"Device is requesting passkey confirmation. Confirming...", file=sys.stderr, flush=True)
|
|
child.sendline('yes')
|
|
result_index = child.expect(['Pairing successful', 'Failed to pair'], timeout=10)
|
|
if result_index == 0:
|
|
print("Passkey confirmed, pairing successful!", file=sys.stderr, flush=True)
|
|
else:
|
|
print("Pairing failed after passkey confirmation", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing failed after passkey confirmation"
|
|
}), flush=True)
|
|
return False
|
|
|
|
elif index == 3: # Already successful
|
|
print("Pairing was already successful!", file=sys.stderr, flush=True)
|
|
pass
|
|
|
|
elif index in [4, 5, 6]: # Failed, not available, or not found
|
|
print(f"Device pairing failed: {child.after}", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": f"Device not available or not in pairing mode. Make sure {name} is in pairing mode and nearby."
|
|
}), flush=True)
|
|
return False
|
|
|
|
else: # Timeout
|
|
print("Pairing timed out", file=sys.stderr, flush=True)
|
|
child.close()
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing timed out - device may not be in pairing mode"
|
|
}), flush=True)
|
|
return False
|
|
|
|
# Note: We don't set 'trust' to avoid automatic reconnection
|
|
# The device will be paired but won't automatically reconnect
|
|
|
|
child.sendline('quit')
|
|
child.close()
|
|
|
|
print("Pairing successful via bluetoothctl!", file=sys.stderr, flush=True)
|
|
|
|
# Now verify with meshcore (without PIN)
|
|
await asyncio.sleep(2)
|
|
return await verify_paired_connection(address, name)
|
|
|
|
except Exception as e:
|
|
print(f"Pairing failed: {e}", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": str(e)
|
|
}), flush=True)
|
|
return False
|
|
|
|
async def attempt_pairing_meshcore(address, name, pin):
|
|
"""Attempt to pair with the device using meshcore (macOS/Windows)"""
|
|
meshcore = None
|
|
try:
|
|
print(f"Attempting to pair with {name} using PIN {pin} via meshcore...", file=sys.stderr, flush=True)
|
|
|
|
# First, try to remove any existing pairing to start fresh
|
|
try:
|
|
print("Removing any existing pairing...", file=sys.stderr, flush=True)
|
|
result = subprocess.run(
|
|
['bluetoothctl', 'remove', address],
|
|
capture_output=True,
|
|
timeout=5,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
print(f"Existing pairing removed", file=sys.stderr, flush=True)
|
|
await asyncio.sleep(1)
|
|
except FileNotFoundError:
|
|
print("bluetoothctl not found, skipping unpair step", file=sys.stderr, flush=True)
|
|
except Exception as e:
|
|
print(f"Could not remove existing pairing (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
# Connect with PIN
|
|
print(f"Connecting with PIN...", file=sys.stderr, flush=True)
|
|
meshcore = await asyncio.wait_for(
|
|
MeshCore.create_ble(address=address, pin=pin, debug=True),
|
|
timeout=60.0
|
|
)
|
|
|
|
print("BLE connection with PIN established successfully!", file=sys.stderr, flush=True)
|
|
|
|
# If we get here, pairing succeeded
|
|
await meshcore.disconnect()
|
|
|
|
# Wait a moment for disconnection
|
|
await asyncio.sleep(2)
|
|
|
|
# Additional safety: Force disconnect on Linux using bluetoothctl if available
|
|
if is_linux():
|
|
try:
|
|
print("Ensuring device is fully disconnected...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not force disconnect via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
# Try reconnecting without PIN to confirm pairing persisted
|
|
print("Verifying pairing persisted by reconnecting without PIN...", file=sys.stderr, flush=True)
|
|
try:
|
|
meshcore_verify = await asyncio.wait_for(
|
|
MeshCore.create_ble(address=address, debug=False),
|
|
timeout=25.0
|
|
)
|
|
|
|
print("Verification connection successful", file=sys.stderr, flush=True)
|
|
await meshcore_verify.disconnect()
|
|
|
|
# Additional safety: Force disconnect on Linux using bluetoothctl if available
|
|
if is_linux():
|
|
try:
|
|
print("Ensuring verification device is fully disconnected...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not force disconnect verification device via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
print("Pairing verification successful!", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Pairing and connection verification successful"
|
|
}), flush=True)
|
|
|
|
# Final safety: Ensure device is completely disconnected before returning
|
|
if is_linux():
|
|
try:
|
|
print("Final disconnect to ensure device is ready for packet capture...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not perform final disconnect via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
return True
|
|
|
|
except EOFError:
|
|
# Even verification failed with EOF - but initial pairing worked
|
|
print("Verification disconnected immediately, but pairing may have succeeded", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Pairing completed but verification unclear - try using the device"
|
|
}), flush=True)
|
|
|
|
# Final safety: Ensure device is completely disconnected before returning
|
|
if is_linux():
|
|
try:
|
|
print("Final disconnect to ensure device is ready for packet capture...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not perform final disconnect via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
return True
|
|
|
|
except Exception as verify_e:
|
|
print(f"Reconnection test failed: {verify_e}", file=sys.stderr, flush=True)
|
|
print("Pairing may have succeeded but verification failed", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Pairing successful but verification failed - device should work"
|
|
}), flush=True)
|
|
|
|
# Final safety: Ensure device is completely disconnected before returning
|
|
if is_linux():
|
|
try:
|
|
print("Final disconnect to ensure device is ready for packet capture...", file=sys.stderr, flush=True)
|
|
subprocess.run([
|
|
"bluetoothctl", "disconnect", address
|
|
], capture_output=True, timeout=10)
|
|
await asyncio.sleep(1)
|
|
except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e:
|
|
print(f"Could not perform final disconnect via bluetoothctl (this is OK): {e}", file=sys.stderr, flush=True)
|
|
|
|
return True # Still consider success since initial pairing worked
|
|
|
|
except asyncio.TimeoutError:
|
|
print("Pairing connection timed out", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing timed out - device may not be in pairing mode or PIN may be incorrect"
|
|
}), flush=True)
|
|
return False
|
|
|
|
except EOFError as e:
|
|
print("Connection dropped during pairing - authentication likely failed", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing failed - PIN may be incorrect or expired"
|
|
}), flush=True)
|
|
return False
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
print(f"Pairing failed: {error_msg}", file=sys.stderr, flush=True)
|
|
|
|
# Check for specific authentication failure
|
|
if "AuthenticationFailed" in error_msg or "Authentication Failed" in error_msg:
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Authentication failed - PIN may be incorrect, expired, or device not in pairing mode"
|
|
}), flush=True)
|
|
elif "timeout" in error_msg.lower():
|
|
print(json.dumps({
|
|
"status": "pairing_failed",
|
|
"message": "Pairing timed out - device may be busy or not responding to pairing request"
|
|
}), flush=True)
|
|
else:
|
|
print(json.dumps({"status": "pairing_failed", "message": error_msg}), flush=True)
|
|
return False
|
|
|
|
finally:
|
|
if meshcore:
|
|
try:
|
|
await meshcore.disconnect()
|
|
except:
|
|
pass
|
|
|
|
async def verify_paired_connection(address, name):
|
|
"""Verify the device is paired by connecting without PIN"""
|
|
meshcore = None
|
|
try:
|
|
print("Verifying pairing by connecting without PIN...", file=sys.stderr, flush=True)
|
|
meshcore = await asyncio.wait_for(
|
|
MeshCore.create_ble(address=address, debug=False),
|
|
timeout=25.0
|
|
)
|
|
|
|
print("Verification connection successful!", file=sys.stderr, flush=True)
|
|
await meshcore.disconnect()
|
|
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Pairing and verification successful"
|
|
}), flush=True)
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Verification failed: {e}", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "paired",
|
|
"message": "Pairing succeeded but verification unclear - device should work"
|
|
}), flush=True)
|
|
return True # Still consider it success since bluetoothctl pairing worked
|
|
finally:
|
|
if meshcore:
|
|
try:
|
|
await meshcore.disconnect()
|
|
except:
|
|
pass
|
|
|
|
def main():
|
|
"""Main function to handle BLE pairing"""
|
|
try:
|
|
if len(sys.argv) < 3:
|
|
print(json.dumps({"status": "error", "message": "Usage: script.py <address> <name> [pin]"}))
|
|
sys.exit(1)
|
|
|
|
address = sys.argv[1]
|
|
name = sys.argv[2]
|
|
pin = sys.argv[3] if len(sys.argv) > 3 else None
|
|
|
|
# On macOS, just attempt to connect - the system will show pairing dialog if needed
|
|
# On Linux/Windows, check pairing status first, then attempt PIN-based pairing if needed
|
|
if is_macos():
|
|
# macOS: Just attempt connection - system handles pairing dialog
|
|
print("Attempting to connect to device (macOS will show pairing dialog if needed)...", file=sys.stderr, flush=True)
|
|
pairing_check = asyncio.run(check_pairing_and_connect(address, name))
|
|
|
|
if pairing_check:
|
|
# Connection successful - device is paired
|
|
sys.exit(0)
|
|
else:
|
|
# Connection failed - may need pairing via system dialog
|
|
# Exit with 1 to indicate pairing needed
|
|
sys.exit(1)
|
|
else:
|
|
# Linux/Windows: Check pairing status first
|
|
if not pin:
|
|
# Only check pairing status if no PIN is provided
|
|
print("Checking if device is already paired...", file=sys.stderr, flush=True)
|
|
pairing_check = asyncio.run(check_pairing_and_connect(address, name))
|
|
|
|
if pairing_check:
|
|
# Device is already paired - check_pairing_and_connect already printed JSON
|
|
# Just exit with success
|
|
sys.exit(0)
|
|
|
|
# Device is not paired (or PIN provided) - attempt PIN-based pairing
|
|
if pin:
|
|
# Attempt pairing with PIN
|
|
print("Device not paired. Attempting to pair with provided PIN...", file=sys.stderr, flush=True)
|
|
success = asyncio.run(attempt_pairing(address, name, pin))
|
|
if success:
|
|
# attempt_pairing already printed JSON on success
|
|
sys.exit(0)
|
|
else:
|
|
# attempt_pairing already printed JSON on failure
|
|
sys.exit(1)
|
|
else:
|
|
# No PIN provided and device not paired
|
|
# check_pairing_and_connect already printed JSON with status
|
|
# Exit with 0 so installer can prompt for PIN
|
|
sys.exit(0)
|
|
|
|
except KeyboardInterrupt:
|
|
print("Operation interrupted by user", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "error",
|
|
"message": "Operation interrupted by user"
|
|
}), flush=True)
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
import traceback
|
|
error_trace = traceback.format_exc()
|
|
print(f"Unexpected error: {e}", file=sys.stderr, flush=True)
|
|
print(f"Traceback: {error_trace}", file=sys.stderr, flush=True)
|
|
print(json.dumps({
|
|
"status": "error",
|
|
"message": f"Unexpected error: {str(e)}"
|
|
}), flush=True)
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |