From 9aeffb41a1a42289dc64f2df913ffdb5ef63dc2a Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Fri, 29 Aug 2025 11:54:21 -0700 Subject: [PATCH] Move binary process to reader for consistent eventing --- examples/serial_repeater_telemetry.py | 43 ++---- src/meshcore/binary_parsing.py | 71 +++++++++ src/meshcore/commands/binary.py | 209 +++++++++----------------- src/meshcore/events.py | 2 + src/meshcore/reader.py | 97 ++++++++++++ 5 files changed, 257 insertions(+), 165 deletions(-) create mode 100644 src/meshcore/binary_parsing.py diff --git a/examples/serial_repeater_telemetry.py b/examples/serial_repeater_telemetry.py index dfc5ef4..b11fd41 100755 --- a/examples/serial_repeater_telemetry.py +++ b/examples/serial_repeater_telemetry.py @@ -9,22 +9,20 @@ from meshcore.events import EventType async def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection') - # parser.add_argument('-p', '--port', required=True, help='Serial port') - # parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') + parser.add_argument('-p', '--port', required=True, help='Serial port') + parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate') parser.add_argument('-r', '--repeater', required=True, help='Repeater name') parser.add_argument('-pw', '--password', required=True, help='Password for login') - # parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') + parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds') args = parser.parse_args() # Connect to the device - mc = await MeshCore.create_ble("lora-py-tester") - 534463 + mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True) + try: # Get contacts - result = await mc.commands.get_contacts() - print(result) - print(mc._contacts) - repeater = mc.get_contact_by_key_prefix(args.repeater) + await mc.ensure_contacts() + repeater = mc.get_contact_by_name(args.repeater) if repeater is None: print(f"Repeater '{args.repeater}' not found in contacts.") @@ -37,25 +35,14 @@ async def main(): if login_event.type != EventType.ERROR: print("Login successful") - # Continuously poll for telemetry every 60 seconds - print("Starting continuous telemetry polling every 60 seconds...") - while True: - try: - # Send status request - print("Sending status request...") - await mc.commands.send_telemetry_req(repeater) - - # Wait for status response - telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10) - print(telemetry_event) - - # Wait 60 seconds before next poll - await asyncio.sleep(60) - - except Exception as e: - print(f"Error during telemetry poll: {e}") - # Wait before retrying - await asyncio.sleep(60) + # Send status request + print("Sending status request...") + await mc.commands.send_telemetry_req(repeater) + + # Wait for status response + telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout) + print(telemetry_event.payload["lpp"]) + else: print("Login failed or timed out") diff --git a/src/meshcore/binary_parsing.py b/src/meshcore/binary_parsing.py new file mode 100644 index 0000000..56531ac --- /dev/null +++ b/src/meshcore/binary_parsing.py @@ -0,0 +1,71 @@ +import logging +from enum import Enum +import json +from cayennelpp import LppFrame, LppData +from cayennelpp.lpp_type import LppType +from .lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val + +logger = logging.getLogger("meshcore") + + +class BinaryReqType(Enum): + STATUS = 0x01 + KEEP_ALIVE = 0x02 + TELEMETRY = 0x03 + MMA = 0x04 + ACL = 0x05 + + +def lpp_parse(buf): + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)) + + +def lpp_parse_mma(buf): + i = 0 + res = [] + while i < len(buf) and buf[i] != 0: + chan = buf[i] + i = i + 1 + type = buf[i] + lpp_type = LppType.get_lpp_type(type) + if lpp_type is None: + logger.error(f"Unknown LPP type: {type}") + return None + size = lpp_type.size + i = i + 1 + min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + res.append( + { + "channel": chan, + "type": my_lpp_types[type][0], + "min": min, + "max": max, + "avg": avg, + } + ) + return res + + +def parse_acl(buf): + i = 0 + res = [] + while i + 7 <= len(buf): + key = buf[i : i + 6].hex() + perm = buf[i + 6] + if key != "000000000000": + res.append({"key": key, "perm": perm}) + i = i + 7 + return res \ No newline at end of file diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index 4bd2392..2f456d6 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -1,167 +1,102 @@ import logging -from enum import Enum -import json from mailbox import Message from meshcore.commands.messaging import MessagingCommands from .base import CommandHandlerBase from ..events import EventType -from cayennelpp import LppFrame, LppData -from cayennelpp.lpp_type import LppType -from ..lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val +from ..binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl logger = logging.getLogger("meshcore") -class BinaryReqType(Enum): - STATUS = 0x01 - KEEP_ALIVE = 0x02 - TELEMETRY = 0x03 - MMA = 0x04 - ACL = 0x05 - - -def lpp_parse(buf): - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)) - - -def lpp_parse_mma(buf): - i = 0 - res = [] - while i < len(buf) and buf[i] != 0: - chan = buf[i] - i = i + 1 - type = buf[i] - lpp_type = LppType.get_lpp_type(type) - if lpp_type is None: - logger.error(f"Unknown LPP type: {type}") - return None - size = lpp_type.size - i = i + 1 - min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - res.append( - { - "channel": chan, - "type": my_lpp_types[type][0], - "min": min, - "max": max, - "avg": avg, - } - ) - return res - - -def parse_acl(buf): - i = 0 - res = [] - while i + 7 <= len(buf): - key = buf[i : i + 6].hex() - perm = buf[i + 6] - if key != "000000000000": - res.append({"key": key, "perm": perm}) - i = i + 7 - return res - - class BinaryCommandHandler(MessagingCommands): """Helper functions to handle binary requests through binary commands""" - async def req_binary(self, contact, request, timeout=0): - res = await self.send_binary_req(contact, request) - logger.debug(res) - if res.type == EventType.ERROR: - logger.error("Error while requesting binary data") - return None - else: - exp_tag = res.payload["expected_ack"].hex() - timeout = ( - res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - ) - if self.dispatcher is None: - logger.error("No dispatcher set, cannot wait for response") - return None - res2 = await self.dispatcher.wait_for_event( - EventType.BINARY_RESPONSE, - attribute_filters={"tag": exp_tag}, - timeout=timeout, - ) - logger.debug(res2) - if res2 is None: - return None - else: - return res2.payload async def req_status(self, contact, timeout=0): - code = BinaryReqType.STATUS.value - req = code.to_bytes(1, "little", signed=False) - rep = await self.req_binary(contact, req, timeout) - - if rep is None : + res = await self.send_binary_req(contact, BinaryReqType.STATUS.value.to_bytes(1, "little")) + if res.type == EventType.ERROR: return None - else: - data=bytes.fromhex(rep["data"]) - res = {} - res["pubkey_pre"] = contact["public_key"][0:12] - res["bat"] = int.from_bytes(data[0:2], byteorder="little") - res["tx_queue_len"] = int.from_bytes(data[2:4], byteorder="little") - res["noise_floor"] = int.from_bytes(data[4:6], byteorder="little", signed=True) - res["last_rssi"] = int.from_bytes(data[6:8], byteorder="little", signed=True) - res["nb_recv"] = int.from_bytes(data[8:12], byteorder="little", signed=False) - res["nb_sent"] = int.from_bytes(data[12:16], byteorder="little", signed=False) - res["airtime"] = int.from_bytes(data[16:20], byteorder="little") - res["uptime"] = int.from_bytes(data[20:24], byteorder="little") - res["sent_flood"] = int.from_bytes(data[24:28], byteorder="little") - res["sent_direct"] = int.from_bytes(data[28:32], byteorder="little") - res["recv_flood"] = int.from_bytes(data[32:36], byteorder="little") - res["recv_direct"] = int.from_bytes(data[36:40], byteorder="little") - res["full_evts"] = int.from_bytes(data[40:42], byteorder="little") - res["last_snr"] = (int.from_bytes(data[42:44], byteorder="little", signed=True) / 4) - res["direct_dups"] = int.from_bytes(data[44:46], byteorder="little") - res["flood_dups"] = int.from_bytes(data[46:48], byteorder="little") - res["rx_airtime"] = int.from_bytes(data[48:52], byteorder="little") - return res if res["uptime"] > 0 else None + + exp_tag = res.payload["expected_ack"].hex() + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + + if self.dispatcher is None: + return None + + # Listen for STATUS_RESPONSE event with matching pubkey + contact_pubkey_prefix = contact["public_key"][0:12] + status_event = await self.dispatcher.wait_for_event( + EventType.STATUS_RESPONSE, + attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + timeout=timeout, + ) + + return status_event.payload if status_event else None async def req_telemetry(self, contact, timeout=0): - code = BinaryReqType.TELEMETRY.value - req = code.to_bytes(1, "little", signed=False) - res = await self.req_binary(contact, req, timeout) - if res is None: + res = await self.send_binary_req(contact, BinaryReqType.TELEMETRY.value.to_bytes(1, "little")) + if res.type == EventType.ERROR: return None - else: - return lpp_parse(bytes.fromhex(res["data"])) + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + + if self.dispatcher is None: + return None + + # Listen for TELEMETRY_RESPONSE event with matching pubkey + contact_pubkey_prefix = contact["public_key"][0:12] + telem_event = await self.dispatcher.wait_for_event( + EventType.TELEMETRY_RESPONSE, + attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + timeout=timeout, + ) + + return telem_event.payload["lpp"] if telem_event else None async def req_mma(self, contact, start, end, timeout=0): - code = BinaryReqType.MMA.value req = ( - code.to_bytes(1, "little", signed=False) + BinaryReqType.MMA.value.to_bytes(1, "little", signed=False) + start.to_bytes(4, "little", signed=False) + end.to_bytes(4, "little", signed=False) + b"\0\0" ) - res = await self.req_binary(contact, req, timeout) - if res is None: + res = await self.send_binary_req(contact, req) + if res.type == EventType.ERROR: return None - else: - return lpp_parse_mma(bytes.fromhex(res["data"])[4:]) + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + + if self.dispatcher is None: + return None + + # Listen for MMA_RESPONSE event with matching pubkey + contact_pubkey_prefix = contact["public_key"][0:12] + mma_event = await self.dispatcher.wait_for_event( + EventType.MMA_RESPONSE, + attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + timeout=timeout, + ) + + return mma_event.payload["mma_data"] if mma_event else None async def req_acl(self, contact, timeout=0): - code = BinaryReqType.ACL.value - req = code.to_bytes(1, "little", signed=False) + b"\0\0" - res = await self.req_binary(contact, req, timeout) - if res is None: + req = BinaryReqType.ACL.value.to_bytes(1, "little", signed=False) + b"\0\0" + res = await self.send_binary_req(contact, req) + if res.type == EventType.ERROR: return None - else: - return parse_acl(bytes.fromhex(res["data"])) + + timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout + + if self.dispatcher is None: + return None + + # Listen for ACL_RESPONSE event with matching pubkey + contact_pubkey_prefix = contact["public_key"][0:12] + acl_event = await self.dispatcher.wait_for_event( + EventType.ACL_RESPONSE, + attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + timeout=timeout, + ) + + return acl_event.payload["acl_data"] if acl_event else None diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 137fd43..7feaa5f 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -37,6 +37,8 @@ class EventType(Enum): RX_LOG_DATA = "rx_log_data" TELEMETRY_RESPONSE = "telemetry_response" BINARY_RESPONSE = "binary_response" + MMA_RESPONSE = "mma_response" + ACL_RESPONSE = "acl_response" CUSTOM_VARS = "custom_vars" CHANNEL_INFO = "channel_info" PATH_RESPONSE = "path_response" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 1400f0f..1b54f60 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -3,6 +3,7 @@ import json from typing import Any, Dict from .events import Event, EventType, EventDispatcher from .packets import PacketType +from .binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl from cayennelpp import LppFrame, LppData from meshcore.lpp_json_encoder import lpp_json_encoder @@ -497,10 +498,106 @@ class MessageReader: attributes = {"tag": res["tag"]} + # Always dispatch the generic BINARY_RESPONSE event first await self.dispatcher.dispatch( Event(EventType.BINARY_RESPONSE, res, attributes) ) + # Parse the request type from the response data and dispatch specific events + response_data = data[6:] + if response_data: # Check if there's response data + request_type = response_data[0] + + if request_type == BinaryReqType.STATUS.value: + # Parse as status response - use same parsing as STATUS_RESPONSE + if len(response_data) >= 53: # Minimum size for status data + status_res = {} + status_res["pubkey_pre"] = data[2:8].hex() # Use pubkey from tag area + status_data = response_data[1:] # Skip the request type byte + + status_res["bat"] = int.from_bytes(status_data[0:2], byteorder="little") + status_res["tx_queue_len"] = int.from_bytes(status_data[2:4], byteorder="little") + status_res["noise_floor"] = int.from_bytes(status_data[4:6], byteorder="little", signed=True) + status_res["last_rssi"] = int.from_bytes(status_data[6:8], byteorder="little", signed=True) + status_res["nb_recv"] = int.from_bytes(status_data[8:12], byteorder="little", signed=False) + status_res["nb_sent"] = int.from_bytes(status_data[12:16], byteorder="little", signed=False) + status_res["airtime"] = int.from_bytes(status_data[16:20], byteorder="little") + status_res["uptime"] = int.from_bytes(status_data[20:24], byteorder="little") + status_res["sent_flood"] = int.from_bytes(status_data[24:28], byteorder="little") + status_res["sent_direct"] = int.from_bytes(status_data[28:32], byteorder="little") + status_res["recv_flood"] = int.from_bytes(status_data[32:36], byteorder="little") + status_res["recv_direct"] = int.from_bytes(status_data[36:40], byteorder="little") + status_res["full_evts"] = int.from_bytes(status_data[40:42], byteorder="little") + status_res["last_snr"] = int.from_bytes(status_data[42:44], byteorder="little", signed=True) / 4 + status_res["direct_dups"] = int.from_bytes(status_data[44:46], byteorder="little") + status_res["flood_dups"] = int.from_bytes(status_data[46:48], byteorder="little") + status_res["rx_airtime"] = int.from_bytes(status_data[48:52], byteorder="little") + + status_attributes = {"pubkey_prefix": status_res["pubkey_pre"]} + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, status_res, status_attributes) + ) + + elif request_type == BinaryReqType.TELEMETRY.value: + # Parse as telemetry response + try: + telemetry_data = response_data[1:] # Skip the request type byte + lpp = lpp_parse(telemetry_data) + + telem_res = { + "pubkey_pre": data[2:8].hex(), + "lpp": lpp + } + + telem_attributes = { + "raw": telemetry_data.hex(), + "pubkey_prefix": telem_res["pubkey_pre"] + } + + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_attributes) + ) + except Exception as e: + logger.error(f"Error parsing binary telemetry response: {e}") + + elif request_type == BinaryReqType.MMA.value: + # Parse as MMA response + try: + mma_data = response_data[5:] # Skip request type + 4 bytes header + mma_result = lpp_parse_mma(mma_data) + + mma_res = { + "pubkey_pre": data[2:8].hex(), + "mma_data": mma_result + } + + mma_attributes = {"pubkey_prefix": mma_res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.MMA_RESPONSE, mma_res, mma_attributes) + ) + except Exception as e: + logger.error(f"Error parsing binary MMA response: {e}") + + elif request_type == BinaryReqType.ACL.value: + # Parse as ACL response + try: + acl_data = response_data[1:] # Skip the request type byte + acl_result = parse_acl(acl_data) + + acl_res = { + "pubkey_pre": data[2:8].hex(), + "acl_data": acl_result + } + + acl_attributes = {"pubkey_prefix": acl_res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.ACL_RESPONSE, acl_res, acl_attributes) + ) + except Exception as e: + logger.error(f"Error parsing binary ACL response: {e}") + elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: logger.debug(f"Received path discovery response: {data.hex()}") res = {}