This commit is contained in:
denny-veridooh 2026-03-20 03:25:56 +02:00 committed by GitHub
commit eaa680336a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 336 additions and 23 deletions

View file

@ -21,6 +21,9 @@ class MetaProcessor(PickleModule):
self.fine_increment = - (1/3) / 2048000
# carrier spacing is 1kHz, don't drift further than that.
self.max_shift = 1000 / 2048000
# Cache stable fields (programmes, ensemble_id/label) so they can be
# replayed to clients that connect after the initial FIC decode.
self.cached_output = {}
super().__init__()
def process(self, data):
@ -41,6 +44,12 @@ class MetaProcessor(PickleModule):
if not result:
return
result["mode"] = "DAB"
# Merge only stable fields (programmes, ensemble_label, etc.) into the cache.
# Do NOT replace the whole dict — freq-correction-only packets have no
# stable fields and would wipe out previously cached programme data.
stable = {k: v for k, v in result.items() if k not in ("timestamp", "mode")}
if stable:
self.cached_output.update(stable)
return result
def _nudgeShift(self, amount):
@ -58,32 +67,89 @@ class MetaProcessor(PickleModule):
self.shifter.setRate(0)
class MetaForwarder(PickleModule):
"""
Per-client meta forwarder for shared DAB mode.
Reads from the shared ETI meta buffer (same source as MetaProcessor in
SharedDabDecoder), strips frequency-correction keys (already handled by
MetaProcessor in SharedDabDecoder), and forwards all other DAB metadata
(programme labels, service list) to each client's meta WebSocket channel.
"""
def process(self, data):
result = {}
for key, value in data.items():
if key not in ("coarse_frequency_shift", "fine_frequency_shift"):
result[key] = value
if not result:
return
result["mode"] = "DAB"
return result
class Dablin(BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio, MetaProvider, DabServiceSelector, DialFrequencyReceiver):
def __init__(self):
shift = Shift(0)
self.decoder = EtiDecoder()
def __init__(self, shared_decoder=None):
self._shared_decoder = shared_decoder
# Cache ETI reader — getEtiReader() allocates a new buffer cursor each call;
# calling it repeatedly would orphan previous readers. Allocate once in setReader().
self._eti_reader = None
metaBuffer = Buffer(Format.CHAR)
self.decoder.setMetaWriter(metaBuffer)
self.processor = MetaProcessor(shift)
self.processor.setReader(metaBuffer.getReader())
# use a dummy to start with. it won't run without.
# will be replaced by setMetaWriter().
self.processor.setWriter(Buffer(Format.CHAR))
if shared_decoder is None:
# Standalone mode — original behaviour, one EtiDecoder per client.
shift = Shift(0)
self.decoder = EtiDecoder()
self.dablin = DablinModule()
metaBuffer = Buffer(Format.CHAR)
self.decoder.setMetaWriter(metaBuffer)
self.processor = MetaProcessor(shift)
self.processor.setReader(metaBuffer.getReader())
# use a dummy to start with. it won't run without.
# will be replaced by setMetaWriter().
self.processor.setWriter(Buffer(Format.CHAR))
self.dablin = DablinModule()
workers = [
shift,
self.decoder,
self.dablin,
Downmix(Format.FLOAT),
]
else:
# Shared mode — EtiDecoder runs in SharedDabDecoder; this chain only handles
# per-client audio service selection. setReader() is overridden below to wire
# DablinModule to the shared ETI stream instead of the IQ source.
# Per-client MetaForwarder reads from the shared meta buffer and forwards
# programme/service metadata to each client's meta WebSocket channel.
self.dablin = DablinModule()
self._meta_forwarder = MetaForwarder()
self._meta_forwarder.setReader(shared_decoder.getMetaReader())
# use a dummy to start with; will be replaced by setMetaWriter().
self._meta_forwarder.setWriter(Buffer(Format.CHAR))
workers = [
self.dablin,
Downmix(Format.FLOAT),
]
workers = [
shift,
self.decoder,
self.dablin,
Downmix(Format.FLOAT),
]
super().__init__(workers)
def setReader(self, reader) -> None:
if self._shared_decoder is not None:
# Ignore the IQ reader passed by ClientDemodulatorChain.
# Wire DablinModule directly to the shared ETI output buffer instead.
# Cache the reader — getEtiReader() creates a new cursor each call.
if self._eti_reader is None:
self._eti_reader = self._shared_decoder.getEtiReader()
super().setReader(self._eti_reader)
else:
super().setReader(reader)
def _connect(self, w1, w2, buffer: Optional[Buffer] = None) -> None:
if isinstance(w2, EtiDecoder):
# eti decoder needs big chunks of data
# eti decoder needs big chunks of data (standalone mode only —
# EtiDecoder is not in the shared-mode worker list)
buffer = Buffer(w1.getOutputFormat(), size=2097152)
super()._connect(w1, w2, buffer)
@ -94,14 +160,33 @@ class Dablin(BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain,
return 48000
def stop(self):
self.processor.stop()
if self._shared_decoder is None:
self.processor.stop()
else:
self._meta_forwarder.stop()
if self._eti_reader is not None:
self._eti_reader.stop()
self._eti_reader = None
def setMetaWriter(self, writer: Writer) -> None:
self.processor.setWriter(writer)
if self._shared_decoder is None:
self.processor.setWriter(writer)
else:
self._meta_forwarder.setWriter(writer)
# Replay cached programme/ensemble metadata so the client sees the
# programme list immediately, without waiting for EtiDecoder to
# re-emit FIC data (which only happens once at initial decode).
cached = self._shared_decoder.getCachedMeta()
if cached:
import pickle
cached['mode'] = 'DAB' # required: JS DabMetaPanel.isSupported() checks data.mode
writer.write(pickle.dumps(cached))
def setDabServiceId(self, serviceId: int) -> None:
self.decoder.setServiceIdFilter([serviceId])
if self._shared_decoder is None:
self.decoder.setServiceIdFilter([serviceId])
self.dablin.setDabServiceId(serviceId)
def setDialFrequency(self, frequency: int) -> None:
self.processor.resetShift()
if self._shared_decoder is None:
self.processor.resetShift()

View file

@ -0,0 +1,65 @@
# Shared DAB EtiDecoder
## Problem
In the default OpenWebRX implementation, each WebSocket client tuning to a DAB
multiplex spawns its own independent `EtiDecoder` (the OFDM demodulator). With
multiple simultaneous clients on the same multiplex, N identical demodulators
compete for CPU:
- 5 clients → 5 EtiDecoder instances → ~5255% CPU on a Raspberry Pi 5
- CPU starvation causes OFDM lock-loss (`Lock lost` in logs), coarse time shift
spikes to ~130,000, and audible breakup for all listeners including Chrome
## Fix
A single `SharedDabDecoder` runs one `Shift + EtiDecoder + MetaProcessor`
pipeline per DAB multiplex. All clients consume from the shared ETI output
buffer via independent readers (`pycsdr.Buffer` supports multiple independent
reader cursors — the same mechanism used by `SpectrumThread` in `owrx/fft.py`).
Each client retains its own:
- `DablinModule` subprocess — service ID selection is per-client
- `MetaForwarder` — forwards programme labels / service list to the client's
meta WebSocket channel
## Architecture
```
DabDecoderManager (singleton)
|
SharedDabDecoder(sdr_id, center_freq)
[Shift → EtiDecoder → ETI Buffer]
|
MetaProcessor → Meta Buffer
/ | \
reader 1 reader 2 reader N
| | |
DablinModule(A) DablinModule(B) DablinModule(N)
MetaForwarder(A) MetaForwarder(B) MetaForwarder(N)
```
## Results (measured on Raspberry Pi 5, 4 concurrent clients)
| Metric | Before | After |
|--------|--------|-------|
| CPU (4 clients) | ~5255% | ~35% |
| Coarse time shift | ~130,000 (losing lock) | 18 (solid lock) |
| `Lock lost` events | frequent | zero |
| Programme list in browser | ✅ works | ✅ works |
## Files Changed
| File | Change |
|------|--------|
| `owrx/dab/__init__.py` | New — makes `owrx.dab` a proper Python package |
| `owrx/dab/manager.py` | New — `SharedDabDecoder` + `DabDecoderManager` |
| `csdr/chain/dablin.py` | Modified — `MetaForwarder` class + `shared_decoder` param |
| `owrx/dsp.py` | Modified — injects shared decoder, releases on stop/demod-change |
## Known Limitations
**Service switch race:** If the only client on a multiplex switches DAB service,
`setDemodulator` releases then immediately re-acquires the same key. Refcount
transiently hits 0, causing the shared decoder to stop and restart (~1s
re-lock). Acceptable — service switches are rare and user-initiated.

0
owrx/dab/__init__.py Normal file
View file

140
owrx/dab/manager.py Normal file
View file

@ -0,0 +1,140 @@
import threading
import logging
from csdreti.modules import EtiDecoder
from pycsdr.modules import Shift, Buffer
from pycsdr.types import Format
logger = logging.getLogger(__name__)
class SharedDabDecoder:
"""
Runs a single Shift + EtiDecoder + MetaProcessor pipeline for one DAB multiplex.
Multiple clients call getEtiReader() to receive independent readers into the shared
ETI output buffer each reader maintains its own cursor position, so clients consume
data at their own pace without interfering with each other.
"""
def __init__(self, sdr_id, center_freq, sdr_source):
self._sdr_id = sdr_id
self._center_freq = center_freq
self._sdr_source = sdr_source # saved so start() can call getBuffer()
self._iq_reader = None # initialised here so stop() is safe before start()
self._shift = Shift(0)
self._decoder = EtiDecoder()
# Large buffer between Shift and EtiDecoder — EtiDecoder needs big chunks to
# maintain OFDM frame sync.
self._shift_out = Buffer(Format.COMPLEX_FLOAT, size=2097152)
self._shift.setWriter(self._shift_out)
self._decoder.setReader(self._shift_out.getReader())
# ETI output buffer — clients each get an independent reader via getEtiReader().
# pycsdr.Buffer.getReader() creates an independent fan-out cursor (same mechanism
# used by SpectrumThread in owrx/fft.py): each reader has its own position and
# the buffer retains data until the slowest reader has consumed it.
self._eti_buffer = Buffer(Format.CHAR)
self._decoder.setWriter(self._eti_buffer)
# MetaProcessor handles coarse/fine frequency correction by nudging the Shift
# module. Import here to avoid circular import (MetaProcessor lives in dablin.py).
from csdr.chain.dablin import MetaProcessor
self._meta_buffer = Buffer(Format.CHAR)
self._decoder.setMetaWriter(self._meta_buffer)
self._processor = MetaProcessor(self._shift)
self._processor.setReader(self._meta_buffer.getReader())
self._processor.setWriter(Buffer(Format.CHAR)) # dummy — frequency correction only
def start(self):
"""Connect to the SDR IQ buffer and begin OFDM demodulation."""
self._iq_reader = self._sdr_source.getBuffer().getReader()
self._shift.setReader(self._iq_reader)
logger.info("SharedDabDecoder started — sdr=%s center_freq=%s Hz", self._sdr_id, self._center_freq)
def stop(self):
"""Tear down the demodulation pipeline. Called when last client disconnects."""
self._processor.stop()
if self._iq_reader is not None:
self._iq_reader.stop()
self._iq_reader = None
logger.info("SharedDabDecoder stopped — sdr=%s center_freq=%s Hz", self._sdr_id, self._center_freq)
def getEtiReader(self):
"""Return a new independent reader into the shared ETI output buffer."""
return self._eti_buffer.getReader()
def getMetaReader(self):
"""Return a new independent reader into the shared meta buffer.
Per-client MetaForwarder instances read from here to forward programme
labels and service info to each client's meta WebSocket channel."""
return self._meta_buffer.getReader()
def getCachedMeta(self):
"""Return the last stable metadata snapshot captured by MetaProcessor.
Used by Dablin.setMetaWriter() to replay programme/ensemble data to
clients that connect after the initial FIC decode."""
return dict(self._processor.cached_output)
class DabDecoderManager:
"""
Singleton pool of SharedDabDecoder instances, keyed by (sdr_id, center_freq).
Usage:
shared = DabDecoderManager.getShared().acquire(sdr_id, center_freq, sdr_source)
# ... use shared.getEtiReader() ...
DabDecoderManager.getShared().release(sdr_id, center_freq)
The first acquire() for a given key creates and starts the decoder.
Each subsequent acquire() increments the refcount and returns the same decoder.
The last release() stops and removes the decoder.
"""
_instance = None
_class_lock = threading.Lock() # guards singleton creation only
@classmethod
def getShared(cls):
with cls._class_lock:
if cls._instance is None:
cls._instance = DabDecoderManager()
return cls._instance
def __init__(self):
self._lock = threading.Lock() # instance-level lock guards _decoders + _refcounts
self._decoders = {} # (sdr_id, center_freq) → SharedDabDecoder
self._refcounts = {} # (sdr_id, center_freq) → int
def acquire(self, sdr_id, center_freq, sdr_source):
"""
Return the shared decoder for this multiplex, creating it if necessary.
Thread-safe. Increments refcount.
"""
key = (sdr_id, center_freq)
with self._lock:
if key not in self._decoders:
decoder = SharedDabDecoder(sdr_id, center_freq, sdr_source)
decoder.start()
self._decoders[key] = decoder
self._refcounts[key] = 0
self._refcounts[key] += 1
logger.debug("DAB decoder acquired — key=%s refcount=%d", key, self._refcounts[key])
return self._decoders[key]
def release(self, sdr_id, center_freq):
"""
Decrement refcount. Stops and removes the decoder when the last client releases.
Thread-safe. Safe to call if key is not present (no-op).
"""
key = (sdr_id, center_freq)
with self._lock:
if key not in self._refcounts:
return
self._refcounts[key] -= 1
logger.debug("DAB decoder released — key=%s refcount=%d", key, self._refcounts[key])
if self._refcounts[key] <= 0:
self._decoders[key].stop()
del self._decoders[key]
del self._refcounts[key]

View file

@ -409,6 +409,7 @@ class DspManager(SdrSourceEventClient, ClientDemodulatorSecondaryDspEventClient)
def __init__(self, handler, sdrSource):
self.handler = handler
self.sdrSource = sdrSource
self._dabKey = None # (sdr_id, center_freq) when shared DAB decoder is active
self.props = PropertyStack()
@ -584,12 +585,30 @@ class DspManager(SdrSourceEventClient, ClientDemodulatorSecondaryDspEventClient)
return FreeDV()
elif demod == "dab":
from csdr.chain.dablin import Dablin
return Dablin()
from owrx.dab.manager import DabDecoderManager
sdr_id = self.sdrSource.getId()
center_freq = self.props["center_freq"]
try:
shared = DabDecoderManager.getShared().acquire(sdr_id, center_freq, self.sdrSource)
self._dabKey = (sdr_id, center_freq)
return Dablin(shared_decoder=shared)
except Exception:
logger.exception("Shared DAB decoder failed, falling back to standalone")
# _dabKey is set only if acquire() succeeded but Dablin() raised;
# release it so the shared decoder refcount stays consistent.
if self._dabKey is not None:
DabDecoderManager.getShared().release(*self._dabKey)
self._dabKey = None
return Dablin()
elif demod == "empty":
from csdr.chain.analog import Empty
return Empty()
def setDemodulator(self, mod):
if self._dabKey is not None:
from owrx.dab.manager import DabDecoderManager
DabDecoderManager.getShared().release(*self._dabKey)
self._dabKey = None
self.chain.stopDemodulator()
try:
demodulator = self._getDemodulator(mod)
@ -739,6 +758,10 @@ class DspManager(SdrSourceEventClient, ClientDemodulatorSecondaryDspEventClient)
return unpickler
def stop(self):
if self._dabKey is not None:
from owrx.dab.manager import DabDecoderManager
DabDecoderManager.getShared().release(*self._dabKey)
self._dabKey = None
if self.chain:
self.chain.stop()
self.chain = None