From 325368f2f70c5a5914858ce482b8a7542b68a1ab Mon Sep 17 00:00:00 2001 From: prabathbr <60564552+prabathbr@users.noreply.github.com> Date: Sat, 13 Dec 2025 21:47:45 +1100 Subject: [PATCH] updated to include RX path --- examples/serial_pingbot.py | 127 ++++++++++++++++++++++++++++++++----- 1 file changed, 112 insertions(+), 15 deletions(-) diff --git a/examples/serial_pingbot.py b/examples/serial_pingbot.py index 7140b95..acb0e8d 100644 --- a/examples/serial_pingbot.py +++ b/examples/serial_pingbot.py @@ -1,58 +1,155 @@ import asyncio +import logging +from typing import Any + from meshcore import MeshCore, EventType -SERIAL_PORT = "COM16" # change this to your serial port, tested with T1000-E -CHANNEL_IDX = 1 # change this to the index of your "#ping" channel +SERIAL_PORT = "COM4" # change this to your serial port +CHANNEL_IDX = 1 # change this to the index of your "#ping" channel + +logging.basicConfig(level=logging.INFO) +_LOGGER = logging.getLogger("serial_pingbot") + +latest_pathinfo_str = "(? hops, ?)" + + +def parse_rx_log_data(payload: Any) -> dict[str, Any]: + """Parse RX_LOG event payload to extract LoRa packet details. + + Expected format (hex): + byte0: header + byte1: path_len + next path_len bytes: path nodes + next byte: channel_hash (optional) + """ + result: dict[str, Any] = {} + + try: + hex_str = None + + if isinstance(payload, dict): + hex_str = payload.get("payload") or payload.get("raw_hex") + elif isinstance(payload, (str, bytes)): + hex_str = payload + + if not hex_str: + return result + + if isinstance(hex_str, bytes): + hex_str = hex_str.hex() + + hex_str = str(hex_str).lower().replace(" ", "").replace("\n", "").replace("\r", "") + + if len(hex_str) < 4: + return result + + result["header"] = hex_str[0:2] + + try: + path_len = int(hex_str[2:4], 16) + result["path_len"] = path_len + except ValueError: + return {} + + path_start = 4 + path_end = path_start + (path_len * 2) + + if len(hex_str) < path_end: + return {} + + path_hex = hex_str[path_start:path_end] + result["path"] = path_hex + result["path_nodes"] = [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)] + + if len(hex_str) >= path_end + 2: + result["channel_hash"] = hex_str[path_end:path_end + 2] + + except Exception as ex: + _LOGGER.debug(f"Error parsing RX_LOG data: {ex}") + + return result + + +def format_pathinfo(parsed: dict[str, Any]) -> str: + """Return string in format: '( hops, )'.""" + path_len = parsed.get("path_len") + nodes = parsed.get("path_nodes") or [] + + if path_len is None: + return "(? hops, ?)" + + if path_len == 0: + return "(0 hops, direct)" + + path_str = ":".join(nodes) if nodes else "?" + return f"({path_len} hops, {path_str})" async def main(): - # Connect to the MeshCore companion over serial + global latest_pathinfo_str + meshcore = await MeshCore.create_serial(SERIAL_PORT, debug=True) print(f"Connected on {SERIAL_PORT}") - # Let the library automatically fetch messages from the device await meshcore.start_auto_message_fetching() + async def handle_rx_log_data(event): + global latest_pathinfo_str + + rx = event.payload or {} + raw = rx.get("payload") # use 'payload' (not 'raw_hex') for this parser + if not raw: + return + + parsed = parse_rx_log_data(raw) + if parsed: + latest_pathinfo_str = format_pathinfo(parsed) + async def handle_channel_message(event): - msg = event.payload + msg = event.payload or {} + + pathinfo = latest_pathinfo_str chan = msg.get("channel_idx") text = msg.get("text", "") path_len = msg.get("path_len") sender = text.split(":", 1)[0].strip() + print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") + print(pathinfo) + print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") print(f"Received on channel {chan} from {sender}: {text} | path_len={path_len}") if chan == CHANNEL_IDX and "ping" in text.lower(): - - reply = f"@[{sender}] Pong 🏓({path_len})" - + reply = f"@[{sender}] Pong 🏓{pathinfo}" print(f"Detected Ping. Replying in channel {CHANNEL_IDX} with:\n{reply}") result = await meshcore.commands.send_chan_msg(CHANNEL_IDX, reply) - if result.type == EventType.ERROR: print(f"Error sending reply: {result.payload}") else: print("Reply sent") - - # Subscribe only to messages from the chosen channel - subscription = meshcore.subscribe( + sub_chan = meshcore.subscribe( EventType.CHANNEL_MSG_RECV, handle_channel_message, attribute_filters={"channel_idx": CHANNEL_IDX}, ) + sub_rx = meshcore.subscribe( + EventType.RX_LOG_DATA, + handle_rx_log_data, + ) + try: - print(f"Listening for 'Ping' on channel {CHANNEL_IDX}...") - # Keep the program alive + print(f"Listening for 'Ping' on channel {CHANNEL_IDX} and RX_LOG_DATA...") while True: await asyncio.sleep(3600) except KeyboardInterrupt: print("Stopping listener...") finally: - meshcore.unsubscribe(subscription) + meshcore.unsubscribe(sub_chan) + meshcore.unsubscribe(sub_rx) await meshcore.stop_auto_message_fetching() await meshcore.disconnect() print("Disconnected")