mirror of
https://github.com/meshcore-dev/meshcore_py.git
synced 2026-04-20 22:13:49 +00:00
Merge pull request #26 from meshcore-dev/fdlamotte/send_msg_reliable
quick sketch of send_msg_reliable
This commit is contained in:
commit
d65c04a7b3
5 changed files with 73 additions and 4 deletions
15
examples/ble_t1000_msg_retries.py
Executable file
15
examples/ble_t1000_msg_retries.py
Executable file
|
|
@ -0,0 +1,15 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import asyncio
|
||||
from meshcore import MeshCore
|
||||
from meshcore import BLEConnection
|
||||
|
||||
ADDRESS = "T1000" # node ble adress or name
|
||||
DEST = "993acd42fc779962c68c627829b32b111fa27a67d86b75c17460ff48c3102db4"
|
||||
MSG = "Hello World"
|
||||
|
||||
async def main () :
|
||||
mc = await MeshCore.create_ble(ADDRESS, debug=True)
|
||||
await mc.commands.send_msg_with_retry(DEST,MSG)
|
||||
|
||||
asyncio.run(main())
|
||||
|
|
@ -4,7 +4,7 @@ build-backend = "hatchling.build"
|
|||
|
||||
[project]
|
||||
name = "meshcore"
|
||||
version = "2.1.5"
|
||||
version = "2.1.6"
|
||||
authors = [
|
||||
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
|
||||
{ name="Alex Wolden", email="awolden@gmail.com" },
|
||||
|
|
|
|||
|
|
@ -76,6 +76,10 @@ class CommandHandlerBase:
|
|||
def set_dispatcher(self, dispatcher: EventDispatcher) -> None:
|
||||
self.dispatcher = dispatcher
|
||||
|
||||
def set_contact_getter_by_prefix(self, func: Callable[[str], Optional[Dict[str,Any]]]
|
||||
)-> None:
|
||||
self._get_contact_by_prefix = func
|
||||
|
||||
async def send(
|
||||
self,
|
||||
data: bytes,
|
||||
|
|
@ -166,4 +170,4 @@ class CommandHandlerBase:
|
|||
actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0
|
||||
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout)
|
||||
|
||||
return result
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -59,7 +59,8 @@ class MessagingCommands(CommandHandlerBase):
|
|||
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||
|
||||
async def send_msg(
|
||||
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None
|
||||
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None,
|
||||
attempt=0
|
||||
) -> Event:
|
||||
dst_bytes = _validate_destination(dst)
|
||||
logger.debug(f"Sending message to {dst_bytes.hex()}: {msg}")
|
||||
|
|
@ -70,13 +71,61 @@ class MessagingCommands(CommandHandlerBase):
|
|||
timestamp = int(time.time())
|
||||
|
||||
data = (
|
||||
b"\x02\x00\x00"
|
||||
b"\x02\x00"
|
||||
+ attempt.to_bytes(1, "little")
|
||||
+ timestamp.to_bytes(4, "little")
|
||||
+ dst_bytes
|
||||
+ msg.encode("utf-8")
|
||||
)
|
||||
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
|
||||
|
||||
async def send_msg_with_retry (
|
||||
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None,
|
||||
max_attempts=3, max_flood_attempts=2, flood_after=2, timeout=0
|
||||
) -> Event:
|
||||
|
||||
dst_bytes = _validate_destination(dst)
|
||||
contact = self._get_contact_by_prefix(dst_bytes.hex())
|
||||
|
||||
attempts = 0
|
||||
flood_attempts = 0
|
||||
if not contact is None :
|
||||
flood = contact["out_path_len"] == -1
|
||||
else:
|
||||
flood = False
|
||||
res = None
|
||||
while attempts < max_attempts and res is None \
|
||||
and (not flood or flood_attempts < max_flood_attempts):
|
||||
if attempts == flood_after : # change path to flood
|
||||
logger.info("Resetting path")
|
||||
rp_res = await self.reset_path(dst)
|
||||
if rp_res.type == EventType.ERROR:
|
||||
logger.error(f"Couldn't reset path {rp_res} continuing ...")
|
||||
else:
|
||||
flood = True
|
||||
if not contact is None:
|
||||
contact["out_path"] = ""
|
||||
contact["out_path_len"] = -1
|
||||
|
||||
if attempts > 0:
|
||||
logger.info(f"Retry sending msg: {attempts + 1}")
|
||||
|
||||
result = await self.send_msg(dst, msg, timestamp, attempt=attempts)
|
||||
if result.type == EventType.ERROR:
|
||||
logger.error(f"⚠️ Failed to send message: {result.payload}")
|
||||
|
||||
exp_ack = result.payload["expected_ack"].hex()
|
||||
timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout
|
||||
res = await self.dispatcher.wait_for_event(EventType.ACK,
|
||||
attribute_filters={"code": exp_ack},
|
||||
timeout=timeout)
|
||||
|
||||
attempts = attempts + 1
|
||||
if flood :
|
||||
flood_attempts = flood_attempts + 1
|
||||
|
||||
return None if res is None else result
|
||||
|
||||
async def send_chan_msg(self, chan, msg, timestamp=None) -> Event:
|
||||
logger.debug(f"Sending channel message to channel {chan}: {msg}")
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ class MeshCore:
|
|||
|
||||
self._reader = MessageReader(self.dispatcher)
|
||||
self.commands = CommandHandler(default_timeout=default_timeout)
|
||||
self.commands.set_contact_getter_by_prefix(self.get_contact_by_key_prefix)
|
||||
|
||||
# Set up logger
|
||||
if debug:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue