From 307e517f5e75a53e401602867f9794f55683a85e Mon Sep 17 00:00:00 2001 From: Florent Date: Sun, 9 Nov 2025 16:51:54 +0100 Subject: [PATCH] req_neighbours --- pyproject.toml | 2 +- src/meshcore/commands/base.py | 4 +- src/meshcore/commands/binary.py | 111 ++++++++++++++++++++++++++++++++ src/meshcore/events.py | 1 + src/meshcore/packets.py | 1 + src/meshcore/reader.py | 42 +++++++++++- 6 files changed, 155 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cf04cbf..adfcfcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore" -version = "2.1.23" +version = "2.1.24" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, { name="Alex Wolden", email="awolden@gmail.com" }, diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index 6b2232e..ae2776a 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -163,7 +163,7 @@ class CommandHandlerBase: return Event(EventType.OK, {}) # attached at base because its a common method - async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None, min_timeout=0) -> Event: + async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, context={}, timeout=None, min_timeout=0) -> Event: dst_bytes = _validate_destination(dst, prefix_length=32) pubkey_prefix = _validate_destination(dst, prefix_length=6) logger.debug(f"Binary request to {dst_bytes.hex()}") @@ -180,6 +180,6 @@ class CommandHandlerBase: # Use provided timeout or fallback to suggested timeout (with 5s default) actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0 actual_timeout = min_timeout if actual_timeout < min_timeout else actual_timeout - self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout) + self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout, context=context) return result diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index 4314bc5..c47e5f0 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -1,4 +1,6 @@ +import asyncio import logging +import random from .base import CommandHandlerBase from ..events import EventType @@ -131,3 +133,112 @@ class BinaryCommandHandler(CommandHandlerBase): ) return acl_event.payload["acl_data"] if acl_event else None + + async def req_neighbours_async(self, + contact, + count=255, + offset=0, + order_by=0, + pubkey_prefix_length=4, + timeout=0, + min_timeout=0 + ): + req = (b"\x00" # version : 0 + + count.to_bytes(1, "little", signed=False) + + offset.to_bytes(2, "little", signed=False) + + order_by.to_bytes(1, "little", signed=False) + + pubkey_prefix_length.to_bytes(1, "little", signed=False) + + random.randint(1, 0xFFFFFFFF).to_bytes(4, "little", signed=False) + ) + + logger.debug(f"Sending binary neighbours req, count: {count}, offset: {offset} {req.hex()}") + + return await self.send_binary_req ( + contact, + BinaryReqType.NEIGHBOURS, + data=req, + timeout=timeout, + context={"pubkey_prefix_length": pubkey_prefix_length} + ) + + async def req_neighbours_sync(self, + contact, + count=255, + offset=0, + order_by=0, + pubkey_prefix_length=4, + timeout=0, + min_timeout=0 + ): + + res = await self.req_neighbours_async(contact, + count=count, + offset=offset, + order_by=order_by, + pubkey_prefix_length=pubkey_prefix_length, + timeout=timeout, + min_timeout=min_timeout) + + if res is None or res.type == EventType.ERROR: + return None + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + timeout = timeout if min_timeout < timeout else min_timeout + + if self.dispatcher is None: + return None + + # Listen for NEIGHBOUR_RESPONSE + neighbours_event = await self.dispatcher.wait_for_event( + EventType.NEIGHBOURS_RESPONSE, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, + timeout=timeout, + ) + + return neighbours_event.payload if neighbours_event else None + + # do several queries if not all neighbours have been obtained + async def fetch_all_neighbours(self, + contact, + order_by=0, + pubkey_prefix_length=4, + timeout=0, + min_timeout=0 + ): + + # Initial request + res = await self.req_neighbours_sync(contact, + count=255, + offset=0, + order_by=order_by, + pubkey_prefix_length=pubkey_prefix_length, + timeout=timeout, + min_timeout=min_timeout) + + if res is None: + return None + + neighbours_count = res["neighbours_count"] # total neighbours + results_count = res["results_count"] # obtained neighbours + + del res["tag"] + + while results_count < neighbours_count: + #await asyncio.sleep(2) # wait 2s before next fetch + next_res = await self.req_neighbours_sync(contact, + count=255, + offset=results_count, + order_by=order_by, + pubkey_prefix_length=pubkey_prefix_length, + timeout=timeout, + min_timeout=min_timeout+5) # requests are close, so let's have some more timeout + + if next_res is None : + return res # caller should check it has everything + + results_count = results_count + next_res["results_count"] + + res["results_count"] = results_count + res["neighbours"] += next_res["neighbours"] + + return res diff --git a/src/meshcore/events.py b/src/meshcore/events.py index fb889c0..ac523f7 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -46,6 +46,7 @@ class EventType(Enum): DISABLED = "disabled" CONTROL_DATA = "control_data" DISCOVER_RESPONSE = "discover_response" + NEIGHBOURS_RESPONSE = "neighbours_response" # Command response types OK = "command_ok" diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index 62f45ee..57c1755 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -6,6 +6,7 @@ class BinaryReqType(Enum): TELEMETRY = 0x03 MMA = 0x04 ACL = 0x05 + NEIGHBOURS = 0x06 class ControlType(Enum): NODE_DISCOVER_REQ = 0x80 diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index ace0fff..828357f 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -23,7 +23,7 @@ class MessageReader: # Track pending binary requests by tag for proper response parsing self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at} - def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float): + def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}): """Register a pending binary request for proper response parsing""" # Clean up expired requests before adding new one self.cleanup_expired_requests() @@ -32,7 +32,8 @@ class MessageReader: self.pending_binary_requests[tag] = { "request_type": request_type, "pubkey_prefix": prefix, - "expires_at": expires_at + "expires_at": expires_at, + "context": context # optional info we want to keep from req to resp } logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s") @@ -519,6 +520,7 @@ class MessageReader: if tag in self.pending_binary_requests: request_type = self.pending_binary_requests[tag]["request_type"] pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + context = self.pending_binary_requests[tag]["context"] del self.pending_binary_requests[tag] logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") @@ -558,6 +560,40 @@ class MessageReader: ) except Exception as e: logger.error(f"Error parsing binary ACL response: {e}") + + elif request_type == BinaryReqType.NEIGHBOURS: + try: + pk_plen = context["pubkey_prefix_length"] + bbuf = io.BytesIO(response_data) + + res = { + "pubkey_prefix": pubkey_prefix, + "tag": tag + } + res.update(context) # add context in result + + res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) + results_count = int.from_bytes(bbuf.read(2), "little", signed=True) + res["results_count"] = results_count + + neighbours_list = [] + + for _ in range (results_count): + neighb = {} + neighb["pubkey"] = bbuf.read(pk_plen).hex() + neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) + neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 + neighbours_list.append(neighb) + + res["neighbours"] = neighbours_list + + await self.dispatcher.dispatch( + Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + + except Exception as e: + logger.error(f"Error parsing binary NEIGHBOURS response: {e}") + else: logger.debug(f"No tracked request found for binary response tag {tag}") @@ -623,7 +659,7 @@ class MessageReader: if len(pubkey) < 32: pubkey = pubkey[0:8] else: - pubkey = pubkey[0:32] + pubkey = pubkey[0:32] ndr["pubkey"] = pubkey.hex()