From d9197faf3a0daff7bd06ebf297a9f0389526e964 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:06:53 -0700 Subject: [PATCH] =?UTF-8?q?G1:=20F06=20=E2=80=94=20wrap=20handle=5Frx=20di?= =?UTF-8?q?spatch=20in=20catch-all=20try/except?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: handle_rx is invoked from a detached task in MessageReader, so any exception escaping its ~850-line if/elif dispatch is silently swallowed by asyncio as "Task exception was never retrieved." The only crash guard previously was a single try/except IndexError around the first byte read; everything past line 73 was unguarded. This commit adds an umbrella try: ... except Exception as e: around the entire dispatch body that logs the exception class, message, raw frame hex, and full traceback via logger.error. The umbrella neutralizes the crash surface of F10, F11, N07, N08, R01, NEW-B, and NEW-C, which the next commits will then fix individually now that they are observable. Refs: Forensics report finding F06 (umbrella crash protection) --- src/meshcore/reader.py | 1680 ++++++++++++++++++++-------------------- 1 file changed, 845 insertions(+), 835 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 802004a..088baeb 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -3,6 +3,7 @@ import json import struct import time import io +import traceback from typing import Any, Dict from .events import Event, EventType, EventDispatcher, ErrorMessages from .meshcore_parser import MeshcorePacketParser @@ -69,853 +70,862 @@ class MessageReader: except IndexError as e: logger.warning(f"Received empty packet: {e}") return - logger.debug(f"Received data: {data.hex()}") + try: + logger.debug(f"Received data: {data.hex()}") - # Handle command responses - if packet_type_value == PacketType.OK.value: - result: Dict[str, Any] = {} - if len(data) == 5: - result["value"] = int.from_bytes(data[1:5], byteorder="little") + # Handle command responses + if packet_type_value == PacketType.OK.value: + result: Dict[str, Any] = {} + if len(data) == 5: + result["value"] = int.from_bytes(data[1:5], byteorder="little") - # Dispatch event for the OK response - await self.dispatcher.dispatch(Event(EventType.OK, result)) + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) - elif packet_type_value == PacketType.ERROR.value: - if len(data) > 1: - result = { "error_code": data[1], } - if data[1] in ErrorMessages: - result["code_string"] = ErrorMessages[data[1]] - else: - result = {} - - # Dispatch event for the ERROR response - await self.dispatcher.dispatch(Event(EventType.ERROR, result)) - - elif packet_type_value == PacketType.CONTACT_START.value: - self.contact_nb = int.from_bytes(data[1:5], byteorder="little") - self.contacts = {} - - elif ( - packet_type_value == PacketType.CONTACT.value - or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value - ): - c = {} - c["public_key"] = dbuf.read(32).hex() - c["type"] = dbuf.read(1)[0] - c["flags"] = dbuf.read(1)[0] - plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little") - if plen == 255: # flood - c["out_path_hash_mode"] = -1 - c["out_path_len"] = -1 # 6 LSB - else: - c["out_path_hash_mode"] = plen >> 6 - c["out_path_len"] = plen & 0x3F # 6 LSB - c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() - c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") - c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") - c["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") - - if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: - await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) - else: - await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) - self.contacts[c["public_key"]] = c - - elif packet_type_value == PacketType.ADVERT_PATH.value : - r = {} - r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) - plen = int.from_bytes(dbuf.read(1), "little", signed=False) - if plen == 255: # flood, should not happen - r["path_hash_mode"] = -1 - r["path_len"] = -1 - else: - r["path_hash_mode"] = plen >> 6 # 2 upper bytes - r["path_len"] = plen & 0x3F - r["path"] = dbuf.read().replace(b"\0", b"").hex() - - await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) - - elif packet_type_value == PacketType.CONTACT_END.value: - lastmod = int.from_bytes(dbuf.read(4), byteorder="little") - attributes = { - "lastmod": lastmod, - } - await self.dispatcher.dispatch( - Event(EventType.CONTACTS, self.contacts, attributes) - ) - - elif packet_type_value == PacketType.SELF_INFO.value: - self_info = {} - self_info["adv_type"] = dbuf.read(1)[0] - self_info["tx_power"] = dbuf.read(1)[0] - self_info["max_tx_power"] = dbuf.read(1)[0] - self_info["public_key"] = dbuf.read(32).hex() - self_info["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["multi_acks"] = dbuf.read(1)[0] - self_info["adv_loc_policy"] = dbuf.read(1)[0] - telemetry_mode = dbuf.read(1)[0] - self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 - self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 - self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 - self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 - self_info["radio_freq"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_bw"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_sf"] = dbuf.read(1)[0] - self_info["radio_cr"] = dbuf.read(1)[0] - self_info["name"] = dbuf.read().decode("utf-8", "ignore") - await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) - - elif packet_type_value == PacketType.MSG_SENT.value: - res = {} - res["type"] = dbuf.read(1)[0] - res["expected_ack"] = dbuf.read(4) - res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") - - attributes = { - "type": res["type"], - "expected_ack": res["expected_ack"].hex(), - } - - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) - - elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - evt_type = EventType.CONTACT_MSG_RECV - - await self.dispatcher.dispatch(Event(evt_type, res, attributes)) - - elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CONTACT_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: - res = {} - res["type"] = "CHAN" - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read().strip(b"\0") - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - if self.decrypt_channels: - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["SNR"] = logged["snr"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read() - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - if self.decrypt_channels: - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - res["txt_hash"] = txt_hash - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CURRENT_TIME.value: - time_value = int.from_bytes(dbuf.read(4), byteorder="little") - result = {"time": time_value} - await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) - - elif packet_type_value == PacketType.NO_MORE_MSGS.value: - result = {"messages_available": False} - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) - - elif packet_type_value == PacketType.CONTACT_URI.value: - contact_uri = "meshcore://" + dbuf.read().hex() - result = {"uri": contact_uri} - await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) - - elif packet_type_value == PacketType.BATTERY.value: - battery_level = int.from_bytes(dbuf.read(2), byteorder="little") - result = {"level": battery_level} - if len(data) > 3: # has storage info as well - result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) - - elif packet_type_value == PacketType.DEVICE_INFO.value: - res = {} - fw_ver = dbuf.read(1)[0] - res["fw ver"] = fw_ver - if fw_ver >= 3: - res["max_contacts"] = dbuf.read(1)[0] * 2 - res["max_channels"] = dbuf.read(1)[0] - res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") - res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") - res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") - res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") - if fw_ver >= 9: # has repeater mode - rpt = dbuf.read(1) - if len(rpt) > 0: - res["repeat"] = (rpt[0] != 0) - if fw_ver >= 10: # has path_hash_mode - path_hash_mode = dbuf.read(1)[0] - res["path_hash_mode"] = path_hash_mode - await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) - - elif packet_type_value == PacketType.CUSTOM_VARS.value: - logger.debug(f"received custom vars response: {data.hex()}") - res = {} - rawdata = dbuf.read().decode("utf-8", "ignore") - if not rawdata == "": - pairs = rawdata.split(",") - for p in pairs: - psplit = p.split(":") - res[psplit[0]] = psplit[1] - logger.debug(f"got custom vars : {res}") - await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) - - elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) - logger.debug(f"received stats response: {data.hex()}") - # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte - # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) - if len(data) < 2: - logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") - await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) - return - - stats_type = data[1] - - if stats_type == 0: # STATS_TYPE_CORE - # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total - # Format: 1: + result = { "error_code": data[1], } + if data[1] in ErrorMessages: + result["code_string"] = ErrorMessages[data[1]] else: - try: - battery_mv, uptime_secs, errors, queue_len = struct.unpack('> 6 + c["out_path_len"] = plen & 0x3F # 6 LSB + c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() + c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") + c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") + c["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") + + if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: + await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) else: - try: - recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('= 30: - (recv_errors,) = struct.unpack('= 0: - res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") - else: - res["channel_name"] = name_bytes.decode("utf-8", "ignore") - - res["channel_secret"] = dbuf.read(16) - res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] - - await self.packet_parser.newChannel(res) - - await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) - - # Push notifications - elif packet_type_value == PacketType.ADVERTISEMENT.value: - logger.debug("Advertisement received") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) - - elif packet_type_value == PacketType.PATH_UPDATE.value: - logger.debug("Code path update") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) - - elif packet_type_value == PacketType.ACK.value: - logger.debug("Received ACK") - ack_data = {} - - if len(data) >= 5: - ack_data["code"] = dbuf.read(4).hex() - - attributes = {"code": ack_data.get("code", "")} - - await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) - - elif packet_type_value == PacketType.MESSAGES_WAITING.value: - logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) - - elif packet_type_value == PacketType.RAW_DATA.value: - res = {} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["payload"] = dbuf.read(4).hex() - logger.debug("Received raw data") - print(res) - await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) - - elif packet_type_value == PacketType.LOGIN_SUCCESS.value: - res = {} - attributes = {} - if len(data) > 1: - perms = dbuf.read(1)[0] - res["permissions"] = perms - res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set - - res["pubkey_prefix"] = dbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_SUCCESS, res, attributes) - ) - - elif packet_type_value == PacketType.LOGIN_FAILED.value: - res = {} - attributes = {} - - dbuf.read(1) - - if len(data) > 7: - res["pubkey_prefix"] = pbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_FAILED, res, attributes) - ) - - elif packet_type_value == PacketType.STATUS_RESPONSE.value: - res = parse_status(data, offset=8) - data_hex = data[8:].hex() - logger.debug(f"Status response: {data_hex}") - - attributes = { - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.LOG_DATA.value: - logger.debug(f"Received RF log data: {data.hex()}") - - # Parse as raw RX data - log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} - attributes = {} - - recv_time = int(time.time()) - log_data["recv_time"] = recv_time - attributes["recv_time"] = recv_time - - # First byte is SNR (signed byte, multiplied by 4) - if len(data) > 1: - snr_byte = dbuf.read(1)[0] - # Convert to signed value - snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 - log_data["snr"] = snr - - # Second byte is RSSI (signed byte) - if len(data) > 2: - rssi_byte = dbuf.read(1)[0] - # Convert to signed value - rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 - log_data["rssi"] = rssi - - # Remaining bytes are the raw data payload - payload = None - if len(data) > 3: - payload=dbuf.read() - log_data["payload"] = payload.hex() - log_data["payload_length"] = len(payload) - - # Parse payload and get some info from it - log_data = await self.packet_parser.parsePacketPayload(payload, log_data) - attributes['route_type'] = log_data['route_type'] - attributes['payload_type'] = log_data['payload_type'] - attributes['path_len'] = log_data['path_len'] - attributes['path'] = log_data['path'] - - # Dispatch as RF log data - await self.dispatcher.dispatch( - Event(EventType.RX_LOG_DATA, log_data, attributes) - ) - - elif packet_type_value == PacketType.TRACE_DATA.value: - logger.debug(f"Received trace data: {data.hex()}") - res = {} - - # According to the source, format is: - # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr - - reserved = dbuf.read(1)[0] - path_len = dbuf.read(1)[0] - flags = dbuf.read(1)[0] - tag = int.from_bytes(dbuf.read(4), byteorder="little") - auth_code = int.from_bytes(dbuf.read(4), byteorder="little") - - path_hash_len = 1 << (flags&3) - path_len = int(path_len / path_hash_len) - - # Initialize result - res["tag"] = tag - res["auth"] = auth_code - res["flags"] = flags - res["path_len"] = path_len - - # Process path as array of objects with hash and SNR - path_nodes = [] - - if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: - # Extract path with hash and SNR pairs - for i in range(path_len): - node = { - "hash": dbuf.read(path_hash_len).hex(), - } - path_nodes.append(node) - - for n in path_nodes: - node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - n["snr"] = node_snr / 4.0 - - # Add the final node (our device) with its SNR - final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 - path_nodes.append({"snr": final_snr}) - - res["path"] = path_nodes - - logger.debug(f"Parsed trace data: {res}") - - attributes = { - "tag": res["tag"], - "auth_code": res["auth"], - } - - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) - - elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: - logger.debug(f"Received telemetry data: {data.hex()}") - res = {} - - dbuf.read(1) - - res["pubkey_pre"] = dbuf.read(6).hex() - buf = dbuf.read() - - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - lpp = json.loads( - json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) - ) - - res["lpp"] = lpp - - attributes = { - "raw": buf.hex(), - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: - res = {} - freqs = [] - - cont = True - try: - while cont: - min = int.from_bytes(dbuf.read(4), "little", signed=False) - max = int.from_bytes(dbuf.read(4), "little", signed=False) - if min == 0 or max == 0: - cont = False - else: - freqs.append({"min" : min, "max": max}) - except e: - print(e) - - res["freqs"] = freqs - - await self.dispatcher.dispatch( - Event(EventType.ALLOWED_REPEAT_FREQ, res) - ) - - elif packet_type_value == PacketType.BINARY_RESPONSE.value: - dbuf.read(1) - tag = dbuf.read(4).hex() - response_data = dbuf.read() - logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") - - # Always dispatch generic BINARY_RESPONSE - binary_res = {"tag": tag, "data": response_data.hex()} - await self.dispatcher.dispatch( - Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) - ) - - # Check for tracked request type and dispatch specific response - if tag in self.pending_binary_requests: - request_type = self.pending_binary_requests[tag]["request_type"] - is_anon = self.pending_binary_requests[tag]["is_anon"] - pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] - context = self.pending_binary_requests[tag]["context"] - del self.pending_binary_requests[tag] - logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - - if not is_anon: - - if request_type == BinaryReqType.STATUS and len(response_data) >= 52: - res = {} - res = parse_status(response_data, pubkey_prefix=pubkey_prefix) - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) - ) - - elif request_type == BinaryReqType.TELEMETRY : - try: - lpp = lpp_parse(response_data) - telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) - ) - except Exception as e: - logger.error(f"Error parsing binary telemetry response: {e}") - - elif request_type == BinaryReqType.MMA: - try: - mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header - mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.MMA_RESPONSE, mma_res, mma_res) - ) - except Exception as e: - logger.error(f"Error parsing binary MMA response: {e}") - - elif request_type == BinaryReqType.ACL: - try: - acl_result = parse_acl(response_data) - acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - except Exception as e: - logger.error(f"Error parsing binary ACL response: {e}") - - elif request_type == BinaryReqType.NEIGHBOURS: - try: - pk_plen = context["pubkey_prefix_length"] - bbuf = io.BytesIO(response_data) - - res = { - "pubkey_prefix": pubkey_prefix, - "tag": tag - } - res.update(context) # add context in result - - res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) - results_count = int.from_bytes(bbuf.read(2), "little", signed=True) - res["results_count"] = results_count - - neighbours_list = [] - - for _ in range (results_count): - neighb = {} - neighb["pubkey"] = bbuf.read(pk_plen).hex() - neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) - neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 - neighbours_list.append(neighb) - - res["neighbours"] = neighbours_list - - await self.dispatcher.dispatch( - Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - - except Exception as e: - logger.error(f"Error parsing binary NEIGHBOURS response: {e}") - - else: - logger.debug(f"No tracked request found for binary response tag {tag}") - - elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: - logger.debug(f"Received path discovery response: {data.hex()}") - res = {} - dbuf.read(1) - res["pubkey_pre"] = dbuf.read(6).hex() - opl = dbuf.read(1)[0] - opl_hlen = ((opl & 0xc0) >> 6) + 1 - opl = opl & 0x3f - res["out_path_len"] = opl - res["out_path_hash_len"] = opl_hlen - res["out_path"] = dbuf.read(opl*opl_hlen).hex() - ipl = dbuf.read(1)[0] - ipl_hlen = ((ipl & 0xc0) >> 6) + 1 - ipl = ipl & 0x3f - res["in_path_len"] = ipl - res["in_path_hash_len"] = ipl_hlen - res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() - - attributes = {"pubkey_pre": res["pubkey_pre"]} - - await self.dispatcher.dispatch( - Event(EventType.PATH_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.PRIVATE_KEY.value: - logger.debug(f"Received private key response: {data.hex()}") - if len(data) >= 65: # 1 byte response code + 64 bytes private key - private_key = dbuf.read(64) # Extract 64-byte private key - res = {"private_key": private_key} - await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) - else: - logger.error(f"Invalid private key response length: {len(data)}") - - elif packet_type_value == PacketType.SIGN_START.value: - logger.debug(f"Received sign start response: {data.hex()}") - # Payload: 1 reserved byte, 4-byte max length - dbuf.read(1) - max_len = int.from_bytes(dbuf.read(4), "little") - res = {"max_length": max_len} - await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) - - elif packet_type_value == PacketType.SIGNATURE.value: - logger.debug(f"Received signature: {data.hex()}") - signature = dbuf.read() - res = {"signature": signature} - await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) - - elif packet_type_value == PacketType.DISABLED.value: - logger.debug("Received disabled response") - res = {"reason": "private_key_export_disabled"} - await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) - - elif packet_type_value == PacketType.CONTROL_DATA.value: - logger.debug("Received control data packet") - res={} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["path_len"] = dbuf.read(1)[0] - payload = dbuf.read() - payload_type = payload[0] - res["payload_type"] = payload_type - res["payload"] = payload - - attributes = {"payload_type": payload_type} - await self.dispatcher.dispatch( - Event(EventType.CONTROL_DATA, res, attributes) - ) - - # decode NODE_DISCOVER_RESP - if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: - pbuf = io.BytesIO(payload[1:]) - ndr = dict(res) - del ndr["payload_type"] - del ndr["payload"] - ndr["node_type"] = payload_type & 0x0F - ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 - ndr["tag"] = pbuf.read(4).hex() - - pubkey = pbuf.read() - if len(pubkey) < 32: - pubkey = pubkey[0:8] + await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.ADVERT_PATH.value : + r = {} + r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) + plen = int.from_bytes(dbuf.read(1), "little", signed=False) + if plen == 255: # flood, should not happen + r["path_hash_mode"] = -1 + r["path_len"] = -1 else: - pubkey = pubkey[0:32] + r["path_hash_mode"] = plen >> 6 # 2 upper bytes + r["path_len"] = plen & 0x3F + r["path"] = dbuf.read().replace(b"\0", b"").hex() - ndr["pubkey"] = pubkey.hex() + await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) + + elif packet_type_value == PacketType.CONTACT_END.value: + lastmod = int.from_bytes(dbuf.read(4), byteorder="little") + attributes = { + "lastmod": lastmod, + } + await self.dispatcher.dispatch( + Event(EventType.CONTACTS, self.contacts, attributes) + ) + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = dbuf.read(1)[0] + self_info["tx_power"] = dbuf.read(1)[0] + self_info["max_tx_power"] = dbuf.read(1)[0] + self_info["public_key"] = dbuf.read(32).hex() + self_info["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["multi_acks"] = dbuf.read(1)[0] + self_info["adv_loc_policy"] = dbuf.read(1)[0] + telemetry_mode = dbuf.read(1)[0] + self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 + self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 + self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 + self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 + self_info["radio_freq"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_bw"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_sf"] = dbuf.read(1)[0] + self_info["radio_cr"] = dbuf.read(1)[0] + self_info["name"] = dbuf.read().decode("utf-8", "ignore") + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = dbuf.read(1)[0] + res["expected_ack"] = dbuf.read(4) + res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { - "node_type" : ndr["node_type"], - "tag" : ndr["tag"], - "pubkey" : ndr["pubkey"], + "type": res["type"], + "expected_ack": res["expected_ack"].hex(), + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], + } + + evt_type = EventType.CONTACT_MSG_RECV + + await self.dispatcher.dispatch(Event(evt_type, res, attributes)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], } await self.dispatcher.dispatch( - Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + Event(EventType.CONTACT_MSG_RECV, res, attributes) ) - else: - logger.debug(f"Unhandled data received {data}") - logger.debug(f"Unhandled packet type: {packet_type_value}") + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read().strip(b"\0") + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + if self.decrypt_channels: + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["SNR"] = logged["snr"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read() + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + if self.decrypt_channels: + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + res["txt_hash"] = txt_hash + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + time_value = int.from_bytes(dbuf.read(4), byteorder="little") + result = {"time": time_value} + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) + + elif packet_type_value == PacketType.CONTACT_URI.value: + contact_uri = "meshcore://" + dbuf.read().hex() + result = {"uri": contact_uri} + await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) + + elif packet_type_value == PacketType.BATTERY.value: + battery_level = int.from_bytes(dbuf.read(2), byteorder="little") + result = {"level": battery_level} + if len(data) > 3: # has storage info as well + result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + fw_ver = dbuf.read(1)[0] + res["fw ver"] = fw_ver + if fw_ver >= 3: + res["max_contacts"] = dbuf.read(1)[0] * 2 + res["max_channels"] = dbuf.read(1)[0] + res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") + res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") + res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") + if fw_ver >= 9: # has repeater mode + rpt = dbuf.read(1) + if len(rpt) > 0: + res["repeat"] = (rpt[0] != 0) + if fw_ver >= 10: # has path_hash_mode + path_hash_mode = dbuf.read(1)[0] + res["path_hash_mode"] = path_hash_mode + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CUSTOM_VARS.value: + logger.debug(f"received custom vars response: {data.hex()}") + res = {} + rawdata = dbuf.read().decode("utf-8", "ignore") + if not rawdata == "": + pairs = rawdata.split(",") + for p in pairs: + psplit = p.split(":") + res[psplit[0]] = psplit[1] + logger.debug(f"got custom vars : {res}") + await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + + elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) + logger.debug(f"received stats response: {data.hex()}") + # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte + # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) + if len(data) < 2: + logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) + return + + stats_type = data[1] + + if stats_type == 0: # STATS_TYPE_CORE + # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total + # Format: = 30: + (recv_errors,) = struct.unpack('= 0: + res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") + else: + res["channel_name"] = name_bytes.decode("utf-8", "ignore") + + res["channel_secret"] = dbuf.read(16) + res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] + + await self.packet_parser.newChannel(res) + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = dbuf.read(4).hex() + + attributes = {"code": ack_data.get("code", "")} + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["payload"] = dbuf.read(4).hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + res = {} + attributes = {} + if len(data) > 1: + perms = dbuf.read(1)[0] + res["permissions"] = perms + res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set + + res["pubkey_prefix"] = dbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_SUCCESS, res, attributes) + ) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + res = {} + attributes = {} + + dbuf.read(1) + + if len(data) > 7: + res["pubkey_prefix"] = pbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_FAILED, res, attributes) + ) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = parse_status(data, offset=8) + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} + attributes = {} + + recv_time = int(time.time()) + log_data["recv_time"] = recv_time + attributes["recv_time"] = recv_time + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = dbuf.read(1)[0] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = dbuf.read(1)[0] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + payload = None + if len(data) > 3: + payload=dbuf.read() + log_data["payload"] = payload.hex() + log_data["payload_length"] = len(payload) + + # Parse payload and get some info from it + log_data = await self.packet_parser.parsePacketPayload(payload, log_data) + attributes['route_type'] = log_data['route_type'] + attributes['payload_type'] = log_data['payload_type'] + attributes['path_len'] = log_data['path_len'] + attributes['path'] = log_data['path'] + + # Dispatch as RF log data + await self.dispatcher.dispatch( + Event(EventType.RX_LOG_DATA, log_data, attributes) + ) + + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = dbuf.read(1)[0] + path_len = dbuf.read(1)[0] + flags = dbuf.read(1)[0] + tag = int.from_bytes(dbuf.read(4), byteorder="little") + auth_code = int.from_bytes(dbuf.read(4), byteorder="little") + + path_hash_len = 1 << (flags&3) + path_len = int(path_len / path_hash_len) + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": dbuf.read(path_hash_len).hex(), + } + path_nodes.append(node) + + for n in path_nodes: + node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + n["snr"] = node_snr / 4.0 + + # Add the final node (our device) with its SNR + final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 + path_nodes.append({"snr": final_snr}) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) + + elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: + logger.debug(f"Received telemetry data: {data.hex()}") + res = {} + + dbuf.read(1) + + res["pubkey_pre"] = dbuf.read(6).hex() + buf = dbuf.read() + + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + lpp = json.loads( + json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) + ) + + res["lpp"] = lpp + + attributes = { + "raw": buf.hex(), + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: + res = {} + freqs = [] + + cont = True + try: + while cont: + min = int.from_bytes(dbuf.read(4), "little", signed=False) + max = int.from_bytes(dbuf.read(4), "little", signed=False) + if min == 0 or max == 0: + cont = False + else: + freqs.append({"min" : min, "max": max}) + except e: + print(e) + + res["freqs"] = freqs + + await self.dispatcher.dispatch( + Event(EventType.ALLOWED_REPEAT_FREQ, res) + ) + + elif packet_type_value == PacketType.BINARY_RESPONSE.value: + dbuf.read(1) + tag = dbuf.read(4).hex() + response_data = dbuf.read() + logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") + + # Always dispatch generic BINARY_RESPONSE + binary_res = {"tag": tag, "data": response_data.hex()} + await self.dispatcher.dispatch( + Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) + ) + + # Check for tracked request type and dispatch specific response + if tag in self.pending_binary_requests: + request_type = self.pending_binary_requests[tag]["request_type"] + is_anon = self.pending_binary_requests[tag]["is_anon"] + pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + context = self.pending_binary_requests[tag]["context"] + del self.pending_binary_requests[tag] + logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") + + if not is_anon: + + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: + res = {} + res = parse_status(response_data, pubkey_prefix=pubkey_prefix) + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) + ) + + elif request_type == BinaryReqType.TELEMETRY : + try: + lpp = lpp_parse(response_data) + telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) + ) + except Exception as e: + logger.error(f"Error parsing binary telemetry response: {e}") + + elif request_type == BinaryReqType.MMA: + try: + mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header + mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.MMA_RESPONSE, mma_res, mma_res) + ) + except Exception as e: + logger.error(f"Error parsing binary MMA response: {e}") + + elif request_type == BinaryReqType.ACL: + try: + acl_result = parse_acl(response_data) + acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + except Exception as e: + logger.error(f"Error parsing binary ACL response: {e}") + + elif request_type == BinaryReqType.NEIGHBOURS: + try: + pk_plen = context["pubkey_prefix_length"] + bbuf = io.BytesIO(response_data) + + res = { + "pubkey_prefix": pubkey_prefix, + "tag": tag + } + res.update(context) # add context in result + + res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) + results_count = int.from_bytes(bbuf.read(2), "little", signed=True) + res["results_count"] = results_count + + neighbours_list = [] + + for _ in range (results_count): + neighb = {} + neighb["pubkey"] = bbuf.read(pk_plen).hex() + neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) + neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 + neighbours_list.append(neighb) + + res["neighbours"] = neighbours_list + + await self.dispatcher.dispatch( + Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + + except Exception as e: + logger.error(f"Error parsing binary NEIGHBOURS response: {e}") + + else: + logger.debug(f"No tracked request found for binary response tag {tag}") + + elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: + logger.debug(f"Received path discovery response: {data.hex()}") + res = {} + dbuf.read(1) + res["pubkey_pre"] = dbuf.read(6).hex() + opl = dbuf.read(1)[0] + opl_hlen = ((opl & 0xc0) >> 6) + 1 + opl = opl & 0x3f + res["out_path_len"] = opl + res["out_path_hash_len"] = opl_hlen + res["out_path"] = dbuf.read(opl*opl_hlen).hex() + ipl = dbuf.read(1)[0] + ipl_hlen = ((ipl & 0xc0) >> 6) + 1 + ipl = ipl & 0x3f + res["in_path_len"] = ipl + res["in_path_hash_len"] = ipl_hlen + res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() + + attributes = {"pubkey_pre": res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.PATH_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.PRIVATE_KEY.value: + logger.debug(f"Received private key response: {data.hex()}") + if len(data) >= 65: # 1 byte response code + 64 bytes private key + private_key = dbuf.read(64) # Extract 64-byte private key + res = {"private_key": private_key} + await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) + else: + logger.error(f"Invalid private key response length: {len(data)}") + + elif packet_type_value == PacketType.SIGN_START.value: + logger.debug(f"Received sign start response: {data.hex()}") + # Payload: 1 reserved byte, 4-byte max length + dbuf.read(1) + max_len = int.from_bytes(dbuf.read(4), "little") + res = {"max_length": max_len} + await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) + + elif packet_type_value == PacketType.SIGNATURE.value: + logger.debug(f"Received signature: {data.hex()}") + signature = dbuf.read() + res = {"signature": signature} + await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) + + elif packet_type_value == PacketType.DISABLED.value: + logger.debug("Received disabled response") + res = {"reason": "private_key_export_disabled"} + await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) + + elif packet_type_value == PacketType.CONTROL_DATA.value: + logger.debug("Received control data packet") + res={} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["path_len"] = dbuf.read(1)[0] + payload = dbuf.read() + payload_type = payload[0] + res["payload_type"] = payload_type + res["payload"] = payload + + attributes = {"payload_type": payload_type} + await self.dispatcher.dispatch( + Event(EventType.CONTROL_DATA, res, attributes) + ) + + # decode NODE_DISCOVER_RESP + if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: + pbuf = io.BytesIO(payload[1:]) + ndr = dict(res) + del ndr["payload_type"] + del ndr["payload"] + ndr["node_type"] = payload_type & 0x0F + ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 + ndr["tag"] = pbuf.read(4).hex() + + pubkey = pbuf.read() + if len(pubkey) < 32: + pubkey = pubkey[0:8] + else: + pubkey = pubkey[0:32] + + ndr["pubkey"] = pubkey.hex() + + attributes = { + "node_type" : ndr["node_type"], + "tag" : ndr["tag"], + "pubkey" : ndr["pubkey"], + } + + await self.dispatcher.dispatch( + Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + ) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") + except Exception as e: + logger.error( + "handle_rx parse error: %s: %s | raw=%s\n%s", + type(e).__name__, + e, + data.hex(), + traceback.format_exc(), + )