diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 802004a..088baeb 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -3,6 +3,7 @@ import json import struct import time import io +import traceback from typing import Any, Dict from .events import Event, EventType, EventDispatcher, ErrorMessages from .meshcore_parser import MeshcorePacketParser @@ -69,853 +70,862 @@ class MessageReader: except IndexError as e: logger.warning(f"Received empty packet: {e}") return - logger.debug(f"Received data: {data.hex()}") + try: + logger.debug(f"Received data: {data.hex()}") - # Handle command responses - if packet_type_value == PacketType.OK.value: - result: Dict[str, Any] = {} - if len(data) == 5: - result["value"] = int.from_bytes(data[1:5], byteorder="little") + # Handle command responses + if packet_type_value == PacketType.OK.value: + result: Dict[str, Any] = {} + if len(data) == 5: + result["value"] = int.from_bytes(data[1:5], byteorder="little") - # Dispatch event for the OK response - await self.dispatcher.dispatch(Event(EventType.OK, result)) + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) - elif packet_type_value == PacketType.ERROR.value: - if len(data) > 1: - result = { "error_code": data[1], } - if data[1] in ErrorMessages: - result["code_string"] = ErrorMessages[data[1]] - else: - result = {} - - # Dispatch event for the ERROR response - await self.dispatcher.dispatch(Event(EventType.ERROR, result)) - - elif packet_type_value == PacketType.CONTACT_START.value: - self.contact_nb = int.from_bytes(data[1:5], byteorder="little") - self.contacts = {} - - elif ( - packet_type_value == PacketType.CONTACT.value - or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value - ): - c = {} - c["public_key"] = dbuf.read(32).hex() - c["type"] = dbuf.read(1)[0] - c["flags"] = dbuf.read(1)[0] - plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little") - if plen == 255: # flood - c["out_path_hash_mode"] = -1 - c["out_path_len"] = -1 # 6 LSB - else: - c["out_path_hash_mode"] = plen >> 6 - c["out_path_len"] = plen & 0x3F # 6 LSB - c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() - c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") - c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") - c["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") - - if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: - await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) - else: - await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) - self.contacts[c["public_key"]] = c - - elif packet_type_value == PacketType.ADVERT_PATH.value : - r = {} - r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) - plen = int.from_bytes(dbuf.read(1), "little", signed=False) - if plen == 255: # flood, should not happen - r["path_hash_mode"] = -1 - r["path_len"] = -1 - else: - r["path_hash_mode"] = plen >> 6 # 2 upper bytes - r["path_len"] = plen & 0x3F - r["path"] = dbuf.read().replace(b"\0", b"").hex() - - await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) - - elif packet_type_value == PacketType.CONTACT_END.value: - lastmod = int.from_bytes(dbuf.read(4), byteorder="little") - attributes = { - "lastmod": lastmod, - } - await self.dispatcher.dispatch( - Event(EventType.CONTACTS, self.contacts, attributes) - ) - - elif packet_type_value == PacketType.SELF_INFO.value: - self_info = {} - self_info["adv_type"] = dbuf.read(1)[0] - self_info["tx_power"] = dbuf.read(1)[0] - self_info["max_tx_power"] = dbuf.read(1)[0] - self_info["public_key"] = dbuf.read(32).hex() - self_info["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["multi_acks"] = dbuf.read(1)[0] - self_info["adv_loc_policy"] = dbuf.read(1)[0] - telemetry_mode = dbuf.read(1)[0] - self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 - self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 - self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 - self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 - self_info["radio_freq"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_bw"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_sf"] = dbuf.read(1)[0] - self_info["radio_cr"] = dbuf.read(1)[0] - self_info["name"] = dbuf.read().decode("utf-8", "ignore") - await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) - - elif packet_type_value == PacketType.MSG_SENT.value: - res = {} - res["type"] = dbuf.read(1)[0] - res["expected_ack"] = dbuf.read(4) - res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") - - attributes = { - "type": res["type"], - "expected_ack": res["expected_ack"].hex(), - } - - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) - - elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - evt_type = EventType.CONTACT_MSG_RECV - - await self.dispatcher.dispatch(Event(evt_type, res, attributes)) - - elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CONTACT_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: - res = {} - res["type"] = "CHAN" - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read().strip(b"\0") - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - if self.decrypt_channels: - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["SNR"] = logged["snr"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read() - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - if self.decrypt_channels: - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - res["txt_hash"] = txt_hash - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CURRENT_TIME.value: - time_value = int.from_bytes(dbuf.read(4), byteorder="little") - result = {"time": time_value} - await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) - - elif packet_type_value == PacketType.NO_MORE_MSGS.value: - result = {"messages_available": False} - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) - - elif packet_type_value == PacketType.CONTACT_URI.value: - contact_uri = "meshcore://" + dbuf.read().hex() - result = {"uri": contact_uri} - await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) - - elif packet_type_value == PacketType.BATTERY.value: - battery_level = int.from_bytes(dbuf.read(2), byteorder="little") - result = {"level": battery_level} - if len(data) > 3: # has storage info as well - result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) - - elif packet_type_value == PacketType.DEVICE_INFO.value: - res = {} - fw_ver = dbuf.read(1)[0] - res["fw ver"] = fw_ver - if fw_ver >= 3: - res["max_contacts"] = dbuf.read(1)[0] * 2 - res["max_channels"] = dbuf.read(1)[0] - res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") - res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") - res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") - res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") - if fw_ver >= 9: # has repeater mode - rpt = dbuf.read(1) - if len(rpt) > 0: - res["repeat"] = (rpt[0] != 0) - if fw_ver >= 10: # has path_hash_mode - path_hash_mode = dbuf.read(1)[0] - res["path_hash_mode"] = path_hash_mode - await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) - - elif packet_type_value == PacketType.CUSTOM_VARS.value: - logger.debug(f"received custom vars response: {data.hex()}") - res = {} - rawdata = dbuf.read().decode("utf-8", "ignore") - if not rawdata == "": - pairs = rawdata.split(",") - for p in pairs: - psplit = p.split(":") - res[psplit[0]] = psplit[1] - logger.debug(f"got custom vars : {res}") - await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) - - elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) - logger.debug(f"received stats response: {data.hex()}") - # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte - # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) - if len(data) < 2: - logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") - await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) - return - - stats_type = data[1] - - if stats_type == 0: # STATS_TYPE_CORE - # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total - # Format: 1: + result = { "error_code": data[1], } + if data[1] in ErrorMessages: + result["code_string"] = ErrorMessages[data[1]] else: - try: - battery_mv, uptime_secs, errors, queue_len = struct.unpack('> 6 + c["out_path_len"] = plen & 0x3F # 6 LSB + c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() + c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") + c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") + c["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") + + if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: + await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) else: - try: - recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('= 30: - (recv_errors,) = struct.unpack('= 0: - res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") - else: - res["channel_name"] = name_bytes.decode("utf-8", "ignore") - - res["channel_secret"] = dbuf.read(16) - res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] - - await self.packet_parser.newChannel(res) - - await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) - - # Push notifications - elif packet_type_value == PacketType.ADVERTISEMENT.value: - logger.debug("Advertisement received") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) - - elif packet_type_value == PacketType.PATH_UPDATE.value: - logger.debug("Code path update") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) - - elif packet_type_value == PacketType.ACK.value: - logger.debug("Received ACK") - ack_data = {} - - if len(data) >= 5: - ack_data["code"] = dbuf.read(4).hex() - - attributes = {"code": ack_data.get("code", "")} - - await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) - - elif packet_type_value == PacketType.MESSAGES_WAITING.value: - logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) - - elif packet_type_value == PacketType.RAW_DATA.value: - res = {} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["payload"] = dbuf.read(4).hex() - logger.debug("Received raw data") - print(res) - await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) - - elif packet_type_value == PacketType.LOGIN_SUCCESS.value: - res = {} - attributes = {} - if len(data) > 1: - perms = dbuf.read(1)[0] - res["permissions"] = perms - res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set - - res["pubkey_prefix"] = dbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_SUCCESS, res, attributes) - ) - - elif packet_type_value == PacketType.LOGIN_FAILED.value: - res = {} - attributes = {} - - dbuf.read(1) - - if len(data) > 7: - res["pubkey_prefix"] = pbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_FAILED, res, attributes) - ) - - elif packet_type_value == PacketType.STATUS_RESPONSE.value: - res = parse_status(data, offset=8) - data_hex = data[8:].hex() - logger.debug(f"Status response: {data_hex}") - - attributes = { - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.LOG_DATA.value: - logger.debug(f"Received RF log data: {data.hex()}") - - # Parse as raw RX data - log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} - attributes = {} - - recv_time = int(time.time()) - log_data["recv_time"] = recv_time - attributes["recv_time"] = recv_time - - # First byte is SNR (signed byte, multiplied by 4) - if len(data) > 1: - snr_byte = dbuf.read(1)[0] - # Convert to signed value - snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 - log_data["snr"] = snr - - # Second byte is RSSI (signed byte) - if len(data) > 2: - rssi_byte = dbuf.read(1)[0] - # Convert to signed value - rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 - log_data["rssi"] = rssi - - # Remaining bytes are the raw data payload - payload = None - if len(data) > 3: - payload=dbuf.read() - log_data["payload"] = payload.hex() - log_data["payload_length"] = len(payload) - - # Parse payload and get some info from it - log_data = await self.packet_parser.parsePacketPayload(payload, log_data) - attributes['route_type'] = log_data['route_type'] - attributes['payload_type'] = log_data['payload_type'] - attributes['path_len'] = log_data['path_len'] - attributes['path'] = log_data['path'] - - # Dispatch as RF log data - await self.dispatcher.dispatch( - Event(EventType.RX_LOG_DATA, log_data, attributes) - ) - - elif packet_type_value == PacketType.TRACE_DATA.value: - logger.debug(f"Received trace data: {data.hex()}") - res = {} - - # According to the source, format is: - # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr - - reserved = dbuf.read(1)[0] - path_len = dbuf.read(1)[0] - flags = dbuf.read(1)[0] - tag = int.from_bytes(dbuf.read(4), byteorder="little") - auth_code = int.from_bytes(dbuf.read(4), byteorder="little") - - path_hash_len = 1 << (flags&3) - path_len = int(path_len / path_hash_len) - - # Initialize result - res["tag"] = tag - res["auth"] = auth_code - res["flags"] = flags - res["path_len"] = path_len - - # Process path as array of objects with hash and SNR - path_nodes = [] - - if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: - # Extract path with hash and SNR pairs - for i in range(path_len): - node = { - "hash": dbuf.read(path_hash_len).hex(), - } - path_nodes.append(node) - - for n in path_nodes: - node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - n["snr"] = node_snr / 4.0 - - # Add the final node (our device) with its SNR - final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 - path_nodes.append({"snr": final_snr}) - - res["path"] = path_nodes - - logger.debug(f"Parsed trace data: {res}") - - attributes = { - "tag": res["tag"], - "auth_code": res["auth"], - } - - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) - - elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: - logger.debug(f"Received telemetry data: {data.hex()}") - res = {} - - dbuf.read(1) - - res["pubkey_pre"] = dbuf.read(6).hex() - buf = dbuf.read() - - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - lpp = json.loads( - json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) - ) - - res["lpp"] = lpp - - attributes = { - "raw": buf.hex(), - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: - res = {} - freqs = [] - - cont = True - try: - while cont: - min = int.from_bytes(dbuf.read(4), "little", signed=False) - max = int.from_bytes(dbuf.read(4), "little", signed=False) - if min == 0 or max == 0: - cont = False - else: - freqs.append({"min" : min, "max": max}) - except e: - print(e) - - res["freqs"] = freqs - - await self.dispatcher.dispatch( - Event(EventType.ALLOWED_REPEAT_FREQ, res) - ) - - elif packet_type_value == PacketType.BINARY_RESPONSE.value: - dbuf.read(1) - tag = dbuf.read(4).hex() - response_data = dbuf.read() - logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") - - # Always dispatch generic BINARY_RESPONSE - binary_res = {"tag": tag, "data": response_data.hex()} - await self.dispatcher.dispatch( - Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) - ) - - # Check for tracked request type and dispatch specific response - if tag in self.pending_binary_requests: - request_type = self.pending_binary_requests[tag]["request_type"] - is_anon = self.pending_binary_requests[tag]["is_anon"] - pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] - context = self.pending_binary_requests[tag]["context"] - del self.pending_binary_requests[tag] - logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - - if not is_anon: - - if request_type == BinaryReqType.STATUS and len(response_data) >= 52: - res = {} - res = parse_status(response_data, pubkey_prefix=pubkey_prefix) - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) - ) - - elif request_type == BinaryReqType.TELEMETRY : - try: - lpp = lpp_parse(response_data) - telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) - ) - except Exception as e: - logger.error(f"Error parsing binary telemetry response: {e}") - - elif request_type == BinaryReqType.MMA: - try: - mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header - mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.MMA_RESPONSE, mma_res, mma_res) - ) - except Exception as e: - logger.error(f"Error parsing binary MMA response: {e}") - - elif request_type == BinaryReqType.ACL: - try: - acl_result = parse_acl(response_data) - acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - except Exception as e: - logger.error(f"Error parsing binary ACL response: {e}") - - elif request_type == BinaryReqType.NEIGHBOURS: - try: - pk_plen = context["pubkey_prefix_length"] - bbuf = io.BytesIO(response_data) - - res = { - "pubkey_prefix": pubkey_prefix, - "tag": tag - } - res.update(context) # add context in result - - res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) - results_count = int.from_bytes(bbuf.read(2), "little", signed=True) - res["results_count"] = results_count - - neighbours_list = [] - - for _ in range (results_count): - neighb = {} - neighb["pubkey"] = bbuf.read(pk_plen).hex() - neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) - neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 - neighbours_list.append(neighb) - - res["neighbours"] = neighbours_list - - await self.dispatcher.dispatch( - Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - - except Exception as e: - logger.error(f"Error parsing binary NEIGHBOURS response: {e}") - - else: - logger.debug(f"No tracked request found for binary response tag {tag}") - - elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: - logger.debug(f"Received path discovery response: {data.hex()}") - res = {} - dbuf.read(1) - res["pubkey_pre"] = dbuf.read(6).hex() - opl = dbuf.read(1)[0] - opl_hlen = ((opl & 0xc0) >> 6) + 1 - opl = opl & 0x3f - res["out_path_len"] = opl - res["out_path_hash_len"] = opl_hlen - res["out_path"] = dbuf.read(opl*opl_hlen).hex() - ipl = dbuf.read(1)[0] - ipl_hlen = ((ipl & 0xc0) >> 6) + 1 - ipl = ipl & 0x3f - res["in_path_len"] = ipl - res["in_path_hash_len"] = ipl_hlen - res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() - - attributes = {"pubkey_pre": res["pubkey_pre"]} - - await self.dispatcher.dispatch( - Event(EventType.PATH_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.PRIVATE_KEY.value: - logger.debug(f"Received private key response: {data.hex()}") - if len(data) >= 65: # 1 byte response code + 64 bytes private key - private_key = dbuf.read(64) # Extract 64-byte private key - res = {"private_key": private_key} - await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) - else: - logger.error(f"Invalid private key response length: {len(data)}") - - elif packet_type_value == PacketType.SIGN_START.value: - logger.debug(f"Received sign start response: {data.hex()}") - # Payload: 1 reserved byte, 4-byte max length - dbuf.read(1) - max_len = int.from_bytes(dbuf.read(4), "little") - res = {"max_length": max_len} - await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) - - elif packet_type_value == PacketType.SIGNATURE.value: - logger.debug(f"Received signature: {data.hex()}") - signature = dbuf.read() - res = {"signature": signature} - await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) - - elif packet_type_value == PacketType.DISABLED.value: - logger.debug("Received disabled response") - res = {"reason": "private_key_export_disabled"} - await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) - - elif packet_type_value == PacketType.CONTROL_DATA.value: - logger.debug("Received control data packet") - res={} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["path_len"] = dbuf.read(1)[0] - payload = dbuf.read() - payload_type = payload[0] - res["payload_type"] = payload_type - res["payload"] = payload - - attributes = {"payload_type": payload_type} - await self.dispatcher.dispatch( - Event(EventType.CONTROL_DATA, res, attributes) - ) - - # decode NODE_DISCOVER_RESP - if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: - pbuf = io.BytesIO(payload[1:]) - ndr = dict(res) - del ndr["payload_type"] - del ndr["payload"] - ndr["node_type"] = payload_type & 0x0F - ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 - ndr["tag"] = pbuf.read(4).hex() - - pubkey = pbuf.read() - if len(pubkey) < 32: - pubkey = pubkey[0:8] + await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.ADVERT_PATH.value : + r = {} + r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) + plen = int.from_bytes(dbuf.read(1), "little", signed=False) + if plen == 255: # flood, should not happen + r["path_hash_mode"] = -1 + r["path_len"] = -1 else: - pubkey = pubkey[0:32] + r["path_hash_mode"] = plen >> 6 # 2 upper bytes + r["path_len"] = plen & 0x3F + r["path"] = dbuf.read().replace(b"\0", b"").hex() - ndr["pubkey"] = pubkey.hex() + await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) + + elif packet_type_value == PacketType.CONTACT_END.value: + lastmod = int.from_bytes(dbuf.read(4), byteorder="little") + attributes = { + "lastmod": lastmod, + } + await self.dispatcher.dispatch( + Event(EventType.CONTACTS, self.contacts, attributes) + ) + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = dbuf.read(1)[0] + self_info["tx_power"] = dbuf.read(1)[0] + self_info["max_tx_power"] = dbuf.read(1)[0] + self_info["public_key"] = dbuf.read(32).hex() + self_info["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["multi_acks"] = dbuf.read(1)[0] + self_info["adv_loc_policy"] = dbuf.read(1)[0] + telemetry_mode = dbuf.read(1)[0] + self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 + self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 + self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 + self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 + self_info["radio_freq"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_bw"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_sf"] = dbuf.read(1)[0] + self_info["radio_cr"] = dbuf.read(1)[0] + self_info["name"] = dbuf.read().decode("utf-8", "ignore") + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = dbuf.read(1)[0] + res["expected_ack"] = dbuf.read(4) + res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { - "node_type" : ndr["node_type"], - "tag" : ndr["tag"], - "pubkey" : ndr["pubkey"], + "type": res["type"], + "expected_ack": res["expected_ack"].hex(), + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], + } + + evt_type = EventType.CONTACT_MSG_RECV + + await self.dispatcher.dispatch(Event(evt_type, res, attributes)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], } await self.dispatcher.dispatch( - Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + Event(EventType.CONTACT_MSG_RECV, res, attributes) ) - else: - logger.debug(f"Unhandled data received {data}") - logger.debug(f"Unhandled packet type: {packet_type_value}") + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read().strip(b"\0") + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + if self.decrypt_channels: + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["SNR"] = logged["snr"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read() + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + if self.decrypt_channels: + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + res["txt_hash"] = txt_hash + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + time_value = int.from_bytes(dbuf.read(4), byteorder="little") + result = {"time": time_value} + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) + + elif packet_type_value == PacketType.CONTACT_URI.value: + contact_uri = "meshcore://" + dbuf.read().hex() + result = {"uri": contact_uri} + await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) + + elif packet_type_value == PacketType.BATTERY.value: + battery_level = int.from_bytes(dbuf.read(2), byteorder="little") + result = {"level": battery_level} + if len(data) > 3: # has storage info as well + result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + fw_ver = dbuf.read(1)[0] + res["fw ver"] = fw_ver + if fw_ver >= 3: + res["max_contacts"] = dbuf.read(1)[0] * 2 + res["max_channels"] = dbuf.read(1)[0] + res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") + res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") + res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") + if fw_ver >= 9: # has repeater mode + rpt = dbuf.read(1) + if len(rpt) > 0: + res["repeat"] = (rpt[0] != 0) + if fw_ver >= 10: # has path_hash_mode + path_hash_mode = dbuf.read(1)[0] + res["path_hash_mode"] = path_hash_mode + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CUSTOM_VARS.value: + logger.debug(f"received custom vars response: {data.hex()}") + res = {} + rawdata = dbuf.read().decode("utf-8", "ignore") + if not rawdata == "": + pairs = rawdata.split(",") + for p in pairs: + psplit = p.split(":") + res[psplit[0]] = psplit[1] + logger.debug(f"got custom vars : {res}") + await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + + elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) + logger.debug(f"received stats response: {data.hex()}") + # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte + # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) + if len(data) < 2: + logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) + return + + stats_type = data[1] + + if stats_type == 0: # STATS_TYPE_CORE + # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total + # Format: = 30: + (recv_errors,) = struct.unpack('= 0: + res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") + else: + res["channel_name"] = name_bytes.decode("utf-8", "ignore") + + res["channel_secret"] = dbuf.read(16) + res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] + + await self.packet_parser.newChannel(res) + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = dbuf.read(4).hex() + + attributes = {"code": ack_data.get("code", "")} + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["payload"] = dbuf.read(4).hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + res = {} + attributes = {} + if len(data) > 1: + perms = dbuf.read(1)[0] + res["permissions"] = perms + res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set + + res["pubkey_prefix"] = dbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_SUCCESS, res, attributes) + ) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + res = {} + attributes = {} + + dbuf.read(1) + + if len(data) > 7: + res["pubkey_prefix"] = pbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_FAILED, res, attributes) + ) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = parse_status(data, offset=8) + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} + attributes = {} + + recv_time = int(time.time()) + log_data["recv_time"] = recv_time + attributes["recv_time"] = recv_time + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = dbuf.read(1)[0] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = dbuf.read(1)[0] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + payload = None + if len(data) > 3: + payload=dbuf.read() + log_data["payload"] = payload.hex() + log_data["payload_length"] = len(payload) + + # Parse payload and get some info from it + log_data = await self.packet_parser.parsePacketPayload(payload, log_data) + attributes['route_type'] = log_data['route_type'] + attributes['payload_type'] = log_data['payload_type'] + attributes['path_len'] = log_data['path_len'] + attributes['path'] = log_data['path'] + + # Dispatch as RF log data + await self.dispatcher.dispatch( + Event(EventType.RX_LOG_DATA, log_data, attributes) + ) + + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = dbuf.read(1)[0] + path_len = dbuf.read(1)[0] + flags = dbuf.read(1)[0] + tag = int.from_bytes(dbuf.read(4), byteorder="little") + auth_code = int.from_bytes(dbuf.read(4), byteorder="little") + + path_hash_len = 1 << (flags&3) + path_len = int(path_len / path_hash_len) + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": dbuf.read(path_hash_len).hex(), + } + path_nodes.append(node) + + for n in path_nodes: + node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + n["snr"] = node_snr / 4.0 + + # Add the final node (our device) with its SNR + final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 + path_nodes.append({"snr": final_snr}) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) + + elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: + logger.debug(f"Received telemetry data: {data.hex()}") + res = {} + + dbuf.read(1) + + res["pubkey_pre"] = dbuf.read(6).hex() + buf = dbuf.read() + + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + lpp = json.loads( + json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) + ) + + res["lpp"] = lpp + + attributes = { + "raw": buf.hex(), + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: + res = {} + freqs = [] + + cont = True + try: + while cont: + min = int.from_bytes(dbuf.read(4), "little", signed=False) + max = int.from_bytes(dbuf.read(4), "little", signed=False) + if min == 0 or max == 0: + cont = False + else: + freqs.append({"min" : min, "max": max}) + except e: + print(e) + + res["freqs"] = freqs + + await self.dispatcher.dispatch( + Event(EventType.ALLOWED_REPEAT_FREQ, res) + ) + + elif packet_type_value == PacketType.BINARY_RESPONSE.value: + dbuf.read(1) + tag = dbuf.read(4).hex() + response_data = dbuf.read() + logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") + + # Always dispatch generic BINARY_RESPONSE + binary_res = {"tag": tag, "data": response_data.hex()} + await self.dispatcher.dispatch( + Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) + ) + + # Check for tracked request type and dispatch specific response + if tag in self.pending_binary_requests: + request_type = self.pending_binary_requests[tag]["request_type"] + is_anon = self.pending_binary_requests[tag]["is_anon"] + pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + context = self.pending_binary_requests[tag]["context"] + del self.pending_binary_requests[tag] + logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") + + if not is_anon: + + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: + res = {} + res = parse_status(response_data, pubkey_prefix=pubkey_prefix) + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) + ) + + elif request_type == BinaryReqType.TELEMETRY : + try: + lpp = lpp_parse(response_data) + telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) + ) + except Exception as e: + logger.error(f"Error parsing binary telemetry response: {e}") + + elif request_type == BinaryReqType.MMA: + try: + mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header + mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.MMA_RESPONSE, mma_res, mma_res) + ) + except Exception as e: + logger.error(f"Error parsing binary MMA response: {e}") + + elif request_type == BinaryReqType.ACL: + try: + acl_result = parse_acl(response_data) + acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + except Exception as e: + logger.error(f"Error parsing binary ACL response: {e}") + + elif request_type == BinaryReqType.NEIGHBOURS: + try: + pk_plen = context["pubkey_prefix_length"] + bbuf = io.BytesIO(response_data) + + res = { + "pubkey_prefix": pubkey_prefix, + "tag": tag + } + res.update(context) # add context in result + + res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) + results_count = int.from_bytes(bbuf.read(2), "little", signed=True) + res["results_count"] = results_count + + neighbours_list = [] + + for _ in range (results_count): + neighb = {} + neighb["pubkey"] = bbuf.read(pk_plen).hex() + neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) + neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 + neighbours_list.append(neighb) + + res["neighbours"] = neighbours_list + + await self.dispatcher.dispatch( + Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + + except Exception as e: + logger.error(f"Error parsing binary NEIGHBOURS response: {e}") + + else: + logger.debug(f"No tracked request found for binary response tag {tag}") + + elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: + logger.debug(f"Received path discovery response: {data.hex()}") + res = {} + dbuf.read(1) + res["pubkey_pre"] = dbuf.read(6).hex() + opl = dbuf.read(1)[0] + opl_hlen = ((opl & 0xc0) >> 6) + 1 + opl = opl & 0x3f + res["out_path_len"] = opl + res["out_path_hash_len"] = opl_hlen + res["out_path"] = dbuf.read(opl*opl_hlen).hex() + ipl = dbuf.read(1)[0] + ipl_hlen = ((ipl & 0xc0) >> 6) + 1 + ipl = ipl & 0x3f + res["in_path_len"] = ipl + res["in_path_hash_len"] = ipl_hlen + res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() + + attributes = {"pubkey_pre": res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.PATH_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.PRIVATE_KEY.value: + logger.debug(f"Received private key response: {data.hex()}") + if len(data) >= 65: # 1 byte response code + 64 bytes private key + private_key = dbuf.read(64) # Extract 64-byte private key + res = {"private_key": private_key} + await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) + else: + logger.error(f"Invalid private key response length: {len(data)}") + + elif packet_type_value == PacketType.SIGN_START.value: + logger.debug(f"Received sign start response: {data.hex()}") + # Payload: 1 reserved byte, 4-byte max length + dbuf.read(1) + max_len = int.from_bytes(dbuf.read(4), "little") + res = {"max_length": max_len} + await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) + + elif packet_type_value == PacketType.SIGNATURE.value: + logger.debug(f"Received signature: {data.hex()}") + signature = dbuf.read() + res = {"signature": signature} + await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) + + elif packet_type_value == PacketType.DISABLED.value: + logger.debug("Received disabled response") + res = {"reason": "private_key_export_disabled"} + await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) + + elif packet_type_value == PacketType.CONTROL_DATA.value: + logger.debug("Received control data packet") + res={} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["path_len"] = dbuf.read(1)[0] + payload = dbuf.read() + payload_type = payload[0] + res["payload_type"] = payload_type + res["payload"] = payload + + attributes = {"payload_type": payload_type} + await self.dispatcher.dispatch( + Event(EventType.CONTROL_DATA, res, attributes) + ) + + # decode NODE_DISCOVER_RESP + if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: + pbuf = io.BytesIO(payload[1:]) + ndr = dict(res) + del ndr["payload_type"] + del ndr["payload"] + ndr["node_type"] = payload_type & 0x0F + ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 + ndr["tag"] = pbuf.read(4).hex() + + pubkey = pbuf.read() + if len(pubkey) < 32: + pubkey = pubkey[0:8] + else: + pubkey = pubkey[0:32] + + ndr["pubkey"] = pubkey.hex() + + attributes = { + "node_type" : ndr["node_type"], + "tag" : ndr["tag"], + "pubkey" : ndr["pubkey"], + } + + await self.dispatcher.dispatch( + Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + ) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") + except Exception as e: + logger.error( + "handle_rx parse error: %s: %s | raw=%s\n%s", + type(e).__name__, + e, + data.hex(), + traceback.format_exc(), + )