Add channel commands and fix a lint error

This commit is contained in:
Alex Wolden 2025-06-01 20:30:19 -07:00
parent d73979f234
commit cca0ca90e9
6 changed files with 192 additions and 6 deletions

View file

@ -0,0 +1,116 @@
#!/usr/bin/env python3
import asyncio
import sys
from meshcore import MeshCore
from meshcore.events import EventType
# Default port - change as needed
PORT = "/dev/tty.usbserial-583A0069501"
BAUDRATE = 115200
async def main():
# Check command line arguments
if len(sys.argv) > 1:
port = sys.argv[1]
else:
port = PORT
print(f"Connecting to device on {port}...")
try:
mc = await MeshCore.create_serial(port, BAUDRATE, debug=True)
print("Connected!")
# Display device info
print(f"Device: {mc.self_info.get('adv_name', 'Unknown')}")
print(f"Public Key: {mc.self_info.get('public_key', 'Unknown')}")
print()
while True:
print("Channel Manager")
print("1. Get channel info")
print("2. Set channel")
print("3. Exit")
choice = input("Enter choice (1-3): ")
if choice == "1":
await get_channel_info(mc)
elif choice == "2":
await set_channel_config(mc)
elif choice == "3":
break
else:
print("Invalid choice. Please try again.\n")
except Exception as e:
print(f"Error: {e}")
finally:
if 'mc' in locals():
await mc.disconnect()
print("Disconnected.")
async def get_channel_info(mc):
"""Get information about a specific channel"""
try:
channel_idx = int(input("Enter channel index (0-255): "))
print(f"Getting info for channel {channel_idx}...")
result = await mc.commands.get_channel(channel_idx)
if result.type == EventType.CHANNEL_INFO:
payload = result.payload
print(f"Channel {payload.get('channel_idx', 'Unknown')}:")
print(f" Name: {payload.get('channel_name', 'Unknown')}")
print(f" Secret: {payload.get('channel_secret', b'').hex()}")
elif result.type == EventType.ERROR:
print(f"Error getting channel info: {result.payload}")
else:
print(f"Unexpected response: {result.type}")
except ValueError:
print("Invalid channel index. Please enter a number.")
except Exception as e:
print(f"Error: {e}")
print()
async def set_channel_config(mc):
"""Configure a channel with name and secret"""
try:
channel_idx = int(input("Enter channel index (0-255): "))
channel_name = input("Enter channel name (max 32 chars): ")
# Get secret as hex string
print("Enter channel secret as hex (32 hex chars for 16 bytes):")
print("Example: 0123456789abcdef0123456789abcdef")
secret_hex = input("Secret: ").strip()
# Validate and convert secret
if len(secret_hex) != 32:
print("Error: Secret must be exactly 32 hex characters (16 bytes)")
return
try:
channel_secret = bytes.fromhex(secret_hex)
except ValueError:
print("Error: Invalid hex string")
return
print(f"Setting channel {channel_idx}...")
result = await mc.commands.set_channel(channel_idx, channel_name, channel_secret)
if result.type == EventType.OK:
print("Channel configured successfully!")
elif result.type == EventType.ERROR:
print(f"Error setting channel: {result.payload}")
else:
print(f"Unexpected response: {result.type}")
except ValueError:
print("Invalid input. Please enter valid numbers.")
except Exception as e:
print(f"Error: {e}")
print()
if __name__ == "__main__":
asyncio.run(main())

View file

@ -382,6 +382,25 @@ class CommandHandler:
data = b"\x32" + cmd.encode('utf-8')
return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR])
async def get_channel(self, channel_idx: int) -> Event:
logger.debug(f"Getting channel info for channel {channel_idx}")
data = b"\x1f" + channel_idx.to_bytes(1, 'little')
return await self.send(data, [EventType.CHANNEL_INFO, EventType.ERROR])
async def set_channel(self, channel_idx: int, channel_name: str, channel_secret: bytes) -> Event:
logger.debug(f"Setting channel {channel_idx}: name={channel_name}")
# Pad channel name to 32 bytes
name_bytes = channel_name.encode('utf-8')[:32]
name_bytes = name_bytes.ljust(32, b'\x00')
# Ensure channel secret is exactly 16 bytes
if len(channel_secret) != 16:
raise ValueError("Channel secret must be exactly 16 bytes")
data = b"\x20" + channel_idx.to_bytes(1, 'little') + name_bytes + channel_secret
return await self.send(data, [EventType.OK, EventType.ERROR])
async def send_trace(self, auth_code: int = 0, tag: Optional[int] = None,
flags: int = 0, path: Optional[Union[str, bytes, bytearray]] = None) -> Event:
"""

View file

@ -35,6 +35,7 @@ class EventType(Enum):
RX_LOG_DATA = "rx_log_data"
TELEMETRY_RESPONSE = "telemetry_response"
CUSTOM_VARS = "custom_vars"
CHANNEL_INFO = "channel_info"
# Command response types
OK = "command_ok"

View file

@ -218,7 +218,23 @@ class MessageReader:
psplit = p.split(":")
res[psplit[0]] = psplit[1]
logger.debug(f"got custom vars : {res}")
await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))
await self.dispatcher.dispatch(Event(EventType.CUSTOM_VARS, res))
elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}")
res = {}
res["channel_idx"] = data[1]
# Channel name is null-terminated, so find the first null byte
name_bytes = data[2:34]
null_pos = name_bytes.find(0)
if null_pos >= 0:
res["channel_name"] = name_bytes[:null_pos].decode('utf-8')
else:
res["channel_name"] = name_bytes.decode('utf-8')
res["channel_secret"] = data[34:50]
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
# Push notifications
elif packet_type_value == PacketType.ADVERTISEMENT.value:
@ -404,13 +420,13 @@ class MessageReader:
"""Parse a given byte string and return as a LppFrame object."""
i = 0
data = []
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
data.append(lppdata)
lpp_data_list.append(lppdata)
i = i + len(lppdata)
lpp = json.loads(json.dumps(LppFrame(data), default=lpp_json_encoder))
lpp = json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder))
res["lpp"] = lpp

View file

@ -255,4 +255,23 @@ async def test_send_with_multiple_expected_events_returns_first_completed(comman
# Verify that even though OK was listed first, the ERROR event was returned
assert result.type == EventType.ERROR
assert result.payload == error_payload
assert result.payload == error_payload
# Channel command tests
async def test_get_channel(command_handler, mock_connection):
await command_handler.get_channel(3)
assert mock_connection.send.call_args[0][0] == b"\x1f\x03"
async def test_set_channel(command_handler, mock_connection):
channel_secret = bytes(range(16)) # 16 bytes: 0x00, 0x01, ..., 0x0f
await command_handler.set_channel(5, "MyChannel", channel_secret)
expected_data = b"\x20\x05" # CMD_SET_CHANNEL + channel_idx=5
expected_data += b"MyChannel" + b"\x00" * (32 - len("MyChannel")) # 32-byte padded name
expected_data += channel_secret # 16-byte secret
assert mock_connection.send.call_args[0][0] == expected_data
async def test_set_channel_invalid_secret_length(command_handler):
with pytest.raises(ValueError, match="Channel secret must be exactly 16 bytes"):
await command_handler.set_channel(1, "Test", b"tooshort")

View file

@ -109,4 +109,19 @@ async def test_event_init_with_kwargs():
assert event.type == EventType.ACK
assert event.payload == {"data": "value"}
assert event.attributes == {"code": "1234", "status": "ok"}
assert event.attributes == {"code": "1234", "status": "ok"}
async def test_channel_info_event():
# Test CHANNEL_INFO event type
channel_payload = {
"channel_idx": 3,
"channel_name": "TestChannel",
"channel_secret": b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10"
}
event = Event(EventType.CHANNEL_INFO, channel_payload)
assert event.type == EventType.CHANNEL_INFO
assert event.payload["channel_idx"] == 3
assert event.payload["channel_name"] == "TestChannel"
assert len(event.payload["channel_secret"]) == 16