diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index 786dbf7..9e0f00e 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -64,6 +64,7 @@ class CommandHandlerBase: self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._reader: Optional[MessageReader] = None self.dispatcher: Optional[EventDispatcher] = None + self._mesh_request_lock = asyncio.Lock() self.default_timeout = ( default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT ) diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index 59ebe6b..f1578d0 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -19,122 +19,123 @@ class BinaryCommandHandler(CommandHandlerBase): return await self.req_status_sync(contact, timeout, min_timeout) async def req_status_sync(self, contact, timeout=0, min_timeout=0): - res = await self.send_binary_req( - contact, - BinaryReqType.STATUS, - timeout=timeout, - min_timeout=min_timeout - ) - if res.type == EventType.ERROR: - return None - - exp_tag = res.payload["expected_ack"].hex() - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if min_timeout < timeout else min_timeout - - if self.dispatcher is None: - return None - - status_event = await self.dispatcher.wait_for_event( - EventType.STATUS_RESPONSE, - attribute_filters={"tag": exp_tag}, - timeout=timeout, - ) - - return status_event.payload if status_event else None + async with self._mesh_request_lock: + res = await self.send_binary_req( + contact, + BinaryReqType.STATUS, + timeout=timeout, + min_timeout=min_timeout + ) + if res.type == EventType.ERROR: + return None + + exp_tag = res.payload["expected_ack"].hex() + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if min_timeout < timeout else min_timeout + + if self.dispatcher is None: + return None + + status_event = await self.dispatcher.wait_for_event( + EventType.STATUS_RESPONSE, + attribute_filters={"tag": exp_tag}, + timeout=timeout, + ) + + return status_event.payload if status_event else None async def req_telemetry(self, contact, timeout=0, min_timeout=0): logger.error("*** please consider using req_telemetry_sync instead of req_telemetry") return await self.req_telemetry_sync(contact, timeout, min_timeout) async def req_telemetry_sync(self, contact, timeout=0, min_timeout=0): - res = await self.send_binary_req( - contact, - BinaryReqType.TELEMETRY, - timeout=timeout, - min_timeout=min_timeout - ) - if res.type == EventType.ERROR: - return None - - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if min_timeout < timeout else min_timeout + async with self._mesh_request_lock: + res = await self.send_binary_req( + contact, + BinaryReqType.TELEMETRY, + timeout=timeout, + min_timeout=min_timeout + ) + if res.type == EventType.ERROR: + return None - if self.dispatcher is None: - return None - - # Listen for TELEMETRY_RESPONSE event - telem_event = await self.dispatcher.wait_for_event( - EventType.TELEMETRY_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - return telem_event.payload["lpp"] if telem_event else None + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if min_timeout < timeout else min_timeout + + if self.dispatcher is None: + return None + + telem_event = await self.dispatcher.wait_for_event( + EventType.TELEMETRY_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + return telem_event.payload["lpp"] if telem_event else None async def req_mma(self, contact, timeout=0, min_timeout=0): logger.error("*** please consider using req_mma_sync instead of req_mma") return await self.req_mma_sync(contact, start, end, timeout,min_timeout) async def req_mma_sync(self, contact, start, end, timeout=0,min_timeout=0): - req = ( - start.to_bytes(4, "little", signed=False) - + end.to_bytes(4, "little", signed=False) - + b"\0\0" - ) - res = await self.send_binary_req( - contact, - BinaryReqType.MMA, - data=req, - timeout=timeout - ) - if res.type == EventType.ERROR: - return None - - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if min_timeout < timeout else min_timeout - - if self.dispatcher is None: - return None - - # Listen for MMA_RESPONSE - mma_event = await self.dispatcher.wait_for_event( - EventType.MMA_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - return mma_event.payload["mma_data"] if mma_event else None + async with self._mesh_request_lock: + req = ( + start.to_bytes(4, "little", signed=False) + + end.to_bytes(4, "little", signed=False) + + b"\0\0" + ) + res = await self.send_binary_req( + contact, + BinaryReqType.MMA, + data=req, + timeout=timeout + ) + if res.type == EventType.ERROR: + return None + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if min_timeout < timeout else min_timeout + + if self.dispatcher is None: + return None + + mma_event = await self.dispatcher.wait_for_event( + EventType.MMA_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + return mma_event.payload["mma_data"] if mma_event else None async def req_acl(self, contact, timeout=0, min_timeout=0): logger.error("*** please consider using req_acl_sync instead of req_acl") return await self.req_acl_sync(contact, timeout, min_timeout) async def req_acl_sync(self, contact, timeout=0, min_timeout=0): - req = b"\0\0" - res = await self.send_binary_req( - contact, - BinaryReqType.ACL, - data=req, - timeout=timeout - ) - if res.type == EventType.ERROR: - return None - - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if timeout > min_timeout else min_timeout - - if self.dispatcher is None: - return None - - # Listen for ACL_RESPONSE event with matching tag - acl_event = await self.dispatcher.wait_for_event( - EventType.ACL_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - return acl_event.payload["acl_data"] if acl_event else None + async with self._mesh_request_lock: + req = b"\0\0" + res = await self.send_binary_req( + contact, + BinaryReqType.ACL, + data=req, + timeout=timeout + ) + if res.type == EventType.ERROR: + return None + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if timeout > min_timeout else min_timeout + + if self.dispatcher is None: + return None + + acl_event = await self.dispatcher.wait_for_event( + EventType.ACL_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + return acl_event.payload["acl_data"] if acl_event else None async def req_neighbours_async(self, contact, @@ -172,32 +173,31 @@ class BinaryCommandHandler(CommandHandlerBase): timeout=0, min_timeout=0 ): + async with self._mesh_request_lock: + res = await self.req_neighbours_async(contact, + count=count, + offset=offset, + order_by=order_by, + pubkey_prefix_length=pubkey_prefix_length, + timeout=timeout, + min_timeout=min_timeout) - res = await self.req_neighbours_async(contact, - count=count, - offset=offset, - order_by=order_by, - pubkey_prefix_length=pubkey_prefix_length, + if res is None or res.type == EventType.ERROR: + return None + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if min_timeout < timeout else min_timeout + + if self.dispatcher is None: + return None + + neighbours_event = await self.dispatcher.wait_for_event( + EventType.NEIGHBOURS_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, timeout=timeout, - min_timeout=min_timeout) + ) - if res is None or res.type == EventType.ERROR: - return None - - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if min_timeout < timeout else min_timeout - - if self.dispatcher is None: - return None - - # Listen for NEIGHBOUR_RESPONSE - neighbours_event = await self.dispatcher.wait_for_event( - EventType.NEIGHBOURS_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - return neighbours_event.payload if neighbours_event else None + return neighbours_event.payload if neighbours_event else None # do several queries if not all neighbours have been obtained async def fetch_all_neighbours(self, @@ -259,30 +259,31 @@ class BinaryCommandHandler(CommandHandlerBase): ) async def req_regions_sync(self, contact, timeout=0, min_timeout=0): - res = await self.req_regions_async(contact, timeout, min_timeout) + async with self._mesh_request_lock: + res = await self.req_regions_async(contact, timeout, min_timeout) - if res.type == EventType.ERROR: - return None + if res.type == EventType.ERROR: + return None - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if timeout > min_timeout else min_timeout - - if self.dispatcher is None: - return None - - region_event = await self.dispatcher.wait_for_event( - EventType.BINARY_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - if region_event is None: - return None + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if timeout > min_timeout else min_timeout - pkt = bytes().fromhex(region_event.payload["data"]) - pbuf = io.BytesIO(pkt) - tag_again = pbuf.read(4) - return pbuf.read().decode("utf-8", "ignore").strip("\x00") + if self.dispatcher is None: + return None + + region_event = await self.dispatcher.wait_for_event( + EventType.BINARY_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + if region_event is None: + return None + + pkt = bytes().fromhex(region_event.payload["data"]) + pbuf = io.BytesIO(pkt) + tag_again = pbuf.read(4) + return pbuf.read().decode("utf-8", "ignore").strip("\x00") async def req_owner_async(self, contact, timeout=0, min_timeout=0): req = b"\0" @@ -294,33 +295,33 @@ class BinaryCommandHandler(CommandHandlerBase): ) async def req_owner_sync(self, contact, timeout=0, min_timeout=0): + async with self._mesh_request_lock: + res = await self.req_owner_async(contact, timeout, min_timeout) - res = await self.req_owner_async(contact, timeout, min_timeout) + if res.type == EventType.ERROR: + return None - if res.type == EventType.ERROR: - return None + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if timeout > min_timeout else min_timeout - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if timeout > min_timeout else min_timeout - - if self.dispatcher is None: - return None - - owner_event = await self.dispatcher.wait_for_event( - EventType.BINARY_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - if owner_event is None: - return None + if self.dispatcher is None: + return None - pkt = bytes().fromhex(owner_event.payload["data"]) - pbuf = io.BytesIO(pkt) - tag_again = pbuf.read(4) - strings = pbuf.read().decode("utf-8", "ignore").split("\n", 1) + owner_event = await self.dispatcher.wait_for_event( + EventType.BINARY_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) - return dict(name=strings[0], owner=strings[1].strip("\x00")) + if owner_event is None: + return None + + pkt = bytes().fromhex(owner_event.payload["data"]) + pbuf = io.BytesIO(pkt) + tag_again = pbuf.read(4) + strings = pbuf.read().decode("utf-8", "ignore").split("\n", 1) + + return dict(name=strings[0], owner=strings[1].strip("\x00")) async def req_basic_async(self, contact, timeout=0, min_timeout=0): req = b"\0" @@ -332,25 +333,25 @@ class BinaryCommandHandler(CommandHandlerBase): ) async def req_basic_sync(self, contact, timeout=0, min_timeout=0): + async with self._mesh_request_lock: + res = await self.req_basic_async(contact, timeout, min_timeout) - res = await self.req_basic_async(contact, timeout, min_timeout) + if res.type == EventType.ERROR: + return None - if res.type == EventType.ERROR: - return None + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if timeout > min_timeout else min_timeout - timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - timeout = timeout if timeout > min_timeout else min_timeout - - if self.dispatcher is None: - return None - - basic_event = await self.dispatcher.wait_for_event( - EventType.BINARY_RESPONSE, - attribute_filters={"tag": res.payload["expected_ack"].hex()}, - timeout=timeout, - ) - - if basic_event is None: - return None + if self.dispatcher is None: + return None - return basic_event.payload + basic_event = await self.dispatcher.wait_for_event( + EventType.BINARY_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + if basic_event is None: + return None + + return basic_event.payload diff --git a/src/meshcore/commands/messaging.py b/src/meshcore/commands/messaging.py index 1150e6c..162add3 100644 --- a/src/meshcore/commands/messaging.py +++ b/src/meshcore/commands/messaging.py @@ -30,6 +30,18 @@ class MessagingCommands(CommandHandlerBase): data = b"\x1a" + dst_bytes + pwd.encode("utf-8") return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_login_sync(self, dst: DestinationType, pwd: str, timeout: float = 10.0) -> Optional[Event]: + """Send login to a remote node and wait for the response.""" + async with self._mesh_request_lock: + result = await self.send_login(dst, pwd) + if result is None or result.type == EventType.ERROR: + return None + login_event = await self.dispatcher.wait_for_event( + EventType.LOGIN_SUCCESS, + timeout=timeout, + ) + return login_event + async def send_logout(self, dst: DestinationType) -> Event: dst_bytes = _validate_destination(dst, prefix_length=32) data = b"\x1d" + dst_bytes @@ -85,7 +97,7 @@ class MessagingCommands(CommandHandlerBase): self, dst: DestinationType, msg: str, timestamp: Optional[int] = None, max_attempts=3, max_flood_attempts=2, flood_after=2, timeout=0, min_timeout=0 ) -> Event: - + try: dst_bytes = _validate_destination(dst, prefix_length=32) # with 32 bytes we can reset to flood @@ -105,8 +117,8 @@ class MessagingCommands(CommandHandlerBase): # we can't know if we're flood without fetching all contacts # if we have a full key (meaning we can reset path) consider direct # else consider flood - flood = len(dst_bytes) < 32 - logger.info(f"send_msg_with_retry: can't determine if flood, assume {flood}") + flood = len(dst_bytes) < 32 + logger.info(f"send_msg_with_retry: can't determine if flood, assume {flood}") res = None while attempts < max_attempts and res is None \ and (not flood or flood_attempts < max_flood_attempts): @@ -122,8 +134,8 @@ class MessagingCommands(CommandHandlerBase): contact["out_path_len"] = -1 if attempts > 0: - logger.info(f"Retry sending msg: {attempts + 1}") - + logger.info(f"Retry sending msg: {attempts + 1}") + result = await self.send_msg(dst, msg, timestamp, attempt=attempts) if result.type == EventType.ERROR: logger.error(f"⚠️ Failed to send message: {result.payload}") @@ -131,14 +143,14 @@ class MessagingCommands(CommandHandlerBase): exp_ack = result.payload["expected_ack"].hex() timeout = result.payload["suggested_timeout"] / 1000 * 1.2 if timeout==0 else timeout timeout = timeout if timeout > min_timeout else min_timeout - res = await self.dispatcher.wait_for_event(EventType.ACK, - attribute_filters={"code": exp_ack}, + res = await self.dispatcher.wait_for_event(EventType.ACK, + attribute_filters={"code": exp_ack}, timeout=timeout) attempts = attempts + 1 if flood : flood_attempts = flood_attempts + 1 - + return None if res is None else result async def send_chan_msg(self, chan: int, msg: str, timestamp: Optional[int|bytes] = None) -> Event: @@ -177,6 +189,18 @@ class MessagingCommands(CommandHandlerBase): data = b"\x34\x00" + dst_bytes return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_path_discovery_sync(self, dst: DestinationType, timeout: float = 30.0) -> Optional[Event]: + """Send path discovery request and wait for the response.""" + async with self._mesh_request_lock: + result = await self.send_path_discovery(dst) + if result is None or result.type == EventType.ERROR: + return None + path_event = await self.dispatcher.wait_for_event( + EventType.PATH_RESPONSE, + timeout=timeout, + ) + return path_event + async def send_trace( self, auth_code: int = 0,