openwebrx/owrx/source/afedri.py

126 lines
3.8 KiB
Python
Raw Permalink Normal View History

import re
from ipaddress import IPv4Address, AddressValueError
2023-10-15 12:31:59 +02:00
from owrx.source.soapy import SoapyConnectorSource, SoapyConnectorDeviceDescription
from owrx.form.input import Input, CheckboxInput, DropdownInput, Option
2023-11-19 12:28:15 +01:00
from owrx.form.input.device import TextInput
from owrx.form.input.validator import Validator, ValidationError
2023-10-15 12:31:59 +02:00
from typing import List
2023-10-31 15:11:52 +01:00
# Device-level keys: added to the source's event names, forwarded as Soapy
# device parameters when present in the values, and offered as optional
# device keys by the device description.
AFEDRI_DEVICE_KEYS = [
    "rx_mode",
]
2023-10-21 05:07:50 +02:00
# Profile-level keys: each one is mapped onto itself in the Soapy settings
# mappings and offered as an optional profile key by the device description.
AFEDRI_PROFILE_KEYS = [
    "r820t_lna_agc",
    "r820t_mixer_agc",
]
2023-10-15 12:31:59 +02:00
class IPv4AndPortValidator(Validator):
    """Validator for values of the form ``IPv4:port``.

    The value must consist of exactly two ":"-separated parts: an address
    accepted by ``ipaddress.IPv4Address`` and a port written in plain ASCII
    decimal digits within 0..65535.
    """

    def validate(self, key, value) -> None:
        """Check ``value`` and raise if it is not a valid ``IPv4:port`` string.

        :param key: name of the setting being validated; passed through to
            ``ValidationError``
        :param value: the text to check
        :raises ValidationError: if the value is not a string, does not have
            exactly two ":"-separated parts, has an invalid IPv4 address, or
            has a port that is not a plain decimal number in 0..65535
        """
        # report non-string input as a validation problem instead of letting
        # the .split() call below fail with an AttributeError
        if not isinstance(value, str):
            raise ValidationError(key, "Wrong format. Expected IPv4:port")

        parts = value.split(":")
        if len(parts) != 2:
            raise ValidationError(key, "Wrong format. Expected IPv4:port")
        address_text, port_text = parts

        try:
            IPv4Address(address_text)
        except AddressValueError as e:
            raise ValidationError(key, "IP address error: {}".format(str(e))) from e

        # int() on its own also accepts surrounding whitespace, a leading
        # sign and "_" digit separators (" 80", "+80", "8_0"). The value is
        # stored as text, so only plain ASCII digits are let through.
        if re.fullmatch(r"[0-9]+", port_text) is None:
            raise ValidationError(key, "Port number invalid")
        try:
            port = int(port_text)
        except ValueError as e:
            # still possible for absurdly long digit strings on interpreters
            # that limit int <-> str conversion length
            raise ValidationError(key, "Port number invalid") from e

        if not 0 <= port <= 65535:
            raise ValidationError(key, "Port number out of range")
class AfedriAddressPortInput(TextInput):
    """Text input for the "afedri_adress_port" setting, checked by IPv4AndPortValidator."""

    def __init__(self):
        """Set up the input with its fixed id, label, info text and validator."""
        # the id keeps its existing spelling; the other classes in this file
        # look the setting up under exactly this string
        field_id = "afedri_adress_port"
        caption = "Afedri IP and Port"
        hint = "Afedri IP and port to connect to. Format = IPv4:Port"
        super().__init__(
            field_id,
            caption,
            infotext=hint,
            validator=IPv4AndPortValidator(),
        )
2023-10-15 12:31:59 +02:00
class AfedriSource(SoapyConnectorSource):
    """Soapy connector source that uses the "afedri" driver."""

    def getSoapySettingsMappings(self):
        """Return the inherited mappings plus one identity entry per Afedri profile key."""
        mappings = super().getSoapySettingsMappings()
        for profile_key in AFEDRI_PROFILE_KEYS:
            mappings[profile_key] = profile_key
        return mappings

    def getEventNames(self):
        """Return the inherited event names followed by the address/port key and the device keys."""
        afedri_events = ["afedri_adress_port"] + AFEDRI_DEVICE_KEYS
        return super().getEventNames() + afedri_events

    def getDriver(self):
        """Return the driver name."""
        return "afedri"

    def buildSoapyDeviceParameters(self, parsed, values):
        """Extend the inherited device parameters with the Afedri specific ones.

        Appends one dict holding the address and port taken from the
        "afedri_adress_port" value, followed by one single-entry dict for
        every key of AFEDRI_DEVICE_KEYS that is present in ``values``.
        """
        params = super().buildSoapyDeviceParameters(parsed, values)

        # unpacking into exactly two names fails unless the value contains a single ":"
        host, port_text = values["afedri_adress_port"].split(":")

        additions = [{"address": host, "port": port_text}]
        additions.extend(
            {device_key: values[device_key]}
            for device_key in AFEDRI_DEVICE_KEYS
            if device_key in values
        )
        params += additions
        return params
class AfedriDeviceDescription(SoapyConnectorDeviceDescription):
    """Device description for the Afedri source: name, inputs, keys, gain stages, channels."""

    def getName(self):
        """Return the display name of this device type."""
        return "Afedri device"

    def supportsPpm(self):
        """Report that PPM correction is not available."""
        # PPM is not mapped at the moment; it is unclear how it would have to
        # be sent to the device
        return False

    def hasAgc(self):
        """Report that AGC is not available."""
        # AGC is not mapped at the moment
        return False

    def getInputs(self) -> List[Input]:
        """Return the inherited inputs followed by the Afedri specific ones."""
        inherited = super().getInputs()

        # (value, label) pairs offered by the "rx_mode" dropdown
        rx_modes = [
            ("0", "Single"),
            ("1", "DualDiversity"),
            ("2", "Dual"),
            ("3", "DiversityInternal"),
            ("4", "QuadDiversity"),
            ("5", "Quad"),
        ]

        address_input = AfedriAddressPortInput()
        lna_agc_input = CheckboxInput("r820t_lna_agc", "Enable R820T LNA AGC")
        mixer_agc_input = CheckboxInput("r820t_mixer_agc", "Enable R820T Mixer AGC")
        rx_mode_input = DropdownInput(
            "rx_mode",
            "Switch the device to a specific RX mode at start",
            options=[Option(mode_value, mode_label) for mode_value, mode_label in rx_modes],
        )

        return inherited + [address_input, lna_agc_input, mixer_agc_input, rx_mode_input]

    def getDeviceMandatoryKeys(self):
        """Return the inherited mandatory device keys plus the address/port key."""
        mandatory = super().getDeviceMandatoryKeys()
        return mandatory + ["afedri_adress_port"]

    def getDeviceOptionalKeys(self):
        """Return the inherited optional device keys plus AFEDRI_DEVICE_KEYS."""
        optional = super().getDeviceOptionalKeys()
        return optional + AFEDRI_DEVICE_KEYS

    def getProfileOptionalKeys(self):
        """Return the inherited optional profile keys plus AFEDRI_PROFILE_KEYS."""
        optional = super().getProfileOptionalKeys()
        return optional + AFEDRI_PROFILE_KEYS

    def getGainStages(self):
        """Return the names of the gain stages as a new list."""
        stages = ["RF", "FE"]
        stages.extend(["R820T_LNA_GAIN", "R820T_MIXER_GAIN", "R820T_VGA_GAIN"])
        return stages

    def getNumberOfChannels(self) -> int:
        """Return the channel count, fixed at 4."""
        return 4