G1: NEW-B — wrap parsePacketPayload ADVERT branch in try/except

The ADVERT branch in MeshcorePacketParser.parsePacketPayload reads the
flags byte with `pk_buf.read(1)[0]`, which IndexErrors on a short
advert payload (the minimum advert is 32 + 4 + 64 + 1 = 101 bytes
before any optional fields). Pre-F06, the IndexError would escape as a
swallowed task exception. With F06's umbrella now in place it would
log and skip the dispatch, but the proposal §4.1 NEW-B asks for a
narrower local guard so a malformed advert doesn't poison the rest of
the parse path.

The optional `lat/lon/feat1/feat2` reads after the flags byte also
silently produce zeros on short reads (`int.from_bytes(b"", ...)`
returns 0), which would propagate bogus zero coordinates upstream.
Wrapping the whole branch limits the blast radius to a single
malformed advert.

Wrap the entire body of the ADVERT elif (from `pk_buf = io.BytesIO(...)`
through the final `log_data["adv_feat2"]` assignment) in
`try/except (IndexError, ValueError)` and log a debug message with the
exception type, message, and `pkt_payload` length on failure. This
matches the defensive pattern the proposal specifies.

Finding: NEW-B (S3)
File: src/meshcore/meshcore_parser.py
This commit is contained in:
Matthew Wolter 2026-04-11 18:29:41 -07:00
parent c984a78e70
commit 865c1b21b4

View file

@ -171,39 +171,42 @@ class MeshcorePacketParser:
del self.channels_log[:25]
elif not payload is None and payload_type == 0x04: # Advert
pk_buf = io.BytesIO(pkt_payload)
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
adv_feat1 = None
adv_feat2 = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
log_data["adv_name"] = adv_name
try:
pk_buf = io.BytesIO(pkt_payload)
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
adv_feat1 = None
adv_feat2 = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
log_data["adv_name"] = adv_name
log_data["adv_key"] = adv_key
log_data["adv_timestamp"] = adv_timestamp
log_data["signature"] = signature
log_data["adv_flags"] = flags
log_data["adv_type"] = adv_type
if not adv_lat is None :
log_data["adv_lat"] = adv_lat
if not adv_lon is None :
log_data["adv_lon"] = adv_lon
if not adv_feat1 is None:
log_data["adv_feat1"] = adv_feat1
if not adv_feat2 is None:
log_data["adv_feat2"] = adv_feat2
log_data["adv_key"] = adv_key
log_data["adv_timestamp"] = adv_timestamp
log_data["signature"] = signature
log_data["adv_flags"] = flags
log_data["adv_type"] = adv_type
if not adv_lat is None :
log_data["adv_lat"] = adv_lat
if not adv_lon is None :
log_data["adv_lon"] = adv_lon
if not adv_feat1 is None:
log_data["adv_feat1"] = adv_feat1
if not adv_feat2 is None:
log_data["adv_feat2"] = adv_feat2
except (IndexError, ValueError) as e:
logger.debug(f"parsePacketPayload: malformed ADVERT payload ({type(e).__name__}: {e}), len={len(pkt_payload)}")
return log_data