meshcore-mqtt-live-map/backend/history.py
2026-04-09 22:47:51 -04:00

495 lines
15 KiB
Python

import asyncio
import json
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple
import state
from boundary import within_map_boundary
from config import (
HISTORY_EDGE_SAMPLE_LIMIT,
ROUTE_HISTORY_ALLOWED_MODES_SET,
ROUTE_HISTORY_COMPACT_INTERVAL,
ROUTE_HISTORY_ENABLED,
ROUTE_HISTORY_FILE,
ROUTE_HISTORY_HOURS,
ROUTE_HISTORY_MAX_SEGMENTS,
ROUTE_HISTORY_PAYLOAD_TYPES,
)
from decoder import _coords_are_zero
ROUTE_HISTORY_PAYLOAD_TYPES_SET: Set[int] = set()
for _part in ROUTE_HISTORY_PAYLOAD_TYPES.split(","):
_part = _part.strip()
if not _part:
continue
try:
ROUTE_HISTORY_PAYLOAD_TYPES_SET.add(int(_part))
except ValueError:
pass
PEER_HISTORY_BUCKET_SECONDS = 300
def _peer_history_cutoff(now: Optional[float] = None) -> float:
current = time.time() if now is None else float(now)
return current - (ROUTE_HISTORY_HOURS * 3600)
def _peer_history_bucket_start(ts: float) -> int:
return int(float(ts) // PEER_HISTORY_BUCKET_SECONDS) * PEER_HISTORY_BUCKET_SECONDS
def _peer_history_pair_key(a_id: str, b_id: str) -> str:
return f"{a_id}|{b_id}"
def _record_peer_history_segment(a_id: Any, b_id: Any, ts: float) -> bool:
if not ROUTE_HISTORY_ENABLED or ROUTE_HISTORY_HOURS <= 0:
return False
if not isinstance(a_id, str) or not a_id.strip():
return False
if not isinstance(b_id, str) or not b_id.strip():
return False
a_id = a_id.strip()
b_id = b_id.strip()
if a_id == b_id:
return False
bucket_start = _peer_history_bucket_start(ts)
pair_key = _peer_history_pair_key(a_id, b_id)
entry = state.peer_history_pairs.get(pair_key)
if not entry:
entry = {
"a_id": a_id,
"b_id": b_id,
"buckets": {},
"last_ts": float(ts),
}
state.peer_history_pairs[pair_key] = entry
buckets = entry.get("buckets")
if not isinstance(buckets, dict):
buckets = {}
entry["buckets"] = buckets
bucket_key = str(bucket_start)
buckets[bucket_key] = int(buckets.get(bucket_key, 0)) + 1
entry["last_ts"] = max(float(entry.get("last_ts", 0.0)), float(ts))
return True
def _prune_peer_history(now: Optional[float] = None) -> bool:
if not state.peer_history_pairs:
return False
cutoff = _peer_history_cutoff(now)
changed = False
for pair_key in list(state.peer_history_pairs.keys()):
entry = state.peer_history_pairs.get(pair_key)
if not isinstance(entry, dict):
state.peer_history_pairs.pop(pair_key, None)
changed = True
continue
buckets = entry.get("buckets")
if not isinstance(buckets, dict):
state.peer_history_pairs.pop(pair_key, None)
changed = True
continue
new_buckets: Dict[str, int] = {}
last_ts = 0.0
for bucket_key, count in buckets.items():
try:
bucket_start = float(bucket_key)
bucket_count = int(count)
except (TypeError, ValueError):
changed = True
continue
if bucket_count <= 0:
changed = True
continue
bucket_end = bucket_start + PEER_HISTORY_BUCKET_SECONDS
if bucket_end < cutoff:
changed = True
continue
new_buckets[str(int(bucket_start))] = bucket_count
last_ts = max(last_ts, bucket_end)
if not new_buckets:
state.peer_history_pairs.pop(pair_key, None)
changed = True
continue
if len(new_buckets) != len(buckets):
changed = True
entry["buckets"] = new_buckets
entry["last_ts"] = max(last_ts, float(entry.get("last_ts", 0.0)))
return changed
def _rebuild_peer_history_from_segments() -> bool:
state.peer_history_pairs.clear()
changed = False
for entry in state.route_history_segments:
if not isinstance(entry, dict):
continue
a_id = entry.get("a_id")
b_id = entry.get("b_id")
ts = entry.get("ts")
if not isinstance(ts, (int, float)):
continue
if _record_peer_history_segment(a_id, b_id, float(ts)):
changed = True
_prune_peer_history()
return changed
def _history_payload_allowed(payload_type: Optional[int]) -> bool:
if not ROUTE_HISTORY_ENABLED or ROUTE_HISTORY_HOURS <= 0:
return False
if not ROUTE_HISTORY_PAYLOAD_TYPES_SET:
return True
if payload_type is None:
return False
return payload_type in ROUTE_HISTORY_PAYLOAD_TYPES_SET
def _within_map_radius(lat: float, lon: float) -> bool:
return within_map_boundary(lat, lon)
def _normalize_history_point(point: Any) -> Optional[Tuple[float, float]]:
if not isinstance(point, (list, tuple)) or len(point) < 2:
return None
try:
lat_val = float(point[0])
lon_val = float(point[1])
except (TypeError, ValueError):
return None
if _coords_are_zero(lat_val, lon_val):
return None
if not _within_map_radius(lat_val, lon_val):
return None
return (round(lat_val, 6), round(lon_val, 6))
def _history_edge_key(
a: Tuple[float, float], b: Tuple[float, float]
) -> Tuple[str, Tuple[float, float], Tuple[float, float]]:
if a <= b:
key = f"{a[0]:.6f},{a[1]:.6f}|{b[0]:.6f},{b[1]:.6f}"
return key, a, b
key = f"{b[0]:.6f},{b[1]:.6f}|{a[0]:.6f},{a[1]:.6f}"
return key, b, a
def _history_sample_from_route(route: Dict[str, Any],
ts: float) -> Dict[str, Any]:
return {
"ts": float(ts),
"message_hash": route.get("message_hash"),
"payload_type": route.get("payload_type"),
"origin_id": route.get("origin_id"),
"receiver_id": route.get("receiver_id"),
"route_mode": route.get("route_mode"),
"topic": route.get("topic"),
}
def _update_history_edge_recent(
edge: Dict[str, Any], sample: Dict[str, Any]
) -> None:
if not edge or not sample:
return
recent = edge.get("recent")
if not isinstance(recent, list):
recent = []
recent.append(sample)
recent.sort(key=lambda s: s.get("ts", 0), reverse=True)
if len(recent) > HISTORY_EDGE_SAMPLE_LIMIT:
recent = recent[:HISTORY_EDGE_SAMPLE_LIMIT]
edge["recent"] = recent
def _record_route_history(
route: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], List[str]]:
if not ROUTE_HISTORY_ENABLED:
return [], []
if ROUTE_HISTORY_ALLOWED_MODES_SET:
route_mode = route.get("route_mode")
if not route_mode or route_mode not in ROUTE_HISTORY_ALLOWED_MODES_SET:
return [], []
payload_type = route.get("payload_type")
if not _history_payload_allowed(payload_type):
return [], []
points = route.get("points")
point_ids = route.get("point_ids"
) if isinstance(route.get("point_ids"), list) else None
if (
(not isinstance(points, list) or len(points) < 2) and
(not point_ids or len(point_ids) < 2)
):
return [], []
ts = route.get("ts") or time.time()
sample = _history_sample_from_route(route, ts)
updated_keys: Set[str] = set()
new_entries: List[Dict[str, Any]] = []
peer_recorded = False
if point_ids and len(point_ids) >= 2:
for idx in range(len(point_ids) - 1):
if _record_peer_history_segment(
point_ids[idx], point_ids[idx + 1], float(ts)
):
peer_recorded = True
if not isinstance(points, list) or len(points) < 2:
if peer_recorded:
_prune_peer_history(ts)
state.state_dirty = True
return [], []
for idx in range(len(points) - 1):
a = _normalize_history_point(points[idx])
b = _normalize_history_point(points[idx + 1])
if not a or not b:
continue
a_id = None
b_id = None
if point_ids and idx < len(point_ids) - 1:
a_id = point_ids[idx]
b_id = point_ids[idx + 1]
key, first, second = _history_edge_key(a, b)
new_entries.append(
{
"ts": float(ts),
"a": [first[0], first[1]],
"b": [second[0], second[1]],
"a_id": a_id,
"b_id": b_id,
"message_hash": sample.get("message_hash"),
"payload_type": sample.get("payload_type"),
"origin_id": sample.get("origin_id"),
"receiver_id": sample.get("receiver_id"),
"route_mode": sample.get("route_mode"),
"topic": sample.get("topic"),
}
)
edge = state.route_history_edges.get(key)
if not edge:
edge = {
"id": key,
"a": [first[0], first[1]],
"b": [second[0], second[1]],
"count": 0,
"last_ts": float(ts),
}
state.route_history_edges[key] = edge
edge["count"] = int(edge.get("count", 0)) + 1
edge["last_ts"] = max(edge.get("last_ts", float(ts)), float(ts))
_update_history_edge_recent(edge, sample)
updated_keys.add(key)
if not new_entries:
if peer_recorded:
_prune_peer_history(ts)
state.state_dirty = True
return [], []
state.route_history_segments.extend(new_entries)
_append_route_history_file(new_entries)
if new_entries or peer_recorded:
_prune_peer_history(ts)
state.state_dirty = True
updates = [
state.route_history_edges[key]
for key in updated_keys if key in state.route_history_edges
]
removed: List[str] = []
if ROUTE_HISTORY_MAX_SEGMENTS > 0 and len(
state.route_history_segments
) > ROUTE_HISTORY_MAX_SEGMENTS:
extra_updates, extra_removed = _prune_route_history(force_limit=True)
updates.extend(extra_updates)
removed.extend(extra_removed)
return updates, removed
def _prune_route_history(
force_limit: bool = False
) -> Tuple[List[Dict[str, Any]], List[str]]:
if not ROUTE_HISTORY_ENABLED or not state.route_history_segments:
return [], []
updated: Dict[str, Dict[str, Any]] = {}
removed: List[str] = []
now = time.time()
cutoff = now - (ROUTE_HISTORY_HOURS * 3600)
while state.route_history_segments:
entry = state.route_history_segments[0]
if not isinstance(entry, dict):
state.route_history_segments.popleft()
continue
ts = entry.get("ts")
if ts is None:
state.route_history_segments.popleft()
continue
if not force_limit and ts >= cutoff:
break
if force_limit and ROUTE_HISTORY_MAX_SEGMENTS > 0 and len(
state.route_history_segments
) <= ROUTE_HISTORY_MAX_SEGMENTS:
break
state.route_history_segments.popleft()
a = entry.get("a")
b = entry.get("b")
a_point = _normalize_history_point(a) if a else None
b_point = _normalize_history_point(b) if b else None
if not a_point or not b_point:
state.route_history_compact = True
continue
key, _, _ = _history_edge_key(a_point, b_point)
edge = state.route_history_edges.get(key)
if not edge:
state.route_history_compact = True
continue
edge["count"] = int(edge.get("count", 0)) - 1
recent = edge.get("recent")
if isinstance(recent, list):
edge["recent"] = [s for s in recent if (s.get("ts") or 0) >= cutoff]
if not edge["recent"]:
edge.pop("recent", None)
if edge["count"] <= 0:
state.route_history_edges.pop(key, None)
removed.append(key)
else:
updated[key] = edge
state.route_history_compact = True
return list(updated.values()), removed
def _append_route_history_file(entries: List[Dict[str, Any]]) -> None:
if not ROUTE_HISTORY_ENABLED or not ROUTE_HISTORY_FILE:
return
if not entries:
return
try:
os.makedirs(os.path.dirname(ROUTE_HISTORY_FILE), exist_ok=True)
with open(ROUTE_HISTORY_FILE, "a", encoding="utf-8") as handle:
for entry in entries:
handle.write(json.dumps(entry) + "\n")
except Exception as exc:
print(f"[history] failed to append {ROUTE_HISTORY_FILE}: {exc}")
def _load_route_history() -> None:
if not ROUTE_HISTORY_ENABLED or not ROUTE_HISTORY_FILE:
return
if not os.path.exists(ROUTE_HISTORY_FILE):
return
cutoff = time.time() - (ROUTE_HISTORY_HOURS * 3600)
loaded_any = False
try:
with open(ROUTE_HISTORY_FILE, "r", encoding="utf-8") as handle:
for line in handle:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
state.route_history_compact = True
continue
if not isinstance(entry, dict):
state.route_history_compact = True
continue
ts = entry.get("ts")
if not isinstance(ts, (int, float)) or ts < cutoff:
state.route_history_compact = True
continue
a_point = _normalize_history_point(entry.get("a"))
b_point = _normalize_history_point(entry.get("b"))
if not a_point or not b_point:
state.route_history_compact = True
continue
sample = {
"ts": float(ts),
"message_hash": entry.get("message_hash"),
"payload_type": entry.get("payload_type"),
"origin_id": entry.get("origin_id"),
"receiver_id": entry.get("receiver_id"),
"route_mode": entry.get("route_mode"),
"topic": entry.get("topic"),
}
key, first, second = _history_edge_key(a_point, b_point)
state.route_history_segments.append(
{
"ts": float(ts),
"a": [first[0], first[1]],
"b": [second[0], second[1]],
"a_id": entry.get("a_id"),
"b_id": entry.get("b_id"),
"message_hash": sample.get("message_hash"),
"payload_type": sample.get("payload_type"),
"origin_id": sample.get("origin_id"),
"receiver_id": sample.get("receiver_id"),
"route_mode": sample.get("route_mode"),
"topic": sample.get("topic"),
}
)
edge = state.route_history_edges.get(key)
if not edge:
edge = {
"id": key,
"a": [first[0], first[1]],
"b": [second[0], second[1]],
"count": 0,
"last_ts": float(ts),
}
state.route_history_edges[key] = edge
edge["count"] = int(edge.get("count", 0)) + 1
edge["last_ts"] = max(edge.get("last_ts", float(ts)), float(ts))
_update_history_edge_recent(edge, sample)
loaded_any = True
except Exception as exc:
print(f"[history] failed to load {ROUTE_HISTORY_FILE}: {exc}")
return
if not loaded_any:
return
if not state.peer_history_pairs:
_rebuild_peer_history_from_segments()
if ROUTE_HISTORY_MAX_SEGMENTS > 0 and len(
state.route_history_segments
) > ROUTE_HISTORY_MAX_SEGMENTS:
_prune_route_history(force_limit=True)
state.route_history_compact = True
async def _route_history_saver() -> None:
if not ROUTE_HISTORY_ENABLED or not ROUTE_HISTORY_FILE:
return
while True:
await asyncio.sleep(max(5.0, ROUTE_HISTORY_COMPACT_INTERVAL))
if not state.route_history_compact:
continue
now = time.time()
if now - state.route_history_last_compact < ROUTE_HISTORY_COMPACT_INTERVAL:
continue
try:
os.makedirs(os.path.dirname(ROUTE_HISTORY_FILE), exist_ok=True)
tmp_path = f"{ROUTE_HISTORY_FILE}.tmp"
with open(tmp_path, "w", encoding="utf-8") as handle:
for entry in state.route_history_segments:
if not isinstance(entry, dict):
continue
handle.write(json.dumps(entry) + "\n")
os.replace(tmp_path, ROUTE_HISTORY_FILE)
state.route_history_last_compact = now
state.route_history_compact = False
except Exception as exc:
print(f"[history] failed to compact {ROUTE_HISTORY_FILE}: {exc}")