diff --git a/examples/ble_t1000_msg_retries.py b/examples/ble_t1000_msg_retries.py new file mode 100755 index 0000000..24ec27b --- /dev/null +++ b/examples/ble_t1000_msg_retries.py @@ -0,0 +1,15 @@ +#!/usr/bin/python + +import asyncio +from meshcore import MeshCore +from meshcore import BLEConnection + +ADDRESS = "T1000" # node ble adress or name +DEST = "993acd42fc779962c68c627829b32b111fa27a67d86b75c17460ff48c3102db4" +MSG = "Hello World" + +async def main () : + mc = await MeshCore.create_ble(ADDRESS, debug=True) + await mc.commands.send_msg_with_retry(DEST,MSG) + +asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index e4d681d..878fcb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore" -version = "2.1.5" +version = "2.1.6" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, { name="Alex Wolden", email="awolden@gmail.com" }, diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index 8f416b1..cf4ae16 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -76,6 +76,10 @@ class CommandHandlerBase: def set_dispatcher(self, dispatcher: EventDispatcher) -> None: self.dispatcher = dispatcher + def set_contact_getter_by_prefix(self, func: Callable[[str], Optional[Dict[str,Any]]] + )-> None: + self._get_contact_by_prefix = func + async def send( self, data: bytes, @@ -166,4 +170,4 @@ class CommandHandlerBase: actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0 self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout) - return result \ No newline at end of file + return result diff --git a/src/meshcore/commands/messaging.py b/src/meshcore/commands/messaging.py index f511aa0..683ba8e 100644 --- a/src/meshcore/commands/messaging.py +++ b/src/meshcore/commands/messaging.py @@ -59,7 +59,8 @@ class MessagingCommands(CommandHandlerBase): return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) async def send_msg( - self, dst: DestinationType, msg: str, timestamp: Optional[int] = None + self, dst: DestinationType, msg: str, timestamp: Optional[int] = None, + attempt=0 ) -> Event: dst_bytes = _validate_destination(dst) logger.debug(f"Sending message to {dst_bytes.hex()}: {msg}") @@ -70,13 +71,61 @@ class MessagingCommands(CommandHandlerBase): timestamp = int(time.time()) data = ( - b"\x02\x00\x00" + b"\x02\x00" + + attempt.to_bytes(1, "little") + timestamp.to_bytes(4, "little") + dst_bytes + msg.encode("utf-8") ) return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_msg_with_retry ( + self, dst: DestinationType, msg: str, timestamp: Optional[int] = None, + max_attempts=3, max_flood_attempts=2, flood_after=2, timeout=0 + ) -> Event: + + dst_bytes = _validate_destination(dst) + contact = self._get_contact_by_prefix(dst_bytes.hex()) + + attempts = 0 + flood_attempts = 0 + if not contact is None : + flood = contact["out_path_len"] == -1 + else: + flood = False + res = None + while attempts < max_attempts and res is None \ + and (not flood or flood_attempts < max_flood_attempts): + if attempts == flood_after : # change path to flood + logger.info("Resetting path") + rp_res = await self.reset_path(dst) + if rp_res.type == EventType.ERROR: + logger.error(f"Couldn't reset path {rp_res} continuing ...") + else: + flood = True + if not contact is None: + contact["out_path"] = "" + contact["out_path_len"] = -1 + + if attempts > 0: + logger.info(f"Retry sending msg: {attempts + 1}") + + result = await self.send_msg(dst, msg, timestamp, attempt=attempts) + if result.type == EventType.ERROR: + logger.error(f"⚠️ Failed to send message: {result.payload}") + + exp_ack = result.payload["expected_ack"].hex() + timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout + res = await self.dispatcher.wait_for_event(EventType.ACK, + attribute_filters={"code": exp_ack}, + timeout=timeout) + + attempts = attempts + 1 + if flood : + flood_attempts = flood_attempts + 1 + + return None if res is None else result + async def send_chan_msg(self, chan, msg, timestamp=None) -> Event: logger.debug(f"Sending channel message to channel {chan}: {msg}") diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 84e533f..910f608 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -37,6 +37,7 @@ class MeshCore: self._reader = MessageReader(self.dispatcher) self.commands = CommandHandler(default_timeout=default_timeout) + self.commands.set_contact_getter_by_prefix(self.get_contact_by_key_prefix) # Set up logger if debug: