some work on output

This commit is contained in:
Florent 2025-04-16 18:23:40 +02:00
parent c39e37bac0
commit 667ae37d18

View file

@ -29,6 +29,7 @@ MCCLI_ADDRESS = MCCLI_CONFIG_DIR + "default_address"
# Fallback address if config file not found
# if None or "" then a scan is performed
ADDRESS = ""
JSON = False
PS = None
CS = None
@ -36,12 +37,13 @@ CS = None
# Subscribe to incoming messages
async def handle_message(event):
data = event.payload
contact = MC.get_contact_by_key_prefix(data['pubkey_prefix'])
if contact is None:
print(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}")
return
print(f"{contact['adv_name']}: {data['text']}")
name = data["pubkey_prefix"]
else:
name = contact["adv_name"]
print(f"{name}: {data['text']}")
async def subscribe_to_msgs(mc):
global PS, CS
@ -115,213 +117,416 @@ async def interactive_loop(mc) :
# Handle task cancellation from KeyboardInterrupt in asyncio.run()
print("Exiting cli")
async def next_cmd(mc, cmds):
async def next_cmd(mc, cmds, json_output=False):
""" process next command """
argnum = 0
match cmds[0] :
case "query" | "q":
res = await mc.commands.send_device_query()
print(res)
logger.debug(res)
if res.type == EventType.ERROR :
print(f"ERROR: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print("Devince info :")
if res.payload["fw ver"] >= 3:
print(f" Model: {res.payload["model"]}")
print(f" Version: {res.payload["ver"]}")
print(f" Build date: {res.payload["fw_build"]}")
print(f" Firmware version : {res.payload["fw ver"]}")
case "get_time" | "clock" :
if len(cmds) > 1 and cmds[1] == "sync" :
argnum=1
res = await mc.commands.set_time(int(time.time()))
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting time: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print("Time set")
else:
res = await mc.commands.get_time()
timestamp = res.payload["time"]
print('Current time :'
f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}'
f' ({timestamp})')
if res.type == EventType.ERROR:
print(f"Error getting time: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print('Current time :'
f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}'
f' ({timestamp})')
case "sync_time"|"clock sync"|"st":
res = await mc.commands.set_time(int(time.time()))
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error syncing time: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Time synced")
case "set_time" :
argnum = 1
res = await mc.commands.set_time(cmds[1])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print (f"Error setting time: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Time synced")
case "set_txpower"|"txp" :
argnum = 1
res = await mc.commands.set_tx_power(cmds[1])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "set_radio"|"rad" :
argnum = 4
res = await mc.commands.set_radio(cmds[1], cmds[2], cmds[3], cmds[4])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "set_name" :
argnum = 1
res = await mc.commands.set_name(cmds[1])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting name: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Name set")
case "set":
argnum = 2
match cmds[1]:
case "pin":
res = await mc.commands.set_devicepin(cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "radio":
params=cmds[2].split(",")
res=await mc.commands.set_radio(params[0], params[1], params[2], params[3])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "name":
res = await mc.commands.set_name(cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "tx":
res = await mc.commands.set_tx_power(cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "lat":
res = await mc.commands.set_coords(\
float(cmds[2]),\
mc.self_infos['adv_lon'])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "lon":
res = await mc.commands.set_coords(\
mc.self_infos['adv_lat'],\
float(cmds[2]))
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "coords":
params=cmds[2].commands.split(",")
res = await mc.commands.set_coords(\
float(params[0]),\
float(params[1]))
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("ok")
case "set_tuning"|"tun" :
argnum = 2
res = await mc.commands.set_tuning(cmds[1], cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "get_bat" | "b":
res = await mc.commands.get_bat()
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error getting bat {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print(f"Battery level : {res.payload.level}")
case "reboot" :
res = await mc.commands.reboot()
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "send" :
argnum = 2
res = await mc.commands.send_msg(bytes.fromhex(cmds[1]), cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending message {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Message sent")
case "msg" | "sendto" | "m" | "{" : # sends to a contact from name
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.send_msg(contact, cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending message: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print("Message sent")
case "chan_msg"|"ch" :
argnum = 2
res = await mc.commands.send_chan_msg(int(cmds[1]), cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending message: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else:
print("Message sent")
case "def_chan_msg"|"def_chan"|"dch" : # default chan
argnum = 1
res = await mc.commands.send_chan_msg(0, cmds[1])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending message: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "cmd" | "c" | "[" :
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.send_cmd(contact, cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error sending cmd: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "login" | "l" | "[[" :
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.send_login(contact, cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error while loging: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "logout" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.send_logout(contact)
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error while logout: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "req_status" | "rs" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.send_statusreq(contact)
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error while requesting status: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "contacts" | "lc":
res = await mc.commands.get_contacts()
print(json.dumps(res.payload,indent=4))
logger.debug(json.dumps(res.payload,indent=4))
if res.type == EventType.ERROR:
print(f"Error asking for contacts: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "change_path" | "cp":
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.change_contact_path(contact, cmds[2])
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting path: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
await mc.commands.get_contacts()
case "reset_path" | "rp" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.reset_path(contact)
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error resetting path: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
await mc.commands.get_contacts()
case "share_contact" | "sc":
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.share_contact(contact)
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error while sharing contact: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "export_contact"|"ec":
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.export_contact(contact)
print(res.payload)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error exporting contact: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print(res.payload)
case "export_myself"|"e":
res = await mc.commands.export_contact()
print(res.payload)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error exporting contact: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
print(res.payload)
case "remove_contact" :
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.remove_contact(contact)
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error removing contact: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "recv" | "r" :
res = await mc.commands.get_msg()
print(res)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error retreiving msg: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
case "sync_msgs" | "sm":
while True:
res = await mc.commands.get_msg()
logger.debug(res)
if res.type == EventType.NO_MORE_MSGS:
logger.error("No more messages")
logger.info("No more messages")
break
elif res.type == EventType.ERROR:
logger.error(f"Error retrieving messages: {res.payload}")
break
print(res)
elif json_output :
print(json.dumps(res.payload, indent=4))
else :
data = res.payload
ct = mc.get_contact_by_key_prefix(data['pubkey_prefix'])
if ct is None:
logger.info(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}")
name = data["pubkey_prefix"]
else:
name = ct["adv_name"]
print(f"{name}: {data['text']}")
case "infos" | "i" :
print(json.dumps(mc.self_info,indent=4))
case "advert" | "a":
res = await mc.commands.send_advert()
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "flood_advert":
res = await mc.commands.send_advert(flood=True)
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "sleep" | "s" :
argnum = 1
await asyncio.sleep(int(cmds[1]))
case "wait_msg" | "wm" :
await mc.wait_for_event(EventType.MESSAGES_WAITING)
res = await mc.commands.get_msg()
print (res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "trywait_msg" | "wmt" :
argnum = 1
if await mc.wait_for_event(EventType.MESSAGES_WAITING,
timeout=int(cmds[1])) :
if await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=int(cmds[1])) :
res = await mc.commands.get_msg()
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "wmt8"|"]":
if await mc.wait_for_event(EventType.MESSAGES_WAITING,
timeout=8) :
res = await mc.commands.get_msg()
print(res)
if await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=8) :
res = await mc.commands.get_msg()
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "wait_ack" | "wa" | "}":
res = await mc.wait_for_event(EventType.ACK, timeout = 5)
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "wait_login" | "wl" | "]]":
res = await mc.wait_for_event(EventType.LOGIN_SUCCESS)
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "wait_status" | "ws" :
res = await mc.wait_for_event(EventType.STATUS_RESPONSE)
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case "msgs_subscribe" | "ms" :
await subscribe_to_msgs(mc)
case "interactive" | "im" | "chat" :
@ -330,21 +535,23 @@ async def next_cmd(mc, cmds):
case "cli" | "@" :
argnum = 1
res = await mc.commands.send_cli(cmds[1])
print(res)
logger.debug(res)
if json_output :
print(json.dumps(res.payload, indent=4))
case _ :
if cmds[0][0] == "@" :
res = await mc.commands.send_cli(cmds[0][1:])
print(res)
logger.debug(res)
else :
logger.info (f"Unknown command : {cmds[0]}")
logger.error(f"Unknown command : {cmds[0]}")
logger.info (f"cmd {cmds[0:argnum+1]} processed ...")
return cmds[argnum+1:]
async def process_cmds (mc, args) :
async def process_cmds (mc, args, json_output=False) :
cmds = args
while len(cmds) > 0 :
cmds = await next_cmd(MC, cmds)
cmds = await next_cmd(MC, cmds, json_output)
def usage () :
""" Prints some help """
@ -354,6 +561,7 @@ def usage () :
Arguments :
-h : prints this help
-j : json output
-a <address> : specifies device address (can be a name)
-d <name> : filter meshcore devices with name or address
-t <hostname> : connects via tcp/ip
@ -392,6 +600,8 @@ def usage () :
async def main(argv):
""" Do the job """
global MC
json_output = JSON
debug = False
address = ADDRESS
port = 5000
hostname = None
@ -403,7 +613,7 @@ async def main(argv):
with open(MCCLI_ADDRESS, encoding="utf-8") as f :
address = f.readline().strip()
opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:")
opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:jD")
for opt, arg in opts :
match opt:
case "-d" : # name specified on cmdline
@ -418,11 +628,20 @@ async def main(argv):
hostname = arg
case "-p" :
port = int(arg)
case "-j" :
json_output=True
case "-D" :
debug=True
if len(args) == 0 : # no args, no action
usage()
return
if (debug==True):
logger.setLevel(logging.DEBUG)
elif (json_output) :
logger.setLevel(logging.ERROR)
con = None
if not hostname is None : # connect via tcp
con = TCPConnection(hostname, port)
@ -443,10 +662,13 @@ async def main(argv):
with open(MCCLI_ADDRESS, "w", encoding="utf-8") as f :
f.write(address)
MC = MeshCore(con)
MC = MeshCore(con, debug=debug)
await MC.connect()
await process_cmds(MC, args)
if (json_output) :
logger.setLevel(logging.ERROR)
await process_cmds(MC, args, json_output)
def cli():
try: