Add trace packet type

This commit is contained in:
Alex Wolden 2025-04-12 13:02:01 -07:00
parent a5f1ec5c26
commit ea2f17025f
5 changed files with 168 additions and 4 deletions

70
examples/serial_trace.py Normal file
View file

@ -0,0 +1,70 @@
#!/usr/bin/env python3
import asyncio
import argparse
from meshcore import MeshCore
from meshcore.events import EventType
async def get_repeater_name(mc, hash_prefix):
"""Find a contact by its 2-character hash prefix and return its name"""
# Ensure contacts are available
await mc.ensure_contacts()
# Find contact with matching hash prefix
contact = mc.get_contact_by_key_prefix(hash_prefix)
if contact:
return contact.get("adv_name", f"Unknown ({hash_prefix})")
else:
return f"Unknown ({hash_prefix})"
async def main():
parser = argparse.ArgumentParser(description='MeshCore Serial Trace Example')
parser.add_argument('-p', '--port', required=True, help='Serial port path')
parser.add_argument('--path', type=str, help='Trace path (comma-separated hex values)')
args = parser.parse_args()
try:
# Connect to device
print(f"Connecting to {args.port}...")
mc = await MeshCore.create_serial(args.port, 115200, debug=True)
# Send trace packet
print(f"Sending trace packet...")
result = await mc.commands.send_trace(path=args.path)
if result:
print("Trace packet sent successfully")
print("Waiting for trace response...")
# Wait for a trace response with 15-second timeout
event = await mc.wait_for_event(EventType.TRACE_DATA, timeout=15)
if event:
trace = event.payload
print(f"Trace data received:")
print(f" Tag: {trace['tag']}")
print(f" Flags: {trace.get('flags', 0)}")
print(f" Path Length: {trace.get('path_len', 0)}")
if trace.get('path'):
print(f" Path ({len(trace['path'])} nodes):")
# Process nodes with hash (repeaters)
for i, node in enumerate(trace['path']):
if 'hash' in node:
# Look up repeater name
repeater_name = await get_repeater_name(mc, node['hash'])
print(f" Node {i+1}: {repeater_name}, SNR={node['snr']:.1f} dB")
else:
print(f" Node {i+1}: SNR={node['snr']:.1f} dB (final node)")
else:
print("No trace response received within timeout")
else:
print("Failed to send trace packet")
await mc.disconnect()
except Exception as e:
print(f"Error: {e}")
if __name__ == '__main__':
asyncio.run(main())

View file

@ -224,4 +224,52 @@ class CommandHandler:
async def send_cli(self, cmd): async def send_cli(self, cmd):
logger.debug(f"Sending CLI command: {cmd}") logger.debug(f"Sending CLI command: {cmd}")
data = b"\x32" + cmd.encode('ascii') data = b"\x32" + cmd.encode('ascii')
return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR])
async def send_trace(self, auth_code=0, tag=None, flags=0, path=None):
"""
Send a trace packet to test routing through specific repeaters
Args:
auth_code: 32-bit authentication code (default: 0)
tag: 32-bit integer to identify this trace (default: random)
flags: 8-bit flags field (default: 0)
path: Optional string with comma-separated hex values representing repeater pubkeys (e.g. "23,5f,3a")
or a bytes/bytearray object with the raw path data
Returns:
Dictionary with sent status, tag, and estimated timeout in milliseconds, or False if command failed
"""
# Generate random tag if not provided
if tag is None:
import random
tag = random.randint(1, 0xFFFFFFFF)
logger.debug(f"Sending trace: tag={tag}, auth={auth_code}, flags={flags}, path={path}")
# Prepare the command packet: CMD(1) + tag(4) + auth_code(4) + flags(1) + [path]
cmd_data = bytearray([36]) # CMD_SEND_TRACE_PATH
cmd_data.extend(tag.to_bytes(4, 'little'))
cmd_data.extend(auth_code.to_bytes(4, 'little'))
cmd_data.append(flags)
# Process path if provided
if path:
if isinstance(path, str):
# Convert comma-separated hex values to bytes
try:
path_bytes = bytearray()
for hex_val in path.split(','):
hex_val = hex_val.strip()
path_bytes.append(int(hex_val, 16))
cmd_data.extend(path_bytes)
except ValueError as e:
logger.error(f"Invalid path format: {e}")
return False
elif isinstance(path, (bytes, bytearray)):
cmd_data.extend(path)
else:
logger.error(f"Unsupported path type: {type(path)}")
return False
return await self.send(cmd_data, [EventType.MSG_SENT, EventType.ERROR])

View file

@ -2,7 +2,7 @@ from enum import Enum
import logging import logging
from typing import Any, Dict, Optional, Callable, List, Union from typing import Any, Dict, Optional, Callable, List, Union
import asyncio import asyncio
from dataclasses import dataclass from dataclasses import dataclass, field
logger = logging.getLogger("meshcore") logger = logging.getLogger("meshcore")
@ -30,6 +30,7 @@ class EventType(Enum):
LOGIN_FAILED = "login_failed" LOGIN_FAILED = "login_failed"
STATUS_RESPONSE = "status_response" STATUS_RESPONSE = "status_response"
LOG_DATA = "log_data" LOG_DATA = "log_data"
TRACE_DATA = "trace_data"
# Command response types # Command response types
OK = "command_ok" OK = "command_ok"
@ -40,7 +41,7 @@ class EventType(Enum):
class Event: class Event:
type: EventType type: EventType
payload: Any payload: Any
attributes: Dict[str, Any] = {} attributes: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self): def __post_init__(self):
if self.attributes is None: if self.attributes is None:

View file

@ -27,4 +27,5 @@ class PacketType(Enum):
LOGIN_SUCCESS = 0x85 LOGIN_SUCCESS = 0x85
LOGIN_FAILED = 0x86 LOGIN_FAILED = 0x86
STATUS_RESPONSE = 0x87 STATUS_RESPONSE = 0x87
LOG_DATA = 0x88 LOG_DATA = 0x88
TRACE_DATA = 0x89

View file

@ -230,6 +230,50 @@ class MessageReader:
logger.debug("Received log data") logger.debug("Received log data")
await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace'))) await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace')))
elif packet_type_value == PacketType.TRACE_DATA.value:
logger.debug(f"Received trace data: {data.hex()}")
res = {}
# According to the source, format is:
# 0x89, reserved(0), path_len, flags, tag(4), auth(4), path_hashes[], path_snrs[], final_snr
reserved = data[1]
path_len = data[2]
flags = data[3]
tag = int.from_bytes(data[4:8], byteorder='little')
auth_code = int.from_bytes(data[8:12], byteorder='little')
# Initialize result
res["tag"] = tag
res["auth"] = auth_code
res["flags"] = flags
res["path_len"] = path_len
# Process path as array of objects with hash and SNR
path_nodes = []
if path_len > 0 and len(data) >= 12 + path_len*2 + 1:
# Extract path with hash and SNR pairs
for i in range(path_len):
node = {
"hash": f"{data[12+i]:02x}",
# SNR is stored as a signed byte representing SNR * 4
"snr": (data[12+path_len+i] if data[12+path_len+i] < 128 else data[12+path_len+i] - 256) / 4.0
}
path_nodes.append(node)
# Add the final node (our device) with its SNR
final_snr_byte = data[12+path_len*2]
final_snr = (final_snr_byte if final_snr_byte < 128 else final_snr_byte - 256) / 4.0
path_nodes.append({
"snr": final_snr
})
res["path"] = path_nodes
logger.debug(f"Parsed trace data: {res}")
await self.dispatcher.dispatch(Event(EventType.TRACE_DATA, res))
else: else:
logger.debug(f"Unhandled data received {data}") logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}") logger.debug(f"Unhandled packet type: {packet_type_value}")