chat commands implementation

This commit is contained in:
Florent 2025-04-17 14:09:36 +02:00
parent ab39232cd1
commit 078f1375e2
2 changed files with 147 additions and 90 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore-cli"
version = "0.5.3"
version = "0.5.5"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
]

View file

@ -59,11 +59,28 @@ async def interactive_loop(mc) :
print(f"{contact['adv_name']}> ", end="", flush=True)
line = (await asyncio.to_thread(sys.stdin.readline)).rstrip('\n')
if line.startswith("$") :
if line == "" : # blank line
pass
elif line.startswith("$") : # command
args = shlex.split(line[1:])
await process_cmds(mc, args)
elif line.startswith("to ") :
elif line.startswith(".") or\
line.startswith("set ") or\
line.startswith("get ") or\
line.startswith("public") or\
line.startswith("clock") or\
line.startswith("time") or\
line.startswith("ver") or\
line.startswith("reboot") or\
line.startswith("advert") or\
line.startswith("chan") or\
line.startswith("card") : # terminal chat commands
args = shlex.split(line)
await process_cmds(mc, args)
elif line.startswith("to ") : # dest
dest = line[3:]
nc = mc.get_contact_by_name(dest)
if nc is None:
@ -72,20 +89,29 @@ async def interactive_loop(mc) :
else :
contact = nc
elif line == "to" :
print(contact["adv_name"])
elif line == "reset path" : # reset path from terminal chat
res = await mc.commands.reset_path(contact)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error resetting path: {res}")
else:
print(f"Path to contact['adv_name'] has been reset")
await mc.commands.get_contacts()
elif line == "quit" or line == "q" :
break
elif line == "lc" :
elif line == "list" : # list command from chat displays contacts on a line
it = iter(mc.contacts.items())
c = next(it)
print (c[1]["adv_name"], end="")
for c in it :
print(f", {c[1]['adv_name']}", end="")
print("")
elif line == "" :
pass
else :
if line.startswith("send") :
line = line[5:]
@ -110,7 +136,7 @@ async def interactive_loop(mc) :
# Handle task cancellation from KeyboardInterrupt in asyncio.run()
print("Exiting cli")
async def process_event_message(mc, ev, json_output):
async def process_event_message(mc, ev, json_output, end="\n"):
if ev.type == EventType.NO_MORE_MSGS:
logger.debug("No more messages")
return False
@ -118,17 +144,22 @@ async def process_event_message(mc, ev, json_output):
logger.error(f"Error retrieving messages: {ev.payload}")
return False
elif json_output :
print(json.dumps(ev.payload, indent=4))
print(json.dumps(ev.payload, indent=4), end=end, flush=True)
else :
await mc.ensure_contacts()
data = ev.payload
ct = mc.get_contact_by_key_prefix(data['pubkey_prefix'])
if ct is None:
logger.info(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}")
name = data["pubkey_prefix"]
if (data['type'] == "PRIV") :
ct = mc.get_contact_by_key_prefix(data['pubkey_prefix'])
if ct is None:
logger.info(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}")
name = data["pubkey_prefix"]
else:
name = ct["adv_name"]
print(f"{name}({"D" if data['path_len'] == 255 else str(data['path_len'])}): {data['text']}")
elif (data['type'] == "CHAN") :
print(f"ch{data['channel_idx']}({"D" if data['path_len'] == 255 else data['path_len']}): {data['text']}")
else:
name = ct["adv_name"]
print(f"{name}: {data['text']}")
print(json.dumps(ev.payload))
return True
async def next_cmd(mc, cmds, json_output=False):
@ -140,7 +171,7 @@ async def next_cmd(mc, cmds, json_output=False):
else:
cmd = cmds[0]
match cmd :
case "query" | "q":
case "ver" | "v" :
res = await mc.commands.send_device_query()
logger.debug(res)
if res.type == EventType.ERROR :
@ -153,9 +184,10 @@ async def next_cmd(mc, cmds, json_output=False):
print(f" Model: {res.payload['model']}")
print(f" Version: {res.payload['ver']}")
print(f" Build date: {res.payload['fw_build']}")
print(f" Firmware version : {res.payload['fw ver']}")
else :
print(f" Firmware version : {res.payload['fw ver']}")
case "get_time" | "clock" :
case "clock" :
if len(cmds) > 1 and cmds[1] == "sync" :
argnum=1
res = await mc.commands.set_time(int(time.time()))
@ -165,7 +197,7 @@ async def next_cmd(mc, cmds, json_output=False):
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print("Time set")
print("Time synced")
else:
res = await mc.commands.get_time()
timestamp = res.payload["time"]
@ -178,7 +210,7 @@ async def next_cmd(mc, cmds, json_output=False):
f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}'
f' ({timestamp})')
case "sync_time"|"clock sync"|"st":
case "sync_time"|"clock sync"|"st": # keep if for the st shortcut
res = await mc.commands.set_time(int(time.time()))
logger.debug(res)
if res.type == EventType.ERROR:
@ -188,7 +220,7 @@ async def next_cmd(mc, cmds, json_output=False):
else:
print("Time synced")
case "set_time" :
case "time" :
argnum = 1
res = await mc.commands.set_time(cmds[1])
logger.debug(res)
@ -197,44 +229,22 @@ async def next_cmd(mc, cmds, json_output=False):
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Time synced")
case "set_txpower"|"txp" :
argnum = 1
res = await mc.commands.set_tx_power(cmds[1])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "set_radio"|"rad" :
argnum = 4
res = await mc.commands.set_radio(cmds[1], cmds[2], cmds[3], cmds[4])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "set_name" :
argnum = 1
res = await mc.commands.set_name(cmds[1])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting name: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
print("Time set")
case "set":
argnum = 2
match cmds[1]:
case "help" :
argnum = 1
print("""Available parameters :
pin <pin> : ble pin
radio <freq,bw,sf,cr> : radio params
tuning <rx_dly,af> : tuning params
tx <dbm> : tx power
name <name> : node name
lat <lat> : latitude
lon <lon> : longitude
coords <lat,lon> : coordinates""")
case "pin":
res = await mc.commands.set_devicepin(cmds[2])
logger.debug(res)
@ -306,25 +316,61 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "tuning":
params=cmds[2].commands.split(",")
res = await mc.commands.set_tuning(
int(params[0]), int(params[1]))
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "set_tuning"|"tun" :
argnum = 2
res = await mc.commands.set_tuning(cmds[1], cmds[2])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "get_bat" | "b":
res = await mc.commands.get_bat()
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error getting bat {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print(f"Battery level : {res.payload.level}")
case "get" :
argnum = 1
match cmds[1]:
case "help":
print("""Gets parameters from node
name : node name
bat : battery level in mV
coords : adv coordinates
radio : radio parameters
tx : tx power""")
case "name":
if json_output :
print(json.dumps(mc.self_info["name"]))
else:
print(mc.self_info["name"])
case "tx":
if json_output :
print(json.dumps(mc.self_info["tx_power"]))
else:
print(mc.self_info["tx_power"])
case "coords":
if json_output :
print(json.dumps({"lat": mc.self_info["adv_lat"], "lon":mc.self_info["adv_lon"]}))
else:
print(print(f"{mc.self_info['adv_lat']},{mc.self_info['adv_lon']}"))
case "radio":
if json_output :
print(json.dumps(
{"radio_freq": mc.self_info["radio_freq"],
"radio_sf": mc.self_info["radio_sf"],
"radio_bw": mc.self_info["radio_bw"],
"radio_cr": mc.self_info["radio_cr"]}))
else:
print(f"{mc.self_info['radio_freq']},{mc.self_info['radio_sf']},{mc.self_info['radio_bw']},{mc.self_info['radio_cr']}")
case "bat" :
res = await mc.commands.get_bat()
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error getting bat {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print(f"Battery level : {res.payload.level}")
case "reboot" :
res = await mc.commands.reboot()
@ -332,16 +378,7 @@ async def next_cmd(mc, cmds, json_output=False):
if json_output :
print(json.dumps(res.payload, indent=4))
case "send" :
argnum = 2
res = await mc.commands.send_msg(bytes.fromhex(cmds[1]), cmds[2])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending message {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "msg" | "sendto" | "m" | "{" : # sends to a contact from name
case "msg" | "m" | "{" : # sends to a contact from name
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
@ -353,7 +390,7 @@ async def next_cmd(mc, cmds, json_output=False):
res.payload["expected_ack"] = res.payload["expected_ack"].hex()
print(json.dumps(res.payload, indent=4))
case "chan_msg"|"ch" :
case "chan"|"ch" :
argnum = 2
res = await mc.commands.send_chan_msg(int(cmds[1]), cmds[2])
logger.debug(res)
@ -362,7 +399,7 @@ async def next_cmd(mc, cmds, json_output=False):
elif json_output :
print(json.dumps(res.payload, indent=4))
case "def_chan_msg"|"def_chan"|"dch" : # default chan
case "public" | "dch" : # default chan
argnum = 1
res = await mc.commands.send_chan_msg(0, cmds[1])
logger.debug(res)
@ -417,7 +454,7 @@ async def next_cmd(mc, cmds, json_output=False):
elif json_output :
print(json.dumps(res.payload, indent=4))
case "contacts" | "lc":
case "contacts" | "list" | "lc":
res = await mc.commands.get_contacts()
logger.debug(json.dumps(res.payload,indent=4))
if res.type == EventType.ERROR:
@ -476,7 +513,7 @@ async def next_cmd(mc, cmds, json_output=False):
else :
print(res.payload)
case "export_myself"|"e":
case "card" :
res = await mc.commands.export_contact()
logger.debug(res)
if res.type == EventType.ERROR:
@ -504,10 +541,22 @@ async def next_cmd(mc, cmds, json_output=False):
case "sync_msgs" | "sm":
ret = True
first = True
if json_output :
print("[", end="", flush=True)
end=""
else:
end="\n"
while ret:
res = await mc.commands.get_msg()
logger.debug(res)
ret = await process_event_message(mc, res, json_output)
logger.debug(res)
if res.type != EventType.NO_MORE_MSGS:
if not first and json_output :
print(",")
ret = await process_event_message(mc, res, json_output,end=end)
first = False
if json_output :
print("]")
case "infos" | "i" :
print(json.dumps(mc.self_info,indent=4))
@ -627,8 +676,16 @@ async def next_cmd(mc, cmds, json_output=False):
async def process_cmds (mc, args, json_output=False) :
cmds = args
first = True
if json_output :
print("[")
while len(cmds) > 0 :
if not first and json_output :
print(",")
cmds = await next_cmd(MC, cmds, json_output)
first = False
if json_output :
print("]")
def usage () :
""" Prints some help """