meshcore_py/tests/unit/test_reader.py
Matthew Wolter 5a4960b268 Remove finding IDs from test_reader.py
Strip internal forensics finding references (F06, N07, NEW-C, R02)
from docstrings, comments, and assertion messages. Descriptive text
is preserved — only the ID prefixes are removed.
2026-04-12 07:55:03 -07:00

280 lines
No EOL
11 KiB
Python

#!/usr/bin/env python3
import asyncio
import logging
from unittest.mock import AsyncMock
from meshcore.events import EventType
from meshcore.reader import MessageReader
class MockDispatcher:
def __init__(self):
self.dispatched_events = []
async def dispatch(self, event):
self.dispatched_events.append(event)
print(f"Dispatched: {event.type} with payload keys: {list(event.payload.keys()) if hasattr(event.payload, 'keys') else event.payload}")
import pytest
@pytest.mark.asyncio
async def test_binary_response():
mock_dispatcher = MockDispatcher()
reader = MessageReader(mock_dispatcher)
packet_hex = "8c00417db968993acd42fc77c3bbd1f08b9b84c39756410c58cd03077162bcb489031869586ab4b103000000000000000000"
packet_data = bytearray.fromhex(packet_hex)
print(f"Testing packet: {packet_hex}")
print(f"Packet type: 0x{packet_data[0]:02x} (should be 0x8c for BINARY_RESPONSE)")
# Register the binary request first
tag = "417db968"
from meshcore.packets import BinaryReqType
pubkey_prefix = "993acd42fc77"
reader.register_binary_request(pubkey_prefix, tag, BinaryReqType.ACL, 10.0)
print(f"Registered ACL request with tag {tag}")
await reader.handle_rx(packet_data)
# Check what was dispatched
print(f"\nTotal events dispatched: {len(mock_dispatcher.dispatched_events)}")
# Verify BINARY_RESPONSE was dispatched
binary_responses = [e for e in mock_dispatcher.dispatched_events if e.type == EventType.BINARY_RESPONSE]
assert len(binary_responses) == 1, f"Expected 1 BINARY_RESPONSE, got {len(binary_responses)}"
print("✅ BINARY_RESPONSE event dispatched correctly")
# Check the binary response payload
binary_event = binary_responses[0]
assert "tag" in binary_event.payload, "BINARY_RESPONSE should have 'tag' in payload"
assert "data" in binary_event.payload, "BINARY_RESPONSE should have 'data' in payload"
print(f"✅ Binary response tag: {binary_event.payload['tag']}")
print(f"✅ Binary response data: {binary_event.payload['data']}")
# Check if a specific parsed event was also dispatched
other_events = [e for e in mock_dispatcher.dispatched_events if e.type != EventType.BINARY_RESPONSE]
if other_events:
print(f"✅ Additional parsed event dispatched: {other_events[0].type}")
print(f" Payload keys: {list(other_events[0].payload.keys()) if hasattr(other_events[0].payload, 'keys') else other_events[0].payload}")
else:
print("⚠️ No additional parsed event dispatched")
# Parse the response data to see what request type it is
response_data = packet_data[6:]
if response_data:
request_type = response_data[0]
print(f"Request type in response: 0x{request_type:02x} ({request_type})")
# Map request types to expected events
from meshcore.packets import BinaryReqType
if request_type == BinaryReqType.STATUS.value:
expected_event = EventType.STATUS_RESPONSE
elif request_type == BinaryReqType.TELEMETRY.value:
expected_event = EventType.TELEMETRY_RESPONSE
elif request_type == BinaryReqType.MMA.value:
expected_event = EventType.MMA_RESPONSE
elif request_type == BinaryReqType.ACL.value:
expected_event = EventType.ACL_RESPONSE
else:
expected_event = None
if expected_event:
specific_events = [e for e in mock_dispatcher.dispatched_events if e.type == expected_event]
if specific_events:
print(f"✅ Expected {expected_event} event was dispatched")
else:
print(f"❌ Expected {expected_event} event was NOT dispatched")
else:
print(f"⚠️ Unknown request type {request_type}, no specific event expected")
if __name__ == "__main__":
asyncio.run(test_binary_response())
# ---------------------------------------------------------------------------
# Reader/parser crash-safety verification tests
# ---------------------------------------------------------------------------
class _CapturingDispatcher:
"""Quiet dispatcher that records every dispatched event."""
def __init__(self):
self.events = []
async def dispatch(self, event):
self.events.append(event)
@pytest.mark.asyncio
async def test_handle_rx_malformed_frame_logged_and_swallowed(caplog):
"""Malformed frame must not propagate, must be logged with traceback."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 4-byte CHANNEL_MSG_RECV_V3 frame: type byte (0x11) + 1 SNR byte +
# 2 reserved bytes, but no channel_idx byte. The handler will raise
# IndexError on the next dbuf.read(1)[0] when the buffer is empty.
# The umbrella try/except must catch it, log the parse error, and
# return cleanly.
malformed = bytearray.fromhex("11100000")
with caplog.at_level(logging.ERROR, logger="meshcore"):
await reader.handle_rx(malformed) # must not raise
error_records = [r for r in caplog.records if "handle_rx parse error" in r.message]
assert error_records, (
f"Expected an error log containing 'handle_rx parse error'; "
f"got: {[r.message for r in caplog.records]}"
)
# Traceback should be present in the log message
assert "Traceback" in error_records[0].message, (
"Umbrella log message must include a traceback"
)
# No CHANNEL_MSG_RECV event should have been dispatched
assert not any(e.type == EventType.CHANNEL_MSG_RECV for e in dispatcher.events)
@pytest.mark.asyncio
async def test_battery_short_frame_omits_storage_fields():
"""Short BATTERY frame must not silently yield zero used_kb/total_kb."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 3-byte BATTERY frame: type 0x0c + 2 level bytes (no storage tail).
# Pre-fix the `len(data) > 3` gate would have let any frame >= 4 bytes
# through, producing a BATTERY event with bogus zero used_kb/total_kb
# because io.BytesIO.read() returns short data without raising.
# Post-fix (`len(data) >= 11`) the storage fields are skipped entirely.
short_battery = bytearray.fromhex("0c8000")
await reader.handle_rx(short_battery)
battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY]
assert len(battery_events) == 1, (
f"Expected exactly one BATTERY event, got {len(battery_events)}"
)
payload = battery_events[0].payload
assert payload["level"] == 0x0080, f"Unexpected level: {payload['level']}"
assert "used_kb" not in payload, (
"Short BATTERY frame must not include used_kb (would be a silent zero)"
)
assert "total_kb" not in payload, (
"Short BATTERY frame must not include total_kb (would be a silent zero)"
)
@pytest.mark.asyncio
async def test_battery_too_short_for_level(caplog):
"""BATTERY frame shorter than 3 bytes must be dropped entirely (Option B).
A 1-byte frame (just the packet-type byte 0x0c, no level bytes) would cause
dbuf.read(2) to return b"" and int.from_bytes(b"", ...) to silently yield 0.
The fix adds an early return with a debug log.
"""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 1-byte BATTERY frame: only the type byte, no level payload.
too_short = bytearray.fromhex("0c")
with caplog.at_level(logging.DEBUG, logger="meshcore"):
await reader.handle_rx(too_short)
battery_events = [e for e in dispatcher.events if e.type == EventType.BATTERY]
assert len(battery_events) == 0, (
"BATTERY frame shorter than 3 bytes must not dispatch an event"
)
debug_records = [
r for r in caplog.records if "BATTERY frame too short" in r.message
]
assert debug_records, "Expected a debug log about the short BATTERY frame"
@pytest.mark.asyncio
async def test_status_response_short_frame_skipped(caplog):
"""Short STATUS_RESPONSE push frame must be skipped, not parsed with bogus zeros."""
dispatcher = _CapturingDispatcher()
reader = MessageReader(dispatcher)
# 30-byte STATUS_RESPONSE push frame, well below the 60-byte minimum.
# First byte is the type (0x87 = PacketType.STATUS_RESPONSE), the rest
# is arbitrary filler. parse_status with offset=8 reads up through
# data[56:60], so anything < 60 bytes would yield short reads and
# silent zero values pre-fix.
short_status = bytearray([0x87] + [0xAA] * 29)
assert len(short_status) == 30
with caplog.at_level(logging.DEBUG, logger="meshcore"):
await reader.handle_rx(short_status)
status_events = [e for e in dispatcher.events if e.type == EventType.STATUS_RESPONSE]
assert len(status_events) == 0, (
"Short STATUS_RESPONSE push frame must not dispatch a parsed event"
)
assert any(
"STATUS_RESPONSE push frame too short" in r.message for r in caplog.records
), "Expected a debug log line for short STATUS_RESPONSE frames"
@pytest.mark.asyncio
async def test_parse_packet_payload_txt_type_decodes_high_bits():
"""txt_type must decode the high 6 bits of byte 4, not always be 0."""
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
from meshcore.meshcore_parser import MeshcorePacketParser
parser = MeshcorePacketParser()
parser.decrypt_channels = True
# Set up a synthetic channel with a known 16-byte AES key. Direct dict
# assignment matches how the parser stores channels (newChannel is async
# and serves the same purpose).
channel_secret = b"\x01" * 16
channel_hash_byte = 0xAB
parser.channels[0] = {
"channel_idx": 0,
"channel_name": "test",
"channel_hash": "ab",
"channel_secret": channel_secret,
}
# 16-byte plaintext (one AES block):
# bytes 0-3 = sender_timestamp (little-endian)
# byte 4 = (txt_type << 2) | attempt
# bytes 5-15 = message + null padding
# Pick txt_type=5, attempt=1 → byte 4 = (5 << 2) | 1 = 0x15.
# Pre-fix uncrypted[4:4] is empty so txt_type would be 0;
# post-fix uncrypted[4:5] yields 0x15 >> 2 = 5.
plaintext = b"\x00\x00\x00\x00\x15hello\x00\x00\x00\x00\x00\x00"
assert len(plaintext) == 16
encrypted = AES.new(channel_secret, AES.MODE_ECB).encrypt(plaintext)
# cipher_mac = first 2 bytes of HMAC-SHA256(channel_secret, encrypted)
h = HMAC.new(channel_secret, digestmod=SHA256)
h.update(encrypted)
cipher_mac = h.digest()[:2]
# pkt_payload layout: 1-byte chan_hash + 2-byte cipher_mac + ciphertext
pkt_payload = bytes([channel_hash_byte]) + cipher_mac + encrypted
# parsePacketPayload expects the full payload buffer:
# header byte (route_type=1 DIRECT, payload_type=5 channel, ver=0)
# path_byte (path_len=0, path_hash_size=1) → 0x00
# pkt_payload
header = 0x15 # route_type=1, payload_type=5, payload_ver=0
path_byte = 0x00
payload = bytes([header, path_byte]) + pkt_payload
log_data = await parser.parsePacketPayload(payload, log_data={})
assert log_data["payload_type"] == 0x05
assert "txt_type" in log_data, (
f"txt_type missing from log_data — channel decrypt path was not reached. "
f"log_data keys: {list(log_data.keys())}"
)
assert log_data["txt_type"] == 5, (
f"Expected txt_type=5, got {log_data['txt_type']}"
)
assert log_data["attempt"] == 1, (
f"Expected attempt=1, got {log_data['attempt']}"
)