Add mesh request lock to serialize firmware-bound mesh commands

The companion firmware can only track one outstanding mesh request at a
time: clearPendingReqs() zeros all pending response flags before each
outgoing mesh request, so overlapping mesh commands cause responses to
be dropped silently.

Add _mesh_request_lock to CommandHandlerBase and wrap all _sync
methods with it. Also add send_login_sync and send_path_discovery_sync
for complete round-trip serialization of those commands.

Local commands (get_bat, get_channel, set_time, send_msg, etc.) are
unaffected — they don't trigger clearPendingReqs() on the firmware.
This commit is contained in:
Alex Wolden 2026-04-04 23:12:32 -07:00
parent 40a70222c8
commit 9fe1b16a2d
3 changed files with 216 additions and 197 deletions

View file

@ -64,6 +64,7 @@ class CommandHandlerBase:
self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None
self._reader: Optional[MessageReader] = None self._reader: Optional[MessageReader] = None
self.dispatcher: Optional[EventDispatcher] = None self.dispatcher: Optional[EventDispatcher] = None
self._mesh_request_lock = asyncio.Lock()
self.default_timeout = ( self.default_timeout = (
default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
) )

View file

@ -19,122 +19,123 @@ class BinaryCommandHandler(CommandHandlerBase):
return await self.req_status_sync(contact, timeout, min_timeout) return await self.req_status_sync(contact, timeout, min_timeout)
async def req_status_sync(self, contact, timeout=0, min_timeout=0): async def req_status_sync(self, contact, timeout=0, min_timeout=0):
res = await self.send_binary_req( async with self._mesh_request_lock:
contact, res = await self.send_binary_req(
BinaryReqType.STATUS, contact,
timeout=timeout, BinaryReqType.STATUS,
min_timeout=min_timeout timeout=timeout,
) min_timeout=min_timeout
if res.type == EventType.ERROR: )
return None if res.type == EventType.ERROR:
return None
exp_tag = res.payload["expected_ack"].hex()
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout exp_tag = res.payload["expected_ack"].hex()
timeout = timeout if min_timeout < timeout else min_timeout timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None:
return None if self.dispatcher is None:
return None
status_event = await self.dispatcher.wait_for_event(
EventType.STATUS_RESPONSE, status_event = await self.dispatcher.wait_for_event(
attribute_filters={"tag": exp_tag}, EventType.STATUS_RESPONSE,
timeout=timeout, attribute_filters={"tag": exp_tag},
) timeout=timeout,
)
return status_event.payload if status_event else None
return status_event.payload if status_event else None
async def req_telemetry(self, contact, timeout=0, min_timeout=0): async def req_telemetry(self, contact, timeout=0, min_timeout=0):
logger.error("*** please consider using req_telemetry_sync instead of req_telemetry") logger.error("*** please consider using req_telemetry_sync instead of req_telemetry")
return await self.req_telemetry_sync(contact, timeout, min_timeout) return await self.req_telemetry_sync(contact, timeout, min_timeout)
async def req_telemetry_sync(self, contact, timeout=0, min_timeout=0): async def req_telemetry_sync(self, contact, timeout=0, min_timeout=0):
res = await self.send_binary_req( async with self._mesh_request_lock:
contact, res = await self.send_binary_req(
BinaryReqType.TELEMETRY, contact,
timeout=timeout, BinaryReqType.TELEMETRY,
min_timeout=min_timeout timeout=timeout,
) min_timeout=min_timeout
if res.type == EventType.ERROR: )
return None if res.type == EventType.ERROR:
return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None: timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
return None timeout = timeout if min_timeout < timeout else min_timeout
# Listen for TELEMETRY_RESPONSE event if self.dispatcher is None:
telem_event = await self.dispatcher.wait_for_event( return None
EventType.TELEMETRY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()}, telem_event = await self.dispatcher.wait_for_event(
timeout=timeout, EventType.TELEMETRY_RESPONSE,
) attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
return telem_event.payload["lpp"] if telem_event else None )
return telem_event.payload["lpp"] if telem_event else None
async def req_mma(self, contact, timeout=0, min_timeout=0): async def req_mma(self, contact, timeout=0, min_timeout=0):
logger.error("*** please consider using req_mma_sync instead of req_mma") logger.error("*** please consider using req_mma_sync instead of req_mma")
return await self.req_mma_sync(contact, start, end, timeout,min_timeout) return await self.req_mma_sync(contact, start, end, timeout,min_timeout)
async def req_mma_sync(self, contact, start, end, timeout=0,min_timeout=0): async def req_mma_sync(self, contact, start, end, timeout=0,min_timeout=0):
req = ( async with self._mesh_request_lock:
start.to_bytes(4, "little", signed=False) req = (
+ end.to_bytes(4, "little", signed=False) start.to_bytes(4, "little", signed=False)
+ b"\0\0" + end.to_bytes(4, "little", signed=False)
) + b"\0\0"
res = await self.send_binary_req( )
contact, res = await self.send_binary_req(
BinaryReqType.MMA, contact,
data=req, BinaryReqType.MMA,
timeout=timeout data=req,
) timeout=timeout
if res.type == EventType.ERROR: )
return None if res.type == EventType.ERROR:
return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None:
return None if self.dispatcher is None:
return None
# Listen for MMA_RESPONSE
mma_event = await self.dispatcher.wait_for_event( mma_event = await self.dispatcher.wait_for_event(
EventType.MMA_RESPONSE, EventType.MMA_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()}, attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout, timeout=timeout,
) )
return mma_event.payload["mma_data"] if mma_event else None return mma_event.payload["mma_data"] if mma_event else None
async def req_acl(self, contact, timeout=0, min_timeout=0): async def req_acl(self, contact, timeout=0, min_timeout=0):
logger.error("*** please consider using req_acl_sync instead of req_acl") logger.error("*** please consider using req_acl_sync instead of req_acl")
return await self.req_acl_sync(contact, timeout, min_timeout) return await self.req_acl_sync(contact, timeout, min_timeout)
async def req_acl_sync(self, contact, timeout=0, min_timeout=0): async def req_acl_sync(self, contact, timeout=0, min_timeout=0):
req = b"\0\0" async with self._mesh_request_lock:
res = await self.send_binary_req( req = b"\0\0"
contact, res = await self.send_binary_req(
BinaryReqType.ACL, contact,
data=req, BinaryReqType.ACL,
timeout=timeout data=req,
) timeout=timeout
if res.type == EventType.ERROR: )
return None if res.type == EventType.ERROR:
return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if timeout > min_timeout else min_timeout timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if timeout > min_timeout else min_timeout
if self.dispatcher is None:
return None if self.dispatcher is None:
return None
# Listen for ACL_RESPONSE event with matching tag
acl_event = await self.dispatcher.wait_for_event( acl_event = await self.dispatcher.wait_for_event(
EventType.ACL_RESPONSE, EventType.ACL_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()}, attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout, timeout=timeout,
) )
return acl_event.payload["acl_data"] if acl_event else None return acl_event.payload["acl_data"] if acl_event else None
async def req_neighbours_async(self, async def req_neighbours_async(self,
contact, contact,
@ -172,32 +173,31 @@ class BinaryCommandHandler(CommandHandlerBase):
timeout=0, timeout=0,
min_timeout=0 min_timeout=0
): ):
async with self._mesh_request_lock:
res = await self.req_neighbours_async(contact,
count=count,
offset=offset,
order_by=order_by,
pubkey_prefix_length=pubkey_prefix_length,
timeout=timeout,
min_timeout=min_timeout)
res = await self.req_neighbours_async(contact, if res is None or res.type == EventType.ERROR:
count=count, return None
offset=offset,
order_by=order_by, timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
pubkey_prefix_length=pubkey_prefix_length, timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None:
return None
neighbours_event = await self.dispatcher.wait_for_event(
EventType.NEIGHBOURS_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout, timeout=timeout,
min_timeout=min_timeout) )
if res is None or res.type == EventType.ERROR: return neighbours_event.payload if neighbours_event else None
return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None:
return None
# Listen for NEIGHBOUR_RESPONSE
neighbours_event = await self.dispatcher.wait_for_event(
EventType.NEIGHBOURS_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
return neighbours_event.payload if neighbours_event else None
# do several queries if not all neighbours have been obtained # do several queries if not all neighbours have been obtained
async def fetch_all_neighbours(self, async def fetch_all_neighbours(self,
@ -259,30 +259,31 @@ class BinaryCommandHandler(CommandHandlerBase):
) )
async def req_regions_sync(self, contact, timeout=0, min_timeout=0): async def req_regions_sync(self, contact, timeout=0, min_timeout=0):
res = await self.req_regions_async(contact, timeout, min_timeout) async with self._mesh_request_lock:
res = await self.req_regions_async(contact, timeout, min_timeout)
if res.type == EventType.ERROR: if res.type == EventType.ERROR:
return None return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if timeout > min_timeout else min_timeout timeout = timeout if timeout > min_timeout else min_timeout
if self.dispatcher is None:
return None
region_event = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
if region_event is None:
return None
pkt = bytes().fromhex(region_event.payload["data"]) if self.dispatcher is None:
pbuf = io.BytesIO(pkt) return None
tag_again = pbuf.read(4)
return pbuf.read().decode("utf-8", "ignore").strip("\x00") region_event = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
if region_event is None:
return None
pkt = bytes().fromhex(region_event.payload["data"])
pbuf = io.BytesIO(pkt)
tag_again = pbuf.read(4)
return pbuf.read().decode("utf-8", "ignore").strip("\x00")
async def req_owner_async(self, contact, timeout=0, min_timeout=0): async def req_owner_async(self, contact, timeout=0, min_timeout=0):
req = b"\0" req = b"\0"
@ -294,33 +295,33 @@ class BinaryCommandHandler(CommandHandlerBase):
) )
async def req_owner_sync(self, contact, timeout=0, min_timeout=0): async def req_owner_sync(self, contact, timeout=0, min_timeout=0):
async with self._mesh_request_lock:
res = await self.req_owner_async(contact, timeout, min_timeout)
res = await self.req_owner_async(contact, timeout, min_timeout) if res.type == EventType.ERROR:
return None
if res.type == EventType.ERROR: timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
return None timeout = timeout if timeout > min_timeout else min_timeout
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout if self.dispatcher is None:
timeout = timeout if timeout > min_timeout else min_timeout return None
if self.dispatcher is None:
return None
owner_event = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
if owner_event is None:
return None
pkt = bytes().fromhex(owner_event.payload["data"]) owner_event = await self.dispatcher.wait_for_event(
pbuf = io.BytesIO(pkt) EventType.BINARY_RESPONSE,
tag_again = pbuf.read(4) attribute_filters={"tag": res.payload["expected_ack"].hex()},
strings = pbuf.read().decode("utf-8", "ignore").split("\n", 1) timeout=timeout,
)
return dict(name=strings[0], owner=strings[1].strip("\x00")) if owner_event is None:
return None
pkt = bytes().fromhex(owner_event.payload["data"])
pbuf = io.BytesIO(pkt)
tag_again = pbuf.read(4)
strings = pbuf.read().decode("utf-8", "ignore").split("\n", 1)
return dict(name=strings[0], owner=strings[1].strip("\x00"))
async def req_basic_async(self, contact, timeout=0, min_timeout=0): async def req_basic_async(self, contact, timeout=0, min_timeout=0):
req = b"\0" req = b"\0"
@ -332,25 +333,25 @@ class BinaryCommandHandler(CommandHandlerBase):
) )
async def req_basic_sync(self, contact, timeout=0, min_timeout=0): async def req_basic_sync(self, contact, timeout=0, min_timeout=0):
async with self._mesh_request_lock:
res = await self.req_basic_async(contact, timeout, min_timeout)
res = await self.req_basic_async(contact, timeout, min_timeout) if res.type == EventType.ERROR:
return None
if res.type == EventType.ERROR: timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
return None timeout = timeout if timeout > min_timeout else min_timeout
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout if self.dispatcher is None:
timeout = timeout if timeout > min_timeout else min_timeout return None
if self.dispatcher is None:
return None
basic_event = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
if basic_event is None:
return None
return basic_event.payload basic_event = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
if basic_event is None:
return None
return basic_event.payload

View file

@ -30,6 +30,18 @@ class MessagingCommands(CommandHandlerBase):
data = b"\x1a" + dst_bytes + pwd.encode("utf-8") data = b"\x1a" + dst_bytes + pwd.encode("utf-8")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_login_sync(self, dst: DestinationType, pwd: str, timeout: float = 10.0) -> Optional[Event]:
    """Send login to a remote node and wait for the response.

    The mesh request lock is held for the whole round trip (sending the
    login and waiting for the answer), so no other lock-protected mesh
    request can overlap with it.

    Args:
        dst: Destination of the login, forwarded unchanged to send_login.
        pwd: Password, forwarded unchanged to send_login.
        timeout: Timeout handed to dispatcher.wait_for_event.

    Returns:
        Whatever dispatcher.wait_for_event yields for LOGIN_SUCCESS, or
        None when no dispatcher is attached or when send_login returned
        None or an ERROR event.
    """
    # Without a dispatcher the response can never be awaited. Bail out
    # before sending instead of failing on None once the request is out.
    dispatcher = self.dispatcher
    if dispatcher is None:
        return None

    async with self._mesh_request_lock:
        result = await self.send_login(dst, pwd)
        if result is None or result.type == EventType.ERROR:
            return None
        # NOTE(review): only LOGIN_SUCCESS is awaited, so a rejected login
        # looks the same as a timeout and keeps the lock for the full
        # timeout -- confirm whether a failure event should be awaited too.
        return await dispatcher.wait_for_event(
            EventType.LOGIN_SUCCESS,
            timeout=timeout,
        )
async def send_logout(self, dst: DestinationType) -> Event: async def send_logout(self, dst: DestinationType) -> Event:
dst_bytes = _validate_destination(dst, prefix_length=32) dst_bytes = _validate_destination(dst, prefix_length=32)
data = b"\x1d" + dst_bytes data = b"\x1d" + dst_bytes
@ -85,12 +97,9 @@ class MessagingCommands(CommandHandlerBase):
self, dst: DestinationType, msg: str, timestamp: Optional[int] = None, self, dst: DestinationType, msg: str, timestamp: Optional[int] = None,
max_attempts=3, max_flood_attempts=2, flood_after=2, timeout=0, min_timeout=0 max_attempts=3, max_flood_attempts=2, flood_after=2, timeout=0, min_timeout=0
) -> Event: ) -> Event:
try: try:
dst_bytes = _validate_destination(dst, prefix_length=32) dst_bytes = _validate_destination(dst, prefix_length=32)
# with 32 bytes we can reset to flood
except ValueError: except ValueError:
# but if we can't, we'll assume we're flood
dst_bytes = _validate_destination(dst, prefix_length=6) dst_bytes = _validate_destination(dst, prefix_length=6)
contact = self._get_contact_by_prefix(dst_bytes.hex()) contact = self._get_contact_by_prefix(dst_bytes.hex())
@ -99,18 +108,14 @@ class MessagingCommands(CommandHandlerBase):
if not contact is None : if not contact is None :
flood = contact["out_path_len"] == -1 flood = contact["out_path_len"] == -1
if len(dst_bytes) < 32: if len(dst_bytes) < 32:
# if we have a contact, then we can get a 32 bytes key !
dst_bytes = _validate_destination(contact, prefix_length=32) dst_bytes = _validate_destination(contact, prefix_length=32)
else: else:
# we can't know if we're flood without fetching all contacts flood = len(dst_bytes) < 32
# if we have a full key (meaning we can reset path) consider direct logger.info(f"send_msg_with_retry: can't determine if flood, assume {flood}")
# else consider flood
flood = len(dst_bytes) < 32
logger.info(f"send_msg_with_retry: can't determine if flood, assume {flood}")
res = None res = None
while attempts < max_attempts and res is None \ while attempts < max_attempts and res is None \
and (not flood or flood_attempts < max_flood_attempts): and (not flood or flood_attempts < max_flood_attempts):
if attempts == flood_after and not flood: # change path to flood if attempts == flood_after and not flood:
logger.info("Resetting path") logger.info("Resetting path")
rp_res = await self.reset_path(dst_bytes) rp_res = await self.reset_path(dst_bytes)
if rp_res.type == EventType.ERROR: if rp_res.type == EventType.ERROR:
@ -122,23 +127,23 @@ class MessagingCommands(CommandHandlerBase):
contact["out_path_len"] = -1 contact["out_path_len"] = -1
if attempts > 0: if attempts > 0:
logger.info(f"Retry sending msg: {attempts + 1}") logger.info(f"Retry sending msg: {attempts + 1}")
result = await self.send_msg(dst, msg, timestamp, attempt=attempts) result = await self.send_msg(dst, msg, timestamp, attempt=attempts)
if result.type == EventType.ERROR: if result.type == EventType.ERROR:
logger.error(f"⚠️ Failed to send message: {result.payload}") logger.error(f"Failed to send message: {result.payload}")
exp_ack = result.payload["expected_ack"].hex() exp_ack = result.payload["expected_ack"].hex()
timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout
timeout = timeout if timeout > min_timeout else min_timeout timeout = timeout if timeout > min_timeout else min_timeout
res = await self.dispatcher.wait_for_event(EventType.ACK, res = await self.dispatcher.wait_for_event(EventType.ACK,
attribute_filters={"code": exp_ack}, attribute_filters={"code": exp_ack},
timeout=timeout) timeout=timeout)
attempts = attempts + 1 attempts = attempts + 1
if flood : if flood :
flood_attempts = flood_attempts + 1 flood_attempts = flood_attempts + 1
return None if res is None else result return None if res is None else result
async def send_chan_msg(self, chan: int, msg: str, timestamp: Optional[int|bytes] = None) -> Event: async def send_chan_msg(self, chan: int, msg: str, timestamp: Optional[int|bytes] = None) -> Event:
@ -177,6 +182,18 @@ class MessagingCommands(CommandHandlerBase):
data = b"\x34\x00" + dst_bytes data = b"\x34\x00" + dst_bytes
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_path_discovery_sync(self, dst: DestinationType, timeout: float = 30.0) -> Optional[Event]:
    """Send path discovery request and wait for the response.

    The mesh request lock is held for the whole round trip (sending the
    request and waiting for the answer), so no other lock-protected mesh
    request can overlap with it.

    Args:
        dst: Destination, forwarded unchanged to send_path_discovery.
        timeout: Timeout handed to dispatcher.wait_for_event.

    Returns:
        Whatever dispatcher.wait_for_event yields for PATH_RESPONSE, or
        None when no dispatcher is attached or when send_path_discovery
        returned None or an ERROR event.
    """
    # Without a dispatcher the response can never be awaited. Bail out
    # before sending instead of failing on None once the request is out.
    dispatcher = self.dispatcher
    if dispatcher is None:
        return None

    async with self._mesh_request_lock:
        result = await self.send_path_discovery(dst)
        if result is None or result.type == EventType.ERROR:
            return None
        # No attribute filter is applied: the first PATH_RESPONSE seen
        # while the lock is held is taken as the answer to this request.
        return await dispatcher.wait_for_event(
            EventType.PATH_RESPONSE,
            timeout=timeout,
        )
async def send_trace( async def send_trace(
self, self,
auth_code: int = 0, auth_code: int = 0,