openwebrx/owrx/adsb/modes.py

312 lines
11 KiB
Python
Raw Normal View History

2023-08-25 21:15:29 +02:00
from abc import ABC
2023-08-23 00:40:24 +02:00
from csdr.module import PickleModule
2023-08-23 18:44:05 +02:00
from math import sqrt, atan2, pi, floor, acos, cos
2023-08-25 21:15:29 +02:00
from owrx.map import LatLngLocation, IncrementalUpdate, Location, Map
2023-08-23 18:44:05 +02:00
import time
2023-08-23 00:40:24 +02:00
import logging
logger = logging.getLogger(__name__)
FEET_PER_METER = 3.28084
2023-08-23 18:44:05 +02:00
nz = 15
d_lat_even = 360 / (4 * nz)
d_lat_odd = 360 / (4 * nz - 1)
2023-08-25 21:15:29 +02:00
class AirplaneLocation(LatLngLocation, IncrementalUpdate, ABC):
mapKeys = [
"icao",
"lat",
"lon",
"altitude",
"heading",
"groundtrack",
"groundspeed",
"verticalspeed",
"identification",
]
ttl = 30
2023-08-23 19:33:16 +02:00
def __init__(self, message):
self.history = []
self.timestamp = time.time()
2023-08-25 21:15:29 +02:00
self.props = message
if "lat" in message and "lon" in message:
super().__init__(message["lat"], message["lon"])
else:
self.lat = None
self.lon = None
def update(self, previousLocation: Location):
history = previousLocation.history
history += [{
"timestamp": self.timestamp,
"props": self.props,
}]
now = time.time()
history = [p for p in history if now - p["timestamp"] < self.ttl]
self.history = sorted(history, key=lambda p: p["timestamp"])
merged = {}
for p in self.history:
merged.update(p["props"])
self.props = merged
if "lat" in merged:
self.lat = merged["lat"]
if "lon" in merged:
self.lon = merged["lon"]
2023-08-25 21:15:29 +02:00
def __dict__(self):
dict = super().__dict__()
dict.update(self.props)
return dict
2023-08-23 19:33:16 +02:00
2023-08-23 18:44:05 +02:00
class CprCache:
def __init__(self):
self.aircraft = {}
def getRecentData(self, icao: str):
if icao not in self.aircraft:
return []
now = time.time()
filtered = [r for r in self.aircraft[icao] if now - r["timestamp"] < 10]
records = sorted(filtered, key=lambda r: r["timestamp"])
self.aircraft[icao] = records
return [r["data"] for r in records]
def addRecord(self, icao: str, data: any):
if icao not in self.aircraft:
self.aircraft[icao] = []
self.aircraft[icao].append({"timestamp": time.time(), "data": data})
2023-08-23 00:40:24 +02:00
class ModeSParser(PickleModule):
2023-08-23 18:44:05 +02:00
def __init__(self):
self.cprCache = CprCache()
super().__init__()
2023-08-23 00:40:24 +02:00
def process(self, input):
format = (input[0] & 0b11111000) >> 3
2023-08-23 13:48:58 +02:00
message = {
2023-08-23 00:40:24 +02:00
"mode": "ADSB",
"format": format
}
if format == 17:
message["capability"] = input[0] & 0b111
2023-08-23 18:44:05 +02:00
message["icao"] = icao = input[1:4].hex()
2023-08-23 00:40:24 +02:00
type = (input[4] & 0b11111000) >> 3
2023-08-25 21:15:29 +02:00
message["adsb_type"] = type
2023-08-23 00:40:24 +02:00
2023-08-23 18:44:05 +02:00
if type in [1, 2, 3, 4]:
2023-08-23 00:40:24 +02:00
# identification message
id = [
(input[5] & 0b11111100) >> 2,
((input[5] & 0b00000011) << 4) | ((input[6] & 0b11110000) >> 4),
((input[6] & 0b00001111) << 2) | ((input[7] & 0b11000000) >> 6),
input[7] & 0b00111111,
(input[8] & 0b11111100) >> 2,
((input[8] & 0b00000011) << 4) | ((input[9] & 0b11110000) >> 4),
((input[9] & 0b00001111) << 2) | ((input[10] & 0b11000000) >> 6),
input[10] & 0b00111111
]
message["identification"] = bytes(b + (0x40 if b < 27 else 0) for b in id).decode("ascii")
2023-08-23 18:44:05 +02:00
elif type in [5, 6, 7, 8]:
2023-08-23 00:40:24 +02:00
# surface position
pass
2023-08-23 18:44:05 +02:00
elif type in [9, 10, 11, 12, 13, 14, 15, 16, 17, 18]:
2023-08-23 00:40:24 +02:00
# airborne position (w/ baro altitude)
2023-08-23 18:44:05 +02:00
cpr = self.__getCprData(icao, input)
if cpr is not None:
lat, lon = cpr
message["lat"] = lat
message["lon"] = lon
2023-08-23 00:40:24 +02:00
q = (input[5] & 0b1)
2023-08-23 13:48:58 +02:00
altitude = ((input[5] & 0b11111110) << 3) | ((input[6] & 0b11110000) >> 4)
2023-08-23 00:40:24 +02:00
if q:
message["altitude"] = altitude * 25 - 1000
else:
# TODO: it's gray encoded
message["altitude"] = altitude * 100
elif type == 19:
# airborne velocity
subtype = input[4] & 0b111
2023-08-23 18:44:05 +02:00
if subtype in [1, 2]:
# velocity is reported in an east/west and a north/south component
# vew = velocity east / west
2023-08-23 00:40:24 +02:00
vew = ((input[5] & 0b00000011) << 8) | input[6]
2023-08-23 18:44:05 +02:00
# vns = velocity north / south
2023-08-23 00:40:24 +02:00
vns = ((input[7] & 0b01111111) << 3) | ((input[8] & 0b1110000000) >> 5)
2023-08-23 18:44:05 +02:00
# 0 means no data
if vew != 0 and vns != 0:
# dew = direction east/west (0 = to east, 1 = to west)
dew = (input[5] & 0b00000100) >> 2
# dns = direction north/south (0 = to north, 1 = to south)
dns = (input[7] & 0b10000000) >> 7
vx = vew - 1
if dew:
vx *= -1
vy = vns - 1
if dns:
vy *= -1
# supersonic
if subtype == 2:
vx *= 4
vy *= 4
message["groundspeed"] = sqrt(vx ** 2 + vy ** 2)
message["groundtrack"] = (atan2(vx, vy) * 360 / (2 * pi)) % 360
# vertical rate
vr = ((input[8] & 0b00000111) << 6) | ((input[9] & 0b11111100) >> 2)
if vr != 0:
# vertical speed sign (1 = negative)
svr = ((input[8] & 0b00001000) >> 3)
# vertical speed
vs = 64 * (vr - 1)
if svr:
vs *= -1
message["verticalspeed"] = vs
elif subtype in [3, 4]:
sh = (input[5] & 0b00000100) >> 2
if sh:
hdg = ((input[5] & 0b00000011) << 8) | input[6]
message["heading"] = hdg * 360 / 1024
logger.debug("decoded from subtype 3: heading = %i", message["heading"])
airspeed = ((input[7] & 0b01111111) << 3) | ((input[8] & 0b11100000) >> 5)
if airspeed != 0:
airspeed -= 1
# supersonic
if subtype == 4:
airspeed *= 4
airspeed_type = (input[7] & 0b10000000) >> 7
if airspeed_type:
message["TAS"] = airspeed
logger.debug("decoded from subtype 3: TAS = %i", message["TAS"])
else:
message["IAS"] = airspeed
logger.debug("decoded from subtype 3: IAS = %i", message["IAS"])
elif type in [20, 21, 22]:
2023-08-23 00:40:24 +02:00
# airborne position (w/GNSS height)
2023-08-23 18:44:05 +02:00
cpr = self.__getCprData(icao, input)
if cpr is not None:
lat, lon = cpr
message["lat"] = lat
message["lon"] = lon
2023-08-23 00:40:24 +02:00
altitude = (input[5] << 4) | ((input[6] & 0b1111) >> 4)
message["altitude"] = altitude * FEET_PER_METER
elif type == 28:
# aircraft status
pass
elif type == 29:
# target state and status information
pass
elif type == 31:
# aircraft operation status
pass
2023-08-24 02:49:29 +02:00
elif format == 11:
# Mode-S All-call reply
message["icao"] = input[1:4].hex()
if "icao" in message and AirplaneLocation.mapKeys & message.keys():
data = {k: message[k] for k in AirplaneLocation.mapKeys if k in message}
loc = AirplaneLocation(data)
2023-08-25 21:15:29 +02:00
Map.getSharedInstance().updateLocation({"icao": message['icao']}, loc, "ADS-B", None)
2023-08-23 19:33:16 +02:00
2023-08-23 00:40:24 +02:00
return message
2023-08-23 18:44:05 +02:00
def __getCprData(self, icao: str, input):
self.cprCache.addRecord(icao, {
"cpr_format": (input[6] & 0b00000100) >> 2,
2023-08-24 01:43:19 +02:00
"lat_cpr": ((input[6] & 0b00000011) << 15) | (input[7] << 7) | ((input[8] & 0b11111110) >> 1),
"lon_cpr": ((input[8] & 0b00000001) << 16) | (input[9] << 8) | (input[10]),
2023-08-23 18:44:05 +02:00
})
records = self.cprCache.getRecentData(icao)
try:
# records are sorted by timestamp, last should be newest
2023-08-24 01:43:19 +02:00
odd = next(r for r in reversed(records) if r["cpr_format"])
even = next(r for r in reversed(records) if not r["cpr_format"])
newest = next(reversed(records))
2023-08-23 18:44:05 +02:00
lat_cpr_even = even["lat_cpr"] / 2 ** 17
lat_cpr_odd = odd["lat_cpr"] / 2 ** 17
# latitude zone index
j = floor(59 * lat_cpr_even - 60 * lat_cpr_odd + .5)
lat_even = d_lat_even * ((j % 60) + lat_cpr_even)
lat_odd = d_lat_odd * ((j % 59) + lat_cpr_odd)
if lat_even >= 270:
lat_even -= 360
if lat_odd >= 270:
lat_odd -= 360
def nl(lat):
if lat == 0:
return 59
elif lat == 87:
return 2
elif lat == -87:
return 2
elif lat > 87:
return 1
elif lat < -87:
return 1
else:
2023-08-23 19:33:16 +02:00
return floor((2 * pi) / acos(1 - (1 - cos(pi / (2 * nz))) / (cos((pi / 180) * abs(lat)) ** 2)))
2023-08-23 18:44:05 +02:00
if nl(lat_even) != nl(lat_odd):
logger.debug("latitude zone mismatch")
return
lat = lat_odd if newest["cpr_format"] else lat_even
lon_cpr_even = even["lon_cpr"] / 2 ** 17
lon_cpr_odd = odd["lon_cpr"] / 2 ** 17
# longitude zone index
nl_lat = nl(lat)
m = floor(lon_cpr_even * (nl_lat - 1) - lon_cpr_odd * nl_lat + .5)
n_even = max(nl_lat, 1)
2023-08-23 19:33:16 +02:00
n_odd = max(nl_lat - 1, 1)
2023-08-23 18:44:05 +02:00
d_lon_even = 360 / n_even
d_lon_odd = 360 / n_odd
lon_even = d_lon_even * (m % n_even + lon_cpr_even)
lon_odd = d_lon_odd * (m % n_odd + lon_cpr_odd)
lon = lon_odd if newest["cpr_format"] else lon_even
if lon >= 180:
lon -= 360
return lat, lon
except StopIteration:
# we don't have both CPR records. better luck next time.
pass