This commit is contained in:
mwolter805 2026-04-18 05:19:11 -07:00 committed by GitHub
commit 1701c09d89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 518 additions and 6 deletions

View file

@ -185,6 +185,24 @@ class ContactCommands(CommandHandlerBase):
data = b"\x3B"
return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR])
async def get_contact_by_key(self, pubkey: bytes) -> Event:
"""N09: Retrieve a single contact by its public key (CMD 30).
Args:
pubkey: 32-byte public key of the contact.
Returns:
Event with the contact data (same format as CONTACT/NEXT_CONTACT),
or ERROR if not found.
"""
if not isinstance(pubkey, (bytes, bytearray)):
raise TypeError("pubkey must be bytes-like")
# Truncate or pad to 32 bytes
key_bytes = bytes(pubkey[:32])
logger.debug(f"Getting contact by key: {key_bytes.hex()}")
data = b"\x1e" + key_bytes
return await self.send(data, [EventType.NEXT_CONTACT, EventType.ERROR])
async def get_advert_path(self, key: DestinationType) -> Event:
key_bytes = _validate_destination(key, prefix_length=32)
logger.debug(f"getting advert path for: {key} {key_bytes.hex()}")

View file

@ -4,6 +4,7 @@ from hashlib import sha256
from typing import Optional
from ..events import Event, EventType
from ..packets import CommandType
from .base import CommandHandlerBase, DestinationType, _validate_destination
logger = logging.getLogger("meshcore")
@ -273,20 +274,89 @@ class DeviceCommands(CommandHandlerBase):
return await self.sign_finish(timeout=timeout, data_size=len(data))
async def has_connection(self) -> Event:
"""N09: Check if the device has an active connection (CMD 28).
Returns:
Event with a 1-byte response indicating connection status,
or ERROR.
"""
logger.debug("Checking device connection status")
return await self.send(b"\x1c", [EventType.OK, EventType.ERROR])
async def get_tuning(self) -> Event:
"""N03/N09: Request current tuning parameters (CMD_GET_TUNING_PARAMS = 43).
Firmware responds with RESP_CODE_TUNING_PARAMS (23): 9 bytes containing
rx_delay (4 bytes LE) and airtime_factor (4 bytes LE).
Returns:
Event of type TUNING_PARAMS with rx_delay and airtime_factor,
or ERROR.
"""
logger.debug("Getting tuning parameters")
return await self.send(b"\x2b", [EventType.TUNING_PARAMS, EventType.ERROR])
async def request_factory_reset(self) -> str:
"""N09: Request a factory reset token (step 1 of 2).
This method returns a confirmation token string. Pass it to
``confirm_factory_reset(token)`` to actually execute the reset.
The two-step pattern is a Python-side safety measure; the firmware
itself has no token verification.
Returns:
A confirmation token string to pass to confirm_factory_reset().
"""
import secrets
token = secrets.token_hex(8)
logger.warning(
"Factory reset requested. Call confirm_factory_reset('%s') to proceed. "
"This will ERASE ALL DATA on the device.", token
)
# Store the token on the instance for validation
self._factory_reset_token = token
return token
async def confirm_factory_reset(self, token: str) -> Event:
"""N09: Execute factory reset after token confirmation (step 2 of 2).
Args:
token: The token returned by request_factory_reset().
Returns:
Event with OK or ERROR.
Raises:
ValueError: If the token does not match.
"""
expected = getattr(self, "_factory_reset_token", None)
if expected is None or token != expected:
raise ValueError(
"Invalid or expired factory reset token. "
"Call request_factory_reset() first."
)
self._factory_reset_token = None # Consume the token
logger.warning("Executing factory reset — all device data will be erased")
return await self.send(b"\x33", [EventType.OK, EventType.ERROR])
async def get_stats_core(self) -> Event:
logger.debug("Getting core statistics")
# CMD_GET_STATS (56) + STATS_TYPE_CORE (0)
return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR])
# R04: Use CommandType enum instead of literal bytes
cmd = bytes([CommandType.GET_STATS.value, 0x00]) # GET_STATS + STATS_TYPE_CORE
return await self.send(cmd, [EventType.STATS_CORE, EventType.ERROR])
async def get_stats_radio(self) -> Event:
logger.debug("Getting radio statistics")
# CMD_GET_STATS (56) + STATS_TYPE_RADIO (1)
return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR])
# R04: Use CommandType enum instead of literal bytes
cmd = bytes([CommandType.GET_STATS.value, 0x01]) # GET_STATS + STATS_TYPE_RADIO
return await self.send(cmd, [EventType.STATS_RADIO, EventType.ERROR])
async def get_stats_packets(self) -> Event:
logger.debug("Getting packet statistics")
# CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2)
return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR])
# R04: Use CommandType enum instead of literal bytes
cmd = bytes([CommandType.GET_STATS.value, 0x02]) # GET_STATS + STATS_TYPE_PACKETS
return await self.send(cmd, [EventType.STATS_PACKETS, EventType.ERROR])
async def get_allowed_repeat_freq(self) -> Event:
logger.debug("Getting allowed repeat freqs")

View file

@ -291,12 +291,34 @@ class MessagingCommands(CommandHandlerBase):
cmd_data.append(flags)
cmd_data.extend(path_bytes)
# N05: Firmware requires strict len > 10 (MyMesh.cpp:1620).
# When path is empty, cmd(1)+tag(4)+auth(4)+flags(1) = 10 bytes exactly,
# which is silently rejected. Pad with one zero byte to reach 11.
if len(cmd_data) <= 10:
cmd_data.append(0x00)
logger.debug(
f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path_bytes.hex()}"
)
return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR])
async def send_raw_data(self, payload: bytes) -> Event:
"""N09: Send raw data via CMD_SEND_RAW_DATA (25).
Sends an arbitrary payload through the mesh network.
Args:
payload: Raw bytes to send.
Returns:
Event with MSG_SENT or ERROR.
"""
if not isinstance(payload, (bytes, bytearray)):
raise TypeError("payload must be bytes-like")
data = b"\x19" + bytes(payload)
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def set_flood_scope(self, scope):
if scope is None:
logger.debug(f"Resetting scope")

View file

@ -49,6 +49,9 @@ class EventType(Enum):
PATH_RESPONSE = "path_response"
PRIVATE_KEY = "private_key"
DISABLED = "disabled"
CONTACT_DELETED = "contact_deleted"
CONTACTS_FULL = "contacts_full"
TUNING_PARAMS = "tuning_params"
CONTROL_DATA = "control_data"
DISCOVER_RESPONSE = "discover_response"
NEIGHBOURS_RESPONSE = "neighbours_response"

View file

@ -71,6 +71,7 @@ class CommandType(Enum):
SET_AUTOADD_CONFIG = 58
GET_AUTOADD_CONFIG = 59
GET_ALLOWED_REPEAT_FREQ = 60
GET_STATS = 56 # R04: CMD_GET_STATS — used by get_stats_core/radio/packets
SET_PATH_HASH_MODE = 61
# Packet prefixes for the protocol
@ -120,3 +121,6 @@ class PacketType(Enum):
PATH_DISCOVERY_RESPONSE = 0x8D
CONTROL_DATA = 0x8E
CONTACT_DELETED = 0x8F
CONTACTS_FULL = 0x90 # N02: MyMesh::onContactsFull() — 1-byte push, no payload
# Note: 0x90 == ControlType.NODE_DISCOVER_RESP in a different namespace.
# Not a literal conflict (PacketType vs ControlType), but a maintenance hazard.

View file

@ -916,6 +916,37 @@ class MessageReader:
Event(EventType.DISCOVER_RESPONSE, ndr, attributes)
)
elif packet_type_value == PacketType.CONTACT_DELETED.value:
# N01: PUSH_CODE_CONTACT_DELETED (0x8F) — 1-byte code + 32-byte pubkey
# Emitted by MyMesh::onContactOverwrite() (MyMesh.cpp:325-334)
if len(data) < 33:
logger.debug("CONTACT_DELETED frame too short (%d bytes, need 33)", len(data))
return
pubkey = data[1:33].hex()
await self.dispatcher.dispatch(
Event(EventType.CONTACT_DELETED, {"pubkey": pubkey}, {"pubkey": pubkey})
)
elif packet_type_value == PacketType.CONTACTS_FULL.value:
# N02: PUSH_CODE_CONTACTS_FULL (0x90) — 1-byte push, no payload
# Emitted by MyMesh::onContactsFull() (MyMesh.cpp:336)
await self.dispatcher.dispatch(Event(EventType.CONTACTS_FULL, {}))
elif packet_type_value == PacketType.TUNING_PARAMS.value:
# N03: RESP_CODE_TUNING_PARAMS (23) — response to CMD_GET_TUNING_PARAMS (43)
# Format: 1-byte code + 4-byte rx_delay (LE) + 4-byte airtime_factor (LE) = 9 bytes
# Emitted by MyMesh.cpp:1307-1313
if len(data) < 9:
logger.debug("TUNING_PARAMS frame too short (%d bytes, need 9)", len(data))
await self.dispatcher.dispatch(
Event(EventType.ERROR, {"reason": "invalid_frame_length"})
)
return
rx_delay = int.from_bytes(data[1:5], byteorder="little")
airtime_factor = int.from_bytes(data[5:9], byteorder="little")
res = {"rx_delay": rx_delay, "airtime_factor": airtime_factor}
await self.dispatcher.dispatch(Event(EventType.TUNING_PARAMS, res))
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")

View file

@ -0,0 +1,364 @@
"""Verification tests for protocol surface gaps.
Each test constructs a mock firmware frame and verifies the SDK dispatches
the correct EventType with the expected payload fields.
"""
import asyncio
import struct
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from meshcore.events import Event, EventType, EventDispatcher
from meshcore.reader import MessageReader
from meshcore.packets import PacketType, CommandType
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_reader():
"""Create a MessageReader with a mock dispatcher that records dispatched events."""
dispatcher = MagicMock(spec=EventDispatcher)
dispatched = []
async def _capture(event):
dispatched.append(event)
dispatcher.dispatch = AsyncMock(side_effect=_capture)
reader = MessageReader(dispatcher)
return reader, dispatched
# ---------------------------------------------------------------------------
# CONTACT_DELETED handler
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_contact_deleted_dispatches_event():
"""A 33-byte CONTACT_DELETED frame dispatches EventType.CONTACT_DELETED."""
reader, dispatched = _make_reader()
pubkey = bytes(range(32))
frame = bytes([PacketType.CONTACT_DELETED.value]) + pubkey
assert len(frame) == 33
await reader.handle_rx(bytearray(frame))
assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.CONTACT_DELETED
assert evt.payload["pubkey"] == pubkey.hex()
assert evt.attributes["pubkey"] == pubkey.hex()
@pytest.mark.asyncio
async def test_contact_deleted_short_frame_ignored():
"""A CONTACT_DELETED frame shorter than 33 bytes is silently dropped."""
reader, dispatched = _make_reader()
# Only 10 bytes — too short
frame = bytes([PacketType.CONTACT_DELETED.value]) + b"\x00" * 9
await reader.handle_rx(bytearray(frame))
assert len(dispatched) == 0
# ---------------------------------------------------------------------------
# CONTACTS_FULL handler + enum entry
# ---------------------------------------------------------------------------
def test_contacts_full_enum_exists():
"""PacketType.CONTACTS_FULL == 0x90."""
assert PacketType.CONTACTS_FULL.value == 0x90
@pytest.mark.asyncio
async def test_contacts_full_dispatches_event():
"""A 1-byte CONTACTS_FULL push dispatches EventType.CONTACTS_FULL."""
reader, dispatched = _make_reader()
frame = bytes([PacketType.CONTACTS_FULL.value])
await reader.handle_rx(bytearray(frame))
assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.CONTACTS_FULL
assert evt.payload == {}
# ---------------------------------------------------------------------------
# TUNING_PARAMS handler
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_tuning_params_dispatches_event():
"""A 9-byte TUNING_PARAMS frame dispatches with rx_delay and airtime_factor."""
reader, dispatched = _make_reader()
rx_delay = 500
airtime_factor = 200
frame = (
bytes([PacketType.TUNING_PARAMS.value])
+ rx_delay.to_bytes(4, "little")
+ airtime_factor.to_bytes(4, "little")
)
assert len(frame) == 9
await reader.handle_rx(bytearray(frame))
assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.TUNING_PARAMS
assert evt.payload["rx_delay"] == 500
assert evt.payload["airtime_factor"] == 200
@pytest.mark.asyncio
async def test_tuning_params_short_frame_dispatches_error():
"""A TUNING_PARAMS frame shorter than 9 bytes dispatches ERROR."""
reader, dispatched = _make_reader()
# Only 5 bytes — too short
frame = bytes([PacketType.TUNING_PARAMS.value]) + b"\x01\x00\x00\x00"
await reader.handle_rx(bytearray(frame))
assert len(dispatched) == 1
evt = dispatched[0]
assert evt.type == EventType.ERROR
assert evt.payload["reason"] == "invalid_frame_length"
# ---------------------------------------------------------------------------
# send_trace() one-byte pad
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_trace_empty_path_pads_to_11_bytes():
"""send_trace() with no path produces an 11-byte packet (not 10)."""
from meshcore.commands.messaging import MessagingCommands
cmd = MessagingCommands.__new__(MessagingCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
cmd.send = mock_send
await cmd.send_trace(auth_code=0, tag=1, flags=0, path=None)
assert captured_data is not None
# cmd(1) + tag(4) + auth(4) + flags(1) + pad(1) = 11
assert len(captured_data) == 11
assert captured_data[-1] == 0x00 # The pad byte
@pytest.mark.asyncio
async def test_send_trace_with_path_no_padding():
"""send_trace() with a non-empty path does NOT add padding."""
from meshcore.commands.messaging import MessagingCommands
cmd = MessagingCommands.__new__(MessagingCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
cmd.send = mock_send
# 2-byte path hash (flags=1 means hash_len=2)
await cmd.send_trace(auth_code=0, tag=1, flags=1, path=b"\xAA\xBB")
assert captured_data is not None
# cmd(1) + tag(4) + auth(4) + flags(1) + path(2) = 12 — no pad needed
assert len(captured_data) == 12
# ---------------------------------------------------------------------------
# Command wrapper: send_raw_data
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_raw_data_wrapper():
"""send_raw_data sends CMD 0x19 + payload."""
from meshcore.commands.messaging import MessagingCommands
cmd = MessagingCommands.__new__(MessagingCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.MSG_SENT, {"type": 0, "expected_ack": b"\x00" * 4, "suggested_timeout": 1000})
cmd.send = mock_send
await cmd.send_raw_data(b"\xDE\xAD")
assert captured_data is not None
assert captured_data[0] == 0x19 # CMD_SEND_RAW_DATA
assert captured_data[1:] == b"\xDE\xAD"
# ---------------------------------------------------------------------------
# Command wrapper: has_connection
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_has_connection_wrapper():
"""has_connection sends CMD 0x1c."""
from meshcore.commands.device import DeviceCommands
cmd = DeviceCommands.__new__(DeviceCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.OK, {"value": 1})
cmd.send = mock_send
await cmd.has_connection()
assert captured_data is not None
assert captured_data == b"\x1c"
# ---------------------------------------------------------------------------
# Command wrapper: get_tuning
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_tuning_wrapper():
"""get_tuning sends CMD 0x2b (GET_TUNING_PARAMS = 43)."""
from meshcore.commands.device import DeviceCommands
cmd = DeviceCommands.__new__(DeviceCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.TUNING_PARAMS, {"rx_delay": 500, "airtime_factor": 200})
cmd.send = mock_send
result = await cmd.get_tuning()
assert captured_data == b"\x2b"
assert result.type == EventType.TUNING_PARAMS
# ---------------------------------------------------------------------------
# Command wrapper: get_contact_by_key
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_contact_by_key_wrapper():
"""get_contact_by_key sends CMD 0x1e + 32-byte pubkey."""
from meshcore.commands.contact import ContactCommands
cmd = ContactCommands.__new__(ContactCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.NEXT_CONTACT, {"public_key": "ab" * 32})
cmd.send = mock_send
pubkey = bytes(range(32))
await cmd.get_contact_by_key(pubkey)
assert captured_data is not None
assert captured_data[0] == 0x1E # CMD_GET_CONTACT_BY_KEY
assert captured_data[1:] == pubkey
# ---------------------------------------------------------------------------
# Command wrapper: factory_reset (two-step)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_factory_reset_two_step():
"""factory_reset requires a token from request_factory_reset."""
from meshcore.commands.device import DeviceCommands
cmd = DeviceCommands.__new__(DeviceCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.OK, {})
cmd.send = mock_send
# Step 1: request token
token = await cmd.request_factory_reset()
assert isinstance(token, str)
assert len(token) == 16 # hex-encoded 8 bytes
# Step 2: confirm with wrong token fails
with pytest.raises(ValueError, match="Invalid or expired"):
await cmd.confirm_factory_reset("wrong_token")
# Step 2: confirm with correct token succeeds
await cmd.confirm_factory_reset(token)
assert captured_data == b"\x33" # CMD_FACTORY_RESET
@pytest.mark.asyncio
async def test_factory_reset_without_request_fails():
"""confirm_factory_reset without request_factory_reset raises ValueError."""
from meshcore.commands.device import DeviceCommands
cmd = DeviceCommands.__new__(DeviceCommands)
with pytest.raises(ValueError, match="Invalid or expired"):
await cmd.confirm_factory_reset("any_token")
# ---------------------------------------------------------------------------
# GET_STATS enum entry
# ---------------------------------------------------------------------------
def test_get_stats_enum_exists():
"""CommandType.GET_STATS == 56."""
assert CommandType.GET_STATS.value == 56
@pytest.mark.asyncio
async def test_get_stats_core_uses_enum():
"""get_stats_core sends CommandType.GET_STATS.value (0x38) + 0x00."""
from meshcore.commands.device import DeviceCommands
cmd = DeviceCommands.__new__(DeviceCommands)
captured_data = None
async def mock_send(data, expected_events, timeout=None):
nonlocal captured_data
captured_data = bytes(data)
return Event(EventType.STATS_CORE, {})
cmd.send = mock_send
await cmd.get_stats_core()
assert captured_data is not None
assert captured_data[0] == CommandType.GET_STATS.value # 0x38 = 56
assert captured_data[1] == 0x00