openwebrx/csdr/chain/analog.py
2024-01-18 16:59:10 +01:00

128 lines
4 KiB
Python

from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, HdAudio, DeemphasisTauChain, \
MetaProvider, RdsChain
from pycsdr.modules import AmDemod, DcBlock, FmDemod, Limit, NfmDeemphasis, Agc, WfmDeemphasis, FractionalDecimator, \
RealPart, Writer, Buffer
from pycsdr.types import Format, AgcProfile
from csdr.chain.redsea import Redsea
from typing import Optional
from owrx.feature import FeatureDetector
class Am(BaseDemodulatorChain):
def __init__(self):
agc = Agc(Format.FLOAT)
agc.setProfile(AgcProfile.SLOW)
agc.setInitialGain(200)
workers = [
AmDemod(),
DcBlock(),
agc,
]
super().__init__(workers)
class NFm(BaseDemodulatorChain):
def __init__(self, sampleRate: int):
self.sampleRate = sampleRate
agc = Agc(Format.FLOAT)
agc.setProfile(AgcProfile.SLOW)
agc.setMaxGain(3)
workers = [
FmDemod(),
Limit(),
NfmDeemphasis(sampleRate),
agc,
]
super().__init__(workers)
def setSampleRate(self, sampleRate: int) -> None:
if sampleRate == self.sampleRate:
return
self.sampleRate = sampleRate
self.replace(2, NfmDeemphasis(sampleRate))
class WFm(BaseDemodulatorChain, FixedIfSampleRateChain, DeemphasisTauChain, HdAudio, MetaProvider, RdsChain):
def __init__(self, sampleRate: int, tau: float, rdsRbds: bool):
self.sampleRate = sampleRate
self.tau = tau
self.rdsRbds = rdsRbds
self.limit = Limit()
# this buffer is used to tap into the raw audio stream for redsea RDS decoding
self.metaTapBuffer = Buffer(Format.FLOAT)
workers = [
FmDemod(),
self.limit,
FractionalDecimator(Format.FLOAT, 200000.0 / self.sampleRate, prefilter=True),
WfmDeemphasis(self.sampleRate, self.tau),
]
self.metaChain = None
self.metaWriter = None
super().__init__(workers)
def _connect(self, w1, w2, buffer: Optional[Buffer] = None) -> None:
if w1 is self.limit:
buffer = self.metaTapBuffer
super()._connect(w1, w2, buffer)
def getFixedIfSampleRate(self):
return 200000
def setDeemphasisTau(self, tau: float) -> None:
if tau == self.tau:
return
self.tau = tau
self.replace(3, WfmDeemphasis(self.sampleRate, self.tau))
def setSampleRate(self, sampleRate: int) -> None:
if sampleRate == self.sampleRate:
return
self.sampleRate = sampleRate
self.replace(2, FractionalDecimator(Format.FLOAT, 200000.0 / self.sampleRate, prefilter=True))
self.replace(3, WfmDeemphasis(self.sampleRate, self.tau))
def setMetaWriter(self, writer: Writer) -> None:
if not FeatureDetector().is_available("redsea"):
return
if self.metaChain is None:
self.metaChain = Redsea(self.getFixedIfSampleRate(), self.rdsRbds)
self.metaChain.setReader(self.metaTapBuffer.getReader())
self.metaWriter = writer
self.metaChain.setWriter(self.metaWriter)
def stop(self):
super().stop()
if self.metaChain is not None:
self.metaChain.stop()
self.metaChain = None
self.metaWriter = None
def setRdsRbds(self, rdsRbds: bool) -> None:
self.rdsRbds = rdsRbds
if self.metaChain is not None:
self.metaChain.stop()
self.metaChain = Redsea(self.getFixedIfSampleRate(), self.rdsRbds)
self.metaChain.setReader(self.metaTapBuffer.getReader())
self.metaChain.setWriter(self.metaWriter)
class Ssb(BaseDemodulatorChain):
def __init__(self):
workers = [
RealPart(),
Agc(Format.FLOAT),
]
super().__init__(workers)
class Empty(BaseDemodulatorChain):
def __init__(self):
super().__init__([])
def getOutputFormat(self) -> Format:
return Format.FLOAT
def setWriter(self, writer):
pass