first version to use the new list implementation

* lots of events are not hooked up at the moment
* web config does not work
* changes made in the web config would probably not work either
This commit is contained in:
Jakob Ketterl 2023-05-06 01:56:28 +02:00
parent ec1c3c17ad
commit e2873f3ef8
7 changed files with 83 additions and 262 deletions

View file

@ -167,8 +167,9 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
modes = Modes.getModes()
self.write_modes(modes)
self.configSubs.append(SdrService.getActiveSources().wire(self._onSdrDeviceChanges))
self.configSubs.append(SdrService.getAvailableProfiles().wire(self._sendProfiles))
# TODO find an alternate solution
#self.configSubs.append(SdrService.getActiveSources().wire(self._onSdrDeviceChanges))
#self.configSubs.append(SdrService.getAvailableProfiles().wire(self._sendProfiles))
self._sendProfiles()
CpuUsageThread.getSharedInstance().add_client(self)
@ -266,7 +267,7 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
self.setSdr()
def _sendProfiles(self, *args):
profiles = [{"id": pid, "name": name} for pid, name in SdrService.getAvailableProfiles().items()]
profiles = SdrService.getAvailableProfiles()
self.write_profiles(profiles)
def handleTextMessage(self, conn, message):

View file

@ -1,225 +1,11 @@
from owrx.config import Config
from owrx.property import PropertyManager, PropertyDeleted, PropertyDelegator, PropertyLayer, PropertyReadOnly
from owrx.feature import FeatureDetector, UnknownFeatureException
from owrx.source import SdrSource, SdrSourceEventClient
from functools import partial
from owrx.source import SdrSource
import logging
logger = logging.getLogger(__name__)
class MappedSdrSources(PropertyDelegator):
def __init__(self, pm: PropertyManager):
self.subscriptions = {}
super().__init__(PropertyLayer())
for key, value in pm.items():
self._addSource(key, value)
pm.wire(self.handleSdrDeviceChange)
def handleSdrDeviceChange(self, changes):
for key, value in changes.items():
if value is PropertyDeleted:
if key in self:
del self[key]
else:
if key not in self:
self._addSource(key, value)
def handleDeviceUpdate(self, key, value, *args):
if key not in self and self.isDeviceValid(value):
self[key] = self.buildNewSource(key, value)
elif key in self and not self.isDeviceValid(value):
del self[key]
def _addSource(self, key, value):
self.handleDeviceUpdate(key, value)
updateMethod = partial(self.handleDeviceUpdate, key, value)
self.subscriptions[key] = [
value.filter("type", "profiles").wire(updateMethod),
value["profiles"].wire(updateMethod)
]
def isDeviceValid(self, device):
return self._sdrTypeAvailable(device) and self._hasProfiles(device)
def _hasProfiles(self, device):
return "profiles" in device and device["profiles"] and len(device["profiles"]) > 0
def _sdrTypeAvailable(self, value):
featureDetector = FeatureDetector()
try:
if not featureDetector.is_available(value["type"]):
logger.error(
'The SDR source type "{0}" is not available. please check the feature report for details.'.format(
value["type"]
)
)
return False
return True
except UnknownFeatureException:
logger.error(
'The SDR source type "{0}" is invalid. Please check your configuration'.format(value["type"])
)
return False
def buildNewSource(self, id, props):
sdrType = props["type"]
className = "".join(x for x in sdrType.title() if x.isalnum()) + "Source"
module = __import__("owrx.source.{0}".format(sdrType), fromlist=[className])
cls = getattr(module, className)
return cls(id, props)
def _removeSource(self, key, source):
source.shutdown()
for sub in self.subscriptions[key]:
sub.cancel()
del self.subscriptions[key]
def __setitem__(self, key, value):
source = self[key] if key in self else None
if source is value:
return
super().__setitem__(key, value)
if source is not None:
self._removeSource(key, source)
def __delitem__(self, key):
source = self[key] if key in self else None
super().__delitem__(key)
if source is not None:
self._removeSource(key, source)
class SourceStateHandler(SdrSourceEventClient):
def __init__(self, pm, key, source: SdrSource):
self.pm = pm
self.key = key
self.source = source
def selfDestruct(self):
self.source.removeClient(self)
def onFail(self):
del self.pm[self.key]
def onDisable(self):
del self.pm[self.key]
def onEnable(self):
self.pm[self.key] = self.source
def onShutdown(self):
del self.pm[self.key]
class ActiveSdrSources(PropertyReadOnly):
def __init__(self, pm: PropertyManager):
self.handlers = {}
self._layer = PropertyLayer()
super().__init__(self._layer)
for key, value in pm.items():
self._addSource(key, value)
pm.wire(self.handleSdrDeviceChange)
def handleSdrDeviceChange(self, changes):
for key, value in changes.items():
if value is PropertyDeleted:
self._removeSource(key)
else:
self._addSource(key, value)
def isAvailable(self, source: SdrSource):
return source.isEnabled() and not source.isFailed()
def _addSource(self, key, source: SdrSource):
if self.isAvailable(source):
self._layer[key] = source
self.handlers[key] = SourceStateHandler(self._layer, key, source)
source.addClient(self.handlers[key])
def _removeSource(self, key):
self.handlers[key].selfDestruct()
del self.handlers[key]
if key in self._layer:
del self._layer[key]
class AvailableProfiles(PropertyReadOnly):
def __init__(self, pm: PropertyManager):
self.subscriptions = {}
self.profileSubscriptions = {}
self._layer = PropertyLayer()
super().__init__(self._layer)
for key, value in pm.items():
self._addSource(key, value)
pm.wire(self.handleSdrDeviceChange)
def handleSdrDeviceChange(self, changes):
for key, value in changes.items():
if value is PropertyDeleted:
self._removeSource(key)
else:
self._addSource(key, value)
def handleSdrNameChange(self, s_id, source, name):
profiles = source.getProfiles()
for p_id in list(self._layer.keys()):
source_id, profile_id = p_id.split("|")
if source_id == s_id:
profile = profiles[profile_id]
self._layer[p_id] = "{} {}".format(name, profile["name"])
def handleProfileChange(self, source_id, source: SdrSource, changes):
for key, value in changes.items():
if value is PropertyDeleted:
self._removeProfile(source_id, key)
else:
self._addProfile(source_id, source, key, value)
def handleProfileNameChange(self, s_id, source: SdrSource, p_id, name):
for concat_p_id in list(self._layer.keys()):
source_id, profile_id = concat_p_id.split("|")
if source_id == s_id and profile_id == p_id:
self._layer[concat_p_id] = "{} {}".format(source.getName(), name)
def _addSource(self, key, source: SdrSource):
for p_id, p in source.getProfiles().items():
self._addProfile(key, source, p_id, p)
self.subscriptions[key] = [
source.getProps().wireProperty("name", partial(self.handleSdrNameChange, key, source)),
source.getProfiles().wire(partial(self.handleProfileChange, key, source)),
]
def _addProfile(self, s_id, source: SdrSource, p_id, profile):
self._layer["{}|{}".format(s_id, p_id)] = "{} {}".format(source.getName(), profile["name"])
if s_id not in self.profileSubscriptions:
self.profileSubscriptions[s_id] = {}
self.profileSubscriptions[s_id][p_id] = profile.wireProperty("name", partial(self.handleProfileNameChange, s_id, source, p_id))
def _removeSource(self, key):
for profile_id in list(self._layer.keys()):
if profile_id.startswith("{}|".format(key)):
del self._layer[profile_id]
if key in self.subscriptions:
while self.subscriptions[key]:
self.subscriptions[key].pop().cancel()
del self.subscriptions[key]
if key in self.profileSubscriptions:
for p_id in self.profileSubscriptions[key].keys():
self.profileSubscriptions[key][p_id].cancel()
del self.profileSubscriptions[key]
def _removeProfile(self, s_id, p_id):
for concat_p_id in list(self._layer.keys()):
source_id, profile_id = concat_p_id.split("|")
if source_id == s_id and profile_id == p_id:
del self._layer[concat_p_id]
if s_id in self.profileSubscriptions and p_id in self.profileSubscriptions[s_id]:
self.profileSubscriptions[s_id][p_id].cancel()
del self.profileSubscriptions[s_id][p_id]
class SdrService(object):
sources = None
activeSources = None
@ -231,36 +17,50 @@ class SdrService(object):
if not sources:
return None
# TODO: configure default sdr in config? right now it will pick the first one off the list.
return sources[list(sources.keys())[0]]
return sources[0]
@staticmethod
def getSource(id):
sources = SdrService.getActiveSources()
if not sources:
return None
if id not in sources:
try:
return next(s for s in sources if s.getId() == id)
except StopIteration:
return None
return sources[id]
@staticmethod
def getAllSources():
def buildNewSource(props):
sdrType = props["type"]
className = "".join(x for x in sdrType.title() if x.isalnum()) + "Source"
module = __import__("owrx.source.{0}".format(sdrType), fromlist=[className])
cls = getattr(module, className)
return cls(props)
if SdrService.sources is None:
SdrService.sources = MappedSdrSources(Config.get()["sdrs"])
SdrService.sources = Config.get()["sdrs"].map(buildNewSource)
return SdrService.sources
@staticmethod
def getActiveSources():
def isAvailable(source: SdrSource):
return source.isEnabled() and not source.isFailed()
if SdrService.activeSources is None:
SdrService.activeSources = ActiveSdrSources(SdrService.getAllSources())
SdrService.activeSources = SdrService.getAllSources().filter(isAvailable)
return SdrService.activeSources
@staticmethod
def getAvailableProfiles():
def buildProfiles(source):
return source.getProfiles().map(lambda profile: {"id": "{}|{}".format(source.getId(), profile["id"]), "name": "{} {}".format(source.getName(), profile["name"])})
if SdrService.availableProfiles is None:
SdrService.availableProfiles = AvailableProfiles(SdrService.getActiveSources())
SdrService.availableProfiles = SdrService.getActiveSources().map(buildProfiles).flatten()
return SdrService.availableProfiles
@staticmethod
def stopAllSources():
for source in SdrService.getAllSources().values():
for source in SdrService.getAllSources():
source.stop()

View file

@ -8,6 +8,7 @@ from owrx.property import PropertyLayer, PropertyDeleted
from owrx.service.schedule import ServiceScheduler
from owrx.service.chain import ServiceDemodulatorChain
from owrx.modes import Modes, DigitalMode
from owrx.active.list import ActiveListListener, ActiveListChange, ActiveListIndexDeleted, ActiveListIndexAdded
from typing import Union, Optional
from csdr.chain.demodulator import BaseDemodulatorChain, ServiceDemodulator, DialFrequencyReceiver
from pycsdr.modules import Buffer
@ -313,6 +314,26 @@ class ServiceHandler(SdrSourceEventClient):
raise ValueError("unsupported service modulation: {}".format(mod))
class SdrDeviceEventHandler(ActiveListListener):
def onListChange(self, changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexDeleted):
key = change.oldValue.getId()
if key in Services.handlers:
Services.handlers[key].shutdown()
del Services.handlers[key]
if key in Services.schedulers:
Services.schedulers[key].shutdown()
del Services.schedulers[key]
elif isinstance(change, ActiveListIndexAdded):
source = change.newValue
key = source.getId()
Services.schedulers[key] = ServiceScheduler(source)
if Config.get()["services_enabled"]:
Services.handlers[key] = ServiceHandler(source)
class Services(object):
handlers = {}
schedulers = {}
@ -322,35 +343,20 @@ class Services(object):
config = Config.get()
config.wireProperty("services_enabled", Services._receiveEnabledEvent)
activeSources = SdrService.getActiveSources()
activeSources.wire(Services._receiveDeviceEvent)
for key, source in activeSources.items():
Services.schedulers[key] = ServiceScheduler(source)
activeSources.addListener(SdrDeviceEventHandler())
for source in activeSources:
Services.schedulers[source.getId()] = ServiceScheduler(source)
@staticmethod
def _receiveEnabledEvent(state):
if state:
for key, source in SdrService.getActiveSources().__dict__().items():
Services.handlers[key] = ServiceHandler(source)
for source in SdrService.getActiveSources():
Services.handlers[source.getId()] = ServiceHandler(source)
else:
for handler in list(Services.handlers.values()):
handler.shutdown()
Services.handlers = {}
@staticmethod
def _receiveDeviceEvent(changes):
for key, source in changes.items():
if source is PropertyDeleted:
if key in Services.handlers:
Services.handlers[key].shutdown()
del Services.handlers[key]
if key in Services.schedulers:
Services.schedulers[key].shutdown()
del Services.schedulers[key]
else:
Services.schedulers[key] = ServiceScheduler(source)
if Config.get()["services_enabled"]:
Services.handlers[key] = ServiceHandler(source)
@staticmethod
def stop():
for handler in list(Services.handlers.values()):

View file

@ -12,6 +12,7 @@ from owrx.command import CommandMapper
from owrx.socket import getAvailablePort
from owrx.property import PropertyStack, PropertyLayer, PropertyFilter, PropertyCarousel, PropertyDeleted
from owrx.property.filter import ByLambda
from owrx.active.list import ActiveListListener, ActiveListChange, ActiveListIndexAdded
from owrx.form.input import Input, TextInput, NumberInput, CheckboxInput, ModesInput, ExponentialInput
from owrx.form.input.converter import OptionalConverter
from owrx.form.input.device import GainInput, SchedulerInput, WaterfallLevelsInput
@ -76,18 +77,30 @@ class SdrSourceEventClient(object):
return SdrClientClass.INACTIVE
class SdrProfileCarouselListener(ActiveListListener):
def __init__(self, carousel):
self.carousel = carousel
def onListChange(self, changes: list[ActiveListChange]):
for change in changes:
# TODO: respond to deletions and updates
if isinstance(change, ActiveListIndexAdded):
profile = change.newValue
self.carousel.addLayer(profile["id"], profile)
class SdrProfileCarousel(PropertyCarousel):
def __init__(self, props):
super().__init__()
if "profiles" not in props:
return
for profile_id, profile in props["profiles"].items():
self.addLayer(profile_id, profile)
for profile in props["profiles"]:
self.addLayer(profile["id"], profile)
# activate first available profile
self.switch()
props["profiles"].wire(self.handleProfileUpdate)
props["profiles"].addListener(SdrProfileCarouselListener(self))
def addLayer(self, profile_id, profile):
profile_stack = PropertyStack()
@ -110,13 +123,13 @@ class SdrProfileCarousel(PropertyCarousel):
class SdrSource(ABC):
def __init__(self, id, props):
self.id = id
def __init__(self, props):
self.id = props["id"] if "id" in props else None
self.commandMapper = None
self.tcpSource = None
self.buffer = None
self.logger = logger.getChild(id) if id is not None else logger
self.logger = logger.getChild(self.id) if self.id is not None else logger
self.logger.addHandler(HistoryHandler.getHandler(self.logger.name))
self.stdoutPipe = None
self.stderrPipe = None
@ -188,20 +201,20 @@ class SdrSource(ABC):
def validateProfiles(self):
props = PropertyStack()
props.addLayer(1, self.props)
for id, p in self.props["profiles"].items():
for p in self.props["profiles"]:
props.replaceLayer(0, p)
if "center_freq" not in props:
self.logger.warning('Profile "%s" does not specify a center_freq', id)
self.logger.warning('Profile "%s" does not specify a center_freq', p["id"])
continue
if "samp_rate" not in props:
self.logger.warning('Profile "%s" does not specify a samp_rate', id)
self.logger.warning('Profile "%s" does not specify a samp_rate', p["id"])
continue
if "start_freq" in props:
start_freq = props["start_freq"]
srh = props["samp_rate"] / 2
center_freq = props["center_freq"]
if start_freq < center_freq - srh or start_freq > center_freq + srh:
self.logger.warning('start_freq for profile "%s" is out of range', id)
self.logger.warning('start_freq for profile "%s" is out of range', p["id"])
def isAlwaysOn(self):
return "always-on" in self.props and self.props["always-on"]
@ -232,10 +245,11 @@ class SdrSource(ABC):
def activateProfile(self, profile_id):
try:
profile_name = self.getProfiles()[profile_id]["name"]
profile = next(p for p in self.getProfiles() if p["id"] == profile_id)
profile_name = profile["name"]
self.logger.debug("activating profile \"%s\" for \"%s\"", profile_name, self.getName())
self.profileCarousel.switch(profile_id)
except KeyError:
except StopIteration:
self.logger.warning("invalid profile %s for sdr %s. ignoring", profile_id, self.getId())
def getId(self):

View file

@ -8,10 +8,10 @@ from owrx.form.input import Input, NumberInput, CheckboxInput
class ConnectorSource(SdrSource):
def __init__(self, id, props):
def __init__(self, props):
self.controlSocket = None
self.controlPort = getAvailablePort()
super().__init__(id, props)
super().__init__(props)
def getCommandMapper(self):
return (

View file

@ -7,9 +7,9 @@ from pycsdr.types import Format
class DirectSource(SdrSource, metaclass=ABCMeta):
def __init__(self, id, props):
def __init__(self, props):
self._conversion = None
super().__init__(id, props)
super().__init__(props)
def onPropertyChange(self, changes):
self.logger.debug("restarting sdr source due to property changes: {0}".format(changes))

View file

@ -23,7 +23,7 @@ class Resampler(SdrSource):
self.chain.setReader(sdr.getBuffer().getReader())
super().__init__(None, props)
super().__init__(props)
def getBuffer(self):
if self.buffer is None: