From d61942307892aa0ffa8580ac639bf66dd10b54da Mon Sep 17 00:00:00 2001 From: Florent de Lamotte Date: Wed, 22 Oct 2025 10:21:07 +0200 Subject: [PATCH] timeout for each contact in get_contacts --- pyproject.toml | 2 +- src/meshcore/commands/base.py | 87 ++++++++++++++++++-------------- src/meshcore/commands/contact.py | 27 +++++++++- src/meshcore/events.py | 1 + src/meshcore/reader.py | 1 + 5 files changed, 77 insertions(+), 41 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8182f46..2f68287 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore" -version = "2.1.12" +version = "2.1.13" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, { name="Alex Wolden", email="awolden@gmail.com" }, diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index 7cdcce3..6b2232e 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -80,6 +80,53 @@ class CommandHandlerBase: )-> None: self._get_contact_by_prefix = func + async def wait_for_events( + self, + expected_events: Optional[Union[EventType, List[EventType]]] = None, + timeout: Optional[float] = None, + ) -> Event: + try: + # Convert single event to list if needed + if not isinstance(expected_events, list): + expected_events = [expected_events] + + logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + + # Create futures for all expected events + futures = [] + for event_type in expected_events: + future = asyncio.create_task( + self.dispatcher.wait_for_event(event_type, {}, timeout) + ) + futures.append(future) + + # Wait for the first event to complete or all to timeout + done, pending = await asyncio.wait( + futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED + ) + + # Cancel all pending futures + for future in pending: + future.cancel() + + # Check if any future completed successfully + for future in done: + event = await future + if event: + return event + + # Create an error event when no event is received + return Event(EventType.ERROR, {"reason": "no_event_received"}) + except asyncio.TimeoutError: + logger.debug(f"Command timed out {data}") + return Event(EventType.ERROR, {"reason": "timeout"}) + except Exception as e: + logger.debug(f"Command error: {e}") + return Event(EventType.ERROR, {"error": str(e)}) + + return Event(EventType.ERROR, {}) + + async def send( self, data: bytes, @@ -110,45 +157,9 @@ class CommandHandlerBase: await self._sender_func(data) if expected_events: - try: - # Convert single event to list if needed - if not isinstance(expected_events, list): - expected_events = [expected_events] - - logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") - - # Create futures for all expected events - futures = [] - for event_type in expected_events: - future = asyncio.create_task( - self.dispatcher.wait_for_event(event_type, {}, timeout) - ) - futures.append(future) - - # Wait for the first event to complete or all to timeout - done, pending = await asyncio.wait( - futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED - ) - - # Cancel all pending futures - for future in pending: - future.cancel() - - # Check if any future completed successfully - for future in done: - event = await future - if event: - return event - - # Create an error event when no event is received - return Event(EventType.ERROR, {"reason": "no_event_received"}) - except asyncio.TimeoutError: - logger.debug(f"Command timed out {data}") - return Event(EventType.ERROR, {"reason": "timeout"}) - except Exception as e: - logger.debug(f"Command error: {e}") - return Event(EventType.ERROR, {"error": str(e)}) # For commands that don't expect events, return a success event + return await self.wait_for_events(expected_events, timeout) + return Event(EventType.OK, {}) # attached at base because its a common method diff --git a/src/meshcore/commands/contact.py b/src/meshcore/commands/contact.py index 00617ae..64a8e4d 100644 --- a/src/meshcore/commands/contact.py +++ b/src/meshcore/commands/contact.py @@ -8,12 +8,35 @@ logger = logging.getLogger("meshcore") class ContactCommands(CommandHandlerBase): - async def get_contacts(self, lastmod=0) -> Event: + async def get_contacts(self, lastmod=0, anim=False) -> Event: logger.debug("Getting contacts") data = b"\x04" if lastmod > 0: data = data + lastmod.to_bytes(4, "little") - return await self.send(data, [EventType.CONTACTS, EventType.ERROR], timeout=30) + if anim: + print("Fetching contacts ", end="", flush=True) + # wait first event + res = await self.send(data) + while True: + # wait next event + res = await self.wait_for_events( + [EventType.NEXT_CONTACT, EventType.CONTACTS, EventType.ERROR], + timeout=5) + if res is None: # Timeout + if anim: + print(" Timeout") + return res + if res.type == EventType.ERROR: + if anim: + print(" Error") + return res + elif res.type == EventType.CONTACTS: + if anim: + print(" Done") + return res + elif res.type == EventType.NEXT_CONTACT: + if anim: + print(".", end="", flush=True) async def reset_path(self, key: DestinationType) -> Event: key_bytes = _validate_destination(key, prefix_length=32) diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 4fbcc6d..913430b 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -21,6 +21,7 @@ class EventType(Enum): DEVICE_INFO = "device_info" MSG_SENT = "message_sent" NEW_CONTACT = "new_contact" + NEXT_CONTACT = "next_contact" # Push notifications ADVERTISEMENT = "advertisement" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 7671509..aa98a4a 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -99,6 +99,7 @@ class MessageReader: if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) else: + await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) self.contacts[c["public_key"]] = c elif packet_type_value == PacketType.CONTACT_END.value: