implement active flatten transformation

This commit is contained in:
Jakob Ketterl 2023-05-08 18:17:11 +02:00
parent a88c02e2c2
commit 66258c74a8
4 changed files with 78 additions and 14 deletions

View file

@ -39,7 +39,7 @@ class ActiveListIndexDeleted(ActiveListChange):
class ActiveListListener(ABC):
@abstractmethod
def onListChange(self, changes: list[ActiveListChange]):
def onListChange(self, source: "ActiveList", changes: list[ActiveListChange]):
pass
@ -48,7 +48,7 @@ class ActiveListTransformationListener(ActiveListListener):
self.transformation = transformation
self.target = target
def onListChange(self, changes: list[ActiveListChange]):
def onListChange(self, source: "ActiveList", changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexUpdated):
self.target[change.index] = self.transformation(change.newValue)
@ -64,7 +64,7 @@ class ActiveListFilterListener(ActiveListListener):
self.keyMap = keyMap
self.target = target
def onListChange(self, changes: list[ActiveListChange]):
def onListChange(self, source: "ActiveList", changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexAdded):
if self.filter(change.newValue):
@ -87,6 +87,27 @@ class ActiveListFilterListener(ActiveListListener):
del self.keyMap[idx]
class ActiveListFlattenListener(ActiveListListener):
def __init__(self, source: "ActiveList", target: "ActiveList"):
self.source = source
self.target = target
for member in self.source:
member.addListener(self)
def getOffsetFor(self, source: "ActiveList"):
idx = self.source.index(source)
return sum(len(s) for s in self.source[0:idx])
def onListChange(self, source: "ActiveList", changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexAdded):
self.target.insert(self.getOffsetFor(source) + change.index, change.newValue)
elif isinstance(change, ActiveListIndexUpdated):
self.target[self.getOffsetFor(source) + change.index] = change.newValue
elif isinstance(change, ActiveListIndexDeleted):
del self.target[self.getOffsetFor(source) + change.index]
class ActiveList:
def __init__(self, elements: list = None):
self.delegate = elements.copy() if elements is not None else []
@ -109,7 +130,7 @@ class ActiveList:
def __fireChanges(self, changes: list[ActiveListChange]):
for listener in self.listeners:
try:
listener.onListChange(changes)
listener.onListChange(self, changes)
except Exception:
logger.exception("Exception during onListChange notification")
@ -120,6 +141,9 @@ class ActiveList:
self.delegate.insert(index, value)
self.__fireChanges([ActiveListIndexInserted(index, value)])
def index(self, value):
return self.delegate.index(value)
def map(self, transform: callable):
res = ActiveList([transform(v) for v in self])
self.addListener(ActiveListTransformationListener(transform, res))
@ -137,7 +161,7 @@ class ActiveList:
def flatten(self):
res = ActiveList([y for x in self for y in x])
# TODO handle events
handler = ActiveListFlattenListener(self, res)
return res
def __setitem__(self, key, value):

View file

@ -8,7 +8,7 @@ from owrx.property import PropertyLayer, PropertyDeleted
from owrx.service.schedule import ServiceScheduler
from owrx.service.chain import ServiceDemodulatorChain
from owrx.modes import Modes, DigitalMode
from owrx.active.list import ActiveListListener, ActiveListChange, ActiveListIndexDeleted, ActiveListIndexAdded
from owrx.active.list import ActiveList, ActiveListListener, ActiveListChange, ActiveListIndexDeleted, ActiveListIndexAdded
from typing import Union, Optional
from csdr.chain.demodulator import BaseDemodulatorChain, ServiceDemodulator, DialFrequencyReceiver
from pycsdr.modules import Buffer
@ -316,7 +316,7 @@ class ServiceHandler(SdrSourceEventClient):
class SdrDeviceEventHandler(ActiveListListener):
def onListChange(self, changes: list[ActiveListChange]):
def onListChange(self, source: ActiveList, changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexDeleted):
key = change.oldValue.getId()

View file

@ -12,7 +12,7 @@ from owrx.command import CommandMapper
from owrx.socket import getAvailablePort
from owrx.property import PropertyStack, PropertyLayer, PropertyFilter, PropertyCarousel, PropertyDeleted
from owrx.property.filter import ByLambda
from owrx.active.list import ActiveListListener, ActiveListChange, ActiveListIndexAdded, ActiveListIndexDeleted, ActiveListIndexUpdated
from owrx.active.list import ActiveList, ActiveListListener, ActiveListChange, ActiveListIndexAdded, ActiveListIndexDeleted, ActiveListIndexUpdated
from owrx.form.input import Input, TextInput, NumberInput, CheckboxInput, ModesInput, ExponentialInput
from owrx.form.input.converter import OptionalConverter
from owrx.form.input.device import GainInput, SchedulerInput, WaterfallLevelsInput
@ -81,7 +81,7 @@ class SdrProfileCarouselListener(ActiveListListener):
def __init__(self, carousel):
self.carousel = carousel
def onListChange(self, changes: list[ActiveListChange]):
def onListChange(self, source: ActiveList, changes: list[ActiveListChange]):
for change in changes:
if isinstance(change, ActiveListIndexAdded):
self.carousel.addProfile(change.newValue)

View file

@ -23,7 +23,8 @@ class ActiveListTest(TestCase):
list.addListener(listenerMock)
list[0] = "testvalue"
listenerMock.onListChange.assert_called_once()
changes, = listenerMock.onListChange.call_args.args
source, changes = listenerMock.onListChange.call_args.args
self.assertIs(source, list)
self.assertEqual(len(changes), 1)
self.assertIsInstance(changes[0], ActiveListIndexUpdated)
self.assertEqual(changes[0].index, 0)
@ -52,7 +53,8 @@ class ActiveListTest(TestCase):
list.addListener(listenerMock)
list.append("testvalue")
listenerMock.onListChange.assert_called_once()
changes, = listenerMock.onListChange.call_args.args
source, changes = listenerMock.onListChange.call_args.args
self.assertIs(source, list)
self.assertEqual(len(changes), 1)
self.assertIsInstance(changes[0], ActiveListIndexAppended)
self.assertEqual(changes[0].index, 0)
@ -70,7 +72,8 @@ class ActiveListTest(TestCase):
list.addListener(listenerMock)
del list[0]
listenerMock.onListChange.assert_called_once()
changes, = listenerMock.onListChange.call_args.args
source, changes = listenerMock.onListChange.call_args.args
self.assertIs(source, list)
self.assertEqual(len(changes), 1)
self.assertIsInstance(changes[0], ActiveListIndexDeleted)
self.assertEqual(changes[0].index, 0)
@ -94,7 +97,8 @@ class ActiveListTest(TestCase):
list.addListener(listenerMock)
list.insert(1, "value1.5")
listenerMock.onListChange.assert_called_once()
changes, = listenerMock.onListChange.call_args.args
source, changes = listenerMock.onListChange.call_args.args
self.assertIs(source, list)
self.assertEqual(len(changes), 1)
self.assertIsInstance(changes[0], ActiveListIndexInserted)
self.assertEqual(changes[0].index, 1)
@ -174,11 +178,47 @@ class ActiveListTest(TestCase):
del list[1]
self.assertEqual(len(filteredList), 1)
def testIndex(self):
list = ActiveList([1, 2, 3, 4, 5])
self.assertEqual(list.index(3), 2)
def testFlatten(self):
list = ActiveList([[1, 2], [3, 4]])
list = ActiveList([ActiveList([1, 2]), ActiveList([3, 4])])
flattenedList = list.flatten()
self.assertEqual(len(flattenedList), 4)
self.assertEqual(flattenedList[0], 1)
self.assertEqual(flattenedList[1], 2)
self.assertEqual(flattenedList[2], 3)
self.assertEqual(flattenedList[3], 4)
def testActiveFlattenAdd(self):
sublist = ActiveList([3, 4])
list = ActiveList([ActiveList([1, 2]), sublist, ActiveList([6, 7])])
flattenedList = list.flatten()
sublist.append(5)
self.assertEqual(len(flattenedList), 7)
self.assertEqual(flattenedList[4], 5)
def testActiveFlattenInsert(self):
sublist = ActiveList([3, 5])
list = ActiveList([ActiveList([1, 2]), sublist, ActiveList([6, 7])])
flattenedList = list.flatten()
sublist.insert(1, 4)
self.assertEqual(len(flattenedList), 7)
self.assertEqual(flattenedList[3], 4)
def testActiveFlattenUpdate(self):
sublist = ActiveList([3, 9, 5])
list = ActiveList([ActiveList([1, 2]), sublist, ActiveList([6, 7])])
flattenedList = list.flatten()
sublist[1] = 4
self.assertEqual(len(flattenedList), 7)
self.assertEqual(flattenedList[3], 4)
def testActiveFlattenDelete(self):
sublist = ActiveList([3, 4, 9, 5])
list = ActiveList([ActiveList([1, 2]), sublist, ActiveList([6, 7])])
flattenedList = list.flatten()
del sublist[2]
self.assertEqual(len(flattenedList), 7)
self.assertEqual(flattenedList[4], 5)