From d9197faf3a0daff7bd06ebf297a9f0389526e964 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:06:53 -0700 Subject: [PATCH 01/13] =?UTF-8?q?G1:=20F06=20=E2=80=94=20wrap=20handle=5Fr?= =?UTF-8?q?x=20dispatch=20in=20catch-all=20try/except?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: handle_rx is invoked from a detached task in MessageReader, so any exception escaping its ~850-line if/elif dispatch is silently swallowed by asyncio as "Task exception was never retrieved." The only crash guard previously was a single try/except IndexError around the first byte read; everything past line 73 was unguarded. This commit adds an umbrella try: ... except Exception as e: around the entire dispatch body that logs the exception class, message, raw frame hex, and full traceback via logger.error. The umbrella neutralizes the crash surface of F10, F11, N07, N08, R01, NEW-B, and NEW-C, which the next commits will then fix individually now that they are observable. Refs: Forensics report finding F06 (umbrella crash protection) --- src/meshcore/reader.py | 1680 ++++++++++++++++++++-------------------- 1 file changed, 845 insertions(+), 835 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 802004a..088baeb 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -3,6 +3,7 @@ import json import struct import time import io +import traceback from typing import Any, Dict from .events import Event, EventType, EventDispatcher, ErrorMessages from .meshcore_parser import MeshcorePacketParser @@ -69,853 +70,862 @@ class MessageReader: except IndexError as e: logger.warning(f"Received empty packet: {e}") return - logger.debug(f"Received data: {data.hex()}") + try: + logger.debug(f"Received data: {data.hex()}") - # Handle command responses - if packet_type_value == PacketType.OK.value: - result: Dict[str, Any] = {} - if len(data) == 5: - result["value"] = int.from_bytes(data[1:5], byteorder="little") + # Handle command responses + if packet_type_value == PacketType.OK.value: + result: Dict[str, Any] = {} + if len(data) == 5: + result["value"] = int.from_bytes(data[1:5], byteorder="little") - # Dispatch event for the OK response - await self.dispatcher.dispatch(Event(EventType.OK, result)) + # Dispatch event for the OK response + await self.dispatcher.dispatch(Event(EventType.OK, result)) - elif packet_type_value == PacketType.ERROR.value: - if len(data) > 1: - result = { "error_code": data[1], } - if data[1] in ErrorMessages: - result["code_string"] = ErrorMessages[data[1]] - else: - result = {} - - # Dispatch event for the ERROR response - await self.dispatcher.dispatch(Event(EventType.ERROR, result)) - - elif packet_type_value == PacketType.CONTACT_START.value: - self.contact_nb = int.from_bytes(data[1:5], byteorder="little") - self.contacts = {} - - elif ( - packet_type_value == PacketType.CONTACT.value - or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value - ): - c = {} - c["public_key"] = dbuf.read(32).hex() - c["type"] = dbuf.read(1)[0] - c["flags"] = dbuf.read(1)[0] - plen = int.from_bytes(dbuf.read(1), signed=False, byteorder="little") - if plen == 255: # flood - c["out_path_hash_mode"] = -1 - c["out_path_len"] = -1 # 6 LSB - else: - c["out_path_hash_mode"] = plen >> 6 - c["out_path_len"] = plen & 0x3F # 6 LSB - c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() - c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") - c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") - c["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") - - if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: - await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) - else: - await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) - self.contacts[c["public_key"]] = c - - elif packet_type_value == PacketType.ADVERT_PATH.value : - r = {} - r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) - plen = int.from_bytes(dbuf.read(1), "little", signed=False) - if plen == 255: # flood, should not happen - r["path_hash_mode"] = -1 - r["path_len"] = -1 - else: - r["path_hash_mode"] = plen >> 6 # 2 upper bytes - r["path_len"] = plen & 0x3F - r["path"] = dbuf.read().replace(b"\0", b"").hex() - - await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) - - elif packet_type_value == PacketType.CONTACT_END.value: - lastmod = int.from_bytes(dbuf.read(4), byteorder="little") - attributes = { - "lastmod": lastmod, - } - await self.dispatcher.dispatch( - Event(EventType.CONTACTS, self.contacts, attributes) - ) - - elif packet_type_value == PacketType.SELF_INFO.value: - self_info = {} - self_info["adv_type"] = dbuf.read(1)[0] - self_info["tx_power"] = dbuf.read(1)[0] - self_info["max_tx_power"] = dbuf.read(1)[0] - self_info["public_key"] = dbuf.read(32).hex() - self_info["adv_lat"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["adv_lon"] = ( - int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 - ) - self_info["multi_acks"] = dbuf.read(1)[0] - self_info["adv_loc_policy"] = dbuf.read(1)[0] - telemetry_mode = dbuf.read(1)[0] - self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 - self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 - self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 - self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 - self_info["radio_freq"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_bw"] = ( - int.from_bytes(dbuf.read(4), byteorder="little") / 1000 - ) - self_info["radio_sf"] = dbuf.read(1)[0] - self_info["radio_cr"] = dbuf.read(1)[0] - self_info["name"] = dbuf.read().decode("utf-8", "ignore") - await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) - - elif packet_type_value == PacketType.MSG_SENT.value: - res = {} - res["type"] = dbuf.read(1)[0] - res["expected_ack"] = dbuf.read(4) - res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") - - attributes = { - "type": res["type"], - "expected_ack": res["expected_ack"].hex(), - } - - await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) - - elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: - res = {} - res["type"] = "PRIV" - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - evt_type = EventType.CONTACT_MSG_RECV - - await self.dispatcher.dispatch(Event(evt_type, res, attributes)) - - elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "PRIV" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["pubkey_prefix"] = dbuf.read(6).hex() - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - txt_type = dbuf.read(1)[0] - res["txt_type"] = txt_type - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") - if txt_type == 2: - res["signature"] = dbuf.read(4).hex() - res["text"] = dbuf.read().decode("utf-8", "ignore") - - attributes = { - "pubkey_prefix": res["pubkey_prefix"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CONTACT_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: - res = {} - res["type"] = "CHAN" - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read().strip(b"\0") - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - if self.decrypt_channels: - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["SNR"] = logged["snr"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) - res = {} - res["type"] = "CHAN" - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - dbuf.read(2) # reserved - res["channel_idx"] = dbuf.read(1)[0] - plen = dbuf.read(1)[0] - if plen == 255 : # direct message - res["path_hash_mode"] = -1 - res["path_len"] = plen - else: - res["path_hash_mode"] = plen >> 6 - res["path_len"] = plen & 0x3F - res["txt_type"] = dbuf.read(1)[0] - res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) - text = dbuf.read() - res["text"] = text.decode("utf-8", "ignore") - - # search for text in log_channels - if self.decrypt_channels: - txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) - res["txt_hash"] = txt_hash - logged = await self.packet_parser.findLogChannelMsg(txt_hash) - - if not logged is None: - res["path"] = logged["path"] - res["RSSI"] = logged["rssi"] - res["recv_time"] = logged["recv_time"] - res["attempt"] = logged["attempt"] - - attributes = { - "channel_idx": res["channel_idx"], - "txt_type": res["txt_type"], - } - - await self.dispatcher.dispatch( - Event(EventType.CHANNEL_MSG_RECV, res, attributes) - ) - - elif packet_type_value == PacketType.CURRENT_TIME.value: - time_value = int.from_bytes(dbuf.read(4), byteorder="little") - result = {"time": time_value} - await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) - - elif packet_type_value == PacketType.NO_MORE_MSGS.value: - result = {"messages_available": False} - await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) - - elif packet_type_value == PacketType.CONTACT_URI.value: - contact_uri = "meshcore://" + dbuf.read().hex() - result = {"uri": contact_uri} - await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) - - elif packet_type_value == PacketType.BATTERY.value: - battery_level = int.from_bytes(dbuf.read(2), byteorder="little") - result = {"level": battery_level} - if len(data) > 3: # has storage info as well - result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") - await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) - - elif packet_type_value == PacketType.DEVICE_INFO.value: - res = {} - fw_ver = dbuf.read(1)[0] - res["fw ver"] = fw_ver - if fw_ver >= 3: - res["max_contacts"] = dbuf.read(1)[0] * 2 - res["max_channels"] = dbuf.read(1)[0] - res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") - res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") - res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") - res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") - if fw_ver >= 9: # has repeater mode - rpt = dbuf.read(1) - if len(rpt) > 0: - res["repeat"] = (rpt[0] != 0) - if fw_ver >= 10: # has path_hash_mode - path_hash_mode = dbuf.read(1)[0] - res["path_hash_mode"] = path_hash_mode - await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) - - elif packet_type_value == PacketType.CUSTOM_VARS.value: - logger.debug(f"received custom vars response: {data.hex()}") - res = {} - rawdata = dbuf.read().decode("utf-8", "ignore") - if not rawdata == "": - pairs = rawdata.split(",") - for p in pairs: - psplit = p.split(":") - res[psplit[0]] = psplit[1] - logger.debug(f"got custom vars : {res}") - await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) - - elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) - logger.debug(f"received stats response: {data.hex()}") - # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte - # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) - if len(data) < 2: - logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") - await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) - return - - stats_type = data[1] - - if stats_type == 0: # STATS_TYPE_CORE - # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total - # Format: 1: + result = { "error_code": data[1], } + if data[1] in ErrorMessages: + result["code_string"] = ErrorMessages[data[1]] else: - try: - battery_mv, uptime_secs, errors, queue_len = struct.unpack('> 6 + c["out_path_len"] = plen & 0x3F # 6 LSB + c["out_path"] = dbuf.read(64).replace(b"\0", b"").hex() + c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") + c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") + c["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") + + if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: + await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) else: - try: - recv, sent, flood_tx, direct_tx, flood_rx, direct_rx = struct.unpack('= 30: - (recv_errors,) = struct.unpack('= 0: - res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") - else: - res["channel_name"] = name_bytes.decode("utf-8", "ignore") - - res["channel_secret"] = dbuf.read(16) - res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] - - await self.packet_parser.newChannel(res) - - await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) - - # Push notifications - elif packet_type_value == PacketType.ADVERTISEMENT.value: - logger.debug("Advertisement received") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) - - elif packet_type_value == PacketType.PATH_UPDATE.value: - logger.debug("Code path update") - res = {} - res["public_key"] = dbuf.read(32).hex() - await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) - - elif packet_type_value == PacketType.ACK.value: - logger.debug("Received ACK") - ack_data = {} - - if len(data) >= 5: - ack_data["code"] = dbuf.read(4).hex() - - attributes = {"code": ack_data.get("code", "")} - - await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) - - elif packet_type_value == PacketType.MESSAGES_WAITING.value: - logger.debug("Msgs are waiting") - await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) - - elif packet_type_value == PacketType.RAW_DATA.value: - res = {} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["payload"] = dbuf.read(4).hex() - logger.debug("Received raw data") - print(res) - await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) - - elif packet_type_value == PacketType.LOGIN_SUCCESS.value: - res = {} - attributes = {} - if len(data) > 1: - perms = dbuf.read(1)[0] - res["permissions"] = perms - res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set - - res["pubkey_prefix"] = dbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_SUCCESS, res, attributes) - ) - - elif packet_type_value == PacketType.LOGIN_FAILED.value: - res = {} - attributes = {} - - dbuf.read(1) - - if len(data) > 7: - res["pubkey_prefix"] = pbuf.read(6).hex() - - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} - - await self.dispatcher.dispatch( - Event(EventType.LOGIN_FAILED, res, attributes) - ) - - elif packet_type_value == PacketType.STATUS_RESPONSE.value: - res = parse_status(data, offset=8) - data_hex = data[8:].hex() - logger.debug(f"Status response: {data_hex}") - - attributes = { - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.LOG_DATA.value: - logger.debug(f"Received RF log data: {data.hex()}") - - # Parse as raw RX data - log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} - attributes = {} - - recv_time = int(time.time()) - log_data["recv_time"] = recv_time - attributes["recv_time"] = recv_time - - # First byte is SNR (signed byte, multiplied by 4) - if len(data) > 1: - snr_byte = dbuf.read(1)[0] - # Convert to signed value - snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 - log_data["snr"] = snr - - # Second byte is RSSI (signed byte) - if len(data) > 2: - rssi_byte = dbuf.read(1)[0] - # Convert to signed value - rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 - log_data["rssi"] = rssi - - # Remaining bytes are the raw data payload - payload = None - if len(data) > 3: - payload=dbuf.read() - log_data["payload"] = payload.hex() - log_data["payload_length"] = len(payload) - - # Parse payload and get some info from it - log_data = await self.packet_parser.parsePacketPayload(payload, log_data) - attributes['route_type'] = log_data['route_type'] - attributes['payload_type'] = log_data['payload_type'] - attributes['path_len'] = log_data['path_len'] - attributes['path'] = log_data['path'] - - # Dispatch as RF log data - await self.dispatcher.dispatch( - Event(EventType.RX_LOG_DATA, log_data, attributes) - ) - - elif packet_type_value == PacketType.TRACE_DATA.value: - logger.debug(f"Received trace data: {data.hex()}") - res = {} - - # According to the source, format is: - # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr - - reserved = dbuf.read(1)[0] - path_len = dbuf.read(1)[0] - flags = dbuf.read(1)[0] - tag = int.from_bytes(dbuf.read(4), byteorder="little") - auth_code = int.from_bytes(dbuf.read(4), byteorder="little") - - path_hash_len = 1 << (flags&3) - path_len = int(path_len / path_hash_len) - - # Initialize result - res["tag"] = tag - res["auth"] = auth_code - res["flags"] = flags - res["path_len"] = path_len - - # Process path as array of objects with hash and SNR - path_nodes = [] - - if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: - # Extract path with hash and SNR pairs - for i in range(path_len): - node = { - "hash": dbuf.read(path_hash_len).hex(), - } - path_nodes.append(node) - - for n in path_nodes: - node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - n["snr"] = node_snr / 4.0 - - # Add the final node (our device) with its SNR - final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 - path_nodes.append({"snr": final_snr}) - - res["path"] = path_nodes - - logger.debug(f"Parsed trace data: {res}") - - attributes = { - "tag": res["tag"], - "auth_code": res["auth"], - } - - await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) - - elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: - logger.debug(f"Received telemetry data: {data.hex()}") - res = {} - - dbuf.read(1) - - res["pubkey_pre"] = dbuf.read(6).hex() - buf = dbuf.read() - - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - lpp = json.loads( - json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) - ) - - res["lpp"] = lpp - - attributes = { - "raw": buf.hex(), - "pubkey_prefix": res["pubkey_pre"], - } - - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: - res = {} - freqs = [] - - cont = True - try: - while cont: - min = int.from_bytes(dbuf.read(4), "little", signed=False) - max = int.from_bytes(dbuf.read(4), "little", signed=False) - if min == 0 or max == 0: - cont = False - else: - freqs.append({"min" : min, "max": max}) - except e: - print(e) - - res["freqs"] = freqs - - await self.dispatcher.dispatch( - Event(EventType.ALLOWED_REPEAT_FREQ, res) - ) - - elif packet_type_value == PacketType.BINARY_RESPONSE.value: - dbuf.read(1) - tag = dbuf.read(4).hex() - response_data = dbuf.read() - logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") - - # Always dispatch generic BINARY_RESPONSE - binary_res = {"tag": tag, "data": response_data.hex()} - await self.dispatcher.dispatch( - Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) - ) - - # Check for tracked request type and dispatch specific response - if tag in self.pending_binary_requests: - request_type = self.pending_binary_requests[tag]["request_type"] - is_anon = self.pending_binary_requests[tag]["is_anon"] - pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] - context = self.pending_binary_requests[tag]["context"] - del self.pending_binary_requests[tag] - logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - - if not is_anon: - - if request_type == BinaryReqType.STATUS and len(response_data) >= 52: - res = {} - res = parse_status(response_data, pubkey_prefix=pubkey_prefix) - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) - ) - - elif request_type == BinaryReqType.TELEMETRY : - try: - lpp = lpp_parse(response_data) - telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) - ) - except Exception as e: - logger.error(f"Error parsing binary telemetry response: {e}") - - elif request_type == BinaryReqType.MMA: - try: - mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header - mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.MMA_RESPONSE, mma_res, mma_res) - ) - except Exception as e: - logger.error(f"Error parsing binary MMA response: {e}") - - elif request_type == BinaryReqType.ACL: - try: - acl_result = parse_acl(response_data) - acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} - await self.dispatcher.dispatch( - Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - except Exception as e: - logger.error(f"Error parsing binary ACL response: {e}") - - elif request_type == BinaryReqType.NEIGHBOURS: - try: - pk_plen = context["pubkey_prefix_length"] - bbuf = io.BytesIO(response_data) - - res = { - "pubkey_prefix": pubkey_prefix, - "tag": tag - } - res.update(context) # add context in result - - res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) - results_count = int.from_bytes(bbuf.read(2), "little", signed=True) - res["results_count"] = results_count - - neighbours_list = [] - - for _ in range (results_count): - neighb = {} - neighb["pubkey"] = bbuf.read(pk_plen).hex() - neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) - neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 - neighbours_list.append(neighb) - - res["neighbours"] = neighbours_list - - await self.dispatcher.dispatch( - Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) - ) - - except Exception as e: - logger.error(f"Error parsing binary NEIGHBOURS response: {e}") - - else: - logger.debug(f"No tracked request found for binary response tag {tag}") - - elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: - logger.debug(f"Received path discovery response: {data.hex()}") - res = {} - dbuf.read(1) - res["pubkey_pre"] = dbuf.read(6).hex() - opl = dbuf.read(1)[0] - opl_hlen = ((opl & 0xc0) >> 6) + 1 - opl = opl & 0x3f - res["out_path_len"] = opl - res["out_path_hash_len"] = opl_hlen - res["out_path"] = dbuf.read(opl*opl_hlen).hex() - ipl = dbuf.read(1)[0] - ipl_hlen = ((ipl & 0xc0) >> 6) + 1 - ipl = ipl & 0x3f - res["in_path_len"] = ipl - res["in_path_hash_len"] = ipl_hlen - res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() - - attributes = {"pubkey_pre": res["pubkey_pre"]} - - await self.dispatcher.dispatch( - Event(EventType.PATH_RESPONSE, res, attributes) - ) - - elif packet_type_value == PacketType.PRIVATE_KEY.value: - logger.debug(f"Received private key response: {data.hex()}") - if len(data) >= 65: # 1 byte response code + 64 bytes private key - private_key = dbuf.read(64) # Extract 64-byte private key - res = {"private_key": private_key} - await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) - else: - logger.error(f"Invalid private key response length: {len(data)}") - - elif packet_type_value == PacketType.SIGN_START.value: - logger.debug(f"Received sign start response: {data.hex()}") - # Payload: 1 reserved byte, 4-byte max length - dbuf.read(1) - max_len = int.from_bytes(dbuf.read(4), "little") - res = {"max_length": max_len} - await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) - - elif packet_type_value == PacketType.SIGNATURE.value: - logger.debug(f"Received signature: {data.hex()}") - signature = dbuf.read() - res = {"signature": signature} - await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) - - elif packet_type_value == PacketType.DISABLED.value: - logger.debug("Received disabled response") - res = {"reason": "private_key_export_disabled"} - await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) - - elif packet_type_value == PacketType.CONTROL_DATA.value: - logger.debug("Received control data packet") - res={} - res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 - res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) - res["path_len"] = dbuf.read(1)[0] - payload = dbuf.read() - payload_type = payload[0] - res["payload_type"] = payload_type - res["payload"] = payload - - attributes = {"payload_type": payload_type} - await self.dispatcher.dispatch( - Event(EventType.CONTROL_DATA, res, attributes) - ) - - # decode NODE_DISCOVER_RESP - if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: - pbuf = io.BytesIO(payload[1:]) - ndr = dict(res) - del ndr["payload_type"] - del ndr["payload"] - ndr["node_type"] = payload_type & 0x0F - ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 - ndr["tag"] = pbuf.read(4).hex() - - pubkey = pbuf.read() - if len(pubkey) < 32: - pubkey = pubkey[0:8] + await self.dispatcher.dispatch(Event(EventType.NEXT_CONTACT, c)) + self.contacts[c["public_key"]] = c + + elif packet_type_value == PacketType.ADVERT_PATH.value : + r = {} + r["timestamp"] = int.from_bytes(dbuf.read(4), "little", signed=False) + plen = int.from_bytes(dbuf.read(1), "little", signed=False) + if plen == 255: # flood, should not happen + r["path_hash_mode"] = -1 + r["path_len"] = -1 else: - pubkey = pubkey[0:32] + r["path_hash_mode"] = plen >> 6 # 2 upper bytes + r["path_len"] = plen & 0x3F + r["path"] = dbuf.read().replace(b"\0", b"").hex() - ndr["pubkey"] = pubkey.hex() + await self.dispatcher.dispatch(Event(EventType.ADVERT_PATH, r)) + + elif packet_type_value == PacketType.CONTACT_END.value: + lastmod = int.from_bytes(dbuf.read(4), byteorder="little") + attributes = { + "lastmod": lastmod, + } + await self.dispatcher.dispatch( + Event(EventType.CONTACTS, self.contacts, attributes) + ) + + elif packet_type_value == PacketType.SELF_INFO.value: + self_info = {} + self_info["adv_type"] = dbuf.read(1)[0] + self_info["tx_power"] = dbuf.read(1)[0] + self_info["max_tx_power"] = dbuf.read(1)[0] + self_info["public_key"] = dbuf.read(32).hex() + self_info["adv_lat"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["adv_lon"] = ( + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 + ) + self_info["multi_acks"] = dbuf.read(1)[0] + self_info["adv_loc_policy"] = dbuf.read(1)[0] + telemetry_mode = dbuf.read(1)[0] + self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 + self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 + self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 + self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 + self_info["radio_freq"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_bw"] = ( + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 + ) + self_info["radio_sf"] = dbuf.read(1)[0] + self_info["radio_cr"] = dbuf.read(1)[0] + self_info["name"] = dbuf.read().decode("utf-8", "ignore") + await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) + + elif packet_type_value == PacketType.MSG_SENT.value: + res = {} + res["type"] = dbuf.read(1)[0] + res["expected_ack"] = dbuf.read(4) + res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { - "node_type" : ndr["node_type"], - "tag" : ndr["tag"], - "pubkey" : ndr["pubkey"], + "type": res["type"], + "expected_ack": res["expected_ack"].hex(), + } + + await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res, attributes)) + + elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: + res = {} + res["type"] = "PRIV" + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], + } + + evt_type = EventType.CONTACT_MSG_RECV + + await self.dispatcher.dispatch(Event(evt_type, res, attributes)) + + elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "PRIV" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["pubkey_prefix"] = dbuf.read(6).hex() + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") + + attributes = { + "pubkey_prefix": res["pubkey_prefix"], + "txt_type": res["txt_type"], } await self.dispatcher.dispatch( - Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + Event(EventType.CONTACT_MSG_RECV, res, attributes) ) - else: - logger.debug(f"Unhandled data received {data}") - logger.debug(f"Unhandled packet type: {packet_type_value}") + elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: + res = {} + res["type"] = "CHAN" + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read().strip(b"\0") + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + if self.decrypt_channels: + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["SNR"] = logged["snr"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + res = {} + res["type"] = "CHAN" + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + dbuf.read(2) # reserved + res["channel_idx"] = dbuf.read(1)[0] + plen = dbuf.read(1)[0] + if plen == 255 : # direct message + res["path_hash_mode"] = -1 + res["path_len"] = plen + else: + res["path_hash_mode"] = plen >> 6 + res["path_len"] = plen & 0x3F + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little", signed=False) + text = dbuf.read() + res["text"] = text.decode("utf-8", "ignore") + + # search for text in log_channels + if self.decrypt_channels: + txt_hash = int.from_bytes(SHA256.new(res["sender_timestamp"].to_bytes(4, "little", signed=False)+text).digest()[0:4], "little", signed=False) + res["txt_hash"] = txt_hash + logged = await self.packet_parser.findLogChannelMsg(txt_hash) + + if not logged is None: + res["path"] = logged["path"] + res["RSSI"] = logged["rssi"] + res["recv_time"] = logged["recv_time"] + res["attempt"] = logged["attempt"] + + attributes = { + "channel_idx": res["channel_idx"], + "txt_type": res["txt_type"], + } + + await self.dispatcher.dispatch( + Event(EventType.CHANNEL_MSG_RECV, res, attributes) + ) + + elif packet_type_value == PacketType.CURRENT_TIME.value: + time_value = int.from_bytes(dbuf.read(4), byteorder="little") + result = {"time": time_value} + await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) + + elif packet_type_value == PacketType.NO_MORE_MSGS.value: + result = {"messages_available": False} + await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) + + elif packet_type_value == PacketType.CONTACT_URI.value: + contact_uri = "meshcore://" + dbuf.read().hex() + result = {"uri": contact_uri} + await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) + + elif packet_type_value == PacketType.BATTERY.value: + battery_level = int.from_bytes(dbuf.read(2), byteorder="little") + result = {"level": battery_level} + if len(data) > 3: # has storage info as well + result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) + + elif packet_type_value == PacketType.DEVICE_INFO.value: + res = {} + fw_ver = dbuf.read(1)[0] + res["fw ver"] = fw_ver + if fw_ver >= 3: + res["max_contacts"] = dbuf.read(1)[0] * 2 + res["max_channels"] = dbuf.read(1)[0] + res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") + res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") + res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") + if fw_ver >= 9: # has repeater mode + rpt = dbuf.read(1) + if len(rpt) > 0: + res["repeat"] = (rpt[0] != 0) + if fw_ver >= 10: # has path_hash_mode + path_hash_mode = dbuf.read(1)[0] + res["path_hash_mode"] = path_hash_mode + await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) + + elif packet_type_value == PacketType.CUSTOM_VARS.value: + logger.debug(f"received custom vars response: {data.hex()}") + res = {} + rawdata = dbuf.read().decode("utf-8", "ignore") + if not rawdata == "": + pairs = rawdata.split(",") + for p in pairs: + psplit = p.split(":") + res[psplit[0]] = psplit[1] + logger.debug(f"got custom vars : {res}") + await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + + elif packet_type_value == PacketType.STATS.value: # RESP_CODE_STATS (24) + logger.debug(f"received stats response: {data.hex()}") + # RESP_CODE_STATS: All stats responses use code 24 with sub-type byte + # Byte 0: response_code (24), Byte 1: stats_type (0=core, 1=radio, 2=packets) + if len(data) < 2: + logger.error(f"Stats response too short: {len(data)} bytes, need at least 2 for header") + await self.dispatcher.dispatch(Event(EventType.ERROR, {"reason": "invalid_frame_length"})) + return + + stats_type = data[1] + + if stats_type == 0: # STATS_TYPE_CORE + # RESP_CODE_STATS + STATS_TYPE_CORE: 11 bytes total + # Format: = 30: + (recv_errors,) = struct.unpack('= 0: + res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") + else: + res["channel_name"] = name_bytes.decode("utf-8", "ignore") + + res["channel_secret"] = dbuf.read(16) + res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2] + + await self.packet_parser.newChannel(res) + + await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) + + # Push notifications + elif packet_type_value == PacketType.ADVERTISEMENT.value: + logger.debug("Advertisement received") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) + + elif packet_type_value == PacketType.PATH_UPDATE.value: + logger.debug("Code path update") + res = {} + res["public_key"] = dbuf.read(32).hex() + await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) + + elif packet_type_value == PacketType.ACK.value: + logger.debug("Received ACK") + ack_data = {} + + if len(data) >= 5: + ack_data["code"] = dbuf.read(4).hex() + + attributes = {"code": ack_data.get("code", "")} + + await self.dispatcher.dispatch(Event(EventType.ACK, ack_data, attributes)) + + elif packet_type_value == PacketType.MESSAGES_WAITING.value: + logger.debug("Msgs are waiting") + await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, {})) + + elif packet_type_value == PacketType.RAW_DATA.value: + res = {} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["payload"] = dbuf.read(4).hex() + logger.debug("Received raw data") + print(res) + await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) + + elif packet_type_value == PacketType.LOGIN_SUCCESS.value: + res = {} + attributes = {} + if len(data) > 1: + perms = dbuf.read(1)[0] + res["permissions"] = perms + res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set + + res["pubkey_prefix"] = dbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_SUCCESS, res, attributes) + ) + + elif packet_type_value == PacketType.LOGIN_FAILED.value: + res = {} + attributes = {} + + dbuf.read(1) + + if len(data) > 7: + res["pubkey_prefix"] = pbuf.read(6).hex() + + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + + await self.dispatcher.dispatch( + Event(EventType.LOGIN_FAILED, res, attributes) + ) + + elif packet_type_value == PacketType.STATUS_RESPONSE.value: + res = parse_status(data, offset=8) + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.LOG_DATA.value: + logger.debug(f"Received RF log data: {data.hex()}") + + # Parse as raw RX data + log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()} + attributes = {} + + recv_time = int(time.time()) + log_data["recv_time"] = recv_time + attributes["recv_time"] = recv_time + + # First byte is SNR (signed byte, multiplied by 4) + if len(data) > 1: + snr_byte = dbuf.read(1)[0] + # Convert to signed value + snr = (snr_byte if snr_byte < 128 else snr_byte - 256) / 4.0 + log_data["snr"] = snr + + # Second byte is RSSI (signed byte) + if len(data) > 2: + rssi_byte = dbuf.read(1)[0] + # Convert to signed value + rssi = rssi_byte if rssi_byte < 128 else rssi_byte - 256 + log_data["rssi"] = rssi + + # Remaining bytes are the raw data payload + payload = None + if len(data) > 3: + payload=dbuf.read() + log_data["payload"] = payload.hex() + log_data["payload_length"] = len(payload) + + # Parse payload and get some info from it + log_data = await self.packet_parser.parsePacketPayload(payload, log_data) + attributes['route_type'] = log_data['route_type'] + attributes['payload_type'] = log_data['payload_type'] + attributes['path_len'] = log_data['path_len'] + attributes['path'] = log_data['path'] + + # Dispatch as RF log data + await self.dispatcher.dispatch( + Event(EventType.RX_LOG_DATA, log_data, attributes) + ) + + elif packet_type_value == PacketType.TRACE_DATA.value: + logger.debug(f"Received trace data: {data.hex()}") + res = {} + + # According to the source, format is: + # 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr + + reserved = dbuf.read(1)[0] + path_len = dbuf.read(1)[0] + flags = dbuf.read(1)[0] + tag = int.from_bytes(dbuf.read(4), byteorder="little") + auth_code = int.from_bytes(dbuf.read(4), byteorder="little") + + path_hash_len = 1 << (flags&3) + path_len = int(path_len / path_hash_len) + + # Initialize result + res["tag"] = tag + res["auth"] = auth_code + res["flags"] = flags + res["path_len"] = path_len + + # Process path as array of objects with hash and SNR + path_nodes = [] + + if path_len > 0 and len(data) >= 12 + path_len + (path_len * path_hash_len) + 1: + # Extract path with hash and SNR pairs + for i in range(path_len): + node = { + "hash": dbuf.read(path_hash_len).hex(), + } + path_nodes.append(node) + + for n in path_nodes: + node_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + n["snr"] = node_snr / 4.0 + + # Add the final node (our device) with its SNR + final_snr = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4.0 + path_nodes.append({"snr": final_snr}) + + res["path"] = path_nodes + + logger.debug(f"Parsed trace data: {res}") + + attributes = { + "tag": res["tag"], + "auth_code": res["auth"], + } + + await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res, attributes)) + + elif packet_type_value == PacketType.TELEMETRY_RESPONSE.value: + logger.debug(f"Received telemetry data: {data.hex()}") + res = {} + + dbuf.read(1) + + res["pubkey_pre"] = dbuf.read(6).hex() + buf = dbuf.read() + + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + lpp = json.loads( + json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder) + ) + + res["lpp"] = lpp + + attributes = { + "raw": buf.hex(), + "pubkey_prefix": res["pubkey_pre"], + } + + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.ALLOWED_REPEAT_FREQ.value: + res = {} + freqs = [] + + cont = True + try: + while cont: + min = int.from_bytes(dbuf.read(4), "little", signed=False) + max = int.from_bytes(dbuf.read(4), "little", signed=False) + if min == 0 or max == 0: + cont = False + else: + freqs.append({"min" : min, "max": max}) + except e: + print(e) + + res["freqs"] = freqs + + await self.dispatcher.dispatch( + Event(EventType.ALLOWED_REPEAT_FREQ, res) + ) + + elif packet_type_value == PacketType.BINARY_RESPONSE.value: + dbuf.read(1) + tag = dbuf.read(4).hex() + response_data = dbuf.read() + logger.debug(f"Received binary data: {data.hex()}, tag {tag}, data {response_data.hex()}") + + # Always dispatch generic BINARY_RESPONSE + binary_res = {"tag": tag, "data": response_data.hex()} + await self.dispatcher.dispatch( + Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) + ) + + # Check for tracked request type and dispatch specific response + if tag in self.pending_binary_requests: + request_type = self.pending_binary_requests[tag]["request_type"] + is_anon = self.pending_binary_requests[tag]["is_anon"] + pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + context = self.pending_binary_requests[tag]["context"] + del self.pending_binary_requests[tag] + logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") + + if not is_anon: + + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: + res = {} + res = parse_status(response_data, pubkey_prefix=pubkey_prefix) + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) + ) + + elif request_type == BinaryReqType.TELEMETRY : + try: + lpp = lpp_parse(response_data) + telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) + ) + except Exception as e: + logger.error(f"Error parsing binary telemetry response: {e}") + + elif request_type == BinaryReqType.MMA: + try: + mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header + mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.MMA_RESPONSE, mma_res, mma_res) + ) + except Exception as e: + logger.error(f"Error parsing binary MMA response: {e}") + + elif request_type == BinaryReqType.ACL: + try: + acl_result = parse_acl(response_data) + acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} + await self.dispatcher.dispatch( + Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + except Exception as e: + logger.error(f"Error parsing binary ACL response: {e}") + + elif request_type == BinaryReqType.NEIGHBOURS: + try: + pk_plen = context["pubkey_prefix_length"] + bbuf = io.BytesIO(response_data) + + res = { + "pubkey_prefix": pubkey_prefix, + "tag": tag + } + res.update(context) # add context in result + + res["neighbours_count"] = int.from_bytes(bbuf.read(2), "little", signed=True) + results_count = int.from_bytes(bbuf.read(2), "little", signed=True) + res["results_count"] = results_count + + neighbours_list = [] + + for _ in range (results_count): + neighb = {} + neighb["pubkey"] = bbuf.read(pk_plen).hex() + neighb["secs_ago"] = int.from_bytes(bbuf.read(4), "little", signed=True) + neighb["snr"] = int.from_bytes(bbuf.read(1), "little", signed=True) / 4 + neighbours_list.append(neighb) + + res["neighbours"] = neighbours_list + + await self.dispatcher.dispatch( + Event(EventType.NEIGHBOURS_RESPONSE, res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) + ) + + except Exception as e: + logger.error(f"Error parsing binary NEIGHBOURS response: {e}") + + else: + logger.debug(f"No tracked request found for binary response tag {tag}") + + elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: + logger.debug(f"Received path discovery response: {data.hex()}") + res = {} + dbuf.read(1) + res["pubkey_pre"] = dbuf.read(6).hex() + opl = dbuf.read(1)[0] + opl_hlen = ((opl & 0xc0) >> 6) + 1 + opl = opl & 0x3f + res["out_path_len"] = opl + res["out_path_hash_len"] = opl_hlen + res["out_path"] = dbuf.read(opl*opl_hlen).hex() + ipl = dbuf.read(1)[0] + ipl_hlen = ((ipl & 0xc0) >> 6) + 1 + ipl = ipl & 0x3f + res["in_path_len"] = ipl + res["in_path_hash_len"] = ipl_hlen + res["in_path"] = dbuf.read(ipl*ipl_hlen).hex() + + attributes = {"pubkey_pre": res["pubkey_pre"]} + + await self.dispatcher.dispatch( + Event(EventType.PATH_RESPONSE, res, attributes) + ) + + elif packet_type_value == PacketType.PRIVATE_KEY.value: + logger.debug(f"Received private key response: {data.hex()}") + if len(data) >= 65: # 1 byte response code + 64 bytes private key + private_key = dbuf.read(64) # Extract 64-byte private key + res = {"private_key": private_key} + await self.dispatcher.dispatch(Event(EventType.PRIVATE_KEY, res)) + else: + logger.error(f"Invalid private key response length: {len(data)}") + + elif packet_type_value == PacketType.SIGN_START.value: + logger.debug(f"Received sign start response: {data.hex()}") + # Payload: 1 reserved byte, 4-byte max length + dbuf.read(1) + max_len = int.from_bytes(dbuf.read(4), "little") + res = {"max_length": max_len} + await self.dispatcher.dispatch(Event(EventType.SIGN_START, res)) + + elif packet_type_value == PacketType.SIGNATURE.value: + logger.debug(f"Received signature: {data.hex()}") + signature = dbuf.read() + res = {"signature": signature} + await self.dispatcher.dispatch(Event(EventType.SIGNATURE, res)) + + elif packet_type_value == PacketType.DISABLED.value: + logger.debug("Received disabled response") + res = {"reason": "private_key_export_disabled"} + await self.dispatcher.dispatch(Event(EventType.DISABLED, res)) + + elif packet_type_value == PacketType.CONTROL_DATA.value: + logger.debug("Received control data packet") + res={} + res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 + res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) + res["path_len"] = dbuf.read(1)[0] + payload = dbuf.read() + payload_type = payload[0] + res["payload_type"] = payload_type + res["payload"] = payload + + attributes = {"payload_type": payload_type} + await self.dispatcher.dispatch( + Event(EventType.CONTROL_DATA, res, attributes) + ) + + # decode NODE_DISCOVER_RESP + if payload_type & 0xF0 == ControlType.NODE_DISCOVER_RESP.value: + pbuf = io.BytesIO(payload[1:]) + ndr = dict(res) + del ndr["payload_type"] + del ndr["payload"] + ndr["node_type"] = payload_type & 0x0F + ndr["SNR_in"] = int.from_bytes(pbuf.read(1), byteorder="little", signed=True)/4 + ndr["tag"] = pbuf.read(4).hex() + + pubkey = pbuf.read() + if len(pubkey) < 32: + pubkey = pubkey[0:8] + else: + pubkey = pubkey[0:32] + + ndr["pubkey"] = pubkey.hex() + + attributes = { + "node_type" : ndr["node_type"], + "tag" : ndr["tag"], + "pubkey" : ndr["pubkey"], + } + + await self.dispatcher.dispatch( + Event(EventType.DISCOVER_RESPONSE, ndr, attributes) + ) + + else: + logger.debug(f"Unhandled data received {data}") + logger.debug(f"Unhandled packet type: {packet_type_value}") + except Exception as e: + logger.error( + "handle_rx parse error: %s: %s | raw=%s\n%s", + type(e).__name__, + e, + data.hex(), + traceback.format_exc(), + ) From 2025cb5326912ef228181e9dc9fef101b43ef992 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:15:29 -0700 Subject: [PATCH 02/13] =?UTF-8?q?G1:=20F10=20=E2=80=94=20fix=20pbuf=20Name?= =?UTF-8?q?Error=20in=20PUSH=5FCODE=5FLOGIN=5FFAIL=20handler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: The LOGIN_FAILED handler in handle_rx referenced an undefined identifier `pbuf` instead of the local BytesIO `dbuf`. Firmware emits PUSH_CODE_LOGIN_FAIL as a fixed 8-byte frame, which trivially satisfies the `len(data) > 7` guard, so every remote auth failure raised NameError. The sibling LOGIN_SUCCESS handler a few lines above already uses `dbuf.read(6).hex()` correctly; this commit aligns the LOGIN_FAILED branch with the same pattern. Refs: Forensics report finding F10 (S1) --- src/meshcore/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 088baeb..4e47e77 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -555,7 +555,7 @@ class MessageReader: dbuf.read(1) if len(data) > 7: - res["pubkey_prefix"] = pbuf.read(6).hex() + res["pubkey_prefix"] = dbuf.read(6).hex() attributes = {"pubkey_prefix": res.get("pubkey_prefix")} From a7e257c78d27d3eba873d028fe91668deae7fb63 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:17:17 -0700 Subject: [PATCH 03/13] =?UTF-8?q?G1:=20F11=20=E2=80=94=20replace=20broken?= =?UTF-8?q?=20`except=20e:`=20in=20ALLOWED=5FREPEAT=5FFREQ=20handler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: The ALLOWED_REPEAT_FREQ branch in handle_rx had `except e:` — syntactically valid Python only if `e` happens to be bound to an exception class, which it isn't. The first time the inner read loop actually raised, the except clause itself would raise NameError ("name 'e' is not defined") and propagate out of the handler. The proposal correctly notes this is unreachable in practice today because `int.from_bytes(b"", ...)` returns 0 so the loop terminates cleanly, but it is a latent footgun. Replace with the standard `except Exception as e:` form and swap the `print(e)` for a proper `logger.warning(...)` call to match the rest of the file (which uses the module logger, not stdout). Refs: Forensics report finding F11 (S3) --- src/meshcore/reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 4e47e77..581853a 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -717,8 +717,8 @@ class MessageReader: cont = False else: freqs.append({"min" : min, "max": max}) - except e: - print(e) + except Exception as e: + logger.warning(f"Error parsing ALLOWED_REPEAT_FREQ payload: {e}") res["freqs"] = freqs From 3273c3489ce3538c171ccd21a31bfe5f06f55912 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:19:52 -0700 Subject: [PATCH 04/13] =?UTF-8?q?G1:=20N07=20=E2=80=94=20tighten=20BATTERY?= =?UTF-8?q?=20storage-field=20length=20guard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: The BATTERY handler previously gated the used_kb / total_kb reads on `len(data) > 3`, which is wrong. The full RESP_CODE_BATT_AND_STORAGE frame is 11 bytes (1 type + 2 level + 4 used_kb + 4 total_kb), so a 4-10 byte truncated frame would pass the guard, and io.BytesIO.read(4) silently returns short bytes instead of raising. int.from_bytes(b"", ...) returns 0, so HA sensor telemetry silently reports zero storage on a truncated frame. Tighten the guard to `len(data) >= 11` so the storage fields are only parsed when the full frame is present. Inline comment added to document the expected frame layout. Note: the unconditional 2-byte `level` read at the top of the handler has the same class of issue (no guard, silent zero on a 1-byte frame). That is out of scope for finding N07 and has been logged in issues_log.md as a separate item. Refs: Forensics report finding N07 (S3) --- src/meshcore/reader.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 581853a..a9b2f4c 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -343,7 +343,12 @@ class MessageReader: elif packet_type_value == PacketType.BATTERY.value: battery_level = int.from_bytes(dbuf.read(2), byteorder="little") result = {"level": battery_level} - if len(data) > 3: # has storage info as well + # Full RESP_CODE_BATT_AND_STORAGE frame is 11 bytes: + # 1 type + 2 level + 4 used_kb + 4 total_kb. The previous + # `len(data) > 3` guard let 4-10 byte truncated frames through, + # producing silent zero values for used_kb/total_kb because + # io.BytesIO.read() returns short data without raising. + if len(data) >= 11: # has storage info as well result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) From a571eff4ce7da5f474c5e44b38cd50ed06f5ac4a Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:21:27 -0700 Subject: [PATCH 05/13] =?UTF-8?q?G1:=20NEW-C=20=E2=80=94=20add=20length=20?= =?UTF-8?q?guard=20to=20STATUS=5FRESPONSE=20push=20handler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: parse_status with offset=8 reads up through data[56:60] (the rx_airtime field), so a full STATUS_RESPONSE push frame is 60 bytes: 1 type + 1 reserved + 6 pubkey + 52 status fields. The push handler in handle_rx previously called parse_status with no length check at all, so a short frame would slice through empty data and silently produce zeros for every missing field. HA sensor telemetry would silently report all-zero status — same class as N07. The BINARY_RESPONSE STATUS path at the bottom of handle_rx already gates parse_status with `len(response_data) >= 52` on its offset-stripped buffer; this commit adds the equivalent gate for the push path: `if len(data) < 60: log + return`. The `return` short-circuits cleanly out of the umbrella try block without dispatching a STATUS_RESPONSE event for the bogus parse. Refs: Forensics report finding NEW-C (S3) --- src/meshcore/reader.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index a9b2f4c..6aa5131 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -569,6 +569,15 @@ class MessageReader: ) elif packet_type_value == PacketType.STATUS_RESPONSE.value: + # parse_status with offset=8 reads up through data[56:60] + # (rx_airtime field), so the full payload is 60 bytes: + # 1 type + 1 reserved + 6 pubkey + 52 status fields. The + # BINARY_RESPONSE STATUS path below gates with `>= 52` on + # the offset-stripped buffer; this gate is the equivalent + # for the push path with the 8-byte header included. + if len(data) < 60: + logger.debug(f"STATUS_RESPONSE push frame too short ({len(data)} bytes < 60), skipping parse") + return res = parse_status(data, offset=8) data_hex = data[8:].hex() logger.debug(f"Status response: {data_hex}") From c984a78e70ac17a7f851977980d41b3ac2013dab Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:26:37 -0700 Subject: [PATCH 06/13] =?UTF-8?q?G1:=20R01=20=E2=80=94=20guard=20parsePack?= =?UTF-8?q?etPayload=20against=20short=20payloads?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a 2-byte hard-floor length check at the top of MeshcorePacketParser.parsePacketPayload. The minimum viable payload is 1 header byte + 1 path_byte for a direct route; anything shorter would crash on `path_byte = pbuf.read(1)[0]` (IndexError on the empty buffer). The reader.py LOG_DATA branch only requires `len(data) > 3`, so a 4-byte LOG_DATA frame produces a 1-byte payload here — that path is reachable. The caller in reader.py dereferences log_data['route_type'], ['payload_type'], ['path_len'], and ['path'] immediately after the parse, so an empty log_data would KeyError on the dispatch (caught by the F06 umbrella, but the dispatch would still be skipped). Populate sentinel values matching the existing route_typename = "UNK" pattern in the function before the early return so the caller's downstream lookups don't KeyError, then return early with a debug log. Finding: R01 (S2) File: src/meshcore/meshcore_parser.py --- src/meshcore/meshcore_parser.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/meshcore/meshcore_parser.py b/src/meshcore/meshcore_parser.py index 2139663..715dfd3 100644 --- a/src/meshcore/meshcore_parser.py +++ b/src/meshcore/meshcore_parser.py @@ -42,8 +42,30 @@ class MeshcorePacketParser: Returns : completed log_data """ + # Minimum viable payload is 2 bytes (1 header + 1 path_byte) for a + # direct route. Anything shorter is provably broken — for example, + # the LOG_DATA branch in reader.py only requires `len(data) > 3`, + # which means a 4-byte LOG_DATA frame produces a 1-byte payload + # here, and `path_byte = pbuf.read(1)[0]` further down would raise + # IndexError on the empty buffer. Populate sentinel values so the + # caller's downstream `log_data['route_type']` etc. lookups don't + # KeyError, then return early. + if len(payload) < 2: + logger.debug(f"parsePacketPayload: payload too short ({len(payload)} bytes < 2), returning sentinel log_data") + log_data["route_type"] = -1 + log_data["route_typename"] = "UNK" + log_data["payload_type"] = -1 + log_data["payload_typename"] = "UNK" + log_data["payload_ver"] = 0 + log_data["path_len"] = 0 + log_data["path_hash_size"] = 1 + log_data["path"] = "" + log_data["pkt_payload"] = b"" + log_data["pkt_hash"] = 0 + return log_data + pbuf = io.BytesIO(payload) - + header = pbuf.read(1)[0] route_type = header & 0x03 payload_type = (header & 0x3c) >> 2 From 865c1b21b4422f7b400a60ff0c489e285f707a50 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:29:41 -0700 Subject: [PATCH 07/13] =?UTF-8?q?G1:=20NEW-B=20=E2=80=94=20wrap=20parsePac?= =?UTF-8?q?ketPayload=20ADVERT=20branch=20in=20try/except?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ADVERT branch in MeshcorePacketParser.parsePacketPayload reads the flags byte with `pk_buf.read(1)[0]`, which IndexErrors on a short advert payload (the minimum advert is 32 + 4 + 64 + 1 = 101 bytes before any optional fields). Pre-F06, the IndexError would escape as a swallowed task exception. With F06's umbrella now in place it would log and skip the dispatch, but the proposal §4.1 NEW-B asks for a narrower local guard so a malformed advert doesn't poison the rest of the parse path. The optional `lat/lon/feat1/feat2` reads after the flags byte also silently produce zeros on short reads (`int.from_bytes(b"", ...)` returns 0), which would propagate bogus zero coordinates upstream. Wrapping the whole branch limits the blast radius to a single malformed advert. Wrap the entire body of the ADVERT elif (from `pk_buf = io.BytesIO(...)` through the final `log_data["adv_feat2"]` assignment) in `try/except (IndexError, ValueError)` and log a debug message with the exception type, message, and `pkt_payload` length on failure. This matches the defensive pattern the proposal specifies. Finding: NEW-B (S3) File: src/meshcore/meshcore_parser.py --- src/meshcore/meshcore_parser.py | 69 +++++++++++++++++---------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/meshcore/meshcore_parser.py b/src/meshcore/meshcore_parser.py index 715dfd3..a56165d 100644 --- a/src/meshcore/meshcore_parser.py +++ b/src/meshcore/meshcore_parser.py @@ -171,39 +171,42 @@ class MeshcorePacketParser: del self.channels_log[:25] elif not payload is None and payload_type == 0x04: # Advert - pk_buf = io.BytesIO(pkt_payload) - adv_key = pk_buf.read(32).hex() - adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) - signature = pk_buf.read(64).hex() - flags = pk_buf.read(1)[0] - adv_type = flags & 0x0F - adv_lat = None - adv_lon = None - adv_feat1 = None - adv_feat2 = None - if flags & 0x10 > 0: #has location - adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - if flags & 0x20 > 0: #has feature1 - adv_feat1 = pk_buf.read(2).hex() - if flags & 0x40 > 0: #has feature2 - adv_feat2 = pk_buf.read(2).hex() - if flags & 0x80 > 0: #has name - adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") - log_data["adv_name"] = adv_name + try: + pk_buf = io.BytesIO(pkt_payload) + adv_key = pk_buf.read(32).hex() + adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) + signature = pk_buf.read(64).hex() + flags = pk_buf.read(1)[0] + adv_type = flags & 0x0F + adv_lat = None + adv_lon = None + adv_feat1 = None + adv_feat2 = None + if flags & 0x10 > 0: #has location + adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + if flags & 0x20 > 0: #has feature1 + adv_feat1 = pk_buf.read(2).hex() + if flags & 0x40 > 0: #has feature2 + adv_feat2 = pk_buf.read(2).hex() + if flags & 0x80 > 0: #has name + adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") + log_data["adv_name"] = adv_name - log_data["adv_key"] = adv_key - log_data["adv_timestamp"] = adv_timestamp - log_data["signature"] = signature - log_data["adv_flags"] = flags - log_data["adv_type"] = adv_type - if not adv_lat is None : - log_data["adv_lat"] = adv_lat - if not adv_lon is None : - log_data["adv_lon"] = adv_lon - if not adv_feat1 is None: - log_data["adv_feat1"] = adv_feat1 - if not adv_feat2 is None: - log_data["adv_feat2"] = adv_feat2 + log_data["adv_key"] = adv_key + log_data["adv_timestamp"] = adv_timestamp + log_data["signature"] = signature + log_data["adv_flags"] = flags + log_data["adv_type"] = adv_type + if not adv_lat is None : + log_data["adv_lat"] = adv_lat + if not adv_lon is None : + log_data["adv_lon"] = adv_lon + if not adv_feat1 is None: + log_data["adv_feat1"] = adv_feat1 + if not adv_feat2 is None: + log_data["adv_feat2"] = adv_feat2 + except (IndexError, ValueError) as e: + logger.debug(f"parsePacketPayload: malformed ADVERT payload ({type(e).__name__}: {e}), len={len(pkt_payload)}") return log_data From 1a3a665b17554669169f4d4d9f5f37d88764e4c1 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:30:54 -0700 Subject: [PATCH 08/13] =?UTF-8?q?G1:=20R02=20=E2=80=94=20fix=20txt=5Ftype?= =?UTF-8?q?=20empty-slice=20(uncrypted[4:4]=20->=20[4:5])?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `uncrypted[4:4]` is the empty slice. `int.from_bytes(b"", "little")` returns 0, so `txt_type` was always 0 for every decrypted channel message — silently masking the upper 6 bits of byte 4. The line immediately above (`attempt = uncrypted[4] & 3`) already proves byte 4 is in range, so widening the slice to `[4:5]` is safe. This is a one-character fix and changes the observable value of `txt_type` for all callers. Existing consumers that branched on `txt_type` were effectively dead code; this restores the intended behavior. Finding: R02 (Info) File: src/meshcore/meshcore_parser.py --- src/meshcore/meshcore_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meshcore/meshcore_parser.py b/src/meshcore/meshcore_parser.py index a56165d..046709a 100644 --- a/src/meshcore/meshcore_parser.py +++ b/src/meshcore/meshcore_parser.py @@ -150,7 +150,7 @@ class MeshcorePacketParser: uncrypted = cipher.decrypt(msg) timestamp = int.from_bytes(uncrypted[0:4], "little", signed=False) attempt = uncrypted[4] & 3 - txt_type = int.from_bytes(uncrypted[4:4], "little", signed=False) >> 2 + txt_type = int.from_bytes(uncrypted[4:5], "little", signed=False) >> 2 message = uncrypted[5:].strip(b"\0") msg_hash = int.from_bytes(SHA256.new(timestamp.to_bytes(4, "little", signed=False) + message).digest()[0:4], "little", signed=False) log_data["message"] = message.decode("utf-8", "ignore") From d98523cddb5c558d2108f090c83eb2d2b7560c17 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:33:20 -0700 Subject: [PATCH 09/13] =?UTF-8?q?G1:=20N08=20+=20F12=20+=20N10=20=E2=80=94?= =?UTF-8?q?=20cosmetic=20cleanup=20in=20reader.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three small cleanup fixes bundled per proposal §4.1 commit order. N08 — CONTROL_DATA empty payload guard. The handler reads `payload = dbuf.read()` then immediately dereferences `payload[0]` without checking length. A zero-length payload (firmware truncation or garbled frame) raises IndexError. Pre-F06 the IndexError would escape; post-F06 it would log and skip the dispatch via the umbrella. Adding an explicit `if len(payload) == 0: return` after the read short-circuits the empty case before it touches `payload[0]`, with a debug log noting the empty payload. The `return` exits handle_rx cleanly without engaging the F06 umbrella's parse-error path, which is the correct behavior — an empty CONTROL_DATA frame is not a parse error, it's an unusable frame. F12 — print(res) leftover debug. The RAW_DATA handler had a stray `print(res)` polluting stdout. Replaced with `logger.debug(res)` to match the surrounding `logger.debug("Received raw data")` line. N10 — magic numbers 16 and 17. Two `elif packet_type_value == 16/17` branches hardcoded the integer values for CONTACT_MSG_RECV_V3 and CHANNEL_MSG_RECV_V3, both already declared in packets.py:94-95. Replaced with `PacketType.CONTACT_MSG_RECV_V3.value` and `PacketType.CHANNEL_MSG_RECV_V3.value` to eliminate drift risk if the enum is ever renumbered. Findings: N08 (Info), F12 (Info), N10 (Info) File: src/meshcore/reader.py --- src/meshcore/reader.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 6aa5131..ead8b0a 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -222,7 +222,7 @@ class MessageReader: await self.dispatcher.dispatch(Event(evt_type, res, attributes)) - elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + elif packet_type_value == PacketType.CONTACT_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} res["type"] = "PRIV" res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 @@ -287,7 +287,7 @@ class MessageReader: Event(EventType.CHANNEL_MSG_RECV, res, attributes) ) - elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) + elif packet_type_value == PacketType.CHANNEL_MSG_RECV_V3.value: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} res["type"] = "CHAN" res["SNR"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) / 4 @@ -534,7 +534,7 @@ class MessageReader: res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) res["payload"] = dbuf.read(4).hex() logger.debug("Received raw data") - print(res) + logger.debug(res) await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) elif packet_type_value == PacketType.LOGIN_SUCCESS.value: @@ -895,6 +895,9 @@ class MessageReader: res["RSSI"] = int.from_bytes(dbuf.read(1), byteorder="little", signed=True) res["path_len"] = dbuf.read(1)[0] payload = dbuf.read() + if len(payload) == 0: + logger.debug("CONTROL_DATA frame has empty payload, skipping") + return payload_type = payload[0] res["payload_type"] = payload_type res["payload"] = payload From f3973151d605a5c50b38b84f5aab42483b68350c Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:39:01 -0700 Subject: [PATCH 10/13] G1: add verification tests for F06, N07, NEW-C, R02 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the four unit tests required by proposal §4.1 verification: (a) test_g1_handle_rx_malformed_frame_logged_and_swallowed — F06. Sends a 4-byte CHANNEL_MSG_RECV_V3 frame missing the channel_idx byte. The handler raises IndexError on dbuf.read(1)[0]; the F06 umbrella must catch it, log "handle_rx parse error" with a traceback, and return cleanly without dispatching any event. (b) test_g1_battery_short_frame_omits_storage_fields — N07. Sends a 3-byte BATTERY frame (type + 2-byte level only). Verifies that exactly one BATTERY event is dispatched, the payload contains `level` but does NOT contain `used_kb` or `total_kb`. Pre-fix the `len(data) > 3` gate would have produced both fields with bogus silent zeros. (c) test_g1_status_response_short_frame_skipped — NEW-C. Sends a 30-byte STATUS_RESPONSE push frame (well below the 60-byte minimum). Verifies that no STATUS_RESPONSE event is dispatched and the new "STATUS_RESPONSE push frame too short" debug log fires. (d) test_g1_parse_packet_payload_txt_type_decodes_high_bits — R02. Sets up a synthetic channel with a known 16-byte AES key, encrypts a 16-byte plaintext where byte 4 = (5 << 2) | 1 = 0x15, builds the full pkt_payload (chan_hash + cipher_mac + ciphertext), wraps it in a minimal route header, calls parsePacketPayload directly, and asserts log_data["txt_type"] == 5 and log_data["attempt"] == 1. Pre-R02-fix `uncrypted[4:4]` was the empty slice, so txt_type was always 0 — this test would have failed with txt_type=0. Adds a small _CapturingDispatcher helper class. Adds imports for logging at the top of the file. The existing test_binary_response is left untouched. File: tests/unit/test_reader.py --- tests/unit/test_reader.py | 165 +++++++++++++++++++++++++++++++++++++- 1 file changed, 164 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index 39bb8ac..aa72b03 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio +import logging from unittest.mock import AsyncMock from meshcore.events import EventType from meshcore.reader import MessageReader @@ -87,4 +88,166 @@ async def test_binary_response(): print(f"⚠️ Unknown request type {request_type}, no specific event expected") if __name__ == "__main__": - asyncio.run(test_binary_response()) \ No newline at end of file + asyncio.run(test_binary_response()) + + +# --------------------------------------------------------------------------- +# G1 verification tests (proposal §4.1) +# --------------------------------------------------------------------------- + +class _CapturingDispatcher: + """Quiet dispatcher that records every dispatched event.""" + def __init__(self): + self.events = [] + + async def dispatch(self, event): + self.events.append(event) + + +@pytest.mark.asyncio +async def test_g1_handle_rx_malformed_frame_logged_and_swallowed(caplog): + """G1/F06: malformed frame must not propagate, must be logged with traceback.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte + + # 2 reserved bytes, but no channel_idx byte. The handler will raise + # IndexError on the next dbuf.read(1)[0] when the buffer is empty. + # F06's umbrella try/except must catch it, log the parse error, and + # return cleanly. + malformed = bytearray.fromhex("11100000") + + with caplog.at_level(logging.ERROR, logger="meshcore"): + await reader.handle_rx(malformed) # must not raise + + error_records = [r for r in caplog.records if "handle_rx parse error" in r.message] + assert error_records, ( + f"Expected an error log containing 'handle_rx parse error'; " + f"got: {[r.message for r in caplog.records]}" + ) + # Traceback should be present in the log message (F06 includes it) + assert "Traceback" in error_records[0].message, ( + "F06 umbrella log message must include a traceback" + ) + # No CHANNEL_MSG_RECV event should have been dispatched + assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events) + + +@pytest.mark.asyncio +async def test_g1_battery_short_frame_omits_storage_fields(): + """G1/N07: short BATTERY frame must not silently yield zero used_kb/total_kb.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 3-byte BATTERY frame: type 0x0c + 2 level bytes (no storage tail). + # Pre-fix the `len(data) > 3` gate would have let any frame >= 4 bytes + # through, producing a BATTERY event with bogus zero used_kb/total_kb + # because io.BytesIO.read() returns short data without raising. + # Post-fix (`len(data) >= 11`) the storage fields are skipped entirely. + short_battery = bytearray.fromhex("0c8000") + + await reader.handle_rx(short_battery) + + battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY] + assert len(battery_events) == 1, ( + f"Expected exactly one BATTERY event, got {len(battery_events)}" + ) + payload = battery_events[0].payload + assert payload["level"] == 0x0080, f"Unexpected level: {payload['level']}" + assert "used_kb" not in payload, ( + "Short BATTERY frame must not include used_kb (would be a silent zero)" + ) + assert "total_kb" not in payload, ( + "Short BATTERY frame must not include total_kb (would be a silent zero)" + ) + + +@pytest.mark.asyncio +async def test_g1_status_response_short_frame_skipped(caplog): + """G1/NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 30-byte STATUS_RESPONSE push frame, well below the 60-byte minimum. + # First byte is the type (0x87 = PacketType.STATUS_RESPONSE), the rest + # is arbitrary filler. parse_status with offset=8 reads up through + # data[56:60], so anything < 60 bytes would yield short reads and + # silent zero values pre-fix. + short_status = bytearray([0x87] + [0xAA] * 29) + assert len(short_status) == 30 + + with caplog.at_level(logging.DEBUG, logger="meshcore"): + await reader.handle_rx(short_status) + + status_events = [e for e in dispatcher.events if e.type == EventType.STATUS_RESPONSE] + assert len(status_events) == 0, ( + "Short STATUS_RESPONSE push frame must not dispatch a parsed event" + ) + assert any( + "STATUS_RESPONSE push frame too short" in r.message for r in caplog.records + ), "Expected the NEW-C debug log line for short STATUS_RESPONSE frames" + + +@pytest.mark.asyncio +async def test_g1_parse_packet_payload_txt_type_decodes_high_bits(): + """G1/R02: txt_type must decode the high 6 bits of byte 4, not always be 0.""" + from Crypto.Cipher import AES + from Crypto.Hash import HMAC, SHA256 + from meshcore.meshcore_parser import MeshcorePacketParser + + parser = MeshcorePacketParser() + parser.decrypt_channels = True + + # Set up a synthetic channel with a known 16-byte AES key. Direct dict + # assignment matches how the parser stores channels (newChannel is async + # and serves the same purpose). + channel_secret = b"\x01" * 16 + channel_hash_byte = 0xAB + parser.channels[0] = { + "channel_idx": 0, + "channel_name": "test", + "channel_hash": "ab", + "channel_secret": channel_secret, + } + + # 16-byte plaintext (one AES block): + # bytes 0-3 = sender_timestamp (little-endian) + # byte 4 = (txt_type << 2) | attempt + # bytes 5-15 = message + null padding + # Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15. + # Pre-R02-fix uncrypted[4:4] is empty so txt_type would be 0; + # post-fix uncrypted[4:5] yields 0x15 >> 2 = 5. + plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00" + assert len(plaintext) == 16 + + encrypted = AES.new(channel_secret, AES.MODE_ECB).encrypt(plaintext) + + # cipher_mac = first 2 bytes of HMAC-SHA256(channel_secret, encrypted) + h = HMAC.new(channel_secret, digestmod=SHA256) + h.update(encrypted) + cipher_mac = h.digest()[:2] + + # pkt_payload layout: 1-byte chan_hash + 2-byte cipher_mac + ciphertext + pkt_payload = bytes([channel_hash_byte]) + cipher_mac + encrypted + + # parsePacketPayload expects the full payload buffer: + # header byte (route_type=1 DIRECT, payload_type=5 channel, ver=0) + # path_byte (path_len=0, path_hash_size=1) → 0x00 + # pkt_payload + header = 0x15 # route_type=1, payload_type=5, payload_ver=0 + path_byte = 0x00 + payload = bytes([header, path_byte]) + pkt_payload + + log_data = await parser.parsePacketPayload(payload, log_data={}) + + assert log_data["payload_type"] == 0x05 + assert "txt_type" in log_data, ( + f"txt_type missing from log_data — channel decrypt path was not reached. " + f"log_data keys: {list(log_data.keys())}" + ) + assert log_data["txt_type"] == 5, ( + f"Expected txt_type=5 (R02 fix), got {log_data['txt_type']}" + ) + assert log_data["attempt"] == 1, ( + f"Expected attempt=1, got {log_data['attempt']}" + ) \ No newline at end of file From 2bf3f1b9ddd288f5aebaad1b0b35977ac04ea5d3 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 19:24:26 -0700 Subject: [PATCH 11/13] fix: BATTERY handler drops frames shorter than 3 bytes (level field guard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A BATTERY frame with len(data) < 3 caused dbuf.read(2) to return short bytes; int.from_bytes(b"", ...) silently yielded 0, propagating a bogus level=0 to HA sensors. Same silent-zero class as N07 (storage fields). Option B: early-return with debug log, matching the NEW-C pattern for STATUS_RESPONSE. No BATTERY event is dispatched for malformed frames. Not in the original forensics report — discovered during G1 N07 work and logged in issues_log.md. Resolved here because no later branch touches this handler. Files changed: - src/meshcore/reader.py: add `if len(data) < 3: return` guard before the level read in the BATTERY branch - tests/unit/test_reader.py: add test_g1_battery_too_short_for_level — sends a 1-byte frame (type only), asserts no BATTERY event dispatched and debug log emitted --- src/meshcore/reader.py | 16 ++++++++++++---- tests/unit/test_reader.py | 27 +++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index ead8b0a..518eb5c 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -341,12 +341,20 @@ class MessageReader: await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) elif packet_type_value == PacketType.BATTERY.value: + # Full RESP_CODE_BATT_AND_STORAGE: 1 type + 2 level + 4 used_kb + 4 total_kb = 11 bytes. + # Minimum viable frame is 3 bytes (type + level). Shorter frames are + # malformed — dbuf.read(2) would return short bytes and + # int.from_bytes(b"", ...) silently yields 0 (same class as N07). + if len(data) < 3: + logger.debug( + "BATTERY frame too short for level field " + f"({len(data)} bytes < 3), skipping" + ) + return battery_level = int.from_bytes(dbuf.read(2), byteorder="little") result = {"level": battery_level} - # Full RESP_CODE_BATT_AND_STORAGE frame is 11 bytes: - # 1 type + 2 level + 4 used_kb + 4 total_kb. The previous - # `len(data) > 3` guard let 4-10 byte truncated frames through, - # producing silent zero values for used_kb/total_kb because + # The previous `len(data) > 3` guard let 4-10 byte truncated frames + # through, producing silent zero values for used_kb/total_kb because # io.BytesIO.read() returns short data without raising. if len(data) >= 11: # has storage info as well result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index aa72b03..67fa9e6 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -162,6 +162,33 @@ async def test_g1_battery_short_frame_omits_storage_fields(): ) +@pytest.mark.asyncio +async def test_g1_battery_too_short_for_level(caplog): + """BATTERY frame shorter than 3 bytes must be dropped entirely (Option B). + + A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause + dbuf.read(2) to return b"" and int.from_bytes(b"", ...) to silently yield 0. + The fix adds an early return with a debug log, matching the NEW-C pattern. + """ + dispatcher = _CapturingDispatcher() + reader = MessageReader(dispatcher) + + # 1-byte BATTERY frame: only the type byte, no level payload. + too_short = bytearray.fromhex("0c") + + with caplog.at_level(logging.DEBUG, logger="meshcore"): + await reader.handle_rx(too_short) + + battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY] + assert len(battery_events) == 0, ( + "BATTERY frame shorter than 3 bytes must not dispatch an event" + ) + debug_records = [ + r for r in caplog.records if "BATTERY frame too short" in r.message + ] + assert debug_records, "Expected a debug log about the short BATTERY frame" + + @pytest.mark.asyncio async def test_g1_status_response_short_frame_skipped(caplog): """G1/NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" From 66b4a532a1bbc5067861aed3e1f1ce223ac85998 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 07:52:43 -0700 Subject: [PATCH 12/13] Remove internal G-numbering from test_reader.py Strip G1/ prefix from docstrings, _g1_ from function names, and G1 from section comments. Finding IDs (F06, N07, etc.) are preserved as they are meaningful standalone references. --- tests/unit/test_reader.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index 67fa9e6..24b8a1b 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -92,7 +92,7 @@ if __name__ == "__main__": # --------------------------------------------------------------------------- -# G1 verification tests (proposal §4.1) +# Reader/parser crash-safety verification tests # --------------------------------------------------------------------------- class _CapturingDispatcher: @@ -105,8 +105,8 @@ class _CapturingDispatcher: @pytest.mark.asyncio -async def test_g1_handle_rx_malformed_frame_logged_and_swallowed(caplog): - """G1/F06: malformed frame must not propagate, must be logged with traceback.""" +async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog): + """F06: malformed frame must not propagate, must be logged with traceback.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -134,8 +134,8 @@ async def test_g1_handle_rx_malformed_frame_logged_and_swallowed(caplog): @pytest.mark.asyncio -async def test_g1_battery_short_frame_omits_storage_fields(): - """G1/N07: short BATTERY frame must not silently yield zero used_kb/total_kb.""" +async def test_battery_short_frame_omits_storage_fields(): + """N07: short BATTERY frame must not silently yield zero used_kb/total_kb.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -163,7 +163,7 @@ async def test_g1_battery_short_frame_omits_storage_fields(): @pytest.mark.asyncio -async def test_g1_battery_too_short_for_level(caplog): +async def test_battery_too_short_for_level(caplog): """BATTERY frame shorter than 3 bytes must be dropped entirely (Option B). A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause @@ -190,8 +190,8 @@ async def test_g1_battery_too_short_for_level(caplog): @pytest.mark.asyncio -async def test_g1_status_response_short_frame_skipped(caplog): - """G1/NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" +async def test_status_response_short_frame_skipped(caplog): + """NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -216,8 +216,8 @@ async def test_g1_status_response_short_frame_skipped(caplog): @pytest.mark.asyncio -async def test_g1_parse_packet_payload_txt_type_decodes_high_bits(): - """G1/R02: txt_type must decode the high 6 bits of byte 4, not always be 0.""" +async def test_parse_packet_payload_txt_type_decodes_high_bits(): + """R02: txt_type must decode the high 6 bits of byte 4, not always be 0.""" from Crypto.Cipher import AES from Crypto.Hash import HMAC, SHA256 from meshcore.meshcore_parser import MeshcorePacketParser From 5a4960b2686a58a036e78772777b64dacfeaf2e4 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 07:55:03 -0700 Subject: [PATCH 13/13] Remove finding IDs from test_reader.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Strip internal forensics finding references (F06, N07, NEW-C, R02) from docstrings, comments, and assertion messages. Descriptive text is preserved — only the ID prefixes are removed. --- tests/unit/test_reader.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index 24b8a1b..758968e 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -106,14 +106,14 @@ class _CapturingDispatcher: @pytest.mark.asyncio async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog): - """F06: malformed frame must not propagate, must be logged with traceback.""" + """Malformed frame must not propagate, must be logged with traceback.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) # 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte + # 2 reserved bytes, but no channel_idx byte. The handler will raise # IndexError on the next dbuf.read(1)[0] when the buffer is empty. - # F06's umbrella try/except must catch it, log the parse error, and + # The umbrella try/except must catch it, log the parse error, and # return cleanly. malformed = bytearray.fromhex("11100000") @@ -125,9 +125,9 @@ async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog): f"Expected an error log containing 'handle_rx parse error'; " f"got: {[r.message for r in caplog.records]}" ) - # Traceback should be present in the log message (F06 includes it) + # Traceback should be present in the log message assert "Traceback" in error_records[0].message, ( - "F06 umbrella log message must include a traceback" + "Umbrella log message must include a traceback" ) # No CHANNEL_MSG_RECV event should have been dispatched assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events) @@ -135,7 +135,7 @@ async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog): @pytest.mark.asyncio async def test_battery_short_frame_omits_storage_fields(): - """N07: short BATTERY frame must not silently yield zero used_kb/total_kb.""" + """Short BATTERY frame must not silently yield zero used_kb/total_kb.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -168,7 +168,7 @@ async def test_battery_too_short_for_level(caplog): A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause dbuf.read(2) to return b"" and int.from_bytes(b"", ...) to silently yield 0. - The fix adds an early return with a debug log, matching the NEW-C pattern. + The fix adds an early return with a debug log. """ dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -191,7 +191,7 @@ async def test_battery_too_short_for_level(caplog): @pytest.mark.asyncio async def test_status_response_short_frame_skipped(caplog): - """NEW-C: short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" + """Short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros.""" dispatcher = _CapturingDispatcher() reader = MessageReader(dispatcher) @@ -212,12 +212,12 @@ async def test_status_response_short_frame_skipped(caplog): ) assert any( "STATUS_RESPONSE push frame too short" in r.message for r in caplog.records - ), "Expected the NEW-C debug log line for short STATUS_RESPONSE frames" + ), "Expected a debug log line for short STATUS_RESPONSE frames" @pytest.mark.asyncio async def test_parse_packet_payload_txt_type_decodes_high_bits(): - """R02: txt_type must decode the high 6 bits of byte 4, not always be 0.""" + """txt_type must decode the high 6 bits of byte 4, not always be 0.""" from Crypto.Cipher import AES from Crypto.Hash import HMAC, SHA256 from meshcore.meshcore_parser import MeshcorePacketParser @@ -242,7 +242,7 @@ async def test_parse_packet_payload_txt_type_decodes_high_bits(): # byte 4 = (txt_type << 2) | attempt # bytes 5-15 = message + null padding # Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15. - # Pre-R02-fix uncrypted[4:4] is empty so txt_type would be 0; + # Pre-fix uncrypted[4:4] is empty so txt_type would be 0; # post-fix uncrypted[4:5] yields 0x15 >> 2 = 5. plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00" assert len(plaintext) == 16 @@ -273,7 +273,7 @@ async def test_parse_packet_payload_txt_type_decodes_high_bits(): f"log_data keys: {list(log_data.keys())}" ) assert log_data["txt_type"] == 5, ( - f"Expected txt_type=5 (R02 fix), got {log_data['txt_type']}" + f"Expected txt_type=5, got {log_data['txt_type']}" ) assert log_data["attempt"] == 1, ( f"Expected attempt=1, got {log_data['attempt']}"