meshcore-packet-capture/auth_token.py
agessaman 6d030af555 Add retry logic for device commands in auth_token and packet_capture modules
- Introduced `_retryable_device_sign` in `auth_token.py` to handle transient errors during device signing operations with exponential backoff.
- Implemented `retryable_device_command` in `packet_capture.py` for executing device commands with timeout and retry logic, enhancing robustness against communication issues.
- Updated various device command calls in `packet_capture.py` to utilize the new retry logic, improving error handling and reliability in device interactions.
2025-12-18 08:04:02 -08:00

1083 lines
44 KiB
Python

#!/usr/bin/env python3
"""
MeshCore Auth Token Generator
Generates JWT-style authentication tokens for MQTT authentication
This implementation uses on-device signing by default (via meshcore_py),
with fallback options to Python signing or meshcore-decoder CLI.
Environment variables:
AUTH_TOKEN_METHOD: Signing method preference
- "device" (default): Use on-device signing via meshcore instance
- "python": Use pure Python implementation with PyNaCl
- "meshcore-decoder": Use meshcore-decoder CLI tool
"""
import json
import base64
import hashlib
import time
import sys
import os
import asyncio
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
try:
import nacl.bindings
import nacl.signing
import nacl.exceptions
except ImportError:
raise ImportError(
"PyNaCl is required for JWT token generation. "
"Please install it with: pip install pynacl"
)
class AuthTokenPayload:
"""JWT-style token payload for MeshCore authentication"""
def __init__(self,
public_key: str,
iat: Optional[int] = None,
exp: Optional[int] = None,
aud: Optional[str] = None,
**kwargs):
self.public_key = public_key.upper()
self.iat = iat if iat is not None else int(time.time())
self.exp = exp
self.aud = aud
self.custom_claims = kwargs
def to_dict(self) -> Dict[str, Any]:
payload = {
'publicKey': self.public_key,
'iat': self.iat
}
if self.exp is not None:
payload['exp'] = self.exp
if self.aud is not None:
payload['aud'] = self.aud
payload.update(self.custom_claims)
return payload
def base64url_encode(data: bytes) -> str:
"""Base64url encode (URL-safe base64 without padding)"""
b64 = base64.b64encode(data).decode('ascii')
return b64.replace('+', '-').replace('/', '_').replace('=', '')
def base64url_decode(data: str) -> bytes:
"""Base64url decode"""
b64 = data.replace('-', '+').replace('_', '/')
padding = 4 - (len(b64) % 4)
if padding != 4:
b64 += '=' * padding
return base64.b64decode(b64)
def hex_to_bytes(hex_str: str) -> bytes:
"""Convert hex string to bytes"""
return bytes.fromhex(hex_str.replace('0x', '').replace(' ', ''))
def bytes_to_hex(data: bytes) -> str:
"""Convert bytes to hex string (lowercase)"""
return data.hex()
def int_to_bytes_le(value: int, length: int) -> bytes:
"""Convert integer to little-endian bytes"""
return value.to_bytes(length, byteorder='little')
def bytes_to_int_le(data: bytes) -> int:
"""Convert little-endian bytes to integer"""
return int.from_bytes(data, byteorder='little')
# Ed25519 group order
L = 2**252 + 27742317777372353535851937790883648493
def ed25519_sign_with_expanded_key(message: bytes, scalar: bytes, prefix: bytes, public_key: bytes) -> bytes:
"""
Sign a message using Ed25519 with pre-expanded key (orlp format)
This implements RFC 8032 Ed25519 signing with an already-expanded key.
This matches exactly how orlp/ed25519's ed25519_sign() works.
Args:
message: Message to sign
scalar: First 32 bytes of orlp private key (clamped scalar)
prefix: Last 32 bytes of orlp private key (prefix for nonce)
public_key: 32-byte public key
Returns:
64-byte signature (R || s)
"""
# Step 1: Compute nonce r = H(prefix || message) mod L
h_r = hashlib.sha512(prefix + message).digest()
r = bytes_to_int_le(h_r) % L
# Step 2: Compute R = r * B (base point multiplication)
r_bytes = int_to_bytes_le(r, 32)
R = nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(r_bytes)
# Step 3: Compute challenge k = H(R || public_key || message) mod L
h_k = hashlib.sha512(R + public_key + message).digest()
k = bytes_to_int_le(h_k) % L
# Step 4: Compute s = (r + k * scalar) mod L
scalar_int = bytes_to_int_le(scalar)
s = (r + k * scalar_int) % L
s_bytes = int_to_bytes_le(s, 32)
# Step 5: Signature is R || s
return R + s_bytes
def _create_auth_token_internal(
payload: AuthTokenPayload,
private_key_hex: str,
public_key_hex: str
) -> str:
"""
Internal function to create a signed authentication token
This signs DIRECTLY with the 64-byte orlp private key.
NO SEED REQUIRED! This matches exactly how meshcore-decoder works.
Args:
payload: Token payload containing claims
private_key_hex: 64-byte private key in hex (orlp format: scalar || prefix)
public_key_hex: 32-byte public key in hex
Returns:
JWT-style token string in format: header.payload.signature
"""
# Create header
header = {
'alg': 'Ed25519',
'typ': 'JWT'
}
# Ensure publicKey is in the payload (normalize to uppercase)
payload.public_key = public_key_hex.upper()
# Encode header and payload as JSON (compact format)
header_json = json.dumps(header, separators=(',', ':'))
payload_json = json.dumps(payload.to_dict(), separators=(',', ':'))
# Encode to UTF-8 bytes
header_bytes = header_json.encode('utf-8')
payload_bytes = payload_json.encode('utf-8')
# Base64url encode
header_encoded = base64url_encode(header_bytes)
payload_encoded = base64url_encode(payload_bytes)
# Create signing input
signing_input = f"{header_encoded}.{payload_encoded}"
signing_input_bytes = signing_input.encode('utf-8')
# Parse keys
private_bytes = hex_to_bytes(private_key_hex)
public_bytes = hex_to_bytes(public_key_hex)
if len(private_bytes) != 64:
raise ValueError(f"Private key must be 64 bytes, got {len(private_bytes)}")
if len(public_bytes) != 32:
raise ValueError(f"Public key must be 32 bytes, got {len(public_bytes)}")
# Extract scalar and prefix from orlp private key
scalar = private_bytes[:32]
prefix = private_bytes[32:64]
# Sign using Ed25519 with expanded key (no seed required!)
signature_bytes = ed25519_sign_with_expanded_key(
signing_input_bytes,
scalar,
prefix,
public_bytes
)
# Convert signature to hex (lowercase)
signature_hex = bytes_to_hex(signature_bytes)
# Return JWT format with hex signature
return f"{header_encoded}.{payload_encoded}.{signature_hex}"
async def _retryable_device_sign(
sign_func,
command_name: str = "sign",
timeout: float = 20.0,
max_retries: int = 3,
retry_delay: float = 0.3,
backoff_multiplier: float = 1.5
):
"""
Execute a device signing command with timeout and retry logic.
Args:
sign_func: Async function that returns a meshcore Event
command_name: Name of the command for logging
timeout: Timeout in seconds for each attempt
max_retries: Maximum number of retry attempts (including initial attempt)
retry_delay: Initial delay between retries in seconds
backoff_multiplier: Multiplier for exponential backoff
Returns:
Event object from the command
Raises:
Exception: If all retries fail
"""
from meshcore import EventType
last_error = None
current_delay = retry_delay
for attempt in range(max_retries):
try:
# Add small delay between retries (except first attempt)
if attempt > 0:
await asyncio.sleep(current_delay)
current_delay *= backoff_multiplier # Exponential backoff
# Execute command with timeout
result = await asyncio.wait_for(
sign_func(),
timeout=timeout
)
# Check if result is an error
if result and hasattr(result, 'type'):
if result.type == EventType.ERROR:
error_payload = result.payload if hasattr(result, 'payload') else {}
error_reason = error_payload.get('reason', 'unknown')
# Check if it's a transient error that we should retry
if error_reason == 'no_event_received' and attempt < max_retries - 1:
last_error = f"{command_name} failed: {error_reason}"
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
# Permanent error or last attempt
raise Exception(f"{command_name} failed: {error_payload}")
else:
# Success - return the result
if attempt > 0:
logger.debug(f"{command_name} succeeded on attempt {attempt + 1}")
return result
else:
# Unexpected result format
raise Exception(f"{command_name} returned unexpected result format")
except asyncio.TimeoutError:
last_error = f"{command_name} timed out after {timeout}s"
if attempt < max_retries - 1:
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
else:
raise Exception(f"{last_error} (all {max_retries} attempts exhausted)")
except Exception as e:
# Re-raise if it's not a retryable error
if "failed:" in str(e) and attempt < max_retries - 1:
last_error = str(e)
logger.debug(f"{last_error} (attempt {attempt + 1}/{max_retries})")
continue
raise
# All retries failed
raise Exception(f"{command_name} failed after {max_retries} attempts: {last_error}")
async def _create_auth_token_with_device(
payload: AuthTokenPayload,
public_key_hex: str,
meshcore_instance,
chunk_size: int = None
) -> str:
"""
Create JWT token using on-device signing via meshcore_py
Args:
payload: Token payload containing claims
public_key_hex: 32-byte public key in hex
meshcore_instance: Connected MeshCore instance
Returns:
JWT-style token string
"""
# Import meshcore here to avoid circular imports
try:
from meshcore import EventType
except ImportError:
raise ImportError("meshcore package required for on-device signing")
if not meshcore_instance:
raise Exception("MeshCore instance not provided")
if not meshcore_instance.is_connected:
raise Exception("MeshCore instance not connected")
if not hasattr(meshcore_instance, 'commands'):
raise Exception("MeshCore instance does not support commands")
# Ensure device is ready - wait a brief moment if needed
# (Some operations like private key export might need the device to be ready)
# Also ensure connection is still active
if not meshcore_instance.is_connected:
raise Exception("MeshCore instance disconnected before signing")
await asyncio.sleep(0.2)
# IMPORTANT: The device signs with self_id.sign(), which uses the device's LocalIdentity
# The LocalIdentity has its own pub_key and prv_key that may differ from the exported key.
# We MUST use the device's actual signing public key (from self_id) in the JWT payload
# so the signature will verify correctly.
#
# According to the firmware: self_id.sign() calls ed25519_sign() with:
# - pub_key from LocalIdentity (self_id)
# - prv_key from LocalIdentity (self_id)
# This is standard Ed25519 (RFC 8032), so the signature will verify with the matching public key.
device_signing_public_key = public_key_hex # Default to provided key
if hasattr(meshcore_instance, 'self_info') and meshcore_instance.self_info:
device_info_public_key = meshcore_instance.self_info.get('public_key', '')
if device_info_public_key:
# Normalize to hex string if it's bytes
if isinstance(device_info_public_key, bytes):
device_info_public_key = device_info_public_key.hex()
device_signing_public_key = device_info_public_key.upper()
if device_signing_public_key != public_key_hex.upper():
logger.debug("⚠️ Device's self_id public key differs from exported key")
logger.debug(" Using device's self_id public key for JWT payload (required for verification)")
logger.debug(f" Device signing key (self_id): {device_signing_public_key[:32]}...")
logger.debug(f" Exported key: {public_key_hex[:32]}...")
else:
logger.debug("✓ Device's self_id public key matches exported key")
else:
logger.debug("⚠️ Could not get device's self_info, using provided public key")
logger.debug(" If signature doesn't verify, device may be using different key")
# Update payload with the device's actual signing public key (from self_id)
# This is critical: the signature was created with self_id's private key,
# so it will only verify with self_id's public key
payload.public_key = device_signing_public_key
# Create header and payload
header = {
'alg': 'Ed25519',
'typ': 'JWT'
}
# Encode header and payload as JSON (compact format)
header_json = json.dumps(header, separators=(',', ':'))
payload_json = json.dumps(payload.to_dict(), separators=(',', ':'))
# Base64url encode
header_encoded = base64url_encode(header_json.encode('utf-8'))
payload_encoded = base64url_encode(payload_json.encode('utf-8'))
# Create signing input
signing_input = f"{header_encoded}.{payload_encoded}"
signing_input_bytes = signing_input.encode('utf-8')
# Debug: Output the signing input for troubleshooting
logger.debug("Signing input (message to be signed):")
logger.debug(f" Full string: {signing_input}")
logger.debug(f" Length: {len(signing_input)} characters ({len(signing_input_bytes)} bytes)")
logger.debug(f" Hex representation: {signing_input_bytes.hex()}")
logger.debug(f" Header part: {header_encoded}")
logger.debug(f" Payload part: {payload_encoded}")
# Use meshcore_py signing API
# The sign() method handles chunking internally via sign_start/sign_data/sign_finish
# Based on meshcore_py implementation and example: https://github.com/agessaman/meshcore_py/blob/dev/examples/ble_sign_example.py
if not hasattr(meshcore_instance.commands, 'sign'):
raise Exception("Device signing method 'sign()' not available")
# Debug: Log what we're about to sign
logger.debug("About to sign with device:")
logger.debug(f" Data length: {len(signing_input_bytes)} bytes")
logger.debug(f" Data (first 100 bytes hex): {signing_input_bytes[:100].hex()}")
logger.debug(f" Data (last 100 bytes hex): {signing_input_bytes[-100:].hex() if len(signing_input_bytes) > 100 else signing_input_bytes.hex()}")
# For debugging: manually implement the signing flow to see what's happening
# This will help us verify if sign() is working correctly
if os.getenv('DEBUG_DEVICE_SIGNING', '').lower() == 'true':
logger.debug("Using manual signing flow for debugging")
try:
from meshcore import EventType
# Verify device's public key matches what we expect
# The device signs with self_id, which might be different from exported key
try:
device_id_result = await meshcore_instance.commands.get_id()
if device_id_result and device_id_result.type != EventType.ERROR:
device_id = device_id_result.payload.get("id", {})
device_public_key = device_id.get("public_key", "")
if device_public_key:
device_public_key_hex = device_public_key.hex() if isinstance(device_public_key, bytes) else device_public_key
expected_public_key_hex = public_key_hex.upper()
logger.debug(f"Device public key from get_id: {device_public_key_hex[:32]}...")
logger.debug(f"Expected public key: {expected_public_key_hex[:32]}...")
if device_public_key_hex.upper() != expected_public_key_hex:
logger.debug("⚠️ WARNING: Device public key does NOT match exported key!")
logger.debug(" Device will sign with different key than expected")
else:
logger.debug("✓ Device public key matches exported key")
except Exception as e:
logger.debug(f"Could not verify device public key: {e}")
# Manual sign_start
start_evt = await meshcore_instance.commands.sign_start()
if start_evt.type == EventType.ERROR:
raise Exception(f"sign_start failed: {start_evt.payload}")
max_len = start_evt.payload.get("max_length", 0)
logger.debug(f"sign_start: max_length={max_len}")
# Manual sign_data chunks - use provided chunk_size or library's default
# Get default chunk_size from sign() method if available, otherwise use 120
default_chunk_size = chunk_size if chunk_size is not None else 120
if default_chunk_size is None or default_chunk_size <= 0:
# Try to get default from function signature
if hasattr(meshcore_instance.commands.sign, '__defaults__'):
import inspect
sig = inspect.signature(meshcore_instance.commands.sign)
if 'chunk_size' in sig.parameters:
default_chunk_size = sig.parameters['chunk_size'].default if sig.parameters['chunk_size'].default != inspect.Parameter.empty else 120
if default_chunk_size is None or default_chunk_size <= 0:
default_chunk_size = 120
total_sent = 0
for idx in range(0, len(signing_input_bytes), default_chunk_size):
chunk = signing_input_bytes[idx:idx + default_chunk_size]
logger.debug(f"Sending chunk {idx//default_chunk_size + 1}: {len(chunk)} bytes (offset {idx})")
logger.debug(f" Chunk hex (first 32): {chunk[:32].hex()}")
logger.debug(f" Chunk hex (last 32): {chunk[-32:].hex() if len(chunk) >= 32 else chunk.hex()}")
data_evt = await meshcore_instance.commands.sign_data(chunk)
if data_evt.type == EventType.ERROR:
raise Exception(f"sign_data failed: {data_evt.payload}")
total_sent += len(chunk)
# Small delay between chunks to ensure device processes them
await asyncio.sleep(0.01)
logger.debug(f"Total bytes sent: {total_sent} (expected: {len(signing_input_bytes)})")
# Manual sign_finish with retry logic
logger.debug("Calling sign_finish...")
sig_evt = await _retryable_device_sign(
lambda: meshcore_instance.commands.sign_finish(timeout=None, data_size=len(signing_input_bytes)),
"sign_finish",
timeout=20.0,
max_retries=3,
retry_delay=0.3
)
except Exception as e:
logger.debug(f"Manual signing flow failed: {e}")
raise
else:
# Use the high-level sign() method with library defaults and retry logic
# Call sign() - it internally:
# 1. Calls sign_start() to initialize
# 2. Calls sign_data() for each chunk (using library's default chunk_size)
# 3. Calls sign_finish() with calculated timeout based on data_size
# The timeout parameter lets sign_finish() calculate timeout based on data_size
# For JWT tokens (~372 bytes), it will use at least 15 seconds
# Wrap in retry logic to handle transient BLE communication issues
async def sign_with_retry():
# Try with timeout first (newer dev branch), fall back if not supported
try:
return await meshcore_instance.commands.sign(
signing_input_bytes,
timeout=None # Let sign_finish() calculate timeout based on data_size (15s minimum)
)
except TypeError:
# Older version doesn't support timeout parameter
return await meshcore_instance.commands.sign(
signing_input_bytes
)
# Use retry helper with longer timeout for signing operations
sig_evt = await _retryable_device_sign(
sign_with_retry,
"sign",
timeout=20.0, # Longer timeout for signing (device needs time to process)
max_retries=3,
retry_delay=0.3
)
# Check for error first (as shown in example)
if sig_evt.type == EventType.ERROR:
raise Exception(f"Signing failed: {sig_evt.payload}")
# Get signature (as shown in example)
signature_bytes = sig_evt.payload.get("signature", b"")
if not signature_bytes:
raise Exception("No signature in response")
# Debug: Check signature format
logger.debug("Signature from device:")
logger.debug(f" Type: {type(signature_bytes)}")
if isinstance(signature_bytes, bytes):
logger.debug(f" Length: {len(signature_bytes)} bytes")
logger.debug(f" Hex (first 32): {signature_bytes[:32].hex()}")
logger.debug(f" Hex (last 32): {signature_bytes[-32:].hex()}")
else:
logger.debug(f" Value: {signature_bytes}")
# Convert signature to hex
if isinstance(signature_bytes, bytes):
signature_hex = signature_bytes.hex()
else:
signature_hex = signature_bytes
# Return JWT format with hex signature
return f"{header_encoded}.{payload_encoded}.{signature_hex}"
def _create_auth_token_with_meshcore_decoder(
payload: AuthTokenPayload,
public_key_hex: str,
private_key_hex: str
) -> str:
"""
Create JWT token using meshcore-decoder CLI tool
Args:
payload: Token payload containing claims
public_key_hex: 32-byte public key in hex
private_key_hex: 64-byte private key in hex
Returns:
JWT-style token string
"""
import subprocess
import shutil
import tempfile
import json
# Try to use meshcore-decoder CLI directly first (it's a wrapper around npx)
decoder_cmd = shutil.which('meshcore-decoder')
if decoder_cmd:
# Use the CLI command directly - it handles npx internally
payload_dict = payload.to_dict()
# Extract claims (excluding iat/exp which are handled by the CLI)
claims = {k: v for k, v in payload_dict.items() if k not in ('iat', 'exp', 'publicKey')}
# Build the claims JSON
claims_json = json.dumps(claims) if claims else '{}'
# Calculate expiry (CLI uses seconds from now)
current_time = int(time.time())
expiry_seconds = payload_dict.get('exp', current_time + 86400) - current_time
# Use meshcore-decoder auth-token command
# Format: meshcore-decoder auth-token <public-key> <private-key> --claims <json> --exp <seconds>
try:
result = subprocess.run(
[decoder_cmd, 'auth-token', public_key_hex, private_key_hex,
'--claims', claims_json, '--exp', str(expiry_seconds)],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
token = result.stdout.strip()
if token and token.count('.') == 2:
return token
except Exception as e:
# CLI not available or failed, fall back to script method
pass
# Fallback: Use Node.js script with require (for compatibility)
# Create Node.js script to generate token
payload_dict = payload.to_dict()
node_script = f"""
const {{ Utils }} = require('@michaelhart/meshcore-decoder');
async function createToken() {{
const payload = {json.dumps(payload_dict)};
const privateKey = '{private_key_hex}';
const publicKey = '{public_key_hex}';
try {{
const token = await Utils.createAuthToken(payload, privateKey, publicKey);
console.log(token);
}} catch (error) {{
console.error(JSON.stringify({{ error: error.message }}));
process.exit(1);
}}
}}
createToken();
"""
# Write script to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as f:
f.write(node_script)
script_path = f.name
try:
# First try running with node directly (module might be installed globally)
result = subprocess.run(
['node', script_path],
capture_output=True,
text=True,
timeout=10
)
# If that fails with module not found, use npx to run it
# (meshcore-decoder is typically a wrapper around npx)
if result.returncode != 0 and ('Cannot find module' in result.stderr or 'MODULE_NOT_FOUND' in result.stderr):
npx_cmd = shutil.which('npx')
if npx_cmd:
# Use npx to run the script file directly
# npx will automatically download and make the module available
# when running node with a script file
result = subprocess.run(
['npx', '-y', '--', 'node', script_path],
capture_output=True,
text=True,
timeout=30, # Longer timeout for npx to download if needed
env={**os.environ, 'NODE_PATH': ''} # Clear NODE_PATH to let npx handle module resolution
)
# If that still fails, try using npx with the package explicitly
if result.returncode != 0 and ('Cannot find module' in result.stderr or 'MODULE_NOT_FOUND' in result.stderr):
# Create a wrapper script that npx can use
wrapper_script = f"""
const {{ Utils }} = require('@michaelhart/meshcore-decoder');
const payload = {json.dumps(payload_dict)};
const privateKey = '{private_key_hex}';
const publicKey = '{public_key_hex}';
Utils.createAuthToken(payload, privateKey, publicKey).then(token => {{
console.log(token);
}}).catch(error => {{
console.error(JSON.stringify({{ error: error.message }}));
process.exit(1);
}});
"""
# Write wrapper to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False) as wrapper_file:
wrapper_file.write(wrapper_script)
wrapper_path = wrapper_file.name
try:
# Use npx to run node with the package available
# The trick is to use npx's package resolution by running from a temp directory
import tempfile as tf
with tf.TemporaryDirectory() as tmpdir:
# Copy script to temp dir
import shutil as sh
tmp_script = os.path.join(tmpdir, 'script.js')
sh.copy2(script_path, tmp_script)
# Run npx from temp dir - it will create node_modules there
result = subprocess.run(
['npx', '-y', '--package=@michaelhart/meshcore-decoder', '--', 'node', tmp_script],
capture_output=True,
text=True,
timeout=30,
cwd=tmpdir
)
finally:
# Clean up wrapper if it was created
if 'wrapper_path' in locals():
try:
os.unlink(wrapper_path)
except:
pass
else:
# No npx available, raise the original error
error_msg = result.stderr.strip() or result.stdout.strip()
raise ModuleNotFoundError(
f"meshcore-decoder module not installed and npx is not available. "
f"Install with: npm install -g @michaelhart/meshcore-decoder\n"
f"Error: {error_msg}"
)
if result.returncode != 0:
error_msg = result.stderr.strip() or result.stdout.strip()
# Check if it's a module not found error
if 'Cannot find module' in error_msg or 'MODULE_NOT_FOUND' in error_msg:
raise ModuleNotFoundError(
f"meshcore-decoder module not installed. "
f"Install with: npm install -g @michaelhart/meshcore-decoder\n"
f"Error: {error_msg}"
)
raise Exception(f"meshcore-decoder error: {error_msg}")
token = result.stdout.strip()
if not token or token.count('.') != 2:
raise Exception(f"Invalid token format from meshcore-decoder: {token[:80]}...")
return token
finally:
os.unlink(script_path)
def create_auth_token(
public_key_hex: str,
private_key_hex: str = None,
expiry_seconds: int = 86400,
meshcore_instance = None,
**claims
) -> str:
"""
Create a JWT-style auth token for MeshCore MQTT authentication
This function supports multiple signing methods:
1. On-device signing (default if meshcore_instance provided)
2. Python implementation with PyNaCl
3. meshcore-decoder CLI tool
The method is determined by:
- If meshcore_instance is provided and AUTH_TOKEN_METHOD != "python" or "meshcore-decoder": use device
- If AUTH_TOKEN_METHOD == "meshcore-decoder": use meshcore-decoder CLI
- Otherwise: use Python implementation
Args:
public_key_hex: 32-byte public key in hex format
private_key_hex: 64-byte private key in hex format (MeshCore format)
Required unless using on-device signing
expiry_seconds: Token expiry time in seconds (default 24 hours)
meshcore_instance: Optional connected MeshCore instance for on-device signing
**claims: Additional JWT claims (e.g., audience="mqtt.example.com", sub="device-123")
Returns:
JWT-style token string
Raises:
ValueError: If keys are invalid format or length
Exception: If token generation fails
"""
try:
# Calculate expiration time
current_time = int(time.time())
exp_time = current_time + expiry_seconds
# Extract 'aud' and 'exp' from claims if present (these are handled separately)
aud = claims.pop('aud', None)
# Remove 'exp' from claims if present - we use expiry_seconds parameter instead
claims.pop('exp', None)
# Create payload with expiration and claims
payload = AuthTokenPayload(
public_key=public_key_hex,
iat=current_time,
exp=exp_time,
aud=aud,
**claims
)
# Determine signing method from environment variable
auth_method = os.getenv('AUTH_TOKEN_METHOD', '').lower().strip()
# Use on-device signing if:
# 1. meshcore_instance is provided AND
# 2. AUTH_TOKEN_METHOD is not explicitly set to "python" or "meshcore-decoder"
use_device = (
meshcore_instance is not None and
auth_method not in ('python', 'meshcore-decoder')
)
# Use meshcore-decoder if explicitly requested
use_decoder = auth_method == 'meshcore-decoder'
if use_device:
# On-device signing (async)
# Check if we're in an async context
try:
loop = asyncio.get_running_loop()
# We're in an async context, but this function is sync
# Raise an error suggesting to use async version
raise RuntimeError(
"Cannot use on-device signing in sync context. "
"Use create_auth_token_async() instead, or set AUTH_TOKEN_METHOD=python"
)
except RuntimeError:
# No running loop, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
token = loop.run_until_complete(
_create_auth_token_with_device(payload, public_key_hex, meshcore_instance)
)
finally:
loop.close()
elif use_decoder:
# meshcore-decoder CLI
if not private_key_hex:
raise ValueError("private_key_hex required for meshcore-decoder method")
token = _create_auth_token_with_meshcore_decoder(payload, public_key_hex, private_key_hex)
else:
# Python implementation (default fallback)
if not private_key_hex:
raise ValueError("private_key_hex required for Python signing method")
token = _create_auth_token_internal(payload, private_key_hex, public_key_hex)
# Validate token format
if not token or token.count('.') != 2:
raise Exception(f"Invalid token format generated: {token[:80]}...")
return token
except ValueError as e:
raise ValueError(f"Invalid key format: {str(e)}")
except Exception as e:
raise Exception(f"Failed to generate auth token: {str(e)}")
async def create_auth_token_async(
public_key_hex: str,
private_key_hex: str = None,
expiry_seconds: int = 86400,
meshcore_instance = None,
chunk_size: int = None,
**claims
) -> str:
"""
Async version of create_auth_token for use in async contexts
This version supports on-device signing without requiring event loop management.
Use this when calling from async functions.
Args:
public_key_hex: 32-byte public key in hex format
private_key_hex: 64-byte private key in hex format (MeshCore format)
Required unless using on-device signing
expiry_seconds: Token expiry time in seconds (default 24 hours)
meshcore_instance: Optional connected MeshCore instance for on-device signing
**claims: Additional JWT claims (e.g., audience="mqtt.example.com", sub="device-123")
Returns:
JWT-style token string
Raises:
ValueError: If keys are invalid format or length
Exception: If token generation fails
"""
# Calculate expiration time
current_time = int(time.time())
exp_time = current_time + expiry_seconds
# Extract 'aud' and 'exp' from claims if present
aud = claims.pop('aud', None)
claims.pop('exp', None)
# Create payload with expiration and claims
payload = AuthTokenPayload(
public_key=public_key_hex,
iat=current_time,
exp=exp_time,
aud=aud,
**claims
)
# Determine signing method from environment variable
auth_method = os.getenv('AUTH_TOKEN_METHOD', '').lower().strip()
# Use on-device signing if:
# 1. meshcore_instance is provided AND
# 2. AUTH_TOKEN_METHOD is not explicitly set to "python" or "meshcore-decoder"
use_device = (
meshcore_instance is not None and
auth_method not in ('python', 'meshcore-decoder')
)
# Use meshcore-decoder if explicitly requested
use_decoder = auth_method == 'meshcore-decoder'
if use_device:
# On-device signing (async)
# Try device signing, but fall back to Python if it fails
try:
token = await _create_auth_token_with_device(payload, public_key_hex, meshcore_instance, chunk_size=chunk_size)
except Exception as device_error:
# Device signing failed, try to fetch private key for fallback
fallback_private_key = private_key_hex
# If no private key was provided, try to fetch it from device or config
if not fallback_private_key:
logger.debug("Device signing failed, attempting to fetch private key for fallback...")
fallback_private_key = await _fetch_private_key_for_fallback(
meshcore_instance=meshcore_instance,
public_key_hex=public_key_hex
)
# If we still don't have a private key, raise an error
if not fallback_private_key:
raise Exception(
f"Device signing failed ({str(device_error)}) and no private key available for fallback. "
"Please provide private_key_hex, ensure device supports private key export, "
"or set PACKETCAPTURE_PRIVATE_KEY environment variable."
)
# Log the fallback
logger.warning(
f"Device signing failed ({str(device_error)}), falling back to Python signing "
f"with {'provided' if private_key_hex else 'fetched'} private key"
)
# Fall back to Python implementation
token = _create_auth_token_internal(payload, fallback_private_key, public_key_hex)
elif use_decoder:
# meshcore-decoder CLI (sync, but we're in async context)
if not private_key_hex:
raise ValueError("private_key_hex required for meshcore-decoder method")
token = _create_auth_token_with_meshcore_decoder(payload, public_key_hex, private_key_hex)
else:
# Python implementation (sync, but we're in async context)
if not private_key_hex:
raise ValueError("private_key_hex required for Python signing method")
token = _create_auth_token_internal(payload, private_key_hex, public_key_hex)
# Validate token format
if not token or token.count('.') != 2:
raise Exception(f"Invalid token format generated: {token[:80]}...")
return token
def read_private_key_file(filepath: str) -> str:
"""Read private key from file (64-byte hex format)"""
try:
with open(filepath, 'r') as f:
key = f.read().strip()
key = ''.join(key.split())
if len(key) != 128: # 64 bytes = 128 hex chars
raise ValueError(f"Invalid private key length: {len(key)} (expected 128)")
int(key, 16)
return key
except FileNotFoundError:
raise Exception(f"Private key file not found: {filepath}")
except ValueError as e:
raise Exception(f"Invalid private key format: {str(e)}")
async def _fetch_private_key_for_fallback(
meshcore_instance = None,
public_key_hex: str = None
) -> Optional[str]:
"""
Attempt to fetch private key from device or config for fallback signing
Tries in order:
1. Export from device (if meshcore_instance is available and connected)
2. Environment variable (PACKETCAPTURE_PRIVATE_KEY or PRIVATE_KEY)
3. File (PACKETCAPTURE_PRIVATE_KEY_FILE or PRIVATE_KEY_FILE)
Args:
meshcore_instance: Optional connected MeshCore instance
public_key_hex: Optional public key to verify the fetched private key matches
Returns:
Private key as hex string (64 bytes = 128 hex chars), or None if not found
"""
# Try 1: Export from device
if meshcore_instance and meshcore_instance.is_connected:
try:
from meshcore import EventType
if hasattr(meshcore_instance, 'commands') and hasattr(meshcore_instance.commands, 'export_private_key'):
logger.debug("Attempting to export private key from device for fallback...")
result = await meshcore_instance.commands.export_private_key()
if result.type == EventType.PRIVATE_KEY:
device_private_key = result.payload.get("private_key")
if device_private_key:
# Convert to hex string if it's bytes
if isinstance(device_private_key, bytes):
device_private_key = device_private_key.hex()
elif isinstance(device_private_key, bytearray):
device_private_key = bytes(device_private_key).hex()
# Validate length
if len(device_private_key) == 128: # 64 bytes = 128 hex chars
logger.debug("✓ Successfully exported private key from device for fallback")
return device_private_key
else:
logger.debug(f"Exported private key has wrong length: {len(device_private_key)} (expected 128)")
else:
logger.debug("Device returned empty private key")
elif result.type == EventType.DISABLED:
logger.debug("Private key export is disabled on device")
elif result.type == EventType.ERROR:
logger.debug(f"Device returned error when exporting private key: {result.payload}")
else:
logger.debug(f"Unexpected response type when exporting private key: {result.type}")
except Exception as e:
logger.debug(f"Failed to export private key from device: {e}")
# Try 2: Environment variable
env_keys = ['PACKETCAPTURE_PRIVATE_KEY', 'PRIVATE_KEY']
for env_key in env_keys:
env_private_key = os.getenv(env_key, '').strip()
if env_private_key:
# Remove whitespace
env_private_key = ''.join(env_private_key.split())
if len(env_private_key) == 128:
try:
# Validate it's valid hex
int(env_private_key, 16)
logger.debug(f"✓ Found private key in environment variable: {env_key}")
return env_private_key
except ValueError:
logger.debug(f"Private key in {env_key} is not valid hex")
else:
logger.debug(f"Private key in {env_key} has wrong length: {len(env_private_key)} (expected 128)")
# Try 3: File
file_keys = ['PACKETCAPTURE_PRIVATE_KEY_FILE', 'PRIVATE_KEY_FILE']
for file_key in file_keys:
private_key_file = os.getenv(file_key, '').strip()
if private_key_file:
try:
from pathlib import Path
if Path(private_key_file).exists():
private_key = read_private_key_file(private_key_file)
logger.debug(f"✓ Successfully read private key from file: {private_key_file}")
return private_key
else:
logger.debug(f"Private key file not found: {private_key_file}")
except Exception as e:
logger.debug(f"Failed to read private key from file {private_key_file}: {e}")
logger.debug("No private key found in device, environment, or file")
return None
if __name__ == "__main__":
# Test/CLI usage
if len(sys.argv) < 3:
print("Usage: python auth_token.py <public_key_hex> <private_key_hex_or_file>")
sys.exit(1)
public_key = sys.argv[1]
private_key_input = sys.argv[2]
if len(private_key_input) < 128:
try:
private_key = read_private_key_file(private_key_input)
print(f"Loaded private key from: {private_key_input}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
else:
private_key = private_key_input
try:
token = create_auth_token(public_key, private_key)
print(f"Generated token: {token}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)