meshcore_py/examples/serial_pingbot.py

160 lines
4.6 KiB
Python
Raw Permalink Normal View History

2025-12-08 22:16:13 +11:00
import asyncio
2025-12-13 21:47:45 +11:00
import logging
from typing import Any
2025-12-08 22:16:13 +11:00
from meshcore import MeshCore, EventType
2025-12-13 21:47:45 +11:00
# --- User configuration -----------------------------------------------------
# Serial device passed to MeshCore.create_serial() in main().
SERIAL_PORT = "COM4" # change this to your serial port
# Channel index the bot listens on and replies to.
CHANNEL_IDX = 1 # change this to the index of your "#ping" channel

# Root logging is configured at INFO, so the parser's debug-level messages
# emitted through _LOGGER are not shown unless the level is lowered.
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger("serial_pingbot")

# Most recent path summary produced from an RX_LOG_DATA event. It is updated by
# the RX log handler and read by the channel-message handler when building a
# reply; the initial value is the "unknown path" placeholder.
latest_pathinfo_str = "(? hops, ?)"
_HEX_DIGITS = frozenset("0123456789abcdef")


def _is_hex(text: str) -> bool:
    """Return True when every character of *text* is a lowercase hex digit."""
    return all(ch in _HEX_DIGITS for ch in text)


def parse_rx_log_data(payload: Any) -> dict[str, Any]:
    """Parse an RX_LOG event payload to extract LoRa packet details.

    Expected format (hex):
        byte0: header
        byte1: path_len
        next path_len bytes: path nodes
        next byte: channel_hash (optional)

    Args:
        payload: A hex string, raw ``bytes``/``bytearray``, or a dict that
            carries the frame under ``"payload"`` (preferred) or
            ``"raw_hex"``. Whitespace and letter case in hex strings are
            ignored.

    Returns:
        A dict with ``header``, ``path_len``, ``path`` and ``path_nodes``,
        plus ``channel_hash`` when a valid byte follows the path. An empty
        dict is returned when the input is missing, too short, truncated, or
        contains non-hex characters in the parsed fields. The function never
        raises.
    """
    try:
        if isinstance(payload, dict):
            raw = payload.get("payload") or payload.get("raw_hex")
        elif isinstance(payload, (str, bytes, bytearray)):
            raw = payload
        else:
            raw = None
        if not raw:
            return {}

        if isinstance(raw, (bytes, bytearray)):
            raw = raw.hex()
        # Drop every kind of whitespace (spaces, tabs, newlines) so that the
        # fixed field offsets below line up with the actual hex digits.
        hex_str = "".join(str(raw).lower().split())

        # int(..., 16) tolerates signs and whitespace ("-1", "+f"), which
        # would produce a negative or bogus path length; require plain hex
        # digits for the header and length bytes before converting.
        if len(hex_str) < 4 or not _is_hex(hex_str[:4]):
            return {}
        path_len = int(hex_str[2:4], 16)

        path_start = 4
        path_end = path_start + path_len * 2
        if len(hex_str) < path_end:
            # The frame announces more path bytes than it contains.
            return {}
        path_hex = hex_str[path_start:path_end]
        if not _is_hex(path_hex):
            return {}

        result: dict[str, Any] = {
            "header": hex_str[0:2],
            "path_len": path_len,
            "path": path_hex,
            "path_nodes": [path_hex[i:i + 2] for i in range(0, len(path_hex), 2)],
        }

        # The channel hash is optional: report it only when a full, valid
        # byte follows the path.
        channel_hash = hex_str[path_end:path_end + 2]
        if len(channel_hash) == 2 and _is_hex(channel_hash):
            result["channel_hash"] = channel_hash
        return result
    except Exception as ex:
        # Best-effort parser: an unexpected payload must not break the
        # event handler that calls it.
        _LOGGER.debug("Error parsing RX_LOG data: %s", ex)
        return {}
def format_pathinfo(parsed: dict[str, Any]) -> str:
    """Render parsed path data as '(<path_len> hops, <aa:bb:cc>)'.

    A missing ``path_len`` gives the placeholder '(? hops, ?)', a length of
    zero gives '(0 hops, direct)', and a non-zero length without any
    ``path_nodes`` shows '?' in place of the colon-joined node list.
    """
    hop_count = parsed.get("path_len")
    hops = parsed.get("path_nodes")

    # Guard clauses for the two fixed-text cases.
    if hop_count is None:
        return "(? hops, ?)"
    if hop_count == 0:
        return "(0 hops, direct)"

    if hops:
        route = ":".join(hops)
    else:
        route = "?"
    return f"({hop_count} hops, {route})"
2025-12-08 22:16:13 +11:00
async def main():
    """Connect to the MeshCore device and answer channel pings until stopped.

    Opens the serial connection on SERIAL_PORT, starts automatic message
    fetching, and subscribes two handlers:

    * RX_LOG_DATA events update the module-level ``latest_pathinfo_str`` with
      the path summary of the most recently logged packet.
    * CHANNEL_MSG_RECV events on CHANNEL_IDX are printed, and any message
      containing "ping" (case-insensitive) is answered with a "Pong" reply
      that includes the latest path summary.

    The coroutine then idles until it is interrupted or cancelled. On the way
    out it removes both subscriptions, stops message fetching and disconnects.
    """
    meshcore = await MeshCore.create_serial(SERIAL_PORT, debug=True)
    print(f"Connected on {SERIAL_PORT}")
    await meshcore.start_auto_message_fetching()

    async def handle_rx_log_data(event):
        """Remember the path summary of the latest RX log packet."""
        global latest_pathinfo_str
        rx = event.payload or {}
        raw = rx.get("payload")  # use 'payload' (not 'raw_hex') for this parser
        if not raw:
            return
        parsed = parse_rx_log_data(raw)
        if parsed:
            latest_pathinfo_str = format_pathinfo(parsed)

    async def handle_channel_message(event):
        """Print a received channel message and reply to pings."""
        msg = event.payload or {}
        # Snapshot the path summary now; it may be overwritten by a later
        # RX log event while the reply is being sent.
        pathinfo = latest_pathinfo_str

        chan = msg.get("channel_idx")
        # "text" may be present but None; fall back to an empty string so the
        # sender extraction and ping check below cannot fail.
        text = msg.get("text") or ""
        path_len = msg.get("path_len")
        # The sender is taken as everything before the first ':' of the text.
        sender = text.split(":", 1)[0].strip()

        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        print(pathinfo)
        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")

        print(f"Received on channel {chan} from {sender}: {text} | path_len={path_len}")
        if chan == CHANNEL_IDX and "ping" in text.lower():
            reply = f"@[{sender}] Pong 🏓{pathinfo}"
            print(f"Detected Ping. Replying in channel {CHANNEL_IDX} with:\n{reply}")
            result = await meshcore.commands.send_chan_msg(CHANNEL_IDX, reply)
            if result.type == EventType.ERROR:
                print(f"Error sending reply: {result.payload}")
            else:
                print("Reply sent")

    sub_chan = meshcore.subscribe(
        EventType.CHANNEL_MSG_RECV,
        handle_channel_message,
        attribute_filters={"channel_idx": CHANNEL_IDX},
    )
    sub_rx = meshcore.subscribe(
        EventType.RX_LOG_DATA,
        handle_rx_log_data,
    )

    try:
        print(f"Listening for 'Ping' on channel {CHANNEL_IDX} and RX_LOG_DATA...")
        # Nothing to do in the foreground; the subscribed handlers do the work.
        while True:
            await asyncio.sleep(3600)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run(), Ctrl-C reaches this coroutine as a cancellation
        # of the main task (CancelledError) rather than as KeyboardInterrupt,
        # so both are treated as a request to stop.
        print("Stopping listener...")
    finally:
        meshcore.unsubscribe(sub_chan)
        meshcore.unsubscribe(sub_rx)
        await meshcore.stop_auto_message_fetching()
        await meshcore.disconnect()
        print("Disconnected")
if __name__ == "__main__":
    # asyncio.run() can raise KeyboardInterrupt once the main task has been
    # cancelled by Ctrl-C and has run its cleanup; catch it here so the script
    # exits without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass