refactor command calls

This commit is contained in:
Florent 2025-04-16 15:20:10 +02:00
parent f1c6a78a41
commit c39e37bac0

View file

@ -9,6 +9,7 @@ import getopt
import json
import datetime
import time
import shlex
import logging
from pathlib import Path
@ -53,18 +54,25 @@ async def subscribe_to_msgs(mc):
await mc.start_auto_message_fetching()
async def interactive_loop(mc) :
print("Interactive mode, use \"to\" to selects contact, \"lc\" to list contacts, \"$\" to issue a command.\n You can send messages using the \"send\" command, a quote, or write your message after the prompt.\n \"quit\" or \"q\" will end interactive mode")
await mc.ensure_contacts()
c = next(iter(mc.contacts.items()))[1]
contact = next(iter(mc.contacts.items()))[1]
try:
while True:
print(f"{contact["adv_name"]}> ", end="", flush=True)
line = (await asyncio.to_thread(sys.stdin.readline)).rstrip('\n')
if line.startswith("to ") :
if line.startswith("$") :
args = shlex.split(line[1:])
await process_cmds(mc, args)
elif line.startswith("to ") :
dest = line[3:]
nc = mc.get_contact_by_name(dest)
if nc is None:
print(f"Contact '{DEST}' not found in contacts.")
print(f"Contact '{dest}' not found in contacts.")
return
else :
contact = nc
@ -72,8 +80,13 @@ async def interactive_loop(mc) :
elif line == "quit" or line == "q" :
break
elif line == "contacts" :
print (json.dumps(mc.contacts,indent=2))
elif line == "lc" :
it = iter(mc.contacts.items())
c = next(it)
print (c[1]["adv_name"], end="")
for c in it :
print(f", {c[1]["adv_name"]}", end="")
print("")
elif line == "" :
pass
@ -81,18 +94,19 @@ async def interactive_loop(mc) :
else :
if line.startswith("send") :
line = line[5:]
if line.startswith("\"") :
line = line[1:]
result = await mc.commands.send_msg(contact, line)
if result.type == EventType.ERROR:
print(f"⚠️ Failed to send message: {result.payload}")
continue
exp_ack = result.payload["expected_ack"].hex()
print(" Sent ... ", end="", flush=True)
res = await mc.wait_for_event(EventType.ACK, attribute_filters={"code": exp_ack}, timeout=5)
if res is None :
print ("No ack !!!")
print ("#", end="")
else :
print ("Ack")
print ("<", end="")
except KeyboardInterrupt:
mc.stop()
@ -105,130 +119,164 @@ async def next_cmd(mc, cmds):
""" process next command """
argnum = 0
match cmds[0] :
case "q":
print(await mc.commands.send_device_query())
case "query" | "q":
res = await mc.commands.send_device_query()
print(res)
case "get_time" | "clock" :
if len(cmds) > 1 and cmds[1] == "sync" :
argnum=1
print(await mc.commands.set_time(int(time.time())))
res = await mc.commands.set_time(int(time.time()))
print(res)
else:
timestamp = (await mc.commands.get_time()).payload["time"]
res = await mc.commands.get_time()
timestamp = res.payload["time"]
print('Current time :'
f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}'
f' ({timestamp})')
case "sync_time"|"clock sync"|"st":
print(await mc.commands.set_time(int(time.time())))
res = await mc.commands.set_time(int(time.time()))
print(res)
case "set_time" :
argnum = 1
print(await mc.commands.set_time(cmds[1]))
res = await mc.commands.set_time(cmds[1])
print(res)
case "set_txpower"|"txp" :
argnum = 1
print(await mc.commands.set_tx_power(cmds[1]))
res = await mc.commands.set_tx_power(cmds[1])
print(res)
case "set_radio"|"rad" :
argnum = 4
print(await mc.commands.set_radio(cmds[1], cmds[2], cmds[3], cmds[4]))
res = await mc.commands.set_radio(cmds[1], cmds[2], cmds[3], cmds[4])
print(res)
case "set_name" :
argnum = 1
print(await mc.commands.set_name(cmds[1]))
res = await mc.commands.set_name(cmds[1])
print(res)
case "set":
argnum = 2
match cmds[1]:
case "pin":
print (await mc.commands.set_devicepin(cmds[2]))
res = await mc.commands.set_devicepin(cmds[2])
print(res)
case "radio":
params=cmds[2].split(",")
print (await mc.commands.set_radio(params[0], params[1], params[2], params[3]))
res=await mc.commands.set_radio(params[0], params[1], params[2], params[3])
print(res)
case "name":
print (await mc.commands.set_name(cmds[2]))
res = await mc.commands.set_name(cmds[2])
print(res)
case "tx":
print (await mc.commands.set_tx_power(cmds[2]))
res = await mc.commands.set_tx_power(cmds[2])
print(res)
case "lat":
print (await mc.commands.set_coords(\
res = await mc.commands.set_coords(\
float(cmds[2]),\
mc.self_infos['adv_lon']))
mc.self_infos['adv_lon'])
print(res)
case "lon":
print (await mc.commands.set_coords(\
res = await mc.commands.set_coords(\
mc.self_infos['adv_lat'],\
float(cmds[2])))
float(cmds[2]))
print(res)
case "coords":
params=cmds[2].commands.split(",")
print (await mc.commands.set_coords(\
res = await mc.commands.set_coords(\
float(params[0]),\
float(params[1])))
float(params[1]))
print(res)
case "set_tuning"|"tun" :
argnum = 2
print(await mc.commands.set_tuning(cmds[1], cmds[2]))
res = await mc.commands.set_tuning(cmds[1], cmds[2])
print(res)
case "get_bat" | "b":
print(await mc.commands.get_bat())
res = await mc.commands.get_bat()
print(res)
case "reboot" :
print(await mc.commands.reboot())
res = await mc.commands.reboot()
print(res)
case "send" :
argnum = 2
print(await mc.commands.send_msg(bytes.fromhex(cmds[1]), cmds[2]))
res = await mc.commands.send_msg(bytes.fromhex(cmds[1]), cmds[2])
print(res)
case "msg" | "sendto" | "m" | "{" : # sends to a contact from name
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.send_msg(contact, cmds[2]))
res = await mc.commands.send_msg(contact, cmds[2])
print(res)
case "chan_msg"|"ch" :
argnum = 2
print(await mc.commands.send_chan_msg(int(cmds[1]), cmds[2]))
res = await mc.commands.send_chan_msg(int(cmds[1]), cmds[2])
print(res)
case "def_chan_msg"|"def_chan"|"dch" : # default chan
argnum = 1
print(await mc.commands.send_chan_msg(0, cmds[1]))
res = await mc.commands.send_chan_msg(0, cmds[1])
print(res)
case "cmd" | "c" | "[" :
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.send_cmd(contact, cmds[2]))
res = await mc.commands.send_cmd(contact, cmds[2])
print(res)
case "login" | "l" | "[[" :
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(contact)
print(await mc.commands.send_login(contact, cmds[2]))
res = await mc.commands.send_login(contact, cmds[2])
print(res)
case "logout" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.send_logout(contact))
res = await mc.send_logout(contact)
print(res)
case "req_status" | "rs" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.send_statusreq(contact))
res = await mc.commands.send_statusreq(contact)
print(res)
case "contacts" | "lc":
print(json.dumps((await mc.commands.get_contacts()).payload,indent=4))
res = await mc.commands.get_contacts()
print(json.dumps(res.payload,indent=4))
case "change_path" | "cp":
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.change_contact_path(contact, cmds[2]))
res = await mc.commands.change_contact_path(contact, cmds[2])
print(res)
await mc.commands.get_contacts()
case "reset_path" | "rp" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.reset_path(contact))
res = await mc.commands.reset_path(contact)
print(res)
await mc.commands.get_contacts()
case "share_contact" | "sc":
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.share_contact(contact))
res = await mc.commands.share_contact(contact)
print(res)
case "export_contact"|"ec":
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print((await mc.commands.export_contact(contact)).payload)
res = await mc.commands.export_contact(contact)
print(res.payload)
case "export_myself"|"e":
print((await mc.commands.export_contact()).payload)
res = await mc.commands.export_contact()
print(res.payload)
case "remove_contact" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
print(await mc.commands.remove_contact(contact))
res = await mc.commands.remove_contact(contact)
print(res)
case "recv" | "r" :
print(await mc.commands.get_msg())
res = await mc.commands.get_msg()
print(res)
case "sync_msgs" | "sm":
while True:
res = await mc.commands.get_msg()
@ -242,9 +290,11 @@ async def next_cmd(mc, cmds):
case "infos" | "i" :
print(json.dumps(mc.self_info,indent=4))
case "advert" | "a":
print(await mc.commands.send_advert())
res = await mc.commands.send_advert()
print(res)
case "flood_advert":
print(await mc.commands.send_advert(flood=True))
res = await mc.commands.send_advert(flood=True)
print(res)
case "sleep" | "s" :
argnum = 1
await asyncio.sleep(int(cmds[1]))
@ -256,34 +306,46 @@ async def next_cmd(mc, cmds):
argnum = 1
if await mc.wait_for_event(EventType.MESSAGES_WAITING,
timeout=int(cmds[1])) :
print (await mc.commands.get_msg())
res = await mc.commands.get_msg()
print(res)
case "wmt8"|"]":
if await mc.wait_for_event(EventType.MESSAGES_WAITING,
timeout=8) :
print (await mc.commands.get_msg())
res = await mc.commands.get_msg()
print(res)
case "wait_ack" | "wa" | "}":
print(await mc.wait_for_event(EventType.ACK, timeout = 5))
res = await mc.wait_for_event(EventType.ACK, timeout = 5)
print(res)
case "wait_login" | "wl" | "]]":
print(await mc.wait_for_event(EventType.LOGIN_SUCCESS))
res = await mc.wait_for_event(EventType.LOGIN_SUCCESS)
print(res)
case "wait_status" | "ws" :
print(await mc.wait_for_event(EventType.STATUS_RESPONSE))
res = await mc.wait_for_event(EventType.STATUS_RESPONSE)
print(res)
case "msgs_subscribe" | "ms" :
await subscribe_to_msgs(mc)
case "interactive_loop" | "il" | "chat" :
case "interactive" | "im" | "chat" :
await subscribe_to_msgs(mc)
await interactive_loop(mc)
case "cli" | "@" :
argnum = 1
print (await mc.commands.send_cli(cmds[1]))
res = await mc.commands.send_cli(cmds[1])
print(res)
case _ :
if cmds[0][0] == "@" :
print (await mc.commands.send_cli(cmds[0][1:]))
res = await mc.commands.send_cli(cmds[0][1:])
print(res)
else :
logger.info (f"Unknown command : {cmds[0]}")
logger.info (f"cmd {cmds[0:argnum+1]} processed ...")
return cmds[argnum+1:]
async def process_cmds (mc, args) :
cmds = args
while len(cmds) > 0 :
cmds = await next_cmd(MC, cmds)
def usage () :
""" Prints some help """
print("""meshcore-cli : CLI interface to MeschCore BLE companion app
@ -384,9 +446,7 @@ async def main(argv):
MC = MeshCore(con)
await MC.connect()
cmds = args
while len(cmds) > 0 :
cmds = await next_cmd(MC, cmds)
await process_cmds(MC, args)
def cli():
try: