From cca0ca90e93d85507b2fac8ba6b7fb486359628b Mon Sep 17 00:00:00 2001 From: Alex Wolden Date: Sun, 1 Jun 2025 20:30:19 -0700 Subject: [PATCH] Add channel commands and fix a lint error --- examples/serial_channel_manager.py | 116 +++++++++++++++++++++++++++++ src/meshcore/commands.py | 19 +++++ src/meshcore/events.py | 1 + src/meshcore/reader.py | 24 +++++- tests/unit/test_commands.py | 21 +++++- tests/unit/test_events.py | 17 ++++- 6 files changed, 192 insertions(+), 6 deletions(-) create mode 100755 examples/serial_channel_manager.py diff --git a/examples/serial_channel_manager.py b/examples/serial_channel_manager.py new file mode 100755 index 0000000..f00726f --- /dev/null +++ b/examples/serial_channel_manager.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 + +import asyncio +import sys +from meshcore import MeshCore +from meshcore.events import EventType + +# Default port - change as needed +PORT = "/dev/tty.usbserial-583A0069501" +BAUDRATE = 115200 + +async def main(): + # Check command line arguments + if len(sys.argv) > 1: + port = sys.argv[1] + else: + port = PORT + + print(f"Connecting to device on {port}...") + + try: + mc = await MeshCore.create_serial(port, BAUDRATE, debug=True) + print("Connected!") + + # Display device info + print(f"Device: {mc.self_info.get('adv_name', 'Unknown')}") + print(f"Public Key: {mc.self_info.get('public_key', 'Unknown')}") + print() + + while True: + print("Channel Manager") + print("1. Get channel info") + print("2. Set channel") + print("3. Exit") + choice = input("Enter choice (1-3): ") + + if choice == "1": + await get_channel_info(mc) + elif choice == "2": + await set_channel_config(mc) + elif choice == "3": + break + else: + print("Invalid choice. Please try again.\n") + + except Exception as e: + print(f"Error: {e}") + finally: + if 'mc' in locals(): + await mc.disconnect() + print("Disconnected.") + +async def get_channel_info(mc): + """Get information about a specific channel""" + try: + channel_idx = int(input("Enter channel index (0-255): ")) + + print(f"Getting info for channel {channel_idx}...") + result = await mc.commands.get_channel(channel_idx) + + if result.type == EventType.CHANNEL_INFO: + payload = result.payload + print(f"Channel {payload.get('channel_idx', 'Unknown')}:") + print(f" Name: {payload.get('channel_name', 'Unknown')}") + print(f" Secret: {payload.get('channel_secret', b'').hex()}") + elif result.type == EventType.ERROR: + print(f"Error getting channel info: {result.payload}") + else: + print(f"Unexpected response: {result.type}") + + except ValueError: + print("Invalid channel index. Please enter a number.") + except Exception as e: + print(f"Error: {e}") + print() + +async def set_channel_config(mc): + """Configure a channel with name and secret""" + try: + channel_idx = int(input("Enter channel index (0-255): ")) + channel_name = input("Enter channel name (max 32 chars): ") + + # Get secret as hex string + print("Enter channel secret as hex (32 hex chars for 16 bytes):") + print("Example: 0123456789abcdef0123456789abcdef") + secret_hex = input("Secret: ").strip() + + # Validate and convert secret + if len(secret_hex) != 32: + print("Error: Secret must be exactly 32 hex characters (16 bytes)") + return + + try: + channel_secret = bytes.fromhex(secret_hex) + except ValueError: + print("Error: Invalid hex string") + return + + print(f"Setting channel {channel_idx}...") + result = await mc.commands.set_channel(channel_idx, channel_name, channel_secret) + + if result.type == EventType.OK: + print("Channel configured successfully!") + elif result.type == EventType.ERROR: + print(f"Error setting channel: {result.payload}") + else: + print(f"Unexpected response: {result.type}") + + except ValueError: + print("Invalid input. Please enter valid numbers.") + except Exception as e: + print(f"Error: {e}") + print() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 93b76bc..1d1446a 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -382,6 +382,25 @@ class CommandHandler: data = b"\x32" + cmd.encode('utf-8') return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) + async def get_channel(self, channel_idx: int) -> Event: + logger.debug(f"Getting channel info for channel {channel_idx}") + data = b"\x1f" + channel_idx.to_bytes(1, 'little') + return await self.send(data, [EventType.CHANNEL_INFO, EventType.ERROR]) + + async def set_channel(self, channel_idx: int, channel_name: str, channel_secret: bytes) -> Event: + logger.debug(f"Setting channel {channel_idx}: name={channel_name}") + + # Pad channel name to 32 bytes + name_bytes = channel_name.encode('utf-8')[:32] + name_bytes = name_bytes.ljust(32, b'\x00') + + # Ensure channel secret is exactly 16 bytes + if len(channel_secret) != 16: + raise ValueError("Channel secret must be exactly 16 bytes") + + data = b"\x20" + channel_idx.to_bytes(1, 'little') + name_bytes + channel_secret + return await self.send(data, [EventType.OK, EventType.ERROR]) + async def send_trace(self, auth_code: int = 0, tag: Optional[int] = None, flags: int = 0, path: Optional[Union[str, bytes, bytearray]] = None) -> Event: """ diff --git a/src/meshcore/events.py b/src/meshcore/events.py index e263899..c498dbc 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -35,6 +35,7 @@ class EventType(Enum): RX_LOG_DATA = "rx_log_data" TELEMETRY_RESPONSE = "telemetry_response" CUSTOM_VARS = "custom_vars" + CHANNEL_INFO = "channel_info" # Command response types OK = "command_ok" diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index e4856fe..5afc0d0 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -218,7 +218,23 @@ class MessageReader: psplit = p.split(":") res[psplit[0]] = psplit[1] logger.debug(f"got custom vars : {res}") - await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res)) + + elif packet_type_value == PacketType.CHANNEL_INFO.value: + logger.debug(f"received channel info response: {data.hex()}") + res = {} + res["channel_idx"] = data[1] + + # Channel name is null-terminated, so find the first null byte + name_bytes = data[2:34] + null_pos = name_bytes.find(0) + if null_pos >= 0: + res["channel_name"] = name_bytes[:null_pos].decode('utf-8') + else: + res["channel_name"] = name_bytes.decode('utf-8') + + res["channel_secret"] = data[34:50] + await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) # Push notifications elif packet_type_value == PacketType.ADVERTISEMENT.value: @@ -404,13 +420,13 @@ class MessageReader: """Parse a given byte string and return as a LppFrame object.""" i = 0 - data = [] + lpp_data_list = [] while i < len(buf) and buf[i] != 0: lppdata = LppData.from_bytes(buf[i:]) - data.append(lppdata) + lpp_data_list.append(lppdata) i = i + len(lppdata) - lpp = json.loads(json.dumps(LppFrame(data), default=lpp_json_encoder)) + lpp = json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)) res["lpp"] = lpp diff --git a/tests/unit/test_commands.py b/tests/unit/test_commands.py index 49e2c39..6b96211 100644 --- a/tests/unit/test_commands.py +++ b/tests/unit/test_commands.py @@ -255,4 +255,23 @@ async def test_send_with_multiple_expected_events_returns_first_completed(comman # Verify that even though OK was listed first, the ERROR event was returned assert result.type == EventType.ERROR - assert result.payload == error_payload \ No newline at end of file + assert result.payload == error_payload + +# Channel command tests +async def test_get_channel(command_handler, mock_connection): + await command_handler.get_channel(3) + assert mock_connection.send.call_args[0][0] == b"\x1f\x03" + +async def test_set_channel(command_handler, mock_connection): + channel_secret = bytes(range(16)) # 16 bytes: 0x00, 0x01, ..., 0x0f + await command_handler.set_channel(5, "MyChannel", channel_secret) + + expected_data = b"\x20\x05" # CMD_SET_CHANNEL + channel_idx=5 + expected_data += b"MyChannel" + b"\x00" * (32 - len("MyChannel")) # 32-byte padded name + expected_data += channel_secret # 16-byte secret + + assert mock_connection.send.call_args[0][0] == expected_data + +async def test_set_channel_invalid_secret_length(command_handler): + with pytest.raises(ValueError, match="Channel secret must be exactly 16 bytes"): + await command_handler.set_channel(1, "Test", b"tooshort") \ No newline at end of file diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index a2925f6..d3dabb7 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -109,4 +109,19 @@ async def test_event_init_with_kwargs(): assert event.type == EventType.ACK assert event.payload == {"data": "value"} - assert event.attributes == {"code": "1234", "status": "ok"} \ No newline at end of file + assert event.attributes == {"code": "1234", "status": "ok"} + +async def test_channel_info_event(): + # Test CHANNEL_INFO event type + channel_payload = { + "channel_idx": 3, + "channel_name": "TestChannel", + "channel_secret": b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10" + } + + event = Event(EventType.CHANNEL_INFO, channel_payload) + + assert event.type == EventType.CHANNEL_INFO + assert event.payload["channel_idx"] == 3 + assert event.payload["channel_name"] == "TestChannel" + assert len(event.payload["channel_secret"]) == 16 \ No newline at end of file