openwebrx/csdr/module/__init__.py

232 lines
6.1 KiB
Python
Raw Permalink Normal View History

2021-09-06 15:05:33 +02:00
from pycsdr.modules import Module as BaseModule
from pycsdr.modules import Reader, Writer, Buffer
2021-09-06 15:05:33 +02:00
from pycsdr.types import Format
from abc import ABCMeta, abstractmethod
2021-09-06 20:00:14 +02:00
from threading import Thread
from io import BytesIO
from subprocess import Popen, PIPE, TimeoutExpired
2021-09-08 13:47:46 +02:00
from functools import partial
2021-09-06 20:00:14 +02:00
import pickle
import logging
2023-09-03 23:48:56 +02:00
import json
logger = logging.getLogger(__name__)
2021-09-06 15:05:33 +02:00
class Module(BaseModule, metaclass=ABCMeta):
    """Abstract base for pipeline modules that hold one reader and one writer.

    Subclasses must report their input and output sample formats through
    getInputFormat() and getOutputFormat().
    """

    def __init__(self):
        # both ends are unset until setReader() / setWriter() are called
        self.reader = None
        self.writer = None
        super().__init__()

    def setReader(self, reader: Reader) -> None:
        """Remember the reader this module takes its input from."""
        self.reader = reader

    def setWriter(self, writer: Writer) -> None:
        """Remember the writer this module sends its output to."""
        self.writer = writer

    @abstractmethod
    def getInputFormat(self) -> Format:
        """Return the format this module expects on its input."""
        pass

    @abstractmethod
    def getOutputFormat(self) -> Format:
        """Return the format this module produces on its output."""
        pass

    def pump(self, read, write):
        """Build a callable that shovels data from `read` to `write`.

        The returned function takes no arguments and loops until one of:
        - `read()` raises ValueError or BrokenPipeError,
        - `read()` returns None or an empty bytes object,
        - `write()` raises BrokenPipeError.

        :param read: zero-argument callable producing the next chunk
        :param write: one-argument callable consuming a chunk
        :return: the copy loop, suitable as a thread target
        """
        def copy():
            while True:
                try:
                    chunk = read()
                except ValueError:
                    # handled the same way as "no data": ends the loop below
                    chunk = None
                except BrokenPipeError:
                    return
                if chunk is None:
                    return
                if isinstance(chunk, bytes) and not chunk:
                    # empty bytes signals end of stream
                    return
                try:
                    write(chunk)
                except BrokenPipeError:
                    return

        return copy
2021-09-06 20:00:14 +02:00
2021-09-07 14:45:52 +02:00
class AutoStartModule(Module, metaclass=ABCMeta):
    """Module that calls start() as soon as both reader and writer are set."""

    def _checkStart(self) -> None:
        """Invoke start() once neither end of the module is None."""
        if self.reader is None or self.writer is None:
            return
        self.start()

    def setReader(self, reader: Reader) -> None:
        """Store the reader, then start if the writer is already present."""
        super().setReader(reader)
        self._checkStart()

    def setWriter(self, writer: Writer) -> None:
        """Store the writer, then start if the reader is already present."""
        super().setWriter(writer)
        self._checkStart()

    @abstractmethod
    def start(self):
        """Begin processing; called each time both ends are available."""
        pass
class ThreadModule(AutoStartModule, Thread, metaclass=ABCMeta):
    """AutoStartModule whose processing happens in run() on its own thread.

    Subclasses implement run() and are expected to poll `self.doRun` to
    find out when they have been asked to stop.
    """

    def __init__(self):
        # cleared by stop() to ask run() to leave its loop
        self.doRun = True
        super().__init__()
        Thread.__init__(self)

    @abstractmethod
    def run(self):
        """Thread body; implemented by subclasses."""
        pass

    def stop(self):
        """Ask the thread to finish and stop the reader, if one was set."""
        self.doRun = False
        # stop() may be called before setReader(); there is nothing to stop
        # on the reader side in that case, so don't fail on None.
        if self.reader is not None:
            self.reader.stop()

    def start(self):
        """Start the thread unless it is currently running."""
        # don't start twice.
        if self.is_alive():
            return
        Thread.start(self)
2021-09-06 20:00:14 +02:00
class PickleModule(ThreadModule):
    """Thread module that unpickles its input, transforms it, and re-pickles.

    Each chunk read is treated as a sequence of back-to-back pickles; every
    unpickled object is passed to process(), and non-None results are
    pickled and written out.
    """

    def getInputFormat(self) -> Format:
        return Format.CHAR

    def getOutputFormat(self) -> Format:
        return Format.CHAR

    def run(self):
        """Read chunks until the reader returns None or doRun is cleared."""
        while self.doRun:
            chunk = self.reader.read()
            if chunk is None:
                self.doRun = False
                break
            # the chunk is converted with tobytes() before unpickling
            stream = BytesIO(chunk.tobytes())
            try:
                while True:
                    result = self.process(pickle.load(stream))
                    if result is None:
                        continue
                    self.writer.write(pickle.dumps(result))
            except EOFError:
                # EOFError ends the inner loop; go on to the next chunk
                pass

    @abstractmethod
    def process(self, input):
        """Transform one unpickled object; return None to emit nothing."""
        pass
2021-09-07 17:31:32 +02:00
class LineBasedModule(ThreadModule, metaclass=ABCMeta):
    """Thread module that splits its input into newline-terminated lines.

    Every complete line is handed to process(); non-None results are
    pickled and written to the writer.
    """

    def __init__(self):
        # bytes received after the most recent newline, not yet processed
        self.retained = bytes()
        super().__init__()

    def getInputFormat(self) -> Format:
        return Format.CHAR

    def getOutputFormat(self) -> Format:
        return Format.CHAR

    def run(self):
        """Read chunks until the reader returns None or doRun is cleared."""
        while self.doRun:
            data = self.reader.read()
            if data is None:
                self.doRun = False
                continue
            self.retained += data
            # everything before the last newline is complete; the remainder
            # is either empty (chunk ended on \n) or a partial line that is
            # kept for the next read
            *complete, self.retained = self.retained.split(b"\n")
            for line in complete:
                parsed = self.process(line)
                if parsed is None:
                    continue
                self.writer.write(pickle.dumps(parsed))

    @abstractmethod
    def process(self, line: bytes) -> any:
        """Handle one line (without its newline); return None to emit nothing."""
        pass
2023-09-03 23:48:56 +02:00
class JsonParser(LineBasedModule):
    """Line-based module that parses each line as a JSON object.

    Successfully parsed objects get a "mode" key set to the mode given at
    construction time and are returned for further processing.
    """

    def __init__(self, mode: str):
        """
        :param mode: value stored under the "mode" key of every message
        """
        self.mode = mode
        super().__init__()

    def process(self, line):
        """Parse one line of JSON and tag it with this parser's mode.

        :param line: a single input line
        :return: the decoded dict with "mode" added, or None if the line
            could not be decoded or did not contain a JSON object
        """
        try:
            msg = json.loads(line)
        except (json.JSONDecodeError, UnicodeDecodeError):
            # json.loads() decodes bytes input itself, so undecodable bytes
            # surface as UnicodeDecodeError rather than JSONDecodeError.
            # Either way one bad line must not take the whole thread down.
            logger.exception("error parsing decoder json")
            return None
        if not isinstance(msg, dict):
            # valid JSON that is not an object (number, string, list, ...)
            # cannot carry a "mode" key; skip it instead of raising
            logger.warning("ignoring decoder json that is not an object: %r", msg)
            return None
        msg["mode"] = self.mode
        logger.debug(msg)
        return msg
2023-09-03 23:48:56 +02:00
2021-09-07 17:31:32 +02:00
class PopenModule(AutoStartModule, metaclass=ABCMeta):
    """Module that pipes its data through an external process.

    Input read from the reader is written to the child's stdin, and the
    child's stdout is forwarded to the writer, each on its own thread.
    """

    def __init__(self):
        # the running child process, or None while not started / after stop()
        self.process = None
        super().__init__()

    @abstractmethod
    def getCommand(self):
        """Return the command to execute, as accepted by subprocess.Popen."""
        pass

    def _getProcess(self):
        """Spawn the child with stdin and stdout connected to pipes."""
        return Popen(self.getCommand(), stdin=PIPE, stdout=PIPE)

    def start(self):
        """Spawn the child process and start both pump threads."""
        self.process = self._getProcess()
        # resume in case the reader has been stop()ed before
        self.reader.resume()
        Thread(target=self.pump(self.reader.read, self.process.stdin.write)).start()
        Thread(target=self.pump(partial(self.process.stdout.read1, 1024), self.writer.write)).start()

    def stop(self):
        """Shut down the child process and stop the reader, if one was set."""
        if self.process is not None:
            # Try terminating normally, kill if failed to terminate
            try:
                self.process.terminate()
                self.process.wait(3)
            except TimeoutExpired:
                self.process.kill()
                # reap the killed child so it does not linger as a zombie
                self.process.wait()
            self.process = None
        # stop() may be called before setReader(); don't fail on None
        if self.reader is not None:
            self.reader.stop()
class LogReader(Thread):
    """Thread that logs everything arriving on a buffer, one line at a time.

    The thread starts itself on construction and ends when the reader
    returns None.
    """

    def __init__(self, prefix: str, buffer: Buffer):
        """
        :param prefix: name of the logger the lines are written to
        :param buffer: buffer to obtain the reader from
        """
        self.reader = buffer.getReader()
        self.logger = logging.getLogger(prefix)
        # bytes received after the most recent newline, not yet logged
        self.retained = bytes()
        super().__init__()
        self.start()

    def run(self) -> None:
        """Log each complete line at INFO level until the reader returns None."""
        while True:
            chunk = self.reader.read()
            if chunk is None:
                return
            self.retained += chunk
            # the piece after the last newline is either empty or a partial
            # line; hold it back until the rest of it arrives
            *finished, self.retained = self.retained.split(b"\n")
            for raw in finished:
                self.logger.info("{}: {}".format("STDOUT", raw.decode(errors="replace")))

    def stop(self):
        """Stop the underlying reader."""
        self.reader.stop()