From 865c1b21b4422f7b400a60ff0c489e285f707a50 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sat, 11 Apr 2026 18:29:41 -0700 Subject: [PATCH] =?UTF-8?q?G1:=20NEW-B=20=E2=80=94=20wrap=20parsePacketPay?= =?UTF-8?q?load=20ADVERT=20branch=20in=20try/except?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ADVERT branch in MeshcorePacketParser.parsePacketPayload reads the flags byte with `pk_buf.read(1)[0]`, which IndexErrors on a short advert payload (the minimum advert is 32 + 4 + 64 + 1 = 101 bytes before any optional fields). Pre-F06, the IndexError would escape as a swallowed task exception. With F06's umbrella now in place it would log and skip the dispatch, but the proposal ยง4.1 NEW-B asks for a narrower local guard so a malformed advert doesn't poison the rest of the parse path. The optional `lat/lon/feat1/feat2` reads after the flags byte also silently produce zeros on short reads (`int.from_bytes(b"", ...)` returns 0), which would propagate bogus zero coordinates upstream. Wrapping the whole branch limits the blast radius to a single malformed advert. Wrap the entire body of the ADVERT elif (from `pk_buf = io.BytesIO(...)` through the final `log_data["adv_feat2"]` assignment) in `try/except (IndexError, ValueError)` and log a debug message with the exception type, message, and `pkt_payload` length on failure. This matches the defensive pattern the proposal specifies. Finding: NEW-B (S3) File: src/meshcore/meshcore_parser.py --- src/meshcore/meshcore_parser.py | 69 +++++++++++++++++---------------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/src/meshcore/meshcore_parser.py b/src/meshcore/meshcore_parser.py index 715dfd3..a56165d 100644 --- a/src/meshcore/meshcore_parser.py +++ b/src/meshcore/meshcore_parser.py @@ -171,39 +171,42 @@ class MeshcorePacketParser: del self.channels_log[:25] elif not payload is None and payload_type == 0x04: # Advert - pk_buf = io.BytesIO(pkt_payload) - adv_key = pk_buf.read(32).hex() - adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) - signature = pk_buf.read(64).hex() - flags = pk_buf.read(1)[0] - adv_type = flags & 0x0F - adv_lat = None - adv_lon = None - adv_feat1 = None - adv_feat2 = None - if flags & 0x10 > 0: #has location - adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 - if flags & 0x20 > 0: #has feature1 - adv_feat1 = pk_buf.read(2).hex() - if flags & 0x40 > 0: #has feature2 - adv_feat2 = pk_buf.read(2).hex() - if flags & 0x80 > 0: #has name - adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") - log_data["adv_name"] = adv_name + try: + pk_buf = io.BytesIO(pkt_payload) + adv_key = pk_buf.read(32).hex() + adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False) + signature = pk_buf.read(64).hex() + flags = pk_buf.read(1)[0] + adv_type = flags & 0x0F + adv_lat = None + adv_lon = None + adv_feat1 = None + adv_feat2 = None + if flags & 0x10 > 0: #has location + adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0 + if flags & 0x20 > 0: #has feature1 + adv_feat1 = pk_buf.read(2).hex() + if flags & 0x40 > 0: #has feature2 + adv_feat2 = pk_buf.read(2).hex() + if flags & 0x80 > 0: #has name + adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00") + log_data["adv_name"] = adv_name - log_data["adv_key"] = adv_key - log_data["adv_timestamp"] = adv_timestamp - log_data["signature"] = signature - log_data["adv_flags"] = flags - log_data["adv_type"] = adv_type - if not adv_lat is None : - log_data["adv_lat"] = adv_lat - if not adv_lon is None : - log_data["adv_lon"] = adv_lon - if not adv_feat1 is None: - log_data["adv_feat1"] = adv_feat1 - if not adv_feat2 is None: - log_data["adv_feat2"] = adv_feat2 + log_data["adv_key"] = adv_key + log_data["adv_timestamp"] = adv_timestamp + log_data["signature"] = signature + log_data["adv_flags"] = flags + log_data["adv_type"] = adv_type + if not adv_lat is None : + log_data["adv_lat"] = adv_lat + if not adv_lon is None : + log_data["adv_lon"] = adv_lon + if not adv_feat1 is None: + log_data["adv_feat1"] = adv_feat1 + if not adv_feat2 is None: + log_data["adv_feat2"] = adv_feat2 + except (IndexError, ValueError) as e: + logger.debug(f"parsePacketPayload: malformed ADVERT payload ({type(e).__name__}: {e}), len={len(pkt_payload)}") return log_data