diff --git a/src/meshcore/commands/contact.py b/src/meshcore/commands/contact.py index ac723ea..8a89101 100644 --- a/src/meshcore/commands/contact.py +++ b/src/meshcore/commands/contact.py @@ -185,6 +185,24 @@ class ContactCommands(CommandHandlerBase): data = b"\x3B" return await self.send(data, [EventType.AUTOADD_CONFIG, EventType.ERROR]) + async def get_contact_by_key(self, pubkey: bytes) -> Event: + """N09: Retrieve a single contact by its public key (CMD 30). + + Args: + pubkey: 32-byte public key of the contact. + + Returns: + Event with the contact data (same format as CONTACT/NEXT_CONTACT), + or ERROR if not found. + """ + if not isinstance(pubkey, (bytes, bytearray)): + raise TypeError("pubkey must be bytes-like") + # Truncate or pad to 32 bytes + key_bytes = bytes(pubkey[:32]) + logger.debug(f"Getting contact by key: {key_bytes.hex()}") + data = b"\x1e" + key_bytes + return await self.send(data, [EventType.NEXT_CONTACT, EventType.ERROR]) + async def get_advert_path(self, key: DestinationType) -> Event: key_bytes = _validate_destination(key, prefix_length=32) logger.debug(f"getting advert path for: {key} {key_bytes.hex()}") diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index af986db..b6bba51 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -4,6 +4,7 @@ from hashlib import sha256 from typing import Optional from ..events import Event, EventType +from ..packets import CommandType from .base import CommandHandlerBase, DestinationType, _validate_destination logger = logging.getLogger("meshcore") @@ -273,20 +274,89 @@ class DeviceCommands(CommandHandlerBase): return await self.sign_finish(timeout=timeout, data_size=len(data)) + async def has_connection(self) -> Event: + """N09: Check if the device has an active connection (CMD 28). + + Returns: + Event with a 1-byte response indicating connection status, + or ERROR. + """ + logger.debug("Checking device connection status") + return await self.send(b"\x1c", [EventType.OK, EventType.ERROR]) + + async def get_tuning(self) -> Event: + """N03/N09: Request current tuning parameters (CMD_GET_TUNING_PARAMS = 43). + + Firmware responds with RESP_CODE_TUNING_PARAMS (23): 9 bytes containing + rx_delay (4 bytes LE) and airtime_factor (4 bytes LE). + + Returns: + Event of type TUNING_PARAMS with rx_delay and airtime_factor, + or ERROR. + """ + logger.debug("Getting tuning parameters") + return await self.send(b"\x2b", [EventType.TUNING_PARAMS, EventType.ERROR]) + + async def request_factory_reset(self) -> str: + """N09: Request a factory reset token (step 1 of 2). + + This method returns a confirmation token string. Pass it to + ``confirm_factory_reset(token)`` to actually execute the reset. + The two-step pattern is a Python-side safety measure; the firmware + itself has no token verification. + + Returns: + A confirmation token string to pass to confirm_factory_reset(). + """ + import secrets + token = secrets.token_hex(8) + logger.warning( + "Factory reset requested. Call confirm_factory_reset('%s') to proceed. " + "This will ERASE ALL DATA on the device.", token + ) + # Store the token on the instance for validation + self._factory_reset_token = token + return token + + async def confirm_factory_reset(self, token: str) -> Event: + """N09: Execute factory reset after token confirmation (step 2 of 2). + + Args: + token: The token returned by request_factory_reset(). + + Returns: + Event with OK or ERROR. + + Raises: + ValueError: If the token does not match. + """ + expected = getattr(self, "_factory_reset_token", None) + if expected is None or token != expected: + raise ValueError( + "Invalid or expired factory reset token. " + "Call request_factory_reset() first." + ) + self._factory_reset_token = None # Consume the token + logger.warning("Executing factory reset — all device data will be erased") + return await self.send(b"\x33", [EventType.OK, EventType.ERROR]) + async def get_stats_core(self) -> Event: logger.debug("Getting core statistics") - # CMD_GET_STATS (56) + STATS_TYPE_CORE (0) - return await self.send(b"\x38\x00", [EventType.STATS_CORE, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x00]) # GET_STATS + STATS_TYPE_CORE + return await self.send(cmd, [EventType.STATS_CORE, EventType.ERROR]) async def get_stats_radio(self) -> Event: logger.debug("Getting radio statistics") - # CMD_GET_STATS (56) + STATS_TYPE_RADIO (1) - return await self.send(b"\x38\x01", [EventType.STATS_RADIO, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x01]) # GET_STATS + STATS_TYPE_RADIO + return await self.send(cmd, [EventType.STATS_RADIO, EventType.ERROR]) async def get_stats_packets(self) -> Event: logger.debug("Getting packet statistics") - # CMD_GET_STATS (56) + STATS_TYPE_PACKETS (2) - return await self.send(b"\x38\x02", [EventType.STATS_PACKETS, EventType.ERROR]) + # R04: Use CommandType enum instead of literal bytes + cmd = bytes([CommandType.GET_STATS.value, 0x02]) # GET_STATS + STATS_TYPE_PACKETS + return await self.send(cmd, [EventType.STATS_PACKETS, EventType.ERROR]) async def get_allowed_repeat_freq(self) -> Event: logger.debug("Getting allowed repeat freqs")