openwebrx/owrx/vdl2/dumpvdl2.py
2023-09-08 19:59:18 +02:00

93 lines
3.4 KiB
Python

from pycsdr.modules import ExecModule
from pycsdr.types import Format
from owrx.aeronautical import AcarsProcessor
from owrx.map import Map
from owrx.aeronautical import AirplaneLocation, IcaoSource
import logging
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class DumpVDL2Module(ExecModule):
    """ExecModule wrapper that launches the external ``dumpvdl2`` program.

    The module is set up with ``Format.COMPLEX_SHORT`` and ``Format.CHAR``
    (passed as the first two constructor arguments; presumably the input and
    output formats -- confirm against ``ExecModule``) and the command line
    assembled below.
    """

    def __init__(self):
        # Option/value pairs in the order they appear on the command line.
        # NOTE(review): the "-" file arguments presumably select
        # stdin/stdout so samples are piped in and JSON is piped out --
        # confirm against the dumpvdl2 documentation.
        options = (
            ("--iq-file", "-"),
            ("--oversample", "1"),
            ("--sample-format", "S16_LE"),
            ("--output", "decoded:json:file:path=-"),
        )
        command = ["dumpvdl2"]
        for flag, value in options:
            command += [flag, value]
        super().__init__(Format.COMPLEX_SHORT, Format.CHAR, command)
class VDL2MessageParser(AcarsProcessor):
    """Inspects decoded VDL2 messages for ACARS content and position reports.

    Each message returned by the parent's ``process()`` is examined: ACARS
    payloads of AVLC "I" frames are handed to ``processAcars()``, and
    periodic ADS-C v2 reports are turned into a location update on the
    shared ``Map``.
    """

    def __init__(self):
        super().__init__("VDL2")

    def process(self, line):
        """Process one line and return whatever the parent's ``process()`` gave.

        Any exception raised while inspecting the message is logged and
        suppressed, so the message is always returned to the caller.
        """
        msg = super().process(line)
        if msg is None:
            return msg

        try:
            payload = msg["vdl2"]
            if "avlc" not in payload:
                return msg

            avlc = payload["avlc"]
            # The source address is read before the frame type is checked,
            # so a frame without it raises (and gets logged) regardless of
            # its type.
            src = avlc["src"]["addr"]
            if avlc["frame_type"] != "I":
                return msg

            # ACARS takes precedence; x25 is only considered without it.
            if "acars" in avlc:
                self.processAcars(avlc["acars"], icao=src)
                return msg
            if "x25" not in avlc:
                return msg

            # Descend through the nested layers; stop quietly as soon as
            # one of them is absent.
            node = avlc["x25"]
            for layer in ("clnp", "cotp", "adsc_v2", "adsc_report"):
                if layer not in node:
                    return msg
                node = node[layer]

            report_body = node["data"]
            if "periodic_report" in report_body:
                report_data = report_body["periodic_report"]["report_data"]
                self.processReport(report_data, src)
        except Exception:
            logger.exception("error processing VDL2 data")
        return msg

    def processReport(self, report, icao):
        """Publish the position contained in ``report`` to the shared map.

        Does nothing when ``report`` has no ``"position"`` entry. Ground
        track / ground speed and vertical speed are added when the report
        carries ``"ground_vector"`` / ``"air_vector"`` entries.
        """
        if "position" not in report:
            return

        position = report["position"]
        location = {
            "lat": self.convertLatitude(**position["lat"]),
            "lon": self.convertLongitude(**position["lon"]),
            "altitude": position["alt"]["val"],
        }

        if "ground_vector" in report:
            ground = report["ground_vector"]
            location["groundtrack"] = ground["ground_track"]["val"]
            location["groundspeed"] = ground["ground_speed"]["val"]

        if "air_vector" in report:
            location["verticalspeed"] = report["air_vector"]["vertical_rate"]["val"]

        shared_map = Map.getSharedInstance()
        shared_map.updateLocation(IcaoSource(icao), AirplaneLocation(location), "VDL2")

    # NOTE: the parameter names of the three converters below (dir, deg,
    # min, sec) are dictated by the keys of the dicts that processReport()
    # unpacks with **, so they must stay as they are even though some of
    # them shadow builtins.

    def convertLatitude(self, dir, **args) -> float:
        """Return the latitude in decimal degrees, negative for ``"south"``."""
        value = self.convertCoordinate(**args)
        return (value * -1) if dir == "south" else value

    def convertLongitude(self, dir, **args) -> float:
        """Return the longitude in decimal degrees, negative for ``"west"``."""
        value = self.convertCoordinate(**args)
        return (value * -1) if dir == "west" else value

    def convertCoordinate(self, deg, min, sec) -> float:
        """Combine degrees, minutes and seconds into decimal degrees."""
        with_minutes = deg + float(min) / 60
        return with_minutes + float(sec) / 3600