From 2b6a4b267b006f6e0ab4241661cf5b76ed1a9a27 Mon Sep 17 00:00:00 2001 From: Florent Date: Fri, 29 Aug 2025 20:10:15 +0200 Subject: [PATCH] add req_binary --- pyproject.toml | 2 +- src/meshcore/commands/binary.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2cf793e..e4d681d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore" -version = "2.1.3" +version = "2.1.5" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, { name="Alex Wolden", email="awolden@gmail.com" }, diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index eb1165c..d35f54f 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -11,6 +11,8 @@ logger = logging.getLogger("meshcore") class BinaryReqType(Enum): + STATUS = 0x01 + KEEP_ALIVE = 0x02 TELEMETRY = 0x03 MMA = 0x04 ACL = 0x05 @@ -93,6 +95,36 @@ class BinaryCommandHandler(CommandHandlerBase): else: return res2.payload + async def req_status(self, contact, timeout=0): + code = BinaryReqType.STATUS.value + req = code.to_bytes(1, "little", signed=False) + rep = await self.req_binary(contact, req, timeout) + + if rep is None : + return None + else: + data=bytes.fromhex(rep["data"]) + res = {} + res["pubkey_pre"] = contact["public_key"][0:12] + res["bat"] = int.from_bytes(data[0:2], byteorder="little") + res["tx_queue_len"] = int.from_bytes(data[2:4], byteorder="little") + res["noise_floor"] = int.from_bytes(data[4:6], byteorder="little", signed=True) + res["last_rssi"] = int.from_bytes(data[6:8], byteorder="little", signed=True) + res["nb_recv"] = int.from_bytes(data[8:12], byteorder="little", signed=False) + res["nb_sent"] = int.from_bytes(data[12:16], byteorder="little", signed=False) + res["airtime"] = int.from_bytes(data[16:20], byteorder="little") + res["uptime"] = int.from_bytes(data[20:24], byteorder="little") + res["sent_flood"] = int.from_bytes(data[24:28], byteorder="little") + res["sent_direct"] = int.from_bytes(data[28:32], byteorder="little") + res["recv_flood"] = int.from_bytes(data[32:36], byteorder="little") + res["recv_direct"] = int.from_bytes(data[36:40], byteorder="little") + res["full_evts"] = int.from_bytes(data[40:42], byteorder="little") + res["last_snr"] = (int.from_bytes(data[42:44], byteorder="little", signed=True) / 4) + res["direct_dups"] = int.from_bytes(data[44:46], byteorder="little") + res["flood_dups"] = int.from_bytes(data[46:48], byteorder="little") + res["rx_airtime"] = int.from_bytes(data[48:52], byteorder="little") + return res if res["uptime"] > 0 else None + async def req_telemetry(self, contact, timeout=0): code = BinaryReqType.TELEMETRY.value req = code.to_bytes(1, "little", signed=False)