meshcore-mqtt-live-map/backend/app.py
2026-04-16 14:05:25 -04:00

3914 lines
124 KiB
Python

import asyncio
import json
import os
import html
import time
import subprocess
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from dataclasses import asdict
from typing import Any, Dict, Optional, Set, List, Tuple
import httpx
import paho.mqtt.client as mqtt
import qrcode
from fastapi import (
FastAPI,
WebSocket,
WebSocketDisconnect,
Request,
HTTPException,
Query,
)
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from urllib.parse import urlencode, parse_qsl, urlsplit, urlunsplit
from io import BytesIO
from PIL import Image, ImageDraw
import math
import state
from boundary import (
get_map_boundary_name,
get_map_boundary_points,
load_map_boundary,
within_map_boundary,
)
from decoder import (
ROUTE_PAYLOAD_TYPES_SET,
_append_heat_points,
_coords_are_zero,
_device_id_from_topic,
_ensure_node_decoder,
_normalize_lat_lon,
_normalize_role,
_rebuild_node_hash_map,
_route_points_from_hashes,
_route_points_from_device_ids,
_safe_preview,
_serialize_heat_events,
_try_parse_payload,
DIRECT_COORDS_TOPIC_RE,
_node_ready_once,
_node_unavailable_once,
)
from history import (
_load_route_history,
PEER_HISTORY_BUCKET_SECONDS,
_peer_history_cutoff,
_prune_peer_history,
_prune_route_history,
_rebuild_peer_history_from_segments,
_record_route_history,
_route_history_saver,
)
from backup import _backup_loop
from weather import create_weather_router
from turnstile import TurnstileVerifier
from los import (
_fetch_elevations,
_find_los_peaks,
_find_los_suggestion,
_los_effective_elevations,
_haversine_m,
_los_max_obstruction,
_sample_los_points,
)
from config import (
MQTT_HOST,
MQTT_PORT,
MQTT_USERNAME,
MQTT_PASSWORD,
MQTT_TOPIC,
MQTT_TOPICS,
MQTT_TLS,
MQTT_TLS_INSECURE,
MQTT_CA_CERT,
MQTT_TRANSPORT,
MQTT_WS_PATH,
MQTT_CLIENT_ID,
STATE_DIR,
STATE_FILE,
DEVICE_ROLES_FILE,
DEVICE_COORDS_FILE,
NEIGHBOR_OVERRIDES_FILE,
BACKUP_ENABLED,
BACKUP_INTERVAL_SECONDS,
BACKUP_DIR,
BACKUP_RETENTION_DAYS,
STATE_SAVE_INTERVAL,
DEVICE_TTL_WINDOW_SECONDS,
PATH_TTL_SECONDS,
TRAIL_LEN,
ROUTE_TTL_SECONDS,
ROUTE_PAYLOAD_TYPES,
ROUTE_PATH_MAX_LEN,
ROUTE_HISTORY_ENABLED,
ROUTE_HISTORY_HOURS,
ROUTE_HISTORY_MAX_SEGMENTS,
ROUTE_HISTORY_FILE,
ROUTE_HISTORY_PAYLOAD_TYPES,
ROUTE_HISTORY_ALLOWED_MODES,
ROUTE_HISTORY_COMPACT_INTERVAL,
HISTORY_EDGE_SAMPLE_LIMIT,
MESSAGE_ORIGIN_TTL_SECONDS,
HEAT_TTL_SECONDS,
MQTT_ONLINE_SECONDS,
MQTT_ONLINE_STATUS_TTL_SECONDS,
MQTT_ONLINE_INTERNAL_TTL_SECONDS,
MQTT_ACTIVITY_PACKETS_TTL_SECONDS,
MQTT_SEEN_BROADCAST_MIN_SECONDS,
MQTT_ONLINE_FORCE_NAMES_SET,
MQTT_STATUS_OFFLINE_VALUES_SET,
PEERS_DEFAULT_LIMIT,
DEBUG_PAYLOAD,
DEBUG_PAYLOAD_MAX,
TURNSTILE_ENABLED,
TURNSTILE_SITE_KEY,
TURNSTILE_SECRET_KEY,
TURNSTILE_API_URL,
TURNSTILE_TOKEN_TTL_SECONDS,
TURNSTILE_BOT_BYPASS,
TURNSTILE_BOT_ALLOWLIST,
DECODE_WITH_NODE,
NODE_DECODE_TIMEOUT_SECONDS,
PAYLOAD_PREVIEW_MAX,
DIRECT_COORDS_MODE,
DIRECT_COORDS_TOPIC_REGEX,
DIRECT_COORDS_ALLOW_ZERO,
ROUTE_HISTORY_ALLOWED_MODES_SET,
SITE_TITLE,
SITE_DESCRIPTION,
SITE_OG_IMAGE,
SITE_URL,
SITE_ICON,
SITE_FEED_NOTE,
CUSTOM_LINK_URL,
PACKET_ANALYZER_URL,
QR_CODE_BUTTON_ENABLED,
GIT_CHECK_ENABLED,
GIT_CHECK_FETCH,
GIT_CHECK_PATH,
GIT_CHECK_INTERVAL_SECONDS,
DISTANCE_UNITS,
NODE_MARKER_RADIUS,
HISTORY_LINK_SCALE,
MAP_START_LAT,
MAP_START_LON,
MAP_START_ZOOM,
MAP_RADIUS_KM,
MAP_RADIUS_SHOW,
MAP_BOUNDARY_MODE,
MAP_BOUNDARY_FILE,
MAP_BOUNDARY_SHOW,
MAP_DEFAULT_LAYER,
PROD_MODE,
PROD_TOKEN,
LOS_ELEVATION_URL,
LOS_ELEVATION_PROXY_URL,
LOS_SAMPLE_MIN,
LOS_SAMPLE_MAX,
LOS_SAMPLE_STEP_METERS,
ELEVATION_CACHE_TTL,
LOS_CURVATURE_ENABLED,
LOS_CURVATURE_FACTOR,
LOS_PEAKS_MAX,
COVERAGE_API_URL,
COVERAGE_API_KEY,
COVERAGE_MAX_AGE_DAYS,
COVERAGE_CACHE_FILE,
COVERAGE_RATE_LIMIT_COOLDOWN_SECONDS,
COVERAGE_SYNC_INTERVAL_SECONDS,
WEATHER_RADAR_ENABLED,
WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED,
WEATHER_RADAR_COUNTRY_LOOKUP_URL,
WEATHER_WIND_ENABLED,
WEATHER_WIND_API_URL,
WEATHER_WIND_GRID_SIZE,
WEATHER_WIND_REFRESH_SECONDS,
APP_VERSION,
APP_DIR,
NODE_SCRIPT_PATH,
)
from state import (
DeviceState,
stats,
result_counts,
seen_devices,
mqtt_seen,
mqtt_online_source,
mqtt_status_seen,
mqtt_status_values,
mqtt_internal_seen,
mqtt_packets_seen,
last_seen_broadcast,
topic_counts,
debug_last,
status_last,
devices,
trails,
routes,
heat_events,
route_history_segments,
route_history_edges,
peer_history_pairs,
node_hash_to_device,
node_hash_collisions,
node_hash_candidates,
elevation_cache,
device_names,
message_origins,
device_roles,
device_role_sources,
device_coords,
neighbor_edges,
first_seen_devices,
last_seen_in_advert,
)
# =========================
# App / State
# =========================
mqtt_client: Optional[mqtt.Client] = None
background_tasks: Set[asyncio.Task[Any]] = set()
clients: Set[WebSocket] = set()
update_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
git_update_info = {
"available": False,
"local": None,
"remote": None,
"local_short": None,
"remote_short": None,
"error": None,
}
mqtt_presence_last_summary: Dict[str, int] = {}
def _normalize_route_hashes_for_path_length(
path_hashes: Any,
path_length: Any,
) -> Optional[List[Any]]:
if not isinstance(path_hashes, list) or not path_hashes:
return None
try:
path_length_int = int(path_length) if path_length is not None else None
except (TypeError, ValueError):
path_length_int = None
if path_length_int not in (2, 3):
return list(path_hashes)
width = path_length_int * 2
normalized: List[Any] = []
changed = False
for item in path_hashes:
if isinstance(item, int):
if item < 0:
normalized.append(item)
continue
normalized.append(f"{item:0{width}X}")
changed = True
continue
normalized.append(item)
return normalized if changed else list(path_hashes)
def _coverage_request_url(base_url: str, api_key: str) -> str:
raw = (base_url or "").strip()
if not raw:
return raw
parts = urlsplit(raw)
hostname = (parts.hostname or "").lower()
path = parts.path or ""
is_meshmapper = hostname == "meshmapper.net"
if is_meshmapper:
if not path or path == "/":
path = "/coverage.php"
elif not path.endswith("/coverage.php") and not path.endswith("coverage.php"):
path = path.rstrip("/") + "/coverage.php"
query_items = dict(parse_qsl(parts.query, keep_blank_values=True))
if api_key and not query_items.get("key"):
query_items["key"] = api_key
return urlunsplit(
(
parts.scheme,
parts.netloc,
path,
urlencode(query_items),
parts.fragment,
)
)
if path.endswith("/get-samples") or path.endswith("get-samples"):
return raw
return f"{raw.rstrip('/')}/get-samples"
coverage_cache: Dict[str, Any] = {
"provider": None,
"data": None,
"fetched_at": 0.0,
"cooldown_until": 0.0,
"last_error": None,
"source": None,
}
def _is_meshmapper_coverage_url(base_url: str) -> bool:
parts = urlsplit((base_url or "").strip())
return (parts.hostname or "").lower() == "meshmapper.net"
def _normalize_coverage_response(data: Any) -> Tuple[List[Dict[str, Any]], str]:
if isinstance(data, dict):
if isinstance(data.get("grid_squares"), list):
if data.get("success") is False:
error = str(data.get("error") or "coverage_api_error")
message = str(data.get("message") or error)
raise HTTPException(status_code=502, detail=f"coverage_api_error: {error}: {message}")
return data["grid_squares"], "meshmapper"
keys = data.get("keys", [])
if isinstance(keys, list):
return keys, "legacy"
return [], "legacy"
if isinstance(data, list):
return data, "legacy"
return [], "legacy"
def _coverage_metadata_from_response(
data: Any,
provider: str,
) -> Dict[str, Any]:
meta: Dict[str, Any] = {"provider": provider}
if provider != "meshmapper" or not isinstance(data, dict):
return meta
region = data.get("region")
if isinstance(region, str) and region.strip():
meta["region"] = region.strip().upper()
generated_at = _parse_coverage_timestamp(data.get("generated_at"))
if generated_at is not None:
meta["generated_at"] = generated_at
return meta
def _coverage_cache_has_data() -> bool:
return isinstance(coverage_cache.get("data"), list)
def _parse_coverage_timestamp(value: Any) -> Optional[float]:
if value is None:
return None
if isinstance(value, (int, float)):
numeric = float(value)
elif isinstance(value, str):
text = value.strip()
if not text:
return None
try:
numeric = float(text)
except ValueError:
dt_value = None
normalized = text
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
dt_value = datetime.fromisoformat(normalized)
except ValueError:
for fmt in (
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S",
"%d/%m/%Y %H:%M:%S",
"%m/%d/%Y %H:%M:%S",
):
try:
dt_value = datetime.strptime(text, fmt)
break
except ValueError:
continue
if dt_value is None:
return None
if dt_value.tzinfo is None:
dt_value = dt_value.replace(tzinfo=timezone.utc)
return dt_value.timestamp()
else:
return None
if numeric > 946684800000:
numeric /= 1000.0
if numeric <= 0:
return None
return numeric
def _coverage_item_timestamp(item: Any) -> Optional[float]:
if not isinstance(item, dict):
return None
for key in ("timestamp", "time"):
ts = _parse_coverage_timestamp(item.get(key))
if ts is not None:
return ts
observed = item.get("observed")
if isinstance(observed, dict):
for key in ("timestamp", "time"):
ts = _parse_coverage_timestamp(observed.get(key))
if ts is not None:
return ts
return None
def _filter_coverage_by_age(
data: List[Dict[str, Any]],
now: Optional[float] = None,
) -> List[Dict[str, Any]]:
try:
max_age_days = float(COVERAGE_MAX_AGE_DAYS)
except (TypeError, ValueError):
max_age_days = 30.0
if max_age_days <= 0:
return list(data)
now_value = float(now or time.time())
cutoff = now_value - (max_age_days * 86400.0)
filtered: List[Dict[str, Any]] = []
for item in data:
ts = _coverage_item_timestamp(item)
if ts is None or ts >= cutoff:
filtered.append(item)
return filtered
def _coverage_response_headers(provider: Optional[str] = None) -> Dict[str, str]:
headers: Dict[str, str] = {}
provider_value = str(provider or coverage_cache.get("provider") or "").strip().lower()
if provider_value:
headers["X-Coverage-Provider"] = provider_value
region_value = str(coverage_cache.get("region") or "").strip().upper()
if provider_value == "meshmapper" and region_value:
headers["X-Coverage-Region"] = region_value
return headers
def _update_coverage_cache(
provider: str,
data: List[Dict[str, Any]],
now: float,
source: str = "memory",
meta: Optional[Dict[str, Any]] = None,
) -> None:
coverage_cache["provider"] = provider
coverage_cache["data"] = list(data)
coverage_cache["fetched_at"] = now
coverage_cache["cooldown_until"] = 0.0
coverage_cache["last_error"] = None
coverage_cache["source"] = source
coverage_cache["region"] = (
str(meta.get("region")).strip().upper()
if isinstance(meta, dict) and meta.get("region")
else None
)
generated_at = meta.get("generated_at") if isinstance(meta, dict) else None
coverage_cache["generated_at"] = float(generated_at) if generated_at else None
def _apply_meshmapper_rate_limit_cooldown(
now: float,
response: Optional[httpx.Response] = None,
) -> int:
cooldown_seconds = max(1, int(COVERAGE_RATE_LIMIT_COOLDOWN_SECONDS))
if response is not None:
try:
data = response.json()
except Exception:
data = None
if isinstance(data, dict):
coverage_cache["last_error"] = data
resets_in_hours = data.get("resets_in_hours")
try:
resets_hours_value = float(resets_in_hours)
except (TypeError, ValueError):
resets_hours_value = None
if resets_hours_value and resets_hours_value > 0:
cooldown_seconds = max(1, int(math.ceil(resets_hours_value * 3600.0)))
coverage_cache["cooldown_until"] = now + cooldown_seconds
return cooldown_seconds
def _load_coverage_cache_file() -> bool:
path = (COVERAGE_CACHE_FILE or "").strip()
if not path or not os.path.exists(path):
return False
try:
with open(path, "r", encoding="utf-8") as f:
payload = json.load(f)
except Exception as exc:
print(f"[coverage] Failed to load cache file {path}: {exc}")
return False
if not isinstance(payload, dict):
return False
data = payload.get("data")
if not isinstance(data, list):
return False
fetched_at = payload.get("fetched_at") or 0.0
try:
fetched_at_value = float(fetched_at)
except (TypeError, ValueError):
fetched_at_value = 0.0
provider = str(payload.get("provider") or "meshmapper")
coverage_cache["provider"] = provider
coverage_cache["data"] = list(data)
coverage_cache["fetched_at"] = fetched_at_value
coverage_cache["cooldown_until"] = float(payload.get("cooldown_until") or 0.0)
coverage_cache["last_error"] = payload.get("last_error")
coverage_cache["source"] = "file"
region = payload.get("region")
coverage_cache["region"] = (
str(region).strip().upper() if isinstance(region, str) and region.strip() else None
)
generated_at = payload.get("generated_at")
try:
coverage_cache["generated_at"] = float(generated_at) if generated_at else None
except (TypeError, ValueError):
coverage_cache["generated_at"] = None
print(
f"[coverage] Loaded cached coverage file {path} entries={len(data)} fetched_at={int(fetched_at_value) if fetched_at_value > 0 else 0}"
)
return True
def _save_coverage_cache_file() -> None:
path = (COVERAGE_CACHE_FILE or "").strip()
if not path or not _coverage_cache_has_data():
return
payload = {
"provider": coverage_cache.get("provider") or "meshmapper",
"fetched_at": float(coverage_cache.get("fetched_at") or 0.0),
"cooldown_until": float(coverage_cache.get("cooldown_until") or 0.0),
"last_error": coverage_cache.get("last_error"),
"region": coverage_cache.get("region"),
"generated_at": coverage_cache.get("generated_at"),
"data": coverage_cache.get("data") or [],
}
try:
directory = os.path.dirname(path)
if directory:
os.makedirs(directory, exist_ok=True)
tmp_path = f"{path}.tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(payload, f, separators=(",", ":"))
os.replace(tmp_path, path)
except Exception as exc:
print(f"[coverage] Failed to save cache file {path}: {exc}")
async def _fetch_coverage_upstream() -> Tuple[List[Dict[str, Any]], str, Dict[str, Any]]:
url = _coverage_request_url(COVERAGE_API_URL, COVERAGE_API_KEY)
print(f"[coverage] Fetching from {url}")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
samples, provider = _normalize_coverage_response(data)
meta = _coverage_metadata_from_response(data, provider)
print(
f"[coverage] Received {len(samples) if isinstance(samples, list) else 'non-list'} items from coverage API provider={provider}"
)
if isinstance(samples, list) and len(samples) > 0:
print(
f"[coverage] Sample item keys: {list(samples[0].keys()) if samples[0] else 'N/A'}"
)
return samples, provider, meta
async def _fetch_coverage_upstream_for_test(
base_url: str,
api_key: str,
) -> Tuple[List[Dict[str, Any]], str]:
url = _coverage_request_url(base_url, api_key)
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
return _normalize_coverage_response(response.json())
async def _sync_meshmapper_coverage_once() -> bool:
now = time.time()
cooldown_until = float(coverage_cache.get("cooldown_until") or 0.0)
if cooldown_until > now:
remaining = int(max(1, math.ceil(cooldown_until - now)))
print(f"[coverage] MeshMapper sync cooldown active ({remaining}s remaining)")
return _coverage_cache_has_data()
try:
samples, provider, meta = await _fetch_coverage_upstream()
if provider != "meshmapper":
return False
_update_coverage_cache(
provider,
samples,
now,
source="meshmapper_sync",
meta=meta,
)
_save_coverage_cache_file()
return True
except httpx.TimeoutException:
coverage_cache["last_error"] = {"error": "coverage_api_timeout"}
print("[coverage] MeshMapper sync timeout")
return _coverage_cache_has_data()
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 429:
cooldown_seconds = _apply_meshmapper_rate_limit_cooldown(now, exc.response)
print(
f"[coverage] MeshMapper sync rate limited cooldown={cooldown_seconds}s"
)
return _coverage_cache_has_data()
coverage_cache["last_error"] = {
"error": "coverage_api_error",
"status_code": exc.response.status_code,
}
print(
f"[coverage] MeshMapper sync HTTP error status={exc.response.status_code}"
)
return _coverage_cache_has_data()
except httpx.HTTPError as exc:
coverage_cache["last_error"] = {"error": f"coverage_api_error: {exc}"}
print(f"[coverage] MeshMapper sync HTTP error: {exc}")
return _coverage_cache_has_data()
except Exception as exc:
coverage_cache["last_error"] = {"error": f"coverage_fetch_error: {exc}"}
print(f"[coverage] MeshMapper sync failed: {exc}")
return _coverage_cache_has_data()
async def _meshmapper_coverage_sync_loop() -> None:
if not _is_meshmapper_coverage_url(COVERAGE_API_URL):
return
_load_coverage_cache_file()
await _sync_meshmapper_coverage_once()
interval = max(60, int(COVERAGE_SYNC_INTERVAL_SECONDS))
while True:
await asyncio.sleep(interval)
await _sync_meshmapper_coverage_once()
@asynccontextmanager
async def _lifespan(_app: FastAPI):
global mqtt_client
print(f"[startup] meshmap version={APP_VERSION}")
load_map_boundary(force=True)
_load_state()
_load_route_history()
_load_neighbor_overrides()
if _is_meshmapper_coverage_url(COVERAGE_API_URL):
_load_coverage_cache_file()
_ensure_node_decoder()
_check_git_updates()
if BACKUP_ENABLED:
print(
f"[backup] enabled dir={BACKUP_DIR} interval={BACKUP_INTERVAL_SECONDS}s retention_days={BACKUP_RETENTION_DAYS}"
)
loop = asyncio.get_running_loop()
transport = "websockets" if MQTT_TRANSPORT == "websockets" else "tcp"
topics_str = ", ".join(MQTT_TOPICS)
print(
f"[mqtt] connecting host={MQTT_HOST} port={MQTT_PORT} tls={MQTT_TLS} transport={transport} ws_path={MQTT_WS_PATH if transport == 'websockets' else '-'} topics={topics_str}"
)
mqtt_client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
client_id=(MQTT_CLIENT_ID or None),
userdata={"loop": loop},
transport=transport,
)
if transport == "websockets":
mqtt_client.ws_set_options(path=MQTT_WS_PATH)
if MQTT_USERNAME:
mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
if MQTT_TLS:
if MQTT_CA_CERT:
mqtt_client.tls_set(ca_certs=MQTT_CA_CERT)
else:
mqtt_client.tls_set()
if MQTT_TLS_INSECURE:
mqtt_client.tls_insecure_set(True)
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_disconnect = mqtt_on_disconnect
mqtt_client.on_message = mqtt_on_message
mqtt_client.reconnect_delay_set(min_delay=1, max_delay=30)
mqtt_client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=30)
mqtt_client.loop_start()
background_tasks.clear()
for coro in (
broadcaster(),
reaper(),
_state_saver(),
_route_history_saver(),
_git_check_loop(),
_meshmapper_coverage_sync_loop(),
_backup_loop(),
):
background_tasks.add(asyncio.create_task(coro))
try:
yield
finally:
for task in list(background_tasks):
task.cancel()
if background_tasks:
await asyncio.gather(*background_tasks, return_exceptions=True)
background_tasks.clear()
if mqtt_client is not None:
try:
mqtt_client.loop_stop()
mqtt_client.disconnect()
except Exception:
pass
mqtt_client = None
app = FastAPI(lifespan=_lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")
# Initialize Turnstile verifier if enabled
turnstile_verifier: Optional[TurnstileVerifier] = None
if TURNSTILE_ENABLED and TURNSTILE_SECRET_KEY:
turnstile_verifier = TurnstileVerifier(
secret_key=TURNSTILE_SECRET_KEY,
api_url=TURNSTILE_API_URL,
token_ttl_seconds=TURNSTILE_TOKEN_TTL_SECONDS,
)
print(f"[startup] Turnstile authentication enabled")
else:
print(f"[startup] Turnstile authentication disabled")
def _compute_asset_version() -> str:
paths = [
os.path.join(APP_DIR, "static", "app.js"),
os.path.join(APP_DIR, "static", "styles.css"),
os.path.join(APP_DIR, "static", "sw.js"),
]
mtimes = []
for path in paths:
try:
mtimes.append(int(os.path.getmtime(path)))
except Exception:
continue
return str(max(mtimes)) if mtimes else "1"
ASSET_VERSION = _compute_asset_version()
ROUTE_SNAPSHOT_MIN_TTL_SECONDS = 10.0
def _snapshot_routes(now: Optional[float] = None) -> List[Dict[str, Any]]:
current = time.time() if now is None else float(now)
min_expires_at = current + ROUTE_SNAPSHOT_MIN_TTL_SECONDS
return [
_route_payload(route)
for route in routes.values()
if float(route.get("expires_at") or 0.0) > min_expires_at
]
# =========================
# Helpers: coordinate hunting
# =========================
def _load_role_overrides() -> Dict[str, str]:
if not DEVICE_ROLES_FILE or not os.path.exists(DEVICE_ROLES_FILE):
return {}
try:
with open(DEVICE_ROLES_FILE, "r", encoding="utf-8") as handle:
data = json.load(handle)
except Exception:
return {}
if not isinstance(data, dict):
return {}
roles: Dict[str, str] = {}
for key, value in data.items():
if not isinstance(key, str) or not isinstance(value, str):
continue
role = _normalize_role(value)
if not role:
continue
roles[key.strip()] = role
return roles
def _load_coord_overrides() -> Dict[str, Dict[str, float]]:
if not DEVICE_COORDS_FILE or not os.path.exists(DEVICE_COORDS_FILE):
return {}
try:
with open(DEVICE_COORDS_FILE, "r", encoding="utf-8") as handle:
data = json.load(handle)
except Exception:
return {}
if not isinstance(data, dict):
return {}
coords: Dict[str, Dict[str, float]] = {}
for key, value in data.items():
if not isinstance(key, str) or not isinstance(value, dict):
continue
lat = value.get("lat")
lon = value.get("lon")
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
coords[key.strip()] = {"lat": float(lat), "lon": float(lon)}
return coords
def _neighbor_override_pairs(data: Any) -> List[Tuple[str, str]]:
pairs: List[Tuple[str, str]] = []
if isinstance(data, dict):
for src, targets in data.items():
if not isinstance(src, str):
continue
src_id = src.strip()
if isinstance(targets, list):
for dst in targets:
if not isinstance(dst, str):
continue
dst_id = dst.strip()
if src_id and dst_id and src_id != dst_id:
pairs.append((src_id, dst_id))
elif isinstance(targets, str):
dst_id = targets.strip()
if src_id and dst_id and src_id != dst_id:
pairs.append((src_id, dst_id))
elif isinstance(data, list):
for item in data:
if (
isinstance(item, (list, tuple)) and len(item) >= 2 and
isinstance(item[0], str) and isinstance(item[1], str)
):
src_id = item[0].strip()
dst_id = item[1].strip()
elif isinstance(item, dict):
src = item.get("from") or item.get("src") or item.get("a")
dst = item.get("to") or item.get("dst") or item.get("b")
if not isinstance(src, str) or not isinstance(dst, str):
continue
src_id = src.strip()
dst_id = dst.strip()
else:
continue
if src_id and dst_id and src_id != dst_id:
pairs.append((src_id, dst_id))
return pairs
def _load_neighbor_overrides() -> None:
if not NEIGHBOR_OVERRIDES_FILE or not os.path.exists(NEIGHBOR_OVERRIDES_FILE):
return
try:
with open(NEIGHBOR_OVERRIDES_FILE, "r", encoding="utf-8") as handle:
data = json.load(handle)
except Exception as exc:
print(f"[neighbors] failed to load {NEIGHBOR_OVERRIDES_FILE}: {exc}")
return
now = time.time()
added = 0
for src_id, dst_id in _neighbor_override_pairs(data):
_touch_neighbor(src_id, dst_id, now, manual=True)
_touch_neighbor(dst_id, src_id, now, manual=True)
added += 1
if added:
print(
f"[neighbors] loaded {added} override pairs from {NEIGHBOR_OVERRIDES_FILE}"
)
def _touch_neighbor(
src_id: str,
dst_id: str,
ts: float,
manual: bool = False,
) -> None:
if not src_id or not dst_id or src_id == dst_id:
return
neighbors = neighbor_edges.setdefault(src_id, {})
entry = neighbors.get(dst_id)
if entry is None:
entry = {"count": 0, "last_seen": 0.0, "manual": False}
neighbors[dst_id] = entry
if manual:
entry["manual"] = True
else:
entry["count"] = int(entry.get("count", 0)) + 1
entry["last_seen"] = max(float(entry.get("last_seen", 0.0)), float(ts))
def _record_neighbors(point_ids: List[Optional[str]], ts: float) -> None:
if not point_ids or len(point_ids) < 2:
return
for idx in range(len(point_ids) - 1):
src_id = point_ids[idx]
dst_id = point_ids[idx + 1]
if not src_id or not dst_id or src_id == dst_id:
continue
_touch_neighbor(src_id, dst_id, ts, manual=False)
_touch_neighbor(dst_id, src_id, ts, manual=False)
def _update_path_timestamps(point_ids: List[Optional[str]], ts: float) -> None:
"""Update last_seen_in_path for all devices in a path."""
if not point_ids:
return
for device_id in point_ids:
if device_id:
state.last_seen_in_path[device_id] = max(
state.last_seen_in_path.get(device_id, 0.0), float(ts)
)
def _prune_neighbors(now: float) -> None:
ttl_seconds = PATH_TTL_SECONDS if PATH_TTL_SECONDS > 0 else DEVICE_TTL_WINDOW_SECONDS
if ttl_seconds <= 0 or not neighbor_edges:
return
cutoff = now - ttl_seconds
for src_id, edges in list(neighbor_edges.items()):
for dst_id, entry in list(edges.items()):
if entry.get("manual"):
continue
if entry.get("last_seen", 0.0) < cutoff:
edges.pop(dst_id, None)
if not edges:
neighbor_edges.pop(src_id, None)
def _serialize_state() -> Dict[str, Any]:
return {
"version": 1,
"saved_at": time.time(),
"devices": {
k: asdict(v)
for k, v in devices.items()
},
"trails": trails,
"seen_devices": seen_devices,
"device_names": device_names,
"device_roles": device_roles,
"device_role_sources": device_role_sources,
"last_seen_in_path": state.last_seen_in_path,
"first_seen_devices": first_seen_devices,
"last_seen_in_advert": last_seen_in_advert,
"peer_history_pairs": peer_history_pairs,
}
def _check_git_updates() -> None:
if not GIT_CHECK_ENABLED:
return
if not GIT_CHECK_PATH or not os.path.isdir(GIT_CHECK_PATH):
git_update_info["error"] = "git_path_missing"
return
def _run_git(args: List[str]) -> str:
result = subprocess.run(
args,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return result.stdout.strip()
try:
subprocess.run(
["git", "config", "--global", "--add", "safe.directory", GIT_CHECK_PATH],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
inside = _run_git(
["git", "-C", GIT_CHECK_PATH, "rev-parse", "--is-inside-work-tree"]
)
if inside.lower() != "true":
git_update_info["error"] = "not_git_repo"
return
except Exception:
git_update_info["error"] = "git_unavailable"
return
try:
if GIT_CHECK_FETCH:
subprocess.run(
["git", "-C", GIT_CHECK_PATH, "fetch", "--quiet", "--prune"],
check=False,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
local_sha = _run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "HEAD"])
remote_sha = _run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "@{u}"])
git_update_info["local"] = local_sha
git_update_info["remote"] = remote_sha
git_update_info["local_short"] = local_sha[:7]
git_update_info["remote_short"] = remote_sha[:7]
git_update_info["available"] = local_sha != remote_sha
if git_update_info["available"]:
print(
f"[update] available {git_update_info['local_short']} -> {git_update_info['remote_short']}"
)
except Exception:
git_update_info["error"] = "git_compare_failed"
async def _git_check_loop() -> None:
if not GIT_CHECK_ENABLED:
return
if GIT_CHECK_INTERVAL_SECONDS <= 0:
return
while True:
await asyncio.sleep(GIT_CHECK_INTERVAL_SECONDS)
_check_git_updates()
def _device_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]:
payload = asdict(state)
last_seen = seen_devices.get(device_id)
if last_seen:
payload["last_seen_ts"] = last_seen
else:
payload["last_seen_ts"] = payload.get("ts")
mqtt_seen_ts = mqtt_seen.get(device_id)
if mqtt_seen_ts:
payload["mqtt_seen_ts"] = mqtt_seen_ts
mqtt_source = mqtt_online_source.get(device_id)
if mqtt_source:
payload["mqtt_online_source"] = mqtt_source
mqtt_status_ts = mqtt_status_seen.get(device_id)
if mqtt_status_ts:
payload["mqtt_status_ts"] = mqtt_status_ts
mqtt_status_value = mqtt_status_values.get(device_id)
if mqtt_status_value:
payload["mqtt_status_value"] = mqtt_status_value
mqtt_internal_ts = mqtt_internal_seen.get(device_id)
if mqtt_internal_ts:
payload["mqtt_internal_ts"] = mqtt_internal_ts
mqtt_packets_ts = mqtt_packets_seen.get(device_id)
if mqtt_packets_ts:
payload["mqtt_packets_ts"] = mqtt_packets_ts
if MQTT_ONLINE_FORCE_NAMES_SET:
name_value = (state.name or device_names.get(device_id) or
"").strip().lower()
if name_value and name_value in MQTT_ONLINE_FORCE_NAMES_SET:
payload["mqtt_forced"] = True
if PROD_MODE:
payload.pop("raw_topic", None)
return payload
def _iso_from_ts(ts: Optional[float]) -> Optional[str]:
if ts is None:
return None
try:
return datetime.fromtimestamp(float(ts), tz=timezone.utc
).strftime("%Y-%m-%dT%H:%M:%SZ")
except Exception:
return None
def _normalize_device_name_for_dedupe(name: Any) -> str:
if not isinstance(name, str):
return ""
return " ".join(name.strip().lower().split())
def _drop_device_state(device_id: str) -> None:
devices.pop(device_id, None)
trails.pop(device_id, None)
seen_devices.pop(device_id, None)
first_seen_devices.pop(device_id, None)
device_names.pop(device_id, None)
device_roles.pop(device_id, None)
device_role_sources.pop(device_id, None)
device_coords.pop(device_id, None)
state.last_seen_in_path.pop(device_id, None)
last_seen_in_advert.pop(device_id, None)
mqtt_seen.pop(device_id, None)
mqtt_online_source.pop(device_id, None)
mqtt_status_seen.pop(device_id, None)
mqtt_status_values.pop(device_id, None)
mqtt_internal_seen.pop(device_id, None)
mqtt_packets_seen.pop(device_id, None)
last_seen_broadcast.pop(device_id, None)
def _dedupe_loaded_devices() -> Set[str]:
groups: Dict[Tuple[str, int, int], List[str]] = {}
for device_id, dev_state in devices.items():
name = _normalize_device_name_for_dedupe(
dev_state.name or device_names.get(device_id)
)
if (not name or _coords_are_zero(dev_state.lat, dev_state.lon)):
continue
key = (
name,
int(round(float(dev_state.lat) * 100000)),
int(round(float(dev_state.lon) * 100000)),
)
groups.setdefault(key, []).append(device_id)
dropped_ids: Set[str] = set()
for duplicate_ids in groups.values():
if len(duplicate_ids) < 2:
continue
def _score(device_id: str) -> Tuple[float, float, str]:
last_seen = float(seen_devices.get(device_id) or devices[device_id].ts or 0.0)
first_seen = float(first_seen_devices.get(device_id) or last_seen or 0.0)
return (last_seen, -first_seen, device_id)
keep_id = max(duplicate_ids, key=_score)
for device_id in duplicate_ids:
if device_id == keep_id:
continue
_drop_device_state(device_id)
dropped_ids.add(device_id)
if dropped_ids:
state.state_dirty = True
print(
f"[state] Dropped {len(dropped_ids)} duplicate device entries: "
f"{', '.join(sorted(dropped_ids))}"
)
return dropped_ids
def _parse_meshcore_topic(
topic: str
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
parts = topic.split("/")
if len(parts) < 4 or parts[0] != "meshcore":
return (None, None, None)
iata = parts[1].strip() or None
node_id = parts[2].strip() or None
kind = parts[3].strip().lower() or None
return (iata, node_id, kind)
def _parse_json_dict(payload_bytes: bytes) -> Optional[Dict[str, Any]]:
try:
obj = json.loads(payload_bytes.decode("utf-8", errors="strict"))
except Exception:
return None
if isinstance(obj, dict):
return obj
return None
def _extract_status_value(obj: Optional[Dict[str, Any]]) -> Optional[str]:
if not isinstance(obj, dict):
return None
for key in (
"status",
"state",
"mqtt_status",
"mqttStatus",
"connection",
"connection_state",
):
value = obj.get(key)
if isinstance(value, str) and value.strip():
return value.strip().lower()
return None
def _select_mqtt_online_source(
device_id: str, now: float
) -> Tuple[Optional[str], Optional[float]]:
status_ts = mqtt_status_seen.get(device_id)
internal_ts = mqtt_internal_seen.get(device_id)
status_recent = (
bool(status_ts) and MQTT_ONLINE_STATUS_TTL_SECONDS > 0 and
(now - status_ts <= MQTT_ONLINE_STATUS_TTL_SECONDS)
)
internal_recent = (
bool(internal_ts) and MQTT_ONLINE_INTERNAL_TTL_SECONDS > 0 and
(now - internal_ts <= MQTT_ONLINE_INTERNAL_TTL_SECONDS)
)
status_value = (mqtt_status_values.get(device_id) or "").strip().lower()
if status_recent and status_value in MQTT_STATUS_OFFLINE_VALUES_SET:
return (None, None)
if internal_recent:
return ("internal", internal_ts)
if status_recent:
return ("status", status_ts)
return (None, None)
def _mqtt_presence_payload(
device_id: str,
last_seen_ts: Optional[float] = None,
now: Optional[float] = None,
) -> Dict[str, Any]:
now = now or time.time()
return {
"type": "device_seen",
"device_id": device_id,
"last_seen_ts": last_seen_ts,
"mqtt_seen_ts": mqtt_seen.get(device_id),
"mqtt_online_source": mqtt_online_source.get(device_id),
"mqtt_status_ts": mqtt_status_seen.get(device_id),
"mqtt_status_value": mqtt_status_values.get(device_id),
"mqtt_internal_ts": mqtt_internal_seen.get(device_id),
"mqtt_packets_ts": mqtt_packets_seen.get(device_id),
"mqtt_presence": _mqtt_presence_summary(now),
}
def _is_packets_active(ts: Optional[float], now: float) -> bool:
if not ts or MQTT_ACTIVITY_PACKETS_TTL_SECONDS <= 0:
return False
return (now - ts) <= MQTT_ACTIVITY_PACKETS_TTL_SECONDS
def _refresh_mqtt_presence(now: Optional[float] = None) -> None:
now = now or time.time()
candidate_ids = (
set(mqtt_seen.keys()) |
set(mqtt_online_source.keys()) |
set(mqtt_status_seen.keys()) |
set(mqtt_internal_seen.keys())
)
for device_id in list(candidate_ids):
source, source_ts = _select_mqtt_online_source(device_id, now)
if source and source_ts:
mqtt_seen[device_id] = source_ts
mqtt_online_source[device_id] = source
continue
mqtt_seen.pop(device_id, None)
mqtt_online_source.pop(device_id, None)
def _mqtt_presence_summary(now: Optional[float] = None) -> Dict[str, int]:
now = now or time.time()
_refresh_mqtt_presence(now)
connected_total = len(mqtt_seen)
connected_on_map = sum(1 for device_id in mqtt_seen if device_id in devices)
feeding_total = sum(
1 for ts in mqtt_packets_seen.values() if _is_packets_active(ts, now)
)
feeding_on_map = sum(
1
for device_id, ts in mqtt_packets_seen.items()
if device_id in devices and _is_packets_active(ts, now)
)
return {
"connected_total": connected_total,
"connected_on_map": connected_on_map,
"connected_off_map": max(0, connected_total - connected_on_map),
"feeding_total": feeding_total,
"feeding_on_map": feeding_on_map,
"feeding_off_map": max(0, feeding_total - feeding_on_map),
}
def _record_mqtt_presence(
topic: str, payload_bytes: bytes, now: float
) -> Optional[Dict[str, Any]]:
_, device_id, topic_kind = _parse_meshcore_topic(topic)
if not device_id or topic_kind not in ("status", "internal", "packets"):
return None
was_online = device_id in mqtt_seen
if topic_kind == "status":
mqtt_status_seen[device_id] = now
status_value = _extract_status_value(_parse_json_dict(payload_bytes))
if status_value:
mqtt_status_values[device_id] = status_value
elif topic_kind == "internal":
mqtt_internal_seen[device_id] = now
elif topic_kind == "packets":
mqtt_packets_seen[device_id] = now
source, source_ts = _select_mqtt_online_source(device_id, now)
if source and source_ts:
mqtt_seen[device_id] = source_ts
mqtt_online_source[device_id] = source
seen_devices[device_id] = now
else:
mqtt_seen.pop(device_id, None)
mqtt_online_source.pop(device_id, None)
event = _mqtt_presence_payload(
device_id, seen_devices.get(device_id) if source else None, now=now
)
event["topic_kind"] = topic_kind
if not was_online and source:
event["presence_transition"] = "online"
elif was_online and not source:
event["presence_transition"] = "offline"
else:
event["presence_transition"] = "stable"
return event
def _within_map_radius(lat: Any, lon: Any) -> bool:
return within_map_boundary(lat, lon)
def _evict_device(device_id: str) -> bool:
removed = False
if device_id in devices:
devices.pop(device_id, None)
removed = True
trails.pop(device_id, None)
state.last_seen_in_path.pop(device_id, None)
first_seen_devices.pop(device_id, None)
last_seen_in_advert.pop(device_id, None)
if removed:
state.state_dirty = True
_rebuild_node_hash_map()
return removed
def _device_role_code(value: Any) -> int:
if isinstance(value, int):
if value in (1, 2, 3):
return value
return 1
if isinstance(value, str):
trimmed = value.strip()
if trimmed.isdigit():
num = int(trimmed)
if num in (1, 2, 3):
return num
return 1
normalized = _normalize_role(trimmed)
if normalized == "repeater":
return 2
if normalized == "room":
return 3
if normalized == "companion":
return 1
return 1
def _parse_updated_since(value: Optional[str]) -> Optional[float]:
if not value:
return None
try:
text = value.strip()
if text.endswith("Z"):
text = text[:-1] + "+00:00"
return datetime.fromisoformat(text).timestamp()
except Exception:
return None
def _node_api_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]:
last_seen = seen_devices.get(device_id) or state.ts
last_seen_iso = _iso_from_ts(last_seen)
role_value = state.role or device_roles.get(device_id)
device_role = _device_role_code(role_value)
return {
"public_key": device_id,
"name": (state.name or device_names.get(device_id) or ""),
"device_role": device_role,
"role": role_value,
"location": {
"latitude": float(state.lat),
"longitude": float(state.lon),
},
"lat": state.lat,
"lon": state.lon,
"last_seen_ts": last_seen,
"last_seen": last_seen_iso,
"timestamp": int(last_seen) if last_seen else None,
"first_seen": last_seen_iso,
"battery_voltage": 0,
}
def _route_payload(route: Dict[str, Any]) -> Dict[str, Any]:
if not PROD_MODE:
return route
return {
"id": route.get("id"),
"points": route.get("points"),
"hashes": route.get("hashes"),
"point_ids": route.get("point_ids"),
"route_mode": route.get("route_mode"),
"ts": route.get("ts"),
"expires_at": route.get("expires_at"),
"origin_id": route.get("origin_id"),
"receiver_id": route.get("receiver_id"),
"payload_type": route.get("payload_type"),
"message_hash": route.get("message_hash"),
"sender_name": route.get("sender_name"),
}
def _history_edge_payload(edge: Dict[str, Any]) -> Dict[str, Any]:
return {
"id":
edge.get("id"),
"a":
edge.get("a"),
"b":
edge.get("b"),
"count":
edge.get("count"),
"last_ts":
edge.get("last_ts"),
"recent":
edge.get("recent") if isinstance(edge.get("recent"), list) else [],
}
def _extract_token(headers: Dict[str, str]) -> Optional[str]:
auth = headers.get("authorization")
if auth:
parts = auth.strip().split()
if len(parts) == 2 and parts[0].lower() == "bearer":
return parts[1]
return auth.strip()
return headers.get("x-access-token") or headers.get("x-token")
def _extract_cookie_token(headers: Dict[str, str], key: str) -> Optional[str]:
cookie = headers.get("cookie")
if not cookie:
return None
for part in cookie.split(";"):
part = part.strip()
if not part:
continue
if part.startswith(f"{key}="):
return part[len(key) + 1 :]
return None
def _require_prod_token(request: Request) -> None:
if not PROD_MODE:
return
if not PROD_TOKEN:
raise HTTPException(status_code=503, detail="prod_token_not_set")
token = request.query_params.get("token"
) or request.query_params.get("access_token")
if not token:
token = _extract_token(request.headers)
if token != PROD_TOKEN:
raise HTTPException(status_code=401, detail="unauthorized")
app.include_router(create_weather_router(_require_prod_token))
def _ws_authorized(ws: WebSocket) -> bool:
if TURNSTILE_ENABLED and turnstile_verifier:
auth_token = _extract_cookie_token(ws.headers, "meshmap_auth") or \
ws.query_params.get("auth") or \
_extract_token(ws.headers)
if auth_token and turnstile_verifier.verify_auth_token(auth_token):
return True
if not PROD_MODE:
return True
if not PROD_TOKEN:
return False
token = ws.query_params.get("token") or ws.query_params.get("access_token")
if not token:
token = _extract_token(ws.headers)
return token == PROD_TOKEN
def _load_state() -> None:
try:
if not os.path.exists(STATE_FILE):
return
with open(STATE_FILE, "r", encoding="utf-8") as handle:
data = json.load(handle)
except Exception as exc:
print(f"[state] failed to load {STATE_FILE}: {exc}")
return
raw_devices = data.get("devices") or {}
loaded_devices: Dict[str, DeviceState] = {}
dropped_ids: Set[str] = set()
for key, value in raw_devices.items():
if not isinstance(value, dict):
continue
try:
loaded_state = DeviceState(**value)
except Exception:
continue
if _coords_are_zero(loaded_state.lat, loaded_state.lon
) or not _within_map_radius(loaded_state.lat, loaded_state.lon):
dropped_ids.add(str(key))
continue
loaded_devices[key] = loaded_state
devices.clear()
devices.update(loaded_devices)
trails.clear()
trails.update(data.get("trails") or {})
seen_devices.clear()
seen_devices.update(data.get("seen_devices") or {})
first_seen_devices.clear()
raw_first_seen = data.get("first_seen_devices") or {}
if isinstance(raw_first_seen, dict):
for key, value in raw_first_seen.items():
try:
first_seen_devices[str(key)] = float(value)
except (TypeError, ValueError):
continue
else:
raw_first_seen = {}
cleaned_trails: Dict[str, list] = {}
trails_dirty = False
for device_id, trail in trails.items():
if not isinstance(trail, list):
continue
filtered: list = []
for entry in trail:
if not isinstance(entry, (list, tuple)) or len(entry) < 2:
continue
lat = entry[0]
lon = entry[1]
try:
lat_val = float(lat)
lon_val = float(lon)
except (TypeError, ValueError):
continue
if _coords_are_zero(lat_val,
lon_val) or not _within_map_radius(lat_val, lon_val):
trails_dirty = True
continue
filtered.append(list(entry))
if filtered:
cleaned_trails[device_id] = filtered
else:
trails_dirty = True
trails.clear()
trails.update(cleaned_trails)
if TRAIL_LEN <= 0 and trails:
trails.clear()
trails_dirty = True
if dropped_ids:
for device_id in dropped_ids:
trails.pop(device_id, None)
seen_devices.pop(device_id, None)
trails_dirty = True
if trails_dirty:
state.state_dirty = True
raw_names = data.get("device_names") or {}
if isinstance(raw_names, dict):
device_names.clear()
device_names.update(
{
str(k): str(v)
for k, v in raw_names.items() if str(v).strip()
}
)
else:
device_names.clear()
if dropped_ids:
for device_id in dropped_ids:
device_names.pop(device_id, None)
raw_role_sources = data.get("device_role_sources") or {}
if isinstance(raw_role_sources, dict):
device_role_sources.clear()
device_role_sources.update(
{
str(k): str(v)
for k, v in raw_role_sources.items() if str(v).strip()
}
)
else:
device_role_sources.clear()
if dropped_ids:
for device_id in dropped_ids:
device_role_sources.pop(device_id, None)
raw_roles = data.get("device_roles") or {}
device_roles.clear()
if isinstance(raw_roles, dict):
for key, value in raw_roles.items():
if dropped_ids and str(key) in dropped_ids:
continue
role_value = str(value).strip() if isinstance(value, str) else ""
if not role_value:
continue
source = device_role_sources.get(str(key))
if source in ("explicit", "override"):
device_roles[str(key)] = role_value
role_overrides = _load_role_overrides()
if role_overrides:
for device_id in role_overrides:
device_role_sources[device_id] = "override"
device_roles.update(role_overrides)
if dropped_ids:
for device_id in dropped_ids:
device_roles.pop(device_id, None)
raw_path_timestamps = data.get("last_seen_in_path") or {}
state.last_seen_in_path.clear()
if isinstance(raw_path_timestamps, dict):
for key, value in raw_path_timestamps.items():
if dropped_ids and str(key) in dropped_ids:
continue
try:
state.last_seen_in_path[str(key)] = float(value)
except (TypeError, ValueError):
continue
raw_advert_seen = data.get("last_seen_in_advert") or {}
last_seen_in_advert.clear()
if isinstance(raw_advert_seen, dict):
for key, value in raw_advert_seen.items():
if dropped_ids and str(key) in dropped_ids:
continue
try:
last_seen_in_advert[str(key)] = float(value)
except (TypeError, ValueError):
continue
raw_peer_history = data.get("peer_history_pairs") or {}
peer_history_pairs.clear()
if isinstance(raw_peer_history, dict):
for pair_key, value in raw_peer_history.items():
if not isinstance(pair_key, str) or not isinstance(value, dict):
continue
a_id = value.get("a_id")
b_id = value.get("b_id")
buckets = value.get("buckets")
if not isinstance(a_id, str) or not a_id.strip():
continue
if not isinstance(b_id, str) or not b_id.strip():
continue
if not isinstance(buckets, dict):
continue
clean_buckets: Dict[str, int] = {}
for bucket_key, count in buckets.items():
try:
bucket_ts = int(float(bucket_key))
bucket_count = int(count)
except (TypeError, ValueError):
continue
if bucket_count <= 0:
continue
clean_buckets[str(bucket_ts)] = bucket_count
if not clean_buckets:
continue
last_ts = value.get("last_ts")
try:
last_ts_val = float(last_ts) if last_ts is not None else 0.0
except (TypeError, ValueError):
last_ts_val = 0.0
peer_history_pairs[pair_key] = {
"a_id": a_id.strip(),
"b_id": b_id.strip(),
"buckets": clean_buckets,
"last_ts": last_ts_val,
}
if _prune_peer_history():
state.state_dirty = True
# If first_seen data is missing, fall back to loaded last_seen values.
if not raw_first_seen:
for device_id, seen_ts in seen_devices.items():
try:
first_seen_devices[device_id] = float(seen_ts)
except (TypeError, ValueError):
continue
# Load and apply coordinate overrides
coord_overrides = _load_coord_overrides()
if coord_overrides:
device_coords.clear()
device_coords.update(coord_overrides)
if dropped_ids:
for device_id in dropped_ids:
device_coords.pop(device_id, None)
first_seen_devices.pop(device_id, None)
last_seen_in_advert.pop(device_id, None)
for device_id, dev_state in devices.items():
if not dev_state.name and device_id in device_names:
dev_state.name = device_names[device_id]
role_value = device_roles.get(device_id)
dev_state.role = role_value if role_value else None
# Apply coordinate overrides to loaded devices
coord_override = device_coords.get(device_id)
if coord_override:
dev_state.lat = coord_override["lat"]
dev_state.lon = coord_override["lon"]
dropped_ids.update(_dedupe_loaded_devices())
_rebuild_node_hash_map()
async def _state_saver() -> None:
while True:
if state.state_dirty:
try:
os.makedirs(STATE_DIR, exist_ok=True)
tmp_path = f"{STATE_FILE}.tmp"
with open(tmp_path, "w", encoding="utf-8") as handle:
json.dump(_serialize_state(), handle)
os.replace(tmp_path, STATE_FILE)
state.state_dirty = False
except Exception as exc:
print(f"[state] failed to save {STATE_FILE}: {exc}")
await asyncio.sleep(max(1.0, STATE_SAVE_INTERVAL))
def mqtt_on_connect(client, userdata, flags, reason_code, properties=None):
topics_str = ", ".join(MQTT_TOPICS)
print(
f"[mqtt] connected reason_code={reason_code} subscribing topics={topics_str}"
)
for topic in MQTT_TOPICS:
client.subscribe(topic, qos=0)
def mqtt_on_disconnect(
client, userdata, reason_code, properties=None, *args, **kwargs
):
print(f"[mqtt] disconnected reason_code={reason_code}")
def mqtt_on_message(client, userdata, msg: mqtt.MQTTMessage):
stats["received_total"] += 1
stats["last_rx_ts"] = time.time()
stats["last_rx_topic"] = msg.topic
topic_counts[msg.topic] = topic_counts.get(msg.topic, 0) + 1
loop: asyncio.AbstractEventLoop = userdata["loop"]
now = time.time()
mqtt_presence_event = _record_mqtt_presence(msg.topic, msg.payload, now)
if mqtt_presence_event:
device_id = mqtt_presence_event["device_id"]
should_broadcast = False
transition = mqtt_presence_event.get("presence_transition")
if transition in ("online", "offline"):
should_broadcast = True
elif mqtt_presence_event.get("mqtt_seen_ts") and device_id in devices:
last_sent = last_seen_broadcast.get(device_id, 0)
if now - last_sent >= MQTT_SEEN_BROADCAST_MIN_SECONDS:
should_broadcast = True
if should_broadcast:
last_seen_broadcast[device_id] = now
loop.call_soon_threadsafe(update_queue.put_nowait, mqtt_presence_event)
parsed, debug = _try_parse_payload(msg.topic, msg.payload)
device_id_hint = parsed.get("device_id") if parsed else None
# Also try to get device_id from topic if parsing failed or no device_id in parsed data
topic_device_id = _device_id_from_topic(msg.topic)
# Priority: decoded_pubkey (origin/repeater that sent packet) > parsed device_id > topic device_id (receiver)
decoded_pubkey = debug.get("decoded_pubkey")
if isinstance(decoded_pubkey, str) and decoded_pubkey.strip():
decoded_pubkey = decoded_pubkey.strip()
else:
decoded_pubkey = None
# Check if device has coordinate override - prioritize decoded packet public key (origin)
coord_override = None
matched_device_id = None
# Try 1: decoded_pubkey (origin/repeater that sent the packet) - this is what we want!
if decoded_pubkey and decoded_pubkey in device_coords:
coord_override = device_coords[decoded_pubkey]
matched_device_id = decoded_pubkey
# Try 2: device_id_hint from parsed data (should also be decoded_pubkey if available)
elif device_id_hint and device_id_hint in device_coords:
coord_override = device_coords[device_id_hint]
matched_device_id = device_id_hint
# Try 3: Check if decoded_pubkey matches any override via substring (for partial matches)
elif decoded_pubkey:
for override_id in device_coords.keys():
if override_id in decoded_pubkey or decoded_pubkey in override_id:
coord_override = device_coords[override_id]
matched_device_id = override_id
break
# Try 4: topic_device_id (receiver/observer) - only as fallback
if not coord_override and topic_device_id and topic_device_id in device_coords:
coord_override = device_coords[topic_device_id]
matched_device_id = topic_device_id
# Try 5: Check all parts of the topic path (receiver/observer)
if not coord_override:
topic_parts = msg.topic.split("/")
for part in topic_parts:
if part and len(part) > 10 and part in device_coords: # Only check parts that look like device IDs (long hex strings)
coord_override = device_coords[part]
matched_device_id = part
break
# Try 6: Check if any device_id in override file is a substring of any topic part (for partial matches)
if not coord_override:
for part in topic_parts:
if part and len(part) > 10:
# Check if any override key is contained in this topic part or vice versa
for override_id in device_coords.keys():
if override_id in part or part in override_id:
coord_override = device_coords[override_id]
matched_device_id = override_id
break
if coord_override:
break
has_coord_override = coord_override is not None
# Initialize check_lat and check_lon - will be set from override or parsed data
check_lat = None
check_lon = None
if has_coord_override and matched_device_id:
# Use override coordinates for filtering checks and inject into parsed data
check_lat = coord_override["lat"]
check_lon = coord_override["lon"]
# Normalize timestamp: if it's too far in the future (> 1 hour), use current time
now_ts = time.time()
parsed_ts = parsed.get("ts") if parsed else None
if parsed_ts and parsed_ts > now_ts + 3600: # More than 1 hour in future
parsed_ts = now_ts
if DEBUG_PAYLOAD:
print(f"[mqtt] Normalized future timestamp: device={matched_device_id} future_ts={parsed.get('ts')} -> now={now_ts}")
# If parsing failed or has no location, create/update parsed data with override coords
# Use decoded_pubkey as device_id if available (origin), otherwise use matched_device_id
target_device_id = decoded_pubkey or matched_device_id
if not parsed:
parsed = {
"device_id": target_device_id,
"lat": coord_override["lat"],
"lon": coord_override["lon"],
"ts": now_ts,
}
device_id_hint = target_device_id
debug["result"] = "coord_override_created"
if DEBUG_PAYLOAD:
print(f"[mqtt] Created parsed data from coord override: device_id={target_device_id} (matched_override={matched_device_id}) lat={coord_override['lat']} lon={coord_override['lon']}")
elif parsed:
# If parsing succeeded but no location, inject override coordinates
if not parsed.get("lat") or not parsed.get("lon") or _coords_are_zero(parsed.get("lat", 0), parsed.get("lon", 0)):
parsed["lat"] = coord_override["lat"]
parsed["lon"] = coord_override["lon"]
# Ensure device_id is set to the decoded_pubkey (origin) if available
if decoded_pubkey:
parsed["device_id"] = decoded_pubkey
device_id_hint = decoded_pubkey
elif not device_id_hint:
parsed["device_id"] = matched_device_id
device_id_hint = matched_device_id
debug["result"] = debug.get("result") or "coord_override_applied"
if DEBUG_PAYLOAD:
print(f"[mqtt] Applied coord override to parsed data: device_id={parsed.get('device_id')} (matched_override={matched_device_id}) lat={coord_override['lat']} lon={coord_override['lon']}")
# Always normalize timestamp if it's in the future
if parsed_ts:
parsed["ts"] = parsed_ts
# Set check_lat/check_lon from parsed data if not already set from override
if check_lat is None and check_lon is None:
check_lat = parsed.get("lat") if parsed else None
check_lon = parsed.get("lon") if parsed else None
# Don't filter 0,0 coordinates if device has a coordinate override
if parsed and _coords_are_zero(parsed.get("lat", 0), parsed.get("lon", 0)) and not has_coord_override:
debug["result"] = "filtered_zero_coords"
parsed = None
# Check radius using override coordinates if available
if parsed and check_lat is not None and check_lon is not None and not _within_map_radius(check_lat, check_lon):
debug["result"] = "filtered_radius"
if DEBUG_PAYLOAD:
device_id_for_log = matched_device_id or decoded_pubkey or device_id_hint or topic_device_id or parsed.get("device_id")
print(f"[mqtt] Filtered device by radius: device_id={device_id_for_log} lat={check_lat} lon={check_lon} (radius={MAP_RADIUS_KM}km)")
parsed = None
if matched_device_id or decoded_pubkey or device_id_hint:
remove_id = matched_device_id or decoded_pubkey or device_id_hint
loop.call_soon_threadsafe(
update_queue.put_nowait,
{
"type": "device_remove",
"device_id": remove_id,
"reason": "radius",
},
)
origin_id = debug.get("origin_id") or _device_id_from_topic(msg.topic)
decoder_meta = debug.get("decoder_meta") or {}
result = debug.get("result") or "unknown"
device_role = debug.get("device_role")
role_target_id = origin_id
if device_role and result.startswith("decoded"):
role_target_id = None
loc_meta = (
decoder_meta.get("location") if isinstance(decoder_meta, dict) else None
)
loc_pubkey = loc_meta.get("pubkey") if isinstance(loc_meta, dict) else None
if isinstance(loc_pubkey, str) and loc_pubkey.strip():
role_target_id = loc_pubkey
else:
decoded_pubkey = debug.get("decoded_pubkey")
if isinstance(decoded_pubkey, str) and decoded_pubkey.strip():
role_target_id = decoded_pubkey
debug_entry = {
"ts": time.time(),
"topic": msg.topic,
"result": debug.get("result"),
"found_path": debug.get("found_path"),
"found_hint": debug.get("found_hint"),
"decoder_meta": decoder_meta,
"role_target_id": role_target_id,
"packet_hash": debug.get("packet_hash"),
"direction": debug.get("direction"),
"json_keys": debug.get("json_keys"),
"parse_error": debug.get("parse_error"),
"origin_id": origin_id,
"payload_preview": _safe_preview(msg.payload[:DEBUG_PAYLOAD_MAX]),
}
debug_last.append(debug_entry)
if msg.topic.endswith("/status"):
status_last.append(
{
"ts": debug_entry["ts"],
"topic": msg.topic,
"device_name": debug.get("device_name"),
"device_role": debug.get("device_role"),
"origin_id": origin_id,
"json_keys": debug_entry.get("json_keys"),
"payload_preview": debug_entry["payload_preview"],
}
)
result_counts[result] = result_counts.get(result, 0) + 1
device_name = debug.get("device_name")
if device_name and origin_id:
existing_name = device_names.get(origin_id)
if existing_name != device_name:
device_names[origin_id] = device_name
state.state_dirty = True
device_state = devices.get(origin_id)
if device_state:
device_state.name = device_name
loop: asyncio.AbstractEventLoop = userdata["loop"]
loop.call_soon_threadsafe(
update_queue.put_nowait,
{
"type": "device_name",
"device_id": origin_id,
},
)
if device_role and role_target_id:
existing_role = device_roles.get(role_target_id)
if existing_role != device_role:
device_roles[role_target_id] = device_role
device_role_sources[role_target_id] = "explicit"
state.state_dirty = True
device_state = devices.get(role_target_id)
if device_state:
device_state.role = device_role
loop: asyncio.AbstractEventLoop = userdata["loop"]
loop.call_soon_threadsafe(
update_queue.put_nowait,
{
"type": "device_role",
"device_id": role_target_id,
},
)
path_hashes = decoder_meta.get("pathHashes")
payload_type = decoder_meta.get("payloadType")
route_type = decoder_meta.get("routeType")
sender_name = decoder_meta.get("senderName")
if not isinstance(sender_name, str) or not sender_name.strip():
sender_name = None
else:
sender_name = sender_name.strip()
message_hash = debug.get("packet_hash") or decoder_meta.get("messageHash")
snr_values = decoder_meta.get("snrValues")
path_header = decoder_meta.get("path")
path_length = decoder_meta.get("pathLength")
direction = debug.get("direction")
receiver_id = _device_id_from_topic(msg.topic)
route_origin_id = None
loc_meta = decoder_meta.get("location"
) if isinstance(decoder_meta, dict) else None
if isinstance(loc_meta, dict):
decoded_pubkey = loc_meta.get("pubkey")
if decoded_pubkey:
route_origin_id = decoded_pubkey
direction_value = str(direction or "").lower()
if message_hash:
cache = message_origins.get(message_hash)
if not cache:
cache = {
"origin_id": None,
"sender_name": None,
"first_rx": None,
"receivers": set(),
"ts": time.time(),
}
message_origins[message_hash] = cache
cache["ts"] = time.time()
if route_origin_id:
cache["origin_id"] = route_origin_id
if sender_name:
cache["sender_name"] = sender_name
if direction_value == "rx" and receiver_id:
cache["receivers"].add(receiver_id)
if not cache.get("first_rx"):
cache["first_rx"] = receiver_id
if not route_origin_id:
cached_origin_id = cache.get("origin_id")
if isinstance(cached_origin_id, str) and cached_origin_id.strip():
route_origin_id = cached_origin_id
if not sender_name:
cached_sender_name = cache.get("sender_name")
if isinstance(cached_sender_name, str) and cached_sender_name.strip():
sender_name = cached_sender_name.strip()
loop: asyncio.AbstractEventLoop = userdata["loop"]
try:
payload_type = int(payload_type) if payload_type is not None else None
except (TypeError, ValueError):
payload_type = None
try:
route_type = int(route_type) if route_type is not None else None
except (TypeError, ValueError):
route_type = None
if payload_type == 4:
advert_device_id = route_origin_id or origin_id or receiver_id
if advert_device_id:
last_seen_in_advert[advert_device_id] = time.time()
state.state_dirty = True
route_hashes = None
if path_hashes and isinstance(path_hashes, list):
route_hashes = path_hashes
elif payload_type not in (8, 9) and isinstance(path_header, list):
if route_type in (0, 1):
route_hashes = path_header
route_hashes = _normalize_route_hashes_for_path_length(route_hashes, path_length)
route_emitted = False
if route_hashes and payload_type in ROUTE_PAYLOAD_TYPES_SET:
if DEBUG_PAYLOAD:
origin_cache = message_origins.get(message_hash) if message_hash else None
print(
"[route-debug] "
f"hash={message_hash or '-'} "
f"payload={payload_type if payload_type is not None else '-'} "
f"dir={direction_value or '-'} "
f"topic_origin={origin_id or '-'} "
f"route_origin={route_origin_id or '-'} "
f"sender_name={sender_name or '-'} "
f"cached_origin={(origin_cache or {}).get('origin_id') or '-'} "
f"first_rx={(origin_cache or {}).get('first_rx') or '-'} "
f"receiver={receiver_id or '-'} "
f"path={route_hashes}"
)
loop.call_soon_threadsafe(
update_queue.put_nowait,
{
"type": "route",
"path_hashes": route_hashes,
"payload_type": payload_type,
"message_hash": message_hash,
"origin_id": route_origin_id,
"receiver_id": receiver_id,
"snr_values": snr_values,
"route_type": route_type,
"sender_name": sender_name,
"ts": time.time(),
"topic": msg.topic,
},
)
route_emitted = True
if not parsed:
stats["unparsed_total"] += 1
if DEBUG_PAYLOAD:
print(
f"[mqtt] UNPARSED result={result} topic={msg.topic} preview={debug_entry['payload_preview']!r}"
)
return
parsed["raw_topic"] = msg.topic
stats["parsed_total"] += 1
stats["last_parsed_ts"] = time.time()
stats["last_parsed_topic"] = msg.topic
if DEBUG_PAYLOAD:
print(
f"[mqtt] PARSED topic={msg.topic} device={parsed['device_id']} lat={parsed['lat']} lon={parsed['lon']}"
)
loop.call_soon_threadsafe(
update_queue.put_nowait, {
"type": "device",
"data": parsed
}
)
# =========================
# Broadcaster / Reaper
# =========================
async def broadcaster():
while True:
event = await update_queue.get()
if isinstance(event, dict) and event.get("type") in (
"device_name",
"device_role",
):
device_id = event.get("device_id")
device_state = devices.get(device_id)
if device_state:
if device_id in device_names:
device_state.name = device_names[device_id]
if device_id in device_roles:
device_state.role = device_roles[device_id]
payload = {
"type": "update",
"device": _device_payload(device_id, device_state),
"trail": trails.get(device_id, []),
}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
continue
if isinstance(event, dict) and event.get("type") == "device_seen":
device_id = event.get("device_id")
if not device_id:
continue
seen_ts = event.get("last_seen_ts")
if seen_ts:
seen_devices[device_id] = seen_ts
mqtt_ts_present = "mqtt_seen_ts" in event
if mqtt_ts_present:
mqtt_ts = event.get("mqtt_seen_ts")
if mqtt_ts:
mqtt_seen[device_id] = mqtt_ts
else:
mqtt_seen.pop(device_id, None)
if "mqtt_online_source" in event:
source = event.get("mqtt_online_source")
if source:
mqtt_online_source[device_id] = source
else:
mqtt_online_source.pop(device_id, None)
if "mqtt_status_ts" in event:
status_ts = event.get("mqtt_status_ts")
if status_ts:
mqtt_status_seen[device_id] = status_ts
if "mqtt_status_value" in event:
status_value = event.get("mqtt_status_value")
if isinstance(status_value, str) and status_value.strip():
mqtt_status_values[device_id] = status_value.strip().lower()
if "mqtt_internal_ts" in event:
internal_ts = event.get("mqtt_internal_ts")
if internal_ts:
mqtt_internal_seen[device_id] = internal_ts
if "mqtt_packets_ts" in event:
packets_ts = event.get("mqtt_packets_ts")
if packets_ts:
mqtt_packets_seen[device_id] = packets_ts
payload = _mqtt_presence_payload(device_id, seen_devices.get(device_id))
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
continue
if isinstance(event, dict) and event.get("type") == "device_remove":
device_id = event.get("device_id")
if device_id and _evict_device(device_id):
payload = {"type": "stale", "device_ids": [device_id]}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
continue
if isinstance(event, dict) and event.get("type") == "route":
route_mode = event.get("route_mode")
points = event.get("points")
used_hashes: List[str] = []
point_ids: List[Optional[str]] = []
if not points:
path_hashes = event.get("path_hashes") or []
points, used_hashes, point_ids = _route_points_from_hashes(
list(path_hashes),
event.get("origin_id"),
event.get("receiver_id"),
event.get("ts") or time.time(),
)
if not points and route_mode == "fanout":
points = _route_points_from_device_ids(
event.get("origin_id"), event.get("receiver_id")
)
if (
points and event.get("origin_id") and event.get("receiver_id") and
len(points) == 2
):
point_ids = [event.get("origin_id"), event.get("receiver_id")]
# Fallback: if path hashes are missing/unknown, draw a direct link when possible.
if not points:
points = _route_points_from_device_ids(
event.get("origin_id"), event.get("receiver_id")
)
if points:
route_mode = "direct"
if (
event.get("origin_id") and event.get("receiver_id") and
len(points) == 2
):
point_ids = [event.get("origin_id"), event.get("receiver_id")]
if not points:
continue
if MAP_RADIUS_KM > 0:
outside = any(
not _within_map_radius(point[0], point[1]) for point in points
if isinstance(point, (list, tuple)) and len(point) >= 2
)
if outside:
continue
route_id = event.get("route_id")
if not route_id:
message_hash = event.get("message_hash")
receiver_key = event.get("receiver_id") or "observer"
if message_hash:
# Keep one active line per (message, observer) so multi-observer
# receptions do not overwrite each other.
route_id = f"{message_hash}:{receiver_key}"
else:
route_id = (
f"{event.get('origin_id', 'route')}:{receiver_key}:"
f"{int((event.get('ts') or time.time()) * 1000)}"
)
expires_at = (event.get("ts") or time.time()) + ROUTE_TTL_SECONDS
route = {
"id": route_id,
"points": points,
"hashes": used_hashes,
"point_ids": point_ids,
"route_mode": route_mode or ("path" if used_hashes else "direct"),
"ts": event.get("ts") or time.time(),
"expires_at": expires_at,
"origin_id": event.get("origin_id"),
"receiver_id": event.get("receiver_id"),
"payload_type": event.get("payload_type"),
"message_hash": event.get("message_hash"),
"snr_values": event.get("snr_values"),
"sender_name": event.get("sender_name"),
"topic": event.get("topic"),
}
_append_heat_points(points, route["ts"], event.get("payload_type"))
routes[route_id] = route
if point_ids and used_hashes:
_record_neighbors(point_ids, route["ts"])
_update_path_timestamps(point_ids, route["ts"])
history_updates, history_removed = _record_route_history(route)
payload = {"type": "route", "route": _route_payload(route)}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
if history_updates or history_removed:
history_payload = {}
if history_updates:
history_payload["type"] = "history_edges"
history_payload["edges"] = [
_history_edge_payload(edge) for edge in history_updates
]
if history_removed:
history_payload_remove = {
"type": "history_edges_remove",
"edge_ids": history_removed,
}
else:
history_payload_remove = None
dead = []
for ws in list(clients):
try:
if history_updates:
await ws.send_text(json.dumps(history_payload))
if history_payload_remove:
await ws.send_text(json.dumps(history_payload_remove))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
continue
upd = (
event.get("data")
if isinstance(event, dict) and event.get("type") == "device" else event
)
device_id = upd["device_id"]
# Check if device has coordinate override before filtering by radius
coord_override = device_coords.get(device_id)
if coord_override:
# Use override coordinates for radius check
check_lat = coord_override["lat"]
check_lon = coord_override["lon"]
else:
check_lat = upd.get("lat")
check_lon = upd.get("lon")
if not _within_map_radius(check_lat, check_lon):
if _evict_device(device_id):
payload = {"type": "stale", "device_ids": [device_id]}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
continue
is_new_device = device_id not in devices
# Normalize timestamp: if it's too far in the future (> 1 hour), use current time
now_ts = time.time()
device_ts = upd.get("ts", now_ts)
if device_ts > now_ts + 3600: # More than 1 hour in future
if DEBUG_PAYLOAD:
print(f"[mqtt] Normalized future timestamp in device state: device={device_id} future_ts={device_ts} -> now={now_ts}")
device_ts = now_ts
device_state = DeviceState(
device_id=device_id,
lat=upd["lat"],
lon=upd["lon"],
ts=device_ts,
heading=upd.get("heading"),
speed=upd.get("speed"),
rssi=upd.get("rssi"),
snr=upd.get("snr"),
name=upd.get("name") or device_names.get(device_id),
role=upd.get("role") or device_roles.get(device_id),
raw_topic=upd.get("raw_topic"),
)
# Apply coordinate overrides
coord_override = device_coords.get(device_id)
if coord_override:
device_state.lat = coord_override["lat"]
device_state.lon = coord_override["lon"]
devices[device_id] = device_state
now_seen = time.time()
seen_devices[device_id] = now_seen
if device_id not in first_seen_devices:
first_seen_devices[device_id] = now_seen
state.state_dirty = True
if is_new_device:
_rebuild_node_hash_map()
if device_state.name:
device_names[device_id] = device_state.name
if device_state.role:
device_roles[device_id] = device_state.role
if TRAIL_LEN > 0 and not _coords_are_zero(
device_state.lat, device_state.lon
):
trails.setdefault(device_id, [])
trails[device_id].append(
[device_state.lat, device_state.lon, device_state.ts]
)
if len(trails[device_id]) > TRAIL_LEN:
trails[device_id] = trails[device_id][-TRAIL_LEN:]
elif device_id in trails:
trails.pop(device_id, None)
payload = {
"type": "update",
"device": _device_payload(device_id, device_state),
"trail": trails.get(device_id, []),
}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
async def reaper():
global mqtt_presence_last_summary
while True:
now = time.time()
_refresh_mqtt_presence(now)
if DEVICE_TTL_WINDOW_SECONDS > 0 or PATH_TTL_SECONDS > 0:
stale = []
for dev_id, st in list(devices.items()):
if dev_id in mqtt_seen:
continue
device_stale = (
DEVICE_TTL_WINDOW_SECONDS > 0 and (now - st.ts > DEVICE_TTL_WINDOW_SECONDS)
)
last_path_ts = state.last_seen_in_path.get(dev_id, 0.0)
path_stale = (
PATH_TTL_SECONDS > 0 and
(last_path_ts <= 0 or now - last_path_ts > PATH_TTL_SECONDS)
)
if DEVICE_TTL_WINDOW_SECONDS > 0 and PATH_TTL_SECONDS > 0:
should_stale = device_stale and path_stale
elif DEVICE_TTL_WINDOW_SECONDS > 0:
should_stale = device_stale
else:
should_stale = path_stale
if should_stale:
stale.append(dev_id)
if stale:
payload = {"type": "stale", "device_ids": stale}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
for dev_id in stale:
devices.pop(dev_id, None)
trails.pop(dev_id, None)
state.last_seen_in_path.pop(dev_id, None)
state.state_dirty = True
_rebuild_node_hash_map()
if routes:
bad_routes = []
for route_id, route in list(routes.items()):
points = route.get("points") if isinstance(route, dict) else None
if not isinstance(points, list):
continue
if any(
_coords_are_zero(p[0], p[1])
for p in points if isinstance(p, list) and len(p) >= 2
):
bad_routes.append(route_id)
if bad_routes:
payload = {"type": "route_remove", "route_ids": bad_routes}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
for route_id in bad_routes:
routes.pop(route_id, None)
stale_routes = [
route_id for route_id, route in list(routes.items())
if now > route.get("expires_at", 0)
]
if stale_routes:
payload = {"type": "route_remove", "route_ids": stale_routes}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
for route_id in stale_routes:
routes.pop(route_id, None)
history_updates, history_removed = _prune_route_history()
if history_updates or history_removed:
dead = []
for ws in list(clients):
try:
if history_updates:
await ws.send_text(
json.dumps({
"type": "history_edges",
"edges": history_updates
})
)
if history_removed:
await ws.send_text(
json.dumps(
{
"type": "history_edges_remove",
"edge_ids": history_removed,
}
)
)
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
if HEAT_TTL_SECONDS > 0 and heat_events:
cutoff = now - HEAT_TTL_SECONDS
heat_events[:] = [
entry for entry in heat_events if entry.get("ts", 0) >= cutoff
]
if message_origins:
for msg_hash, info in list(message_origins.items()):
if now - info.get("ts", 0) > MESSAGE_ORIGIN_TTL_SECONDS:
message_origins.pop(msg_hash, None)
_prune_neighbors(now)
presence_summary = _mqtt_presence_summary(now)
if presence_summary != mqtt_presence_last_summary:
mqtt_presence_last_summary = dict(presence_summary)
payload = {"type": "mqtt_presence", "mqtt_presence": presence_summary}
dead = []
for ws in list(clients):
try:
await ws.send_text(json.dumps(payload))
except Exception:
dead.append(ws)
for ws in dead:
clients.discard(ws)
retention_window = max(
DEVICE_TTL_WINDOW_SECONDS if DEVICE_TTL_WINDOW_SECONDS > 0 else 0,
PATH_TTL_SECONDS if PATH_TTL_SECONDS > 0 else 0,
)
prune_after = max(retention_window * 3, 900) if retention_window > 0 else 86400
for dev_id, last in list(seen_devices.items()):
if now - last > prune_after:
seen_devices.pop(dev_id, None)
first_seen_devices.pop(dev_id, None)
last_seen_in_advert.pop(dev_id, None)
state.last_seen_in_path.pop(dev_id, None)
if _prune_peer_history(now):
state.state_dirty = True
await asyncio.sleep(5)
# =========================
# Helpers: Turnstile auth
# =========================
TURNSTILE_BOT_TOKENS = [
token.strip().lower()
for token in TURNSTILE_BOT_ALLOWLIST.split(",")
if token and token.strip()
]
def _is_allowlisted_bot(request: Request) -> bool:
"""Return True when the request looks like a known embed bot."""
if not TURNSTILE_ENABLED or not TURNSTILE_BOT_BYPASS:
return False
user_agent = (request.headers.get("user-agent") or "").lower()
if not user_agent:
return False
for token in TURNSTILE_BOT_TOKENS:
if token in user_agent:
return True
return False
def _check_turnstile_auth(request: Request) -> bool:
"""Check if user has valid Turnstile auth token."""
if not TURNSTILE_ENABLED or not turnstile_verifier:
return True
# Allowlist common embed bots (Discord, Slack, etc.)
if _is_allowlisted_bot(request):
ua = request.headers.get("user-agent", "-")
print(f"[turnstile] bot bypass user-agent={ua}")
return True
# Check for auth token in cookies or headers
auth_token = request.cookies.get("meshmap_auth") or \
request.headers.get("Authorization", "").replace("Bearer ", "")
if auth_token and turnstile_verifier.verify_auth_token(auth_token):
return True
return False
# =========================
# FastAPI routes
# =========================
@app.get("/")
def root(request: Request):
# If Turnstile is enabled and user isn't authenticated, serve landing page
if TURNSTILE_ENABLED and not _check_turnstile_auth(request):
landing_path = os.path.join(APP_DIR, "static", "landing.html")
try:
with open(landing_path, "r", encoding="utf-8") as handle:
content = handle.read()
except Exception:
return FileResponse("static/landing.html")
# Replace template variables in landing page
replacements = {
"SITE_TITLE": SITE_TITLE,
"SITE_DESCRIPTION": SITE_DESCRIPTION,
"SITE_ICON": SITE_ICON,
"TURNSTILE_SITE_KEY": TURNSTILE_SITE_KEY,
"ASSET_VERSION": ASSET_VERSION,
}
for key, value in replacements.items():
safe_value = html.escape(str(value), quote=True)
content = content.replace(f"{{{{{key}}}}}", safe_value)
return HTMLResponse(content)
# Serve map page
html_path = os.path.join(APP_DIR, "static", "index.html")
try:
with open(html_path, "r", encoding="utf-8") as handle:
content = handle.read()
except Exception:
return FileResponse("static/index.html")
# Check for lat/lon parameters for dynamic preview image
query_params = request.query_params
lat_param = query_params.get("lat") or query_params.get("latitude")
lon_param = (
query_params.get("lon") or query_params.get("lng") or
query_params.get("long") or query_params.get("longitude")
)
zoom_param = query_params.get("zoom")
og_image_tag = ""
twitter_image_tag = ""
og_url = SITE_URL
# Generate dynamic preview image if coordinates are provided
if lat_param and lon_param:
try:
lat = float(lat_param)
lon = float(lon_param)
zoom = int(zoom_param) if zoom_param and zoom_param.isdigit() else 13
zoom = max(1, min(18, zoom)) # Clamp zoom between 1-18
# Generate preview image URL pointing to our own server
# Use absolute URL for better compatibility with Discord and other platforms
base_url = str(request.url).split("?")[0].rstrip("/")
preview_params = urlencode(
{
"lat": lat,
"lon": lon,
"zoom": zoom,
"marker": "blue",
"theme": "dark",
}
)
preview_url = f"{base_url}/preview.png?{preview_params}"
# Ensure absolute URL (use SITE_URL if available, otherwise construct from request)
if SITE_URL and SITE_URL.startswith("http"):
site_base = SITE_URL.rstrip("/")
preview_url = f"{site_base}/preview.png?{preview_params}"
elif not preview_url.startswith("http"):
# Fallback: construct from request
scheme = request.url.scheme
host = request.headers.get("host", request.url.hostname or "localhost")
preview_url = f"{scheme}://{host}/preview.png?{preview_params}"
safe_image = html.escape(preview_url, quote=True)
# Add image dimensions for better Discord/social media compatibility
# Note: Preview image may fail if container can't reach external services
# In that case, fall back to static SITE_OG_IMAGE if available
og_image_tag = (
f'<meta property="og:image" content="{safe_image}" />\n'
f' <meta property="og:image:width" content="1200" />\n'
f' <meta property="og:image:height" content="630" />\n'
f' <meta property="og:image:type" content="image/png" />'
)
twitter_image_tag = f'<meta name="twitter:image" content="{safe_image}" />'
# If static image is configured, add it as a fallback
if SITE_OG_IMAGE:
safe_static_image = html.escape(str(SITE_OG_IMAGE), quote=True)
og_image_tag += f'\n <meta property="og:image:secure_url" content="{safe_static_image}" />'
# Update og:url to include query parameters
base_url = str(request.url).split("?")[0]
og_url = f"{base_url}?lat={lat}&lon={lon}"
if zoom_param:
og_url += f"&zoom={zoom}"
except (ValueError, TypeError):
# Invalid coordinates, fall back to static image
if SITE_OG_IMAGE:
safe_image = html.escape(str(SITE_OG_IMAGE), quote=True)
og_image_tag = f'<meta property="og:image" content="{safe_image}" />'
twitter_image_tag = (
f'<meta name="twitter:image" content="{safe_image}" />'
)
elif SITE_OG_IMAGE:
safe_image = html.escape(str(SITE_OG_IMAGE), quote=True)
og_image_tag = f'<meta property="og:image" content="{safe_image}" />'
twitter_image_tag = f'<meta name="twitter:image" content="{safe_image}" />'
content = content.replace("{{OG_IMAGE_TAG}}", og_image_tag)
content = content.replace("{{TWITTER_IMAGE_TAG}}", twitter_image_tag)
trail_info_suffix = ""
if TRAIL_LEN > 0:
trail_info_suffix = f" Trails show last ~{TRAIL_LEN} points."
boundary_json = json.dumps(get_map_boundary_points()).replace("</", "<\\/")
# Escape og_url for HTML
SAFE_OG_URL = html.escape(str(og_url), quote=True)
content = content.replace("{{MAP_BOUNDARY_JSON}}", boundary_json)
replacements = {
"SITE_TITLE":
SITE_TITLE,
"SITE_DESCRIPTION":
SITE_DESCRIPTION,
"SITE_URL":
SAFE_OG_URL,
"SITE_ICON":
SITE_ICON,
"SITE_FEED_NOTE":
SITE_FEED_NOTE,
"CUSTOM_LINK_URL":
CUSTOM_LINK_URL,
"PACKET_ANALYZER_URL":
PACKET_ANALYZER_URL,
"QR_CODE_BUTTON_ENABLED":
str(QR_CODE_BUTTON_ENABLED).lower(),
"APP_VERSION":
APP_VERSION,
"ASSET_VERSION":
ASSET_VERSION,
"DISTANCE_UNITS":
DISTANCE_UNITS,
"NODE_MARKER_RADIUS":
NODE_MARKER_RADIUS,
"HISTORY_LINK_SCALE":
HISTORY_LINK_SCALE,
"TRAIL_INFO_SUFFIX":
trail_info_suffix,
"PROD_MODE":
str(PROD_MODE).lower(),
"PROD_TOKEN":
PROD_TOKEN,
"MAP_START_LAT":
MAP_START_LAT,
"MAP_START_LON":
MAP_START_LON,
"MAP_START_ZOOM":
MAP_START_ZOOM,
"MAP_RADIUS_KM":
MAP_RADIUS_KM,
"MAP_RADIUS_SHOW":
str(MAP_RADIUS_SHOW).lower(),
"MAP_BOUNDARY_MODE":
MAP_BOUNDARY_MODE,
"MAP_BOUNDARY_SHOW":
str(MAP_BOUNDARY_SHOW).lower(),
"MAP_BOUNDARY_NAME":
get_map_boundary_name(),
"MAP_DEFAULT_LAYER":
MAP_DEFAULT_LAYER,
"LOS_ELEVATION_URL":
LOS_ELEVATION_URL,
"LOS_ELEVATION_PROXY_URL":
LOS_ELEVATION_PROXY_URL,
"LOS_SAMPLE_MIN":
LOS_SAMPLE_MIN,
"LOS_SAMPLE_MAX":
LOS_SAMPLE_MAX,
"LOS_SAMPLE_STEP_METERS":
LOS_SAMPLE_STEP_METERS,
"LOS_CURVATURE_ENABLED":
str(LOS_CURVATURE_ENABLED).lower(),
"LOS_CURVATURE_FACTOR":
LOS_CURVATURE_FACTOR,
"LOS_PEAKS_MAX":
LOS_PEAKS_MAX,
"MQTT_ONLINE_SECONDS":
MQTT_ONLINE_SECONDS,
"MQTT_ONLINE_STATUS_TTL_SECONDS":
MQTT_ONLINE_STATUS_TTL_SECONDS,
"MQTT_ONLINE_INTERNAL_TTL_SECONDS":
MQTT_ONLINE_INTERNAL_TTL_SECONDS,
"MQTT_ACTIVITY_PACKETS_TTL_SECONDS":
MQTT_ACTIVITY_PACKETS_TTL_SECONDS,
"COVERAGE_API_URL":
COVERAGE_API_URL,
"WEATHER_RADAR_ENABLED":
str(WEATHER_RADAR_ENABLED).lower(),
"WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED":
str(WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED).lower(),
"WEATHER_RADAR_COUNTRY_LOOKUP_URL":
WEATHER_RADAR_COUNTRY_LOOKUP_URL,
"WEATHER_WIND_ENABLED":
str(WEATHER_WIND_ENABLED).lower(),
"WEATHER_WIND_API_URL":
WEATHER_WIND_API_URL,
"WEATHER_WIND_GRID_SIZE":
WEATHER_WIND_GRID_SIZE,
"WEATHER_WIND_REFRESH_SECONDS":
WEATHER_WIND_REFRESH_SECONDS,
"UPDATE_AVAILABLE":
str(bool(git_update_info.get("available"))).lower(),
"UPDATE_LOCAL":
git_update_info.get("local_short") or "",
"UPDATE_REMOTE":
git_update_info.get("remote_short") or "",
"UPDATE_BANNER_HIDDEN":
"" if git_update_info.get("available") else "hidden",
"TURNSTILE_ENABLED":
str(TURNSTILE_ENABLED).lower(),
"TURNSTILE_SITE_KEY":
TURNSTILE_SITE_KEY,
}
for key, value in replacements.items():
safe_value = html.escape(str(value), quote=True)
content = content.replace(f"{{{{{key}}}}}", safe_value)
return HTMLResponse(content)
@app.get("/preview.png")
async def preview_image(
lat: Optional[float] = Query(None, alias="lat"),
lon: Optional[float] = Query(None, alias="lon"),
zoom: Optional[int] = Query(13, alias="zoom"),
marker: Optional[str] = Query("blue", alias="marker"),
theme: Optional[str] = Query("dark", alias="theme"),
):
"""
Generate a preview image of the map with a pin marker at the specified coordinates.
Returns a PNG image suitable for Open Graph/Twitter card previews.
Marker options:
- red-pin, blue-pin, green-pin, yellow-pin, orange-pin, purple-pin, black-pin, white-pin
- red, blue, green, yellow, orange, purple, black, white (simple circle markers)
- Custom format: color-pin or color (e.g., "blue-pin", "green")
"""
if lat is None or lon is None:
# Return a default/error image if coordinates not provided
return Response(content=b"", status_code=400, media_type="image/png")
try:
zoom_val = max(1, min(18, int(zoom) if zoom else 14))
# Image dimensions for social media previews (Open Graph standard)
width = 1200
height = 630
# Validate and sanitize marker option
marker_str = str(marker).lower().strip() if marker else "blue"
if not marker_str or marker_str == "none":
marker_str = "blue"
# Validate theme option (light or dark)
theme_str = str(theme).lower().strip() if theme else "dark"
if theme_str not in ("light", "dark"):
theme_str = "dark"
# Generate map image server-side using OSM tiles
try:
# Convert lat/lon to tile coordinates
def deg2num(lat_deg, lon_deg, zoom_level):
lat_rad = math.radians(lat_deg)
n = 2.0**zoom_level
xtile = int((lon_deg + 180.0) / 360.0 * n)
ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
return (xtile, ytile)
def num2deg(xtile, ytile, zoom_level):
n = 2.0**zoom_level
lon_deg = xtile / n * 360.0 - 180.0
lat_rad = math.atan(math.sinh(math.pi * (1 - 2 * ytile / n)))
lat_deg = math.degrees(lat_rad)
return (lat_deg, lon_deg)
# Calculate which tiles we need
center_tile_x, center_tile_y = deg2num(lat, lon, zoom_val)
tile_size = 256
tiles_x = math.ceil(width / tile_size) + 2
tiles_y = math.ceil(height / tile_size) + 2
# Calculate pixel position of center point within its tile
# Get the northwest corner of the center tile
nw_lat, nw_lon = num2deg(center_tile_x, center_tile_y, zoom_val)
# Get the southeast corner of the center tile
se_lat, se_lon = num2deg(center_tile_x + 1, center_tile_y + 1, zoom_val)
# Calculate pixel offset within the center tile
center_tile_pixel_x = int((lon - nw_lon) / (se_lon - nw_lon) * tile_size)
center_tile_pixel_y = int((nw_lat - lat) / (nw_lat - se_lat) * tile_size)
# Calculate which tiles to fetch
start_tile_x = center_tile_x - tiles_x // 2
start_tile_y = center_tile_y - tiles_y // 2
# Create blank image with theme-appropriate background
bg_color = (
(18, 18, 18) if theme_str == "dark" else (242, 239, 233)
) # Dark or light background
final_image = Image.new("RGB", (width, height), bg_color)
# Fetch and composite tiles
tiles_fetched = 0
tiles_failed = 0
async with httpx.AsyncClient(timeout=10.0, verify=False) as client:
for ty in range(tiles_y):
for tx in range(tiles_x):
tile_x = start_tile_x + tx
tile_y = start_tile_y + ty
# Use theme-appropriate tile server
if theme_str == "dark":
# CartoDB Dark Matter tiles
tile_url = f"https://a.basemaps.cartocdn.com/dark_all/{zoom_val}/{tile_x}/{tile_y}.png"
else:
# Standard OSM light tiles
tile_url = f"https://tile.openstreetmap.org/{zoom_val}/{tile_x}/{tile_y}.png"
try:
response = await client.get(tile_url)
if response.status_code == 200:
tile_img = Image.open(BytesIO(response.content))
# Calculate position: center the marker at the center of the image
# The center tile should place the marker at the center pixel position
x_offset = (
(tx - tiles_x // 2) * tile_size + width // 2 -
center_tile_pixel_x
)
y_offset = (
(ty - tiles_y // 2) * tile_size + height // 2 -
center_tile_pixel_y
)
final_image.paste(
tile_img,
(x_offset, y_offset),
tile_img if tile_img.mode == "RGBA" else None,
)
tiles_fetched += 1
else:
tiles_failed += 1
print(
f"[preview] Tile {tile_x}/{tile_y} returned status {response.status_code}"
)
except Exception as tile_error:
tiles_failed += 1
print(
f"[preview] Failed to fetch tile {tile_x}/{tile_y} from {tile_url}: {tile_error}"
)
continue
print(f"[preview] Fetched {tiles_fetched} tiles, {tiles_failed} failed")
# Draw current devices (all in-bounds, no cap)
def latlon_to_global_px(lat_deg: float, lon_deg: float,
zoom_level: int) -> Tuple[float, float]:
lat_rad = math.radians(lat_deg)
n = 2.0**zoom_level
x_px = (lon_deg + 180.0) / 360.0 * n * tile_size
y_px = (
(1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n * tile_size
)
return (x_px, y_px)
draw = ImageDraw.Draw(final_image)
center_px_x, center_px_y = latlon_to_global_px(lat, lon, zoom_val)
node_radius = 5
node_color = (86, 198, 255) if theme_str == "dark" else (25, 83, 170)
node_outline = (15, 15, 15) if theme_str == "dark" else (255, 255, 255)
for state in list(devices.values()):
try:
dev_lat = float(state.lat)
dev_lon = float(state.lon)
except Exception:
continue
if _coords_are_zero(dev_lat, dev_lon
) or not _within_map_radius(dev_lat, dev_lon):
continue
dev_px_x, dev_px_y = latlon_to_global_px(dev_lat, dev_lon, zoom_val)
img_x = width / 2 + (dev_px_x - center_px_x)
img_y = height / 2 + (dev_px_y - center_px_y)
if (
img_x < -node_radius or img_x > width + node_radius or
img_y < -node_radius or img_y > height + node_radius
):
continue
draw.ellipse(
[
(img_x - node_radius, img_y - node_radius),
(img_x + node_radius, img_y + node_radius),
],
fill=node_color,
outline=node_outline,
width=2,
)
# Draw marker
marker_color_map = {
"red": (220, 53, 69),
"blue": (0, 123, 255),
"green": (40, 167, 69),
"yellow": (255, 193, 7),
"orange": (255, 152, 0),
"purple": (108, 117, 125),
"black": (0, 0, 0),
"white": (255, 255, 255),
}
marker_color = marker_color_map.get(
marker_str, (0, 123, 255)
) # Default to blue
# Calculate marker position (center of image)
marker_x = width // 2
marker_y = height // 2
# Draw a circle marker
marker_radius = 12
draw.ellipse(
[
(marker_x - marker_radius, marker_y - marker_radius),
(marker_x + marker_radius, marker_y + marker_radius),
],
fill=marker_color,
outline=(255, 255, 255),
width=2,
)
# Convert to PNG bytes
img_bytes = BytesIO()
final_image.save(img_bytes, format="PNG")
img_bytes.seek(0)
return Response(
content=img_bytes.getvalue(),
media_type="image/png",
headers={
"Cache-Control": "public, max-age=3600", # Cache for 1 hour
},
)
except Exception as e:
print(f"[preview] Error generating map image: {e}")
import traceback
traceback.print_exc()
# Even if tile fetching fails, try to return a simple map with marker
try:
bg_color = (18, 18, 18) if theme_str == "dark" else (242, 239, 233)
fallback_image = Image.new("RGB", (width, height), bg_color)
draw = ImageDraw.Draw(fallback_image)
# Draw marker
marker_color_map = {
"red": (220, 53, 69),
"blue": (0, 123, 255),
"green": (40, 167, 69),
"yellow": (255, 193, 7),
"orange": (255, 152, 0),
"purple": (108, 117, 125),
"black": (0, 0, 0),
"white": (255, 255, 255),
}
marker_color = marker_color_map.get(marker_str, (0, 123, 255))
marker_x = width // 2
marker_y = height // 2
marker_radius = 12
draw.ellipse(
[
(marker_x - marker_radius, marker_y - marker_radius),
(marker_x + marker_radius, marker_y + marker_radius),
],
fill=marker_color,
outline=(255, 255, 255),
width=2,
)
img_bytes = BytesIO()
fallback_image.save(img_bytes, format="PNG")
img_bytes.seek(0)
print(
f"[preview] Returning fallback image with marker (tile fetch failed)"
)
return Response(
content=img_bytes.getvalue(),
media_type="image/png",
headers={"Cache-Control": "public, max-age=300"},
)
except Exception as fallback_error:
print(
f"[preview] Fallback image generation also failed: {fallback_error}"
)
# Only redirect to static image if even fallback fails
if SITE_OG_IMAGE and SITE_OG_IMAGE.startswith("http"):
from fastapi.responses import RedirectResponse
print(
f"[preview] All image generation failed, redirecting to static OG image: {SITE_OG_IMAGE}"
)
return RedirectResponse(url=SITE_OG_IMAGE, status_code=302)
# Return transparent PNG as last resort
transparent_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01\r\n-\xdb\x00\x00\x00\x00IEND\xaeB`\x82"
return Response(
content=transparent_png,
media_type="image/png",
headers={"Cache-Control": "public, max-age=300"},
)
except Exception as e:
# Log error for debugging
print(f"[preview] Error generating preview image: {e}")
import traceback
traceback.print_exc()
# Return empty image on error
return Response(content=b"", status_code=500, media_type="image/png")
@app.get("/map")
def map_page(request: Request):
"""Serve the map page (with Turnstile auth check)."""
# If Turnstile is enabled and user isn't authenticated, redirect to landing
if TURNSTILE_ENABLED and not _check_turnstile_auth(request):
print("[map] Unauthenticated user accessing /map, redirecting to /")
return HTMLResponse(
"<script>window.location.href = '/';</script>",
status_code=303,
)
# Otherwise serve the map page
html_path = os.path.join(APP_DIR, "static", "index.html")
try:
with open(html_path, "r", encoding="utf-8") as handle:
content = handle.read()
except Exception:
return FileResponse("static/index.html")
# Include all the template replacements (same as root endpoint)
# Generate OG image tags
og_image_tag = ""
twitter_image_tag = ""
if SITE_OG_IMAGE:
safe_image = html.escape(str(SITE_OG_IMAGE), quote=True)
og_image_tag = f'<meta property="og:image" content="{safe_image}" />'
twitter_image_tag = f'<meta name="twitter:image" content="{safe_image}" />'
content = content.replace("{{OG_IMAGE_TAG}}", og_image_tag)
content = content.replace("{{TWITTER_IMAGE_TAG}}", twitter_image_tag)
trail_info_suffix = ""
if TRAIL_LEN > 0:
trail_info_suffix = f" Trails show last ~{TRAIL_LEN} points."
boundary_json = json.dumps(get_map_boundary_points()).replace("</", "<\\/")
SAFE_OG_URL = html.escape(SITE_URL, quote=True)
content = content.replace("{{MAP_BOUNDARY_JSON}}", boundary_json)
replacements = {
"SITE_TITLE": SITE_TITLE,
"SITE_DESCRIPTION": SITE_DESCRIPTION,
"SITE_URL": SAFE_OG_URL,
"SITE_ICON": SITE_ICON,
"SITE_FEED_NOTE": SITE_FEED_NOTE,
"CUSTOM_LINK_URL": CUSTOM_LINK_URL,
"PACKET_ANALYZER_URL": PACKET_ANALYZER_URL,
"QR_CODE_BUTTON_ENABLED": str(QR_CODE_BUTTON_ENABLED).lower(),
"APP_VERSION": APP_VERSION,
"ASSET_VERSION": ASSET_VERSION,
"DISTANCE_UNITS": DISTANCE_UNITS,
"NODE_MARKER_RADIUS": NODE_MARKER_RADIUS,
"HISTORY_LINK_SCALE": HISTORY_LINK_SCALE,
"TRAIL_INFO_SUFFIX": trail_info_suffix,
"PROD_MODE": str(PROD_MODE).lower(),
"PROD_TOKEN": PROD_TOKEN,
"MAP_START_LAT": MAP_START_LAT,
"MAP_START_LON": MAP_START_LON,
"MAP_START_ZOOM": MAP_START_ZOOM,
"MAP_RADIUS_KM": MAP_RADIUS_KM,
"MAP_RADIUS_SHOW": str(MAP_RADIUS_SHOW).lower(),
"MAP_BOUNDARY_MODE": MAP_BOUNDARY_MODE,
"MAP_BOUNDARY_SHOW": str(MAP_BOUNDARY_SHOW).lower(),
"MAP_BOUNDARY_NAME": get_map_boundary_name(),
"MAP_DEFAULT_LAYER": MAP_DEFAULT_LAYER,
"LOS_ELEVATION_URL": LOS_ELEVATION_URL,
"LOS_ELEVATION_PROXY_URL": LOS_ELEVATION_PROXY_URL,
"LOS_SAMPLE_MIN": LOS_SAMPLE_MIN,
"LOS_SAMPLE_MAX": LOS_SAMPLE_MAX,
"LOS_SAMPLE_STEP_METERS": LOS_SAMPLE_STEP_METERS,
"LOS_CURVATURE_ENABLED": str(LOS_CURVATURE_ENABLED).lower(),
"LOS_CURVATURE_FACTOR": LOS_CURVATURE_FACTOR,
"LOS_PEAKS_MAX": LOS_PEAKS_MAX,
"MQTT_ONLINE_SECONDS": MQTT_ONLINE_SECONDS,
"MQTT_ONLINE_STATUS_TTL_SECONDS": MQTT_ONLINE_STATUS_TTL_SECONDS,
"MQTT_ONLINE_INTERNAL_TTL_SECONDS": MQTT_ONLINE_INTERNAL_TTL_SECONDS,
"MQTT_ACTIVITY_PACKETS_TTL_SECONDS": MQTT_ACTIVITY_PACKETS_TTL_SECONDS,
"COVERAGE_API_URL": COVERAGE_API_URL,
"WEATHER_RADAR_ENABLED": str(WEATHER_RADAR_ENABLED).lower(),
"WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED": str(WEATHER_RADAR_COUNTRY_BOUNDS_ENABLED).lower(),
"WEATHER_RADAR_COUNTRY_LOOKUP_URL": WEATHER_RADAR_COUNTRY_LOOKUP_URL,
"WEATHER_WIND_ENABLED": str(WEATHER_WIND_ENABLED).lower(),
"WEATHER_WIND_API_URL": WEATHER_WIND_API_URL,
"WEATHER_WIND_GRID_SIZE": WEATHER_WIND_GRID_SIZE,
"WEATHER_WIND_REFRESH_SECONDS": WEATHER_WIND_REFRESH_SECONDS,
"UPDATE_AVAILABLE": str(bool(git_update_info.get("available"))).lower(),
"UPDATE_LOCAL": git_update_info.get("local_short") or "",
"UPDATE_REMOTE": git_update_info.get("remote_short") or "",
"UPDATE_BANNER_HIDDEN": "" if git_update_info.get("available") else "hidden",
"TURNSTILE_ENABLED": str(TURNSTILE_ENABLED).lower(),
"TURNSTILE_SITE_KEY": TURNSTILE_SITE_KEY,
}
for key, value in replacements.items():
safe_value = html.escape(str(value), quote=True)
content = content.replace(f"{{{{{key}}}}}", safe_value)
return HTMLResponse(content)
@app.get("/manifest.webmanifest")
def manifest():
icons = []
if SITE_ICON:
icons = [
{
"src": SITE_ICON,
"sizes": "192x192",
"type": "image/png",
"purpose": "any",
},
{
"src": SITE_ICON,
"sizes": "512x512",
"type": "image/png",
"purpose": "any maskable",
},
]
short_name = SITE_TITLE if len(SITE_TITLE) <= 12 else SITE_TITLE[:12]
return JSONResponse(
{
"name": SITE_TITLE,
"short_name": short_name,
"description": SITE_DESCRIPTION,
"start_url": "/",
"scope": "/",
"display": "standalone",
"display_override": ["standalone", "minimal-ui"],
"background_color": "#0f172a",
"theme_color": "#0f172a",
"icons": icons,
},
media_type="application/manifest+json",
)
@app.get("/qr")
def qr_code(
request: Request,
text: Optional[str] = Query(None, min_length=1, max_length=1024),
name: Optional[str] = Query(None, min_length=1, max_length=128),
public_key: Optional[str] = Query(None, min_length=64, max_length=64),
device_type: int = Query(1, alias="type", ge=1, le=4),
box_size: int = Query(14, ge=1, le=24),
border: int = Query(4, ge=0, le=8),
):
_require_prod_token(request)
value = ""
if public_key is not None:
key = str(public_key).strip().lower()
if len(key) != 64 or any(ch not in "0123456789abcdef" for ch in key):
raise HTTPException(status_code=400, detail="invalid_public_key")
contact_name = str(name or "").strip()
if not contact_name:
contact_name = key[:8].upper()
value = "meshcore://contact/add?" + urlencode({
"name": contact_name,
"public_key": key,
"type": str(device_type),
})
elif text is not None:
value = str(text).strip()
if not value:
raise HTTPException(status_code=400, detail="text_required")
qr = qrcode.QRCode(
version=None,
error_correction=qrcode.constants.ERROR_CORRECT_M,
box_size=box_size,
border=border,
)
qr.add_data(value)
qr.make(fit=True)
image = qr.make_image(fill_color="black", back_color="white").convert("RGB")
buffer = BytesIO()
image.save(buffer, format="PNG")
return Response(
content=buffer.getvalue(),
media_type="image/png",
headers={"Cache-Control": "no-store"},
)
@app.get("/sw.js")
def service_worker():
return FileResponse("static/sw.js", media_type="application/javascript")
@app.get("/snapshot")
def snapshot(request: Request):
_require_prod_token(request)
now = time.time()
return {
"devices": {
k: _device_payload(k, v)
for k, v in devices.items()
},
"trails": trails,
"routes": _snapshot_routes(now),
"history_edges":
[_history_edge_payload(e) for e in route_history_edges.values()],
"history_window_seconds": int(max(0, ROUTE_HISTORY_HOURS * 3600)),
"heat": _serialize_heat_events(),
"update": git_update_info,
"mqtt_presence": _mqtt_presence_summary(now),
"server_time": now,
}
@app.get("/stats")
def get_stats():
presence_summary = _mqtt_presence_summary()
if PROD_MODE:
return {
"stats":
{
"received_total": stats.get("received_total"),
"parsed_total": stats.get("parsed_total"),
"unparsed_total": stats.get("unparsed_total"),
"last_rx_ts": stats.get("last_rx_ts"),
"last_parsed_ts": stats.get("last_parsed_ts"),
},
"result_counts": result_counts,
"mapped_devices": len(devices),
"route_count": len(routes),
"history_edge_count": len(route_history_edges),
"seen_devices": len(seen_devices),
"mqtt_presence": presence_summary,
"server_time": time.time(),
}
top_topics = sorted(topic_counts.items(), key=lambda kv: kv[1],
reverse=True)[:20]
return {
"stats":
stats,
"result_counts":
result_counts,
"mapped_devices":
len(devices),
"route_count":
len(routes),
"history_edge_count":
len(route_history_edges),
"history_segments":
len(route_history_segments),
"seen_devices":
len(seen_devices),
"seen_recent":
sorted(seen_devices.items(), key=lambda kv: kv[1], reverse=True)[:20],
"mqtt_presence":
presence_summary,
"top_topics":
top_topics,
"decoder":
{
"decode_with_node": DECODE_WITH_NODE,
"node_ready": _node_ready_once,
"node_unavailable": _node_unavailable_once,
},
"route_payload_types":
sorted(ROUTE_PAYLOAD_TYPES_SET),
"direct_coords":
{
"mode": DIRECT_COORDS_MODE,
"topic_regex": DIRECT_COORDS_TOPIC_REGEX,
"regex_valid": DIRECT_COORDS_TOPIC_RE is not None,
"allow_zero": DIRECT_COORDS_ALLOW_ZERO,
},
"server_time":
time.time(),
}
@app.get("/api/nodes")
def api_nodes(
request: Request,
updated_since: Optional[str] = None,
mode: Optional[str] = None,
format: Optional[str] = None,
):
_require_prod_token(request)
cutoff = _parse_updated_since(updated_since)
mode_value = (mode or "").strip().lower()
force_full = mode_value in ("full", "all", "snapshot")
apply_delta = bool(cutoff is not None and not force_full)
format_value = (format or "").strip().lower()
format_nested = format_value in ("nested", "object", "wrapped", "v2")
nodes: List[Dict[str, Any]] = []
all_nodes: List[Dict[str, Any]] = []
max_last_seen = 0.0
for device_id, state in devices.items():
payload = _node_api_payload(device_id, state)
last_seen = payload.get("last_seen_ts") or 0
if float(last_seen) > max_last_seen:
max_last_seen = float(last_seen)
all_nodes.append(payload)
if apply_delta and cutoff is not None and float(last_seen) < cutoff:
continue
nodes.append(payload)
nodes.sort(key=lambda item: item.get("public_key") or "")
if not apply_delta:
all_nodes.sort(key=lambda item: item.get("public_key") or "")
nodes = all_nodes
payload: Dict[str, Any] = {
"server_time": time.time(),
"max_last_seen_ts": max_last_seen or None,
"updated_since_applied": bool(apply_delta and cutoff is not None),
"updated_since_ignored": bool(updated_since and not apply_delta),
}
if format_nested:
payload["data"] = {"nodes": nodes}
else:
# Default to the flat list for MeshBuddy compatibility.
payload["data"] = nodes
payload["nodes"] = nodes
return payload
@app.get("/peers/{device_id}")
def get_peers(device_id: str, request: Request, limit: Optional[int] = Query(None)):
_require_prod_token(request)
if not device_id:
raise HTTPException(status_code=400, detail="device_id required")
raw_limit = PEERS_DEFAULT_LIMIT if limit is None else limit
limit_value = max(1, int(raw_limit or PEERS_DEFAULT_LIMIT))
payload = _peer_stats_for_device(device_id, limit_value)
state = devices.get(device_id)
if state and not _coords_are_zero(state.lat, state.lon):
payload["lat"] = float(state.lat)
payload["lon"] = float(state.lon)
payload["name"] = (
(state.name if state else None) or device_names.get(device_id) or ""
)
payload["role"] = (state.role
if state else None) or device_roles.get(device_id)
payload["last_seen_ts"] = seen_devices.get(device_id
) or (state.ts if state else None)
payload["server_time"] = time.time()
return payload
def _peer_device_payload(
peer_id: str, count: int, total: int, last_ts: Optional[float]
) -> Dict[str, Any]:
state = devices.get(peer_id)
name = None
role = None
lat = None
lon = None
if state:
name = state.name or device_names.get(peer_id)
role = state.role or device_roles.get(peer_id)
if not _coords_are_zero(state.lat, state.lon):
lat = float(state.lat)
lon = float(state.lon)
if not name:
name = device_names.get(peer_id)
percent = (count / total * 100.0) if total > 0 else 0.0
return {
"peer_id": peer_id,
"name": name or "",
"role": role,
"lat": lat,
"lon": lon,
"count": int(count),
"percent": round(percent, 1),
"last_seen_ts": last_ts,
}
def _peer_is_excluded(peer_id: str) -> bool:
if not MQTT_ONLINE_FORCE_NAMES_SET:
return False
state = devices.get(peer_id)
name_value = ""
if state and state.name:
name_value = state.name
if not name_value:
name_value = device_names.get(peer_id) or ""
if not name_value:
return False
return name_value.strip().lower() in MQTT_ONLINE_FORCE_NAMES_SET
def _peer_stats_for_device(device_id: str, limit: int) -> Dict[str, Any]:
inbound: Dict[str, int] = {}
outbound: Dict[str, int] = {}
inbound_last: Dict[str, float] = {}
outbound_last: Dict[str, float] = {}
cutoff = _peer_history_cutoff()
if not peer_history_pairs and route_history_segments:
_rebuild_peer_history_from_segments()
for entry in peer_history_pairs.values():
if not isinstance(entry, dict):
continue
a_id = entry.get("a_id")
b_id = entry.get("b_id")
if not a_id or not b_id:
continue
buckets = entry.get("buckets")
if not isinstance(buckets, dict):
continue
count = 0
last_ts = 0.0
for bucket_key, bucket_count in buckets.items():
try:
bucket_start = float(bucket_key)
bucket_value = int(bucket_count)
except (TypeError, ValueError):
continue
if bucket_value <= 0:
continue
bucket_end = bucket_start + PEER_HISTORY_BUCKET_SECONDS
if bucket_end < cutoff:
continue
count += bucket_value
last_ts = max(last_ts, bucket_end)
if count <= 0:
continue
if a_id == device_id and b_id != device_id:
if _peer_is_excluded(b_id):
continue
outbound[b_id] = outbound.get(b_id, 0) + count
outbound_last[b_id] = max(outbound_last.get(b_id, 0), float(last_ts))
if b_id == device_id and a_id != device_id:
if _peer_is_excluded(a_id):
continue
inbound[a_id] = inbound.get(a_id, 0) + count
inbound_last[a_id] = max(inbound_last.get(a_id, 0), float(last_ts))
inbound_total = sum(inbound.values())
outbound_total = sum(outbound.values())
inbound_items = [
_peer_device_payload(
peer_id, count, inbound_total, inbound_last.get(peer_id)
) for peer_id, count in inbound.items()
]
outbound_items = [
_peer_device_payload(
peer_id, count, outbound_total, outbound_last.get(peer_id)
) for peer_id, count in outbound.items()
]
inbound_items.sort(key=lambda item: item.get("count", 0), reverse=True)
outbound_items.sort(key=lambda item: item.get("count", 0), reverse=True)
if limit > 0:
inbound_items = inbound_items[:limit]
outbound_items = outbound_items[:limit]
return {
"device_id": device_id,
"incoming_total": inbound_total,
"outgoing_total": outbound_total,
"incoming": inbound_items,
"outgoing": outbound_items,
"window_hours": ROUTE_HISTORY_HOURS,
}
@app.get("/los")
def line_of_sight(
lat1: float,
lon1: float,
lat2: float,
lon2: float,
profile: bool = False,
h1: float = 0.0,
h2: float = 0.0,
):
include_points = bool(profile)
start = _normalize_lat_lon(lat1, lon1)
end = _normalize_lat_lon(lat2, lon2)
if not start or not end:
return {"ok": False, "error": "invalid_coords"}
points = _sample_los_points(start[0], start[1], end[0], end[1])
elevations, error = _fetch_elevations(points)
if error:
return {"ok": False, "error": error}
distance_m = _haversine_m(start[0], start[1], end[0], end[1])
if distance_m <= 0:
return {"ok": False, "error": "zero_distance"}
safe_h1 = h1 if math.isfinite(h1) else 0.0
safe_h2 = h2 if math.isfinite(h2) else 0.0
start_elev = elevations[0] + safe_h1
end_elev = elevations[-1] + safe_h2
adjusted = _los_effective_elevations(points, elevations, distance_m)
adjusted[0] = start_elev
adjusted[-1] = end_elev
max_obstruction = _los_max_obstruction(points, adjusted, 0, len(points) - 1)
max_terrain = max(elevations)
blocked = max_obstruction > 0.0
suggestion = _find_los_suggestion(points, adjusted) if blocked else None
profile_samples = []
if distance_m > 0:
for idx, ((lat, lon, t), elev) in enumerate(zip(points, elevations)):
line_elev = start_elev + (end_elev - start_elev) * t
profile_samples.append(
[
round(distance_m * t, 2),
round(float(adjusted[idx]), 2),
round(float(line_elev), 2),
]
)
peaks = _find_los_peaks(points, elevations, distance_m)
response = {
"ok": True,
"blocked": blocked,
"max_obstruction_m": round(max_obstruction, 2),
"distance_m": round(distance_m, 2),
"distance_km": round(distance_m / 1000.0, 3),
"distance_mi": round(distance_m / 1609.344, 3),
"samples": len(points),
"elevation_m":
{
"start": round(start_elev, 2),
"end": round(end_elev, 2),
"max_terrain": round(max_terrain, 2),
},
"provider": LOS_ELEVATION_URL,
"note": (
"LOS using SRTM90m with Earth curvature "
f"({'on' if LOS_CURVATURE_ENABLED else 'off'}; "
f"factor={LOS_CURVATURE_FACTOR:.6f}). No Fresnel clearance model."
),
"suggested": suggestion,
"profile": profile_samples,
"peaks": peaks,
}
if include_points:
response["profile_points"] = [
[round(lat, 6),
round(lon, 6),
round(t, 4),
round(float(elev), 2)]
for (lat, lon, t), elev in zip(points, elevations)
]
return response
@app.get("/los/elevations")
def los_elevations(locations: str = ""):
raw = [loc for loc in (locations or "").split("|") if loc.strip()]
if not raw:
return {"status": "ERROR", "error": "missing_locations"}
if len(raw) > 200:
return {"status": "ERROR", "error": "too_many_locations"}
points = []
for loc in raw:
parts = loc.split(",")
if len(parts) != 2:
return {"status": "ERROR", "error": "invalid_location"}
try:
lat = float(parts[0])
lon = float(parts[1])
except (ValueError, TypeError):
return {"status": "ERROR", "error": "invalid_coords"}
normalized = _normalize_lat_lon(lat, lon)
if not normalized:
return {"status": "ERROR", "error": "invalid_coords"}
points.append((normalized[0], normalized[1], 0.0))
elevations, error = _fetch_elevations(points)
if error:
return {"status": "ERROR", "error": error}
return {
"status": "OK",
"results": [{"elevation": round(float(elev), 2)} for elev in elevations],
"provider": LOS_ELEVATION_URL,
}
@app.get("/coverage")
async def get_coverage():
if not COVERAGE_API_URL:
raise HTTPException(
status_code=503,
detail=
"coverage_api_not_configured: Set COVERAGE_API_URL in .env (e.g., http://localhost:3000)",
)
try:
now = time.time()
is_meshmapper = _is_meshmapper_coverage_url(COVERAGE_API_URL)
if is_meshmapper:
if not _coverage_cache_has_data():
_load_coverage_cache_file()
if _coverage_cache_has_data():
age = int(max(0, now - float(coverage_cache.get("fetched_at") or 0.0)))
source = coverage_cache.get("source") or "cache"
cached_data = coverage_cache["data"]
filtered = _filter_coverage_by_age(cached_data, now=now)
print(
f"[coverage] Serving MeshMapper cached data age={age}s source={source} filtered={len(filtered)}/{len(cached_data)}"
)
return JSONResponse(filtered, headers=_coverage_response_headers("meshmapper"))
cooldown_until = float(coverage_cache.get("cooldown_until") or 0.0)
if cooldown_until > now:
remaining = int(max(1, math.ceil(cooldown_until - now)))
raise HTTPException(
status_code=429,
detail=f"coverage_rate_limited: retry_in_seconds={remaining}",
)
raise HTTPException(status_code=503, detail="coverage_cache_empty")
samples, provider, _meta = await _fetch_coverage_upstream()
filtered = _filter_coverage_by_age(samples, now=now)
print(f"[coverage] Serving filtered legacy coverage {len(filtered)}/{len(samples)}")
return JSONResponse(filtered, headers=_coverage_response_headers(provider))
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="coverage_api_timeout")
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=502,
detail=
f"coverage_api_error: HTTP {e.response.status_code} from {COVERAGE_API_URL}",
)
except HTTPException:
raise
except httpx.HTTPError as e:
raise HTTPException(status_code=502, detail=f"coverage_api_error: {str(e)}")
except Exception as e:
raise HTTPException(
status_code=500, detail=f"coverage_fetch_error: {str(e)}"
)
@app.get("/debug/last")
def debug_last_entries():
if PROD_MODE:
raise HTTPException(status_code=404, detail="not_found")
return {
"count": len(debug_last),
"items": list(reversed(list(debug_last))),
"server_time": time.time(),
}
@app.get("/debug/status")
def debug_status_entries():
if PROD_MODE:
raise HTTPException(status_code=404, detail="not_found")
return {
"count": len(status_last),
"items": list(reversed(list(status_last))),
"server_time": time.time(),
}
@app.post("/api/verify-turnstile")
async def verify_turnstile(request: Request):
"""Verify Cloudflare Turnstile token and issue auth token."""
if not TURNSTILE_ENABLED or not turnstile_verifier:
return JSONResponse(
{"success": False, "error": "Turnstile is not enabled"},
status_code=400,
)
try:
body = await request.json()
token = body.get("token", "").strip()
if not token:
return JSONResponse(
{"success": False, "error": "Token is required"},
status_code=400,
)
# Verify token with Cloudflare
success, error = await turnstile_verifier.verify_turnstile_token(
token,
remote_ip=request.client.host if request.client else None,
)
if not success:
print(f"[turnstile] Verification failed: {error}")
return JSONResponse(
{"success": False, "error": error or "Verification failed"},
status_code=400,
)
# Issue auth token for client
auth_token = turnstile_verifier.issue_auth_token()
print(f"[turnstile] Verification successful, issued auth token")
# Create response with auth token and set cookie
response = JSONResponse(
{
"success": True,
"auth_token": auth_token,
},
status_code=200,
)
# Set auth cookie (expires in TURNSTILE_TOKEN_TTL_SECONDS)
response.set_cookie(
key="meshmap_auth",
value=auth_token,
max_age=TURNSTILE_TOKEN_TTL_SECONDS,
path="/",
samesite="lax",
)
return response
except json.JSONDecodeError:
return JSONResponse(
{"success": False, "error": "Invalid JSON"},
status_code=400,
)
except Exception as e:
print(f"[turnstile] Error verifying token: {e}")
return JSONResponse(
{"success": False, "error": str(e)},
status_code=500,
)
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
if not _ws_authorized(ws):
await ws.accept()
await ws.close(code=1008)
return
await ws.accept()
clients.add(ws)
await ws.send_text(
json.dumps(
{
"type": "snapshot",
"devices": {
k: _device_payload(k, v)
for k, v in devices.items()
},
"trails": trails,
"routes": _snapshot_routes(time.time()),
"history_edges":
[_history_edge_payload(e) for e in route_history_edges.values()],
"history_window_seconds": int(max(0, ROUTE_HISTORY_HOURS * 3600)),
"heat": _serialize_heat_events(),
"update": git_update_info,
"mqtt_presence": _mqtt_presence_summary(),
"server_time": time.time(),
}
)
)
try:
while True:
await ws.receive_text()
except WebSocketDisconnect:
pass
except RuntimeError:
pass
finally:
clients.discard(ws)