From dbc07532371fddebfa8bddd68d56dfd94663c751 Mon Sep 17 00:00:00 2001 From: agessaman Date: Tue, 14 Oct 2025 21:26:55 -0700 Subject: [PATCH] Add JWT token management and renewal process to PacketCapture - Implement JWT token storage and renewal logic in packet_capture.py. - Introduce a background task for periodic JWT renewal. - Update install.sh to check existing MQTT broker configurations before setup. - Enhance MQTT broker reconnection logic with renewed tokens. - Add new environment variables for JWT renewal interval and threshold. --- install.sh | 14 ++- packet_capture.py | 220 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 229 insertions(+), 5 deletions(-) diff --git a/install.sh b/install.sh index 39de55f..7d5b0da 100755 --- a/install.sh +++ b/install.sh @@ -1381,8 +1381,13 @@ main() { configure_mqtt_brokers else print_info "Keeping existing configuration" - # Still need to configure MQTT brokers if not already configured - configure_mqtt_brokers_only + # Check if MQTT brokers are already configured + if grep -q "^PACKETCAPTURE_MQTT[1-4]_ENABLED=true" "$INSTALL_DIR/.env.local" 2>/dev/null; then + print_info "MQTT brokers already configured - skipping MQTT configuration" + else + # Still need to configure MQTT brokers if not already configured + configure_mqtt_brokers_only + fi fi elif [ ! -f "$INSTALL_DIR/.env.local" ]; then configure_mqtt_brokers @@ -1522,6 +1527,7 @@ User=$current_user WorkingDirectory=$INSTALL_DIR Environment="PATH=$service_path" ExecStart=$INSTALL_DIR/venv/bin/python3 $INSTALL_DIR/packet_capture.py +ExecStop=/bin/bash -c 'if [ -f $INSTALL_DIR/.env.local ] && grep -q "PACKETCAPTURE_CONNECTION_TYPE=ble" $INSTALL_DIR/.env.local; then BLE_DEVICE=\$(grep "PACKETCAPTURE_BLE_DEVICE=" $INSTALL_DIR/.env.local | cut -d= -f2); if [ -n "\$BLE_DEVICE" ] && command -v bluetoothctl >/dev/null 2>&1; then echo "Disconnecting BLE device \$BLE_DEVICE..."; bluetoothctl disconnect "\$BLE_DEVICE" 2>/dev/null || true; sleep 2; fi; fi' KillMode=process Restart=on-failure RestartSec=10 @@ -1759,6 +1765,10 @@ services: # RF data settings - PACKETCAPTURE_RF_DATA_TIMEOUT=15.0 + + # JWT token renewal settings + - PACKETCAPTURE_JWT_RENEWAL_INTERVAL=3600 + - PACKETCAPTURE_JWT_RENEWAL_THRESHOLD=300 networks: - meshcore-network restart: unless-stopped diff --git a/packet_capture.py b/packet_capture.py index ab0d67b..a12edeb 100644 --- a/packet_capture.py +++ b/packet_capture.py @@ -152,11 +152,19 @@ class PacketCapture: # Private key export capability self.private_key_export_available = False + # JWT token management + self.jwt_tokens = {} # Store tokens per broker: {broker_num: {'token': str, 'expires_at': float}} + self.jwt_renewal_interval = self.get_env_int('JWT_RENEWAL_INTERVAL', 3600) # Check every hour + self.jwt_renewal_threshold = self.get_env_int('JWT_RENEWAL_THRESHOLD', 300) # Renew 5 minutes before expiry + # Advert settings self.advert_interval_hours = self.get_env_int('ADVERT_INTERVAL_HOURS', 1) self.last_advert_time = 0 self.advert_task = None + # JWT renewal task + self.jwt_renewal_task = None + # Output file handle self.output_handle = None if self.output_file: @@ -317,7 +325,7 @@ class PacketCapture: self.logger.error(f"Error creating JWT with private key: {e}") return None - async def create_auth_token_jwt(self, audience: str = None) -> Optional[str]: + async def create_auth_token_jwt(self, audience: str = None, broker_num: int = None) -> Optional[str]: """Create JWT token using private key from device""" # Use private key method (fetched from device) jwt_token = await self.create_jwt_with_private_key(audience) @@ -326,11 +334,143 @@ class PacketCapture: self.logger.info("✓ JWT created using private key from device for MQTT authentication") else: self.logger.info("✓ JWT created using private key from device") + + # Store token with expiry time if broker_num is provided + if broker_num is not None: + import time + import json + import base64 + + # Parse token to get expiry time + try: + parts = jwt_token.split('.') + if len(parts) == 3: + # Decode payload to get expiry + payload_data = base64.urlsafe_b64decode(parts[1] + '==') + payload = json.loads(payload_data) + expires_at = payload.get('exp', time.time() + 86400) # Default 24h if not found + + self.jwt_tokens[broker_num] = { + 'token': jwt_token, + 'expires_at': expires_at, + 'audience': audience + } + + if self.debug: + self.logger.debug(f"JWT token stored for broker {broker_num}, expires at {expires_at}") + except Exception as e: + self.logger.warning(f"Could not parse JWT expiry: {e}") + return jwt_token self.logger.error("Failed to create JWT with private key from device") return None + def is_jwt_token_expired(self, broker_num: int) -> bool: + """Check if JWT token for broker is expired or near expiry""" + if broker_num not in self.jwt_tokens: + return True + + import time + current_time = time.time() + token_info = self.jwt_tokens[broker_num] + expires_at = token_info['expires_at'] + + # Check if token is expired or within renewal threshold + return current_time >= (expires_at - self.jwt_renewal_threshold) + + async def renew_jwt_token(self, broker_num: int) -> bool: + """Renew JWT token for a specific broker""" + try: + if broker_num not in self.jwt_tokens: + self.logger.warning(f"No existing JWT token for broker {broker_num}") + return False + + token_info = self.jwt_tokens[broker_num] + audience = token_info.get('audience') + + self.logger.info(f"Renewing JWT token for broker {broker_num}...") + + # Create new token + new_token = await self.create_auth_token_jwt(audience, broker_num) + if new_token: + self.logger.info(f"✓ JWT token renewed for broker {broker_num}") + return True + else: + self.logger.error(f"Failed to renew JWT token for broker {broker_num}") + return False + + except Exception as e: + self.logger.error(f"Error renewing JWT token for broker {broker_num}: {e}") + return False + + async def check_and_renew_jwt_tokens(self): + """Check all JWT tokens and renew if needed""" + try: + for broker_num in list(self.jwt_tokens.keys()): + if self.is_jwt_token_expired(broker_num): + self.logger.info(f"JWT token for broker {broker_num} needs renewal") + await self.renew_jwt_token(broker_num) + + # Reconnect MQTT broker with new token + await self.reconnect_mqtt_broker_with_new_token(broker_num) + + except Exception as e: + self.logger.error(f"Error checking JWT token renewals: {e}") + + async def reconnect_mqtt_broker_with_new_token(self, broker_num: int): + """Reconnect MQTT broker with renewed JWT token""" + try: + # Find the broker in our client list + broker_info = None + for client_info in self.mqtt_clients: + if client_info['broker_num'] == broker_num: + broker_info = client_info + break + + if not broker_info: + self.logger.warning(f"Broker {broker_num} not found in client list") + return False + + # Check if broker uses auth tokens + use_auth_token = self.get_env_bool(f'MQTT{broker_num}_USE_AUTH_TOKEN', False) + if not use_auth_token: + self.logger.debug(f"Broker {broker_num} doesn't use auth tokens, skipping renewal") + return True + + # Get new token + if broker_num not in self.jwt_tokens: + self.logger.error(f"No JWT token available for broker {broker_num}") + return False + + new_token = self.jwt_tokens[broker_num]['token'] + audience = self.jwt_tokens[broker_num].get('audience', '') + + # Disconnect existing client + mqtt_client = broker_info['client'] + if mqtt_client.is_connected(): + mqtt_client.loop_stop() + mqtt_client.disconnect() + + # Create new client with new token + username = f"v1_{self.device_public_key.upper()}" + mqtt_client.username_pw_set(username, new_token) + + # Reconnect + server = self.get_env(f'MQTT{broker_num}_SERVER', "") + port = self.get_env_int(f'MQTT{broker_num}_PORT', 1883) + keepalive = self.get_env_int(f'MQTT{broker_num}_KEEPALIVE', 60) + + mqtt_client.connect(server, port, keepalive=keepalive) + mqtt_client.loop_start() + + self.logger.info(f"✓ MQTT broker {broker_num} reconnected with new JWT token") + return True + + except Exception as e: + self.logger.error(f"Error reconnecting MQTT broker {broker_num} with new token: {e}") + return False + @@ -545,6 +685,9 @@ class PacketCapture: if self.enable_mqtt: await self.check_mqtt_reconnection() + # Check and renew JWT tokens if needed + await self.check_and_renew_jwt_tokens() + except asyncio.CancelledError: if self.debug: self.logger.debug("Connection monitoring cancelled") @@ -637,7 +780,7 @@ class PacketCapture: self.logger.info(f"MQTT{broker_num}: Using JWT authentication") # Use the JWT creation method with private key from device - password = await self.create_auth_token_jwt(audience) + password = await self.create_auth_token_jwt(audience, broker_num) if not password: self.logger.error(f"MQTT{broker_num}: Failed to generate JWT token") return None @@ -1442,6 +1585,10 @@ class PacketCapture: if self.advert_interval_hours > 0: self.advert_task = asyncio.create_task(self.advert_scheduler()) + # Start JWT renewal scheduler task + if self.jwt_renewal_interval > 0: + self.jwt_renewal_task = asyncio.create_task(self.jwt_renewal_scheduler()) + try: mqtt_check_interval = 10 # Check MQTT reconnection every 10 seconds last_mqtt_check = 0 @@ -1461,6 +1608,8 @@ class PacketCapture: monitoring_task.cancel() if self.advert_task: self.advert_task.cancel() + if self.jwt_renewal_task: + self.jwt_renewal_task.cancel() try: await monitoring_task except asyncio.CancelledError: @@ -1470,6 +1619,11 @@ class PacketCapture: await self.advert_task except asyncio.CancelledError: pass + if self.jwt_renewal_task: + try: + await self.jwt_renewal_task + except asyncio.CancelledError: + pass await self.stop() async def stop(self): @@ -1481,7 +1635,28 @@ class PacketCapture: if self.enable_mqtt and self.mqtt_connected: self.publish_status("offline") - if self.meshcore: + # Handle BLE disconnection if using BLE connection + if self.meshcore and self.get_env('CONNECTION_TYPE', 'ble').lower() == 'ble': + try: + self.logger.info("Disconnecting BLE device...") + await self.meshcore.disconnect() + + # Additional BLE disconnection using bluetoothctl on Linux + import platform + if platform.system() == 'Linux': + try: + import subprocess + ble_device = self.get_env('BLE_DEVICE', '') or self.get_env('BLE_ADDRESS', '') + if ble_device and ble_device != 'Unknown': + self.logger.info(f"Force disconnecting BLE device {ble_device}...") + subprocess.run(['bluetoothctl', 'disconnect', ble_device], + capture_output=True, timeout=10) + await asyncio.sleep(1) # Give time for disconnection + except Exception as e: + self.logger.debug(f"Could not force BLE disconnect via bluetoothctl: {e}") + except Exception as e: + self.logger.warning(f"Error during BLE disconnection: {e}") + elif self.meshcore: await self.meshcore.disconnect() for mqtt_client_info in self.mqtt_clients: @@ -1549,6 +1724,34 @@ class PacketCapture: except Exception as e: self.logger.error(f"Error in advert scheduler: {e}") await asyncio.sleep(60) # Wait 1 minute before retrying + + async def jwt_renewal_scheduler(self): + """Background task to check and renew JWT tokens""" + if self.jwt_renewal_interval <= 0: + if self.debug: + self.logger.debug("JWT renewal scheduling disabled (interval = 0)") + return + + if self.debug: + self.logger.debug(f"Starting JWT renewal scheduler with {self.jwt_renewal_interval} second interval") + + while self.connected: + try: + await asyncio.sleep(self.jwt_renewal_interval) + + if not self.connected: + break + + # Check and renew JWT tokens + await self.check_and_renew_jwt_tokens() + + except asyncio.CancelledError: + if self.debug: + self.logger.debug("JWT renewal scheduler cancelled") + break + except Exception as e: + self.logger.error(f"Error in JWT renewal scheduler: {e}") + await asyncio.sleep(60) # Wait 1 minute before retrying async def main(): @@ -1574,6 +1777,17 @@ async def main(): elif args.verbose: capture.logger.setLevel(logging.INFO) + # Setup signal handlers for graceful shutdown + import signal + + def signal_handler(signum, frame): + capture.logger.info(f"Received signal {signum}, shutting down gracefully...") + capture.should_exit = True + + # Register signal handlers + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + try: await capture.start() except KeyboardInterrupt: