impl of send_msg_reliable

This commit is contained in:
Florent de Lamotte 2025-09-05 10:38:20 +02:00
parent 0c40cf917e
commit 292195473d

View file

@ -59,7 +59,8 @@ class MessagingCommands(CommandHandlerBase):
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_msg(
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None,
attempt=0
) -> Event:
dst_bytes = _validate_destination(dst)
logger.debug(f"Sending message to {dst_bytes.hex()}: {msg}")
@ -70,13 +71,49 @@ class MessagingCommands(CommandHandlerBase):
timestamp = int(time.time())
data = (
b"\x02\x00\x00"
b"\x02\x00"
+ attempt.to_bytes(1, "little")
+ timestamp.to_bytes(4, "little")
+ dst_bytes
+ msg.encode("utf-8")
)
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_msg_reliable(
self, contact, msg: str, timestamp: Optional[int] = None,
max_attempts=3, flood_after=2, timeout=0
) -> Event:
attempts = 0
res = None
while attempts < max_attempts and res is None:
if attempts == flood_after : # change path to flood
logger.info("Resetting path")
rp_res = await self.reset_path(contact)
if rp_res.type == EventType.ERROR:
logger.error(f"Couldn't reset path {rp_res} continuing ...")
else:
contact["out_path"] = ""
contact["out_path_len"] = -1
if attempts > 0:
logger.info(f"Retry sending msg: {attempts + 1}")
result = await self.send_msg(contact, msg, timestamp, attempt=attempts)
if result.type == EventType.ERROR:
print(f"⚠️ Failed to send message: {result.payload}")
return None
exp_ack = result.payload["expected_ack"].hex()
timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout
res = await self.dispatcher.wait_for_event(EventType.ACK,
attribute_filters={"code": exp_ack},
timeout=timeout)
attempts = attempts + 1
return None if res is None else result
async def send_chan_msg(self, chan, msg, timestamp=None) -> Event:
logger.debug(f"Sending channel message to channel {chan}: {msg}")