2023-09-08 15:38:48 +02:00
|
|
|
from owrx.map import Map, LatLngLocation, Source
|
|
|
|
|
from csdr.module import JsonParser
|
|
|
|
|
from abc import ABCMeta
|
2023-09-10 03:16:47 +02:00
|
|
|
import re
|
2023-09-08 15:38:48 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class AirplaneLocation(LatLngLocation):
|
|
|
|
|
def __init__(self, message):
|
|
|
|
|
self.props = message
|
|
|
|
|
if "lat" in message and "lon" in message:
|
|
|
|
|
super().__init__(message["lat"], message["lon"])
|
|
|
|
|
else:
|
|
|
|
|
self.lat = None
|
|
|
|
|
self.lon = None
|
|
|
|
|
|
|
|
|
|
def __dict__(self):
|
|
|
|
|
res = super().__dict__()
|
|
|
|
|
res.update(self.props)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IcaoSource(Source):
|
2023-09-10 00:09:49 +02:00
|
|
|
def __init__(self, icao: str, flight: str = None):
|
2023-09-09 00:45:58 +02:00
|
|
|
self.icao = icao.upper()
|
2023-09-10 00:09:49 +02:00
|
|
|
self.flight = flight
|
2023-09-08 15:38:48 +02:00
|
|
|
|
|
|
|
|
def getKey(self) -> str:
|
|
|
|
|
return "icao:{}".format(self.icao)
|
|
|
|
|
|
|
|
|
|
def __dict__(self):
|
|
|
|
|
d = {"icao": self.icao}
|
2023-09-10 00:09:49 +02:00
|
|
|
if self.flight is not None:
|
|
|
|
|
d["flight"] = self.flight
|
2023-09-08 15:38:48 +02:00
|
|
|
return d
|
|
|
|
|
|
|
|
|
|
|
2023-09-12 18:57:47 +02:00
|
|
|
class FlightSource(Source):
|
2023-09-08 15:38:48 +02:00
|
|
|
def __init__(self, flight):
|
|
|
|
|
self.flight = flight
|
|
|
|
|
|
|
|
|
|
def getKey(self) -> str:
|
2023-09-12 18:57:47 +02:00
|
|
|
return "flight:{}".format(self.flight)
|
2023-09-08 15:38:48 +02:00
|
|
|
|
|
|
|
|
def __dict__(self):
|
|
|
|
|
return {"flight": self.flight}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AcarsProcessor(JsonParser, metaclass=ABCMeta):
|
2023-09-10 03:16:47 +02:00
|
|
|
flightRegex = re.compile("^([0-9A-Z]{2})0*([0-9A-Z]+$)")
|
|
|
|
|
|
2023-09-08 15:38:48 +02:00
|
|
|
def processAcars(self, acars: dict, icao: str = None):
|
|
|
|
|
if "flight" in acars:
|
2023-09-10 03:16:47 +02:00
|
|
|
flight_id = self.processFlight(acars["flight"])
|
2023-09-08 15:38:48 +02:00
|
|
|
elif "reg" in acars:
|
2023-09-08 19:58:53 +02:00
|
|
|
flight_id = acars['reg'].lstrip(".")
|
2023-09-08 15:38:48 +02:00
|
|
|
else:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if "arinc622" in acars:
|
|
|
|
|
arinc622 = acars["arinc622"]
|
|
|
|
|
if "adsc" in arinc622:
|
|
|
|
|
adsc = arinc622["adsc"]
|
2023-09-21 20:51:35 +02:00
|
|
|
if "tags" in adsc and adsc["tags"]:
|
|
|
|
|
msg = {}
|
2023-09-08 15:38:48 +02:00
|
|
|
for tag in adsc["tags"]:
|
|
|
|
|
if "basic_report" in tag:
|
|
|
|
|
basic_report = tag["basic_report"]
|
2023-09-21 20:51:35 +02:00
|
|
|
msg.update({
|
2023-09-08 15:38:48 +02:00
|
|
|
"lat": basic_report["lat"],
|
|
|
|
|
"lon": basic_report["lon"],
|
2023-09-21 20:51:35 +02:00
|
|
|
"altitude": basic_report["alt"],
|
|
|
|
|
})
|
|
|
|
|
if "earth_ref_data" in tag:
|
|
|
|
|
earth_ref_data = tag["earth_ref_data"]
|
|
|
|
|
msg.update({
|
|
|
|
|
"groundtrack": earth_ref_data["true_trk_deg"],
|
|
|
|
|
"groundspeed": earth_ref_data["gnd_spd_kts"],
|
|
|
|
|
"verticalspeed": earth_ref_data["vspd_ftmin"],
|
|
|
|
|
})
|
|
|
|
|
if icao is not None:
|
|
|
|
|
source = IcaoSource(icao, flight=flight_id)
|
|
|
|
|
else:
|
|
|
|
|
source = FlightSource(flight_id)
|
|
|
|
|
Map.getSharedInstance().updateLocation(
|
|
|
|
|
source, AirplaneLocation(msg), "ACARS over {}".format(self.mode)
|
|
|
|
|
)
|
2023-09-10 03:16:47 +02:00
|
|
|
|
|
|
|
|
def processFlight(self, raw):
|
|
|
|
|
return self.flightRegex.sub(r"\g<1>\g<2>", raw)
|