meshcore-mqtt-live-map/backend/los.py
2026-04-16 14:05:25 -04:00

241 lines
6.9 KiB
Python

import json
import math
import time
import urllib.parse
import urllib.request
from typing import Any, Dict, List, Optional, Tuple
from config import (
ELEVATION_CACHE_TTL,
LOS_CURVATURE_ENABLED,
LOS_CURVATURE_FACTOR,
LOS_ELEVATION_URL,
LOS_PEAKS_MAX,
LOS_SAMPLE_MAX,
LOS_SAMPLE_MIN,
LOS_SAMPLE_STEP_METERS,
)
from state import elevation_cache
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
radius = 6371000.0
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = math.sin(dphi / 2
)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return radius * c
def _elevation_cache_key(lat: float, lon: float) -> str:
return f"{lat:.5f},{lon:.5f}"
def _chunked(seq: List[Any], size: int) -> List[List[Any]]:
return [seq[i:i + size] for i in range(0, len(seq), size)]
def _fetch_elevations(
points: List[Tuple[float, float, float]]
) -> Tuple[Optional[List[float]], Optional[str]]:
now = time.time()
results: List[Optional[float]] = [None] * len(points)
missing: List[Tuple[int, float, float, str]] = []
for idx, (lat, lon, _) in enumerate(points):
key = _elevation_cache_key(lat, lon)
cached = elevation_cache.get(key)
if cached and now - cached[1] <= ELEVATION_CACHE_TTL:
results[idx] = cached[0]
else:
missing.append((idx, lat, lon, key))
if not missing:
if any(val is None for val in results):
return None, "elevation_fetch_failed: incomplete_cache"
return [float(val) for val in results], None
for chunk in _chunked(missing, 100):
locations = "|".join(f"{lat},{lon}" for _, lat, lon, _ in chunk)
query = urllib.parse.urlencode({"locations": locations})
url = f"{LOS_ELEVATION_URL}?{query}"
try:
with urllib.request.urlopen(url, timeout=6) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
return None, f"elevation_fetch_failed: {exc}"
if payload.get("status") not in (None, "OK"):
return None, f"elevation_fetch_failed: {payload.get('status')}"
elev_results = payload.get("results", [])
if len(elev_results) != len(chunk):
return None, "elevation_fetch_failed: unexpected_result_length"
for (idx, _, _, key), entry in zip(chunk, elev_results):
elev = entry.get("elevation")
if elev is None:
return None, "elevation_fetch_failed: missing_elevation"
elevation_cache[key] = (float(elev), now)
results[idx] = float(elev)
if any(val is None for val in results):
return None, "elevation_fetch_failed: incomplete_results"
return [float(val) for val in results], None
def _sample_los_points(lat1: float, lon1: float, lat2: float,
lon2: float) -> List[Tuple[float, float, float]]:
distance_m = _haversine_m(lat1, lon1, lat2, lon2)
if distance_m <= 0:
return [(lat1, lon1, 0.0), (lat2, lon2, 1.0)]
samples = int(distance_m / max(1.0, LOS_SAMPLE_STEP_METERS)) + 1
samples = max(LOS_SAMPLE_MIN, min(LOS_SAMPLE_MAX, samples))
if samples < 2:
samples = 2
points: List[Tuple[float, float, float]] = []
for i in range(samples):
t = i / (samples - 1)
lat = lat1 + (lat2 - lat1) * t
lon = lon1 + (lon2 - lon1) * t
points.append((lat, lon, t))
return points
def _los_earth_radius_m() -> float:
return 6371000.0 * LOS_CURVATURE_FACTOR
def _los_earth_bulge_m(distance_m: float, t: float) -> float:
if not LOS_CURVATURE_ENABLED or distance_m <= 0:
return 0.0
d1 = distance_m * t
d2 = distance_m - d1
radius_m = _los_earth_radius_m()
if radius_m <= 0:
return 0.0
return (d1 * d2) / (2.0 * radius_m)
def _los_effective_elevations(
points: List[Tuple[float, float, float]],
elevations: List[float],
distance_m: float,
) -> List[float]:
adjusted = list(elevations)
for idx, point in enumerate(points):
adjusted[idx] = float(elevations[idx]) + _los_earth_bulge_m(
distance_m, point[2]
)
return adjusted
def _los_max_obstruction(
points: List[Tuple[float, float, float]], elevations: List[float],
start_idx: int, end_idx: int
) -> float:
if end_idx <= start_idx + 1:
return 0.0
start_t = points[start_idx][2]
end_t = points[end_idx][2]
if end_t <= start_t:
return 0.0
start_elev = elevations[start_idx]
end_elev = elevations[end_idx]
max_obstruction = 0.0
for idx in range(start_idx + 1, end_idx):
frac = (points[idx][2] - start_t) / (end_t - start_t)
line_elev = start_elev + (end_elev - start_elev) * frac
clearance = elevations[idx] - line_elev
if clearance > max_obstruction:
max_obstruction = clearance
return max_obstruction
def _find_los_suggestion(
points: List[Tuple[float, float, float]], elevations: List[float]
) -> Optional[Dict[str, Any]]:
if len(points) < 3:
return None
best_idx = None
best_score = None
best_clear = False
for idx in range(1, len(points) - 1):
obst_a = _los_max_obstruction(points, elevations, 0, idx)
obst_b = _los_max_obstruction(points, elevations, idx, len(points) - 1)
score = max(obst_a, obst_b)
clear = score <= 0.0
if clear and not best_clear:
best_idx = idx
best_score = score
best_clear = True
elif clear and best_clear:
if elevations[idx] > elevations[best_idx]:
best_idx = idx
best_score = score
elif not best_clear:
if best_score is None or score < best_score:
best_idx = idx
best_score = score
if best_idx is None:
return None
return {
"lat":
round(points[best_idx][0], 6),
"lon":
round(points[best_idx][1], 6),
"elevation_m":
round(float(elevations[best_idx]), 2),
"clear":
best_clear,
"max_obstruction_m":
round(float(best_score), 2) if best_score is not None else None,
}
def _find_los_peaks(
points: List[Tuple[float, float, float]],
elevations: List[float],
distance_m: float,
) -> List[Dict[str, Any]]:
if len(points) < 3:
return []
peak_indices = []
for idx in range(1, len(elevations) - 1):
elev = elevations[idx]
if elev >= elevations[idx - 1] and elev >= elevations[idx + 1]:
peak_indices.append(idx)
if not peak_indices:
try:
peak_indices = [
max(range(1,
len(elevations) - 1), key=lambda i: elevations[i])
]
except ValueError:
return []
peak_indices = sorted(
peak_indices, key=lambda i: elevations[i], reverse=True
)[:LOS_PEAKS_MAX]
peak_indices = sorted(peak_indices, key=lambda i: points[i][2])
peaks = []
for i, idx in enumerate(peak_indices, start=1):
t = points[idx][2]
peaks.append(
{
"index": i,
"lat": round(points[idx][0], 6),
"lon": round(points[idx][1], 6),
"elevation_m": round(float(elevations[idx]), 2),
"distance_m": round(distance_m * t, 2),
}
)
return peaks