diff --git a/.eslintrc.json b/.eslintrc.json
new file mode 100644
index 0000000..af3689c
--- /dev/null
+++ b/.eslintrc.json
@@ -0,0 +1,39 @@
+{
+ "env": {
+ "browser": true,
+ "es2022": true
+ },
+ "extends": ["eslint:recommended"],
+ "parserOptions": {
+ "ecmaVersion": 2022,
+ "sourceType": "script"
+ },
+ "globals": {
+ "L": "readonly"
+ },
+ "rules": {
+ "indent": ["warn", 2, { "SwitchCase": 1 }],
+ "linebreak-style": ["error", "unix"],
+ "quotes": ["warn", "single", { "avoidEscape": true }],
+ "semi": ["error", "always"],
+ "no-unused-vars": ["warn", { "argsIgnorePattern": "^_", "varsIgnorePattern": "^_" }],
+ "no-console": "off",
+ "eqeqeq": ["warn", "smart"],
+ "curly": ["warn", "multi-line"],
+ "no-var": "error",
+ "prefer-const": "warn",
+ "no-multiple-empty-lines": ["warn", { "max": 2, "maxEOF": 1 }],
+ "comma-dangle": ["warn", "only-multiline"],
+ "object-curly-spacing": ["warn", "always"],
+ "array-bracket-spacing": ["warn", "never"],
+ "space-before-function-paren": ["warn", {
+ "anonymous": "always",
+ "named": "never",
+ "asyncArrow": "always"
+ }],
+ "keyword-spacing": ["warn", { "before": true, "after": true }],
+ "space-infix-ops": "warn",
+ "no-trailing-spaces": "warn",
+ "eol-last": ["warn", "always"]
+ }
+}
diff --git a/backend/Dockerfile b/backend/Dockerfile
index bd99667..621708f 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -12,8 +12,12 @@ RUN pip install --no-cache-dir -r requirements.txt
RUN npm install --prefix /app @michaelhart/meshcore-decoder \
&& npm cache clean --force
+# Copy all Python modules
COPY *.py /app/
-COPY static /app/static
+COPY routes /app/routes/
+COPY services /app/services/
+COPY scripts /app/scripts/
+COPY static /app/static/
EXPOSE 8080
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
diff --git a/backend/app.py b/backend/app.py
index 441cbb5..502dfb0 100644
--- a/backend/app.py
+++ b/backend/app.py
@@ -155,7 +155,7 @@ from state import (
# App / State
# =========================
app = FastAPI()
-app.mount("/static", StaticFiles(directory="static"), name="static")
+app.mount("/static", StaticFiles(directory=os.path.join(APP_DIR, "static")), name="static")
mqtt_client: Optional[mqtt.Client] = None
clients: Set[WebSocket] = set()
@@ -1126,7 +1126,7 @@ def root():
with open(html_path, "r", encoding="utf-8") as handle:
content = handle.read()
except Exception:
- return FileResponse("static/index.html")
+ return FileResponse(os.path.join(APP_DIR, "static", "index.html"))
og_image_tag = ""
twitter_image_tag = ""
@@ -1218,7 +1218,7 @@ def manifest():
@app.get("/sw.js")
def service_worker():
- return FileResponse("static/sw.js", media_type="application/javascript")
+ return FileResponse(os.path.join(APP_DIR, "static", "sw.js"), media_type="application/javascript")
@app.get("/snapshot")
diff --git a/backend/app_refactored.py b/backend/app_refactored.py
new file mode 100644
index 0000000..c5db104
--- /dev/null
+++ b/backend/app_refactored.py
@@ -0,0 +1,68 @@
+"""
+Mesh Live Map - FastAPI Application
+
+This is the refactored entry point that imports modular components.
+The original app.py is preserved for reference during the transition.
+"""
+
+import asyncio
+
+from fastapi import FastAPI
+from fastapi.staticfiles import StaticFiles
+
+from decoder import _ensure_node_decoder
+from history import _load_route_history, _route_history_saver
+from routes.api import router as api_router
+from routes.debug import router as debug_router
+from routes.static import router as static_router
+from routes.websocket import router as ws_router
+from services.broadcaster import broadcaster, check_git_updates, git_check_loop
+from services.mqtt import create_client, stop_client
+from services.persistence import load_state, state_saver
+from services.reaper import reaper
+
+# =========================
+# App Setup
+# =========================
+app = FastAPI(title="Mesh Live Map", version="1.0.2")
+app.mount("/static", StaticFiles(directory="static"), name="static")
+
+# Include routers
+app.include_router(static_router)
+app.include_router(api_router)
+app.include_router(debug_router)
+app.include_router(ws_router)
+
+
+# =========================
+# Startup / Shutdown
+# =========================
+@app.on_event("startup")
+async def startup():
+ """Initialize services on startup."""
+ # Load persisted state
+ load_state()
+ _load_route_history()
+
+ # Initialize decoder
+ _ensure_node_decoder()
+
+ # Check for updates
+ check_git_updates()
+
+ # Start MQTT client
+ loop = asyncio.get_event_loop()
+ create_client(loop)
+
+ # Start background tasks
+ asyncio.create_task(broadcaster())
+ asyncio.create_task(reaper())
+ asyncio.create_task(state_saver())
+ asyncio.create_task(_route_history_saver())
+ asyncio.create_task(git_check_loop())
+
+
+@app.on_event("shutdown")
+async def shutdown():
+ """Clean up on shutdown."""
+ stop_client()
diff --git a/backend/auth.py b/backend/auth.py
new file mode 100644
index 0000000..fb5b427
--- /dev/null
+++ b/backend/auth.py
@@ -0,0 +1,45 @@
+"""
+Authentication helpers for API routes.
+"""
+
+from typing import Dict, Optional
+
+from fastapi import HTTPException, Request, WebSocket
+
+from config import PROD_MODE, PROD_TOKEN
+
+
+def extract_token(headers: Dict[str, str]) -> Optional[str]:
+ """Extract bearer token from request headers."""
+ auth = headers.get("authorization")
+ if auth:
+ parts = auth.strip().split()
+ if len(parts) == 2 and parts[0].lower() == "bearer":
+ return parts[1]
+ return auth.strip()
+ return headers.get("x-access-token") or headers.get("x-token")
+
+
+def require_prod_token(request: Request) -> None:
+ """Raise HTTPException if production token is missing or invalid."""
+ if not PROD_MODE:
+ return
+ if not PROD_TOKEN:
+ raise HTTPException(status_code=503, detail="prod_token_not_set")
+ token = request.query_params.get("token") or request.query_params.get("access_token")
+ if not token:
+ token = extract_token(request.headers)
+ if token != PROD_TOKEN:
+ raise HTTPException(status_code=401, detail="unauthorized")
+
+
+def ws_authorized(ws: WebSocket) -> bool:
+ """Check if WebSocket connection is authorized."""
+ if not PROD_MODE:
+ return True
+ if not PROD_TOKEN:
+ return False
+ token = ws.query_params.get("token") or ws.query_params.get("access_token")
+ if not token:
+ token = extract_token(ws.headers)
+ return token == PROD_TOKEN
diff --git a/backend/config.py b/backend/config.py
index 7311fa6..4e8e781 100644
--- a/backend/config.py
+++ b/backend/config.py
@@ -127,4 +127,4 @@ LOS_PEAKS_MAX = int(os.getenv("LOS_PEAKS_MAX", "4"))
COVERAGE_API_URL = os.getenv("COVERAGE_API_URL", "").strip()
APP_DIR = os.path.dirname(os.path.abspath(__file__))
-NODE_SCRIPT_PATH = os.path.join(APP_DIR, "meshcore_decode.mjs")
+NODE_SCRIPT_PATH = os.path.join(APP_DIR, "scripts", "meshcore_decode.mjs")
diff --git a/backend/decoder.py b/backend/decoder.py
index 2ea4c06..5b7175c 100644
--- a/backend/decoder.py
+++ b/backend/decoder.py
@@ -534,6 +534,17 @@ def _direct_coords_allowed(topic: str, obj: Any) -> bool:
# =========================
def _ensure_node_decoder() -> bool:
+ """
+ Verify that the Node.js decoder is available and ready to use.
+
+ Checks:
+ 1. DECODE_WITH_NODE is enabled
+ 2. Node.js is installed
+ 3. @michaelhart/meshcore-decoder package is available
+ 4. The decoder script exists at scripts/meshcore_decode.mjs
+
+ Returns True if decoder is ready, False otherwise.
+ """
global _node_ready_once, _node_unavailable_once
if not DECODE_WITH_NODE:
@@ -543,6 +554,7 @@ def _ensure_node_decoder() -> bool:
if _node_unavailable_once:
return False
+ # Check Node.js is available
try:
subprocess.run(["node", "-v"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception:
@@ -550,6 +562,7 @@ def _ensure_node_decoder() -> bool:
print("[decode] node not found in container")
return False
+ # Check meshcore-decoder package is installed
try:
subprocess.run(
["node", "--input-type=module", "-e", "import('@michaelhart/meshcore-decoder')"],
@@ -563,107 +576,10 @@ def _ensure_node_decoder() -> bool:
print("[decode] @michaelhart/meshcore-decoder not available")
return False
- script = """#!/usr/bin/env node
-import { MeshCoreDecoder, getDeviceRoleName } from '@michaelhart/meshcore-decoder';
-
-const hex = (process.argv[2] || '').trim();
-
-function pickLocation(decodedPacket) {
- const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
- const payloadRoot = decodedPacket?.payload ?? null;
- const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
- const loc = appData?.location ?? payloadDecoded?.location ?? payloadRoot?.location ?? null;
- const lat = loc?.latitude ?? loc?.lat ?? null;
- const lon = loc?.longitude ?? loc?.lon ?? null;
- const name = appData?.name ?? payloadDecoded?.name ?? payloadRoot?.name ?? null;
- const pubkey =
- payloadDecoded?.publicKey ??
- payloadDecoded?.publickey ??
- payloadRoot?.publicKey ??
- payloadRoot?.publickey ??
- decodedPacket?.publicKey ??
- decodedPacket?.publickey ??
- null;
- return { lat, lon, name, pubkey };
-}
-
-function pickRole(decodedPacket) {
- const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
- const payloadRoot = decodedPacket?.payload ?? null;
- const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
- const candidates = [
- appData?.role,
- appData?.deviceRole,
- appData?.nodeRole,
- appData?.deviceType,
- appData?.nodeType,
- appData?.class,
- appData?.profile,
- payloadDecoded?.role,
- payloadDecoded?.deviceRole,
- payloadDecoded?.nodeRole,
- payloadDecoded?.deviceType,
- payloadDecoded?.nodeType,
- payloadDecoded?.class,
- payloadDecoded?.profile,
- payloadRoot?.role,
- payloadRoot?.deviceRole,
- payloadRoot?.nodeRole,
- payloadRoot?.deviceType,
- payloadRoot?.nodeType,
- payloadRoot?.class,
- payloadRoot?.profile,
- ];
- for (const value of candidates) {
- if (typeof value === 'string' && value.trim()) return value.trim();
- }
- return null;
-}
-
-try {
- const decoded = MeshCoreDecoder.decode(hex);
- const loc = pickLocation(decoded);
- const payloadDecoded = decoded?.payload?.decoded ?? decoded?.payload ?? null;
- const payloadRoot = decoded?.payload ?? null;
- const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
- const deviceRole = appData?.deviceRole ?? payloadDecoded?.deviceRole ?? payloadRoot?.deviceRole ?? null;
- const deviceRoleName = typeof deviceRole === 'number' ? getDeviceRoleName(deviceRole) : null;
- const role = pickRole(decoded) || deviceRoleName;
- const payloadKeys = payloadDecoded && typeof payloadDecoded === 'object' ? Object.keys(payloadDecoded) : null;
- const appDataKeys = appData && typeof appData === 'object' ? Object.keys(appData) : null;
- const pathHashes = payloadDecoded?.pathHashes ?? null;
- const snrValues = payloadDecoded?.snrValues ?? null;
- const path = decoded?.path ?? null;
- const pathLength = decoded?.pathLength ?? null;
- const out = {
- ok: true,
- payloadType: decoded?.payloadType ?? null,
- routeType: decoded?.routeType ?? null,
- messageHash: decoded?.messageHash ?? null,
- location: loc,
- role,
- deviceRole,
- deviceRoleName,
- payloadKeys,
- appDataKeys,
- pathHashes,
- snrValues,
- path,
- pathLength,
- };
- console.log(JSON.stringify(out));
-} catch (e) {
- console.log(JSON.stringify({ ok: false, error: String(e) }));
-}
-"""
-
- try:
- with open(NODE_SCRIPT_PATH, "w", encoding="utf-8") as handle:
- handle.write(script)
- os.chmod(NODE_SCRIPT_PATH, 0o755)
- except Exception as exc:
+ # Check external script exists
+ if not os.path.exists(NODE_SCRIPT_PATH):
_node_unavailable_once = True
- print(f"[decode] failed writing node helper: {exc}")
+ print(f"[decode] decoder script not found at {NODE_SCRIPT_PATH}")
return False
_node_ready_once = True
diff --git a/backend/helpers.py b/backend/helpers.py
new file mode 100644
index 0000000..d68f25c
--- /dev/null
+++ b/backend/helpers.py
@@ -0,0 +1,255 @@
+"""
+Shared helper functions for routes and services.
+"""
+
+import time
+from dataclasses import asdict
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional
+
+from config import (
+ MAP_RADIUS_KM,
+ MAP_START_LAT,
+ MAP_START_LON,
+ MQTT_ONLINE_FORCE_NAMES_SET,
+ PROD_MODE,
+ ROUTE_HISTORY_HOURS,
+)
+from decoder import _coords_are_zero, _normalize_role
+from los import _haversine_m
+from state import (
+ DeviceState,
+ device_names,
+ device_roles,
+ devices,
+ mqtt_seen,
+ route_history_segments,
+ seen_devices,
+)
+
+
+def iso_from_ts(ts: Optional[float]) -> Optional[str]:
+ """Convert Unix timestamp to ISO 8601 string."""
+ if ts is None:
+ return None
+ try:
+ return datetime.fromtimestamp(float(ts), tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
+ except Exception:
+ return None
+
+
+def within_map_radius(lat: Any, lon: Any) -> bool:
+ """Check if coordinates are within the configured map radius."""
+ if MAP_RADIUS_KM <= 0:
+ return True
+ try:
+ lat_val = float(lat)
+ lon_val = float(lon)
+ except (TypeError, ValueError):
+ return False
+ distance_m = _haversine_m(MAP_START_LAT, MAP_START_LON, lat_val, lon_val)
+ return distance_m <= (MAP_RADIUS_KM * 1000.0)
+
+
+def device_role_code(value: Any) -> int:
+ """Convert role string/int to numeric code (1=companion, 2=repeater, 3=room)."""
+ if isinstance(value, int):
+ if value in (1, 2, 3):
+ return value
+ return 1
+ if isinstance(value, str):
+ trimmed = value.strip()
+ if trimmed.isdigit():
+ num = int(trimmed)
+ if num in (1, 2, 3):
+ return num
+ return 1
+ normalized = _normalize_role(trimmed)
+ if normalized == "repeater":
+ return 2
+ if normalized == "room":
+ return 3
+ if normalized == "companion":
+ return 1
+ return 1
+
+
+def parse_updated_since(value: Optional[str]) -> Optional[float]:
+ """Parse ISO 8601 timestamp string to Unix timestamp."""
+ if not value:
+ return None
+ try:
+ text = value.strip()
+ if text.endswith("Z"):
+ text = text[:-1] + "+00:00"
+ return datetime.fromisoformat(text).timestamp()
+ except Exception:
+ return None
+
+
+def device_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]:
+ """Serialize device state for WebSocket/API response."""
+ payload = asdict(state)
+ last_seen = seen_devices.get(device_id)
+ if last_seen:
+ payload["last_seen_ts"] = last_seen
+ else:
+ payload["last_seen_ts"] = payload.get("ts")
+ mqtt_seen_ts = mqtt_seen.get(device_id)
+ if mqtt_seen_ts:
+ payload["mqtt_seen_ts"] = mqtt_seen_ts
+ if MQTT_ONLINE_FORCE_NAMES_SET:
+ name_value = (state.name or device_names.get(device_id) or "").strip().lower()
+ if name_value and name_value in MQTT_ONLINE_FORCE_NAMES_SET:
+ payload["mqtt_forced"] = True
+ if PROD_MODE:
+ payload.pop("raw_topic", None)
+ return payload
+
+
+def node_api_payload(device_id: str, state: "DeviceState") -> Dict[str, Any]:
+ """Serialize device state for /api/nodes endpoint."""
+ last_seen = seen_devices.get(device_id) or state.ts
+ last_seen_iso = iso_from_ts(last_seen)
+ role_value = state.role or device_roles.get(device_id)
+ role_code = device_role_code(role_value)
+ return {
+ "public_key": device_id,
+ "name": (state.name or device_names.get(device_id) or ""),
+ "device_role": role_code,
+ "role": role_value,
+ "location": {
+ "latitude": float(state.lat),
+ "longitude": float(state.lon),
+ },
+ "lat": state.lat,
+ "lon": state.lon,
+ "last_seen_ts": last_seen,
+ "last_seen": last_seen_iso,
+ "timestamp": int(last_seen) if last_seen else None,
+ "first_seen": last_seen_iso,
+ "battery_voltage": 0,
+ }
+
+
+def route_payload(route: Dict[str, Any]) -> Dict[str, Any]:
+ """Serialize route for WebSocket/API response."""
+ if not PROD_MODE:
+ return route
+ return {
+ "id": route.get("id"),
+ "points": route.get("points"),
+ "route_mode": route.get("route_mode"),
+ "ts": route.get("ts"),
+ "expires_at": route.get("expires_at"),
+ "payload_type": route.get("payload_type"),
+ }
+
+
+def history_edge_payload(edge: Dict[str, Any]) -> Dict[str, Any]:
+ """Serialize history edge for WebSocket/API response."""
+ return {
+ "id": edge.get("id"),
+ "a": edge.get("a"),
+ "b": edge.get("b"),
+ "count": edge.get("count"),
+ "last_ts": edge.get("last_ts"),
+ "recent": edge.get("recent") if isinstance(edge.get("recent"), list) else [],
+ }
+
+
+def peer_is_excluded(peer_id: str) -> bool:
+ """Check if peer should be excluded from stats (forced-online nodes)."""
+ if not MQTT_ONLINE_FORCE_NAMES_SET:
+ return False
+ state = devices.get(peer_id)
+ name_value = ""
+ if state and state.name:
+ name_value = state.name
+ if not name_value:
+ name_value = device_names.get(peer_id) or ""
+ if not name_value:
+ return False
+ return name_value.strip().lower() in MQTT_ONLINE_FORCE_NAMES_SET
+
+
+def peer_device_payload(peer_id: str, count: int, total: int, last_ts: Optional[float]) -> Dict[str, Any]:
+ """Serialize peer info for /peers endpoint."""
+ state = devices.get(peer_id)
+ name = None
+ role = None
+ lat = None
+ lon = None
+ if state:
+ name = state.name or device_names.get(peer_id)
+ role = state.role or device_roles.get(peer_id)
+ if not _coords_are_zero(state.lat, state.lon):
+ lat = float(state.lat)
+ lon = float(state.lon)
+ if not name:
+ name = device_names.get(peer_id)
+ percent = (count / total * 100.0) if total > 0 else 0.0
+ return {
+ "peer_id": peer_id,
+ "name": name or "",
+ "role": role,
+ "lat": lat,
+ "lon": lon,
+ "count": int(count),
+ "percent": round(percent, 1),
+ "last_seen_ts": last_ts,
+ }
+
+
+def peer_stats_for_device(device_id: str, limit: int) -> Dict[str, Any]:
+ """Calculate inbound/outbound peer statistics for a device."""
+ inbound: Dict[str, int] = {}
+ outbound: Dict[str, int] = {}
+ inbound_last: Dict[str, float] = {}
+ outbound_last: Dict[str, float] = {}
+
+ for entry in route_history_segments:
+ if not isinstance(entry, dict):
+ continue
+ a_id = entry.get("a_id")
+ b_id = entry.get("b_id")
+ if not a_id or not b_id:
+ continue
+ ts = entry.get("ts") or 0
+ if a_id == device_id and b_id != device_id:
+ if peer_is_excluded(b_id):
+ continue
+ outbound[b_id] = outbound.get(b_id, 0) + 1
+ outbound_last[b_id] = max(outbound_last.get(b_id, 0), float(ts))
+ if b_id == device_id and a_id != device_id:
+ if peer_is_excluded(a_id):
+ continue
+ inbound[a_id] = inbound.get(a_id, 0) + 1
+ inbound_last[a_id] = max(inbound_last.get(a_id, 0), float(ts))
+
+ inbound_total = sum(inbound.values())
+ outbound_total = sum(outbound.values())
+
+ inbound_items = [
+ peer_device_payload(peer_id, count, inbound_total, inbound_last.get(peer_id))
+ for peer_id, count in inbound.items()
+ ]
+ outbound_items = [
+ peer_device_payload(peer_id, count, outbound_total, outbound_last.get(peer_id))
+ for peer_id, count in outbound.items()
+ ]
+ inbound_items.sort(key=lambda item: item.get("count", 0), reverse=True)
+ outbound_items.sort(key=lambda item: item.get("count", 0), reverse=True)
+
+ if limit > 0:
+ inbound_items = inbound_items[:limit]
+ outbound_items = outbound_items[:limit]
+
+ return {
+ "device_id": device_id,
+ "incoming_total": inbound_total,
+ "outgoing_total": outbound_total,
+ "incoming": inbound_items,
+ "outgoing": outbound_items,
+ "window_hours": ROUTE_HISTORY_HOURS,
+ }
diff --git a/backend/routes/__init__.py b/backend/routes/__init__.py
new file mode 100644
index 0000000..d212dab
--- /dev/null
+++ b/backend/routes/__init__.py
@@ -0,0 +1 @@
+# Routes package
diff --git a/backend/routes/api.py b/backend/routes/api.py
new file mode 100644
index 0000000..4d27dcc
--- /dev/null
+++ b/backend/routes/api.py
@@ -0,0 +1,226 @@
+"""
+API routes for node data, peers, LOS, and coverage.
+"""
+
+import time
+from typing import Any, Dict, List, Optional
+
+import httpx
+from fastapi import APIRouter, HTTPException, Request
+
+from auth import require_prod_token
+from config import (
+ COVERAGE_API_URL,
+ LOS_ELEVATION_URL,
+ LOS_PEAKS_MAX,
+ ROUTE_HISTORY_HOURS,
+)
+from decoder import _coords_are_zero, _normalize_lat_lon
+from helpers import (
+ device_payload,
+ history_edge_payload,
+ node_api_payload,
+ parse_updated_since,
+ peer_stats_for_device,
+ route_payload,
+)
+from los import (
+ _fetch_elevations,
+ _find_los_peaks,
+ _find_los_suggestion,
+ _haversine_m,
+ _los_max_obstruction,
+ _sample_los_points,
+)
+from state import (
+ device_names,
+ device_roles,
+ devices,
+ route_history_edges,
+ routes,
+ seen_devices,
+ trails,
+)
+
+router = APIRouter()
+
+
+@router.get("/snapshot")
+def snapshot(request: Request):
+ """Return full state snapshot."""
+ from services.broadcaster import git_update_info
+ from decoder import _serialize_heat_events
+
+ require_prod_token(request)
+ return {
+ "devices": {k: device_payload(k, v) for k, v in devices.items()},
+ "trails": trails,
+ "routes": [route_payload(r) for r in routes.values()],
+ "history_edges": [history_edge_payload(e) for e in route_history_edges.values()],
+ "history_window_seconds": int(max(0, ROUTE_HISTORY_HOURS * 3600)),
+ "heat": _serialize_heat_events(),
+ "update": git_update_info,
+ "server_time": time.time(),
+ }
+
+
+@router.get("/api/nodes")
+def api_nodes(
+ request: Request,
+ updated_since: Optional[str] = None,
+ mode: Optional[str] = None,
+ format: Optional[str] = None,
+):
+ """Return list of nodes, optionally filtered by update time."""
+ require_prod_token(request)
+ cutoff = parse_updated_since(updated_since)
+ mode_value = (mode or "").strip().lower()
+ apply_delta = mode_value in ("delta", "updates", "since")
+ format_value = (format or "").strip().lower()
+ format_flat = format_value in ("flat", "list", "legacy", "v1")
+
+ nodes: List[Dict[str, Any]] = []
+ all_nodes: List[Dict[str, Any]] = []
+ max_last_seen = 0.0
+
+ for device_id, state in devices.items():
+ payload = node_api_payload(device_id, state)
+ last_seen = payload.get("last_seen_ts") or 0
+ if float(last_seen) > max_last_seen:
+ max_last_seen = float(last_seen)
+ all_nodes.append(payload)
+ if apply_delta and cutoff is not None and float(last_seen) < cutoff:
+ continue
+ nodes.append(payload)
+
+ nodes.sort(key=lambda item: item.get("public_key") or "")
+ if not apply_delta:
+ all_nodes.sort(key=lambda item: item.get("public_key") or "")
+ nodes = all_nodes
+
+ response: Dict[str, Any] = {
+ "server_time": time.time(),
+ "max_last_seen_ts": max_last_seen or None,
+ "updated_since_applied": bool(apply_delta and cutoff is not None),
+ "updated_since_ignored": bool(updated_since and not apply_delta),
+ }
+ if format_flat:
+ response["data"] = nodes
+ else:
+ response["data"] = {"nodes": nodes}
+ return response
+
+
+@router.get("/peers/{device_id}")
+def get_peers(device_id: str, request: Request, limit: int = 8):
+ """Return peer statistics for a device."""
+ require_prod_token(request)
+ if not device_id:
+ raise HTTPException(status_code=400, detail="device_id required")
+
+ limit_value = max(1, min(int(limit or 8), 50))
+ payload = peer_stats_for_device(device_id, limit_value)
+
+ state = devices.get(device_id)
+ if state and not _coords_are_zero(state.lat, state.lon):
+ payload["lat"] = float(state.lat)
+ payload["lon"] = float(state.lon)
+ payload["name"] = (state.name if state else None) or device_names.get(device_id) or ""
+ payload["role"] = (state.role if state else None) or device_roles.get(device_id)
+ payload["last_seen_ts"] = seen_devices.get(device_id) or (state.ts if state else None)
+ payload["server_time"] = time.time()
+ return payload
+
+
+@router.get("/los")
+def line_of_sight(lat1: float, lon1: float, lat2: float, lon2: float, profile: bool = False):
+ """Calculate line of sight between two points."""
+ include_points = bool(profile)
+ start = _normalize_lat_lon(lat1, lon1)
+ end = _normalize_lat_lon(lat2, lon2)
+ if not start or not end:
+ return {"ok": False, "error": "invalid_coords"}
+
+ points = _sample_los_points(start[0], start[1], end[0], end[1])
+ elevations, error = _fetch_elevations(points)
+ if error:
+ return {"ok": False, "error": error}
+
+ distance_m = _haversine_m(start[0], start[1], end[0], end[1])
+ if distance_m <= 0:
+ return {"ok": False, "error": "zero_distance"}
+
+ start_elev = elevations[0]
+ end_elev = elevations[-1]
+ max_obstruction = _los_max_obstruction(points, elevations, 0, len(points) - 1)
+ max_terrain = max(elevations)
+ blocked = max_obstruction > 0.0
+ suggestion = _find_los_suggestion(points, elevations) if blocked else None
+
+ profile_samples = []
+ if distance_m > 0:
+ for (lat, lon, t), elev in zip(points, elevations):
+ line_elev = start_elev + (end_elev - start_elev) * t
+ profile_samples.append([
+ round(distance_m * t, 2),
+ round(float(elev), 2),
+ round(float(line_elev), 2),
+ ])
+
+ peaks = _find_los_peaks(points, elevations, distance_m)
+
+ response = {
+ "ok": True,
+ "blocked": blocked,
+ "max_obstruction_m": round(max_obstruction, 2),
+ "distance_m": round(distance_m, 2),
+ "distance_km": round(distance_m / 1000.0, 3),
+ "distance_mi": round(distance_m / 1609.344, 3),
+ "samples": len(points),
+ "elevation_m": {
+ "start": round(start_elev, 2),
+ "end": round(end_elev, 2),
+ "max_terrain": round(max_terrain, 2),
+ },
+ "provider": LOS_ELEVATION_URL,
+ "note": "Straight-line LOS using SRTM90m. No curvature/refraction.",
+ "suggested": suggestion,
+ "profile": profile_samples,
+ "peaks": peaks,
+ }
+ if include_points:
+ response["profile_points"] = [
+ [round(lat, 6), round(lon, 6), round(t, 4), round(float(elev), 2)]
+ for (lat, lon, t), elev in zip(points, elevations)
+ ]
+ return response
+
+
+@router.get("/coverage")
+async def get_coverage():
+ """Proxy coverage data from external API."""
+ if not COVERAGE_API_URL:
+ raise HTTPException(
+ status_code=503,
+ detail="coverage_api_not_configured: Set COVERAGE_API_URL in .env (e.g., http://localhost:3000)",
+ )
+ try:
+ url = f"{COVERAGE_API_URL}/get-samples"
+ print(f"[coverage] Fetching from {url}")
+ async with httpx.AsyncClient(timeout=10.0) as client:
+ response = await client.get(url)
+ response.raise_for_status()
+ data = response.json()
+ samples = data.get("keys", []) if isinstance(data, dict) else (data if isinstance(data, list) else [])
+ print(f"[coverage] Received {len(samples) if isinstance(samples, list) else 'non-list'} items")
+ if isinstance(samples, list) and len(samples) > 0:
+ print(f"[coverage] Sample item keys: {list(samples[0].keys()) if samples[0] else 'N/A'}")
+ return samples
+ except httpx.TimeoutException:
+ raise HTTPException(status_code=504, detail="coverage_api_timeout")
+ except httpx.HTTPStatusError as e:
+ raise HTTPException(status_code=502, detail=f"coverage_api_error: HTTP {e.response.status_code}")
+ except httpx.HTTPError as e:
+ raise HTTPException(status_code=502, detail=f"coverage_api_error: {str(e)}")
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"coverage_fetch_error: {str(e)}")
diff --git a/backend/routes/debug.py b/backend/routes/debug.py
new file mode 100644
index 0000000..752e373
--- /dev/null
+++ b/backend/routes/debug.py
@@ -0,0 +1,106 @@
+"""
+Debug routes for development/troubleshooting.
+"""
+
+import time
+
+from fastapi import APIRouter, HTTPException
+
+from config import (
+ DECODE_WITH_NODE,
+ DIRECT_COORDS_ALLOW_ZERO,
+ DIRECT_COORDS_MODE,
+ DIRECT_COORDS_TOPIC_REGEX,
+ PROD_MODE,
+)
+from decoder import (
+ DIRECT_COORDS_TOPIC_RE,
+ ROUTE_PAYLOAD_TYPES_SET,
+ _node_ready_once,
+ _node_unavailable_once,
+)
+from state import (
+ debug_last,
+ devices,
+ result_counts,
+ route_history_edges,
+ route_history_segments,
+ routes,
+ seen_devices,
+ stats,
+ status_last,
+ topic_counts,
+)
+
+router = APIRouter()
+
+
+@router.get("/stats")
+def get_stats():
+ """Return message statistics and counters."""
+ if PROD_MODE:
+ return {
+ "stats": {
+ "received_total": stats.get("received_total"),
+ "parsed_total": stats.get("parsed_total"),
+ "unparsed_total": stats.get("unparsed_total"),
+ "last_rx_ts": stats.get("last_rx_ts"),
+ "last_parsed_ts": stats.get("last_parsed_ts"),
+ },
+ "result_counts": result_counts,
+ "mapped_devices": len(devices),
+ "route_count": len(routes),
+ "history_edge_count": len(route_history_edges),
+ "seen_devices": len(seen_devices),
+ "server_time": time.time(),
+ }
+
+ top_topics = sorted(topic_counts.items(), key=lambda kv: kv[1], reverse=True)[:20]
+ return {
+ "stats": stats,
+ "result_counts": result_counts,
+ "mapped_devices": len(devices),
+ "route_count": len(routes),
+ "history_edge_count": len(route_history_edges),
+ "history_segments": len(route_history_segments),
+ "seen_devices": len(seen_devices),
+ "seen_recent": sorted(seen_devices.items(), key=lambda kv: kv[1], reverse=True)[:20],
+ "top_topics": top_topics,
+ "decoder": {
+ "decode_with_node": DECODE_WITH_NODE,
+ "node_ready": _node_ready_once,
+ "node_unavailable": _node_unavailable_once,
+ },
+ "route_payload_types": sorted(ROUTE_PAYLOAD_TYPES_SET),
+ "direct_coords": {
+ "mode": DIRECT_COORDS_MODE,
+ "topic_regex": DIRECT_COORDS_TOPIC_REGEX,
+ "regex_valid": DIRECT_COORDS_TOPIC_RE is not None,
+ "allow_zero": DIRECT_COORDS_ALLOW_ZERO,
+ },
+ "server_time": time.time(),
+ }
+
+
+@router.get("/debug/last")
+def debug_last_entries():
+ """Return recent MQTT message debug entries."""
+ if PROD_MODE:
+ raise HTTPException(status_code=404, detail="not_found")
+ return {
+ "count": len(debug_last),
+ "items": list(reversed(list(debug_last))),
+ "server_time": time.time(),
+ }
+
+
+@router.get("/debug/status")
+def debug_status_entries():
+ """Return recent status message debug entries."""
+ if PROD_MODE:
+ raise HTTPException(status_code=404, detail="not_found")
+ return {
+ "count": len(status_last),
+ "items": list(reversed(list(status_last))),
+ "server_time": time.time(),
+ }
diff --git a/backend/routes/static.py b/backend/routes/static.py
new file mode 100644
index 0000000..574af34
--- /dev/null
+++ b/backend/routes/static.py
@@ -0,0 +1,148 @@
+"""
+Static file and root HTML routes.
+"""
+
+import html
+import os
+
+from fastapi import APIRouter
+from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
+
+from config import (
+ APP_DIR,
+ COVERAGE_API_URL,
+ CUSTOM_LINK_URL,
+ DISTANCE_UNITS,
+ HISTORY_LINK_SCALE,
+ LOS_ELEVATION_URL,
+ LOS_PEAKS_MAX,
+ LOS_SAMPLE_MAX,
+ LOS_SAMPLE_MIN,
+ LOS_SAMPLE_STEP_METERS,
+ MAP_DEFAULT_LAYER,
+ MAP_RADIUS_KM,
+ MAP_RADIUS_SHOW,
+ MAP_START_LAT,
+ MAP_START_LON,
+ MAP_START_ZOOM,
+ MQTT_ONLINE_SECONDS,
+ NODE_MARKER_RADIUS,
+ PROD_MODE,
+ PROD_TOKEN,
+ SITE_DESCRIPTION,
+ SITE_FEED_NOTE,
+ SITE_ICON,
+ SITE_OG_IMAGE,
+ SITE_TITLE,
+ SITE_URL,
+ TRAIL_LEN,
+)
+
+router = APIRouter()
+
+
+@router.get("/")
+def root():
+ """Serve the main HTML page with injected configuration."""
+ from services.broadcaster import git_update_info
+
+ html_path = os.path.join(APP_DIR, "static", "index.html")
+ try:
+ with open(html_path, "r", encoding="utf-8") as handle:
+ content = handle.read()
+ except Exception:
+ return FileResponse("static/index.html")
+
+ og_image_tag = ""
+ twitter_image_tag = ""
+ if SITE_OG_IMAGE:
+ safe_image = html.escape(str(SITE_OG_IMAGE), quote=True)
+ og_image_tag = f''
+ twitter_image_tag = f''
+
+ content = content.replace("{{OG_IMAGE_TAG}}", og_image_tag)
+ content = content.replace("{{TWITTER_IMAGE_TAG}}", twitter_image_tag)
+
+ trail_info_suffix = ""
+ if TRAIL_LEN > 0:
+ trail_info_suffix = f" Trails show last ~{TRAIL_LEN} points."
+
+ replacements = {
+ "SITE_TITLE": SITE_TITLE,
+ "SITE_DESCRIPTION": SITE_DESCRIPTION,
+ "SITE_URL": SITE_URL,
+ "SITE_ICON": SITE_ICON,
+ "SITE_FEED_NOTE": SITE_FEED_NOTE,
+ "CUSTOM_LINK_URL": CUSTOM_LINK_URL,
+ "DISTANCE_UNITS": DISTANCE_UNITS,
+ "NODE_MARKER_RADIUS": NODE_MARKER_RADIUS,
+ "HISTORY_LINK_SCALE": HISTORY_LINK_SCALE,
+ "TRAIL_INFO_SUFFIX": trail_info_suffix,
+ "PROD_MODE": str(PROD_MODE).lower(),
+ "PROD_TOKEN": PROD_TOKEN,
+ "MAP_START_LAT": MAP_START_LAT,
+ "MAP_START_LON": MAP_START_LON,
+ "MAP_START_ZOOM": MAP_START_ZOOM,
+ "MAP_RADIUS_KM": MAP_RADIUS_KM,
+ "MAP_RADIUS_SHOW": str(MAP_RADIUS_SHOW).lower(),
+ "MAP_DEFAULT_LAYER": MAP_DEFAULT_LAYER,
+ "LOS_ELEVATION_URL": LOS_ELEVATION_URL,
+ "LOS_SAMPLE_MIN": LOS_SAMPLE_MIN,
+ "LOS_SAMPLE_MAX": LOS_SAMPLE_MAX,
+ "LOS_SAMPLE_STEP_METERS": LOS_SAMPLE_STEP_METERS,
+ "LOS_PEAKS_MAX": LOS_PEAKS_MAX,
+ "MQTT_ONLINE_SECONDS": MQTT_ONLINE_SECONDS,
+ "COVERAGE_API_URL": COVERAGE_API_URL,
+ "UPDATE_AVAILABLE": str(bool(git_update_info.get("available"))).lower(),
+ "UPDATE_LOCAL": git_update_info.get("local_short") or "",
+ "UPDATE_REMOTE": git_update_info.get("remote_short") or "",
+ "UPDATE_BANNER_HIDDEN": "" if git_update_info.get("available") else "hidden",
+ }
+ for key, value in replacements.items():
+ safe_value = html.escape(str(value), quote=True)
+ content = content.replace(f"{{{{{key}}}}}", safe_value)
+
+ return HTMLResponse(content)
+
+
+@router.get("/manifest.webmanifest")
+def manifest():
+ """Serve PWA manifest."""
+ icons = []
+ if SITE_ICON:
+ icons = [
+ {
+ "src": SITE_ICON,
+ "sizes": "192x192",
+ "type": "image/png",
+ "purpose": "any",
+ },
+ {
+ "src": SITE_ICON,
+ "sizes": "512x512",
+ "type": "image/png",
+ "purpose": "any maskable",
+ },
+ ]
+ short_name = SITE_TITLE if len(SITE_TITLE) <= 12 else SITE_TITLE[:12]
+ return JSONResponse(
+ {
+ "name": SITE_TITLE,
+ "short_name": short_name,
+ "description": SITE_DESCRIPTION,
+ "start_url": "/",
+ "scope": "/",
+ "display": "standalone",
+ "display_override": ["standalone", "minimal-ui"],
+ "background_color": "#0f172a",
+ "theme_color": "#0f172a",
+ "icons": icons,
+ },
+ media_type="application/manifest+json",
+ )
+
+
+@router.get("/sw.js")
+def service_worker():
+ """Serve service worker JavaScript."""
+ return FileResponse("static/sw.js", media_type="application/javascript")
diff --git a/backend/routes/websocket.py b/backend/routes/websocket.py
new file mode 100644
index 0000000..7be4550
--- /dev/null
+++ b/backend/routes/websocket.py
@@ -0,0 +1,61 @@
+"""
+WebSocket endpoint for real-time updates.
+"""
+
+import json
+
+from fastapi import APIRouter, WebSocket, WebSocketDisconnect
+
+from auth import ws_authorized
+from config import ROUTE_HISTORY_HOURS
+from decoder import _serialize_heat_events
+from helpers import device_payload, history_edge_payload, route_payload
+from state import devices, route_history_edges, routes, trails
+
+router = APIRouter()
+
+# Set of connected WebSocket clients (managed by broadcaster service)
+clients: set = set()
+
+
+def get_clients() -> set:
+ """Get the set of connected WebSocket clients."""
+ return clients
+
+
+@router.websocket("/ws")
+async def ws_endpoint(ws: WebSocket):
+ """WebSocket endpoint for real-time updates."""
+ from services.broadcaster import git_update_info
+
+ if not ws_authorized(ws):
+ await ws.accept()
+ await ws.close(code=1008)
+ return
+
+ await ws.accept()
+ clients.add(ws)
+
+ # Send initial snapshot
+ await ws.send_text(
+ json.dumps({
+ "type": "snapshot",
+ "devices": {k: device_payload(k, v) for k, v in devices.items()},
+ "trails": trails,
+ "routes": [route_payload(r) for r in routes.values()],
+ "history_edges": [history_edge_payload(e) for e in route_history_edges.values()],
+ "history_window_seconds": int(max(0, ROUTE_HISTORY_HOURS * 3600)),
+ "heat": _serialize_heat_events(),
+ "update": git_update_info,
+ })
+ )
+
+ try:
+ while True:
+ await ws.receive_text()
+ except WebSocketDisconnect:
+ pass
+ except RuntimeError:
+ pass
+ finally:
+ clients.discard(ws)
diff --git a/backend/scripts/meshcore_decode.mjs b/backend/scripts/meshcore_decode.mjs
new file mode 100644
index 0000000..2721c29
--- /dev/null
+++ b/backend/scripts/meshcore_decode.mjs
@@ -0,0 +1,101 @@
+#!/usr/bin/env node
+/**
+ * MeshCore packet decoder wrapper.
+ *
+ * Usage: node meshcore_decode.mjs
+ *
+ * Decodes a MeshCore packet from hex and extracts location, role, and routing info.
+ * Outputs JSON to stdout.
+ */
+
+import { MeshCoreDecoder, getDeviceRoleName } from '@michaelhart/meshcore-decoder';
+
+const hex = (process.argv[2] || '').trim();
+
+function pickLocation(decodedPacket) {
+ const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
+ const payloadRoot = decodedPacket?.payload ?? null;
+ const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
+ const loc = appData?.location ?? payloadDecoded?.location ?? payloadRoot?.location ?? null;
+ const lat = loc?.latitude ?? loc?.lat ?? null;
+ const lon = loc?.longitude ?? loc?.lon ?? null;
+ const name = appData?.name ?? payloadDecoded?.name ?? payloadRoot?.name ?? null;
+ const pubkey =
+ payloadDecoded?.publicKey ??
+ payloadDecoded?.publickey ??
+ payloadRoot?.publicKey ??
+ payloadRoot?.publickey ??
+ decodedPacket?.publicKey ??
+ decodedPacket?.publickey ??
+ null;
+ return { lat, lon, name, pubkey };
+}
+
+function pickRole(decodedPacket) {
+ const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
+ const payloadRoot = decodedPacket?.payload ?? null;
+ const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
+ const candidates = [
+ appData?.role,
+ appData?.deviceRole,
+ appData?.nodeRole,
+ appData?.deviceType,
+ appData?.nodeType,
+ appData?.class,
+ appData?.profile,
+ payloadDecoded?.role,
+ payloadDecoded?.deviceRole,
+ payloadDecoded?.nodeRole,
+ payloadDecoded?.deviceType,
+ payloadDecoded?.nodeType,
+ payloadDecoded?.class,
+ payloadDecoded?.profile,
+ payloadRoot?.role,
+ payloadRoot?.deviceRole,
+ payloadRoot?.nodeRole,
+ payloadRoot?.deviceType,
+ payloadRoot?.nodeType,
+ payloadRoot?.class,
+ payloadRoot?.profile,
+ ];
+ for (const value of candidates) {
+ if (typeof value === 'string' && value.trim()) return value.trim();
+ }
+ return null;
+}
+
+try {
+ const decoded = MeshCoreDecoder.decode(hex);
+ const loc = pickLocation(decoded);
+ const payloadDecoded = decoded?.payload?.decoded ?? decoded?.payload ?? null;
+ const payloadRoot = decoded?.payload ?? null;
+ const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
+ const deviceRole = appData?.deviceRole ?? payloadDecoded?.deviceRole ?? payloadRoot?.deviceRole ?? null;
+ const deviceRoleName = typeof deviceRole === 'number' ? getDeviceRoleName(deviceRole) : null;
+ const role = pickRole(decoded) || deviceRoleName;
+ const payloadKeys = payloadDecoded && typeof payloadDecoded === 'object' ? Object.keys(payloadDecoded) : null;
+ const appDataKeys = appData && typeof appData === 'object' ? Object.keys(appData) : null;
+ const pathHashes = payloadDecoded?.pathHashes ?? null;
+ const snrValues = payloadDecoded?.snrValues ?? null;
+ const path = decoded?.path ?? null;
+ const pathLength = decoded?.pathLength ?? null;
+ const out = {
+ ok: true,
+ payloadType: decoded?.payloadType ?? null,
+ routeType: decoded?.routeType ?? null,
+ messageHash: decoded?.messageHash ?? null,
+ location: loc,
+ role,
+ deviceRole,
+ deviceRoleName,
+ payloadKeys,
+ appDataKeys,
+ pathHashes,
+ snrValues,
+ path,
+ pathLength,
+ };
+ console.log(JSON.stringify(out));
+} catch (e) {
+ console.log(JSON.stringify({ ok: false, error: String(e) }));
+}
diff --git a/backend/services/__init__.py b/backend/services/__init__.py
new file mode 100644
index 0000000..a70b302
--- /dev/null
+++ b/backend/services/__init__.py
@@ -0,0 +1 @@
+# Services package
diff --git a/backend/services/broadcaster.py b/backend/services/broadcaster.py
new file mode 100644
index 0000000..eb4a2c2
--- /dev/null
+++ b/backend/services/broadcaster.py
@@ -0,0 +1,387 @@
+"""
+WebSocket broadcaster service.
+
+Processes events from the update queue and broadcasts to connected clients.
+"""
+
+import asyncio
+import json
+import os
+import subprocess
+import time
+from typing import Any, Dict, List, Optional, Set
+
+import state
+from config import (
+ GIT_CHECK_ENABLED,
+ GIT_CHECK_FETCH,
+ GIT_CHECK_INTERVAL_SECONDS,
+ GIT_CHECK_PATH,
+ MAP_RADIUS_KM,
+ ROUTE_TTL_SECONDS,
+ TRAIL_LEN,
+)
+from decoder import (
+ ROUTE_PAYLOAD_TYPES_SET,
+ _append_heat_points,
+ _coords_are_zero,
+ _rebuild_node_hash_map,
+ _route_points_from_device_ids,
+ _route_points_from_hashes,
+)
+from helpers import device_payload, history_edge_payload, route_payload, within_map_radius
+from history import _record_route_history
+from state import (
+ DeviceState,
+ device_names,
+ device_roles,
+ devices,
+ mqtt_seen,
+ routes,
+ seen_devices,
+ trails,
+)
+
+# Update queue for async processing
+update_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
+
+# Git update check state
+git_update_info = {
+ "available": False,
+ "local": None,
+ "remote": None,
+ "local_short": None,
+ "remote_short": None,
+ "error": None,
+}
+
+
+def check_git_updates() -> None:
+ """Check if there are updates available from upstream."""
+ if not GIT_CHECK_ENABLED:
+ return
+
+ if not GIT_CHECK_PATH or not os.path.isdir(GIT_CHECK_PATH):
+ git_update_info["error"] = "git_path_missing"
+ return
+
+ def run_git(args: List[str]) -> str:
+ result = subprocess.run(
+ args,
+ check=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+ return result.stdout.strip()
+
+ try:
+ subprocess.run(
+ ["git", "config", "--global", "--add", "safe.directory", GIT_CHECK_PATH],
+ check=False,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ inside = run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "--is-inside-work-tree"])
+ if inside.lower() != "true":
+ git_update_info["error"] = "not_git_repo"
+ return
+ except Exception:
+ git_update_info["error"] = "git_unavailable"
+ return
+
+ try:
+ if GIT_CHECK_FETCH:
+ subprocess.run(
+ ["git", "-C", GIT_CHECK_PATH, "fetch", "--quiet", "--prune"],
+ check=False,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ local_sha = run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "HEAD"])
+ remote_sha = run_git(["git", "-C", GIT_CHECK_PATH, "rev-parse", "@{u}"])
+ git_update_info["local"] = local_sha
+ git_update_info["remote"] = remote_sha
+ git_update_info["local_short"] = local_sha[:7]
+ git_update_info["remote_short"] = remote_sha[:7]
+ git_update_info["available"] = local_sha != remote_sha
+ if git_update_info["available"]:
+ print(f"[update] available {git_update_info['local_short']} -> {git_update_info['remote_short']}")
+ except Exception:
+ git_update_info["error"] = "git_compare_failed"
+
+
+async def git_check_loop() -> None:
+ """Periodically check for git updates."""
+ if not GIT_CHECK_ENABLED:
+ return
+ if GIT_CHECK_INTERVAL_SECONDS <= 0:
+ return
+ while True:
+ await asyncio.sleep(GIT_CHECK_INTERVAL_SECONDS)
+ check_git_updates()
+
+
+def evict_device(device_id: str) -> bool:
+ """Remove a device from all state structures."""
+ from state import last_seen_broadcast
+
+ removed = False
+ if device_id in devices:
+ devices.pop(device_id, None)
+ removed = True
+ trails.pop(device_id, None)
+ seen_devices.pop(device_id, None)
+ mqtt_seen.pop(device_id, None)
+ last_seen_broadcast.pop(device_id, None)
+ if removed:
+ state.state_dirty = True
+ _rebuild_node_hash_map()
+ return removed
+
+
+async def broadcaster() -> None:
+ """
+ Main broadcaster loop.
+
+ Processes events from the update queue and broadcasts to all connected WebSocket clients.
+ """
+ from routes.websocket import get_clients
+
+ while True:
+ event = await update_queue.get()
+ clients = get_clients()
+
+ # Handle device name/role updates
+ if isinstance(event, dict) and event.get("type") in ("device_name", "device_role"):
+ device_id = event.get("device_id")
+ device_state = devices.get(device_id)
+ if device_state:
+ if device_id in device_names:
+ device_state.name = device_names[device_id]
+ if device_id in device_roles:
+ device_state.role = device_roles[device_id]
+ payload = {
+ "type": "update",
+ "device": device_payload(device_id, device_state),
+ "trail": trails.get(device_id, []),
+ }
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ continue
+
+ # Handle device seen (online status) updates
+ if isinstance(event, dict) and event.get("type") == "device_seen":
+ device_id = event.get("device_id")
+ device_state = devices.get(device_id)
+ if device_state:
+ seen_ts = event.get("last_seen_ts") or time.time()
+ mqtt_ts = event.get("mqtt_seen_ts")
+ seen_devices[device_id] = seen_ts
+ if mqtt_ts:
+ mqtt_seen[device_id] = mqtt_ts
+ payload = {
+ "type": "device_seen",
+ "device_id": device_id,
+ "last_seen_ts": seen_ts,
+ "mqtt_seen_ts": mqtt_ts,
+ }
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ continue
+
+ # Handle device removal
+ if isinstance(event, dict) and event.get("type") == "device_remove":
+ device_id = event.get("device_id")
+ if device_id and evict_device(device_id):
+ payload = {"type": "stale", "device_ids": [device_id]}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ continue
+
+ # Handle route updates
+ if isinstance(event, dict) and event.get("type") == "route":
+ route_mode = event.get("route_mode")
+ points = event.get("points")
+ used_hashes: List[str] = []
+ point_ids: List[Optional[str]] = []
+
+ if not points:
+ path_hashes = event.get("path_hashes") or []
+ points, used_hashes, point_ids = _route_points_from_hashes(
+ list(path_hashes),
+ event.get("origin_id"),
+ event.get("receiver_id"),
+ event.get("ts") or time.time(),
+ )
+
+ if not points and route_mode == "fanout":
+ points = _route_points_from_device_ids(event.get("origin_id"), event.get("receiver_id"))
+ if points and event.get("origin_id") and event.get("receiver_id") and len(points) == 2:
+ point_ids = [event.get("origin_id"), event.get("receiver_id")]
+
+ # Fallback: draw direct link if path hashes missing
+ if not points:
+ points = _route_points_from_device_ids(event.get("origin_id"), event.get("receiver_id"))
+ if points:
+ route_mode = "direct"
+ if event.get("origin_id") and event.get("receiver_id") and len(points) == 2:
+ point_ids = [event.get("origin_id"), event.get("receiver_id")]
+
+ if not points:
+ continue
+
+ # Filter routes outside map radius
+ if MAP_RADIUS_KM > 0:
+ outside = any(
+ not within_map_radius(point[0], point[1])
+ for point in points
+ if isinstance(point, (list, tuple)) and len(point) >= 2
+ )
+ if outside:
+ continue
+
+ route_id = (
+ event.get("route_id")
+ or event.get("message_hash")
+ or f"{event.get('origin_id', 'route')}-{int(event.get('ts', time.time()) * 1000)}"
+ )
+ expires_at = (event.get("ts") or time.time()) + ROUTE_TTL_SECONDS
+ route = {
+ "id": route_id,
+ "points": points,
+ "hashes": used_hashes,
+ "point_ids": point_ids,
+ "route_mode": route_mode or ("path" if used_hashes else "direct"),
+ "ts": event.get("ts") or time.time(),
+ "expires_at": expires_at,
+ "origin_id": event.get("origin_id"),
+ "receiver_id": event.get("receiver_id"),
+ "payload_type": event.get("payload_type"),
+ "message_hash": event.get("message_hash"),
+ "snr_values": event.get("snr_values"),
+ "topic": event.get("topic"),
+ }
+ _append_heat_points(points, route["ts"], event.get("payload_type"))
+ routes[route_id] = route
+
+ history_updates, history_removed = _record_route_history(route)
+
+ payload = {"type": "route", "route": route_payload(route)}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+
+ # Broadcast history updates
+ if history_updates or history_removed:
+ history_payload = {}
+ if history_updates:
+ history_payload["type"] = "history_edges"
+ history_payload["edges"] = [history_edge_payload(edge) for edge in history_updates]
+ if history_removed:
+ history_payload_remove = {"type": "history_edges_remove", "edge_ids": history_removed}
+ else:
+ history_payload_remove = None
+ dead = []
+ for ws in list(clients):
+ try:
+ if history_updates:
+ await ws.send_text(json.dumps(history_payload))
+ if history_payload_remove:
+ await ws.send_text(json.dumps(history_payload_remove))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ continue
+
+ # Handle device position updates
+ upd = event.get("data") if isinstance(event, dict) and event.get("type") == "device" else event
+
+ device_id = upd["device_id"]
+ if not within_map_radius(upd.get("lat"), upd.get("lon")):
+ if evict_device(device_id):
+ payload = {"type": "stale", "device_ids": [device_id]}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ continue
+
+ is_new_device = device_id not in devices
+ device_state = DeviceState(
+ device_id=device_id,
+ lat=upd["lat"],
+ lon=upd["lon"],
+ ts=upd.get("ts", time.time()),
+ heading=upd.get("heading"),
+ speed=upd.get("speed"),
+ rssi=upd.get("rssi"),
+ snr=upd.get("snr"),
+ name=upd.get("name") or device_names.get(device_id),
+ role=upd.get("role") or device_roles.get(device_id),
+ raw_topic=upd.get("raw_topic"),
+ )
+ devices[device_id] = device_state
+ seen_devices[device_id] = time.time()
+ state.state_dirty = True
+
+ if is_new_device:
+ _rebuild_node_hash_map()
+
+ if device_state.name:
+ device_names[device_id] = device_state.name
+ if device_state.role:
+ device_roles[device_id] = device_state.role
+
+ # Update trail
+ if TRAIL_LEN > 0 and not _coords_are_zero(device_state.lat, device_state.lon):
+ trails.setdefault(device_id, [])
+ trails[device_id].append([device_state.lat, device_state.lon, device_state.ts])
+ if len(trails[device_id]) > TRAIL_LEN:
+ trails[device_id] = trails[device_id][-TRAIL_LEN:]
+ elif device_id in trails:
+ trails.pop(device_id, None)
+
+ payload = {
+ "type": "update",
+ "device": device_payload(device_id, device_state),
+ "trail": trails.get(device_id, []),
+ }
+
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
diff --git a/backend/services/mqtt.py b/backend/services/mqtt.py
new file mode 100644
index 0000000..6fa6070
--- /dev/null
+++ b/backend/services/mqtt.py
@@ -0,0 +1,408 @@
+"""
+MQTT service for subscribing to mesh network topics and processing messages.
+"""
+
+import asyncio
+import time
+from typing import Any, Dict, List, Optional
+
+import paho.mqtt.client as mqtt
+
+import state
+from config import (
+ DEBUG_PAYLOAD,
+ DEBUG_PAYLOAD_MAX,
+ MQTT_CA_CERT,
+ MQTT_CLIENT_ID,
+ MQTT_HOST,
+ MQTT_ONLINE_FORCE_NAMES_SET,
+ MQTT_PASSWORD,
+ MQTT_PORT,
+ MQTT_SEEN_BROADCAST_MIN_SECONDS,
+ MQTT_TLS,
+ MQTT_TLS_INSECURE,
+ MQTT_TOPICS,
+ MQTT_TRANSPORT,
+ MQTT_USERNAME,
+ MQTT_WS_PATH,
+)
+from decoder import (
+ ROUTE_PAYLOAD_TYPES_SET,
+ _device_id_from_topic,
+ _safe_preview,
+ _topic_marks_online,
+ _try_parse_payload,
+)
+from helpers import within_map_radius
+from state import (
+ debug_last,
+ device_names,
+ device_roles,
+ device_role_sources,
+ devices,
+ last_seen_broadcast,
+ message_origins,
+ mqtt_seen,
+ result_counts,
+ seen_devices,
+ stats,
+ status_last,
+ topic_counts,
+)
+
+# Global MQTT client
+mqtt_client: Optional[mqtt.Client] = None
+
+
+def on_connect(client, userdata, flags, reason_code, properties=None):
+ """Handle MQTT connection."""
+ topics_str = ", ".join(MQTT_TOPICS)
+ print(f"[mqtt] connected reason_code={reason_code} subscribing topics={topics_str}")
+ for topic in MQTT_TOPICS:
+ client.subscribe(topic, qos=0)
+
+
+def on_disconnect(client, userdata, reason_code, properties=None, *args, **kwargs):
+ """Handle MQTT disconnection."""
+ print(f"[mqtt] disconnected reason_code={reason_code}")
+
+
+def on_message(client, userdata, msg: mqtt.MQTTMessage):
+ """Handle incoming MQTT messages."""
+ from services.broadcaster import update_queue
+
+ stats["received_total"] += 1
+ stats["last_rx_ts"] = time.time()
+ stats["last_rx_topic"] = msg.topic
+ topic_counts[msg.topic] = topic_counts.get(msg.topic, 0) + 1
+ loop: asyncio.AbstractEventLoop = userdata["loop"]
+
+ # Track device online status from topic
+ dev_guess = _device_id_from_topic(msg.topic)
+ if dev_guess and _topic_marks_online(msg.topic):
+ now = time.time()
+ seen_devices[dev_guess] = now
+ mqtt_seen[dev_guess] = now
+ if dev_guess in devices:
+ last_sent = last_seen_broadcast.get(dev_guess, 0)
+ if now - last_sent >= MQTT_SEEN_BROADCAST_MIN_SECONDS:
+ last_seen_broadcast[dev_guess] = now
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "device_seen",
+ "device_id": dev_guess,
+ "last_seen_ts": now,
+ "mqtt_seen_ts": now,
+ },
+ )
+
+ # Parse the payload
+ parsed, debug = _try_parse_payload(msg.topic, msg.payload)
+ device_id_hint = parsed.get("device_id") if parsed else None
+
+ # Filter zero coordinates
+ if parsed and (parsed.get("lat", 0) == 0 and parsed.get("lon", 0) == 0):
+ debug["result"] = "filtered_zero_coords"
+ parsed = None
+
+ # Filter coordinates outside map radius
+ if parsed and not within_map_radius(parsed.get("lat"), parsed.get("lon")):
+ debug["result"] = "filtered_radius"
+ parsed = None
+ if device_id_hint:
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "device_remove",
+ "device_id": device_id_hint,
+ "reason": "radius",
+ },
+ )
+
+ # Extract metadata
+ origin_id = debug.get("origin_id") or _device_id_from_topic(msg.topic)
+ decoder_meta = debug.get("decoder_meta") or {}
+ result = debug.get("result") or "unknown"
+ device_role = debug.get("device_role")
+
+ # Determine role target ID
+ role_target_id = origin_id
+ if device_role and result.startswith("decoded"):
+ role_target_id = None
+ loc_meta = decoder_meta.get("location") if isinstance(decoder_meta, dict) else None
+ loc_pubkey = loc_meta.get("pubkey") if isinstance(loc_meta, dict) else None
+ if isinstance(loc_pubkey, str) and loc_pubkey.strip():
+ role_target_id = loc_pubkey
+ else:
+ decoded_pubkey = debug.get("decoded_pubkey")
+ if isinstance(decoded_pubkey, str) and decoded_pubkey.strip():
+ role_target_id = decoded_pubkey
+
+ # Store debug entry
+ debug_entry = {
+ "ts": time.time(),
+ "topic": msg.topic,
+ "result": debug.get("result"),
+ "found_path": debug.get("found_path"),
+ "found_hint": debug.get("found_hint"),
+ "decoder_meta": decoder_meta,
+ "role_target_id": role_target_id,
+ "packet_hash": debug.get("packet_hash"),
+ "direction": debug.get("direction"),
+ "json_keys": debug.get("json_keys"),
+ "parse_error": debug.get("parse_error"),
+ "origin_id": origin_id,
+ "payload_preview": _safe_preview(msg.payload[:DEBUG_PAYLOAD_MAX]),
+ }
+ debug_last.append(debug_entry)
+
+ # Store status messages
+ if msg.topic.endswith("/status"):
+ status_last.append({
+ "ts": debug_entry["ts"],
+ "topic": msg.topic,
+ "device_name": debug.get("device_name"),
+ "device_role": debug.get("device_role"),
+ "origin_id": origin_id,
+ "json_keys": debug_entry.get("json_keys"),
+ "payload_preview": debug_entry["payload_preview"],
+ })
+
+ result_counts[result] = result_counts.get(result, 0) + 1
+
+ # Update device name if found
+ device_name = debug.get("device_name")
+ if device_name and origin_id:
+ existing_name = device_names.get(origin_id)
+ if existing_name != device_name:
+ device_names[origin_id] = device_name
+ state.state_dirty = True
+ device_state = devices.get(origin_id)
+ if device_state:
+ device_state.name = device_name
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "device_name",
+ "device_id": origin_id,
+ },
+ )
+
+ # Update device role if found
+ if device_role and role_target_id:
+ existing_role = device_roles.get(role_target_id)
+ if existing_role != device_role:
+ device_roles[role_target_id] = device_role
+ device_role_sources[role_target_id] = "explicit"
+ state.state_dirty = True
+ device_state = devices.get(role_target_id)
+ if device_state:
+ device_state.role = device_role
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "device_role",
+ "device_id": role_target_id,
+ },
+ )
+
+ # Process routing information
+ path_hashes = decoder_meta.get("pathHashes")
+ payload_type = decoder_meta.get("payloadType")
+ route_type = decoder_meta.get("routeType")
+ message_hash = decoder_meta.get("messageHash") or debug.get("packet_hash")
+ snr_values = decoder_meta.get("snrValues")
+ path_header = decoder_meta.get("path")
+ direction = debug.get("direction")
+ receiver_id = _device_id_from_topic(msg.topic)
+
+ # Determine route origin
+ route_origin_id = None
+ loc_meta = decoder_meta.get("location") if isinstance(decoder_meta, dict) else None
+ if isinstance(loc_meta, dict):
+ decoded_pubkey = loc_meta.get("pubkey")
+ if decoded_pubkey:
+ route_origin_id = decoded_pubkey
+
+ direction_value = str(direction or "").lower()
+
+ # Track message origins for fanout detection
+ if message_hash:
+ cache = message_origins.get(message_hash)
+ if not cache:
+ cache = {"origin_id": None, "first_rx": None, "receivers": set(), "ts": time.time()}
+ message_origins[message_hash] = cache
+ cache["ts"] = time.time()
+ origin_for_tx = origin_id or receiver_id
+ if direction_value == "tx" and origin_for_tx:
+ cache["origin_id"] = origin_for_tx
+ if direction_value == "rx" and receiver_id:
+ cache["receivers"].add(receiver_id)
+ if not cache.get("first_rx"):
+ cache["first_rx"] = receiver_id
+ cached_origin = cache.get("origin_id")
+ if not route_origin_id and cached_origin:
+ route_origin_id = cached_origin
+ if not route_origin_id and direction_value == "rx":
+ first_rx = cache.get("first_rx")
+ if first_rx and receiver_id and receiver_id != first_rx:
+ route_origin_id = first_rx
+
+ if not route_origin_id:
+ route_origin_id = origin_id
+
+ # Normalize payload/route types
+ try:
+ payload_type = int(payload_type) if payload_type is not None else None
+ except (TypeError, ValueError):
+ payload_type = None
+ try:
+ route_type = int(route_type) if route_type is not None else None
+ except (TypeError, ValueError):
+ route_type = None
+
+ # Determine route hashes
+ route_hashes = None
+ if path_hashes and isinstance(path_hashes, list):
+ route_hashes = path_hashes
+ elif payload_type not in (8, 9) and isinstance(path_header, list):
+ if route_type in (0, 1):
+ route_hashes = path_header
+
+ # Emit route events
+ route_emitted = False
+ if route_hashes and payload_type in ROUTE_PAYLOAD_TYPES_SET:
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "route",
+ "path_hashes": route_hashes,
+ "payload_type": payload_type,
+ "message_hash": message_hash,
+ "origin_id": route_origin_id,
+ "receiver_id": receiver_id,
+ "snr_values": snr_values,
+ "route_type": route_type,
+ "ts": time.time(),
+ "topic": msg.topic,
+ },
+ )
+ route_emitted = True
+ elif message_hash and route_origin_id and receiver_id:
+ if direction_value == "rx" and msg.topic.endswith("/packets"):
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "route",
+ "route_mode": "fanout",
+ "route_id": f"{message_hash}-{receiver_id}",
+ "origin_id": route_origin_id,
+ "receiver_id": receiver_id,
+ "message_hash": message_hash,
+ "route_type": route_type,
+ "payload_type": payload_type,
+ "ts": time.time(),
+ "topic": msg.topic,
+ },
+ )
+ route_emitted = True
+
+ # Fallback: direct route if no path
+ if (
+ not route_emitted
+ and direction_value == "rx"
+ and msg.topic.endswith("/packets")
+ and receiver_id
+ and route_origin_id
+ and receiver_id != route_origin_id
+ and payload_type in ROUTE_PAYLOAD_TYPES_SET
+ ):
+ fallback_id = message_hash or f"{route_origin_id}-{receiver_id}-{int(time.time() * 1000)}"
+ loop.call_soon_threadsafe(
+ update_queue.put_nowait,
+ {
+ "type": "route",
+ "route_mode": "direct",
+ "route_id": f"direct-{fallback_id}",
+ "origin_id": route_origin_id,
+ "receiver_id": receiver_id,
+ "message_hash": message_hash,
+ "route_type": route_type,
+ "payload_type": payload_type,
+ "ts": time.time(),
+ "topic": msg.topic,
+ },
+ )
+
+ # Handle unparsed messages
+ if not parsed:
+ stats["unparsed_total"] += 1
+ if DEBUG_PAYLOAD:
+ print(f"[mqtt] UNPARSED result={result} topic={msg.topic} preview={debug_entry['payload_preview']!r}")
+ return
+
+ parsed["raw_topic"] = msg.topic
+ stats["parsed_total"] += 1
+ stats["last_parsed_ts"] = time.time()
+ stats["last_parsed_topic"] = msg.topic
+
+ if DEBUG_PAYLOAD:
+ print(f"[mqtt] PARSED topic={msg.topic} device={parsed['device_id']} lat={parsed['lat']} lon={parsed['lon']}")
+
+ loop.call_soon_threadsafe(update_queue.put_nowait, {"type": "device", "data": parsed})
+
+
+def create_client(loop: asyncio.AbstractEventLoop) -> mqtt.Client:
+ """Create and configure the MQTT client."""
+ global mqtt_client
+
+ transport = "websockets" if MQTT_TRANSPORT == "websockets" else "tcp"
+ topics_str = ", ".join(MQTT_TOPICS)
+ print(
+ f"[mqtt] connecting host={MQTT_HOST} port={MQTT_PORT} tls={MQTT_TLS} "
+ f"transport={transport} ws_path={MQTT_WS_PATH if transport == 'websockets' else '-'} topics={topics_str}"
+ )
+
+ mqtt_client = mqtt.Client(
+ mqtt.CallbackAPIVersion.VERSION2,
+ client_id=(MQTT_CLIENT_ID or None),
+ userdata={"loop": loop},
+ transport=transport,
+ )
+
+ if transport == "websockets":
+ mqtt_client.ws_set_options(path=MQTT_WS_PATH)
+
+ if MQTT_USERNAME:
+ mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
+
+ if MQTT_TLS:
+ if MQTT_CA_CERT:
+ mqtt_client.tls_set(ca_certs=MQTT_CA_CERT)
+ else:
+ mqtt_client.tls_set()
+ if MQTT_TLS_INSECURE:
+ mqtt_client.tls_insecure_set(True)
+
+ mqtt_client.on_connect = on_connect
+ mqtt_client.on_disconnect = on_disconnect
+ mqtt_client.on_message = on_message
+
+ mqtt_client.reconnect_delay_set(min_delay=1, max_delay=30)
+ mqtt_client.connect_async(MQTT_HOST, MQTT_PORT, keepalive=30)
+ mqtt_client.loop_start()
+
+ return mqtt_client
+
+
+def stop_client() -> None:
+ """Stop the MQTT client."""
+ global mqtt_client
+ if mqtt_client is not None:
+ try:
+ mqtt_client.loop_stop()
+ mqtt_client.disconnect()
+ except Exception:
+ pass
+ mqtt_client = None
diff --git a/backend/services/persistence.py b/backend/services/persistence.py
new file mode 100644
index 0000000..75bd609
--- /dev/null
+++ b/backend/services/persistence.py
@@ -0,0 +1,220 @@
+"""
+State persistence service.
+
+Handles loading and saving of device state, trails, and role overrides.
+"""
+
+import asyncio
+import json
+import os
+from dataclasses import asdict
+from typing import Any, Dict, Set
+
+import state
+from config import (
+ DEVICE_ROLES_FILE,
+ STATE_DIR,
+ STATE_FILE,
+ STATE_SAVE_INTERVAL,
+ TRAIL_LEN,
+)
+from decoder import _coords_are_zero, _normalize_role, _rebuild_node_hash_map
+from helpers import within_map_radius
+from state import (
+ DeviceState,
+ device_names,
+ device_role_sources,
+ device_roles,
+ devices,
+ seen_devices,
+ trails,
+)
+
+
+def load_role_overrides() -> Dict[str, str]:
+ """Load device role overrides from file."""
+ if not DEVICE_ROLES_FILE or not os.path.exists(DEVICE_ROLES_FILE):
+ return {}
+ try:
+ with open(DEVICE_ROLES_FILE, "r", encoding="utf-8") as handle:
+ data = json.load(handle)
+ except Exception:
+ return {}
+ if not isinstance(data, dict):
+ return {}
+ roles: Dict[str, str] = {}
+ for key, value in data.items():
+ if not isinstance(key, str) or not isinstance(value, str):
+ continue
+ role = _normalize_role(value)
+ if not role:
+ continue
+ roles[key.strip()] = role
+ return roles
+
+
+def serialize_state() -> Dict[str, Any]:
+ """Serialize current state for saving."""
+ return {
+ "version": 1,
+ "saved_at": __import__("time").time(),
+ "devices": {k: asdict(v) for k, v in devices.items()},
+ "trails": trails,
+ "seen_devices": seen_devices,
+ "device_names": device_names,
+ "device_roles": device_roles,
+ "device_role_sources": device_role_sources,
+ }
+
+
+def load_state() -> None:
+ """Load state from file."""
+ try:
+ if not os.path.exists(STATE_FILE):
+ return
+ with open(STATE_FILE, "r", encoding="utf-8") as handle:
+ data = json.load(handle)
+ except Exception as exc:
+ print(f"[state] failed to load {STATE_FILE}: {exc}")
+ return
+
+ # Load devices
+ raw_devices = data.get("devices") or {}
+ loaded_devices: Dict[str, DeviceState] = {}
+ dropped_ids: Set[str] = set()
+ for key, value in raw_devices.items():
+ if not isinstance(value, dict):
+ continue
+ try:
+ device_state = DeviceState(**value)
+ except Exception:
+ continue
+ if _coords_are_zero(device_state.lat, device_state.lon) or not within_map_radius(device_state.lat, device_state.lon):
+ dropped_ids.add(str(key))
+ continue
+ loaded_devices[key] = device_state
+
+ devices.clear()
+ devices.update(loaded_devices)
+
+ # Load trails
+ trails.clear()
+ trails.update(data.get("trails") or {})
+ seen_devices.clear()
+ seen_devices.update(data.get("seen_devices") or {})
+
+ # Clean trails
+ cleaned_trails: Dict[str, list] = {}
+ trails_dirty = False
+ for device_id, trail in trails.items():
+ if not isinstance(trail, list):
+ continue
+ filtered: list = []
+ for entry in trail:
+ if not isinstance(entry, (list, tuple)) or len(entry) < 2:
+ continue
+ lat = entry[0]
+ lon = entry[1]
+ try:
+ lat_val = float(lat)
+ lon_val = float(lon)
+ except (TypeError, ValueError):
+ continue
+ if _coords_are_zero(lat_val, lon_val) or not within_map_radius(lat_val, lon_val):
+ trails_dirty = True
+ continue
+ filtered.append(list(entry))
+ if filtered:
+ cleaned_trails[device_id] = filtered
+ else:
+ trails_dirty = True
+
+ trails.clear()
+ trails.update(cleaned_trails)
+
+ # Disable trails if TRAIL_LEN <= 0
+ if TRAIL_LEN <= 0 and trails:
+ trails.clear()
+ trails_dirty = True
+
+ # Clean up dropped devices
+ if dropped_ids:
+ for device_id in dropped_ids:
+ trails.pop(device_id, None)
+ seen_devices.pop(device_id, None)
+ trails_dirty = True
+
+ if trails_dirty:
+ state.state_dirty = True
+
+ # Load device names
+ raw_names = data.get("device_names") or {}
+ if isinstance(raw_names, dict):
+ device_names.clear()
+ device_names.update({str(k): str(v) for k, v in raw_names.items() if str(v).strip()})
+ else:
+ device_names.clear()
+ if dropped_ids:
+ for device_id in dropped_ids:
+ device_names.pop(device_id, None)
+
+ # Load role sources
+ raw_role_sources = data.get("device_role_sources") or {}
+ if isinstance(raw_role_sources, dict):
+ device_role_sources.clear()
+ device_role_sources.update({str(k): str(v) for k, v in raw_role_sources.items() if str(v).strip()})
+ else:
+ device_role_sources.clear()
+ if dropped_ids:
+ for device_id in dropped_ids:
+ device_role_sources.pop(device_id, None)
+
+ # Load device roles
+ raw_roles = data.get("device_roles") or {}
+ device_roles.clear()
+ if isinstance(raw_roles, dict):
+ for key, value in raw_roles.items():
+ if dropped_ids and str(key) in dropped_ids:
+ continue
+ role_value = str(value).strip() if isinstance(value, str) else ""
+ if not role_value:
+ continue
+ source = device_role_sources.get(str(key))
+ if source in ("explicit", "override"):
+ device_roles[str(key)] = role_value
+
+ # Apply role overrides
+ role_overrides = load_role_overrides()
+ if role_overrides:
+ for device_id in role_overrides:
+ device_role_sources[device_id] = "override"
+ device_roles.update(role_overrides)
+ if dropped_ids:
+ for device_id in dropped_ids:
+ device_roles.pop(device_id, None)
+
+ # Rebuild hash map
+ _rebuild_node_hash_map()
+
+ # Apply names and roles to device states
+ for device_id, device_state in devices.items():
+ if not device_state.name and device_id in device_names:
+ device_state.name = device_names[device_id]
+ role_value = device_roles.get(device_id)
+ device_state.role = role_value if role_value else None
+
+
+async def state_saver() -> None:
+ """Periodically save state to file."""
+ while True:
+ if state.state_dirty:
+ try:
+ os.makedirs(STATE_DIR, exist_ok=True)
+ tmp_path = f"{STATE_FILE}.tmp"
+ with open(tmp_path, "w", encoding="utf-8") as handle:
+ json.dump(serialize_state(), handle)
+ os.replace(tmp_path, STATE_FILE)
+ state.state_dirty = False
+ except Exception as exc:
+ print(f"[state] failed to save {STATE_FILE}: {exc}")
+ await asyncio.sleep(max(1.0, STATE_SAVE_INTERVAL))
diff --git a/backend/services/reaper.py b/backend/services/reaper.py
new file mode 100644
index 0000000..2e8a9b8
--- /dev/null
+++ b/backend/services/reaper.py
@@ -0,0 +1,131 @@
+"""
+Reaper service for cleaning up stale devices, routes, and history.
+"""
+
+import asyncio
+import json
+import time
+
+import state
+from config import (
+ DEVICE_TTL_SECONDS,
+ HEAT_TTL_SECONDS,
+ MESSAGE_ORIGIN_TTL_SECONDS,
+)
+from decoder import _coords_are_zero, _rebuild_node_hash_map
+from history import _prune_route_history
+from state import (
+ devices,
+ heat_events,
+ message_origins,
+ routes,
+ seen_devices,
+ trails,
+)
+
+
+async def reaper() -> None:
+ """
+ Periodically clean up stale data.
+
+ - Removes devices that haven't been seen within DEVICE_TTL_SECONDS
+ - Removes expired routes
+ - Prunes heat events
+ - Cleans up message origin cache
+ """
+ from routes.websocket import get_clients
+
+ while True:
+ now = time.time()
+ clients = get_clients()
+
+ # Clean up stale devices
+ if DEVICE_TTL_SECONDS > 0:
+ stale = [dev_id for dev_id, st in list(devices.items()) if now - st.ts > DEVICE_TTL_SECONDS]
+ if stale:
+ payload = {"type": "stale", "device_ids": stale}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+
+ for dev_id in stale:
+ devices.pop(dev_id, None)
+ trails.pop(dev_id, None)
+ state.state_dirty = True
+ _rebuild_node_hash_map()
+
+ # Clean up routes with zero coordinates
+ if routes:
+ bad_routes = []
+ for route_id, route in list(routes.items()):
+ points = route.get("points") if isinstance(route, dict) else None
+ if not isinstance(points, list):
+ continue
+ if any(_coords_are_zero(p[0], p[1]) for p in points if isinstance(p, list) and len(p) >= 2):
+ bad_routes.append(route_id)
+ if bad_routes:
+ payload = {"type": "route_remove", "route_ids": bad_routes}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ for route_id in bad_routes:
+ routes.pop(route_id, None)
+
+ # Clean up expired routes
+ stale_routes = [route_id for route_id, route in list(routes.items()) if now > route.get("expires_at", 0)]
+ if stale_routes:
+ payload = {"type": "route_remove", "route_ids": stale_routes}
+ dead = []
+ for ws in list(clients):
+ try:
+ await ws.send_text(json.dumps(payload))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+ for route_id in stale_routes:
+ routes.pop(route_id, None)
+
+ # Prune route history
+ history_updates, history_removed = _prune_route_history()
+ if history_updates or history_removed:
+ dead = []
+ for ws in list(clients):
+ try:
+ if history_updates:
+ await ws.send_text(json.dumps({"type": "history_edges", "edges": history_updates}))
+ if history_removed:
+ await ws.send_text(json.dumps({"type": "history_edges_remove", "edge_ids": history_removed}))
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ clients.discard(ws)
+
+ # Clean up old heat events
+ if HEAT_TTL_SECONDS > 0 and heat_events:
+ cutoff = now - HEAT_TTL_SECONDS
+ heat_events[:] = [entry for entry in heat_events if entry.get("ts", 0) >= cutoff]
+
+ # Clean up message origin cache
+ if message_origins:
+ for msg_hash, info in list(message_origins.items()):
+ if now - info.get("ts", 0) > MESSAGE_ORIGIN_TTL_SECONDS:
+ message_origins.pop(msg_hash, None)
+
+ # Clean up old seen_devices entries
+ prune_after = max(DEVICE_TTL_SECONDS * 3, 900) if DEVICE_TTL_SECONDS > 0 else 86400
+ for dev_id, last in list(seen_devices.items()):
+ if now - last > prune_after:
+ seen_devices.pop(dev_id, None)
+
+ await asyncio.sleep(5)
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..387facd
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,76 @@
+[project]
+name = "mesh-live-map"
+version = "1.0.2"
+description = "Real-time visualization platform for MeshCore network traffic"
+readme = "README.md"
+requires-python = ">=3.11"
+license = { text = "GPL-3.0" }
+
+dependencies = [
+ "fastapi>=0.115.0",
+ "uvicorn>=0.34.0",
+ "paho-mqtt>=2.1.0",
+ "httpx>=0.27.0",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0.0",
+ "pytest-asyncio>=0.23.0",
+ "pytest-cov>=4.1.0",
+ "ruff>=0.4.0",
+ "httpx>=0.27.0",
+]
+
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"
+
+[tool.ruff]
+target-version = "py311"
+line-length = 120
+indent-width = 2
+
+[tool.ruff.lint]
+select = [
+ "E", # pycodestyle errors
+ "W", # pycodestyle warnings
+ "F", # pyflakes
+ "I", # isort
+ "B", # flake8-bugbear
+ "C4", # flake8-comprehensions
+ "UP", # pyupgrade
+ "SIM", # flake8-simplify
+]
+ignore = [
+ "E501", # line too long (handled by formatter)
+ "B008", # do not perform function calls in argument defaults
+ "B905", # zip without strict= (python 3.10+)
+ "SIM108", # use ternary instead of if-else (readability)
+]
+
+[tool.ruff.lint.isort]
+known-first-party = ["backend"]
+
+[tool.ruff.format]
+quote-style = "double"
+indent-style = "space"
+skip-magic-trailing-comma = false
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+asyncio_mode = "auto"
+python_files = ["test_*.py"]
+python_functions = ["test_*"]
+addopts = "-v --tb=short"
+
+[tool.coverage.run]
+source = ["backend"]
+omit = ["tests/*", "backend/static/*"]
+
+[tool.coverage.report]
+exclude_lines = [
+ "pragma: no cover",
+ "if __name__ == .__main__.:",
+ "raise NotImplementedError",
+]