import asyncio import json import os import html import time import subprocess from contextlib import asynccontextmanager from datetime import datetime, timezone from dataclasses import asdict from typing import Any, Dict, Optional, Set, List, Tuple import httpx import paho.mqtt.client as mqtt import qrcode from fastapi import ( FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException, Query, ) from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response from fastapi.staticfiles import StaticFiles from urllib.parse import urlencode, parse_qsl, urlsplit, urlunsplit from io import BytesIO from PIL import Image, ImageDraw import math import state from boundary import ( get_map_boundary_name, get_map_boundary_points, load_map_boundary, within_map_boundary, ) from decoder import ( ROUTE_PAYLOAD_TYPES_SET, _append_heat_points, _coords_are_zero, _device_id_from_topic, _ensure_node_decoder, _normalize_lat_lon, _normalize_role, _rebuild_node_hash_map, _route_points_from_hashes, _route_points_from_device_ids, _safe_preview, _serialize_heat_events, _try_parse_payload, DIRECT_COORDS_TOPIC_RE, _node_ready_once, _node_unavailable_once, ) from history import ( _load_route_history, PEER_HISTORY_BUCKET_SECONDS, _peer_history_cutoff, _prune_peer_history, _prune_route_history, _rebuild_peer_history_from_segments, _record_route_history, _route_history_saver, ) from backup import _backup_loop from weather import create_weather_router from turnstile import TurnstileVerifier from los import ( _fetch_elevations, _find_los_peaks, _find_los_suggestion, _los_effective_elevations, _haversine_m, _los_max_obstruction, _sample_los_points, ) from config import ( MQTT_HOST, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD, MQTT_TOPIC, MQTT_TOPICS, MQTT_TLS, MQTT_TLS_INSECURE, MQTT_CA_CERT, MQTT_TRANSPORT, MQTT_WS_PATH, MQTT_CLIENT_ID, STATE_DIR, STATE_FILE, DEVICE_ROLES_FILE, DEVICE_COORDS_FILE, NEIGHBOR_OVERRIDES_FILE, BACKUP_ENABLED, BACKUP_INTERVAL_SECONDS, BACKUP_DIR, BACKUP_RETENTION_DAYS, STATE_SAVE_INTERVAL, DEVICE_TTL_WINDOW_SECONDS, PATH_TTL_SECONDS, TRAIL_LEN, ROUTE_TTL_SECONDS, ROUTE_PAYLOAD_TYPES, ROUTE_PATH_MAX_LEN, ROUTE_HISTORY_ENABLED, ROUTE_HISTORY_HOURS, ROUTE_HISTORY_MAX_SEGMENTS, ROUTE_HISTORY_FILE, ROUTE_HISTORY_PAYLOAD_TYPES, ROUTE_HISTORY_ALLOWED_MODES, ROUTE_HISTORY_COMPACT_INTERVAL, HISTORY_EDGE_SAMPLE_LIMIT, MESSAGE_ORIGIN_TTL_SECONDS, HEAT_TTL_SECONDS, MQTT_ONLINE_SECONDS, MQTT_ONLINE_STATUS_TTL_SECONDS, MQTT_ONLINE_INTERNAL_TTL_SECONDS, MQTT_ACTIVITY_PACKETS_TTL_SECONDS, MQTT_SEEN_BROADCAST_MIN_SECONDS, MQTT_ONLINE_FORCE_NAMES_SET, MQTT_STATUS_OFFLINE_VALUES_SET, PEERS_DEFAULT_LIMIT, DEBUG_PAYLOAD, DEBUG_PAYLOAD_MAX, TURNSTILE_ENABLED, TURNSTILE_SITE_KEY, TURNSTILE_SECRET_KEY, TURNSTILE_API_URL, TURNSTILE_TOKEN_TTL_SECONDS, TURNSTILE_BOT_BYPASS, TURNSTILE_BOT_ALLOWLIST, DECODE_WITH_NODE, NODE_DECODE_TIMEOUT_SECONDS, PAYLOAD_PREVIEW_MAX, DIRECT_COORDS_MODE, DIRECT_COORDS_TOPIC_REGEX, DIRECT_COORDS_ALLOW_ZERO, ROUTE_HISTORY_ALLOWED_MODES_SET, SITE_TITLE, SITE_DESCRIPTION, SITE_OG_IMAGE, SITE_URL, SITE_ICON, SITE_FEED_NOTE, CUSTOM_LINK_URL, PACKET_ANALYZER_URL, QR_CODE_BUTTON_ENABLED, GIT_CHECK_ENABLED, GIT_CHECK_FETCH, GIT_CHECK_PATH, GIT_CHECK_INTERVAL_SECONDS, DISTANCE_UNITS, NODE_MARKER_RADIUS, HISTORY_LINK_SCALE, MAP_START_LAT, MAP_START_LON, MAP_START_ZOOM, MAP_RADIUS_KM, MAP_RADIUS_SHOW, MAP_BOUNDARY_MODE, MAP_BOUNDARY_FILE, MAP_BOUNDARY_SHOW, MAP_DEFAULT_LAYER, PROD_MODE, PROD_TOKEN, LOS_ELEVATION_URL, LOS_ELEVATION_PROXY_URL, LOS_SAMPLE_MIN, LOS_SAMPLE_MAX, LOS_SAMPLE_STEP_METERS, ELEVATION_CACHE_TTL, LOS_CURVATURE_ENABLED, LOS_CURVATURE_FACTOR, LOS_PEAKS_MAX, COVERAGE_API_URL, COVERAGE_API_KEY, COVERAGE_MAX_AGE_DAYS, COVERAGE_CACHE_FILE, COVERAGE_RATE_LIMIT_COOLDOWN_SECONDS, COVERAGE_SYNC_INTERVAL_SECONDS, WEATHER_RADAR_ENABLED, WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED, WEATHER_RADAR_COUNTRY_LOOKUP_URL, WEATHER_WIND_ENABLED, WEATHER_WIND_API_URL, WEATHER_WIND_GRID_SIZE, WEATHER_WIND_REFRESH_SECONDS, APP_VERSION, APP_DIR, NODE_SCRIPT_PATH, ) from state import ( DeviceState, stats, result_counts, seen_devices, mqtt_seen, mqtt_online_source, mqtt_status_seen, mqtt_status_values, mqtt_internal_seen, mqtt_packets_seen, last_seen_broadcast, topic_counts, debug_last, status_last, devices, trails, routes, heat_events, route_history_segments, route_history_edges, peer_history_pairs, node_hash_to_device, node_hash_collisions, node_hash_candidates, elevation_cache, device_names, message_origins, device_roles, device_role_sources, device_coords, neighbor_edges, first_seen_devices, last_seen_in_advert, ) # ========================= # App / State # ========================= mqtt_client: Optional[mqtt.Client] = None background_tasks: Set[asyncio.Task[Any]] = set() clients: Set[WebSocket] = set() update_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() git_update_info = { "available": False, "local": None, "remote": None, "local_short": None, "remote_short": None, "error": None, } mqtt_presence_last_summary: Dict[str, int] = {} def _normalize_route_hashes_for_path_length( path_hashes: Any, path_length: Any, ) -> Optional[List[Any]]: if not isinstance(path_hashes, list) or not path_hashes: return None try: path_length_int = int(path_length) if path_length is not None else None except (TypeError, ValueError): path_length_int = None if path_length_int not in (2, 3): return list(path_hashes) width = path_length_int * 2 normalized: List[Any] = [] changed = False for item in path_hashes: if isinstance(item, int): if item < 0: normalized.append(item) continue normalized.append(f"{item:0{width}X}") changed = True continue normalized.append(item) return normalized if changed else list(path_hashes) def _coverage_request_url(base_url: str, api_key: str) -> str: raw = (base_url or "").strip() if not raw: return raw parts = urlsplit(raw) hostname = (parts.hostname or "").lower() path = parts.path or "" is_meshmapper = hostname == "meshmapper.net" if is_meshmapper: if not path or path == "/": path = "/coverage.php" elif not path.endswith("/coverage.php") and not path.endswith("coverage.php"): path = path.rstrip("/") + "/coverage.php" query_items = dict(parse_qsl(parts.query, keep_blank_values=True)) if api_key and not query_items.get("key"): query_items["key"] = api_key return urlunsplit( ( parts.scheme, parts.netloc, path, urlencode(query_items), parts.fragment, ) ) if path.endswith("/get-samples") or path.endswith("get-samples"): return raw return f"{raw.rstrip('/')}/get-samples" coverage_cache: Dict[str, Any] = { "provider": None, "data": None, "fetched_at": 0.0, "cooldown_until": 0.0, "last_error": None, "source": None, } def _is_meshmapper_coverage_url(base_url: str) -> bool: parts = urlsplit((base_url or "").strip()) return (parts.hostname or "").lower() == "meshmapper.net" def _normalize_coverage_response(data: Any) -> Tuple[List[Dict[str, Any]], str]: if isinstance(data, dict): if isinstance(data.get("grid_squares"), list): if data.get("success") is False: error = str(data.get("error") or "coverage_api_error") message = str(data.get("message") or error) raise HTTPException(status_code=502, detail=f"coverage_api_error: {error}: {message}") return data["grid_squares"], "meshmapper" keys = data.get("keys", []) if isinstance(keys, list): return keys, "legacy" return [], "legacy" if isinstance(data, list): return data, "legacy" return [], "legacy" def _coverage_metadata_from_response( data: Any, provider: str, ) -> Dict[str, Any]: meta: Dict[str, Any] = {"provider": provider} if provider != "meshmapper" or not isinstance(data, dict): return meta region = data.get("region") if isinstance(region, str) and region.strip(): meta["region"] = region.strip().upper() generated_at = _parse_coverage_timestamp(data.get("generated_at")) if generated_at is not None: meta["generated_at"] = generated_at return meta def _coverage_cache_has_data() -> bool: return isinstance(coverage_cache.get("data"), list) def _parse_coverage_timestamp(value: Any) -> Optional[float]: if value is None: return None if isinstance(value, (int, float)): numeric = float(value) elif isinstance(value, str): text = value.strip() if not text: return None try: numeric = float(text) except ValueError: dt_value = None normalized = text if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" try: dt_value = datetime.fromisoformat(normalized) except ValueError: for fmt in ( "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%d/%m/%Y %H:%M:%S", "%m/%d/%Y %H:%M:%S", ): try: dt_value = datetime.strptime(text, fmt) break except ValueError: continue if dt_value is None: return None if dt_value.tzinfo is None: dt_value = dt_value.replace(tzinfo=timezone.utc) return dt_value.timestamp() else: return None if numeric > 946684800000: numeric /= 1000.0 if numeric <= 0: return None return numeric def _coverage_item_timestamp(item: Any) -> Optional[float]: if not isinstance(item, dict): return None for key in ("timestamp", "time"): ts = _parse_coverage_timestamp(item.get(key)) if ts is not None: return ts observed = item.get("observed") if isinstance(observed, dict): for key in ("timestamp", "time"): ts = _parse_coverage_timestamp(observed.get(key)) if ts is not None: return ts return None def _filter_coverage_by_age( data: List[Dict[str, Any]], now: Optional[float] = None, ) -> List[Dict[str, Any]]: try: max_age_days = float(COVERAGE_MAX_AGE_DAYS) except (TypeError, ValueError): max_age_days = 30.0 if max_age_days <= 0: return list(data) now_value = float(now or time.time()) cutoff = now_value - (max_age_days * 86400.0) filtered: List[Dict[str, Any]] = [] for item in data: ts = _coverage_item_timestamp(item) if ts is None or ts >= cutoff: filtered.append(item) return filtered def _coverage_response_headers(provider: Optional[str] = None) -> Dict[str, str]: headers: Dict[str, str] = {} provider_value = str(provider or coverage_cache.get("provider") or "").strip().lower() if provider_value: headers["X-Coverage-Provider"] = provider_value region_value = str(coverage_cache.get("region") or "").strip().upper() if provider_value == "meshmapper" and region_value: headers["X-Coverage-Region"] = region_value return headers def _update_coverage_cache( provider: str, data: List[Dict[str, Any]], now: float, source: str = "memory", meta: Optional[Dict[str, Any]] = None, ) -> None: coverage_cache["provider"] = provider coverage_cache["data"] = list(data) coverage_cache["fetched_at"] = now coverage_cache["cooldown_until"] = 0.0 coverage_cache["last_error"] = None coverage_cache["source"] = source coverage_cache["region"] = ( str(meta.get("region")).strip().upper() if isinstance(meta, dict) and meta.get("region") else None ) generated_at = meta.get("generated_at") if isinstance(meta, dict) else None coverage_cache["generated_at"] = float(generated_at) if generated_at else None def _apply_meshmapper_rate_limit_cooldown( now: float, response: Optional[httpx.Response] = None, ) -> int: cooldown_seconds = max(1, int(COVERAGE_RATE_LIMIT_COOLDOWN_SECONDS)) if response is not None: try: data = response.json() except Exception: data = None if isinstance(data, dict): coverage_cache["last_error"] = data resets_in_hours = data.get("resets_in_hours") try: resets_hours_value = float(resets_in_hours) except (TypeError, ValueError): resets_hours_value = None if resets_hours_value and resets_hours_value > 0: cooldown_seconds = max(1, int(math.ceil(resets_hours_value * 3600.0))) coverage_cache["cooldown_until"] = now + cooldown_seconds return cooldown_seconds def _load_coverage_cache_file() -> bool: path = (COVERAGE_CACHE_FILE or "").strip() if not path or not os.path.exists(path): return False try: with open(path, "r", encoding="utf-8") as f: payload = json.load(f) except Exception as exc: print(f"[coverage] Failed to load cache file {path}: {exc}") return False if not isinstance(payload, dict): return False data = payload.get("data") if not isinstance(data, list): return False fetched_at = payload.get("fetched_at") or 0.0 try: fetched_at_value = float(fetched_at) except (TypeError, ValueError): fetched_at_value = 0.0 provider = str(payload.get("provider") or "meshmapper") coverage_cache["provider"] = provider coverage_cache["data"] = list(data) coverage_cache["fetched_at"] = fetched_at_value coverage_cache["cooldown_until"] = float(payload.get("cooldown_until") or 0.0) coverage_cache["last_error"] = payload.get("last_error") coverage_cache["source"] = "file" region = payload.get("region") coverage_cache["region"] = ( str(region).strip().upper() if isinstance(region, str) and region.strip() else None ) generated_at = payload.get("generated_at") try: coverage_cache["generated_at"] = float(generated_at) if generated_at else None except (TypeError, ValueError): coverage_cache["generated_at"] = None print( f"[coverage] Loaded cached coverage file {path} entries={len(data)} fetched_at={int(fetched_at_value) if fetched_at_value > 0 else 0}" ) return True def _save_coverage_cache_file() -> None: path = (COVERAGE_CACHE_FILE or "").strip() if not path or not _coverage_cache_has_data(): return payload = { "provider": coverage_cache.get("provider") or "meshmapper", "fetched_at": float(coverage_cache.get("fetched_at") or 0.0), "cooldown_until": float(coverage_cache.get("cooldown_until") or 0.0), "last_error": coverage_cache.get("last_error"), "region": coverage_cache.get("region"), "generated_at": coverage_cache.get("generated_at"), "data": coverage_cache.get("data") or [], } try: directory = os.path.dirname(path) if directory: os.makedirs(directory, exist_ok=True) tmp_path = f"{path}.tmp" with open(tmp_path, "w", encoding="utf-8") as f: json.dump(payload, f, separators=(",", ":")) os.replace(tmp_path, path) except Exception as exc: print(f"[coverage] Failed to save cache file {path}: {exc}") async def _fetch_coverage_upstream() -> Tuple[List[Dict[str, Any]], str, Dict[str, Any]]: url = _coverage_request_url(COVERAGE_API_URL, COVERAGE_API_KEY) print(f"[coverage] Fetching from {url}") async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url) response.raise_for_status() data = response.json() samples, provider = _normalize_coverage_response(data) meta = _coverage_metadata_from_response(data, provider) print( f"[coverage] Received {len(samples) if isinstance(samples, list) else 'non-list'} items from coverage API provider={provider}" ) if isinstance(samples, list) and len(samples) > 0: print( f"[coverage] Sample item keys: {list(samples[0].keys()) if samples[0] else 'N/A'}" ) return samples, provider, meta async def _fetch_coverage_upstream_for_test( base_url: str, api_key: str, ) -> Tuple[List[Dict[str, Any]], str]: url = _coverage_request_url(base_url, api_key) async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url) response.raise_for_status() return _normalize_coverage_response(response.json()) async def _sync_meshmapper_coverage_once() -> bool: now = time.time() cooldown_until = float(coverage_cache.get("cooldown_until") or 0.0) if cooldown_until > now: remaining = int(max(1, math.ceil(cooldown_until - now))) print(f"[coverage] MeshMapper sync cooldown active ({remaining}s remaining)") return _coverage_cache_has_data() try: samples, provider, meta = await _fetch_coverage_upstream() if provider != "meshmapper": return False _update_coverage_cache( provider, samples, now, source="meshmapper_sync", meta=meta, ) _save_coverage_cache_file() return True except httpx.TimeoutException: coverage_cache["last_error"] = {"error": "coverage_api_timeout"} print("[coverage] MeshMapper sync timeout") return _coverage_cache_has_data() except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: cooldown_seconds = _apply_meshmapper_rate_limit_cooldown(now, exc.response) print( f"[coverage] MeshMapper sync rate limited cooldown={cooldown_seconds}s" ) return _coverage_cache_has_data() coverage_cache["last_error"] = { "error": "coverage_api_error", "status_code": exc.response.status_code, } print( f"[coverage] MeshMapper sync HTTP error status={exc.response.status_code}" ) return _coverage_cache_has_data() except httpx.HTTPError as exc: coverage_cache["last_error"] = {"error": f"coverage_api_error: {exc}"} print(f"[coverage] MeshMapper sync HTTP error: {exc}") return _coverage_cache_has_data() except Exception as exc: coverage_cache["last_error"] = {"error": f"coverage_fetch_error: {exc}"} print(f"[coverage] MeshMapper sync failed: {exc}") return _coverage_cache_has_data() async def _meshmapper_coverage_sync_loop() -> None: if not _is_meshmapper_coverage_url(COVERAGE_API_URL): return _load_coverage_cache_file() await _sync_meshmapper_coverage_once() interval = max(60, int(COVERAGE_SYNC_INTERVAL_SECONDS)) while True: await asyncio.sleep(interval) await _sync_meshmapper_coverage_once() @asynccontextmanager async def _lifespan(_app: FastAPI): global mqtt_client print(f"[startup] meshmap version={APP_VERSION}") load_map_boundary(force=True) _load_state() _load_route_history() _load_neighbor_overrides() if _is_meshmapper_coverage_url(COVERAGE_API_URL): _load_coverage_cache_file() _ensure_node_decoder() _check_git_updates() if BACKUP_ENABLED: print( f"[backup] enabled dir={BACKUP_DIR} interval={BACKUP_INTERVAL_SECONDS}s retention_days={BACKUP_RETENTION_DAYS}" ) loop = asyncio.get_running_loop() transport = "websockets" if MQTT_TRANSPORT == "websockets" else "tcp" topics_str = ", ".join(MQTT_TOPICS) print( f"[mqtt] connecting host={MQTT_HOST} port={MQTT_PORT} tls={MQTT_TLS} transport={transport} ws_path={MQTT_WS_PATH if transport == 'websockets' else '-'} topics={topics_str}" ) mqtt_client = mqtt.Client( mqtt.CallbackAPIVersion.VERSION2, client_id=(MQTT_CLIENT_ID or None), userdata={"loop": loop}, transport=transport, ) if transport == "websockets": mqtt_client.ws_set_options(path=MQTT_WS_PATH) if MQTT_USERNAME: mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) if MQTT_TLS: if MQTT_CA_CERT: mqtt_client.tls_set(ca_certs=MQTT_CA_CERT) else: mqtt_client.tls_set() if MQTT_TLS_INSECURE: mqtt_client.tls_insecure_set(True) mqtt_client.on_connect = mqtt_on_connect mqtt_client.on_disconnect = mqtt_on_disconnect mqtt_client.on_message = mqtt_on_message mqtt_client.reconnect_delay_set(min_delay=1, max_delay=30) mqtt_client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=30) mqtt_client.loop_start() background_tasks.clear() for coro in ( broadcaster(), reaper(), _state_saver(), _route_history_saver(), _git_check_loop(), _meshmapper_coverage_sync_loop(), _backup_loop(), ): background_tasks.add(asyncio.create_task(coro)) try: yield finally: for task in list(background_tasks): task.cancel() if background_tasks: await asyncio.gather(*background_tasks, return_exceptions=True) background_tasks.clear() if mqtt_client is not None: try: mqtt_client.loop_stop() mqtt_client.disconnect() except Exception: pass mqtt_client = None app = FastAPI(lifespan=_lifespan) app.mount("/static", StaticFiles(directory="static"), name="static") # Initialize Turnstile verifier if enabled turnstile_verifier: Optional[TurnstileVerifier] = None if TURNSTILE_ENABLED and TURNSTILE_SECRET_KEY: turnstile_verifier = TurnstileVerifier( secret_key=TURNSTILE_SECRET_KEY, api_url=TURNSTILE_API_URL, token_ttl_seconds=TURNSTILE_TOKEN_TTL_SECONDS, ) print(f"[startup] Turnstile authentication enabled") else: print(f"[startup] Turnstile authentication disabled") def _compute_asset_version() -> str: paths = [ os.path.join(APP_DIR, "static", "app.js"), os.path.join(APP_DIR, "static", "styles.css"), os.path.join(APP_DIR, "static", "sw.js"), ] mtimes = [] for path in paths: try: mtimes.append(int(os.path.getmtime(path))) except Exception: continue return str(max(mtimes)) if mtimes else "1" ASSET_VERSION = _compute_asset_version() ROUTE_SNAPSHOT_MIN_TTL_SECONDS = 10.0 def _snapshot_routes(now: Optional[float] = None) -> List[Dict[str, Any]]: current = time.time() if now is None else float(now) min_expires_at = current + ROUTE_SNAPSHOT_MIN_TTL_SECONDS return [ _route_payload(route) for route in routes.values() if float(route.get("expires_at") or 0.0) > min_expires_at ] # ========================= # Helpers: coordinate hunting # ========================= def _load_role_overrides() -> Dict[str, str]: if not DEVICE_ROLES_FILE or not os.path.exists(DEVICE_ROLES_FILE): return {} try: with open(DEVICE_ROLES_FILE, "r", encoding="utf-8") as handle: data = json.load(handle) except Exception: return {} if not isinstance(data, dict): return {} roles: Dict[str, str] = {} for key, value in data.items(): if not isinstance(key, str) or not isinstance(value, str): continue role = _normalize_role(value) if not role: continue roles[key.strip()] = role return roles def _load_coord_overrides() -> Dict[str, Dict[str, float]]: if not DEVICE_COORDS_FILE or not os.path.exists(DEVICE_COORDS_FILE): return {} try: with open(DEVICE_COORDS_FILE, "r", encoding="utf-8") as handle: data = json.load(handle) except Exception: return {} if not isinstance(data, dict): return {} coords: Dict[str, Dict[str, float]] = {} for key, value in data.items(): if not isinstance(key, str) or not isinstance(value, dict): continue lat = value.get("lat") lon = value.get("lon") if isinstance(lat, (int, float)) and isinstance(lon, (int, float)): coords[key.strip()] = {"lat": float(lat), "lon": float(lon)} return coords def _neighbor_override_pairs(data: Any) -> List[Tuple[str, str]]: pairs: List[Tuple[str, str]] = [] if isinstance(data, dict): for src, targets in data.items(): if not isinstance(src, str): continue src_id = src.strip() if isinstance(targets, list): for dst in targets: if not isinstance(dst, str): continue dst_id = dst.strip() if src_id and dst_id and src_id != dst_id: pairs.append((src_id, dst_id)) elif isinstance(targets, str): dst_id = targets.strip() if src_id and dst_id and src_id != dst_id: pairs.append((src_id, dst_id)) elif isinstance(data, list): for item in data: if ( isinstance(item, (list, tuple)) and len(item) >= 2 and isinstance(item[0], str) and isinstance(item[1], str) ): src_id = item[0].strip() dst_id = item[1].strip() elif isinstance(item, dict): src = item.get("from") or item.get("src") or item.get("a") dst = item.get("to") or item.get("dst") or item.get("b") if not isinstance(src, str) or not isinstance(dst, str): continue src_id = src.strip() dst_id = dst.strip() else: continue if src_id and dst_id and src_id != dst_id: pairs.append((src_id, dst_id)) return pairs def _load_neighbor_overrides() -> None: if not NEIGHBOR_OVERRIDES_FILE or not os.path.exists(NEIGHBOR_OVERRIDES_FILE): return try: with open(NEIGHBOR_OVERRIDES_FILE, "r", encoding="utf-8") as handle: data = json.load(handle) except Exception as exc: print(f"[neighbors] failed to load {NEIGHBOR_OVERRIDES_FILE}: {exc}") return now = time.time() added = 0 for src_id, dst_id in _neighbor_override_pairs(data): _touch_neighbor(src_id, dst_id, now, manual=True) _touch_neighbor(dst_id, src_id, now, manual=True) added += 1 if added: print( f"[neighbors] loaded {added} override pairs from {NEIGHBOR_OVERRIDES_FILE}" ) def _touch_neighbor( src_id: str, dst_id: str, ts: float, manual: bool = False, ) -> None: if not src_id or not dst_id or src_id == dst_id: return neighbors = neighbor_edges.setdefault(src_id, {}) entry = neighbors.get(dst_id) if entry is None: entry = {"count": 0, "last_seen": 0.0, "manual": False} neighbors[dst_id] = entry if manual: entry["manual"] = True else: entry["count"] = int(entry.get("count", 0)) + 1 entry["last_seen"] = max(float(entry.get("last_seen", 0.0)), float(ts)) def _record_neighbors(point_ids: List[Optional[str]], ts: float) -> None: if not point_ids or len(point_ids) < 2: return for idx in range(len(point_ids) - 1): src_id = point_ids[idx] dst_id = point_ids[idx + 1] if not src_id or not dst_id or src_id == dst_id: continue _touch_neighbor(src_id, dst_id, ts, manual=False) _touch_neighbor(dst_id, src_id, ts, manual=False) def _update_path_timestamps(point_ids: List[Optional[str]], ts: float) -> None: """Update last_seen_in_path for all devices in a path.""" if not point_ids: return for device_id in point_ids: if device_id: state.last_seen_in_path[device_id] = max( state.last_seen_in_path.get(device_id, 0.0), float(ts) ) def _prune_neighbors(now: float) -> None: ttl_seconds = PATH_TTL_SECONDS if PATH_TTL_SECONDS > 0 else DEVICE_TTL_WINDOW_SECONDS if ttl_seconds <= 0 or not neighbor_edges: return cutoff = now - ttl_seconds for src_id, edges in list(neighbor_edges.items()): for dst_id, entry in list(edges.items()): if entry.get("manual"): continue if entry.get("last_seen", 0.0) < cutoff: edges.pop(dst_id, None) if not edges: neighbor_edges.pop(src_id, None) def _serialize_state() -> Dict[str, Any]: return { "version": 1, "saved_at": time.time(), "devices": { k: asdict(v) for k, v in devices.items() }, "trails": trails, "seen_devices": seen_devices, "device_names": device_names, "device_roles": device_roles, "device_role_sources": device_role_sources, "last_seen_in_path": state.last_seen_in_path, "first_seen_devices": first_seen_devices, "last_seen_in_advert": last_seen_in_advert, "peer_history_pairs": peer_history_pairs, } def _check_git_updates() -> None: if not GIT_CHECK_ENABLED: return if not GIT_CHECK_PATH or not os.path.isdir(GIT_CHECK_PATH): git_update_info["error"] = "git_path_missing" return def _run_git(args: List[str]) -> str: result = subprocess.run( args, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) return result.stdout.strip() try: subprocess.run( ["git", "config", "--global", "--add", "safe.directory", GIT_CHECK_PATH], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) inside = _run_git( ["git", "-C", GIT_CHECK_PATH, "rev-parse", "--is-inside-work-tree"] ) if inside.lower() != "true": git_update_info["error"] = "not_git_repo" return except Exception: git_update_info["error"] = "git_unavailable" return try: if GIT_CHECK_FETCH: subprocess.run( ["git", "-C", GIT_CHECK_PATH, "fetch", "--quiet", "--prune"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) local_sha = _run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "HEAD"]) remote_sha = _run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "@{u}"]) git_update_info["local"] = local_sha git_update_info["remote"] = remote_sha git_update_info["local_short"] = local_sha[:7] git_update_info["remote_short"] = remote_sha[:7] git_update_info["available"] = local_sha != remote_sha if git_update_info["available"]: print( f"[update] available {git_update_info['local_short']} -> {git_update_info['remote_short']}" ) except Exception: git_update_info["error"] = "git_compare_failed" async def _git_check_loop() -> None: if not GIT_CHECK_ENABLED: return if GIT_CHECK_INTERVAL_SECONDS <= 0: return while True: await asyncio.sleep(GIT_CHECK_INTERVAL_SECONDS) _check_git_updates() def _device_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]: payload = asdict(state) last_seen = seen_devices.get(device_id) if last_seen: payload["last_seen_ts"] = last_seen else: payload["last_seen_ts"] = payload.get("ts") mqtt_seen_ts = mqtt_seen.get(device_id) if mqtt_seen_ts: payload["mqtt_seen_ts"] = mqtt_seen_ts mqtt_source = mqtt_online_source.get(device_id) if mqtt_source: payload["mqtt_online_source"] = mqtt_source mqtt_status_ts = mqtt_status_seen.get(device_id) if mqtt_status_ts: payload["mqtt_status_ts"] = mqtt_status_ts mqtt_status_value = mqtt_status_values.get(device_id) if mqtt_status_value: payload["mqtt_status_value"] = mqtt_status_value mqtt_internal_ts = mqtt_internal_seen.get(device_id) if mqtt_internal_ts: payload["mqtt_internal_ts"] = mqtt_internal_ts mqtt_packets_ts = mqtt_packets_seen.get(device_id) if mqtt_packets_ts: payload["mqtt_packets_ts"] = mqtt_packets_ts if MQTT_ONLINE_FORCE_NAMES_SET: name_value = (state.name or device_names.get(device_id) or "").strip().lower() if name_value and name_value in MQTT_ONLINE_FORCE_NAMES_SET: payload["mqtt_forced"] = True if PROD_MODE: payload.pop("raw_topic", None) return payload def _iso_from_ts(ts: Optional[float]) -> Optional[str]: if ts is None: return None try: return datetime.fromtimestamp(float(ts), tz=timezone.utc ).strftime("%Y-%m-%dT%H:%M:%SZ") except Exception: return None def _normalize_device_name_for_dedupe(name: Any) -> str: if not isinstance(name, str): return "" return " ".join(name.strip().lower().split()) def _drop_device_state(device_id: str) -> None: devices.pop(device_id, None) trails.pop(device_id, None) seen_devices.pop(device_id, None) first_seen_devices.pop(device_id, None) device_names.pop(device_id, None) device_roles.pop(device_id, None) device_role_sources.pop(device_id, None) device_coords.pop(device_id, None) state.last_seen_in_path.pop(device_id, None) last_seen_in_advert.pop(device_id, None) mqtt_seen.pop(device_id, None) mqtt_online_source.pop(device_id, None) mqtt_status_seen.pop(device_id, None) mqtt_status_values.pop(device_id, None) mqtt_internal_seen.pop(device_id, None) mqtt_packets_seen.pop(device_id, None) last_seen_broadcast.pop(device_id, None) def _dedupe_loaded_devices() -> Set[str]: groups: Dict[Tuple[str, int, int], List[str]] = {} for device_id, dev_state in devices.items(): name = _normalize_device_name_for_dedupe( dev_state.name or device_names.get(device_id) ) if (not name or _coords_are_zero(dev_state.lat, dev_state.lon)): continue key = ( name, int(round(float(dev_state.lat) * 100000)), int(round(float(dev_state.lon) * 100000)), ) groups.setdefault(key, []).append(device_id) dropped_ids: Set[str] = set() for duplicate_ids in groups.values(): if len(duplicate_ids) < 2: continue def _score(device_id: str) -> Tuple[float, float, str]: last_seen = float(seen_devices.get(device_id) or devices[device_id].ts or 0.0) first_seen = float(first_seen_devices.get(device_id) or last_seen or 0.0) return (last_seen, -first_seen, device_id) keep_id = max(duplicate_ids, key=_score) for device_id in duplicate_ids: if device_id == keep_id: continue _drop_device_state(device_id) dropped_ids.add(device_id) if dropped_ids: state.state_dirty = True print( f"[state] Dropped {len(dropped_ids)} duplicate device entries: " f"{', '.join(sorted(dropped_ids))}" ) return dropped_ids def _parse_meshcore_topic( topic: str ) -> Tuple[Optional[str], Optional[str], Optional[str]]: parts = topic.split("/") if len(parts) < 4 or parts[0] != "meshcore": return (None, None, None) iata = parts[1].strip() or None node_id = parts[2].strip() or None kind = parts[3].strip().lower() or None return (iata, node_id, kind) def _parse_json_dict(payload_bytes: bytes) -> Optional[Dict[str, Any]]: try: obj = json.loads(payload_bytes.decode("utf-8", errors="strict")) except Exception: return None if isinstance(obj, dict): return obj return None def _extract_status_value(obj: Optional[Dict[str, Any]]) -> Optional[str]: if not isinstance(obj, dict): return None for key in ( "status", "state", "mqtt_status", "mqttStatus", "connection", "connection_state", ): value = obj.get(key) if isinstance(value, str) and value.strip(): return value.strip().lower() return None def _select_mqtt_online_source( device_id: str, now: float ) -> Tuple[Optional[str], Optional[float]]: status_ts = mqtt_status_seen.get(device_id) internal_ts = mqtt_internal_seen.get(device_id) status_recent = ( bool(status_ts) and MQTT_ONLINE_STATUS_TTL_SECONDS > 0 and (now - status_ts <= MQTT_ONLINE_STATUS_TTL_SECONDS) ) internal_recent = ( bool(internal_ts) and MQTT_ONLINE_INTERNAL_TTL_SECONDS > 0 and (now - internal_ts <= MQTT_ONLINE_INTERNAL_TTL_SECONDS) ) status_value = (mqtt_status_values.get(device_id) or "").strip().lower() if status_recent and status_value in MQTT_STATUS_OFFLINE_VALUES_SET: return (None, None) if internal_recent: return ("internal", internal_ts) if status_recent: return ("status", status_ts) return (None, None) def _mqtt_presence_payload( device_id: str, last_seen_ts: Optional[float] = None, now: Optional[float] = None, ) -> Dict[str, Any]: now = now or time.time() return { "type": "device_seen", "device_id": device_id, "last_seen_ts": last_seen_ts, "mqtt_seen_ts": mqtt_seen.get(device_id), "mqtt_online_source": mqtt_online_source.get(device_id), "mqtt_status_ts": mqtt_status_seen.get(device_id), "mqtt_status_value": mqtt_status_values.get(device_id), "mqtt_internal_ts": mqtt_internal_seen.get(device_id), "mqtt_packets_ts": mqtt_packets_seen.get(device_id), "mqtt_presence": _mqtt_presence_summary(now), } def _is_packets_active(ts: Optional[float], now: float) -> bool: if not ts or MQTT_ACTIVITY_PACKETS_TTL_SECONDS <= 0: return False return (now - ts) <= MQTT_ACTIVITY_PACKETS_TTL_SECONDS def _refresh_mqtt_presence(now: Optional[float] = None) -> None: now = now or time.time() candidate_ids = ( set(mqtt_seen.keys()) | set(mqtt_online_source.keys()) | set(mqtt_status_seen.keys()) | set(mqtt_internal_seen.keys()) ) for device_id in list(candidate_ids): source, source_ts = _select_mqtt_online_source(device_id, now) if source and source_ts: mqtt_seen[device_id] = source_ts mqtt_online_source[device_id] = source continue mqtt_seen.pop(device_id, None) mqtt_online_source.pop(device_id, None) def _mqtt_presence_summary(now: Optional[float] = None) -> Dict[str, int]: now = now or time.time() _refresh_mqtt_presence(now) connected_total = len(mqtt_seen) connected_on_map = sum(1 for device_id in mqtt_seen if device_id in devices) feeding_total = sum( 1 for ts in mqtt_packets_seen.values() if _is_packets_active(ts, now) ) feeding_on_map = sum( 1 for device_id, ts in mqtt_packets_seen.items() if device_id in devices and _is_packets_active(ts, now) ) return { "connected_total": connected_total, "connected_on_map": connected_on_map, "connected_off_map": max(0, connected_total - connected_on_map), "feeding_total": feeding_total, "feeding_on_map": feeding_on_map, "feeding_off_map": max(0, feeding_total - feeding_on_map), } def _record_mqtt_presence( topic: str, payload_bytes: bytes, now: float ) -> Optional[Dict[str, Any]]: _, device_id, topic_kind = _parse_meshcore_topic(topic) if not device_id or topic_kind not in ("status", "internal", "packets"): return None was_online = device_id in mqtt_seen if topic_kind == "status": mqtt_status_seen[device_id] = now status_value = _extract_status_value(_parse_json_dict(payload_bytes)) if status_value: mqtt_status_values[device_id] = status_value elif topic_kind == "internal": mqtt_internal_seen[device_id] = now elif topic_kind == "packets": mqtt_packets_seen[device_id] = now source, source_ts = _select_mqtt_online_source(device_id, now) if source and source_ts: mqtt_seen[device_id] = source_ts mqtt_online_source[device_id] = source seen_devices[device_id] = now else: mqtt_seen.pop(device_id, None) mqtt_online_source.pop(device_id, None) event = _mqtt_presence_payload( device_id, seen_devices.get(device_id) if source else None, now=now ) event["topic_kind"] = topic_kind if not was_online and source: event["presence_transition"] = "online" elif was_online and not source: event["presence_transition"] = "offline" else: event["presence_transition"] = "stable" return event def _within_map_radius(lat: Any, lon: Any) -> bool: return within_map_boundary(lat, lon) def _evict_device(device_id: str) -> bool: removed = False if device_id in devices: devices.pop(device_id, None) removed = True trails.pop(device_id, None) state.last_seen_in_path.pop(device_id, None) first_seen_devices.pop(device_id, None) last_seen_in_advert.pop(device_id, None) if removed: state.state_dirty = True _rebuild_node_hash_map() return removed def _device_role_code(value: Any) -> int: if isinstance(value, int): if value in (1, 2, 3): return value return 1 if isinstance(value, str): trimmed = value.strip() if trimmed.isdigit(): num = int(trimmed) if num in (1, 2, 3): return num return 1 normalized = _normalize_role(trimmed) if normalized == "repeater": return 2 if normalized == "room": return 3 if normalized == "companion": return 1 return 1 def _parse_updated_since(value: Optional[str]) -> Optional[float]: if not value: return None try: text = value.strip() if text.endswith("Z"): text = text[:-1] + "+00:00" return datetime.fromisoformat(text).timestamp() except Exception: return None def _node_api_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]: last_seen = seen_devices.get(device_id) or state.ts last_seen_iso = _iso_from_ts(last_seen) role_value = state.role or device_roles.get(device_id) device_role = _device_role_code(role_value) return { "public_key": device_id, "name": (state.name or device_names.get(device_id) or ""), "device_role": device_role, "role": role_value, "location": { "latitude": float(state.lat), "longitude": float(state.lon), }, "lat": state.lat, "lon": state.lon, "last_seen_ts": last_seen, "last_seen": last_seen_iso, "timestamp": int(last_seen) if last_seen else None, "first_seen": last_seen_iso, "battery_voltage": 0, } def _route_payload(route: Dict[str, Any]) -> Dict[str, Any]: if not PROD_MODE: return route return { "id": route.get("id"), "points": route.get("points"), "hashes": route.get("hashes"), "point_ids": route.get("point_ids"), "route_mode": route.get("route_mode"), "ts": route.get("ts"), "expires_at": route.get("expires_at"), "origin_id": route.get("origin_id"), "receiver_id": route.get("receiver_id"), "payload_type": route.get("payload_type"), "message_hash": route.get("message_hash"), "sender_name": route.get("sender_name"), } def _history_edge_payload(edge: Dict[str, Any]) -> Dict[str, Any]: return { "id": edge.get("id"), "a": edge.get("a"), "b": edge.get("b"), "count": edge.get("count"), "last_ts": edge.get("last_ts"), "recent": edge.get("recent") if isinstance(edge.get("recent"), list) else [], } def _extract_token(headers: Dict[str, str]) -> Optional[str]: auth = headers.get("authorization") if auth: parts = auth.strip().split() if len(parts) == 2 and parts[0].lower() == "bearer": return parts[1] return auth.strip() return headers.get("x-access-token") or headers.get("x-token") def _extract_cookie_token(headers: Dict[str, str], key: str) -> Optional[str]: cookie = headers.get("cookie") if not cookie: return None for part in cookie.split(";"): part = part.strip() if not part: continue if part.startswith(f"{key}="): return part[len(key) + 1 :] return None def _require_prod_token(request: Request) -> None: if not PROD_MODE: return if not PROD_TOKEN: raise HTTPException(status_code=503, detail="prod_token_not_set") token = request.query_params.get("token" ) or request.query_params.get("access_token") if not token: token = _extract_token(request.headers) if token != PROD_TOKEN: raise HTTPException(status_code=401, detail="unauthorized") app.include_router(create_weather_router(_require_prod_token)) def _ws_authorized(ws: WebSocket) -> bool: if TURNSTILE_ENABLED and turnstile_verifier: auth_token = _extract_cookie_token(ws.headers, "meshmap_auth") or \ ws.query_params.get("auth") or \ _extract_token(ws.headers) if auth_token and turnstile_verifier.verify_auth_token(auth_token): return True if not PROD_MODE: return True if not PROD_TOKEN: return False token = ws.query_params.get("token") or ws.query_params.get("access_token") if not token: token = _extract_token(ws.headers) return token == PROD_TOKEN def _load_state() -> None: try: if not os.path.exists(STATE_FILE): return with open(STATE_FILE, "r", encoding="utf-8") as handle: data = json.load(handle) except Exception as exc: print(f"[state] failed to load {STATE_FILE}: {exc}") return raw_devices = data.get("devices") or {} loaded_devices: Dict[str, DeviceState] = {} dropped_ids: Set[str] = set() for key, value in raw_devices.items(): if not isinstance(value, dict): continue try: loaded_state = DeviceState(**value) except Exception: continue if _coords_are_zero(loaded_state.lat, loaded_state.lon ) or not _within_map_radius(loaded_state.lat, loaded_state.lon): dropped_ids.add(str(key)) continue loaded_devices[key] = loaded_state devices.clear() devices.update(loaded_devices) trails.clear() trails.update(data.get("trails") or {}) seen_devices.clear() seen_devices.update(data.get("seen_devices") or {}) first_seen_devices.clear() raw_first_seen = data.get("first_seen_devices") or {} if isinstance(raw_first_seen, dict): for key, value in raw_first_seen.items(): try: first_seen_devices[str(key)] = float(value) except (TypeError, ValueError): continue else: raw_first_seen = {} cleaned_trails: Dict[str, list] = {} trails_dirty = False for device_id, trail in trails.items(): if not isinstance(trail, list): continue filtered: list = [] for entry in trail: if not isinstance(entry, (list, tuple)) or len(entry) < 2: continue lat = entry[0] lon = entry[1] try: lat_val = float(lat) lon_val = float(lon) except (TypeError, ValueError): continue if _coords_are_zero(lat_val, lon_val) or not _within_map_radius(lat_val, lon_val): trails_dirty = True continue filtered.append(list(entry)) if filtered: cleaned_trails[device_id] = filtered else: trails_dirty = True trails.clear() trails.update(cleaned_trails) if TRAIL_LEN <= 0 and trails: trails.clear() trails_dirty = True if dropped_ids: for device_id in dropped_ids: trails.pop(device_id, None) seen_devices.pop(device_id, None) trails_dirty = True if trails_dirty: state.state_dirty = True raw_names = data.get("device_names") or {} if isinstance(raw_names, dict): device_names.clear() device_names.update( { str(k): str(v) for k, v in raw_names.items() if str(v).strip() } ) else: device_names.clear() if dropped_ids: for device_id in dropped_ids: device_names.pop(device_id, None) raw_role_sources = data.get("device_role_sources") or {} if isinstance(raw_role_sources, dict): device_role_sources.clear() device_role_sources.update( { str(k): str(v) for k, v in raw_role_sources.items() if str(v).strip() } ) else: device_role_sources.clear() if dropped_ids: for device_id in dropped_ids: device_role_sources.pop(device_id, None) raw_roles = data.get("device_roles") or {} device_roles.clear() if isinstance(raw_roles, dict): for key, value in raw_roles.items(): if dropped_ids and str(key) in dropped_ids: continue role_value = str(value).strip() if isinstance(value, str) else "" if not role_value: continue source = device_role_sources.get(str(key)) if source in ("explicit", "override"): device_roles[str(key)] = role_value role_overrides = _load_role_overrides() if role_overrides: for device_id in role_overrides: device_role_sources[device_id] = "override" device_roles.update(role_overrides) if dropped_ids: for device_id in dropped_ids: device_roles.pop(device_id, None) raw_path_timestamps = data.get("last_seen_in_path") or {} state.last_seen_in_path.clear() if isinstance(raw_path_timestamps, dict): for key, value in raw_path_timestamps.items(): if dropped_ids and str(key) in dropped_ids: continue try: state.last_seen_in_path[str(key)] = float(value) except (TypeError, ValueError): continue raw_advert_seen = data.get("last_seen_in_advert") or {} last_seen_in_advert.clear() if isinstance(raw_advert_seen, dict): for key, value in raw_advert_seen.items(): if dropped_ids and str(key) in dropped_ids: continue try: last_seen_in_advert[str(key)] = float(value) except (TypeError, ValueError): continue raw_peer_history = data.get("peer_history_pairs") or {} peer_history_pairs.clear() if isinstance(raw_peer_history, dict): for pair_key, value in raw_peer_history.items(): if not isinstance(pair_key, str) or not isinstance(value, dict): continue a_id = value.get("a_id") b_id = value.get("b_id") buckets = value.get("buckets") if not isinstance(a_id, str) or not a_id.strip(): continue if not isinstance(b_id, str) or not b_id.strip(): continue if not isinstance(buckets, dict): continue clean_buckets: Dict[str, int] = {} for bucket_key, count in buckets.items(): try: bucket_ts = int(float(bucket_key)) bucket_count = int(count) except (TypeError, ValueError): continue if bucket_count <= 0: continue clean_buckets[str(bucket_ts)] = bucket_count if not clean_buckets: continue last_ts = value.get("last_ts") try: last_ts_val = float(last_ts) if last_ts is not None else 0.0 except (TypeError, ValueError): last_ts_val = 0.0 peer_history_pairs[pair_key] = { "a_id": a_id.strip(), "b_id": b_id.strip(), "buckets": clean_buckets, "last_ts": last_ts_val, } if _prune_peer_history(): state.state_dirty = True # If first_seen data is missing, fall back to loaded last_seen values. if not raw_first_seen: for device_id, seen_ts in seen_devices.items(): try: first_seen_devices[device_id] = float(seen_ts) except (TypeError, ValueError): continue # Load and apply coordinate overrides coord_overrides = _load_coord_overrides() if coord_overrides: device_coords.clear() device_coords.update(coord_overrides) if dropped_ids: for device_id in dropped_ids: device_coords.pop(device_id, None) first_seen_devices.pop(device_id, None) last_seen_in_advert.pop(device_id, None) for device_id, dev_state in devices.items(): if not dev_state.name and device_id in device_names: dev_state.name = device_names[device_id] role_value = device_roles.get(device_id) dev_state.role = role_value if role_value else None # Apply coordinate overrides to loaded devices coord_override = device_coords.get(device_id) if coord_override: dev_state.lat = coord_override["lat"] dev_state.lon = coord_override["lon"] dropped_ids.update(_dedupe_loaded_devices()) _rebuild_node_hash_map() async def _state_saver() -> None: while True: if state.state_dirty: try: os.makedirs(STATE_DIR, exist_ok=True) tmp_path = f"{STATE_FILE}.tmp" with open(tmp_path, "w", encoding="utf-8") as handle: json.dump(_serialize_state(), handle) os.replace(tmp_path, STATE_FILE) state.state_dirty = False except Exception as exc: print(f"[state] failed to save {STATE_FILE}: {exc}") await asyncio.sleep(max(1.0, STATE_SAVE_INTERVAL)) def mqtt_on_connect(client, userdata, flags, reason_code, properties=None): topics_str = ", ".join(MQTT_TOPICS) print( f"[mqtt] connected reason_code={reason_code} subscribing topics={topics_str}" ) for topic in MQTT_TOPICS: client.subscribe(topic, qos=0) def mqtt_on_disconnect( client, userdata, reason_code, properties=None, *args, **kwargs ): print(f"[mqtt] disconnected reason_code={reason_code}") def mqtt_on_message(client, userdata, msg: mqtt.MQTTMessage): stats["received_total"] += 1 stats["last_rx_ts"] = time.time() stats["last_rx_topic"] = msg.topic topic_counts[msg.topic] = topic_counts.get(msg.topic, 0) + 1 loop: asyncio.AbstractEventLoop = userdata["loop"] now = time.time() mqtt_presence_event = _record_mqtt_presence(msg.topic, msg.payload, now) if mqtt_presence_event: device_id = mqtt_presence_event["device_id"] should_broadcast = False transition = mqtt_presence_event.get("presence_transition") if transition in ("online", "offline"): should_broadcast = True elif mqtt_presence_event.get("mqtt_seen_ts") and device_id in devices: last_sent = last_seen_broadcast.get(device_id, 0) if now - last_sent >= MQTT_SEEN_BROADCAST_MIN_SECONDS: should_broadcast = True if should_broadcast: last_seen_broadcast[device_id] = now loop.call_soon_threadsafe(update_queue.put_nowait, mqtt_presence_event) parsed, debug = _try_parse_payload(msg.topic, msg.payload) device_id_hint = parsed.get("device_id") if parsed else None # Also try to get device_id from topic if parsing failed or no device_id in parsed data topic_device_id = _device_id_from_topic(msg.topic) # Priority: decoded_pubkey (origin/repeater that sent packet) > parsed device_id > topic device_id (receiver) decoded_pubkey = debug.get("decoded_pubkey") if isinstance(decoded_pubkey, str) and decoded_pubkey.strip(): decoded_pubkey = decoded_pubkey.strip() else: decoded_pubkey = None # Check if device has coordinate override - prioritize decoded packet public key (origin) coord_override = None matched_device_id = None # Try 1: decoded_pubkey (origin/repeater that sent the packet) - this is what we want! if decoded_pubkey and decoded_pubkey in device_coords: coord_override = device_coords[decoded_pubkey] matched_device_id = decoded_pubkey # Try 2: device_id_hint from parsed data (should also be decoded_pubkey if available) elif device_id_hint and device_id_hint in device_coords: coord_override = device_coords[device_id_hint] matched_device_id = device_id_hint # Try 3: Check if decoded_pubkey matches any override via substring (for partial matches) elif decoded_pubkey: for override_id in device_coords.keys(): if override_id in decoded_pubkey or decoded_pubkey in override_id: coord_override = device_coords[override_id] matched_device_id = override_id break # Try 4: topic_device_id (receiver/observer) - only as fallback if not coord_override and topic_device_id and topic_device_id in device_coords: coord_override = device_coords[topic_device_id] matched_device_id = topic_device_id # Try 5: Check all parts of the topic path (receiver/observer) if not coord_override: topic_parts = msg.topic.split("/") for part in topic_parts: if part and len(part) > 10 and part in device_coords: # Only check parts that look like device IDs (long hex strings) coord_override = device_coords[part] matched_device_id = part break # Try 6: Check if any device_id in override file is a substring of any topic part (for partial matches) if not coord_override: for part in topic_parts: if part and len(part) > 10: # Check if any override key is contained in this topic part or vice versa for override_id in device_coords.keys(): if override_id in part or part in override_id: coord_override = device_coords[override_id] matched_device_id = override_id break if coord_override: break has_coord_override = coord_override is not None # Initialize check_lat and check_lon - will be set from override or parsed data check_lat = None check_lon = None if has_coord_override and matched_device_id: # Use override coordinates for filtering checks and inject into parsed data check_lat = coord_override["lat"] check_lon = coord_override["lon"] # Normalize timestamp: if it's too far in the future (> 1 hour), use current time now_ts = time.time() parsed_ts = parsed.get("ts") if parsed else None if parsed_ts and parsed_ts > now_ts + 3600: # More than 1 hour in future parsed_ts = now_ts if DEBUG_PAYLOAD: print(f"[mqtt] Normalized future timestamp: device={matched_device_id} future_ts={parsed.get('ts')} -> now={now_ts}") # If parsing failed or has no location, create/update parsed data with override coords # Use decoded_pubkey as device_id if available (origin), otherwise use matched_device_id target_device_id = decoded_pubkey or matched_device_id if not parsed: parsed = { "device_id": target_device_id, "lat": coord_override["lat"], "lon": coord_override["lon"], "ts": now_ts, } device_id_hint = target_device_id debug["result"] = "coord_override_created" if DEBUG_PAYLOAD: print(f"[mqtt] Created parsed data from coord override: device_id={target_device_id} (matched_override={matched_device_id}) lat={coord_override['lat']} lon={coord_override['lon']}") elif parsed: # If parsing succeeded but no location, inject override coordinates if not parsed.get("lat") or not parsed.get("lon") or _coords_are_zero(parsed.get("lat", 0), parsed.get("lon", 0)): parsed["lat"] = coord_override["lat"] parsed["lon"] = coord_override["lon"] # Ensure device_id is set to the decoded_pubkey (origin) if available if decoded_pubkey: parsed["device_id"] = decoded_pubkey device_id_hint = decoded_pubkey elif not device_id_hint: parsed["device_id"] = matched_device_id device_id_hint = matched_device_id debug["result"] = debug.get("result") or "coord_override_applied" if DEBUG_PAYLOAD: print(f"[mqtt] Applied coord override to parsed data: device_id={parsed.get('device_id')} (matched_override={matched_device_id}) lat={coord_override['lat']} lon={coord_override['lon']}") # Always normalize timestamp if it's in the future if parsed_ts: parsed["ts"] = parsed_ts # Set check_lat/check_lon from parsed data if not already set from override if check_lat is None and check_lon is None: check_lat = parsed.get("lat") if parsed else None check_lon = parsed.get("lon") if parsed else None # Don't filter 0,0 coordinates if device has a coordinate override if parsed and _coords_are_zero(parsed.get("lat", 0), parsed.get("lon", 0)) and not has_coord_override: debug["result"] = "filtered_zero_coords" parsed = None # Check radius using override coordinates if available if parsed and check_lat is not None and check_lon is not None and not _within_map_radius(check_lat, check_lon): debug["result"] = "filtered_radius" if DEBUG_PAYLOAD: device_id_for_log = matched_device_id or decoded_pubkey or device_id_hint or topic_device_id or parsed.get("device_id") print(f"[mqtt] Filtered device by radius: device_id={device_id_for_log} lat={check_lat} lon={check_lon} (radius={MAP_RADIUS_KM}km)") parsed = None if matched_device_id or decoded_pubkey or device_id_hint: remove_id = matched_device_id or decoded_pubkey or device_id_hint loop.call_soon_threadsafe( update_queue.put_nowait, { "type": "device_remove", "device_id": remove_id, "reason": "radius", }, ) origin_id = debug.get("origin_id") or _device_id_from_topic(msg.topic) decoder_meta = debug.get("decoder_meta") or {} result = debug.get("result") or "unknown" device_role = debug.get("device_role") role_target_id = origin_id if device_role and result.startswith("decoded"): role_target_id = None loc_meta = ( decoder_meta.get("location") if isinstance(decoder_meta, dict) else None ) loc_pubkey = loc_meta.get("pubkey") if isinstance(loc_meta, dict) else None if isinstance(loc_pubkey, str) and loc_pubkey.strip(): role_target_id = loc_pubkey else: decoded_pubkey = debug.get("decoded_pubkey") if isinstance(decoded_pubkey, str) and decoded_pubkey.strip(): role_target_id = decoded_pubkey debug_entry = { "ts": time.time(), "topic": msg.topic, "result": debug.get("result"), "found_path": debug.get("found_path"), "found_hint": debug.get("found_hint"), "decoder_meta": decoder_meta, "role_target_id": role_target_id, "packet_hash": debug.get("packet_hash"), "direction": debug.get("direction"), "json_keys": debug.get("json_keys"), "parse_error": debug.get("parse_error"), "origin_id": origin_id, "payload_preview": _safe_preview(msg.payload[:DEBUG_PAYLOAD_MAX]), } debug_last.append(debug_entry) if msg.topic.endswith("/status"): status_last.append( { "ts": debug_entry["ts"], "topic": msg.topic, "device_name": debug.get("device_name"), "device_role": debug.get("device_role"), "origin_id": origin_id, "json_keys": debug_entry.get("json_keys"), "payload_preview": debug_entry["payload_preview"], } ) result_counts[result] = result_counts.get(result, 0) + 1 device_name = debug.get("device_name") if device_name and origin_id: existing_name = device_names.get(origin_id) if existing_name != device_name: device_names[origin_id] = device_name state.state_dirty = True device_state = devices.get(origin_id) if device_state: device_state.name = device_name loop: asyncio.AbstractEventLoop = userdata["loop"] loop.call_soon_threadsafe( update_queue.put_nowait, { "type": "device_name", "device_id": origin_id, }, ) if device_role and role_target_id: existing_role = device_roles.get(role_target_id) if existing_role != device_role: device_roles[role_target_id] = device_role device_role_sources[role_target_id] = "explicit" state.state_dirty = True device_state = devices.get(role_target_id) if device_state: device_state.role = device_role loop: asyncio.AbstractEventLoop = userdata["loop"] loop.call_soon_threadsafe( update_queue.put_nowait, { "type": "device_role", "device_id": role_target_id, }, ) path_hashes = decoder_meta.get("pathHashes") payload_type = decoder_meta.get("payloadType") route_type = decoder_meta.get("routeType") sender_name = decoder_meta.get("senderName") if not isinstance(sender_name, str) or not sender_name.strip(): sender_name = None else: sender_name = sender_name.strip() message_hash = debug.get("packet_hash") or decoder_meta.get("messageHash") snr_values = decoder_meta.get("snrValues") path_header = decoder_meta.get("path") path_length = decoder_meta.get("pathLength") direction = debug.get("direction") receiver_id = _device_id_from_topic(msg.topic) route_origin_id = None loc_meta = decoder_meta.get("location" ) if isinstance(decoder_meta, dict) else None if isinstance(loc_meta, dict): decoded_pubkey = loc_meta.get("pubkey") if decoded_pubkey: route_origin_id = decoded_pubkey direction_value = str(direction or "").lower() if message_hash: cache = message_origins.get(message_hash) if not cache: cache = { "origin_id": None, "sender_name": None, "first_rx": None, "receivers": set(), "ts": time.time(), } message_origins[message_hash] = cache cache["ts"] = time.time() if route_origin_id: cache["origin_id"] = route_origin_id if sender_name: cache["sender_name"] = sender_name if direction_value == "rx" and receiver_id: cache["receivers"].add(receiver_id) if not cache.get("first_rx"): cache["first_rx"] = receiver_id if not route_origin_id: cached_origin_id = cache.get("origin_id") if isinstance(cached_origin_id, str) and cached_origin_id.strip(): route_origin_id = cached_origin_id if not sender_name: cached_sender_name = cache.get("sender_name") if isinstance(cached_sender_name, str) and cached_sender_name.strip(): sender_name = cached_sender_name.strip() loop: asyncio.AbstractEventLoop = userdata["loop"] try: payload_type = int(payload_type) if payload_type is not None else None except (TypeError, ValueError): payload_type = None try: route_type = int(route_type) if route_type is not None else None except (TypeError, ValueError): route_type = None if payload_type == 4: advert_device_id = route_origin_id or origin_id or receiver_id if advert_device_id: last_seen_in_advert[advert_device_id] = time.time() state.state_dirty = True route_hashes = None if path_hashes and isinstance(path_hashes, list): route_hashes = path_hashes elif payload_type not in (8, 9) and isinstance(path_header, list): if route_type in (0, 1): route_hashes = path_header route_hashes = _normalize_route_hashes_for_path_length(route_hashes, path_length) route_emitted = False if route_hashes and payload_type in ROUTE_PAYLOAD_TYPES_SET: if DEBUG_PAYLOAD: origin_cache = message_origins.get(message_hash) if message_hash else None print( "[route-debug] " f"hash={message_hash or '-'} " f"payload={payload_type if payload_type is not None else '-'} " f"dir={direction_value or '-'} " f"topic_origin={origin_id or '-'} " f"route_origin={route_origin_id or '-'} " f"sender_name={sender_name or '-'} " f"cached_origin={(origin_cache or {}).get('origin_id') or '-'} " f"first_rx={(origin_cache or {}).get('first_rx') or '-'} " f"receiver={receiver_id or '-'} " f"path={route_hashes}" ) loop.call_soon_threadsafe( update_queue.put_nowait, { "type": "route", "path_hashes": route_hashes, "payload_type": payload_type, "message_hash": message_hash, "origin_id": route_origin_id, "receiver_id": receiver_id, "snr_values": snr_values, "route_type": route_type, "sender_name": sender_name, "ts": time.time(), "topic": msg.topic, }, ) route_emitted = True if not parsed: stats["unparsed_total"] += 1 if DEBUG_PAYLOAD: print( f"[mqtt] UNPARSED result={result} topic={msg.topic} preview={debug_entry['payload_preview']!r}" ) return parsed["raw_topic"] = msg.topic stats["parsed_total"] += 1 stats["last_parsed_ts"] = time.time() stats["last_parsed_topic"] = msg.topic if DEBUG_PAYLOAD: print( f"[mqtt] PARSED topic={msg.topic} device={parsed['device_id']} lat={parsed['lat']} lon={parsed['lon']}" ) loop.call_soon_threadsafe( update_queue.put_nowait, { "type": "device", "data": parsed } ) # ========================= # Broadcaster / Reaper # ========================= async def broadcaster(): while True: event = await update_queue.get() if isinstance(event, dict) and event.get("type") in ( "device_name", "device_role", ): device_id = event.get("device_id") device_state = devices.get(device_id) if device_state: if device_id in device_names: device_state.name = device_names[device_id] if device_id in device_roles: device_state.role = device_roles[device_id] payload = { "type": "update", "device": _device_payload(device_id, device_state), "trail": trails.get(device_id, []), } dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) continue if isinstance(event, dict) and event.get("type") == "device_seen": device_id = event.get("device_id") if not device_id: continue seen_ts = event.get("last_seen_ts") if seen_ts: seen_devices[device_id] = seen_ts mqtt_ts_present = "mqtt_seen_ts" in event if mqtt_ts_present: mqtt_ts = event.get("mqtt_seen_ts") if mqtt_ts: mqtt_seen[device_id] = mqtt_ts else: mqtt_seen.pop(device_id, None) if "mqtt_online_source" in event: source = event.get("mqtt_online_source") if source: mqtt_online_source[device_id] = source else: mqtt_online_source.pop(device_id, None) if "mqtt_status_ts" in event: status_ts = event.get("mqtt_status_ts") if status_ts: mqtt_status_seen[device_id] = status_ts if "mqtt_status_value" in event: status_value = event.get("mqtt_status_value") if isinstance(status_value, str) and status_value.strip(): mqtt_status_values[device_id] = status_value.strip().lower() if "mqtt_internal_ts" in event: internal_ts = event.get("mqtt_internal_ts") if internal_ts: mqtt_internal_seen[device_id] = internal_ts if "mqtt_packets_ts" in event: packets_ts = event.get("mqtt_packets_ts") if packets_ts: mqtt_packets_seen[device_id] = packets_ts payload = _mqtt_presence_payload(device_id, seen_devices.get(device_id)) dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) continue if isinstance(event, dict) and event.get("type") == "device_remove": device_id = event.get("device_id") if device_id and _evict_device(device_id): payload = {"type": "stale", "device_ids": [device_id]} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) continue if isinstance(event, dict) and event.get("type") == "route": route_mode = event.get("route_mode") points = event.get("points") used_hashes: List[str] = [] point_ids: List[Optional[str]] = [] if not points: path_hashes = event.get("path_hashes") or [] points, used_hashes, point_ids = _route_points_from_hashes( list(path_hashes), event.get("origin_id"), event.get("receiver_id"), event.get("ts") or time.time(), ) if not points and route_mode == "fanout": points = _route_points_from_device_ids( event.get("origin_id"), event.get("receiver_id") ) if ( points and event.get("origin_id") and event.get("receiver_id") and len(points) == 2 ): point_ids = [event.get("origin_id"), event.get("receiver_id")] # Fallback: if path hashes are missing/unknown, draw a direct link when possible. if not points: points = _route_points_from_device_ids( event.get("origin_id"), event.get("receiver_id") ) if points: route_mode = "direct" if ( event.get("origin_id") and event.get("receiver_id") and len(points) == 2 ): point_ids = [event.get("origin_id"), event.get("receiver_id")] if not points: continue if MAP_RADIUS_KM > 0: outside = any( not _within_map_radius(point[0], point[1]) for point in points if isinstance(point, (list, tuple)) and len(point) >= 2 ) if outside: continue route_id = event.get("route_id") if not route_id: message_hash = event.get("message_hash") receiver_key = event.get("receiver_id") or "observer" if message_hash: # Keep one active line per (message, observer) so multi-observer # receptions do not overwrite each other. route_id = f"{message_hash}:{receiver_key}" else: route_id = ( f"{event.get('origin_id', 'route')}:{receiver_key}:" f"{int((event.get('ts') or time.time()) * 1000)}" ) expires_at = (event.get("ts") or time.time()) + ROUTE_TTL_SECONDS route = { "id": route_id, "points": points, "hashes": used_hashes, "point_ids": point_ids, "route_mode": route_mode or ("path" if used_hashes else "direct"), "ts": event.get("ts") or time.time(), "expires_at": expires_at, "origin_id": event.get("origin_id"), "receiver_id": event.get("receiver_id"), "payload_type": event.get("payload_type"), "message_hash": event.get("message_hash"), "snr_values": event.get("snr_values"), "sender_name": event.get("sender_name"), "topic": event.get("topic"), } _append_heat_points(points, route["ts"], event.get("payload_type")) routes[route_id] = route if point_ids and used_hashes: _record_neighbors(point_ids, route["ts"]) _update_path_timestamps(point_ids, route["ts"]) history_updates, history_removed = _record_route_history(route) payload = {"type": "route", "route": _route_payload(route)} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) if history_updates or history_removed: history_payload = {} if history_updates: history_payload["type"] = "history_edges" history_payload["edges"] = [ _history_edge_payload(edge) for edge in history_updates ] if history_removed: history_payload_remove = { "type": "history_edges_remove", "edge_ids": history_removed, } else: history_payload_remove = None dead = [] for ws in list(clients): try: if history_updates: await ws.send_text(json.dumps(history_payload)) if history_payload_remove: await ws.send_text(json.dumps(history_payload_remove)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) continue upd = ( event.get("data") if isinstance(event, dict) and event.get("type") == "device" else event ) device_id = upd["device_id"] # Check if device has coordinate override before filtering by radius coord_override = device_coords.get(device_id) if coord_override: # Use override coordinates for radius check check_lat = coord_override["lat"] check_lon = coord_override["lon"] else: check_lat = upd.get("lat") check_lon = upd.get("lon") if not _within_map_radius(check_lat, check_lon): if _evict_device(device_id): payload = {"type": "stale", "device_ids": [device_id]} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) continue is_new_device = device_id not in devices # Normalize timestamp: if it's too far in the future (> 1 hour), use current time now_ts = time.time() device_ts = upd.get("ts", now_ts) if device_ts > now_ts + 3600: # More than 1 hour in future if DEBUG_PAYLOAD: print(f"[mqtt] Normalized future timestamp in device state: device={device_id} future_ts={device_ts} -> now={now_ts}") device_ts = now_ts device_state = DeviceState( device_id=device_id, lat=upd["lat"], lon=upd["lon"], ts=device_ts, heading=upd.get("heading"), speed=upd.get("speed"), rssi=upd.get("rssi"), snr=upd.get("snr"), name=upd.get("name") or device_names.get(device_id), role=upd.get("role") or device_roles.get(device_id), raw_topic=upd.get("raw_topic"), ) # Apply coordinate overrides coord_override = device_coords.get(device_id) if coord_override: device_state.lat = coord_override["lat"] device_state.lon = coord_override["lon"] devices[device_id] = device_state now_seen = time.time() seen_devices[device_id] = now_seen if device_id not in first_seen_devices: first_seen_devices[device_id] = now_seen state.state_dirty = True if is_new_device: _rebuild_node_hash_map() if device_state.name: device_names[device_id] = device_state.name if device_state.role: device_roles[device_id] = device_state.role if TRAIL_LEN > 0 and not _coords_are_zero( device_state.lat, device_state.lon ): trails.setdefault(device_id, []) trails[device_id].append( [device_state.lat, device_state.lon, device_state.ts] ) if len(trails[device_id]) > TRAIL_LEN: trails[device_id] = trails[device_id][-TRAIL_LEN:] elif device_id in trails: trails.pop(device_id, None) payload = { "type": "update", "device": _device_payload(device_id, device_state), "trail": trails.get(device_id, []), } dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) async def reaper(): global mqtt_presence_last_summary while True: now = time.time() _refresh_mqtt_presence(now) if DEVICE_TTL_WINDOW_SECONDS > 0 or PATH_TTL_SECONDS > 0: stale = [] for dev_id, st in list(devices.items()): if dev_id in mqtt_seen: continue device_stale = ( DEVICE_TTL_WINDOW_SECONDS > 0 and (now - st.ts > DEVICE_TTL_WINDOW_SECONDS) ) last_path_ts = state.last_seen_in_path.get(dev_id, 0.0) path_stale = ( PATH_TTL_SECONDS > 0 and (last_path_ts <= 0 or now - last_path_ts > PATH_TTL_SECONDS) ) if DEVICE_TTL_WINDOW_SECONDS > 0 and PATH_TTL_SECONDS > 0: should_stale = device_stale and path_stale elif DEVICE_TTL_WINDOW_SECONDS > 0: should_stale = device_stale else: should_stale = path_stale if should_stale: stale.append(dev_id) if stale: payload = {"type": "stale", "device_ids": stale} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) for dev_id in stale: devices.pop(dev_id, None) trails.pop(dev_id, None) state.last_seen_in_path.pop(dev_id, None) state.state_dirty = True _rebuild_node_hash_map() if routes: bad_routes = [] for route_id, route in list(routes.items()): points = route.get("points") if isinstance(route, dict) else None if not isinstance(points, list): continue if any( _coords_are_zero(p[0], p[1]) for p in points if isinstance(p, list) and len(p) >= 2 ): bad_routes.append(route_id) if bad_routes: payload = {"type": "route_remove", "route_ids": bad_routes} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) for route_id in bad_routes: routes.pop(route_id, None) stale_routes = [ route_id for route_id, route in list(routes.items()) if now > route.get("expires_at", 0) ] if stale_routes: payload = {"type": "route_remove", "route_ids": stale_routes} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) for route_id in stale_routes: routes.pop(route_id, None) history_updates, history_removed = _prune_route_history() if history_updates or history_removed: dead = [] for ws in list(clients): try: if history_updates: await ws.send_text( json.dumps({ "type": "history_edges", "edges": history_updates }) ) if history_removed: await ws.send_text( json.dumps( { "type": "history_edges_remove", "edge_ids": history_removed, } ) ) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) if HEAT_TTL_SECONDS > 0 and heat_events: cutoff = now - HEAT_TTL_SECONDS heat_events[:] = [ entry for entry in heat_events if entry.get("ts", 0) >= cutoff ] if message_origins: for msg_hash, info in list(message_origins.items()): if now - info.get("ts", 0) > MESSAGE_ORIGIN_TTL_SECONDS: message_origins.pop(msg_hash, None) _prune_neighbors(now) presence_summary = _mqtt_presence_summary(now) if presence_summary != mqtt_presence_last_summary: mqtt_presence_last_summary = dict(presence_summary) payload = {"type": "mqtt_presence", "mqtt_presence": presence_summary} dead = [] for ws in list(clients): try: await ws.send_text(json.dumps(payload)) except Exception: dead.append(ws) for ws in dead: clients.discard(ws) retention_window = max( DEVICE_TTL_WINDOW_SECONDS if DEVICE_TTL_WINDOW_SECONDS > 0 else 0, PATH_TTL_SECONDS if PATH_TTL_SECONDS > 0 else 0, ) prune_after = max(retention_window * 3, 900) if retention_window > 0 else 86400 for dev_id, last in list(seen_devices.items()): if now - last > prune_after: seen_devices.pop(dev_id, None) first_seen_devices.pop(dev_id, None) last_seen_in_advert.pop(dev_id, None) state.last_seen_in_path.pop(dev_id, None) if _prune_peer_history(now): state.state_dirty = True await asyncio.sleep(5) # ========================= # Helpers: Turnstile auth # ========================= TURNSTILE_BOT_TOKENS = [ token.strip().lower() for token in TURNSTILE_BOT_ALLOWLIST.split(",") if token and token.strip() ] def _is_allowlisted_bot(request: Request) -> bool: """Return True when the request looks like a known embed bot.""" if not TURNSTILE_ENABLED or not TURNSTILE_BOT_BYPASS: return False user_agent = (request.headers.get("user-agent") or "").lower() if not user_agent: return False for token in TURNSTILE_BOT_TOKENS: if token in user_agent: return True return False def _check_turnstile_auth(request: Request) -> bool: """Check if user has valid Turnstile auth token.""" if not TURNSTILE_ENABLED or not turnstile_verifier: return True # Allowlist common embed bots (Discord, Slack, etc.) if _is_allowlisted_bot(request): ua = request.headers.get("user-agent", "-") print(f"[turnstile] bot bypass user-agent={ua}") return True # Check for auth token in cookies or headers auth_token = request.cookies.get("meshmap_auth") or \ request.headers.get("Authorization", "").replace("Bearer ", "") if auth_token and turnstile_verifier.verify_auth_token(auth_token): return True return False # ========================= # FastAPI routes # ========================= @app.get("/") def root(request: Request): # If Turnstile is enabled and user isn't authenticated, serve landing page if TURNSTILE_ENABLED and not _check_turnstile_auth(request): landing_path = os.path.join(APP_DIR, "static", "landing.html") try: with open(landing_path, "r", encoding="utf-8") as handle: content = handle.read() except Exception: return FileResponse("static/landing.html") # Replace template variables in landing page replacements = { "SITE_TITLE": SITE_TITLE, "SITE_DESCRIPTION": SITE_DESCRIPTION, "SITE_ICON": SITE_ICON, "TURNSTILE_SITE_KEY": TURNSTILE_SITE_KEY, "ASSET_VERSION": ASSET_VERSION, } for key, value in replacements.items(): safe_value = html.escape(str(value), quote=True) content = content.replace(f"{{{{{key}}}}}", safe_value) return HTMLResponse(content) # Serve map page html_path = os.path.join(APP_DIR, "static", "index.html") try: with open(html_path, "r", encoding="utf-8") as handle: content = handle.read() except Exception: return FileResponse("static/index.html") # Check for lat/lon parameters for dynamic preview image query_params = request.query_params lat_param = query_params.get("lat") or query_params.get("latitude") lon_param = ( query_params.get("lon") or query_params.get("lng") or query_params.get("long") or query_params.get("longitude") ) zoom_param = query_params.get("zoom") og_image_tag = "" twitter_image_tag = "" og_url = SITE_URL # Generate dynamic preview image if coordinates are provided if lat_param and lon_param: try: lat = float(lat_param) lon = float(lon_param) zoom = int(zoom_param) if zoom_param and zoom_param.isdigit() else 13 zoom = max(1, min(18, zoom)) # Clamp zoom between 1-18 # Generate preview image URL pointing to our own server # Use absolute URL for better compatibility with Discord and other platforms base_url = str(request.url).split("?")[0].rstrip("/") preview_params = urlencode( { "lat": lat, "lon": lon, "zoom": zoom, "marker": "blue", "theme": "dark", } ) preview_url = f"{base_url}/preview.png?{preview_params}" # Ensure absolute URL (use SITE_URL if available, otherwise construct from request) if SITE_URL and SITE_URL.startswith("http"): site_base = SITE_URL.rstrip("/") preview_url = f"{site_base}/preview.png?{preview_params}" elif not preview_url.startswith("http"): # Fallback: construct from request scheme = request.url.scheme host = request.headers.get("host", request.url.hostname or "localhost") preview_url = f"{scheme}://{host}/preview.png?{preview_params}" safe_image = html.escape(preview_url, quote=True) # Add image dimensions for better Discord/social media compatibility # Note: Preview image may fail if container can't reach external services # In that case, fall back to static SITE_OG_IMAGE if available og_image_tag = ( f'\n' f' \n' f' \n' f' ' ) twitter_image_tag = f'' # If static image is configured, add it as a fallback if SITE_OG_IMAGE: safe_static_image = html.escape(str(SITE_OG_IMAGE), quote=True) og_image_tag += f'\n ' # Update og:url to include query parameters base_url = str(request.url).split("?")[0] og_url = f"{base_url}?lat={lat}&lon={lon}" if zoom_param: og_url += f"&zoom={zoom}" except (ValueError, TypeError): # Invalid coordinates, fall back to static image if SITE_OG_IMAGE: safe_image = html.escape(str(SITE_OG_IMAGE), quote=True) og_image_tag = f'' twitter_image_tag = ( f'' ) elif SITE_OG_IMAGE: safe_image = html.escape(str(SITE_OG_IMAGE), quote=True) og_image_tag = f'' twitter_image_tag = f'' content = content.replace("{{OG_IMAGE_TAG}}", og_image_tag) content = content.replace("{{TWITTER_IMAGE_TAG}}", twitter_image_tag) trail_info_suffix = "" if TRAIL_LEN > 0: trail_info_suffix = f" Trails show last ~{TRAIL_LEN} points." boundary_json = json.dumps(get_map_boundary_points()).replace(" Tuple[float, float]: lat_rad = math.radians(lat_deg) n = 2.0**zoom_level x_px = (lon_deg + 180.0) / 360.0 * n * tile_size y_px = ( (1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n * tile_size ) return (x_px, y_px) draw = ImageDraw.Draw(final_image) center_px_x, center_px_y = latlon_to_global_px(lat, lon, zoom_val) node_radius = 5 node_color = (86, 198, 255) if theme_str == "dark" else (25, 83, 170) node_outline = (15, 15, 15) if theme_str == "dark" else (255, 255, 255) for state in list(devices.values()): try: dev_lat = float(state.lat) dev_lon = float(state.lon) except Exception: continue if _coords_are_zero(dev_lat, dev_lon ) or not _within_map_radius(dev_lat, dev_lon): continue dev_px_x, dev_px_y = latlon_to_global_px(dev_lat, dev_lon, zoom_val) img_x = width / 2 + (dev_px_x - center_px_x) img_y = height / 2 + (dev_px_y - center_px_y) if ( img_x < -node_radius or img_x > width + node_radius or img_y < -node_radius or img_y > height + node_radius ): continue draw.ellipse( [ (img_x - node_radius, img_y - node_radius), (img_x + node_radius, img_y + node_radius), ], fill=node_color, outline=node_outline, width=2, ) # Draw marker marker_color_map = { "red": (220, 53, 69), "blue": (0, 123, 255), "green": (40, 167, 69), "yellow": (255, 193, 7), "orange": (255, 152, 0), "purple": (108, 117, 125), "black": (0, 0, 0), "white": (255, 255, 255), } marker_color = marker_color_map.get( marker_str, (0, 123, 255) ) # Default to blue # Calculate marker position (center of image) marker_x = width // 2 marker_y = height // 2 # Draw a circle marker marker_radius = 12 draw.ellipse( [ (marker_x - marker_radius, marker_y - marker_radius), (marker_x + marker_radius, marker_y + marker_radius), ], fill=marker_color, outline=(255, 255, 255), width=2, ) # Convert to PNG bytes img_bytes = BytesIO() final_image.save(img_bytes, format="PNG") img_bytes.seek(0) return Response( content=img_bytes.getvalue(), media_type="image/png", headers={ "Cache-Control": "public, max-age=3600", # Cache for 1 hour }, ) except Exception as e: print(f"[preview] Error generating map image: {e}") import traceback traceback.print_exc() # Even if tile fetching fails, try to return a simple map with marker try: bg_color = (18, 18, 18) if theme_str == "dark" else (242, 239, 233) fallback_image = Image.new("RGB", (width, height), bg_color) draw = ImageDraw.Draw(fallback_image) # Draw marker marker_color_map = { "red": (220, 53, 69), "blue": (0, 123, 255), "green": (40, 167, 69), "yellow": (255, 193, 7), "orange": (255, 152, 0), "purple": (108, 117, 125), "black": (0, 0, 0), "white": (255, 255, 255), } marker_color = marker_color_map.get(marker_str, (0, 123, 255)) marker_x = width // 2 marker_y = height // 2 marker_radius = 12 draw.ellipse( [ (marker_x - marker_radius, marker_y - marker_radius), (marker_x + marker_radius, marker_y + marker_radius), ], fill=marker_color, outline=(255, 255, 255), width=2, ) img_bytes = BytesIO() fallback_image.save(img_bytes, format="PNG") img_bytes.seek(0) print( f"[preview] Returning fallback image with marker (tile fetch failed)" ) return Response( content=img_bytes.getvalue(), media_type="image/png", headers={"Cache-Control": "public, max-age=300"}, ) except Exception as fallback_error: print( f"[preview] Fallback image generation also failed: {fallback_error}" ) # Only redirect to static image if even fallback fails if SITE_OG_IMAGE and SITE_OG_IMAGE.startswith("http"): from fastapi.responses import RedirectResponse print( f"[preview] All image generation failed, redirecting to static OG image: {SITE_OG_IMAGE}" ) return RedirectResponse(url=SITE_OG_IMAGE, status_code=302) # Return transparent PNG as last resort transparent_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01\r\n-\xdb\x00\x00\x00\x00IEND\xaeB`\x82" return Response( content=transparent_png, media_type="image/png", headers={"Cache-Control": "public, max-age=300"}, ) except Exception as e: # Log error for debugging print(f"[preview] Error generating preview image: {e}") import traceback traceback.print_exc() # Return empty image on error return Response(content=b"", status_code=500, media_type="image/png") @app.get("/map") def map_page(request: Request): """Serve the map page (with Turnstile auth check).""" # If Turnstile is enabled and user isn't authenticated, redirect to landing if TURNSTILE_ENABLED and not _check_turnstile_auth(request): print("[map] Unauthenticated user accessing /map, redirecting to /") return HTMLResponse( "", status_code=303, ) # Otherwise serve the map page html_path = os.path.join(APP_DIR, "static", "index.html") try: with open(html_path, "r", encoding="utf-8") as handle: content = handle.read() except Exception: return FileResponse("static/index.html") # Include all the template replacements (same as root endpoint) # Generate OG image tags og_image_tag = "" twitter_image_tag = "" if SITE_OG_IMAGE: safe_image = html.escape(str(SITE_OG_IMAGE), quote=True) og_image_tag = f'' twitter_image_tag = f'' content = content.replace("{{OG_IMAGE_TAG}}", og_image_tag) content = content.replace("{{TWITTER_IMAGE_TAG}}", twitter_image_tag) trail_info_suffix = "" if TRAIL_LEN > 0: trail_info_suffix = f" Trails show last ~{TRAIL_LEN} points." boundary_json = json.dumps(get_map_boundary_points()).replace(" max_last_seen: max_last_seen = float(last_seen) all_nodes.append(payload) if apply_delta and cutoff is not None and float(last_seen) < cutoff: continue nodes.append(payload) nodes.sort(key=lambda item: item.get("public_key") or "") if not apply_delta: all_nodes.sort(key=lambda item: item.get("public_key") or "") nodes = all_nodes payload: Dict[str, Any] = { "server_time": time.time(), "max_last_seen_ts": max_last_seen or None, "updated_since_applied": bool(apply_delta and cutoff is not None), "updated_since_ignored": bool(updated_since and not apply_delta), } if format_nested: payload["data"] = {"nodes": nodes} else: # Default to the flat list for MeshBuddy compatibility. payload["data"] = nodes payload["nodes"] = nodes return payload @app.get("/peers/{device_id}") def get_peers(device_id: str, request: Request, limit: Optional[int] = Query(None)): _require_prod_token(request) if not device_id: raise HTTPException(status_code=400, detail="device_id required") raw_limit = PEERS_DEFAULT_LIMIT if limit is None else limit limit_value = max(1, int(raw_limit or PEERS_DEFAULT_LIMIT)) payload = _peer_stats_for_device(device_id, limit_value) state = devices.get(device_id) if state and not _coords_are_zero(state.lat, state.lon): payload["lat"] = float(state.lat) payload["lon"] = float(state.lon) payload["name"] = ( (state.name if state else None) or device_names.get(device_id) or "" ) payload["role"] = (state.role if state else None) or device_roles.get(device_id) payload["last_seen_ts"] = seen_devices.get(device_id ) or (state.ts if state else None) payload["server_time"] = time.time() return payload def _peer_device_payload( peer_id: str, count: int, total: int, last_ts: Optional[float] ) -> Dict[str, Any]: state = devices.get(peer_id) name = None role = None lat = None lon = None if state: name = state.name or device_names.get(peer_id) role = state.role or device_roles.get(peer_id) if not _coords_are_zero(state.lat, state.lon): lat = float(state.lat) lon = float(state.lon) if not name: name = device_names.get(peer_id) percent = (count / total * 100.0) if total > 0 else 0.0 return { "peer_id": peer_id, "name": name or "", "role": role, "lat": lat, "lon": lon, "count": int(count), "percent": round(percent, 1), "last_seen_ts": last_ts, } def _peer_is_excluded(peer_id: str) -> bool: if not MQTT_ONLINE_FORCE_NAMES_SET: return False state = devices.get(peer_id) name_value = "" if state and state.name: name_value = state.name if not name_value: name_value = device_names.get(peer_id) or "" if not name_value: return False return name_value.strip().lower() in MQTT_ONLINE_FORCE_NAMES_SET def _peer_stats_for_device(device_id: str, limit: int) -> Dict[str, Any]: inbound: Dict[str, int] = {} outbound: Dict[str, int] = {} inbound_last: Dict[str, float] = {} outbound_last: Dict[str, float] = {} cutoff = _peer_history_cutoff() if not peer_history_pairs and route_history_segments: _rebuild_peer_history_from_segments() for entry in peer_history_pairs.values(): if not isinstance(entry, dict): continue a_id = entry.get("a_id") b_id = entry.get("b_id") if not a_id or not b_id: continue buckets = entry.get("buckets") if not isinstance(buckets, dict): continue count = 0 last_ts = 0.0 for bucket_key, bucket_count in buckets.items(): try: bucket_start = float(bucket_key) bucket_value = int(bucket_count) except (TypeError, ValueError): continue if bucket_value <= 0: continue bucket_end = bucket_start + PEER_HISTORY_BUCKET_SECONDS if bucket_end < cutoff: continue count += bucket_value last_ts = max(last_ts, bucket_end) if count <= 0: continue if a_id == device_id and b_id != device_id: if _peer_is_excluded(b_id): continue outbound[b_id] = outbound.get(b_id, 0) + count outbound_last[b_id] = max(outbound_last.get(b_id, 0), float(last_ts)) if b_id == device_id and a_id != device_id: if _peer_is_excluded(a_id): continue inbound[a_id] = inbound.get(a_id, 0) + count inbound_last[a_id] = max(inbound_last.get(a_id, 0), float(last_ts)) inbound_total = sum(inbound.values()) outbound_total = sum(outbound.values()) inbound_items = [ _peer_device_payload( peer_id, count, inbound_total, inbound_last.get(peer_id) ) for peer_id, count in inbound.items() ] outbound_items = [ _peer_device_payload( peer_id, count, outbound_total, outbound_last.get(peer_id) ) for peer_id, count in outbound.items() ] inbound_items.sort(key=lambda item: item.get("count", 0), reverse=True) outbound_items.sort(key=lambda item: item.get("count", 0), reverse=True) if limit > 0: inbound_items = inbound_items[:limit] outbound_items = outbound_items[:limit] return { "device_id": device_id, "incoming_total": inbound_total, "outgoing_total": outbound_total, "incoming": inbound_items, "outgoing": outbound_items, "window_hours": ROUTE_HISTORY_HOURS, } @app.get("/los") def line_of_sight( lat1: float, lon1: float, lat2: float, lon2: float, profile: bool = False, h1: float = 0.0, h2: float = 0.0, ): include_points = bool(profile) start = _normalize_lat_lon(lat1, lon1) end = _normalize_lat_lon(lat2, lon2) if not start or not end: return {"ok": False, "error": "invalid_coords"} points = _sample_los_points(start[0], start[1], end[0], end[1]) elevations, error = _fetch_elevations(points) if error: return {"ok": False, "error": error} distance_m = _haversine_m(start[0], start[1], end[0], end[1]) if distance_m <= 0: return {"ok": False, "error": "zero_distance"} safe_h1 = h1 if math.isfinite(h1) else 0.0 safe_h2 = h2 if math.isfinite(h2) else 0.0 start_elev = elevations[0] + safe_h1 end_elev = elevations[-1] + safe_h2 adjusted = _los_effective_elevations(points, elevations, distance_m) adjusted[0] = start_elev adjusted[-1] = end_elev max_obstruction = _los_max_obstruction(points, adjusted, 0, len(points) - 1) max_terrain = max(elevations) blocked = max_obstruction > 0.0 suggestion = _find_los_suggestion(points, adjusted) if blocked else None profile_samples = [] if distance_m > 0: for idx, ((lat, lon, t), elev) in enumerate(zip(points, elevations)): line_elev = start_elev + (end_elev - start_elev) * t profile_samples.append( [ round(distance_m * t, 2), round(float(adjusted[idx]), 2), round(float(line_elev), 2), ] ) peaks = _find_los_peaks(points, elevations, distance_m) response = { "ok": True, "blocked": blocked, "max_obstruction_m": round(max_obstruction, 2), "distance_m": round(distance_m, 2), "distance_km": round(distance_m / 1000.0, 3), "distance_mi": round(distance_m / 1609.344, 3), "samples": len(points), "elevation_m": { "start": round(start_elev, 2), "end": round(end_elev, 2), "max_terrain": round(max_terrain, 2), }, "provider": LOS_ELEVATION_URL, "note": ( "LOS using SRTM90m with Earth curvature " f"({'on' if LOS_CURVATURE_ENABLED else 'off'}; " f"factor={LOS_CURVATURE_FACTOR:.6f}). No Fresnel clearance model." ), "suggested": suggestion, "profile": profile_samples, "peaks": peaks, } if include_points: response["profile_points"] = [ [round(lat, 6), round(lon, 6), round(t, 4), round(float(elev), 2)] for (lat, lon, t), elev in zip(points, elevations) ] return response @app.get("/los/elevations") def los_elevations(locations: str = ""): raw = [loc for loc in (locations or "").split("|") if loc.strip()] if not raw: return {"status": "ERROR", "error": "missing_locations"} if len(raw) > 200: return {"status": "ERROR", "error": "too_many_locations"} points = [] for loc in raw: parts = loc.split(",") if len(parts) != 2: return {"status": "ERROR", "error": "invalid_location"} try: lat = float(parts[0]) lon = float(parts[1]) except (ValueError, TypeError): return {"status": "ERROR", "error": "invalid_coords"} normalized = _normalize_lat_lon(lat, lon) if not normalized: return {"status": "ERROR", "error": "invalid_coords"} points.append((normalized[0], normalized[1], 0.0)) elevations, error = _fetch_elevations(points) if error: return {"status": "ERROR", "error": error} return { "status": "OK", "results": [{"elevation": round(float(elev), 2)} for elev in elevations], "provider": LOS_ELEVATION_URL, } @app.get("/coverage") async def get_coverage(): if not COVERAGE_API_URL: raise HTTPException( status_code=503, detail= "coverage_api_not_configured: Set COVERAGE_API_URL in .env (e.g., http://localhost:3000)", ) try: now = time.time() is_meshmapper = _is_meshmapper_coverage_url(COVERAGE_API_URL) if is_meshmapper: if not _coverage_cache_has_data(): _load_coverage_cache_file() if _coverage_cache_has_data(): age = int(max(0, now - float(coverage_cache.get("fetched_at") or 0.0))) source = coverage_cache.get("source") or "cache" cached_data = coverage_cache["data"] filtered = _filter_coverage_by_age(cached_data, now=now) print( f"[coverage] Serving MeshMapper cached data age={age}s source={source} filtered={len(filtered)}/{len(cached_data)}" ) return JSONResponse(filtered, headers=_coverage_response_headers("meshmapper")) cooldown_until = float(coverage_cache.get("cooldown_until") or 0.0) if cooldown_until > now: remaining = int(max(1, math.ceil(cooldown_until - now))) raise HTTPException( status_code=429, detail=f"coverage_rate_limited: retry_in_seconds={remaining}", ) raise HTTPException(status_code=503, detail="coverage_cache_empty") samples, provider, _meta = await _fetch_coverage_upstream() filtered = _filter_coverage_by_age(samples, now=now) print(f"[coverage] Serving filtered legacy coverage {len(filtered)}/{len(samples)}") return JSONResponse(filtered, headers=_coverage_response_headers(provider)) except httpx.TimeoutException: raise HTTPException(status_code=504, detail="coverage_api_timeout") except httpx.HTTPStatusError as e: raise HTTPException( status_code=502, detail= f"coverage_api_error: HTTP {e.response.status_code} from {COVERAGE_API_URL}", ) except HTTPException: raise except httpx.HTTPError as e: raise HTTPException(status_code=502, detail=f"coverage_api_error: {str(e)}") except Exception as e: raise HTTPException( status_code=500, detail=f"coverage_fetch_error: {str(e)}" ) @app.get("/debug/last") def debug_last_entries(): if PROD_MODE: raise HTTPException(status_code=404, detail="not_found") return { "count": len(debug_last), "items": list(reversed(list(debug_last))), "server_time": time.time(), } @app.get("/debug/status") def debug_status_entries(): if PROD_MODE: raise HTTPException(status_code=404, detail="not_found") return { "count": len(status_last), "items": list(reversed(list(status_last))), "server_time": time.time(), } @app.post("/api/verify-turnstile") async def verify_turnstile(request: Request): """Verify Cloudflare Turnstile token and issue auth token.""" if not TURNSTILE_ENABLED or not turnstile_verifier: return JSONResponse( {"success": False, "error": "Turnstile is not enabled"}, status_code=400, ) try: body = await request.json() token = body.get("token", "").strip() if not token: return JSONResponse( {"success": False, "error": "Token is required"}, status_code=400, ) # Verify token with Cloudflare success, error = await turnstile_verifier.verify_turnstile_token( token, remote_ip=request.client.host if request.client else None, ) if not success: print(f"[turnstile] Verification failed: {error}") return JSONResponse( {"success": False, "error": error or "Verification failed"}, status_code=400, ) # Issue auth token for client auth_token = turnstile_verifier.issue_auth_token() print(f"[turnstile] Verification successful, issued auth token") # Create response with auth token and set cookie response = JSONResponse( { "success": True, "auth_token": auth_token, }, status_code=200, ) # Set auth cookie (expires in TURNSTILE_TOKEN_TTL_SECONDS) response.set_cookie( key="meshmap_auth", value=auth_token, max_age=TURNSTILE_TOKEN_TTL_SECONDS, path="/", samesite="lax", ) return response except json.JSONDecodeError: return JSONResponse( {"success": False, "error": "Invalid JSON"}, status_code=400, ) except Exception as e: print(f"[turnstile] Error verifying token: {e}") return JSONResponse( {"success": False, "error": str(e)}, status_code=500, ) @app.websocket("/ws") async def ws_endpoint(ws: WebSocket): if not _ws_authorized(ws): await ws.accept() await ws.close(code=1008) return await ws.accept() clients.add(ws) await ws.send_text( json.dumps( { "type": "snapshot", "devices": { k: _device_payload(k, v) for k, v in devices.items() }, "trails": trails, "routes": _snapshot_routes(time.time()), "history_edges": [_history_edge_payload(e) for e in route_history_edges.values()], "history_window_seconds": int(max(0, ROUTE_HISTORY_HOURS * 3600)), "heat": _serialize_heat_events(), "update": git_update_info, "mqtt_presence": _mqtt_presence_summary(), "server_time": time.time(), } ) ) try: while True: await ws.receive_text() except WebSocketDisconnect: pass except RuntimeError: pass finally: clients.discard(ws)