diff --git a/src/meshcore/meshcore_parser.py b/src/meshcore/meshcore_parser.py index 2139663..046709a 100644 --- a/src/meshcore/meshcore_parser.py +++ b/src/meshcore/meshcore_parser.py @@ -42,8 +42,30 @@ class MeshcorePacketParser: Returns : completed log_data """ + # Minimum viable payload is 2 bytes (1 header + 1 path_byte) for a + # direct route. Anything shorter is provably broken — for example, + # the LOG_DATA branch in reader.py only requires `len(data) > 3`, + # which means a 4-byte LOG_DATA frame produces a 1-byte payload + # here, and `path_byte = pbuf.read(1)[0]` further down would raise + # IndexError on the empty buffer. Populate sentinel values so the + # caller's downstream `log_data['route_type']` etc. lookups don't + # KeyError, then return early. + if len(payload) < 2: + logger.debug(f"parsePacketPayload: payload too short ({len(payload)} bytes < 2), returning sentinel log_data") + log_data["route_type"] = -1 + log_data["route_typename"] = "UNK" + log_data["payload_type"] = -1 + log_data["payload_typename"] = "UNK" + log_data["payload_ver"] = 0 + log_data["path_len"] = 0 + log_data["path_hash_size"] = 1 + log_data["path"] = "" + log_data["pkt_payload"] = b"" + log_data["pkt_hash"] = 0 + return log_data + pbuf = io.BytesIO(payload) - + header = pbuf.read(1)[0] route_type = header & 0x03 payload_type = (header & 0x3c) >> 2 @@ -128,7 +150,7 @@ class MeshcorePacketParser: uncrypted = cipher.decrypt(msg) timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False) attempt = uncrypted[4] & 3 - txt_type = int.from_bytes(uncrypted[4:4], "little", signed=False) >> 2 + txt_type = int.from_bytes(uncrypted[4:5], "little", signed=False) >> 2 message = uncrypted[5:].strip(b"\0") msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False) log_data["message"] = message.decode("utf-8", "ignore") @@ -149,39 +171,42 @@ class MeshcorePacketParser: del self.channels_log[:25] elif not payload is None and payload_type == 0x04: # Advert - pk_buf = io.BytesIO(pkt_payload) - adv_key = pk_buf.read(32).hex() - adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) - signature = pk_buf.read(64).hex() - flags = pk_buf.read(1)[0] - adv_type = flags & 0x0F - adv_lat = None - adv_lon = None - adv_feat1 = None - adv_feat2 = None - if flags & 0x10 > 0: #has location - adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - if flags & 0x20 > 0: #has feature1 - adv_feat1 = pk_buf.read(2).hex() - if flags & 0x40 > 0: #has feature2 - adv_feat2 = pk_buf.read(2).hex() - if flags & 0x80 > 0: #has name - adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") - log_data["adv_name"] = adv_name + try: + pk_buf = io.BytesIO(pkt_payload) + adv_key = pk_buf.read(32).hex() + adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) + signature = pk_buf.read(64).hex() + flags = pk_buf.read(1)[0] + adv_type = flags & 0x0F + adv_lat = None + adv_lon = None + adv_feat1 = None + adv_feat2 = None + if flags & 0x10 > 0: #has location + adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + if flags & 0x20 > 0: #has feature1 + adv_feat1 = pk_buf.read(2).hex() + if flags & 0x40 > 0: #has feature2 + adv_feat2 = pk_buf.read(2).hex() + if flags & 0x80 > 0: #has name + adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") + log_data["adv_name"] = adv_name - log_data["adv_key"] = adv_key - log_data["adv_timestamp"] = adv_timestamp - log_data["signature"] = signature - log_data["adv_flags"] = flags - log_data["adv_type"] = adv_type - if not adv_lat is None : - log_data["adv_lat"] = adv_lat - if not adv_lon is None : - log_data["adv_lon"] = adv_lon - if not adv_feat1 is None: - log_data["adv_feat1"] = adv_feat1 - if not adv_feat2 is None: - log_data["adv_feat2"] = adv_feat2 + log_data["adv_key"] = adv_key + log_data["adv_timestamp"] = adv_timestamp + log_data["signature"] = signature + log_data["adv_flags"] = flags + log_data["adv_type"] = adv_type + if not adv_lat is None : + log_data["adv_lat"] = adv_lat + if not adv_lon is None : + log_data["adv_lon"] = adv_lon + if not adv_feat1 is None: + log_data["adv_feat1"] = adv_feat1 + if not adv_feat2 is None: + log_data["adv_feat2"] = adv_feat2 + except (IndexError, ValueError) as e: + logger.debug(f"parsePacketPayload: malformed ADVERT payload ({type(e).__name__}: {e}), len={len(pkt_payload)}") return log_data diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 802004a..518eb5c 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -3,6 +3,7 @@ import json import struct import time import io +import traceback from typing import Any, Dict from .events import Event, EventType, EventDispatcher, ErrorMessages from .meshcore_parser import MeshcorePacketParser @@ -69,853 +70,887 @@ class MessageReader: except IndexError as e: logger.warning(f"Received empty packet: {e}") return - logger.debug(f"Received data: {data.hex()}") + try: + logger.debug(f"Received data: {data.hex()}") - # Handle command responses - if packet_type_value == PacketType.OK.value: - result: Dict[str, Any] = {} - if len(data) == 5: - result["value"] = int.from_bytes(data[1:5], byteorder="little") + # Handle command responses + if packet_type_value == PacketType.OK.value: + result: Dict[str, Any] = {} + if len(data) == 5: + result["value"] = int.from_bytes(data[1:5], byteorder="little") - # Dispatch event for the OK response - await self.dispatcher.dispatch(Event(EventType.OK, result)) + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) - elif packet_type_value == PacketType.ERROR.value: - if len(data) > 1: - result = { "error_code": data[1], } - if data[1] in ErrorMessages: - result["code_string"] = ErrorMessages[data[1]] - else: - result = {} - - # Dispatch event for the ERROR response - await self.dispatcher.dispatch(Event(EventType.ERROR, result)) - - elif packet_type_value == PacketType.CONTACT_START.value: - self.contact_nb = int.from_bytes(data[1:5], byteorder="little") - self.contacts = {} - - elif ( - packet_type_value == PacketType.CONTACT.value - or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value - ): - c = {} - c["public_key"] = dbuf.read(32).hex() - c["type"] = dbuf.read(1)[0] - c["flags"] = dbuf.read(1)[0] - plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little") - if plen == 255: # flood - c["out_path_hash_mode"] = -1 - c["out_path_len"] = -1 # 6 LSB - else: - c["out_path_hash_mode"] = plen >> 6 - c["out_path_len"] = plen & 0x3F # 6 LSB - c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() - c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") - c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") - c["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") - - if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: - await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) - else: - await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) - self.contacts[c["public_key"]] = c - - elif packet_type_value == PacketType.ADVERT_PATH.value : - r = {} - r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) - plen = int.from_bytes(dbuf.read(1), "little", signed=False) - if plen == 255: # flood, should not happen - r["path_hash_mode"] = -1 - r["path_len"] = -1 - else: - r["path_hash_mode"] = plen >> 6 # 2 upper bytes - r["path_len"] = plen & 0x3F - r["path"] = dbuf.read().replace(b"\0", b"").hex() - - await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) - - elif packet_type_value == PacketType.CONTACT_END.value: - lastmod = int.from_bytes(dbuf.read(4), byteorder="little") - attributes = { - "lastmod": lastmod, - } - await self.dispatcher.dispatch( - Event(EventType.CONTACTS, self.contacts, attributes) - ) - - elif packet_type_value == PacketType.SELF_INFO.value: - self_info = {} - self_info["adv_type"] = dbuf.read(1)[0] - self_info["tx_power"] = dbuf.read(1)[0] - self_info["max_tx_power"] = dbuf.read(1)[0] - self_info["public_key"] = dbuf.read(32).hex() - self_info["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["multi_acks"] = dbuf.read(1)[0] - self_info["adv_loc_policy"] = dbuf.read(1)[0] - telemetry_mode = dbuf.read(1)[0] - self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 - self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 - self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 - self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 - self_info["radio_freq"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_bw"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_sf"] = dbuf.read(1)[0] - self_info["radio_cr"] = dbuf.read(1)[0] - self_info["name"] = dbuf.read().decode("utf-8", "ignore") - await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) - - elif packet_type_value == PacketType.MSG_SENT.value: - res = {} - res["type"] = dbuf.read(1)[0] - res["expected_ack"] = dbuf.read(4) - res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") - - attributes = { - "type": res["type"], - "expected_ack": res["expected_ack"].hex(), - } - - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) - - elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - evt_type = EventType.CONTACT_MSG_RECV - - await self.dispatcher.dispatch(Event(evt_type, res, attributes)) - - elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CONTACT_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: - res = {} - res["type"] = "CHAN" - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read().strip(b"\0") - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - if self.decrypt_channels: - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["SNR"] = logged["snr"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read() - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - if self.decrypt_channels: - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - res["txt_hash"] = txt_hash - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CURRENT_TIME.value: - time_value = int.from_bytes(dbuf.read(4), byteorder="little") - result = {"time": time_value} - await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) - - elif packet_type_value == PacketType.NO_MORE_MSGS.value: - result = {"messages_available": False} - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) - - elif packet_type_value == PacketType.CONTACT_URI.value: - contact_uri = "meshcore://" + dbuf.read().hex() - result = {"uri": contact_uri} - await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) - - elif packet_type_value == PacketType.BATTERY.value: - battery_level = int.from_bytes(dbuf.read(2), byteorder="little") - result = {"level": battery_level} - if len(data) > 3: # has storage info as well - result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) - - elif packet_type_value == PacketType.DEVICE_INFO.value: - res = {} - fw_ver = dbuf.read(1)[0] - res["fw ver"] = fw_ver - if fw_ver >= 3: - res["max_contacts"] = dbuf.read(1)[0] * 2 - res["max_channels"] = dbuf.read(1)[0] - res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") - res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") - res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") - res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") - if fw_ver >= 9: # has repeater mode - rpt = dbuf.read(1) - if len(rpt) > 0: - res["repeat"] = (rpt[0] != 0) - if fw_ver >= 10: # has path_hash_mode - path_hash_mode = dbuf.read(1)[0] - res["path_hash_mode"] = path_hash_mode - await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) - - elif packet_type_value == PacketType.CUSTOM_VARS.value: - logger.debug(f"received custom vars response: {data.hex()}") - res = {} - rawdata = dbuf.read().decode("utf-8", "ignore") - if not rawdata == "": - pairs = rawdata.split(",") - for p in pairs: - psplit = p.split(":") - res[psplit[0]] = psplit[1] - logger.debug(f"got custom vars : {res}") - await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) - - elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) - logger.debug(f"received stats response: {data.hex()}") - # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte - # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) - if len(data) < 2: - logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") - await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) - return - - stats_type = data[1] - - if stats_type == 0: # STATS_TYPE_CORE - # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total - # Format: 1: + result = { "error_code": data[1], } + if data[1] in ErrorMessages: + result["code_string"] = ErrorMessages[data[1]] else: - try: - battery_mv, uptime_secs, errors, queue_len = struct.unpack('> 6 + c["out_path_len"] = plen & 0x3F # 6 LSB + c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() + c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") + c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") + c["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") + + if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: + await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) else: - try: - recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('= 30: - (recv_errors,) = struct.unpack('= 0: - res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") - else: - res["channel_name"] = name_bytes.decode("utf-8", "ignore") - - res["channel_secret"] = dbuf.read(16) - res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] - - await self.packet_parser.newChannel(res) - - await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) - - # Push notifications - elif packet_type_value == PacketType.ADVERTISEMENT.value: - logger.debug("Advertisement received") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) - - elif packet_type_value == PacketType.PATH_UPDATE.value: - logger.debug("Code path update") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) - - elif packet_type_value == PacketType.ACK.value: - logger.debug("Received ACK") - ack_data = {} - - if len(data) >= 5: - ack_data["code"] = dbuf.read(4).hex() - - attributes = {"code": ack_data.get("code", "")} - - await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) - - elif packet_type_value == PacketType.MESSAGES_WAITING.value: - logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) - - elif packet_type_value == PacketType.RAW_DATA.value: - res = {} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["payload"] = dbuf.read(4).hex() - logger.debug("Received raw data") - print(res) - await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) - - elif packet_type_value == PacketType.LOGIN_SUCCESS.value: - res = {} - attributes = {} - if len(data) > 1: - perms = dbuf.read(1)[0] - res["permissions"] = perms - res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set - - res["pubkey_prefix"] = dbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_SUCCESS, res, attributes) - ) - - elif packet_type_value == PacketType.LOGIN_FAILED.value: - res = {} - attributes = {} - - dbuf.read(1) - - if len(data) > 7: - res["pubkey_prefix"] = pbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_FAILED, res, attributes) - ) - - elif packet_type_value == PacketType.STATUS_RESPONSE.value: - res = parse_status(data, offset=8) - data_hex = data[8:].hex() - logger.debug(f"Status response: {data_hex}") - - attributes = { - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.LOG_DATA.value: - logger.debug(f"Received RF log data: {data.hex()}") - - # Parse as raw RX data - log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} - attributes = {} - - recv_time = int(time.time()) - log_data["recv_time"] = recv_time - attributes["recv_time"] = recv_time - - # First byte is SNR (signed byte, multiplied by 4) - if len(data) > 1: - snr_byte = dbuf.read(1)[0] - # Convert to signed value - snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 - log_data["snr"] = snr - - # Second byte is RSSI (signed byte) - if len(data) > 2: - rssi_byte = dbuf.read(1)[0] - # Convert to signed value - rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 - log_data["rssi"] = rssi - - # Remaining bytes are the raw data payload - payload = None - if len(data) > 3: - payload=dbuf.read() - log_data["payload"] = payload.hex() - log_data["payload_length"] = len(payload) - - # Parse payload and get some info from it - log_data = await self.packet_parser.parsePacketPayload(payload, log_data) - attributes['route_type'] = log_data['route_type'] - attributes['payload_type'] = log_data['payload_type'] - attributes['path_len'] = log_data['path_len'] - attributes['path'] = log_data['path'] - - # Dispatch as RF log data - await self.dispatcher.dispatch( - Event(EventType.RX_LOG_DATA, log_data, attributes) - ) - - elif packet_type_value == PacketType.TRACE_DATA.value: - logger.debug(f"Received trace data: {data.hex()}") - res = {} - - # According to the source, format is: - # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr - - reserved = dbuf.read(1)[0] - path_len = dbuf.read(1)[0] - flags = dbuf.read(1)[0] - tag = int.from_bytes(dbuf.read(4), byteorder="little") - auth_code = int.from_bytes(dbuf.read(4), byteorder="little") - - path_hash_len = 1 << (flags&3) - path_len = int(path_len / path_hash_len) - - # Initialize result - res["tag"] = tag - res["auth"] = auth_code - res["flags"] = flags - res["path_len"] = path_len - - # Process path as array of objects with hash and SNR - path_nodes = [] - - if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: - # Extract path with hash and SNR pairs - for i in range(path_len): - node = { - "hash": dbuf.read(path_hash_len).hex(), - } - path_nodes.append(node) - - for n in path_nodes: - node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - n["snr"] = node_snr / 4.0 - - # Add the final node (our device) with its SNR - final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 - path_nodes.append({"snr": final_snr}) - - res["path"] = path_nodes - - logger.debug(f"Parsed trace data: {res}") - - attributes = { - "tag": res["tag"], - "auth_code": res["auth"], - } - - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) - - elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: - logger.debug(f"Received telemetry data: {data.hex()}") - res = {} - - dbuf.read(1) - - res["pubkey_pre"] = dbuf.read(6).hex() - buf = dbuf.read() - - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - lpp = json.loads( - json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) - ) - - res["lpp"] = lpp - - attributes = { - "raw": buf.hex(), - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: - res = {} - freqs = [] - - cont = True - try: - while cont: - min = int.from_bytes(dbuf.read(4), "little", signed=False) - max = int.from_bytes(dbuf.read(4), "little", signed=False) - if min == 0 or max == 0: - cont = False - else: - freqs.append({"min" : min, "max": max}) - except e: - print(e) - - res["freqs"] = freqs - - await self.dispatcher.dispatch( - Event(EventType.ALLOWED_REPEAT_FREQ, res) - ) - - elif packet_type_value == PacketType.BINARY_RESPONSE.value: - dbuf.read(1) - tag = dbuf.read(4).hex() - response_data = dbuf.read() - logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") - - # Always dispatch generic BINARY_RESPONSE - binary_res = {"tag": tag, "data": response_data.hex()} - await self.dispatcher.dispatch( - Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) - ) - - # Check for tracked request type and dispatch specific response - if tag in self.pending_binary_requests: - request_type = self.pending_binary_requests[tag]["request_type"] - is_anon = self.pending_binary_requests[tag]["is_anon"] - pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] - context = self.pending_binary_requests[tag]["context"] - del self.pending_binary_requests[tag] - logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - - if not is_anon: - - if request_type == BinaryReqType.STATUS and len(response_data) >= 52: - res = {} - res = parse_status(response_data, pubkey_prefix=pubkey_prefix) - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) - ) - - elif request_type == BinaryReqType.TELEMETRY : - try: - lpp = lpp_parse(response_data) - telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) - ) - except Exception as e: - logger.error(f"Error parsing binary telemetry response: {e}") - - elif request_type == BinaryReqType.MMA: - try: - mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header - mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.MMA_RESPONSE, mma_res, mma_res) - ) - except Exception as e: - logger.error(f"Error parsing binary MMA response: {e}") - - elif request_type == BinaryReqType.ACL: - try: - acl_result = parse_acl(response_data) - acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - except Exception as e: - logger.error(f"Error parsing binary ACL response: {e}") - - elif request_type == BinaryReqType.NEIGHBOURS: - try: - pk_plen = context["pubkey_prefix_length"] - bbuf = io.BytesIO(response_data) - - res = { - "pubkey_prefix": pubkey_prefix, - "tag": tag - } - res.update(context) # add context in result - - res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) - results_count = int.from_bytes(bbuf.read(2), "little", signed=True) - res["results_count"] = results_count - - neighbours_list = [] - - for _ in range (results_count): - neighb = {} - neighb["pubkey"] = bbuf.read(pk_plen).hex() - neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) - neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 - neighbours_list.append(neighb) - - res["neighbours"] = neighbours_list - - await self.dispatcher.dispatch( - Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - - except Exception as e: - logger.error(f"Error parsing binary NEIGHBOURS response: {e}") - - else: - logger.debug(f"No tracked request found for binary response tag {tag}") - - elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: - logger.debug(f"Received path discovery response: {data.hex()}") - res = {} - dbuf.read(1) - res["pubkey_pre"] = dbuf.read(6).hex() - opl = dbuf.read(1)[0] - opl_hlen = ((opl & 0xc0) >> 6) + 1 - opl = opl & 0x3f - res["out_path_len"] = opl - res["out_path_hash_len"] = opl_hlen - res["out_path"] = dbuf.read(opl*opl_hlen).hex() - ipl = dbuf.read(1)[0] - ipl_hlen = ((ipl & 0xc0) >> 6) + 1 - ipl = ipl & 0x3f - res["in_path_len"] = ipl - res["in_path_hash_len"] = ipl_hlen - res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() - - attributes = {"pubkey_pre": res["pubkey_pre"]} - - await self.dispatcher.dispatch( - Event(EventType.PATH_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.PRIVATE_KEY.value: - logger.debug(f"Received private key response: {data.hex()}") - if len(data) >= 65: # 1 byte response code + 64 bytes private key - private_key = dbuf.read(64) # Extract 64-byte private key - res = {"private_key": private_key} - await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) - else: - logger.error(f"Invalid private key response length: {len(data)}") - - elif packet_type_value == PacketType.SIGN_START.value: - logger.debug(f"Received sign start response: {data.hex()}") - # Payload: 1 reserved byte, 4-byte max length - dbuf.read(1) - max_len = int.from_bytes(dbuf.read(4), "little") - res = {"max_length": max_len} - await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) - - elif packet_type_value == PacketType.SIGNATURE.value: - logger.debug(f"Received signature: {data.hex()}") - signature = dbuf.read() - res = {"signature": signature} - await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) - - elif packet_type_value == PacketType.DISABLED.value: - logger.debug("Received disabled response") - res = {"reason": "private_key_export_disabled"} - await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) - - elif packet_type_value == PacketType.CONTROL_DATA.value: - logger.debug("Received control data packet") - res={} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["path_len"] = dbuf.read(1)[0] - payload = dbuf.read() - payload_type = payload[0] - res["payload_type"] = payload_type - res["payload"] = payload - - attributes = {"payload_type": payload_type} - await self.dispatcher.dispatch( - Event(EventType.CONTROL_DATA, res, attributes) - ) - - # decode NODE_DISCOVER_RESP - if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: - pbuf = io.BytesIO(payload[1:]) - ndr = dict(res) - del ndr["payload_type"] - del ndr["payload"] - ndr["node_type"] = payload_type & 0x0F - ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 - ndr["tag"] = pbuf.read(4).hex() - - pubkey = pbuf.read() - if len(pubkey) < 32: - pubkey = pubkey[0:8] + await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.ADVERT_PATH.value : + r = {} + r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) + plen = int.from_bytes(dbuf.read(1), "little", signed=False) + if plen == 255: # flood, should not happen + r["path_hash_mode"] = -1 + r["path_len"] = -1 else: - pubkey = pubkey[0:32] + r["path_hash_mode"] = plen >> 6 # 2 upper bytes + r["path_len"] = plen & 0x3F + r["path"] = dbuf.read().replace(b"\0", b"").hex() - ndr["pubkey"] = pubkey.hex() + await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) + + elif packet_type_value == PacketType.CONTACT_END.value: + lastmod = int.from_bytes(dbuf.read(4), byteorder="little") + attributes = { + "lastmod": lastmod, + } + await self.dispatcher.dispatch( + Event(EventType.CONTACTS, self.contacts, attributes) + ) + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = dbuf.read(1)[0] + self_info["tx_power"] = dbuf.read(1)[0] + self_info["max_tx_power"] = dbuf.read(1)[0] + self_info["public_key"] = dbuf.read(32).hex() + self_info["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["multi_acks"] = dbuf.read(1)[0] + self_info["adv_loc_policy"] = dbuf.read(1)[0] + telemetry_mode = dbuf.read(1)[0] + self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 + self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 + self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 + self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 + self_info["radio_freq"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_bw"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_sf"] = dbuf.read(1)[0] + self_info["radio_cr"] = dbuf.read(1)[0] + self_info["name"] = dbuf.read().decode("utf-8", "ignore") + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = dbuf.read(1)[0] + res["expected_ack"] = dbuf.read(4) + res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { - "node_type" : ndr["node_type"], - "tag" : ndr["tag"], - "pubkey" : ndr["pubkey"], + "type": res["type"], + "expected_ack": res["expected_ack"].hex(), + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], + } + + evt_type = EventType.CONTACT_MSG_RECV + + await self.dispatcher.dispatch(Event(evt_type, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], } await self.dispatcher.dispatch( - Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + Event(EventType.CONTACT_MSG_RECV, res, attributes) ) - else: - logger.debug(f"Unhandled data received {data}") - logger.debug(f"Unhandled packet type: {packet_type_value}") + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read().strip(b"\0") + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + if self.decrypt_channels: + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["SNR"] = logged["snr"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == PacketType.CHANNEL_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read() + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + if self.decrypt_channels: + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + res["txt_hash"] = txt_hash + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + time_value = int.from_bytes(dbuf.read(4), byteorder="little") + result = {"time": time_value} + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) + + elif packet_type_value == PacketType.CONTACT_URI.value: + contact_uri = "meshcore://" + dbuf.read().hex() + result = {"uri": contact_uri} + await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) + + elif packet_type_value == PacketType.BATTERY.value: + # Full RESP_CODE_BATT_AND_STORAGE: 1 type + 2 level + 4 used_kb + 4 total_kb = 11 bytes. + # Minimum viable frame is 3 bytes (type + level). Shorter frames are + # malformed — dbuf.read(2) would return short bytes and + # int.from_bytes(b"", ...) silently yields 0 (same class as N07). + if len(data) < 3: + logger.debug( + "BATTERY frame too short for level field " + f"({len(data)} bytes < 3), skipping" + ) + return + battery_level = int.from_bytes(dbuf.read(2), byteorder="little") + result = {"level": battery_level} + # The previous `len(data) > 3` guard let 4-10 byte truncated frames + # through, producing silent zero values for used_kb/total_kb because + # io.BytesIO.read() returns short data without raising. + if len(data) >= 11: # has storage info as well + result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + fw_ver = dbuf.read(1)[0] + res["fw ver"] = fw_ver + if fw_ver >= 3: + res["max_contacts"] = dbuf.read(1)[0] * 2 + res["max_channels"] = dbuf.read(1)[0] + res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") + res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") + res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") + if fw_ver >= 9: # has repeater mode + rpt = dbuf.read(1) + if len(rpt) > 0: + res["repeat"] = (rpt[0] != 0) + if fw_ver >= 10: # has path_hash_mode + path_hash_mode = dbuf.read(1)[0] + res["path_hash_mode"] = path_hash_mode + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CUSTOM_VARS.value: + logger.debug(f"received custom vars response: {data.hex()}") + res = {} + rawdata = dbuf.read().decode("utf-8", "ignore") + if not rawdata == "": + pairs = rawdata.split(",") + for p in pairs: + psplit = p.split(":") + res[psplit[0]] = psplit[1] + logger.debug(f"got custom vars : {res}") + await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + + elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) + logger.debug(f"received stats response: {data.hex()}") + # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte + # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) + if len(data) < 2: + logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) + return + + stats_type = data[1] + + if stats_type == 0: # STATS_TYPE_CORE + # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total + # Format: = 30: + (recv_errors,) = struct.unpack('= 0: + res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") + else: + res["channel_name"] = name_bytes.decode("utf-8", "ignore") + + res["channel_secret"] = dbuf.read(16) + res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] + + await self.packet_parser.newChannel(res) + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = dbuf.read(4).hex() + + attributes = {"code": ack_data.get("code", "")} + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["payload"] = dbuf.read(4).hex() + logger.debug("Received raw data") + logger.debug(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + res = {} + attributes = {} + if len(data) > 1: + perms = dbuf.read(1)[0] + res["permissions"] = perms + res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set + + res["pubkey_prefix"] = dbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_SUCCESS, res, attributes) + ) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + res = {} + attributes = {} + + dbuf.read(1) + + if len(data) > 7: + res["pubkey_prefix"] = dbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_FAILED, res, attributes) + ) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + # parse_status with offset=8 reads up through data[56:60] + # (rx_airtime field), so the full payload is 60 bytes: + # 1 type + 1 reserved + 6 pubkey + 52 status fields. The + # BINARY_RESPONSE STATUS path below gates with `>= 52` on + # the offset-stripped buffer; this gate is the equivalent + # for the push path with the 8-byte header included. + if len(data) < 60: + logger.debug(f"STATUS_RESPONSE push frame too short ({len(data)} bytes < 60), skipping parse") + return + res = parse_status(data, offset=8) + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} + attributes = {} + + recv_time = int(time.time()) + log_data["recv_time"] = recv_time + attributes["recv_time"] = recv_time + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = dbuf.read(1)[0] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = dbuf.read(1)[0] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + payload = None + if len(data) > 3: + payload=dbuf.read() + log_data["payload"] = payload.hex() + log_data["payload_length"] = len(payload) + + # Parse payload and get some info from it + log_data = await self.packet_parser.parsePacketPayload(payload, log_data) + attributes['route_type'] = log_data['route_type'] + attributes['payload_type'] = log_data['payload_type'] + attributes['path_len'] = log_data['path_len'] + attributes['path'] = log_data['path'] + + # Dispatch as RF log data + await self.dispatcher.dispatch( + Event(EventType.RX_LOG_DATA, log_data, attributes) + ) + + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = dbuf.read(1)[0] + path_len = dbuf.read(1)[0] + flags = dbuf.read(1)[0] + tag = int.from_bytes(dbuf.read(4), byteorder="little") + auth_code = int.from_bytes(dbuf.read(4), byteorder="little") + + path_hash_len = 1 << (flags&3) + path_len = int(path_len / path_hash_len) + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": dbuf.read(path_hash_len).hex(), + } + path_nodes.append(node) + + for n in path_nodes: + node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + n["snr"] = node_snr / 4.0 + + # Add the final node (our device) with its SNR + final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 + path_nodes.append({"snr": final_snr}) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) + + elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: + logger.debug(f"Received telemetry data: {data.hex()}") + res = {} + + dbuf.read(1) + + res["pubkey_pre"] = dbuf.read(6).hex() + buf = dbuf.read() + + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + lpp = json.loads( + json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) + ) + + res["lpp"] = lpp + + attributes = { + "raw": buf.hex(), + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: + res = {} + freqs = [] + + cont = True + try: + while cont: + min = int.from_bytes(dbuf.read(4), "little", signed=False) + max = int.from_bytes(dbuf.read(4), "little", signed=False) + if min == 0 or max == 0: + cont = False + else: + freqs.append({"min" : min, "max": max}) + except Exception as e: + logger.warning(f"Error parsing ALLOWED_REPEAT_FREQ payload: {e}") + + res["freqs"] = freqs + + await self.dispatcher.dispatch( + Event(EventType.ALLOWED_REPEAT_FREQ, res) + ) + + elif packet_type_value == PacketType.BINARY_RESPONSE.value: + dbuf.read(1) + tag = dbuf.read(4).hex() + response_data = dbuf.read() + logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") + + # Always dispatch generic BINARY_RESPONSE + binary_res = {"tag": tag, "data": response_data.hex()} + await self.dispatcher.dispatch( + Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) + ) + + # Check for tracked request type and dispatch specific response + if tag in self.pending_binary_requests: + request_type = self.pending_binary_requests[tag]["request_type"] + is_anon = self.pending_binary_requests[tag]["is_anon"] + pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + context = self.pending_binary_requests[tag]["context"] + del self.pending_binary_requests[tag] + logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") + + if not is_anon: + + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: + res = {} + res = parse_status(response_data, pubkey_prefix=pubkey_prefix) + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) + ) + + elif request_type == BinaryReqType.TELEMETRY : + try: + lpp = lpp_parse(response_data) + telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) + ) + except Exception as e: + logger.error(f"Error parsing binary telemetry response: {e}") + + elif request_type == BinaryReqType.MMA: + try: + mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header + mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.MMA_RESPONSE, mma_res, mma_res) + ) + except Exception as e: + logger.error(f"Error parsing binary MMA response: {e}") + + elif request_type == BinaryReqType.ACL: + try: + acl_result = parse_acl(response_data) + acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + except Exception as e: + logger.error(f"Error parsing binary ACL response: {e}") + + elif request_type == BinaryReqType.NEIGHBOURS: + try: + pk_plen = context["pubkey_prefix_length"] + bbuf = io.BytesIO(response_data) + + res = { + "pubkey_prefix": pubkey_prefix, + "tag": tag + } + res.update(context) # add context in result + + res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) + results_count = int.from_bytes(bbuf.read(2), "little", signed=True) + res["results_count"] = results_count + + neighbours_list = [] + + for _ in range (results_count): + neighb = {} + neighb["pubkey"] = bbuf.read(pk_plen).hex() + neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) + neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 + neighbours_list.append(neighb) + + res["neighbours"] = neighbours_list + + await self.dispatcher.dispatch( + Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + + except Exception as e: + logger.error(f"Error parsing binary NEIGHBOURS response: {e}") + + else: + logger.debug(f"No tracked request found for binary response tag {tag}") + + elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: + logger.debug(f"Received path discovery response: {data.hex()}") + res = {} + dbuf.read(1) + res["pubkey_pre"] = dbuf.read(6).hex() + opl = dbuf.read(1)[0] + opl_hlen = ((opl & 0xc0) >> 6) + 1 + opl = opl & 0x3f + res["out_path_len"] = opl + res["out_path_hash_len"] = opl_hlen + res["out_path"] = dbuf.read(opl*opl_hlen).hex() + ipl = dbuf.read(1)[0] + ipl_hlen = ((ipl & 0xc0) >> 6) + 1 + ipl = ipl & 0x3f + res["in_path_len"] = ipl + res["in_path_hash_len"] = ipl_hlen + res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() + + attributes = {"pubkey_pre": res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.PATH_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.PRIVATE_KEY.value: + logger.debug(f"Received private key response: {data.hex()}") + if len(data) >= 65: # 1 byte response code + 64 bytes private key + private_key = dbuf.read(64) # Extract 64-byte private key + res = {"private_key": private_key} + await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) + else: + logger.error(f"Invalid private key response length: {len(data)}") + + elif packet_type_value == PacketType.SIGN_START.value: + logger.debug(f"Received sign start response: {data.hex()}") + # Payload: 1 reserved byte, 4-byte max length + dbuf.read(1) + max_len = int.from_bytes(dbuf.read(4), "little") + res = {"max_length": max_len} + await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) + + elif packet_type_value == PacketType.SIGNATURE.value: + logger.debug(f"Received signature: {data.hex()}") + signature = dbuf.read() + res = {"signature": signature} + await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) + + elif packet_type_value == PacketType.DISABLED.value: + logger.debug("Received disabled response") + res = {"reason": "private_key_export_disabled"} + await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) + + elif packet_type_value == PacketType.CONTROL_DATA.value: + logger.debug("Received control data packet") + res={} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["path_len"] = dbuf.read(1)[0] + payload = dbuf.read() + if len(payload) == 0: + logger.debug("CONTROL_DATA frame has empty payload, skipping") + return + payload_type = payload[0] + res["payload_type"] = payload_type + res["payload"] = payload + + attributes = {"payload_type": payload_type} + await self.dispatcher.dispatch( + Event(EventType.CONTROL_DATA, res, attributes) + ) + + # decode NODE_DISCOVER_RESP + if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: + pbuf = io.BytesIO(payload[1:]) + ndr = dict(res) + del ndr["payload_type"] + del ndr["payload"] + ndr["node_type"] = payload_type & 0x0F + ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 + ndr["tag"] = pbuf.read(4).hex() + + pubkey = pbuf.read() + if len(pubkey) < 32: + pubkey = pubkey[0:8] + else: + pubkey = pubkey[0:32] + + ndr["pubkey"] = pubkey.hex() + + attributes = { + "node_type" : ndr["node_type"], + "tag" : ndr["tag"], + "pubkey" : ndr["pubkey"], + } + + await self.dispatcher.dispatch( + Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + ) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") + except Exception as e: + logger.error( + "handle_rx parse error: %s: %s | raw=%s\n%s", + type(e).__name__, + e, + data.hex(), + traceback.format_exc(), + ) diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index 39bb8ac..758968e 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio +import logging from unittest.mock import AsyncMock from meshcore.events import EventType from meshcore.reader import MessageReader @@ -87,4 +88,193 @@ async def test_binary_response(): print(f"⚠️ Unknown request type {request_type}, no specific event expected") if __name__ == "__main__": - asyncio.run(test_binary_response()) \ No newline at end of file + asyncio.run(test_binary_response()) + + +# --------------------------------------------------------------------------- +# Reader/parser crash-safety verification tests +# --------------------------------------------------------------------------- + +class _CapturingDispatcher: + """Quiet dispatcher that records every dispatched event.""" + def __init__(self): + self.events = [] + + async def dispatch(self, event): + self.events.append(event) + + +@pytest.mark.asyncio +async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog): + """Malformed frame must not propagate, must be logged with traceback.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte + + # 2 reserved bytes, but no channel_idx byte. The handler will raise + # IndexError on the next dbuf.read(1)[0] when the buffer is empty. + # The umbrella try/except must catch it, log the parse error, and + # return cleanly. + malformed = bytearray.fromhex("11100000") + + with caplog.at_level(logging.ERROR, logger="meshcore"): + await reader.handle_rx(malformed) # must not raise + + error_records = [r for r in caplog.records if "handle_rx parse error" in r.message] + assert error_records, ( + f"Expected an error log containing 'handle_rx parse error'; " + f"got: {[r.message for r in caplog.records]}" + ) + # Traceback should be present in the log message + assert "Traceback" in error_records[0].message, ( + "Umbrella log message must include a traceback" + ) + # No CHANNEL_MSG_RECV event should have been dispatched + assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events) + + +@pytest.mark.asyncio +async def test_battery_short_frame_omits_storage_fields(): + """Short BATTERY frame must not silently yield zero used_kb/total_kb.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 3-byte BATTERY frame: type 0x0c + 2 level bytes (no storage tail). + # Pre-fix the `len(data) > 3` gate would have let any frame >= 4 bytes + # through, producing a BATTERY event with bogus zero used_kb/total_kb + # because io.BytesIO.read() returns short data without raising. + # Post-fix (`len(data) >= 11`) the storage fields are skipped entirely. + short_battery = bytearray.fromhex("0c8000") + + await reader.handle_rx(short_battery) + + battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY] + assert len(battery_events) == 1, ( + f"Expected exactly one BATTERY event, got {len(battery_events)}" + ) + payload = battery_events[0].payload + assert payload["level"] == 0x0080, f"Unexpected level: {payload['level']}" + assert "used_kb" not in payload, ( + "Short BATTERY frame must not include used_kb (would be a silent zero)" + ) + assert "total_kb" not in payload, ( + "Short BATTERY frame must not include total_kb (would be a silent zero)" + ) + + +@pytest.mark.asyncio +async def test_battery_too_short_for_level(caplog): + """BATTERY frame shorter than 3 bytes must be dropped entirely (Option B). + + A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause + dbuf.read(2) to return b"" and int.from_bytes(b"", ...) to silently yield 0. + The fix adds an early return with a debug log. + """ + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 1-byte BATTERY frame: only the type byte, no level payload. + too_short = bytearray.fromhex("0c") + + with caplog.at_level(logging.DEBUG, logger="meshcore"): + await reader.handle_rx(too_short) + + battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY] + assert len(battery_events) == 0, ( + "BATTERY frame shorter than 3 bytes must not dispatch an event" + ) + debug_records = [ + r for r in caplog.records if "BATTERY frame too short" in r.message + ] + assert debug_records, "Expected a debug log about the short BATTERY frame" + + +@pytest.mark.asyncio +async def test_status_response_short_frame_skipped(caplog): + """Short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 30-byte STATUS_RESPONSE push frame, well below the 60-byte minimum. + # First byte is the type (0x87 = PacketType.STATUS_RESPONSE), the rest + # is arbitrary filler. parse_status with offset=8 reads up through + # data[56:60], so anything < 60 bytes would yield short reads and + # silent zero values pre-fix. + short_status = bytearray([0x87] + [0xAA] * 29) + assert len(short_status) == 30 + + with caplog.at_level(logging.DEBUG, logger="meshcore"): + await reader.handle_rx(short_status) + + status_events = [e for e in dispatcher.events if e.type == EventType.STATUS_RESPONSE] + assert len(status_events) == 0, ( + "Short STATUS_RESPONSE push frame must not dispatch a parsed event" + ) + assert any( + "STATUS_RESPONSE push frame too short" in r.message for r in caplog.records + ), "Expected a debug log line for short STATUS_RESPONSE frames" + + +@pytest.mark.asyncio +async def test_parse_packet_payload_txt_type_decodes_high_bits(): + """txt_type must decode the high 6 bits of byte 4, not always be 0.""" + from Crypto.Cipher import AES + from Crypto.Hash import HMAC, SHA256 + from meshcore.meshcore_parser import MeshcorePacketParser + + parser = MeshcorePacketParser() + parser.decrypt_channels = True + + # Set up a synthetic channel with a known 16-byte AES key. Direct dict + # assignment matches how the parser stores channels (newChannel is async + # and serves the same purpose). + channel_secret = b"\x01" * 16 + channel_hash_byte = 0xAB + parser.channels[0] = { + "channel_idx": 0, + "channel_name": "test", + "channel_hash": "ab", + "channel_secret": channel_secret, + } + + # 16-byte plaintext (one AES block): + # bytes 0-3 = sender_timestamp (little-endian) + # byte 4 = (txt_type << 2) | attempt + # bytes 5-15 = message + null padding + # Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15. + # Pre-fix uncrypted[4:4] is empty so txt_type would be 0; + # post-fix uncrypted[4:5] yields 0x15 >> 2 = 5. + plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00" + assert len(plaintext) == 16 + + encrypted = AES.new(channel_secret, AES.MODE_ECB).encrypt(plaintext) + + # cipher_mac = first 2 bytes of HMAC-SHA256(channel_secret, encrypted) + h = HMAC.new(channel_secret, digestmod=SHA256) + h.update(encrypted) + cipher_mac = h.digest()[:2] + + # pkt_payload layout: 1-byte chan_hash + 2-byte cipher_mac + ciphertext + pkt_payload = bytes([channel_hash_byte]) + cipher_mac + encrypted + + # parsePacketPayload expects the full payload buffer: + # header byte (route_type=1 DIRECT, payload_type=5 channel, ver=0) + # path_byte (path_len=0, path_hash_size=1) → 0x00 + # pkt_payload + header = 0x15 # route_type=1, payload_type=5, payload_ver=0 + path_byte = 0x00 + payload = bytes([header, path_byte]) + pkt_payload + + log_data = await parser.parsePacketPayload(payload, log_data={}) + + assert log_data["payload_type"] == 0x05 + assert "txt_type" in log_data, ( + f"txt_type missing from log_data — channel decrypt path was not reached. " + f"log_data keys: {list(log_data.keys())}" + ) + assert log_data["txt_type"] == 5, ( + f"Expected txt_type=5, got {log_data['txt_type']}" + ) + assert log_data["attempt"] == 1, ( + f"Expected attempt=1, got {log_data['attempt']}" + ) \ No newline at end of file