req_neighbours

This commit is contained in:
Florent 2025-11-09 16:51:54 +01:00
parent dea2f74eae
commit 307e517f5e
6 changed files with 155 additions and 6 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore"
version = "2.1.23"
version = "2.1.24"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
{ name="Alex Wolden", email="awolden@gmail.com" },

View file

@ -163,7 +163,7 @@ class CommandHandlerBase:
return Event(EventType.OK, {})
# attached at base because its a common method
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None, min_timeout=0) -> Event:
async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, context={}, timeout=None, min_timeout=0) -> Event:
dst_bytes = _validate_destination(dst, prefix_length=32)
pubkey_prefix = _validate_destination(dst, prefix_length=6)
logger.debug(f"Binary request to {dst_bytes.hex()}")
@ -180,6 +180,6 @@ class CommandHandlerBase:
# Use provided timeout or fallback to suggested timeout (with 5s default)
actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0
actual_timeout = min_timeout if actual_timeout < min_timeout else actual_timeout
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout)
self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout, context=context)
return result

View file

@ -1,4 +1,6 @@
import asyncio
import logging
import random
from .base import CommandHandlerBase
from ..events import EventType
@ -131,3 +133,112 @@ class BinaryCommandHandler(CommandHandlerBase):
)
return acl_event.payload["acl_data"] if acl_event else None
async def req_neighbours_async(self,
contact,
count=255,
offset=0,
order_by=0,
pubkey_prefix_length=4,
timeout=0,
min_timeout=0
):
req = (b"\x00" # version : 0
+ count.to_bytes(1, "little", signed=False)
+ offset.to_bytes(2, "little", signed=False)
+ order_by.to_bytes(1, "little", signed=False)
+ pubkey_prefix_length.to_bytes(1, "little", signed=False)
+ random.randint(1, 0xFFFFFFFF).to_bytes(4, "little", signed=False)
)
logger.debug(f"Sending binary neighbours req, count: {count}, offset: {offset} {req.hex()}")
return await self.send_binary_req (
contact,
BinaryReqType.NEIGHBOURS,
data=req,
timeout=timeout,
context={"pubkey_prefix_length": pubkey_prefix_length}
)
async def req_neighbours_sync(self,
contact,
count=255,
offset=0,
order_by=0,
pubkey_prefix_length=4,
timeout=0,
min_timeout=0
):
res = await self.req_neighbours_async(contact,
count=count,
offset=offset,
order_by=order_by,
pubkey_prefix_length=pubkey_prefix_length,
timeout=timeout,
min_timeout=min_timeout)
if res is None or res.type == EventType.ERROR:
return None
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
timeout = timeout if min_timeout < timeout else min_timeout
if self.dispatcher is None:
return None
# Listen for NEIGHBOUR_RESPONSE
neighbours_event = await self.dispatcher.wait_for_event(
EventType.NEIGHBOURS_RESPONSE,
attribute_filters={"tag": res.payload["expected_ack"].hex()},
timeout=timeout,
)
return neighbours_event.payload if neighbours_event else None
# do several queries if not all neighbours have been obtained
async def fetch_all_neighbours(self,
contact,
order_by=0,
pubkey_prefix_length=4,
timeout=0,
min_timeout=0
):
# Initial request
res = await self.req_neighbours_sync(contact,
count=255,
offset=0,
order_by=order_by,
pubkey_prefix_length=pubkey_prefix_length,
timeout=timeout,
min_timeout=min_timeout)
if res is None:
return None
neighbours_count = res["neighbours_count"] # total neighbours
results_count = res["results_count"] # obtained neighbours
del res["tag"]
while results_count < neighbours_count:
#await asyncio.sleep(2) # wait 2s before next fetch
next_res = await self.req_neighbours_sync(contact,
count=255,
offset=results_count,
order_by=order_by,
pubkey_prefix_length=pubkey_prefix_length,
timeout=timeout,
min_timeout=min_timeout+5) # requests are close, so let's have some more timeout
if next_res is None :
return res # caller should check it has everything
results_count = results_count + next_res["results_count"]
res["results_count"] = results_count
res["neighbours"] += next_res["neighbours"]
return res

View file

@ -46,6 +46,7 @@ class EventType(Enum):
DISABLED = "disabled"
CONTROL_DATA = "control_data"
DISCOVER_RESPONSE = "discover_response"
NEIGHBOURS_RESPONSE = "neighbours_response"
# Command response types
OK = "command_ok"

View file

@ -6,6 +6,7 @@ class BinaryReqType(Enum):
TELEMETRY = 0x03
MMA = 0x04
ACL = 0x05
NEIGHBOURS = 0x06
class ControlType(Enum):
NODE_DISCOVER_REQ = 0x80

View file

@ -23,7 +23,7 @@ class MessageReader:
# Track pending binary requests by tag for proper response parsing
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float):
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}):
"""Register a pending binary request for proper response parsing"""
# Clean up expired requests before adding new one
self.cleanup_expired_requests()
@ -32,7 +32,8 @@ class MessageReader:
self.pending_binary_requests[tag] = {
"request_type": request_type,
"pubkey_prefix": prefix,
"expires_at": expires_at
"expires_at": expires_at,
"context": context # optional info we want to keep from req to resp
}
logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s")
@ -519,6 +520,7 @@ class MessageReader:
if tag in self.pending_binary_requests:
request_type = self.pending_binary_requests[tag]["request_type"]
pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"]
context = self.pending_binary_requests[tag]["context"]
del self.pending_binary_requests[tag]
logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}")
@ -558,6 +560,40 @@ class MessageReader:
)
except Exception as e:
logger.error(f"Error parsing binary ACL response: {e}")
elif request_type == BinaryReqType.NEIGHBOURS:
try:
pk_plen = context["pubkey_prefix_length"]
bbuf = io.BytesIO(response_data)
res = {
"pubkey_prefix": pubkey_prefix,
"tag": tag
}
res.update(context) # add context in result
res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True)
results_count = int.from_bytes(bbuf.read(2), "little", signed=True)
res["results_count"] = results_count
neighbours_list = []
for _ in range (results_count):
neighb = {}
neighb["pubkey"] = bbuf.read(pk_plen).hex()
neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True)
neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4
neighbours_list.append(neighb)
res["neighbours"] = neighbours_list
await self.dispatcher.dispatch(
Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
)
except Exception as e:
logger.error(f"Error parsing binary NEIGHBOURS response: {e}")
else:
logger.debug(f"No tracked request found for binary response tag {tag}")
@ -623,7 +659,7 @@ class MessageReader:
if len(pubkey) < 32:
pubkey = pubkey[0:8]
else:
pubkey = pubkey[0:32]
pubkey = pubkey[0:32]
ndr["pubkey"] = pubkey.hex()