From df388e494e46bfc11c8a94b2a37d3102a674ba8d Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 04:14:01 -0700 Subject: [PATCH 1/6] =?UTF-8?q?G6:=20N02+R04=20=E2=80=94=20add=20CONTACTS?= =?UTF-8?q?=5FFULL=20and=20GET=5FSTATS=20enum=20entries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: PacketType was missing CONTACTS_FULL (0x90), emitted by MyMesh::onContactsFull(). CommandType was missing GET_STATS (56), used by get_stats_core/radio/packets but referenced only as literal b"\\x38". Adding both enum entries prepares for handler and wrapper implementations in subsequent commits. Refs: Forensics report findings N02, R04 --- src/meshcore/packets.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index b5cef23..ad27fce 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -71,6 +71,7 @@ class CommandType(Enum): SET_AUTOADD_CONFIG = 58 GET_AUTOADD_CONFIG = 59 GET_ALLOWED_REPEAT_FREQ = 60 + GET_STATS = 56 # R04: CMD_GET_STATS — used by get_stats_core/radio/packets SET_PATH_HASH_MODE = 61 # Packet prefixes for the protocol @@ -120,3 +121,6 @@ class PacketType(Enum): PATH_DISCOVERY_RESPONSE = 0x8D CONTROL_DATA = 0x8E CONTACT_DELETED = 0x8F + CONTACTS_FULL = 0x90 # N02: MyMesh::onContactsFull() — 1-byte push, no payload + # Note: 0x90 == ControlType.NODE_DISCOVER_RESP in a different namespace. + # Not a literal conflict (PacketType vs ControlType), but a maintenance hazard. From 8400995600ad4858c964a7585d31750d7585ae2c Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 04:14:11 -0700 Subject: [PATCH 2/6] =?UTF-8?q?G6:=20N01+N02+N03=20=E2=80=94=20add=20reade?= =?UTF-8?q?r=20branches=20for=20CONTACT=5FDELETED,=20CONTACTS=5FFULL,=20TU?= =?UTF-8?q?NING=5FPARAMS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: Three firmware push/response codes had no SDK handler — frames fell through to the "Unhandled" debug log. CONTACT_DELETED (0x8F) carries a 32-byte pubkey from onContactOverwrite(); CONTACTS_FULL (0x90) is a 1-byte push from onContactsFull(); TUNING_PARAMS (23) is the 9-byte response to CMD_GET_TUNING_PARAMS carrying rx_delay and airtime_factor. All three now dispatch typed events. Short frames are guarded. Refs: Forensics report findings N01, N02, N03 --- src/meshcore/events.py | 3 +++ src/meshcore/reader.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/src/meshcore/events.py b/src/meshcore/events.py index f8a7521..85821ef 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -49,6 +49,9 @@ class EventType(Enum): PATH_RESPONSE = "path_response" PRIVATE_KEY = "private_key" DISABLED = "disabled" + CONTACT_DELETED = "contact_deleted" + CONTACTS_FULL = "contacts_full" + TUNING_PARAMS = "tuning_params" CONTROL_DATA = "control_data" DISCOVER_RESPONSE = "discover_response" NEIGHBOURS_RESPONSE = "neighbours_response" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 802004a..bfadb68 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -916,6 +916,37 @@ class MessageReader: Event(EventType.DISCOVER_RESPONSE, ndr, attributes) ) + elif packet_type_value == PacketType.CONTACT_DELETED.value: + # N01: PUSH_CODE_CONTACT_DELETED (0x8F) — 1-byte code + 32-byte pubkey + # Emitted by MyMesh::onContactOverwrite() (MyMesh.cpp:325-334) + if len(data) < 33: + logger.debug("CONTACT_DELETED frame too short (%d bytes, need 33)", len(data)) + return + pubkey = data[1:33].hex() + await self.dispatcher.dispatch( + Event(EventType.CONTACT_DELETED, {"pubkey": pubkey}, {"pubkey": pubkey}) + ) + + elif packet_type_value == PacketType.CONTACTS_FULL.value: + # N02: PUSH_CODE_CONTACTS_FULL (0x90) — 1-byte push, no payload + # Emitted by MyMesh::onContactsFull() (MyMesh.cpp:336) + await self.dispatcher.dispatch(Event(EventType.CONTACTS_FULL, {})) + + elif packet_type_value == PacketType.TUNING_PARAMS.value: + # N03: RESP_CODE_TUNING_PARAMS (23) — response to CMD_GET_TUNING_PARAMS (43) + # Format: 1-byte code + 4-byte rx_delay (LE) + 4-byte airtime_factor (LE) = 9 bytes + # Emitted by MyMesh.cpp:1307-1313 + if len(data) < 9: + logger.debug("TUNING_PARAMS frame too short (%d bytes, need 9)", len(data)) + await self.dispatcher.dispatch( + Event(EventType.ERROR, {"reason": "invalid_frame_length"}) + ) + return + rx_delay = int.from_bytes(data[1:5], byteorder="little") + airtime_factor = int.from_bytes(data[5:9], byteorder="little") + res = {"rx_delay": rx_delay, "airtime_factor": airtime_factor} + await self.dispatcher.dispatch(Event(EventType.TUNING_PARAMS, res)) + else: logger.debug(f"Unhandled data received {data}") logger.debug(f"Unhandled packet type: {packet_type_value}") From 1f319159b617103124c4a837a59d60380ccac277 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 04:14:20 -0700 Subject: [PATCH 3/6] =?UTF-8?q?G6:=20N05=20=E2=80=94=20pad=20send=5Ftrace(?= =?UTF-8?q?)=20to=2011=20bytes=20when=20path=20is=20empty?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: Firmware requires strict len > 10 (MyMesh.cpp:1620). When path is empty, send_trace() builds exactly 10 bytes (cmd+tag+auth+flags), which is silently rejected. Appending one zero byte when the packet is <= 10 bytes satisfies the firmware gate without changing the semantic content. Refs: Forensics report finding N05 --- src/meshcore/commands/messaging.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/meshcore/commands/messaging.py b/src/meshcore/commands/messaging.py index b266ae0..5d83690 100644 --- a/src/meshcore/commands/messaging.py +++ b/src/meshcore/commands/messaging.py @@ -291,12 +291,34 @@ class MessagingCommands(CommandHandlerBase): cmd_data.append(flags) cmd_data.extend(path_bytes) + # N05: Firmware requires strict len > 10 (MyMesh.cpp:1620). + # When path is empty, cmd(1)+tag(4)+auth(4)+flags(1) = 10 bytes exactly, + # which is silently rejected. Pad with one zero byte to reach 11. + if len(cmd_data) <= 10: + cmd_data.append(0x00) + logger.debug( f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path_bytes.hex()}" ) return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR]) + async def send_raw_data(self, payload: bytes) -> Event: + """N09: Send raw data via CMD_SEND_RAW_DATA (25). + + Sends an arbitrary payload through the mesh network. + + Args: + payload: Raw bytes to send. + + Returns: + Event with MSG_SENT or ERROR. + """ + if not isinstance(payload, (bytes, bytearray)): + raise TypeError("payload must be bytes-like") + data = b"\x19" + bytes(payload) + return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) + async def set_flood_scope(self, scope): if scope is None: logger.debug(f"Resetting scope") From 6f1b48cc50bf03163177246ce73ddc1755065397 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 04:14:31 -0700 Subject: [PATCH 4/6] =?UTF-8?q?G6:=20N09=20=E2=80=94=20add=20command=20wra?= =?UTF-8?q?ppers=20for=20orphaned=20CommandType=20entries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: Five CommandType enum entries had no user-facing SDK method: SEND_RAW_DATA (25), HAS_CONNECTION (28), GET_CONTACT_BY_KEY (30), GET_TUNING_PARAMS (43), FACTORY_RESET (51). Added send_raw_data() in MessagingCommands, has_connection()/get_tuning()/request_factory_reset()/ confirm_factory_reset() in DeviceCommands, get_contact_by_key() in ContactCommands. FACTORY_RESET uses a two-step token pattern as a Python-side safety measure against accidental invocation. Refs: Forensics report finding N09 (also N03 for get_tuning) --- src/meshcore/commands/contact.py | 18 +++++++ src/meshcore/commands/device.py | 82 +++++++++++++++++++++++++++++--- 2 files changed, 94 insertions(+), 6 deletions(-) diff --git a/src/meshcore/commands/contact.py b/src/meshcore/commands/contact.py index ac723ea..8a89101 100644 --- a/src/meshcore/commands/contact.py +++ b/src/meshcore/commands/contact.py @@ -185,6 +185,24 @@ class ContactCommands(CommandHandlerBase): data = b"\x3B" return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR]) + async def get_contact_by_key(self, pubkey: bytes) -> Event: + """N09: Retrieve a single contact by its public key (CMD 30). + + Args: + pubkey: 32-byte public key of the contact. + + Returns: + Event with the contact data (same format as CONTACT/NEXT_CONTACT), + or ERROR if not found. + """ + if not isinstance(pubkey, (bytes, bytearray)): + raise TypeError("pubkey must be bytes-like") + # Truncate or pad to 32 bytes + key_bytes = bytes(pubkey[:32]) + logger.debug(f"Getting contact by key: {key_bytes.hex()}") + data = b"\x1e" + key_bytes + return await self.send(data, [EventType.NEXT_CONTACT, EventType.ERROR]) + async def get_advert_path(self, key: DestinationType) -> Event: key_bytes = _validate_destination(key, prefix_length=32) logger.debug(f"getting advert path for: {key} {key_bytes.hex()}") diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index af986db..b6bba51 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -4,6 +4,7 @@ from hashlib import sha256 from typing import Optional from ..events import Event, EventType +from ..packets import CommandType from .base import CommandHandlerBase, DestinationType, _validate_destination logger = logging.getLogger("meshcore") @@ -273,20 +274,89 @@ class DeviceCommands(CommandHandlerBase): return await self.sign_finish(timeout=timeout, data_size=len(data)) + async def has_connection(self) -> Event: + """N09: Check if the device has an active connection (CMD 28). + + Returns: + Event with a 1-byte response indicating connection status, + or ERROR. + """ + logger.debug("Checking device connection status") + return await self.send(b"\x1c", [EventType.OK, EventType.ERROR]) + + async def get_tuning(self) -> Event: + """N03/N09: Request current tuning parameters (CMD_GET_TUNING_PARAMS = 43). + + Firmware responds with RESP_CODE_TUNING_PARAMS (23): 9 bytes containing + rx_delay (4 bytes LE) and airtime_factor (4 bytes LE). + + Returns: + Event of type TUNING_PARAMS with rx_delay and airtime_factor, + or ERROR. + """ + logger.debug("Getting tuning parameters") + return await self.send(b"\x2b", [EventType.TUNING_PARAMS, EventType.ERROR]) + + async def request_factory_reset(self) -> str: + """N09: Request a factory reset token (step 1 of 2). + + This method returns a confirmation token string. Pass it to + ``confirm_factory_reset(token)`` to actually execute the reset. + The two-step pattern is a Python-side safety measure; the firmware + itself has no token verification. + + Returns: + A confirmation token string to pass to confirm_factory_reset(). + """ + import secrets + token = secrets.token_hex(8) + logger.warning( + "Factory reset requested. Call confirm_factory_reset('%s') to proceed. " + "This will ERASE ALL DATA on the device.", token + ) + # Store the token on the instance for validation + self._factory_reset_token = token + return token + + async def confirm_factory_reset(self, token: str) -> Event: + """N09: Execute factory reset after token confirmation (step 2 of 2). + + Args: + token: The token returned by request_factory_reset(). + + Returns: + Event with OK or ERROR. + + Raises: + ValueError: If the token does not match. + """ + expected = getattr(self, "_factory_reset_token", None) + if expected is None or token != expected: + raise ValueError( + "Invalid or expired factory reset token. " + "Call request_factory_reset() first." + ) + self._factory_reset_token = None # Consume the token + logger.warning("Executing factory reset — all device data will be erased") + return await self.send(b"\x33", [EventType.OK, EventType.ERROR]) + async def get_stats_core(self) -> Event: logger.debug("Getting core statistics") - # CMD_GET_STATS (56) + STATS_TYPE_CORE (0) - return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x00]) # GET_STATS + STATS_TYPE_CORE + return await self.send(cmd, [EventType.STATS_CORE, EventType.ERROR]) async def get_stats_radio(self) -> Event: logger.debug("Getting radio statistics") - # CMD_GET_STATS (56) + STATS_TYPE_RADIO (1) - return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x01]) # GET_STATS + STATS_TYPE_RADIO + return await self.send(cmd, [EventType.STATS_RADIO, EventType.ERROR]) async def get_stats_packets(self) -> Event: logger.debug("Getting packet statistics") - # CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2) - return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x02]) # GET_STATS + STATS_TYPE_PACKETS + return await self.send(cmd, [EventType.STATS_PACKETS, EventType.ERROR]) async def get_allowed_repeat_freq(self) -> Event: logger.debug("Getting allowed repeat freqs") From baa5494758c094fc8c4c889c37ef88caccb77301 Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 04:14:48 -0700 Subject: [PATCH 5/6] G6: add verification tests for N01, N02, N03, N05, N09, R04 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Why: 16 tests covering all G6 findings — reader dispatch for CONTACT_DELETED/CONTACTS_FULL/TUNING_PARAMS (including short-frame guards), send_trace() padding behavior, all 5 new command wrappers (send_raw_data, has_connection, get_tuning, get_contact_by_key, factory_reset two-step), and GET_STATS enum existence + usage. Refs: Forensics report findings N01, N02, N03, N05, N09, R04 --- tests/unit/test_g6_protocol_surface_gaps.py | 364 ++++++++++++++++++++ 1 file changed, 364 insertions(+) create mode 100644 tests/unit/test_g6_protocol_surface_gaps.py diff --git a/tests/unit/test_g6_protocol_surface_gaps.py b/tests/unit/test_g6_protocol_surface_gaps.py new file mode 100644 index 0000000..fffac40 --- /dev/null +++ b/tests/unit/test_g6_protocol_surface_gaps.py @@ -0,0 +1,364 @@ +"""Verification tests for G6 — Protocol surface gaps (N01, N02, N03, N05, N09, R04). + +Each test constructs a mock firmware frame and verifies the SDK dispatches +the correct EventType with the expected payload fields. +""" + +import asyncio +import struct +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from meshcore.events import Event, EventType, EventDispatcher +from meshcore.reader import MessageReader +from meshcore.packets import PacketType, CommandType + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_reader(): + """Create a MessageReader with a mock dispatcher that records dispatched events.""" + dispatcher = MagicMock(spec=EventDispatcher) + dispatched = [] + + async def _capture(event): + dispatched.append(event) + + dispatcher.dispatch = AsyncMock(side_effect=_capture) + reader = MessageReader(dispatcher) + return reader, dispatched + + +# --------------------------------------------------------------------------- +# N01 — CONTACT_DELETED handler +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_contact_deleted_dispatches_event(): + """N01: A 33-byte CONTACT_DELETED frame dispatches EventType.CONTACT_DELETED.""" + reader, dispatched = _make_reader() + pubkey = bytes(range(32)) + frame = bytes([PacketType.CONTACT_DELETED.value]) + pubkey + assert len(frame) == 33 + + await reader.handle_rx(bytearray(frame)) + + assert len(dispatched) == 1 + evt = dispatched[0] + assert evt.type == EventType.CONTACT_DELETED + assert evt.payload["pubkey"] == pubkey.hex() + assert evt.attributes["pubkey"] == pubkey.hex() + + +@pytest.mark.asyncio +async def test_contact_deleted_short_frame_ignored(): + """N01: A CONTACT_DELETED frame shorter than 33 bytes is silently dropped.""" + reader, dispatched = _make_reader() + # Only 10 bytes — too short + frame = bytes([PacketType.CONTACT_DELETED.value]) + b"\x00" * 9 + + await reader.handle_rx(bytearray(frame)) + + assert len(dispatched) == 0 + + +# --------------------------------------------------------------------------- +# N02 — CONTACTS_FULL handler + enum entry +# --------------------------------------------------------------------------- + +def test_contacts_full_enum_exists(): + """N02: PacketType.CONTACTS_FULL == 0x90.""" + assert PacketType.CONTACTS_FULL.value == 0x90 + + +@pytest.mark.asyncio +async def test_contacts_full_dispatches_event(): + """N02: A 1-byte CONTACTS_FULL push dispatches EventType.CONTACTS_FULL.""" + reader, dispatched = _make_reader() + frame = bytes([PacketType.CONTACTS_FULL.value]) + + await reader.handle_rx(bytearray(frame)) + + assert len(dispatched) == 1 + evt = dispatched[0] + assert evt.type == EventType.CONTACTS_FULL + assert evt.payload == {} + + +# --------------------------------------------------------------------------- +# N03 — TUNING_PARAMS handler +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tuning_params_dispatches_event(): + """N03: A 9-byte TUNING_PARAMS frame dispatches with rx_delay and airtime_factor.""" + reader, dispatched = _make_reader() + rx_delay = 500 + airtime_factor = 200 + frame = ( + bytes([PacketType.TUNING_PARAMS.value]) + + rx_delay.to_bytes(4, "little") + + airtime_factor.to_bytes(4, "little") + ) + assert len(frame) == 9 + + await reader.handle_rx(bytearray(frame)) + + assert len(dispatched) == 1 + evt = dispatched[0] + assert evt.type == EventType.TUNING_PARAMS + assert evt.payload["rx_delay"] == 500 + assert evt.payload["airtime_factor"] == 200 + + +@pytest.mark.asyncio +async def test_tuning_params_short_frame_dispatches_error(): + """N03: A TUNING_PARAMS frame shorter than 9 bytes dispatches ERROR.""" + reader, dispatched = _make_reader() + # Only 5 bytes — too short + frame = bytes([PacketType.TUNING_PARAMS.value]) + b"\x01\x00\x00\x00" + + await reader.handle_rx(bytearray(frame)) + + assert len(dispatched) == 1 + evt = dispatched[0] + assert evt.type == EventType.ERROR + assert evt.payload["reason"] == "invalid_frame_length" + + +# --------------------------------------------------------------------------- +# N05 — send_trace() one-byte pad +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_send_trace_empty_path_pads_to_11_bytes(): + """N05: send_trace() with no path produces an 11-byte packet (not 10).""" + from meshcore.commands.messaging import MessagingCommands + + cmd = MessagingCommands.__new__(MessagingCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000}) + + cmd.send = mock_send + + await cmd.send_trace(auth_code=0, tag=1, flags=0, path=None) + + assert captured_data is not None + # cmd(1) + tag(4) + auth(4) + flags(1) + pad(1) = 11 + assert len(captured_data) == 11 + assert captured_data[-1] == 0x00 # The pad byte + + +@pytest.mark.asyncio +async def test_send_trace_with_path_no_padding(): + """N05: send_trace() with a non-empty path does NOT add padding.""" + from meshcore.commands.messaging import MessagingCommands + + cmd = MessagingCommands.__new__(MessagingCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000}) + + cmd.send = mock_send + + # 2-byte path hash (flags=1 means hash_len=2) + await cmd.send_trace(auth_code=0, tag=1, flags=1, path=b"\xAA\xBB") + + assert captured_data is not None + # cmd(1) + tag(4) + auth(4) + flags(1) + path(2) = 12 — no pad needed + assert len(captured_data) == 12 + + +# --------------------------------------------------------------------------- +# N09 — Command wrapper: send_raw_data +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_send_raw_data_wrapper(): + """N09: send_raw_data sends CMD 0x19 + payload.""" + from meshcore.commands.messaging import MessagingCommands + + cmd = MessagingCommands.__new__(MessagingCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000}) + + cmd.send = mock_send + + await cmd.send_raw_data(b"\xDE\xAD") + + assert captured_data is not None + assert captured_data[0] == 0x19 # CMD_SEND_RAW_DATA + assert captured_data[1:] == b"\xDE\xAD" + + +# --------------------------------------------------------------------------- +# N09 — Command wrapper: has_connection +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_has_connection_wrapper(): + """N09: has_connection sends CMD 0x1c.""" + from meshcore.commands.device import DeviceCommands + + cmd = DeviceCommands.__new__(DeviceCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.OK, {"value": 1}) + + cmd.send = mock_send + + await cmd.has_connection() + + assert captured_data is not None + assert captured_data == b"\x1c" + + +# --------------------------------------------------------------------------- +# N09 — Command wrapper: get_tuning +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_tuning_wrapper(): + """N09/N03: get_tuning sends CMD 0x2b (GET_TUNING_PARAMS = 43).""" + from meshcore.commands.device import DeviceCommands + + cmd = DeviceCommands.__new__(DeviceCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.TUNING_PARAMS, {"rx_delay": 500, "airtime_factor": 200}) + + cmd.send = mock_send + + result = await cmd.get_tuning() + + assert captured_data == b"\x2b" + assert result.type == EventType.TUNING_PARAMS + + +# --------------------------------------------------------------------------- +# N09 — Command wrapper: get_contact_by_key +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_contact_by_key_wrapper(): + """N09: get_contact_by_key sends CMD 0x1e + 32-byte pubkey.""" + from meshcore.commands.contact import ContactCommands + + cmd = ContactCommands.__new__(ContactCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.NEXT_CONTACT, {"public_key": "ab" * 32}) + + cmd.send = mock_send + + pubkey = bytes(range(32)) + await cmd.get_contact_by_key(pubkey) + + assert captured_data is not None + assert captured_data[0] == 0x1E # CMD_GET_CONTACT_BY_KEY + assert captured_data[1:] == pubkey + + +# --------------------------------------------------------------------------- +# N09 — Command wrapper: factory_reset (two-step) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_factory_reset_two_step(): + """N09: factory_reset requires a token from request_factory_reset.""" + from meshcore.commands.device import DeviceCommands + + cmd = DeviceCommands.__new__(DeviceCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.OK, {}) + + cmd.send = mock_send + + # Step 1: request token + token = await cmd.request_factory_reset() + assert isinstance(token, str) + assert len(token) == 16 # hex-encoded 8 bytes + + # Step 2: confirm with wrong token fails + with pytest.raises(ValueError, match="Invalid or expired"): + await cmd.confirm_factory_reset("wrong_token") + + # Step 2: confirm with correct token succeeds + await cmd.confirm_factory_reset(token) + assert captured_data == b"\x33" # CMD_FACTORY_RESET + + +@pytest.mark.asyncio +async def test_factory_reset_without_request_fails(): + """N09: confirm_factory_reset without request_factory_reset raises ValueError.""" + from meshcore.commands.device import DeviceCommands + + cmd = DeviceCommands.__new__(DeviceCommands) + + with pytest.raises(ValueError, match="Invalid or expired"): + await cmd.confirm_factory_reset("any_token") + + +# --------------------------------------------------------------------------- +# R04 — GET_STATS enum entry +# --------------------------------------------------------------------------- + +def test_get_stats_enum_exists(): + """R04: CommandType.GET_STATS == 56.""" + assert CommandType.GET_STATS.value == 56 + + +@pytest.mark.asyncio +async def test_get_stats_core_uses_enum(): + """R04: get_stats_core sends CommandType.GET_STATS.value (0x38) + 0x00.""" + from meshcore.commands.device import DeviceCommands + + cmd = DeviceCommands.__new__(DeviceCommands) + + captured_data = None + + async def mock_send(data, expected_events, timeout=None): + nonlocal captured_data + captured_data = bytes(data) + return Event(EventType.STATS_CORE, {}) + + cmd.send = mock_send + + await cmd.get_stats_core() + + assert captured_data is not None + assert captured_data[0] == CommandType.GET_STATS.value # 0x38 = 56 + assert captured_data[1] == 0x00 From aa7f584ca0de819110665402cfdd36cba7a6603b Mon Sep 17 00:00:00 2001 From: Matthew Wolter Date: Sun, 12 Apr 2026 07:58:07 -0700 Subject: [PATCH 6/6] Remove internal references from protocol surface gaps tests Rename test_g6_protocol_surface_gaps.py to test_protocol_surface_gaps.py. Strip G6 from module docstring, and finding IDs (N01, N02, N03, N05, N09, R04) from docstrings and section comments. --- ..._gaps.py => test_protocol_surface_gaps.py} | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) rename tests/unit/{test_g6_protocol_surface_gaps.py => test_protocol_surface_gaps.py} (86%) diff --git a/tests/unit/test_g6_protocol_surface_gaps.py b/tests/unit/test_protocol_surface_gaps.py similarity index 86% rename from tests/unit/test_g6_protocol_surface_gaps.py rename to tests/unit/test_protocol_surface_gaps.py index fffac40..a490a81 100644 --- a/tests/unit/test_g6_protocol_surface_gaps.py +++ b/tests/unit/test_protocol_surface_gaps.py @@ -1,4 +1,4 @@ -"""Verification tests for G6 — Protocol surface gaps (N01, N02, N03, N05, N09, R04). +"""Verification tests for protocol surface gaps. Each test constructs a mock firmware frame and verifies the SDK dispatches the correct EventType with the expected payload fields. @@ -32,12 +32,12 @@ def _make_reader(): # --------------------------------------------------------------------------- -# N01 — CONTACT_DELETED handler +# CONTACT_DELETED handler # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_contact_deleted_dispatches_event(): - """N01: A 33-byte CONTACT_DELETED frame dispatches EventType.CONTACT_DELETED.""" + """A 33-byte CONTACT_DELETED frame dispatches EventType.CONTACT_DELETED.""" reader, dispatched = _make_reader() pubkey = bytes(range(32)) frame = bytes([PacketType.CONTACT_DELETED.value]) + pubkey @@ -54,7 +54,7 @@ async def test_contact_deleted_dispatches_event(): @pytest.mark.asyncio async def test_contact_deleted_short_frame_ignored(): - """N01: A CONTACT_DELETED frame shorter than 33 bytes is silently dropped.""" + """A CONTACT_DELETED frame shorter than 33 bytes is silently dropped.""" reader, dispatched = _make_reader() # Only 10 bytes — too short frame = bytes([PacketType.CONTACT_DELETED.value]) + b"\x00" * 9 @@ -65,17 +65,17 @@ async def test_contact_deleted_short_frame_ignored(): # --------------------------------------------------------------------------- -# N02 — CONTACTS_FULL handler + enum entry +# CONTACTS_FULL handler + enum entry # --------------------------------------------------------------------------- def test_contacts_full_enum_exists(): - """N02: PacketType.CONTACTS_FULL == 0x90.""" + """PacketType.CONTACTS_FULL == 0x90.""" assert PacketType.CONTACTS_FULL.value == 0x90 @pytest.mark.asyncio async def test_contacts_full_dispatches_event(): - """N02: A 1-byte CONTACTS_FULL push dispatches EventType.CONTACTS_FULL.""" + """A 1-byte CONTACTS_FULL push dispatches EventType.CONTACTS_FULL.""" reader, dispatched = _make_reader() frame = bytes([PacketType.CONTACTS_FULL.value]) @@ -88,12 +88,12 @@ async def test_contacts_full_dispatches_event(): # --------------------------------------------------------------------------- -# N03 — TUNING_PARAMS handler +# TUNING_PARAMS handler # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_tuning_params_dispatches_event(): - """N03: A 9-byte TUNING_PARAMS frame dispatches with rx_delay and airtime_factor.""" + """A 9-byte TUNING_PARAMS frame dispatches with rx_delay and airtime_factor.""" reader, dispatched = _make_reader() rx_delay = 500 airtime_factor = 200 @@ -115,7 +115,7 @@ async def test_tuning_params_dispatches_event(): @pytest.mark.asyncio async def test_tuning_params_short_frame_dispatches_error(): - """N03: A TUNING_PARAMS frame shorter than 9 bytes dispatches ERROR.""" + """A TUNING_PARAMS frame shorter than 9 bytes dispatches ERROR.""" reader, dispatched = _make_reader() # Only 5 bytes — too short frame = bytes([PacketType.TUNING_PARAMS.value]) + b"\x01\x00\x00\x00" @@ -129,12 +129,12 @@ async def test_tuning_params_short_frame_dispatches_error(): # --------------------------------------------------------------------------- -# N05 — send_trace() one-byte pad +# send_trace() one-byte pad # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_trace_empty_path_pads_to_11_bytes(): - """N05: send_trace() with no path produces an 11-byte packet (not 10).""" + """send_trace() with no path produces an 11-byte packet (not 10).""" from meshcore.commands.messaging import MessagingCommands cmd = MessagingCommands.__new__(MessagingCommands) @@ -158,7 +158,7 @@ async def test_send_trace_empty_path_pads_to_11_bytes(): @pytest.mark.asyncio async def test_send_trace_with_path_no_padding(): - """N05: send_trace() with a non-empty path does NOT add padding.""" + """send_trace() with a non-empty path does NOT add padding.""" from meshcore.commands.messaging import MessagingCommands cmd = MessagingCommands.__new__(MessagingCommands) @@ -181,12 +181,12 @@ async def test_send_trace_with_path_no_padding(): # --------------------------------------------------------------------------- -# N09 — Command wrapper: send_raw_data +# Command wrapper: send_raw_data # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_send_raw_data_wrapper(): - """N09: send_raw_data sends CMD 0x19 + payload.""" + """send_raw_data sends CMD 0x19 + payload.""" from meshcore.commands.messaging import MessagingCommands cmd = MessagingCommands.__new__(MessagingCommands) @@ -208,12 +208,12 @@ async def test_send_raw_data_wrapper(): # --------------------------------------------------------------------------- -# N09 — Command wrapper: has_connection +# Command wrapper: has_connection # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_has_connection_wrapper(): - """N09: has_connection sends CMD 0x1c.""" + """has_connection sends CMD 0x1c.""" from meshcore.commands.device import DeviceCommands cmd = DeviceCommands.__new__(DeviceCommands) @@ -234,12 +234,12 @@ async def test_has_connection_wrapper(): # --------------------------------------------------------------------------- -# N09 — Command wrapper: get_tuning +# Command wrapper: get_tuning # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_tuning_wrapper(): - """N09/N03: get_tuning sends CMD 0x2b (GET_TUNING_PARAMS = 43).""" + """get_tuning sends CMD 0x2b (GET_TUNING_PARAMS = 43).""" from meshcore.commands.device import DeviceCommands cmd = DeviceCommands.__new__(DeviceCommands) @@ -260,12 +260,12 @@ async def test_get_tuning_wrapper(): # --------------------------------------------------------------------------- -# N09 — Command wrapper: get_contact_by_key +# Command wrapper: get_contact_by_key # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_contact_by_key_wrapper(): - """N09: get_contact_by_key sends CMD 0x1e + 32-byte pubkey.""" + """get_contact_by_key sends CMD 0x1e + 32-byte pubkey.""" from meshcore.commands.contact import ContactCommands cmd = ContactCommands.__new__(ContactCommands) @@ -288,12 +288,12 @@ async def test_get_contact_by_key_wrapper(): # --------------------------------------------------------------------------- -# N09 — Command wrapper: factory_reset (two-step) +# Command wrapper: factory_reset (two-step) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_factory_reset_two_step(): - """N09: factory_reset requires a token from request_factory_reset.""" + """factory_reset requires a token from request_factory_reset.""" from meshcore.commands.device import DeviceCommands cmd = DeviceCommands.__new__(DeviceCommands) @@ -323,7 +323,7 @@ async def test_factory_reset_two_step(): @pytest.mark.asyncio async def test_factory_reset_without_request_fails(): - """N09: confirm_factory_reset without request_factory_reset raises ValueError.""" + """confirm_factory_reset without request_factory_reset raises ValueError.""" from meshcore.commands.device import DeviceCommands cmd = DeviceCommands.__new__(DeviceCommands) @@ -333,17 +333,17 @@ async def test_factory_reset_without_request_fails(): # --------------------------------------------------------------------------- -# R04 — GET_STATS enum entry +# GET_STATS enum entry # --------------------------------------------------------------------------- def test_get_stats_enum_exists(): - """R04: CommandType.GET_STATS == 56.""" + """CommandType.GET_STATS == 56.""" assert CommandType.GET_STATS.value == 56 @pytest.mark.asyncio async def test_get_stats_core_uses_enum(): - """R04: get_stats_core sends CommandType.GET_STATS.value (0x38) + 0x00.""" + """get_stats_core sends CommandType.GET_STATS.value (0x38) + 0x00.""" from meshcore.commands.device import DeviceCommands cmd = DeviceCommands.__new__(DeviceCommands)