diff --git a/src/meshcore/commands/device.py b/src/meshcore/commands/device.py index af986db..890b675 100644 --- a/src/meshcore/commands/device.py +++ b/src/meshcore/commands/device.py @@ -158,6 +158,27 @@ class DeviceCommands(CommandHandlerBase): infos["multi_acks"] = multi_acks return await self.set_other_params_from_infos(infos) + async def set_led_params(self, led_ble_mode: int, led_status_mode: int) -> Event: + logger.debug(f"Setting LED params: ble={led_ble_mode}, status={led_status_mode}") + data = ( + b"\x3e" + + int(led_ble_mode).to_bytes(1, "little") + + int(led_status_mode).to_bytes(1, "little") + ) + return await self.send(data, [EventType.OK, EventType.ERROR]) + + async def set_led_ble_mode(self, mode: int) -> Event: + logger.debug(f"Setting LED BLE mode to {mode}") + res = await self.send_device_query() + current_status = res.payload.get("led_status_mode", 0) if res.type != EventType.ERROR else 0 + return await self.set_led_params(mode, current_status) + + async def set_led_status_mode(self, mode: int) -> Event: + logger.debug(f"Setting LED status mode to {mode}") + res = await self.send_device_query() + current_ble = res.payload.get("led_ble_mode", 0) if res.type != EventType.ERROR else 0 + return await self.set_led_params(current_ble, mode) + async def set_devicepin(self, pin: int) -> Event: logger.debug(f"Setting device PIN to: {pin}") return await self.send( diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index 077739a..4bfd092 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -72,6 +72,7 @@ class CommandType(Enum): GET_AUTOADD_CONFIG = 59 GET_ALLOWED_REPEAT_FREQ = 60 SET_PATH_HASH_MODE = 61 + SET_LED_PARAMS = 62 # Packet prefixes for the protocol class PacketType(Enum): diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 54052a5..fcccee3 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -346,6 +346,11 @@ class MessageReader: if fw_ver >= 10: # has path_hash_mode path_hash_mode = dbuf.read(1)[0] res["path_hash_mode"] = path_hash_mode + if fw_ver >= 11: # has LED preferences + led_data = dbuf.read(2) + if len(led_data) >= 2: + res["led_ble_mode"] = led_data[0] + res["led_status_mode"] = led_data[1] await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) elif packet_type_value == PacketType.CUSTOM_VARS.value: diff --git a/tests/unit/test_commands.py b/tests/unit/test_commands.py index eb3170a..50ffc71 100644 --- a/tests/unit/test_commands.py +++ b/tests/unit/test_commands.py @@ -320,6 +320,31 @@ async def test_set_channel_invalid_secret_length(command_handler): await command_handler.set_channel(1, "Test", b"tooshort") +async def test_set_led_params(command_handler, mock_connection): + """set_led_params sends CMD_SET_LED_PARAMS (0x3e) with both LED mode bytes.""" + await command_handler.set_led_params(2, 1) + + sent = mock_connection.send.call_args[0][0] + assert sent == b"\x3e\x02\x01" + + +async def test_set_other_params_no_led_bytes(command_handler, mock_connection): + """set_other_params_from_infos should NOT include LED bytes.""" + infos = { + "manual_add_contacts": 0, + "telemetry_mode_base": 0, + "telemetry_mode_loc": 0, + "telemetry_mode_env": 0, + "adv_loc_policy": 0, + "multi_acks": 0, + } + await command_handler.set_other_params_from_infos(infos) + + sent = mock_connection.send.call_args[0][0] + assert sent.startswith(b"\x26") + assert len(sent) == 5 # cmd(1) + 4 params, no LED bytes + + async def test_send_chan_msg_with_str_timestamp(command_handler, mock_connection): ts = 1620000000 await command_handler.send_chan_msg(3, "world", timestamp=ts) diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index 088d400..3826acb 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -85,5 +85,66 @@ async def test_binary_response(): else: print(f"⚠️ Unknown request type {request_type}, no specific event expected") +@pytest.mark.asyncio +async def test_device_info_v11_has_led_fields(): + """Firmware v11+ includes led_ble_mode and led_status_mode in DEVICE_INFO.""" + mock_dispatcher = MockDispatcher() + reader = MessageReader(mock_dispatcher) + + # Build a DEVICE_INFO packet: PacketType.DEVICE_INFO (13) + fw_ver=11 + fields + import struct + packet = bytearray() + packet.append(13) # RESP_CODE_DEVICE_INFO + packet.append(11) # FIRMWARE_VER_CODE = 11 + packet.append(25) # max_contacts / 2 + packet.append(8) # max_channels + packet += struct.pack("