openwebrx/owrx/aeronautical.py

90 lines
2.9 KiB
Python
Raw Normal View History

2023-09-08 15:38:48 +02:00
from owrx.map import Map, LatLngLocation, Source
from csdr.module import JsonParser
from abc import ABCMeta
2023-09-10 03:16:47 +02:00
import re
2023-09-08 15:38:48 +02:00
class AirplaneLocation(LatLngLocation):
    """Location of an aircraft that carries the message it was built from.

    The complete message is kept in ``self.props`` and is merged into the
    dictionary form produced by ``__dict__()``.
    """

    def __init__(self, message):
        """Store ``message`` and take the position from it when available.

        :param message: mapping of decoded fields. When it contains both
            ``"lat"`` and ``"lon"`` they are passed on to the
            ``LatLngLocation`` constructor; otherwise ``lat`` and ``lon``
            are set to ``None`` and the parent constructor is not called.
        """
        self.props = message
        has_position = "lat" in message and "lon" in message
        if not has_position:
            # no usable coordinates: keep the attributes defined, but empty
            self.lat = None
            self.lon = None
            return
        super().__init__(message["lat"], message["lon"])

    def __dict__(self):
        """Return the parent's dictionary form with the message merged in.

        Entries from the message overwrite parent entries that share a key.
        """
        merged = super().__dict__()
        merged.update(self.props)
        return merged
class IcaoSource(Source):
    """Map source identified by an ICAO identifier, with an optional flight."""

    def __init__(self, icao: str, flight: str = None):
        """Remember the upper-cased ``icao`` and the (optional) ``flight``.

        :param icao: identifier string; stored in upper case so that keys
            built from it do not depend on the case of the input.
        :param flight: optional flight identifier, ``None`` when unknown.
        """
        self.icao = icao.upper()
        self.flight = flight

    def getKey(self) -> str:
        """Return the key of this source: ``"icao:"`` followed by the id."""
        key = "icao:{}".format(self.icao)
        return key

    def __dict__(self):
        """Return the dictionary form of this source.

        Always contains ``"icao"``; ``"flight"`` is only present when a
        flight was given.
        """
        result = {"icao": self.icao}
        if self.flight is None:
            return result
        result["flight"] = self.flight
        return result
class FlightSource(Source):
    """Map source identified only by a flight identifier."""

    def __init__(self, flight):
        """Remember ``flight`` exactly as it was passed in."""
        self.flight = flight

    def getKey(self) -> str:
        """Return the key of this source: ``"flight:"`` followed by the id."""
        key = "flight:{}".format(self.flight)
        return key

    def __dict__(self):
        """Return the dictionary form of this source (just the flight)."""
        result = {"flight": self.flight}
        return result
class AcarsProcessor(JsonParser, metaclass=ABCMeta):
    """Base class for parsers that extract aircraft positions from ACARS data.

    ``self.mode`` is read when a location is published but is not defined
    in this class -- presumably provided by JsonParser or the concrete
    subclass; verify against the callers.
    """

    # two-character prefix, any number of padding zeros, then the remainder
    flightRegex = re.compile("^([0-9A-Z]{2})0*([0-9A-Z]+$)")

    def processAcars(self, acars: dict, icao: str = None):
        """Publish the position report contained in an ACARS message, if any.

        The aircraft is identified by the normalized ``"flight"`` entry or,
        failing that, by the ``"reg"`` entry with leading dots removed.
        Messages with neither are ignored, as are messages without a
        non-empty ``acars["arinc622"]["adsc"]["tags"]`` list.

        :param acars: decoded ACARS message.
        :param icao: optional identifier; when given the location is filed
            under an ``IcaoSource`` (with the flight attached), otherwise
            under a ``FlightSource``.
        """
        if "flight" in acars:
            flight_id = self.processFlight(acars["flight"])
        elif "reg" in acars:
            flight_id = acars['reg'].lstrip(".")
        else:
            # nothing to identify the aircraft by
            return

        if "arinc622" not in acars:
            return
        arinc622 = acars["arinc622"]

        if "adsc" not in arinc622:
            return
        adsc = arinc622["adsc"]

        if "tags" not in adsc or not adsc["tags"]:
            return

        # collect the fields of interest from all tags into one report
        report = {}
        for entry in adsc["tags"]:
            if "basic_report" in entry:
                position = entry["basic_report"]
                report.update(
                    lat=position["lat"],
                    lon=position["lon"],
                    altitude=position["alt"],
                )
            if "earth_ref_data" in entry:
                movement = entry["earth_ref_data"]
                report.update(
                    groundtrack=movement["true_trk_deg"],
                    groundspeed=movement["gnd_spd_kts"],
                    verticalspeed=movement["vspd_ftmin"],
                )

        source = (
            FlightSource(flight_id)
            if icao is None
            else IcaoSource(icao, flight=flight_id)
        )
        shared_map = Map.getSharedInstance()
        shared_map.updateLocation(
            source, AirplaneLocation(report), "ACARS over {}".format(self.mode)
        )

    def processFlight(self, raw):
        """Return ``raw`` with the zero padding after its prefix removed.

        For example ``"LH0123"`` becomes ``"LH123"``. Strings that do not
        match ``flightRegex`` are returned unchanged.
        """
        pattern = self.flightRegex
        return pattern.sub(r"\g<1>\g<2>", raw)