meshcore-mqtt-live-map/backend/decoder.py
2026-04-17 10:57:10 -04:00

1662 lines
44 KiB
Python

import base64
import json
import os
import re
import select
import subprocess
import threading
import time
from typing import Any, Dict, List, Optional, Set, Tuple
from config import (
APP_DIR,
CHANNEL_SECRETS_FILE,
DECODE_WITH_NODE,
DIRECT_COORDS_ALLOW_ZERO,
DIRECT_COORDS_MODE,
DIRECT_COORDS_TOPIC_REGEX,
HEAT_TTL_SECONDS,
MQTT_ONLINE_TOPIC_SUFFIXES,
NODE_DECODE_TIMEOUT_SECONDS,
NODE_SCRIPT_PATH,
PAYLOAD_PREVIEW_MAX,
ROUTE_PATH_MAX_LEN,
ROUTE_MAX_HOP_DISTANCE,
ROUTE_INFRA_ONLY,
ROUTE_ALLOW_AMBIGUOUS_ONE_BYTE_FALLBACK,
ROUTE_PAYLOAD_TYPES,
)
from state import (
devices,
heat_events,
node_hash_candidates,
node_hash_collisions,
node_hash_to_device,
neighbor_edges,
seen_devices,
)
from los import _haversine_m
# JSON key names treated as latitude / longitude by _find_lat_lon_in_json.
# The first key present in an object wins.
LATLON_KEYS_LAT = ("lat", "latitude")
LATLON_KEYS_LON = ("lon", "lng", "longitude")
# Labelled coordinates in free text,
# e.g. "lat 42.3601 lon -71.0589" or "lat=42.36 lon=-71.05"
RE_LAT_LON = re.compile(
    r"\blat(?:itude)?\b\s*[:=]?\s*(-?\d+(?:\.\d+)?)\s*[, ]+\s*\b(?:lon|lng|longitude)\b\s*[:=]?\s*(-?\d+(?:\.\d+)?)",
    re.IGNORECASE,
)
# Unlabelled pair of decimal numbers,
# e.g. "42.3601 -71.0589" (two floats)
RE_TWO_FLOATS = re.compile(r"(-?\d{1,2}\.\d+)\s*[,\s]+\s*(-?\d{1,3}\.\d+)")
# Whole string made of base64 alphabet characters with up to two "=" pads.
BASE64_LIKE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")
# Node hashes are hex strings of 2, 4 or 6 characters.
NODE_HASH_RE = re.compile(r"^[0-9a-fA-F]+$")
NODE_HASH_LENGTHS = (2, 4, 6)
# One-shot flags set by _ensure_node_decoder so the availability probe runs
# at most once per outcome.
_node_ready_once = False
_node_unavailable_once = False
# Handle of the Node worker process (None while no worker is running) and
# the lock that _decode_meshcore_hex holds while talking to it.
_node_worker_proc: Optional[subprocess.Popen[str]] = None
_node_worker_lock = threading.Lock()
# Cache used by _load_channel_secrets: last parsed secrets plus the mtime
# and path of the file they were read from.
_channel_secrets_cache: List[str] = []
_channel_secrets_mtime: Optional[float] = None
_channel_secrets_path: Optional[str] = None
def _load_channel_secrets() -> List[str]:
    """Return the channel secrets listed in ``CHANNEL_SECRETS_FILE``.

    The file must hold a JSON object (its values are used) or a JSON list.
    Only strings that are exactly 32 hex digits are kept; they are
    lower-cased and de-duplicated in first-seen order. The result is cached
    together with the file's path and mtime, so the file is only re-read
    when it changes.

    Returns:
        A fresh list of normalized secrets. The list is empty when no file
        is configured, the file does not exist, or its top-level JSON value
        is neither an object nor a list. When the file cannot be read or
        parsed, a copy of the previously cached list is returned instead.
    """
    global _channel_secrets_cache, _channel_secrets_mtime, _channel_secrets_path
    path = (CHANNEL_SECRETS_FILE or "").strip()
    if not path or not os.path.exists(path):
        _channel_secrets_cache = []
        _channel_secrets_mtime = None
        _channel_secrets_path = path or None
        return []
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        # The file vanished between exists() and getmtime(); a None mtime
        # never matches the cache, so the open() below reports the problem.
        mtime = None
    if (
        _channel_secrets_path == path and
        _channel_secrets_mtime is not None and
        mtime == _channel_secrets_mtime
    ):
        return list(_channel_secrets_cache)
    try:
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except Exception as exc:
        # Best effort: keep serving the last good list while the file is
        # unreadable or only partially written.
        print(f"[decode] failed to load channel secrets file {path}: {exc}")
        return list(_channel_secrets_cache)
    values: List[Any]
    if isinstance(payload, dict):
        values = list(payload.values())
    elif isinstance(payload, list):
        values = list(payload)
    else:
        print(f"[decode] invalid channel secrets file format {path}")
        # Record the rejected file in the cache. Otherwise the secrets from
        # an older version of the file would stay cached (and be handed out
        # again on the next read error), and this file would be re-parsed
        # and re-logged on every call until it changes.
        _channel_secrets_cache = []
        _channel_secrets_mtime = mtime
        _channel_secrets_path = path
        return []
    secrets: List[str] = []
    seen: Set[str] = set()
    for value in values:
        if not isinstance(value, str):
            continue
        normalized = value.strip().lower()
        if not re.fullmatch(r"[0-9a-f]{32}", normalized):
            continue
        if normalized in seen:
            continue
        seen.add(normalized)
        secrets.append(normalized)
    _channel_secrets_cache = list(secrets)
    _channel_secrets_mtime = mtime
    _channel_secrets_path = path
    return secrets
# Integer payload-type codes parsed from the comma-separated
# ROUTE_PAYLOAD_TYPES setting; blank and non-integer entries are ignored.
ROUTE_PAYLOAD_TYPES_SET: Set[int] = set()
for _part in ROUTE_PAYLOAD_TYPES.split(","):
    _part = _part.strip()
    if not _part:
        continue
    try:
        ROUTE_PAYLOAD_TYPES_SET.add(int(_part))
    except ValueError:
        pass
# Dict keys that _find_packet_blob inspects before any other key when it
# searches a JSON object for an encoded packet.
LIKELY_PACKET_KEYS = (
    "hex",
    "raw",
    "packet",
    "packet_hex",
    "frame",
    "data",
    "payload",
    "mesh_packet",
    "meshcore_packet",
    "rx_packet",
    "bytes",
    "packet_bytes",
)
# Case-insensitive compiled form of DIRECT_COORDS_TOPIC_REGEX, or None when
# the configured pattern is not a valid regular expression.
try:
    DIRECT_COORDS_TOPIC_RE = re.compile(DIRECT_COORDS_TOPIC_REGEX, re.IGNORECASE)
except re.error:
    DIRECT_COORDS_TOPIC_RE = None
def _valid_lat_lon(lat: float, lon: float) -> bool:
    """Tell whether ``lat`` is within [-90, 90] and ``lon`` within [-180, 180]."""
    if not (-90.0 <= lat <= 90.0):
        return False
    return -180.0 <= lon <= 180.0
def _normalize_lat_lon(lat: Any, lon: Any) -> Optional[Tuple[float, float]]:
    """Coerce a latitude/longitude pair to floats in degrees.

    Values that are already in range are returned unchanged. Otherwise both
    values are divided by 1e7, 1e6, 1e5 and 1e4 in turn (integer-scaled
    encodings) and the first in-range result is returned.

    Returns:
        ``(lat, lon)`` as floats, or ``None`` when the inputs cannot be
        converted to float or no scale yields an in-range pair.
    """
    try:
        lat_f, lon_f = float(lat), float(lon)
    except Exception:
        return None
    # A divisor of 1.0 is the "already in degrees" case; x / 1.0 is exact.
    for divisor in (1.0, 1e7, 1e6, 1e5, 1e4):
        scaled_lat = lat_f / divisor
        scaled_lon = lon_f / divisor
        if _valid_lat_lon(scaled_lat, scaled_lon):
            return scaled_lat, scaled_lon
    return None
def _coords_are_zero(lat: Any, lon: Any) -> bool:
    """Tell whether both coordinates are within 1e-6 of zero.

    Returns ``False`` when either value cannot be converted to float.
    """
    try:
        pair = (float(lat), float(lon))
    except (TypeError, ValueError):
        return False
    return all(abs(component) < 1e-6 for component in pair)
def _find_lat_lon_in_json(obj: Any) -> Optional[Tuple[float, float]]:
    """
    Depth-first search of a JSON-like value for a usable lat/lon pair.

    A dict is checked for its own latitude/longitude keys first (the first
    key present from each key list wins); its values are searched next.
    Lists are searched element by element. Anything else yields ``None``.
    """
    if isinstance(obj, dict):
        lat = next((obj.get(name) for name in LATLON_KEYS_LAT if name in obj), None)
        lon = next((obj.get(name) for name in LATLON_KEYS_LON if name in obj), None)
        if lat is not None and lon is not None:
            coords = _normalize_lat_lon(lat, lon)
            if coords:
                return coords
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return None
    for child in children:
        hit = _find_lat_lon_in_json(child)
        if hit:
            return hit
    return None
def _strings_from_json(obj: Any) -> List[str]:
    """
    Gather every string leaf of a JSON-like value, in traversal order.

    Dict values and list elements are walked recursively; dict keys and
    non-string leaves are ignored.
    """
    if isinstance(obj, str):
        return [obj]
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return []
    collected: List[str] = []
    for child in children:
        collected.extend(_strings_from_json(child))
    return collected
def _find_lat_lon_in_text(text: str) -> Optional[Tuple[float, float]]:
    """
    Pull a coordinate pair out of free text.

    A labelled "lat ... lon ..." match is tried first; failing that, every
    bare pair of decimal numbers is tried in order of appearance.
    """
    labelled = RE_LAT_LON.search(text)
    if labelled is not None:
        coords = _normalize_lat_lon(labelled.group(1), labelled.group(2))
        if coords:
            return coords
    for pair in RE_TWO_FLOATS.finditer(text):
        first, second = pair.group(1), pair.group(2)
        coords = _normalize_lat_lon(first, second)
        if coords:
            return coords
    return None
def _maybe_base64_decode_to_text(s: str) -> Optional[str]:
    """
    Best-effort base64 -> text conversion.

    Strings shorter than 24 characters (after stripping) or containing
    characters outside the base64 alphabet are rejected. Undecodable UTF-8
    bytes are dropped from the result.
    """
    candidate = s.strip()
    if len(candidate) < 24 or not BASE64_LIKE.match(candidate):
        return None
    try:
        decoded = base64.b64decode(candidate, validate=False)
        return decoded.decode("utf-8", errors="ignore")
    except Exception:
        return None
def _looks_like_hex(s: str) -> bool:
    """Tell whether ``s`` (stripped) is an even-length hex string of 20+ chars."""
    candidate = s.strip()
    size = len(candidate)
    if size < 20 or size % 2:
        return False
    return re.fullmatch(r"[0-9a-fA-F]+", candidate) is not None
def _try_base64_to_hex(s: str) -> Optional[str]:
    """Decode a base64 string to a hex string, or return ``None``.

    The stripped input must be at least 24 characters long and contain at
    least one of ``+``, ``/`` or ``=``. The decoded payload must be at
    least 10 bytes long.
    """
    candidate = s.strip()
    if len(candidate) < 24:
        return None
    if not (set(candidate) & {"+", "/", "="}):
        return None
    try:
        decoded = base64.b64decode(candidate, validate=False)
    except Exception:
        return None
    return decoded.hex() if len(decoded) >= 10 else None
def _is_probably_binary(data: bytes) -> bool:
    """Guess whether ``data`` is binary from its first 200 bytes.

    The sample counts as binary when fewer than 60% of its bytes are
    printable ASCII (32-126) or tab/LF/CR. Empty input is not binary.
    """
    if not data:
        return False
    sample = data[:200]
    printable = sum(
        1 for byte in sample if 32 <= byte <= 126 or byte in (9, 10, 13)
    )
    return printable / len(sample) < 0.6
def _safe_preview(data: bytes) -> str:
    """Return a printable preview of ``data``.

    The payload is decoded as UTF-8 with replacement characters (falling
    back to ``repr`` if decoding raises). Text longer than
    ``PAYLOAD_PREVIEW_MAX`` characters is truncated and suffixed with "...".
    """
    try:
        preview = data.decode("utf-8", errors="replace")
    except Exception:
        preview = repr(data)
    if len(preview) <= PAYLOAD_PREVIEW_MAX:
        return preview
    return preview[:PAYLOAD_PREVIEW_MAX] + "..."
def _normalize_node_hash(value: Any) -> Optional[str]:
    """Normalize a node hash to an upper-case hex string.

    Accepts non-negative ints up to 0xFFFFFF (zero-padded to 2, 4 or 6 hex
    digits), bytes/bytearray, or anything whose ``str()`` is a hex string
    with an optional ``0x`` prefix and embedded spaces. Odd-length strings
    get a leading zero.

    Returns:
        The normalized hash, or ``None`` when the value is missing, out of
        range, not hex, or not one of the lengths in ``NODE_HASH_LENGTHS``.
    """
    if value is None:
        return None
    if isinstance(value, int):
        if value < 0:
            return None
        # Narrowest width that can hold the integer.
        for limit, width in ((0xFF, 2), (0xFFFF, 4), (0xFFFFFF, 6)):
            if value <= limit:
                return f"{value:0{width}X}"
        return None
    if isinstance(value, (bytes, bytearray)):
        value = bytes(value).hex()
    text = str(value).strip()
    if text.lower().startswith("0x"):
        text = text[2:]
    text = text.replace(" ", "")
    if not text:
        return None
    if len(text) % 2:
        text = "0" + text
    if len(text) in NODE_HASH_LENGTHS and NODE_HASH_RE.match(text):
        return text.upper()
    return None
def _node_hashes_from_device_id(device_id: str) -> List[str]:
    """List the node hashes a device id can be addressed by.

    For each length in ``NODE_HASH_LENGTHS`` the id's prefix of that length
    (after dropping a ``0x`` prefix and spaces) is normalized; valid,
    not-yet-seen results are returned shortest first.
    """
    if not device_id:
        return []
    text = str(device_id).strip()
    if text.lower().startswith("0x"):
        text = text[2:]
    text = text.replace(" ", "")
    if not text:
        return []
    hashes: List[str] = []
    for prefix_len in NODE_HASH_LENGTHS:
        if len(text) >= prefix_len:
            normalized = _normalize_node_hash(text[:prefix_len])
            if normalized and normalized not in hashes:
                hashes.append(normalized)
    return hashes
def _rebuild_node_hash_map() -> None:
    """Recompute the shared hash lookup tables from the known devices.

    Fills, in place:
      * ``node_hash_candidates`` - hash -> list of device ids sharing it
      * ``node_hash_collisions`` - hashes owned by more than one device
      * ``node_hash_to_device``  - hash -> device id, unambiguous hashes only
    """
    by_hash: Dict[str, List[str]] = {}
    for device_id in devices.keys():
        for node_hash in _node_hashes_from_device_id(device_id):
            owners = by_hash.setdefault(node_hash, [])
            if device_id not in owners:
                owners.append(device_id)
    unique: Dict[str, str] = {}
    clashing: Set[str] = set()
    for node_hash, owners in by_hash.items():
        if len(owners) == 1:
            unique[node_hash] = owners[0]
        else:
            clashing.add(node_hash)
    # Mutate the shared containers rather than rebinding them.
    node_hash_candidates.clear()
    node_hash_candidates.update(by_hash)
    node_hash_collisions.clear()
    node_hash_collisions.update(clashing)
    node_hash_to_device.clear()
    node_hash_to_device.update(unique)
def _choose_closest_device(
    node_hash: str, ref_lat: float, ref_lon: float, ts: float
) -> Optional[str]:
    """
    Pick the candidate for ``node_hash`` nearest to (ref_lat, ref_lon).

    Candidates are skipped when they are unknown, fail the infrastructure
    filter (``ROUTE_INFRA_ONLY``), have zero or non-numeric coordinates, or
    lie more than ``ROUTE_MAX_HOP_DISTANCE`` km away. ``ts`` is accepted
    but not used.
    """
    candidates = node_hash_candidates.get(node_hash)
    if not candidates:
        return None
    # A hop longer than this is treated as bogus even if it is the closest.
    max_hop_m = ROUTE_MAX_HOP_DISTANCE * 1000.0
    winner: Optional[str] = None
    winner_dist = None
    for device_id in candidates:
        state = devices.get(device_id)
        if not state:
            continue
        # Infrastructure-only mode admits repeaters and rooms only.
        if ROUTE_INFRA_ONLY and state.role not in ("repeater", "room"):
            continue
        if _coords_are_zero(state.lat, state.lon):
            continue
        try:
            cand_lat, cand_lon = float(state.lat), float(state.lon)
        except (TypeError, ValueError):
            continue
        dist = _haversine_m(ref_lat, ref_lon, cand_lat, cand_lon)
        if dist > max_hop_m:
            continue
        if winner_dist is None or dist < winner_dist:
            winner, winner_dist = device_id, dist
    return winner
def _choose_device_for_hash(node_hash: str, ts: float) -> Optional[str]:
    """Pick the candidate for ``node_hash`` last seen closest in time to ``ts``.

    A candidate's reference time is its ``seen_devices`` entry, else its
    state ``ts``, else 0.0. Unknown devices, devices failing the
    infrastructure filter, and devices with zero coordinates are skipped.
    """
    candidates = node_hash_candidates.get(node_hash)
    if not candidates:
        return None
    winner: Optional[str] = None
    smallest_gap = None
    for device_id in candidates:
        state = devices.get(device_id)
        if not state:
            continue
        # Infrastructure-only mode admits repeaters and rooms only.
        if ROUTE_INFRA_ONLY and state.role not in ("repeater", "room"):
            continue
        if _coords_are_zero(state.lat, state.lon):
            continue
        last_seen = seen_devices.get(device_id) or state.ts or 0.0
        try:
            gap = abs(float(last_seen) - float(ts))
        except (TypeError, ValueError):
            continue
        if smallest_gap is None or gap < smallest_gap:
            winner, smallest_gap = device_id, gap
    return winner
def _choose_neighbor_device(
    prev_id: str,
    candidates: List[str],
    ref_lat: float,
    ref_lon: float,
    ts: float,
) -> Optional[str]:
    """Pick the candidate with the strongest neighbor edge from ``prev_id``.

    Only candidates that have an edge from ``prev_id`` in
    ``neighbor_edges``, pass the infrastructure filter, have usable
    coordinates and lie within ``ROUTE_MAX_HOP_DISTANCE`` km of the
    reference point are considered. They are ranked by
    (manual > auto > other, edge count, edge last_seen). ``ts`` is accepted
    but not used.
    """
    edges = neighbor_edges.get(prev_id) if prev_id else None
    if not edges:
        return None
    max_hop_m = ROUTE_MAX_HOP_DISTANCE * 1000.0
    winner: Optional[str] = None
    top_rank = None
    for device_id in candidates:
        edge = edges.get(device_id)
        if not edge:
            continue
        state = devices.get(device_id)
        if not state:
            continue
        # Infrastructure-only mode admits repeaters and rooms only.
        if ROUTE_INFRA_ONLY and state.role not in ("repeater", "room"):
            continue
        if _coords_are_zero(state.lat, state.lon):
            continue
        try:
            cand_lat, cand_lon = float(state.lat), float(state.lon)
        except (TypeError, ValueError):
            continue
        if _haversine_m(ref_lat, ref_lon, cand_lat, cand_lon) > max_hop_m:
            continue
        if bool(edge.get("manual")):
            priority = 2
        elif bool(edge.get("auto")):
            priority = 1
        else:
            priority = 0
        count = int(edge.get("count", 0) or 0)
        last_seen = float(edge.get("last_seen", 0.0) or 0.0)
        rank = (priority, count, last_seen)
        if top_rank is None or rank > top_rank:
            winner, top_rank = device_id, rank
    return winner
def _route_points_from_hashes(
    path_hashes: List[Any],
    origin_id: Optional[str],
    receiver_id: Optional[str],
    ts: float,
) -> Tuple[Optional[List[List[float]]], List[str], List[Optional[str]]]:
    """Resolve a packet's path hashes into an ordered list of map points.

    Each hash is mapped to a known device by a chain of fallbacks, walking
    the path from a reference position that starts at the origin (when its
    position is known) and moves to every accepted hop.

    Args:
        path_hashes: Raw path hashes; entries that do not normalize are dropped.
        origin_id: Device id of the sender, if known.
        receiver_id: Device id of the receiver, if known.
        ts: Packet timestamp, used by the time-based fallback.

    Returns:
        ``(points, used_hashes, point_ids)``. ``points`` is a list of
        ``[lat, lon]`` pairs, or ``None`` when fewer than two points were
        resolved or the path exceeds ``ROUTE_PATH_MAX_LEN``. ``used_hashes``
        lists the hashes that produced a point. ``point_ids`` lists device
        ids for the points; it can be longer than ``used_hashes`` because
        the origin and receiver may be added without a hash.
    """
    normalized: List[str] = []
    for raw in path_hashes:
        key = _normalize_node_hash(raw)
        if key:
            normalized.append(key)
    # Over-long paths are rejected outright (a limit of 0 or less disables this).
    if ROUTE_PATH_MAX_LEN > 0 and len(normalized) > ROUTE_PATH_MAX_LEN:
        return None, [], []
    receiver_hashes = (
        set(_node_hashes_from_device_id(receiver_id)) if receiver_id else set()
    )
    origin_hashes = (
        set(_node_hashes_from_device_id(origin_id)) if origin_id else set()
    )
    # Orient the path origin -> receiver: flip it when it starts at the
    # receiver, or (only when no receiver hashes are available) when it
    # ends at the origin.
    if normalized and receiver_hashes:
        if normalized[0] in receiver_hashes and normalized[-1] not in receiver_hashes:
            normalized.reverse()
    elif normalized and origin_hashes:
        if normalized[-1] in origin_hashes and normalized[0] not in origin_hashes:
            normalized.reverse()
    points: List[List[float]] = []
    used_hashes: List[str] = []
    point_ids: List[Optional[str]] = []
    # We need a reference point to start "walking" the path spatially.
    # Best bet is the origin, if known.
    current_lat = None
    current_lon = None
    current_id: Optional[str] = None
    if origin_id:
        origin_state = devices.get(origin_id)
        if origin_state and not _coords_are_zero(
            origin_state.lat, origin_state.lon
        ):
            try:
                current_lat = float(origin_state.lat)
                current_lon = float(origin_state.lon)
                current_id = origin_id
            except (TypeError, ValueError):
                pass
    # Build the path
    for key in normalized:
        device_id = None
        candidates = node_hash_candidates.get(key) or []
        # A one-byte hash shared by several devices is only resolved through
        # neighbor edges unless the ambiguous fallback is explicitly allowed.
        ambiguous_single_byte = (
            len(key) == 2 and len(candidates) > 1 and
            not ROUTE_ALLOW_AMBIGUOUS_ONE_BYTE_FALLBACK
        )
        if current_id and current_lat is not None and current_lon is not None:
            if len(candidates) > 1:
                # For the first hop, prefer the closest candidate to the origin.
                if not points and not ambiguous_single_byte:
                    device_id = _choose_closest_device(key, current_lat, current_lon, ts)
                if not device_id:
                    neighbor_id = _choose_neighbor_device(
                        current_id,
                        candidates,
                        current_lat,
                        current_lon,
                        ts,
                    )
                    if neighbor_id:
                        device_id = neighbor_id
                        edge = neighbor_edges.get(current_id, {}).get(neighbor_id, {})
                        manual = " manual" if edge.get("manual") else ""
                        print(
                            f"[route] neighbor pick{manual} hash={key} {current_id[:8]} -> {neighbor_id[:8]}"
                        )
        # If we have a location fix, try to find the "closest" candidate for this hash
        if (
            not device_id and
            current_lat is not None and
            current_lon is not None and
            not ambiguous_single_byte
        ):
            device_id = _choose_closest_device(key, current_lat, current_lon, ts)
        if not device_id and not ambiguous_single_byte:
            # Fallback to older time-based logic or just picking first valid
            device_id = _choose_device_for_hash(key, ts)
        if not device_id:
            # fallback: just pick *any* mapping if available
            # (node_hash_to_device only holds hashes owned by a single device)
            device_id = node_hash_to_device.get(key)
        if not device_id:
            continue
        state = devices.get(device_id)
        if not state:
            continue
        if _coords_are_zero(state.lat, state.lon):
            continue
        try:
            p_lat = float(state.lat)
            p_lon = float(state.lon)
        except (TypeError, ValueError):
            continue
        # Safety check: enforce max distance even for fallback selections
        if current_lat is not None and current_lon is not None:
            dist = _haversine_m(current_lat, current_lon, p_lat, p_lon)
            if dist > (ROUTE_MAX_HOP_DISTANCE * 1000.0):
                continue
        point = [p_lat, p_lon]
        # Update our "current" reference for the next hop
        current_lat = p_lat
        current_lon = p_lon
        current_id = device_id
        # Consecutive identical points are collapsed into one.
        if points and point == points[-1]:
            continue
        points.append(point)
        used_hashes.append(key)
        point_ids.append(device_id)
    # Prepend origin if missing
    origin_point = None
    if origin_id:
        origin_state = devices.get(origin_id)
        if origin_state and not _coords_are_zero(
            origin_state.lat, origin_state.lon
        ):
            # If infrastructure-only, only add infra nodes
            if ROUTE_INFRA_ONLY and (
                not origin_state.role or origin_state.role not in ("repeater", "room")
            ):
                pass  # skip
            else:
                try:
                    origin_point = [float(origin_state.lat), float(origin_state.lon)]
                    if not points or points[0] != origin_point:
                        points.insert(0, origin_point)
                        point_ids.insert(0, origin_id)
                    elif point_ids:
                        # First point already sits on the origin: relabel it.
                        point_ids[0] = origin_id
                except (TypeError, ValueError):
                    pass
    # Append receiver if missing
    receiver_point = None
    if receiver_id:
        receiver_state = devices.get(receiver_id)
        if receiver_state and not _coords_are_zero(
            receiver_state.lat, receiver_state.lon
        ):
            # If infrastructure-only, only add infra nodes
            if ROUTE_INFRA_ONLY and (
                not receiver_state.role or
                receiver_state.role not in ("repeater", "room")
            ):
                pass  # skip
            else:
                try:
                    receiver_point = [
                        float(receiver_state.lat),
                        float(receiver_state.lon),
                    ]
                    if points and receiver_point != points[-1]:
                        # The final leg is subject to the same hop-distance cap.
                        dist = _haversine_m(points[-1][0], points[-1][1],
                                            receiver_point[0], receiver_point[1])
                        if dist <= (ROUTE_MAX_HOP_DISTANCE * 1000.0):
                            points.append(receiver_point)
                            point_ids.append(receiver_id)
                    elif point_ids:
                        # Last point already sits on the receiver: relabel it.
                        point_ids[-1] = receiver_id
                except (TypeError, ValueError):
                    pass
    # A route needs at least two points to be drawable.
    if len(points) < 2:
        return None, used_hashes, point_ids
    return points, used_hashes, point_ids
def _route_points_from_device_ids(
    origin_id: Optional[str], receiver_id: Optional[str]
) -> Optional[List[List[float]]]:
    """Build a direct two-point route between two known devices.

    Args:
        origin_id: Device id of the sender.
        receiver_id: Device id of the receiver.

    Returns:
        ``[[origin_lat, origin_lon], [receiver_lat, receiver_lon]]`` as
        floats, or ``None`` when either id is missing or unknown, both ids
        are the same, either device fails the infrastructure filter
        (``ROUTE_INFRA_ONLY``), either position is zero or non-numeric, the
        two positions are identical, or they are more than
        ``ROUTE_MAX_HOP_DISTANCE`` km apart.
    """
    if not origin_id or not receiver_id or origin_id == receiver_id:
        return None
    origin_state = devices.get(origin_id)
    receiver_state = devices.get(receiver_id)
    if not origin_state or not receiver_state:
        return None
    # If infrastructure-only mode is active, only allow repeaters and rooms
    if ROUTE_INFRA_ONLY and (
        (not origin_state.role or origin_state.role not in ("repeater", "room")) or
        (
            not receiver_state.role or
            receiver_state.role not in ("repeater", "room")
        )
    ):
        return None
    if _coords_are_zero(origin_state.lat, origin_state.lon) or _coords_are_zero(
        receiver_state.lat, receiver_state.lon
    ):
        return None
    # _coords_are_zero() reports False for None / non-numeric values, so the
    # coordinates must be converted here before any distance math; this
    # mirrors the handling in the hash-based route builder.
    try:
        points = [
            [float(origin_state.lat), float(origin_state.lon)],
            [float(receiver_state.lat), float(receiver_state.lon)],
        ]
    except (TypeError, ValueError):
        return None
    dist = _haversine_m(points[0][0], points[0][1], points[1][0], points[1][1])
    if dist > (ROUTE_MAX_HOP_DISTANCE * 1000.0):
        return None
    if points[0] == points[1]:
        return None
    return points
def _append_heat_points(
    points: List[List[float]], ts: float, payload_type: Optional[int]
) -> None:
    """Record one heat event (weight 0.7) per point in ``heat_events``.

    Does nothing when ``HEAT_TTL_SECONDS`` is zero or negative.
    ``payload_type`` is accepted but not used.
    """
    if HEAT_TTL_SECONDS <= 0:
        return
    for point in points:
        event = dict(
            lat=float(point[0]),
            lon=float(point[1]),
            ts=float(ts),
            weight=0.7,
        )
        heat_events.append(event)
def _serialize_heat_events() -> List[List[float]]:
    """Return unexpired heat events as ``[lat, lon, ts, weight]`` rows.

    An event is kept when its ``ts`` is no older than ``HEAT_TTL_SECONDS``
    before now; a missing weight defaults to 0.7. Returns an empty list
    when the heat TTL is zero or negative.
    """
    if HEAT_TTL_SECONDS <= 0:
        return []
    cutoff = time.time() - HEAT_TTL_SECONDS
    rows: List[List[float]] = []
    for entry in heat_events:
        if entry.get("ts", 0) >= cutoff:
            rows.append(
                [
                    entry.get("lat"),
                    entry.get("lon"),
                    entry.get("ts"),
                    entry.get("weight", 0.7),
                ]
            )
    return rows
def _extract_device_name(obj: Any, topic: str) -> Optional[str]:
    """Find a human-readable device name in a decoded JSON payload.

    The first non-blank string among the known name keys wins. On topics
    ending in ``/status`` the ``origin`` field is used as a last resort.
    Returns the stripped name, or ``None`` when ``obj`` is not a dict or
    nothing suitable is found.
    """
    if not isinstance(obj, dict):
        return None

    def _clean(value: Any) -> Optional[str]:
        # Non-blank strings only, with surrounding whitespace removed.
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        return None

    name_keys = (
        "name",
        "device_name",
        "deviceName",
        "node_name",
        "nodeName",
        "display_name",
        "displayName",
        "callsign",
        "label",
    )
    for key in name_keys:
        name = _clean(obj.get(key))
        if name is not None:
            return name
    if topic.endswith("/status"):
        return _clean(obj.get("origin"))
    return None
def _normalize_role(value: str) -> Optional[str]:
    """Map a free-form role string to "repeater", "companion" or "room".

    Matching is case-insensitive and ignores surrounding whitespace;
    repeater markers are checked first, then companion, then room.
    Returns ``None`` for blank or unrecognised text.
    """
    text = value.strip().lower()
    if not text:
        return None
    if text in ("repeat", "relay") or "repeater" in text:
        return "repeater"
    companion_markers = ("companion", "chat node", "chatnode", "chat-node")
    if text == "chat" or any(marker in text for marker in companion_markers):
        return "companion"
    if text == "room" or "room server" in text or "roomserver" in text:
        return "room"
    return None
def _normalize_role_code(value: Any) -> Optional[str]:
    """Map a numeric role code to a role name.

    Codes: 1 -> "companion", 2 -> "repeater", 3 -> "room".

    Args:
        value: An int, an integral float (e.g. ``2.0`` from a JSON number),
            or a string holding an integer.

    Returns:
        The role name, or ``None`` for unknown codes, non-integral floats
        and anything that does not parse as an integer.
    """
    if isinstance(value, float):
        # str(2.0) is "2.0", which int() rejects, so integral floats must be
        # converted first. NaN and infinities are not integral.
        if not value.is_integer():
            return None
        value = int(value)
    try:
        num = int(str(value).strip())
    except (TypeError, ValueError):
        return None
    return {1: "companion", 2: "repeater", 3: "room"}.get(num)
def _extract_role_from_hint(value: Any) -> Optional[str]:
    """Interpret a role hint given as text or as a numeric code.

    Strings are tried as a role name first and as a numeric code second;
    ints and floats are treated as codes. Booleans and every other type
    yield ``None``.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        return _normalize_role(value) or _normalize_role_code(value)
    if isinstance(value, (int, float)):
        return _normalize_role_code(value)
    return None
def _extract_device_role(obj: Any, topic: str) -> Optional[str]:
    """Search a decoded JSON payload for a device role.

    The payload is walked depth-first. In each dict the known role keys are
    checked (in ``role_keys`` order) before its values are descended into.
    The first hint that normalizes to a role is returned. ``topic`` is not
    used.
    """
    del topic
    role_keys = (
        "role",
        "device_role",
        "deviceRole",
        "node_role",
        "nodeRole",
        "node_type",
        "nodeType",
        "device_type",
        "deviceType",
        "class",
        "profile",
    )

    def _search(node: Any) -> Optional[str]:
        if isinstance(node, dict):
            for key in role_keys:
                if key in node:
                    found = _extract_role_from_hint(node.get(key))
                    if found:
                        return found
            children = node.values()
        elif isinstance(node, list):
            children = node
        else:
            return None
        for child in children:
            found = _search(child)
            if found:
                return found
        return None

    return _search(obj)
def _apply_meta_role(
    debug: Dict[str, Any], meta: Optional[Dict[str, Any]]
) -> None:
    """Fill ``debug["device_role"]`` from decoder metadata if still unset.

    The ``role``, ``deviceRoleName`` and ``deviceRole`` entries of ``meta``
    are tried in that order; the first one that normalizes to a role is
    stored. An already-set role is left untouched.
    """
    if debug.get("device_role") or not isinstance(meta, dict):
        return
    for key in ("role", "deviceRoleName", "deviceRole"):
        if key in meta:
            role = _extract_role_from_hint(meta.get(key))
            if role:
                debug["device_role"] = role
                return
def _has_location_hints(obj: Any) -> bool:
    """Tell whether any dict key, at any depth, names a location container.

    Keys are compared case-insensitively against a fixed list
    ("location", "gps", "position", ...). Only dict and list values are
    descended into.
    """
    hint_keys = (
        "location",
        "gps",
        "position",
        "coords",
        "coordinate",
        "geo",
        "geolocation",
        "latlon",
    )
    if isinstance(obj, dict):
        for key, value in obj.items():
            if str(key).lower() in hint_keys:
                return True
            if isinstance(value, (dict, list)) and _has_location_hints(value):
                return True
        return False
    if isinstance(obj, list):
        return any(_has_location_hints(item) for item in obj)
    return False
def _topic_marks_online(topic: str) -> bool:
    """Tell whether ``topic`` ends with one of ``MQTT_ONLINE_TOPIC_SUFFIXES``."""
    if not MQTT_ONLINE_TOPIC_SUFFIXES:
        return False
    for suffix in MQTT_ONLINE_TOPIC_SUFFIXES:
        if topic.endswith(suffix):
            return True
    return False
def _direct_coords_allowed(topic: str, obj: Any) -> bool:
    """Decide whether coordinates found directly in a payload may be used.

    Behaviour per ``DIRECT_COORDS_MODE``:
      * "off"    - never
      * "topic"  - only when the topic matches ``DIRECT_COORDS_TOPIC_RE``
      * "strict" - topic match, or the payload carries location-hint keys
      * anything else (including "any") - always
    """
    mode = DIRECT_COORDS_MODE
    if mode == "off":
        return False
    if mode not in ("topic", "strict"):
        return True
    if DIRECT_COORDS_TOPIC_RE and DIRECT_COORDS_TOPIC_RE.search(topic):
        return True
    if mode == "topic":
        return False
    return _has_location_hints(obj)
# =========================
# MeshCore decoder via Node
# =========================
def _ensure_node_decoder() -> bool:
    """Make sure the Node-based packet decoder can be used.

    On the first call this checks that ``node`` runs, that the
    ``@michaelhart/meshcore-decoder`` package can be imported from
    ``APP_DIR``, and writes the helper script to ``NODE_SCRIPT_PATH``. The
    outcome is remembered in ``_node_ready_once`` /
    ``_node_unavailable_once`` so the checks are not repeated.

    The helper script reads one JSON request per line on stdin
    (``{"hex": ..., "channelSecrets": [...]}``) and writes one JSON
    response per line on stdout.

    Returns:
        ``True`` when the decoder is ready; ``False`` when decoding via
        Node is disabled (``DECODE_WITH_NODE``) or any setup step failed.
    """
    global _node_ready_once, _node_unavailable_once
    if not DECODE_WITH_NODE:
        return False
    if _node_ready_once:
        return True
    if _node_unavailable_once:
        return False
    # Step 1: is the node binary runnable at all?
    try:
        subprocess.run(
            ["node", "-v"],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        _node_unavailable_once = True
        print("[decode] node not found in container")
        return False
    # Step 2: can the decoder package be imported from APP_DIR?
    try:
        subprocess.run(
            [
                "node",
                "--input-type=module",
                "-e",
                "import('@michaelhart/meshcore-decoder')",
            ],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd=APP_DIR,
        )
    except Exception:
        _node_unavailable_once = True
        print("[decode] @michaelhart/meshcore-decoder not available")
        return False
    # Step 3: the worker script. It is a runtime string: the "\\n" escapes
    # below reach the JavaScript source as "\n".
    script = """#!/usr/bin/env node
import { MeshCorePacketDecoder, getDeviceRoleName } from '@michaelhart/meshcore-decoder';
import readline from 'node:readline';
let keyStore = undefined;
let keySignature = '';
function normalizePubkey(value) {
if (typeof value !== 'string') return null;
const normalized = value.trim().toUpperCase();
return /^[0-9A-F]{64}$/.test(normalized) ? normalized : null;
}
function pickAdvertPayload(decodedPacket) {
const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
const payloadRoot = decodedPacket?.payload ?? null;
if (payloadDecoded?.type === 4) return payloadDecoded;
if (payloadRoot?.type === 4) return payloadRoot;
return null;
}
function pickLocation(decodedPacket) {
const advert = pickAdvertPayload(decodedPacket);
const appData = advert?.appData ?? advert?.appdata ?? null;
const loc = appData?.location ?? advert?.location ?? null;
const lat = loc?.latitude ?? loc?.lat ?? null;
const lon = loc?.longitude ?? loc?.lon ?? null;
const name = appData?.name ?? advert?.name ?? null;
const pubkey =
normalizePubkey(advert?.publicKey) ??
normalizePubkey(advert?.publickey) ??
null;
return { lat, lon, name, pubkey };
}
function pickRole(decodedPacket) {
const payloadDecoded = decodedPacket?.payload?.decoded ?? null;
const payloadRoot = decodedPacket?.payload ?? null;
const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
const candidates = [
appData?.role,
appData?.deviceRole,
appData?.nodeRole,
appData?.deviceType,
appData?.nodeType,
appData?.class,
appData?.profile,
payloadDecoded?.role,
payloadDecoded?.deviceRole,
payloadDecoded?.nodeRole,
payloadDecoded?.deviceType,
payloadDecoded?.nodeType,
payloadDecoded?.class,
payloadDecoded?.profile,
payloadRoot?.role,
payloadRoot?.deviceRole,
payloadRoot?.nodeRole,
payloadRoot?.deviceType,
payloadRoot?.nodeType,
payloadRoot?.class,
payloadRoot?.profile,
];
for (const value of candidates) {
if (typeof value === 'string' && value.trim()) return value.trim();
}
return null;
}
function syncKeyStore(channelSecrets) {
const normalized = Array.isArray(channelSecrets)
? channelSecrets
.filter((value) => typeof value === 'string' && /^[0-9a-f]{32}$/i.test(value))
.map((value) => value.toLowerCase())
.sort()
: [];
const nextSignature = normalized.join(',');
if (nextSignature === keySignature) return;
keySignature = nextSignature;
keyStore = normalized.length
? MeshCorePacketDecoder.createKeyStore({ channelSecrets: normalized })
: undefined;
}
function decodeHex(hex, channelSecrets) {
syncKeyStore(channelSecrets);
const decoded = MeshCorePacketDecoder.decode(hex, keyStore ? { keyStore } : undefined);
const loc = pickLocation(decoded);
const payloadDecoded = decoded?.payload?.decoded ?? decoded?.payload ?? null;
const payloadRoot = decoded?.payload ?? null;
const appData = payloadDecoded?.appData ?? payloadDecoded?.appdata ?? payloadRoot?.appData ?? payloadRoot?.appdata ?? null;
const senderName =
payloadDecoded?.decrypted?.sender ??
payloadRoot?.decrypted?.sender ??
null;
const deviceRole = appData?.deviceRole ?? payloadDecoded?.deviceRole ?? payloadRoot?.deviceRole ?? null;
const deviceRoleName = typeof deviceRole === 'number' ? getDeviceRoleName(deviceRole) : null;
const role = pickRole(decoded) || deviceRoleName;
const payloadKeys = payloadDecoded && typeof payloadDecoded === 'object' ? Object.keys(payloadDecoded) : null;
const appDataKeys = appData && typeof appData === 'object' ? Object.keys(appData) : null;
const pathHashes = payloadDecoded?.pathHashes ?? null;
const snrValues = payloadDecoded?.snrValues ?? null;
const path = decoded?.path ?? null;
const pathLength = decoded?.pathLength ?? null;
const out = {
ok: true,
payloadType: decoded?.payloadType ?? null,
routeType: decoded?.routeType ?? null,
messageHash: decoded?.messageHash ?? null,
location: loc,
role,
deviceRole,
deviceRoleName,
senderName,
payloadKeys,
appDataKeys,
pathHashes,
snrValues,
path,
pathLength,
};
return out;
}
const rl = readline.createInterface({
input: process.stdin,
crlfDelay: Infinity,
});
rl.on('line', (line) => {
let request;
try {
request = JSON.parse(line);
} catch (e) {
process.stdout.write(JSON.stringify({ ok: false, error: 'invalid_request_json' }) + '\\n');
return;
}
try {
const hex = typeof request?.hex === 'string' ? request.hex.trim() : '';
if (!hex) {
process.stdout.write(JSON.stringify({ ok: false, error: 'missing_hex' }) + '\\n');
return;
}
const channelSecrets = Array.isArray(request?.channelSecrets) ? request.channelSecrets : [];
process.stdout.write(JSON.stringify(decodeHex(hex, channelSecrets)) + '\\n');
} catch (e) {
process.stdout.write(JSON.stringify({ ok: false, error: String(e) }) + '\\n');
}
});
"""
    # Step 4: persist the script and mark it executable (mode 0o755).
    try:
        with open(NODE_SCRIPT_PATH, "w", encoding="utf-8") as handle:
            handle.write(script)
        os.chmod(NODE_SCRIPT_PATH, 0o755)
    except Exception as exc:
        _node_unavailable_once = True
        print(f"[decode] failed writing node helper: {exc}")
        return False
    _node_ready_once = True
    print("[decode] node decoder ready")
    return True
def _stop_node_decoder_worker() -> None:
    """Shut down the Node worker process, if one is recorded.

    The module-level handle is cleared first. The worker's stdin is then
    closed and the process is asked to terminate; if terminating or waiting
    (1 s) raises, it is killed instead. All errors are swallowed.
    """
    global _node_worker_proc
    worker, _node_worker_proc = _node_worker_proc, None
    if not worker:
        return
    try:
        stdin = worker.stdin
        if stdin:
            stdin.close()
    except Exception:
        pass
    # Try a graceful terminate first; escalate to kill only on failure.
    for shutdown in (worker.terminate, worker.kill):
        try:
            shutdown()
            worker.wait(timeout=1.0)
        except Exception:
            continue
        break
def _start_node_decoder_worker() -> Optional[subprocess.Popen[str]]:
    """Return a running Node worker, spawning one if necessary.

    A recorded worker that is still alive is reused. Otherwise any stale
    handle is cleaned up and ``node NODE_SCRIPT_PATH`` is started in
    ``APP_DIR`` with line-buffered text pipes for stdin, stdout and stderr.

    Returns:
        The worker process, or ``None`` when it could not be started (in
        which case the decoder is flagged as unavailable).
    """
    global _node_worker_proc, _node_unavailable_once
    existing = _node_worker_proc
    if existing and existing.poll() is None:
        return existing
    _stop_node_decoder_worker()
    try:
        spawned = subprocess.Popen(
            ["node", NODE_SCRIPT_PATH],
            cwd=APP_DIR,
            env=os.environ.copy(),
            text=True,
            bufsize=1,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except Exception as exc:
        _node_unavailable_once = True
        print(f"[decode] failed starting node worker: {exc}")
        return None
    _node_worker_proc = spawned
    return spawned
def _restart_node_decoder_worker() -> Optional[subprocess.Popen[str]]:
    """Stop the current Node worker and return a freshly started one (or None)."""
    _stop_node_decoder_worker()
    fresh_worker = _start_node_decoder_worker()
    return fresh_worker
def _node_worker_error(proc: Optional[subprocess.Popen[str]]) -> str:
    """Return the worker's stderr text, or "node_worker_failed" as a fallback.

    The fallback is used when there is no process or stderr pipe, when
    reading stderr raises, or when stderr is empty after stripping.
    """
    fallback = "node_worker_failed"
    if not proc or not proc.stderr:
        return fallback
    try:
        message = proc.stderr.read().strip()
    except Exception:
        return fallback
    return message or fallback
def _decode_meshcore_hex(
    hex_str: str,
) -> Tuple[Optional[float], Optional[float], Optional[str], Optional[str], Dict[
    str, Any]]:
    """Decode a MeshCore packet (hex string) through the Node worker.

    One JSON request line is sent to the worker and one JSON response line
    is read back, all while holding ``_node_worker_lock``. On a write
    error, timeout, read error or empty response the worker is stopped and
    the request is retried once with a restarted worker.

    Args:
        hex_str: The packet as a hex string.

    Returns:
        ``(lat, lon, pubkey, name, meta)``. ``lat``/``lon`` are normalized
        floats, or ``None`` when the packet carries no usable location.
        ``pubkey`` and ``name`` come from the response's ``location``
        object. ``meta`` is the worker's response dict; on failure it is
        ``{"ok": False, "error": ...}`` and the other four values are
        ``None``.
    """
    if not _ensure_node_decoder():
        return (
            None,
            None,
            None,
            None,
            {
                "ok": False,
                "error": "node_decoder_unavailable"
            },
        )
    # Secrets travel with every request; the worker rebuilds its key store
    # only when the set of secrets changes.
    request = {
        "hex": hex_str,
        "channelSecrets": _load_channel_secrets(),
    }
    out = ""
    last_error = "node_worker_failed"
    with _node_worker_lock:
        # Attempt 0 reuses (or starts) the worker; attempt 1 forces a restart.
        for attempt in range(2):
            proc = _start_node_decoder_worker() if attempt == 0 else _restart_node_decoder_worker()
            if not proc or not proc.stdin or not proc.stdout:
                last_error = "node_worker_unavailable"
                continue
            try:
                proc.stdin.write(json.dumps(request) + "\n")
                proc.stdin.flush()
            except Exception as exc:
                last_error = str(exc)
                _stop_node_decoder_worker()
                continue
            # Wait up to NODE_DECODE_TIMEOUT_SECONDS for a response line.
            # NOTE(review): select() on a pipe is not supported on Windows -
            # confirm this only runs on POSIX hosts.
            try:
                ready, _, _ = select.select([proc.stdout], [], [], NODE_DECODE_TIMEOUT_SECONDS)
            except Exception as exc:
                last_error = str(exc)
                _stop_node_decoder_worker()
                continue
            if not ready:
                last_error = "node_worker_timeout"
                _stop_node_decoder_worker()
                continue
            try:
                out = (proc.stdout.readline() or "").strip()
            except Exception as exc:
                last_error = str(exc)
                _stop_node_decoder_worker()
                continue
            if out:
                break
            # Empty line: either the worker died (report its stderr) or it
            # answered with nothing.
            if proc.poll() is not None:
                last_error = _node_worker_error(proc)
            else:
                last_error = "empty_decoder_output"
            _stop_node_decoder_worker()
        else:
            # Both attempts finished without a break, i.e. without output.
            return (None, None, None, None, {"ok": False, "error": last_error})
    if not out:
        return (
            None, None, None, None, {
                "ok": False,
                "error": last_error or "empty_decoder_output"
            }
        )
    try:
        data = json.loads(out)
    except Exception:
        return (
            None,
            None,
            None,
            None,
            {
                "ok": False,
                "error": "decoder_output_not_json",
                "output": out
            },
        )
    if not data.get("ok"):
        return (None, None, None, None, {"ok": False, **data})
    loc = data.get("location") or {}
    lat = loc.get("lat")
    lon = loc.get("lon")
    name = loc.get("name")
    pubkey = loc.get("pubkey")
    normalized = None
    if lat is not None and lon is not None:
        normalized = _normalize_lat_lon(lat, lon)
    if normalized:
        return (normalized[0], normalized[1], pubkey, name, {"ok": True, **data})
    # Decoded successfully, but without a usable position.
    return (
        None,
        None,
        pubkey,
        name,
        {
            "ok": True,
            **data, "note": "decoded_no_location"
        },
    )
# =========================
# Parsing: MeshCore-ish payloads
# =========================
def _device_id_from_topic(topic: str) -> Optional[str]:
    """Return the third "/"-separated segment of a ``meshcore/...`` topic.

    Returns ``None`` when the topic has fewer than three segments or does
    not start with ``meshcore``.
    """
    segments = topic.split("/")
    if segments[0] != "meshcore" or len(segments) < 3:
        return None
    return segments[2]
def _find_packet_blob(
    obj: Any,
    path: str = "root"
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Locate an encoded packet inside a JSON-like value.

    Recognised encodings: a hex string, a base64 string, or a list of ints
    that converts to at least 10 bytes. Dicts are searched with the keys
    from ``LIKELY_PACKET_KEYS`` first (otherwise in insertion order); lists
    are searched element by element.

    Args:
        obj: The value to search.
        path: Dotted/indexed location of ``obj``, used to report where the
            packet was found.

    Returns:
        ``(hex_string, path, hint)`` with ``hint`` one of "hex", "base64"
        or "list[int]"; ``(None, None, None)`` when nothing is found.
    """
    nothing = (None, None, None)
    if isinstance(obj, str):
        if _looks_like_hex(obj):
            return (obj.strip(), path, "hex")
        converted = _try_base64_to_hex(obj)
        if converted:
            return (converted, path, "base64")
        return nothing
    if isinstance(obj, list):
        # Only the first 20 items are type-checked; bytes() rejects the rest
        # if they are not ints in range(256).
        if obj and all(isinstance(item, int) for item in obj[:20]):
            try:
                packed = bytes(obj)
            except Exception:
                packed = b""
            if len(packed) >= 10:
                return (packed.hex(), path, "list[int]")
        for index, item in enumerate(obj):
            hit = _find_packet_blob(item, f"{path}[{index}]")
            if hit[0]:
                return hit
        return nothing
    if isinstance(obj, dict):
        # Stable sort: likely packet keys first, original order otherwise.
        ordered = sorted(obj, key=lambda k: 0 if k in LIKELY_PACKET_KEYS else 1)
        for key in ordered:
            # The recursive call applies the same string / list[int] checks
            # to the value, then descends into containers.
            hit = _find_packet_blob(obj.get(key), f"{path}.{key}")
            if hit[0]:
                return hit
    return nothing
def _extract_device_id(
    obj: Any, topic: str, decoded_pubkey: Optional[str]
) -> str:
    """Pick the best available identifier for the device behind a message.

    Precedence, first truthy value wins:
      1. ``decoded_pubkey``
      2. the ``device_id``, ``id``, ``from`` or ``origin_id`` field of ``obj``
      3. ``obj["jwt_payload"]["publickey"]``
      4. the device segment of the topic (``_device_id_from_topic``)
      5. the last ``/``-separated segment of the topic

    Sources 2 and 3 are only consulted when ``obj`` is a dict. Values from
    sources 1-3 are converted with ``str()``.
    """
    if decoded_pubkey:
        return str(decoded_pubkey)

    if isinstance(obj, dict):
        for field in ("device_id", "id", "from", "origin_id"):
            candidate = obj.get(field)
            if candidate:
                return str(candidate)
        claims = obj.get("jwt_payload")
        if isinstance(claims, dict):
            public_key = claims.get("publickey")
            if public_key:
                return str(public_key)

    from_topic = _device_id_from_topic(topic)
    if from_topic:
        return from_topic
    return topic.split("/")[-1]
def _direct_coords_rejection(
    topic: str, obj: Any, lat: Any, lon: Any
) -> Optional[str]:
    """Return the debug result code vetoing direct coordinates, or None."""
    if not _direct_coords_allowed(topic, obj):
        return "direct_blocked"
    if not DIRECT_COORDS_ALLOW_ZERO and _coords_are_zero(lat, lon):
        return "direct_zero_coords"
    return None


def _timestamp_from_json(obj: Any) -> float:
    """Return the payload's own numeric timestamp, falling back to now."""
    if isinstance(obj, dict):
        stamp = obj.get("ts") or obj.get("time") or obj.get("timestamp")
        # bool is a subclass of int: a JSON `true` must not become epoch 1.0.
        if isinstance(stamp, (int, float)) and not isinstance(stamp, bool):
            try:
                return float(stamp)
            except OverflowError:
                # Integer too large for a float; treat as "no usable timestamp".
                pass
    return time.time()


def _direct_coords_result(
    topic: str,
    obj: Any,
    coords: Tuple[Any, Any],
    debug: Dict[str, Any],
    result_code: str,
) -> Optional[Dict[str, Any]]:
    """Gate text-derived coordinates and build the minimal position update.

    Sets ``debug["result"]`` to the rejection code (returning None) or to
    ``result_code`` (returning the update dict).
    """
    rejection = _direct_coords_rejection(topic, obj, coords[0], coords[1])
    if rejection:
        debug["result"] = rejection
        return None
    debug["result"] = result_code
    return {
        "device_id": _extract_device_id(obj, topic, None),
        "lat": coords[0],
        "lon": coords[1],
        "ts": time.time(),
        "role": debug.get("device_role"),
    }


def _decoded_blob_result(
    hex_str: str, obj: Any, topic: str, debug: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Decode a packet blob, record the outcome in ``debug``, build the update.

    Returns the position update when the decoder yields both coordinates,
    otherwise None with ``debug["result"]`` set to ``"decoded_no_location"``
    (decoder meta reports ok) or ``"decode_failed"``.
    """
    lat, lon, decoded_pubkey, name, meta = _decode_meshcore_hex(hex_str)
    debug["decoded_pubkey"] = decoded_pubkey
    debug["decoder_meta"] = meta
    # Must run before "role" is read below; presumably fills
    # debug["device_role"] from decoder metadata (helper not visible here).
    _apply_meta_role(debug, meta)

    if lat is None or lon is None:
        debug["result"] = (
            "decoded_no_location" if meta.get("ok") else "decode_failed"
        )
        return None

    debug["result"] = "decoded"
    result: Dict[str, Any] = {
        "device_id": _extract_device_id(obj, topic, decoded_pubkey),
        "lat": lat,
        "lon": lon,
        "ts": time.time(),
    }
    if obj is not None:
        # Only JSON envelopes carry signal metrics; raw payloads omit the keys.
        is_mapping = isinstance(obj, dict)
        result["rssi"] = obj.get("rssi") if is_mapping else None
        result["snr"] = obj.get("snr") if is_mapping else None
    result["name"] = name
    result["role"] = debug.get("device_role")
    return result


def _try_parse_payload(
    topic: str, payload_bytes: bytes
) -> Tuple[Optional[Dict[str, Any]], Dict[str, Any]]:
    """Try to turn one MQTT message into a device position update.

    Strategies are attempted in this order; the first that applies decides
    the outcome:

    1. JSON object payload: coordinates found in the JSON structure, then
       coordinates in any string value (raw, then base64-decoded), then an
       embedded packet blob handed to the MeshCore decoder.
    2. Text payload: coordinates in the text, then the whole payload as hex,
       then the whole payload as base64.
    3. Binary payload of at least 10 bytes, decoded as a raw packet.

    Args:
        topic: MQTT topic the message arrived on.
        payload_bytes: Raw message payload.

    Returns:
        ``(update, debug)``. ``update`` is a dict with at least ``device_id``,
        ``lat``, ``lon``, ``ts`` and ``role``, or None when no position could
        be produced. ``debug`` always describes what happened; its ``result``
        key is one of ``no_coords``, ``direct_json``, ``direct_text_json``,
        ``direct_text_json_base64``, ``direct_text``, ``direct_blocked``,
        ``direct_zero_coords``, ``decoded``, ``decoded_no_location``,
        ``decode_failed`` or ``json_no_packet_blob``.
    """
    debug: Dict[str, Any] = {
        "result": "no_coords",
        "found_path": None,
        "found_hint": None,
        "decoder_meta": None,
        "json_keys": None,
        "parse_error": None,
        "origin_id": None,
        "device_name": None,
        "device_role": None,
        "decoded_pubkey": None,
        "packet_hash": None,
        "direction": None,
        "packet_type": None,
    }

    # For valid UTF-8 this equals a strict decode; for anything else the
    # undecodable bytes are dropped, so a single lenient decode suffices.
    text = payload_bytes.decode("utf-8", errors="ignore").strip()

    obj = None
    if text.startswith("{") and text.endswith("}"):
        try:
            obj = json.loads(text)
            if isinstance(obj, dict):
                debug["json_keys"] = list(obj.keys())[:50]
                debug["origin_id"] = obj.get("origin_id") or obj.get("originId")
                debug["device_name"] = _extract_device_name(obj, topic)
                debug["device_role"] = _extract_device_role(obj, topic)
                debug["direction"] = obj.get("direction")
                debug["packet_hash"] = (
                    obj.get("hash") or obj.get("message_hash")
                    or obj.get("messageHash")
                )
                debug["packet_type"] = (
                    obj.get("packet_type") or obj.get("packetType")
                    or obj.get("type")
                )
        except Exception as exc:
            # Deliberately broad: a malformed payload or a failing metadata
            # extractor is recorded and parsing continues best-effort.
            debug["parse_error"] = str(exc)

    if obj is not None:
        found = _find_lat_lon_in_json(obj)
        if found:
            rejection = _direct_coords_rejection(topic, obj, found[0], found[1])
            if rejection:
                debug["result"] = rejection
                return (None, debug)
            fields = obj if isinstance(obj, dict) else {}
            debug["result"] = "direct_json"
            return (
                {
                    "device_id": _extract_device_id(obj, topic, None),
                    "lat": found[0],
                    "lon": found[1],
                    "ts": _timestamp_from_json(obj),
                    "heading": fields.get("heading"),
                    "speed": fields.get("speed"),
                    "rssi": fields.get("rssi"),
                    "snr": fields.get("snr"),
                    "role": debug.get("device_role"),
                },
                debug,
            )

        for s in _strings_from_json(obj):
            got = _find_lat_lon_in_text(s)
            if got:
                return (
                    _direct_coords_result(
                        topic, obj, got, debug, "direct_text_json"
                    ),
                    debug,
                )
            decoded = _maybe_base64_decode_to_text(s)
            if decoded:
                got2 = _find_lat_lon_in_text(decoded)
                if got2:
                    return (
                        _direct_coords_result(
                            topic, obj, got2, debug, "direct_text_json_base64"
                        ),
                        debug,
                    )

        hex_str, where, hint = _find_packet_blob(obj)
        debug["found_path"] = where
        debug["found_hint"] = hint
        if hex_str:
            return (_decoded_blob_result(hex_str, obj, topic, debug), debug)
        debug["result"] = "json_no_packet_blob"
        return (None, debug)

    if text:
        got = _find_lat_lon_in_text(text)
        if got:
            return (
                _direct_coords_result(topic, None, got, debug, "direct_text"),
                debug,
            )
        if _looks_like_hex(text):
            debug["found_path"] = "payload"
            debug["found_hint"] = "hex"
            # text is already stripped above.
            return (_decoded_blob_result(text, None, topic, debug), debug)
        b64hex = _try_base64_to_hex(text)
        if b64hex:
            debug["found_path"] = "payload"
            debug["found_hint"] = "base64"
            return (_decoded_blob_result(b64hex, None, topic, debug), debug)

    if _is_probably_binary(payload_bytes) and len(payload_bytes) >= 10:
        debug["found_path"] = "payload_bytes"
        debug["found_hint"] = "raw_bytes"
        return (
            _decoded_blob_result(payload_bytes.hex(), None, topic, debug),
            debug,
        )

    return (None, debug)