mirror of
https://github.com/jketterl/openwebrx.git
synced 2026-01-02 23:00:16 +01:00
102 lines
3.7 KiB
Python
102 lines
3.7 KiB
Python
from pycsdr.modules import ExecModule
|
|
from pycsdr.types import Format
|
|
from owrx.aeronautical import AirplaneLocation, AcarsProcessor, IcaoSource, FlightSource
|
|
from owrx.map import Map
|
|
from owrx.metrics import Metrics, CounterMetric
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DumpHFDLModule(ExecModule):
|
|
def __init__(self):
|
|
super().__init__(
|
|
Format.COMPLEX_FLOAT,
|
|
Format.CHAR,
|
|
[
|
|
"dumphfdl",
|
|
"--iq-file", "-",
|
|
"--sample-format", "CF32",
|
|
"--sample-rate", "12000",
|
|
"--output", "decoded:json:file:path=-",
|
|
"0",
|
|
],
|
|
flushSize=50000,
|
|
)
|
|
|
|
|
|
class HFDLMessageParser(AcarsProcessor):
|
|
def __init__(self):
|
|
name = "dumphfdl.decodes.hfdl"
|
|
self.metrics = Metrics.getSharedInstance().getMetric(name)
|
|
if self.metrics is None:
|
|
self.metrics = CounterMetric()
|
|
Metrics.getSharedInstance().addMetric(name, self.metrics)
|
|
super().__init__("HFDL")
|
|
|
|
def process(self, line):
|
|
msg = super().process(line)
|
|
if msg is not None:
|
|
try:
|
|
payload = msg["hfdl"]
|
|
if "lpdu" in payload:
|
|
lpdu = payload["lpdu"]
|
|
icao = lpdu["src"]["ac_info"]["icao"] if "ac_info" in lpdu["src"] else None
|
|
if lpdu["type"]["id"] in [13, 29]:
|
|
hfnpdu = lpdu["hfnpdu"]
|
|
if hfnpdu["type"]["id"] == 209:
|
|
# performance data
|
|
self.processPosition(hfnpdu, icao)
|
|
elif hfnpdu["type"]["id"] == 255:
|
|
# enveloped data
|
|
if "acars" in hfnpdu:
|
|
self.processAcars(hfnpdu["acars"], icao)
|
|
elif lpdu["type"]["id"] in [79, 143, 191]:
|
|
if "ac_info" in lpdu:
|
|
icao = lpdu["ac_info"]["icao"]
|
|
self.processPosition(lpdu["hfnpdu"], icao)
|
|
except Exception:
|
|
logger.exception("error processing HFDL data")
|
|
|
|
self.metrics.inc()
|
|
|
|
return msg
|
|
|
|
def processPosition(self, hfnpdu, icao=None):
|
|
if "pos" in hfnpdu:
|
|
pos = hfnpdu["pos"]
|
|
if abs(pos['lat']) <= 90 and abs(pos['lon']) <= 180:
|
|
flight = self.processFlight(hfnpdu["flight_id"])
|
|
|
|
if icao is not None:
|
|
source = IcaoSource(icao, flight=flight)
|
|
elif flight:
|
|
source = FlightSource(flight)
|
|
else:
|
|
source = None
|
|
|
|
if source:
|
|
msg = {
|
|
"lat": pos["lat"],
|
|
"lon": pos["lon"],
|
|
"flight": flight
|
|
}
|
|
if "utc_time" in hfnpdu:
|
|
ts = self.processTimestamp(**hfnpdu["utc_time"])
|
|
elif "time" in hfnpdu:
|
|
ts = self.processTimestamp(**hfnpdu["time"])
|
|
else:
|
|
ts = None
|
|
Map.getSharedInstance().updateLocation(source, AirplaneLocation(msg), "HFDL", timestamp=ts)
|
|
|
|
def processTimestamp(self, hour, min, sec) -> datetime:
|
|
now = datetime.now(timezone.utc)
|
|
t = now.replace(hour=hour, minute=min, second=sec, microsecond=0)
|
|
# if we have moved the time to the future, it's most likely that we're close to midnight and the time
|
|
# we received actually refers to yesterday
|
|
if t > now:
|
|
t -= timedelta(days=1)
|
|
return t
|