import io
import json
import logging
import struct
import time
from typing import Any, Dict

from cayennelpp import LppFrame, LppData
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256

from meshcore.lpp_json_encoder import lpp_json_encoder

from .events import Event, EventType, EventDispatcher, ErrorMessages
from .packets import BinaryReqType, PacketType, ControlType
from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status

logger = logging.getLogger("meshcore")

# Human-readable names indexed by the numeric payload / route / contact type.
PAYLOAD_TYPENAMES = ["REQ", "RESPONSE", "TEXT_MSG", "ACK", "ADVERT", "GRP_TXT", "GRP_DATA", "ANON_REQ", "PATH", "TRACE", "MULTIPART", "CONTROL"]
ROUTE_TYPENAMES = ["TC_FLOOD", "FLOOD", "DIRECT", "TC_DIRECT"]
CONTACT_TYPENAMES = ["NONE","CLI","REP","ROOM","SENS"]


class MessageReader:
    """Decode raw frames received from the device and dispatch them as events.

    Only the state needed to decode frames *before* an event is dispatched is
    kept here: the contact list being built, the pending binary requests, a
    local copy of the channels and a short log of received channel packets.
    """

    def __init__(self, dispatcher: EventDispatcher):
        self.dispatcher = dispatcher
        # We're only keeping state here that's needed for processing
        # before events are dispatched
        self.contacts = {}  # Temporary storage during contact list building
        self.contact_nb = 0  # Used for contact processing

        # Track pending binary requests by tag for proper response parsing
        self.pending_binary_requests: Dict[str, Dict[str, Any]] = {}  # tag -> {request_type, expires_at}
        self.channels = [{} for _ in range(40)]  # keep our own copy of channels, 40 elements by default
        self.decrypt_channels = True
        self.channels_log = []  # stores the channel msg events

    def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context=None, is_anon=False):
        """Register a pending binary request for proper response parsing.

        Args:
            prefix: public key prefix of the node the request was sent to.
            tag: tag identifying the request; the response carries the same tag.
            request_type: kind of request, selects the parser for the response.
            timeout_seconds: lifetime of the registration, in seconds.
            context: optional info to carry over from request to response.
            is_anon: stored with the request; non-anonymous responses only get
                the type-specific decoding in ``handle_rx``.
        """
        # A fresh dict per call: a ``{}`` default would be shared by every
        # request registered without an explicit context.
        if context is None:
            context = {}

        # Clean up expired requests before adding new one
        self.cleanup_expired_requests()

        expires_at = time.time() + timeout_seconds
        self.pending_binary_requests[tag] = {
            "request_type": request_type,
            "pubkey_prefix": prefix,
            "expires_at": expires_at,
            "is_anon": is_anon,
            "context": context,  # optional info we want to keep from req to resp
        }
        logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s")

    def cleanup_expired_requests(self):
        """Remove expired binary requests."""
        current_time = time.time()
        expired_tags = [
            tag
            for tag, info in self.pending_binary_requests.items()
            if current_time > info["expires_at"]
        ]
        for tag in expired_tags:
            logger.debug(f"Cleaning up expired binary request: tag={tag}")
            del self.pending_binary_requests[tag]

    async def handle_rx(self, data: bytearray):
        """Decode one received frame and dispatch the matching event(s).

        The first byte of ``data`` is the packet type; the remaining bytes are
        decoded according to that type. Unknown packet types are only logged.

        Args:
            data: the complete frame as received from the device.
        """
        dbuf = io.BytesIO(data)
        try:
            packet_type_value = dbuf.read(1)[0]
        except IndexError as e:
            logger.warning(f"Received empty packet: {e}")
            return
        logger.debug(f"Received data: {data.hex()}")

        # Handle command responses
        if packet_type_value == PacketType.OK.value:
            result: Dict[str, Any] = {}
            if len(data) == 5:
                result["value"] = int.from_bytes(data[1:5], byteorder="little")

            # Dispatch event for the OK response
            await self.dispatcher.dispatch(Event(EventType.OK, result))

        elif packet_type_value == PacketType.ERROR.value:
            if len(data) > 1:
                result = {
                    "error_code": data[1],
                    "code_string": ErrorMessages[data[1]],
                }
            else:
                result = {}

            # Dispatch event for the ERROR response
            await self.dispatcher.dispatch(Event(EventType.ERROR, result))

        elif packet_type_value == PacketType.CONTACT_START.value:
            self.contact_nb = int.from_bytes(data[1:5], byteorder="little")
            self.contacts = {}

        elif (
            packet_type_value == PacketType.CONTACT.value
            or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value
        ):
            c = {}
            c["public_key"] = dbuf.read(32).hex()
            c["type"] = dbuf.read(1)[0]
            c["flags"] = dbuf.read(1)[0]
            plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little")
            if plen == 255:  # flood
                c["out_path_hash_mode"] = -1
                c["out_path_len"] = -1
            else:
                c["out_path_hash_mode"] = plen >> 6  # 2 MSB
                c["out_path_len"] = plen & 0x3F  # 6 LSB
            c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex()
            c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "")
            c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little")
            c["adv_lat"] = (
                int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
            )
            c["adv_lon"] = (
                int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
            )
            c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little")

            if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value:
                await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c))
            else:
                await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c))
                # NOTE(review): nesting reconstructed - only contacts received
                # as part of a contact list are cached; confirm.
                self.contacts[c["public_key"]] = c

        elif packet_type_value == PacketType.ADVERT_PATH.value:
            r = {}
            r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False)
            plen = int.from_bytes(dbuf.read(1), "little", signed=False)
            if plen == 255:  # flood, should not happen
                r["path_hash_mode"] = -1
                r["path_len"] = -1
            else:
                r["path_hash_mode"] = plen >> 6  # 2 upper bits
                r["path_len"] = plen & 0x3F
            r["path"] = dbuf.read().replace(b"\0", b"").hex()
            await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r))

        elif packet_type_value == PacketType.CONTACT_END.value:
            lastmod = int.from_bytes(dbuf.read(4), byteorder="little")
            attributes = {
                "lastmod": lastmod,
            }
            await self.dispatcher.dispatch(
                Event(EventType.CONTACTS, self.contacts, attributes)
            )

        elif packet_type_value == PacketType.SELF_INFO.value:
            self_info = {}
            self_info["adv_type"] = dbuf.read(1)[0]
            self_info["tx_power"] = dbuf.read(1)[0]
            self_info["max_tx_power"] = dbuf.read(1)[0]
            self_info["public_key"] = dbuf.read(32).hex()
            self_info["adv_lat"] = (
                int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
            )
            self_info["adv_lon"] = (
                int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6
            )
            self_info["multi_acks"] = dbuf.read(1)[0]
            self_info["adv_loc_policy"] = dbuf.read(1)[0]
            # Three 2-bit fields packed in one byte: env | loc | base
            telemetry_mode = dbuf.read(1)[0]
            self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11
            self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11
            self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11
            self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0
            self_info["radio_freq"] = (
                int.from_bytes(dbuf.read(4), byteorder="little") / 1000
            )
            self_info["radio_bw"] = (
                int.from_bytes(dbuf.read(4), byteorder="little") / 1000
            )
            self_info["radio_sf"] = dbuf.read(1)[0]
            self_info["radio_cr"] = dbuf.read(1)[0]
            self_info["name"] = dbuf.read().decode("utf-8", "ignore")
            await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info))

        elif packet_type_value == PacketType.MSG_SENT.value:
            res = {}
            res["type"] = dbuf.read(1)[0]
            res["expected_ack"] = dbuf.read(4)
            res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little")

            attributes = {
                "type": res["type"],
                "expected_ack": res["expected_ack"].hex(),
            }

            await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes))

        elif packet_type_value == PacketType.CONTACT_MSG_RECV.value:
            res = {}
            res["type"] = "PRIV"
            res["pubkey_prefix"] = dbuf.read(6).hex()
            plen = dbuf.read(1)[0]
            if plen == 255:  # direct message
                res["path_hash_mode"] = -1
                res["path_len"] = plen
            else:
                res["path_hash_mode"] = plen >> 6
                res["path_len"] = plen & 0x3F
            txt_type = dbuf.read(1)[0]
            res["txt_type"] = txt_type
            res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
            if txt_type == 2:
                res["signature"] = dbuf.read(4).hex()
            res["text"] = dbuf.read().decode("utf-8", "ignore")

            attributes = {
                "pubkey_prefix": res["pubkey_prefix"],
                "txt_type": res["txt_type"],
            }

            evt_type = EventType.CONTACT_MSG_RECV

            await self.dispatcher.dispatch(Event(evt_type, res, attributes))

        elif packet_type_value == 16:  # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
            res = {}
            res["type"] = "PRIV"
            res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
            dbuf.read(2)  # reserved
            res["pubkey_prefix"] = dbuf.read(6).hex()
            plen = dbuf.read(1)[0]
            if plen == 255:  # direct message
                res["path_hash_mode"] = -1
                res["path_len"] = plen
            else:
                res["path_hash_mode"] = plen >> 6
                res["path_len"] = plen & 0x3F
            txt_type = dbuf.read(1)[0]
            res["txt_type"] = txt_type
            res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
            if txt_type == 2:
                res["signature"] = dbuf.read(4).hex()
            res["text"] = dbuf.read().decode("utf-8", "ignore")

            attributes = {
                "pubkey_prefix": res["pubkey_prefix"],
                "txt_type": res["txt_type"],
            }

            await self.dispatcher.dispatch(
                Event(EventType.CONTACT_MSG_RECV, res, attributes)
            )

        elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value:
            res = {}
            res["type"] = "CHAN"
            res["channel_idx"] = dbuf.read(1)[0]
            plen = dbuf.read(1)[0]
            if plen == 255:  # direct message
                res["path_hash_mode"] = -1
                res["path_len"] = plen
            else:
                res["path_hash_mode"] = plen >> 6
                res["path_len"] = plen & 0x3F
            res["txt_type"] = dbuf.read(1)[0]
            res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
            text = dbuf.read().strip(b"\0")
            res["text"] = text.decode("utf-8", "ignore")

            # search for text in log_channels
            txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)

            if self.decrypt_channels:
                logged = next(
                    (
                        entry
                        for entry in reversed(self.channels_log)
                        if "msg_hash" in entry and entry["msg_hash"] == txt_hash
                    ),
                    None,
                )
                if logged is not None:
                    res["path"] = logged["path"]
                    res["RSSI"] = logged["rssi"]
                    res["SNR"] = logged["snr"]
                    res["recv_time"] = logged["recv_time"]
                    res["attempt"] = logged["attempt"]

            attributes = {
                "channel_idx": res["channel_idx"],
                "txt_type": res["txt_type"],
            }

            await self.dispatcher.dispatch(
                Event(EventType.CHANNEL_MSG_RECV, res, attributes)
            )

        elif packet_type_value == 17:  # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
            res = {}
            res["type"] = "CHAN"
            res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
            dbuf.read(2)  # reserved
            res["channel_idx"] = dbuf.read(1)[0]
            plen = dbuf.read(1)[0]
            if plen == 255:  # direct message
                res["path_hash_mode"] = -1
                res["path_len"] = plen
            else:
                res["path_hash_mode"] = plen >> 6
                res["path_len"] = plen & 0x3F
            res["txt_type"] = dbuf.read(1)[0]
            res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False)
            text = dbuf.read()
            res["text"] = text.decode("utf-8", "ignore")

            # search for text in log_channels
            txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False)
            res["txt_hash"] = txt_hash
            logged = next(
                (
                    entry
                    for entry in reversed(self.channels_log)
                    if "msg_hash" in entry and entry["msg_hash"] == txt_hash
                ),
                None,
            )

            # SNR is taken from the frame itself here, so it is not copied
            # from the logged packet.
            if self.decrypt_channels:
                if logged is not None:
                    res["path"] = logged["path"]
                    res["RSSI"] = logged["rssi"]
                    res["recv_time"] = logged["recv_time"]
                    res["attempt"] = logged["attempt"]

            attributes = {
                "channel_idx": res["channel_idx"],
                "txt_type": res["txt_type"],
            }

            await self.dispatcher.dispatch(
                Event(EventType.CHANNEL_MSG_RECV, res, attributes)
            )

        elif packet_type_value == PacketType.CURRENT_TIME.value:
            time_value = int.from_bytes(dbuf.read(4), byteorder="little")
            result = {"time": time_value}
            await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result))

        elif packet_type_value == PacketType.NO_MORE_MSGS.value:
            result = {"messages_available": False}
            await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result))

        elif packet_type_value == PacketType.CONTACT_URI.value:
            contact_uri = "meshcore://" + dbuf.read().hex()
            result = {"uri": contact_uri}
            await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result))

        elif packet_type_value == PacketType.BATTERY.value:
            battery_level = int.from_bytes(dbuf.read(2), byteorder="little")
            result = {"level": battery_level}
            if len(data) > 3:  # has storage info as well
                result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
                result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little")
            await self.dispatcher.dispatch(Event(EventType.BATTERY, result))

        elif packet_type_value == PacketType.DEVICE_INFO.value:
            res = {}
            fw_ver = dbuf.read(1)[0]
            res["fw ver"] = fw_ver
            if fw_ver >= 3:
                res["max_contacts"] = dbuf.read(1)[0] * 2
                res["max_channels"] = dbuf.read(1)[0]
                res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little")
                res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "")
                res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "")
                res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "")
            if fw_ver >= 9:  # has repeater mode
                rpt = dbuf.read(1)
                if len(rpt) > 0:
                    res["repeat"] = (rpt[0] != 0)
            if fw_ver >= 10:  # has path_hash_mode
                # Guarded like the repeater byte: a short frame must not raise.
                phm = dbuf.read(1)
                if len(phm) > 0:
                    res["path_hash_mode"] = phm[0]
            await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res))

        elif packet_type_value == PacketType.CUSTOM_VARS.value:
            logger.debug(f"received custom vars response: {data.hex()}")
            res = {}
            rawdata = dbuf.read().decode("utf-8", "ignore")
            if rawdata != "":
                for pair in rawdata.split(","):
                    # Split on the first ':' only, so values may contain ':'
                    # and a pair without separator cannot raise.
                    key, sep, value = pair.partition(":")
                    if not sep:
                        logger.debug(f"Ignoring malformed custom var: {pair!r}")
                        continue
                    res[key] = value
            logger.debug(f"got custom vars : {res}")
            await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))

        elif packet_type_value == PacketType.STATS.value:  # RESP_CODE_STATS (24)
            logger.debug(f"received stats response: {data.hex()}")
            # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte
            # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets)
            if len(data) < 2:
                logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header")
                await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
                return

            stats_type = data[1]

            # NOTE(review): the three stats decoders below and the head of the
            # CHANNEL_INFO branch were garbled in the reviewed copy of this
            # file (text between '<' and '>' was lost). They are reconstructed
            # from the surviving fragments; verify field layout, event names
            # and log messages against the original before merging.
            if stats_type == 0:  # STATS_TYPE_CORE
                # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total
                # Format: <B B H I H B (response_code, stats_type, battery_mv, uptime_secs, errors, queue_len)
                if len(data) < 11:
                    logger.error(f"Stats core response too short: {len(data)} bytes, expected 11")
                    await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
                else:
                    try:
                        battery_mv, uptime_secs, errors, queue_len = struct.unpack('<H I H B', data[2:11])
                        res = {
                            'battery_mv': battery_mv,
                            'uptime_secs': uptime_secs,
                            'errors': errors,
                            'queue_len': queue_len,
                        }
                        logger.debug(f"parsed stats core: {res}")
                        await self.dispatcher.dispatch(Event(EventType.STATS_CORE, res))
                    except struct.error as e:
                        logger.error(f"Error parsing stats core binary frame: {e}, data: {data.hex()}")
                        await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))

            elif stats_type == 1:  # STATS_TYPE_RADIO
                # RESP_CODE_STATS + STATS_TYPE_RADIO: 14 bytes total
                # Format: <B B h b b I I (response_code, stats_type, noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
                if len(data) < 14:
                    logger.error(f"Stats radio response too short: {len(data)} bytes, expected 14")
                    await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
                else:
                    try:
                        noise_floor, last_rssi, last_snr_scaled, tx_air_secs, rx_air_secs = struct.unpack('<h b b I I', data[2:14])
                        res = {
                            'noise_floor': noise_floor,
                            'last_rssi': last_rssi,
                            'last_snr': last_snr_scaled / 4.0,  # SNR is transmitted multiplied by 4
                            'tx_air_secs': tx_air_secs,
                            'rx_air_secs': rx_air_secs,
                        }
                        logger.debug(f"parsed stats radio: {res}")
                        await self.dispatcher.dispatch(Event(EventType.STATS_RADIO, res))
                    except struct.error as e:
                        logger.error(f"Error parsing stats radio binary frame: {e}, data: {data.hex()}")
                        await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))

            elif stats_type == 2:  # STATS_TYPE_PACKETS
                # RESP_CODE_STATS + STATS_TYPE_PACKETS: 26 bytes total (30 with recv_errors)
                # Format: <B B I I I I I I (response_code, stats_type, recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
                if len(data) < 26:
                    logger.error(f"Stats packets response too short: {len(data)} bytes, expected 26")
                    await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"}))
                else:
                    try:
                        recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('<I I I I I I', data[2:26])
                        res = {
                            'recv': recv,
                            'sent': sent,
                            'flood_tx': flood_tx,
                            'direct_tx': direct_tx,
                            'flood_rx': flood_rx,
                            'direct_rx': direct_rx,
                        }
                        # Optional trailing field
                        if len(data) >= 30:
                            (recv_errors,) = struct.unpack('<I', data[26:30])
                            res['recv_errors'] = recv_errors
                        logger.debug(f"parsed stats packets: {res}")
                        await self.dispatcher.dispatch(Event(EventType.STATS_PACKETS, res))
                    except struct.error as e:
                        logger.error(f"Error parsing stats packets binary frame: {e}, data: {data.hex()}")
                        await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"binary_parse_error: {e}"}))

            else:
                logger.error(f"Unknown stats type: {stats_type}, data: {data.hex()}")
                await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": f"unknown_stats_type: {stats_type}"}))

        elif packet_type_value == PacketType.CHANNEL_INFO.value:
            logger.debug(f"received channel info response: {data.hex()}")
            res = {}
            idx = dbuf.read(1)[0]
            res["channel_idx"] = idx

            # Channel name is null-terminated, so find the first null byte
            name_bytes = dbuf.read(32)
            null_pos = name_bytes.find(0)
            if null_pos >= 0:
                res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore")
            else:
                res["channel_name"] = name_bytes.decode("utf-8", "ignore")

            res["channel_secret"] = dbuf.read(16)
            res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2]

            # The local copy starts with 40 slots; grow it rather than raise
            # when the device reports a higher channel index.
            while idx >= len(self.channels):
                self.channels.append({})
            self.channels[idx] = res

            await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))

        # Push notifications
        elif packet_type_value == PacketType.ADVERTISEMENT.value:
            logger.debug("Advertisement received")
            res = {}
            res["public_key"] = dbuf.read(32).hex()
            await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res))

        elif packet_type_value == PacketType.PATH_UPDATE.value:
            logger.debug("Code path update")
            res = {}
            res["public_key"] = dbuf.read(32).hex()
            await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res))

        elif packet_type_value == PacketType.ACK.value:
            logger.debug("Received ACK")
            ack_data = {}

            if len(data) >= 5:
                ack_data["code"] = dbuf.read(4).hex()

            attributes = {"code": ack_data.get("code", "")}

            await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes))

        elif packet_type_value == PacketType.MESSAGES_WAITING.value:
            logger.debug("Msgs are waiting")
            await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {}))

        elif packet_type_value == PacketType.RAW_DATA.value:
            res = {}
            res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
            res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
            res["payload"] = dbuf.read(4).hex()
            # Was a bare print(): library code must not write to stdout.
            logger.debug(f"Received raw data: {res}")
            await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))

        elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
            res = {}
            attributes = {}
            # NOTE(review): nesting reconstructed - the prefix and attributes
            # are only filled in when the frame carries a body; confirm.
            if len(data) > 1:
                perms = dbuf.read(1)[0]
                res["permissions"] = perms
                res["is_admin"] = (perms & 1) == 1  # Check if admin bit is set
                res["pubkey_prefix"] = dbuf.read(6).hex()
                attributes = {"pubkey_prefix": res.get("pubkey_prefix")}

            await self.dispatcher.dispatch(
                Event(EventType.LOGIN_SUCCESS, res, attributes)
            )

        elif packet_type_value == PacketType.LOGIN_FAILED.value:
            res = {}
            attributes = {}

            # Read from the frame buffer: the original used ``pbuf`` here,
            # which is not bound in this branch and raised UnboundLocalError.
            dbuf.read(1)  # skip one byte before the prefix
            if len(data) > 7:
                res["pubkey_prefix"] = dbuf.read(6).hex()
                attributes = {"pubkey_prefix": res.get("pubkey_prefix")}

            await self.dispatcher.dispatch(
                Event(EventType.LOGIN_FAILED, res, attributes)
            )

        elif packet_type_value == PacketType.STATUS_RESPONSE.value:
            res = parse_status(data, offset=8)
            data_hex = data[8:].hex()
            logger.debug(f"Status response: {data_hex}")

            attributes = {
                "pubkey_prefix": res["pubkey_pre"],
            }
            await self.dispatcher.dispatch(
                Event(EventType.STATUS_RESPONSE, res, attributes)
            )

        elif packet_type_value == PacketType.LOG_DATA.value:
            logger.debug(f"Received RF log data: {data.hex()}")

            # Parse as raw RX data
            log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()}
            attributes = {}
            recv_time = int(time.time())
            log_data["recv_time"] = recv_time
            attributes["recv_time"] = recv_time

            # First byte is SNR (signed byte, multiplied by 4)
            if len(data) > 1:
                snr_byte = dbuf.read(1)[0]
                # Convert to signed value
                snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0
                log_data["snr"] = snr

            # Second byte is RSSI (signed byte)
            if len(data) > 2:
                rssi_byte = dbuf.read(1)[0]
                # Convert to signed value
                rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256
                log_data["rssi"] = rssi

            # Remaining bytes are the raw data payload
            payload = None
            if len(data) > 3:
                payload = dbuf.read()
                log_data["payload"] = payload.hex()
                log_data["payload_length"] = len(payload)

            # Parse payload and get some info from it
            if payload is not None:
                pbuf = io.BytesIO(payload)
                # Header byte: version (2 bits) | payload type (4 bits) | route type (2 bits)
                header = pbuf.read(1)[0]
                route_type = header & 0x03
                payload_type = (header & 0x3c) >> 2
                payload_ver = (header & 0xc0) >> 6

                transport_code = None
                if route_type == 0x00 or route_type == 0x03:  # has transport code
                    transport_code = pbuf.read(4)

                path_byte = pbuf.read(1)[0]
                path_hash_size = ((path_byte & 0xC0) >> 6) + 1
                path_len = (path_byte & 0x3F)
                # here path_len is number of hops, not number of bytes
                path = pbuf.read(path_len*path_hash_size).hex()  # Beware of traces where paths are mixed

                try:
                    route_typename = ROUTE_TYPENAMES[route_type]
                except IndexError:
                    logger.debug(f"Unknown route type {route_type}")
                    route_typename = "UNK"

                try:
                    payload_typename = PAYLOAD_TYPENAMES[payload_type]
                except IndexError:
                    logger.debug(f"Unknown payload type {payload_type}")
                    payload_typename = "UNK"

                pkt_payload = pbuf.read()
                pkt_hash = int.from_bytes(SHA256.new(pkt_payload).digest()[0:4], "little", signed=False)

                log_data["header"] = header
                log_data["route_type"] = route_type
                attributes["route_type"] = route_type
                log_data["route_typename"] = route_typename
                log_data["payload_type"] = payload_type
                attributes["payload_type"] = payload_type
                log_data["payload_typename"] = payload_typename

                log_data["payload_ver"] = payload_ver

                if transport_code is not None:
                    log_data["transport_code"] = transport_code.hex()

                log_data["path_len"] = path_len
                attributes["path_len"] = path_len
                log_data["path_hash_size"] = path_hash_size
                log_data["path"] = path
                attributes["path"] = path

                log_data["pkt_payload"] = pkt_payload
                log_data["pkt_hash"] = pkt_hash

            if payload is not None and payload_type == 0x05:  # flood msg / channel
                pk_buf = io.BytesIO(pkt_payload)
                chan_hash = pk_buf.read(1).hex()
                cipher_mac = pk_buf.read(2)
                msg = pk_buf.read()  # until the end of buffer

                # Several channels may share the 1-byte hash: pick the one
                # whose secret also validates the MAC.
                channel = None
                for c in self.channels:
                    if "channel_hash" in c and c["channel_hash"] == chan_hash:
                        # validate against MAC
                        h = HMAC.new(c["channel_secret"], digestmod=SHA256)
                        h.update(msg)
                        if h.digest()[0:2] == cipher_mac:
                            channel = c
                            break

                chan_name = ""

                if channel is None:
                    chan_name = chan_hash
                else:
                    chan_name = channel["channel_name"]

                log_data["chan_hash"] = chan_hash
                log_data["cipher_mac"] = cipher_mac.hex()
                log_data["crypted"] = msg.hex()
                log_data["chan_name"] = chan_name

                if channel is not None and self.decrypt_channels:
                    # search for the same packet
                    logged = next(
                        (
                            entry
                            for entry in reversed(self.channels_log)
                            if "pkt_hash" in entry and entry["pkt_hash"] == pkt_hash
                        ),
                        None,
                    )

                    if logged is None:
                        # not found: decrypt the text and hash it
                        aes_key = channel["channel_secret"]
                        cipher = AES.new(aes_key, AES.MODE_ECB)
                        uncrypted = cipher.decrypt(msg)

                        timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False)
                        # Byte 4 packs the text type (upper 6 bits) and the
                        # attempt counter (lower 2 bits). The original sliced
                        # ``uncrypted[4:4]`` (empty), so txt_type was always 0.
                        flags_byte = uncrypted[4]
                        attempt = flags_byte & 3
                        txt_type = flags_byte >> 2
                        message = uncrypted[5:].strip(b"\0")
                        msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False)

                        log_data["message"] = message.decode("utf-8", "ignore")
                        log_data["msg_hash"] = msg_hash
                        log_data["sender_timestamp"] = timestamp
                        log_data["attempt"] = attempt
                        log_data["txt_type"] = txt_type
                    else:
                        # found: copy
                        log_data["message"] = logged["message"]
                        log_data["msg_hash"] = logged["msg_hash"]
                        log_data["sender_timestamp"] = logged["sender_timestamp"]
                        log_data["attempt"] = logged["attempt"]
                        log_data["txt_type"] = logged["txt_type"]

                    # NOTE(review): nesting reconstructed - only decrypted
                    # packets are logged, so every logged entry carries the
                    # "message" keys copied above; confirm.
                    self.channels_log.append(log_data)

                    # Bound the log: drop the 25 oldest entries past 100.
                    if len(self.channels_log) > 100:
                        del self.channels_log[:25]

            elif payload is not None and payload_type == 0x04:  # Advert
                pk_buf = io.BytesIO(pkt_payload)
                adv_key = pk_buf.read(32).hex()
                adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
                signature = pk_buf.read(64).hex()
                flags = pk_buf.read(1)[0]
                adv_type = flags & 0x0F
                adv_lat = None
                adv_lon = None
                adv_feat1 = None
                adv_feat2 = None
                # Optional fields follow in this order, each announced by a flag bit.
                if flags & 0x10 > 0:  # has location
                    adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
                    adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
                if flags & 0x20 > 0:  # has feature1
                    adv_feat1 = pk_buf.read(2).hex()
                if flags & 0x40 > 0:  # has feature2
                    adv_feat2 = pk_buf.read(2).hex()
                if flags & 0x80 > 0:  # has name
                    adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
                    log_data["adv_name"] = adv_name

                log_data["adv_key"] = adv_key
                log_data["adv_timestamp"] = adv_timestamp
                log_data["signature"] = signature
                log_data["adv_flags"] = flags
                log_data["adv_type"] = adv_type
                if adv_lat is not None:
                    log_data["adv_lat"] = adv_lat
                if adv_lon is not None:
                    log_data["adv_lon"] = adv_lon
                if adv_feat1 is not None:
                    log_data["adv_feat1"] = adv_feat1
                if adv_feat2 is not None:
                    log_data["adv_feat2"] = adv_feat2

            # Dispatch as RF log data
            await self.dispatcher.dispatch(
                Event(EventType.RX_LOG_DATA, log_data, attributes)
            )

        elif packet_type_value == PacketType.TRACE_DATA.value:
            logger.debug(f"Received trace data: {data.hex()}")
            res = {}

            # According to the source, format is:
            # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr

            dbuf.read(1)  # reserved
            path_len = dbuf.read(1)[0]
            flags = dbuf.read(1)[0]
            tag = int.from_bytes(dbuf.read(4), byteorder="little")
            auth_code = int.from_bytes(dbuf.read(4), byteorder="little")

            # path_len arrives in bytes; convert it to a number of hops
            path_hash_len = 1 << (flags & 3)
            path_len = int(path_len / path_hash_len)

            # Initialize result
            res["tag"] = tag
            res["auth"] = auth_code
            res["flags"] = flags
            res["path_len"] = path_len

            # Process path as array of objects with hash and SNR
            path_nodes = []

            if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1:
                # All hashes come first, then one SNR byte per hop
                for _ in range(path_len):
                    node = {
                        "hash": dbuf.read(path_hash_len).hex(),
                    }
                    path_nodes.append(node)

                for n in path_nodes:
                    node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
                    n["snr"] = node_snr / 4.0

                # Add the final node (our device) with its SNR
                final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0
                path_nodes.append({"snr": final_snr})

            # NOTE(review): nesting reconstructed - "path" is always set
            # (empty list when no path could be decoded); confirm.
            res["path"] = path_nodes

            logger.debug(f"Parsed trace data: {res}")

            attributes = {
                "tag": res["tag"],
                "auth_code": res["auth"],
            }

            await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes))

        elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value:
            logger.debug(f"Received telemetry data: {data.hex()}")
            res = {}

            dbuf.read(1)
            res["pubkey_pre"] = dbuf.read(6).hex()
            buf = dbuf.read()

            # Parse the byte string into an LppFrame; a zero byte ends the list.
            i = 0
            lpp_data_list = []
            while i < len(buf) and buf[i] != 0:
                lppdata = LppData.from_bytes(buf[i:])
                lpp_data_list.append(lppdata)
                i = i + len(lppdata)

            # Round-trip through JSON to get plain dicts/lists
            lpp = json.loads(
                json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)
            )

            res["lpp"] = lpp

            attributes = {
                "raw": buf.hex(),
                "pubkey_prefix": res["pubkey_pre"],
            }

            await self.dispatcher.dispatch(
                Event(EventType.TELEMETRY_RESPONSE, res, attributes)
            )

        elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value:
            res = {}
            freqs = []

            # List of (min, max) pairs, ended by a zero entry or the end of
            # the frame. Reading never raises, so no try/except is needed
            # (the original ``except e:`` was itself an error).
            while True:
                raw_min = dbuf.read(4)
                raw_max = dbuf.read(4)
                if len(raw_min) < 4 or len(raw_max) < 4:
                    break  # end of frame, or truncated trailing entry
                freq_min = int.from_bytes(raw_min, "little", signed=False)
                freq_max = int.from_bytes(raw_max, "little", signed=False)
                if freq_min == 0 or freq_max == 0:
                    break
                freqs.append({"min": freq_min, "max": freq_max})

            res["freqs"] = freqs
            await self.dispatcher.dispatch(
                Event(EventType.ALLOWED_REPEAT_FREQ, res)
            )

        elif packet_type_value == PacketType.BINARY_RESPONSE.value:
            dbuf.read(1)
            tag = dbuf.read(4).hex()
            response_data = dbuf.read()
            logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}")

            # Always dispatch generic BINARY_RESPONSE
            binary_res = {"tag": tag, "data": response_data.hex()}
            await self.dispatcher.dispatch(
                Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag})
            )

            # Check for tracked request type and dispatch specific response
            if tag in self.pending_binary_requests:
                # A tag is answered once: take the registration out.
                pending = self.pending_binary_requests.pop(tag)
                request_type = pending["request_type"]
                is_anon = pending["is_anon"]
                pubkey_prefix = pending["pubkey_prefix"]
                context = pending["context"]
                logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}")

                if not is_anon:
                    if request_type == BinaryReqType.STATUS and len(response_data) >= 52:
                        res = parse_status(response_data, pubkey_prefix=pubkey_prefix)
                        await self.dispatcher.dispatch(
                            Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag})
                        )

                    elif request_type == BinaryReqType.TELEMETRY:
                        try:
                            lpp = lpp_parse(response_data)
                            telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix}
                            await self.dispatcher.dispatch(
                                Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res)
                            )
                        except Exception as e:
                            logger.error(f"Error parsing binary telemetry response: {e}")

                    elif request_type == BinaryReqType.MMA:
                        try:
                            mma_result = lpp_parse_mma(response_data[4:])  # Skip 4-byte header
                            mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix}
                            await self.dispatcher.dispatch(
                                Event(EventType.MMA_RESPONSE, mma_res, mma_res)
                            )
                        except Exception as e:
                            logger.error(f"Error parsing binary MMA response: {e}")

                    elif request_type == BinaryReqType.ACL:
                        try:
                            acl_result = parse_acl(response_data)
                            acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix}
                            await self.dispatcher.dispatch(
                                Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
                            )
                        except Exception as e:
                            logger.error(f"Error parsing binary ACL response: {e}")

                    elif request_type == BinaryReqType.NEIGHBOURS:
                        try:
                            pk_plen = context["pubkey_prefix_length"]
                            bbuf = io.BytesIO(response_data)

                            res = {
                                "pubkey_prefix": pubkey_prefix,
                                "tag": tag
                            }
                            res.update(context)  # add context in result

                            res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True)
                            results_count = int.from_bytes(bbuf.read(2), "little", signed=True)
                            res["results_count"] = results_count

                            neighbours_list = []

                            for _ in range(results_count):
                                neighb = {}
                                neighb["pubkey"] = bbuf.read(pk_plen).hex()
                                neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True)
                                neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4
                                neighbours_list.append(neighb)

                            res["neighbours"] = neighbours_list

                            await self.dispatcher.dispatch(
                                Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix})
                            )

                        except Exception as e:
                            logger.error(f"Error parsing binary NEIGHBOURS response: {e}")

            else:
                logger.debug(f"No tracked request found for binary response tag {tag}")

        elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value:
            logger.debug(f"Received path discovery response: {data.hex()}")
            res = {}
            dbuf.read(1)
            res["pubkey_pre"] = dbuf.read(6).hex()

            # Length byte: hash size - 1 in the 2 upper bits, hop count in the
            # 6 lower bits. The original masked with 0xbf, which left bit 7 of
            # the hash size in the hop count.
            opl = dbuf.read(1)[0]
            opl_hlen = ((opl & 0xc0) >> 6) + 1
            opl = opl & 0x3f
            res["out_path_len"] = opl
            res["out_path_hash_len"] = opl_hlen
            res["out_path"] = dbuf.read(opl*opl_hlen).hex()

            ipl = dbuf.read(1)[0]
            ipl_hlen = ((ipl & 0xc0) >> 6) + 1
            ipl = ipl & 0x3f
            res["in_path_len"] = ipl
            res["in_path_hash_len"] = ipl_hlen
            res["in_path"] = dbuf.read(ipl*ipl_hlen).hex()

            attributes = {"pubkey_pre": res["pubkey_pre"]}

            await self.dispatcher.dispatch(
                Event(EventType.PATH_RESPONSE, res, attributes)
            )

        elif packet_type_value == PacketType.PRIVATE_KEY.value:
            logger.debug(f"Received private key response: {data.hex()}")
            if len(data) >= 65:  # 1 byte response code + 64 bytes private key
                private_key = dbuf.read(64)  # Extract 64-byte private key
                res = {"private_key": private_key}
                await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res))
            else:
                logger.error(f"Invalid private key response length: {len(data)}")

        elif packet_type_value == PacketType.SIGN_START.value:
            logger.debug(f"Received sign start response: {data.hex()}")
            # Payload: 1 reserved byte, 4-byte max length
            dbuf.read(1)
            max_len = int.from_bytes(dbuf.read(4), "little")
            res = {"max_length": max_len}
            await self.dispatcher.dispatch(Event(EventType.SIGN_START, res))

        elif packet_type_value == PacketType.SIGNATURE.value:
            logger.debug(f"Received signature: {data.hex()}")
            signature = dbuf.read()
            res = {"signature": signature}
            await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res))

        elif packet_type_value == PacketType.DISABLED.value:
            logger.debug("Received disabled response")
            res = {"reason": "private_key_export_disabled"}
            await self.dispatcher.dispatch(Event(EventType.DISABLED, res))

        elif packet_type_value == PacketType.CONTROL_DATA.value:
            logger.debug("Received control data packet")
            res = {}
            res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4
            res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True)
            res["path_len"] = dbuf.read(1)[0]
            payload = dbuf.read()
            if not payload:
                # Nothing to decode: the payload type is its first byte.
                logger.warning("Received control data packet without payload")
                return
            payload_type = payload[0]
            res["payload_type"] = payload_type
            res["payload"] = payload

            attributes = {"payload_type": payload_type}
            await self.dispatcher.dispatch(
                Event(EventType.CONTROL_DATA, res, attributes)
            )

            # decode NODE_DISCOVER_RESP
            # (upper nibble is the control type, lower nibble the node type)
            if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value:
                pbuf = io.BytesIO(payload[1:])
                ndr = dict(res)
                del ndr["payload_type"]
                del ndr["payload"]
                ndr["node_type"] = payload_type & 0x0F
                ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4
                ndr["tag"] = pbuf.read(4).hex()

                # Either a full 32-byte key or an 8-byte prefix
                pubkey = pbuf.read()
                if len(pubkey) < 32:
                    pubkey = pubkey[0:8]
                else:
                    pubkey = pubkey[0:32]

                ndr["pubkey"] = pubkey.hex()

                attributes = {
                    "node_type": ndr["node_type"],
                    "tag": ndr["tag"],
                    "pubkey": ndr["pubkey"],
                }

                await self.dispatcher.dispatch(
                    Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
                )

        else:
            logger.debug(f"Unhandled data received {data}")
            logger.debug(f"Unhandled packet type: {packet_type_value}")