introduce path to node in commands

This commit is contained in:
Florent 2025-10-30 20:17:27 +01:00
parent a5559787f7
commit 478d4c4d03

View file

@ -1,5 +1,5 @@
#!/usr/bin/python
"""
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
@ -484,8 +484,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
"neighbors" : None,
"req_acl":None,
"setperm":contact_list,
"gps" : {"on":None,"off":None,"sync":None,"setloc":None,
"advert" : {"none": None, "share": None, "prefs": None},
"gps" : {"on":None,"off":None,"sync":None,"setloc":None,
"advert" : {"none": None, "share": None, "prefs": None},
},
"sensor": {"list": None, "set": {"gps": None}, "get": {"gps": None}},
"get" : {"name" : None,
@ -681,7 +681,22 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
pass
# raw meshcli command as on command line
elif line.startswith("$") or line.startswith("/") :
elif line.startswith("/") :
path = line.split(" ", 1)[0]
if path.count("/") == 1:
args = shlex.split(line[1:])
await process_cmds(mc, args)
else:
cmdline = line[1:].split("/",1)[1]
contact_name = path[1:].split("/",1)[0]
tct = mc.get_contact_by_name(contact_name)
if tct is None:
print(f"{contact_name} is not a contact")
else:
if not await process_contact_chat_line(mc, tct, cmdline):
print(f"{cmdline} not found for {contact_name}")
elif line.startswith("$") :
args = shlex.split(line[1:])
await process_cmds(mc, args)
@ -707,7 +722,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
elif dest == "!" :
nc = process_event_message.last_node
else :
chan = await get_channel_by_name(mc, dest)
chan = await get_channel_by_name(mc, dest)
if chan is None :
print(f"Contact '{dest}' not found in contacts.")
nc = contact
@ -752,104 +767,8 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
args = shlex.split(line)
await process_cmds(mc, args)
# commands that take contact as second arg will be sent to recipient
elif contact["type"] > 0 and (line == "sc" or line == "share_contact" or\
line == "ec" or line == "export_contact" or\
line == "uc" or line == "upload_contact" or\
line == "rp" or line == "reset_path" or\
line == "dp" or line == "disc_path" or\
line == "contact_info" or line == "ci" or\
line == "req_status" or line == "rs" or\
line == "req_bstatus" or line == "rbs" or\
line == "req_telemetry" or line == "rt" or\
line == "req_acl" or\
line == "path" or\
line == "logout" ) :
args = [line, contact['adv_name']]
await process_cmds(mc, args)
elif contact["type"] > 0 and line.startswith("set timeout "):
cmds=line.split(" ")
contact["timeout"] = float(cmds[2])
elif contact["type"] > 0 and line == "get timeout":
print(f"timeout: {0 if not 'timeout' in contact else contact['timeout']}")
elif contact["type"] == 4 and\
(line.startswith("get mma ")) or\
contact["type"] > 1 and\
(line.startswith("get telemetry") or line.startswith("get status") or line.startswith("get acl")):
cmds = line.split(" ")
args = [f"req_{cmds[1]}", contact['adv_name']]
if len(cmds) > 2 :
args = args + cmds[2:]
if line.startswith("get mma ") and len(args) < 4:
args.append("0")
await process_cmds(mc, args)
# special treatment for setperm to support contact name as param
elif contact["type"] > 1 and\
(line.startswith("setperm ") or line.startswith("set perm ")):
try:
cmds = shlex.split(line)
off = 1 if line.startswith("set perm") else 0
name = cmds[1 + off]
perm_string = cmds[2 + off]
if (perm_string.startswith("0x")):
perm = int(perm_string,0)
elif (perm_string.startswith("#")):
perm = int(perm_string[1:])
else:
perm = int(perm_string,16)
ct=mc.get_contact_by_name(name)
if ct is None:
ct=mc.get_contact_by_key_prefix(name)
if ct is None:
if name == "self" or mc.self_info["public_key"].startswith(name):
key = mc.self_info["public_key"]
else:
key = name
else:
key=ct["public_key"]
newline=f"setperm {key} {perm}"
await process_cmds(mc, ["cmd", contact["adv_name"], newline])
except IndexError:
print("Wrong number of parameters")
# trace called on a contact
elif contact["type"] > 0 and (
line == "trace" or line == "tr") :
await print_trace_to(mc, contact)
elif contact["type"] > 0 and (
line == "dtrace" or line == "dt") :
await print_disc_trace_to(mc, contact)
# same but for commands with a parameter
elif contact["type"] > 0 and (line.startswith("cmd ") or\
line.startswith("cp ") or line.startswith("change_path ") or\
line.startswith("cf ") or line.startswith("change_flags ") or\
line.startswith("req_binary ") or\
line.startswith("login ")) :
cmds = line.split(" ", 1)
args = [cmds[0], contact['adv_name'], cmds[1]]
await process_cmds(mc, args)
elif contact["type"] == 4 and \
(line.startswith("req_mma ") or line.startswith('rm ')) :
cmds = line.split(" ")
if len(cmds) < 3 :
cmds.append("0")
args = [cmds[0], contact['adv_name'], cmds[1], cmds[2]]
await process_cmds(mc, args)
elif line.startswith(":") : # : will send a command to current recipient
args=["cmd", contact['adv_name'], line[1:]]
await process_cmds(mc, args)
elif line == "reset path" : # reset path for compat with terminal chat
args = ["reset_path", contact['adv_name']]
await process_cmds(mc, args)
elif await process_contact_chat_line(mc, contact, line):
pass
elif line == "list" : # list command from chat displays contacts on a line
it = iter(mc.contacts.items())
@ -887,6 +806,121 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
interactive_loop.classic = False
interactive_loop.print_name = True
async def process_contact_chat_line(mc, contact, line):
if contact["type"] == 0:
return False
if line.startswith(":") : # : will send a command to current recipient
args=["cmd", contact['adv_name'], line[1:]]
await process_cmds(mc, args)
return True
if line == "reset path" : # reset path for compat with terminal chat
args = ["reset_path", contact['adv_name']]
await process_cmds(mc, args)
return True
# commands that take contact as second arg will be sent to recipient
if line == "sc" or line == "share_contact" or\
line == "ec" or line == "export_contact" or\
line == "uc" or line == "upload_contact" or\
line == "rp" or line == "reset_path" or\
line == "dp" or line == "disc_path" or\
line == "contact_info" or line == "ci" or\
line == "req_status" or line == "rs" or\
line == "req_bstatus" or line == "rbs" or\
line == "req_telemetry" or line == "rt" or\
line == "req_acl" or\
line == "path" or\
line == "logout" :
args = [line, contact['adv_name']]
await process_cmds(mc, args)
return True
if line.startswith("set timeout "):
cmds=line.split(" ")
contact["timeout"] = float(cmds[2])
return True
if line == "get timeout":
print(f"timeout: {0 if not 'timeout' in contact else contact['timeout']}")
return True
if contact["type"] == 4 and\
(line.startswith("get mma ")) or\
contact["type"] > 1 and\
(line.startswith("get telemetry") or line.startswith("get status") or line.startswith("get acl")):
cmds = line.split(" ")
args = [f"req_{cmds[1]}", contact['adv_name']]
if len(cmds) > 2 :
args = args + cmds[2:]
if line.startswith("get mma ") and len(args) < 4:
args.append("0")
await process_cmds(mc, args)
return True
# special treatment for setperm to support contact name as param
if contact["type"] > 1 and\
(line.startswith("setperm ") or line.startswith("set perm ")):
try:
cmds = shlex.split(line)
off = 1 if line.startswith("set perm") else 0
name = cmds[1 + off]
perm_string = cmds[2 + off]
if (perm_string.startswith("0x")):
perm = int(perm_string,0)
elif (perm_string.startswith("#")):
perm = int(perm_string[1:])
else:
perm = int(perm_string,16)
ct=mc.get_contact_by_name(name)
if ct is None:
ct=mc.get_contact_by_key_prefix(name)
if ct is None:
if name == "self" or mc.self_info["public_key"].startswith(name):
key = mc.self_info["public_key"]
else:
key = name
else:
key=ct["public_key"]
newline=f"setperm {key} {perm}"
await process_cmds(mc, ["cmd", contact["adv_name"], newline])
except IndexError:
print("Wrong number of parameters")
return True
# trace called on a contact
if line == "trace" or line == "tr" :
await print_trace_to(mc, contact)
return True
if line == "dtrace" or line == "dt" :
await print_disc_trace_to(mc, contact)
return True
# same but for commands with a parameter
if line.startswith("cmd ") or\
line.startswith("cp ") or line.startswith("change_path ") or\
line.startswith("cf ") or line.startswith("change_flags ") or\
line.startswith("req_binary ") or\
line.startswith("login ") :
cmds = line.split(" ", 1)
args = [cmds[0], contact['adv_name'], cmds[1]]
await process_cmds(mc, args)
return True
if contact["type"] == 4 and \
(line.startswith("req_mma ") or line.startswith('rm ')) :
cmds = line.split(" ")
if len(cmds) < 3 :
cmds.append("0")
args = [cmds[0], contact['adv_name'], cmds[1], cmds[2]]
await process_cmds(mc, args)
return True
return False
async def send_cmd (mc, contact, cmd) :
res = await mc.commands.send_cmd(contact, cmd)
if not res is None and not res.type == EventType.ERROR:
@ -910,7 +944,7 @@ async def send_chan_msg(mc, nb, msg):
sent["text"] = msg
sent["txt_type"] = 0
sent["name"] = mc.self_info['name']
await log_message(mc, sent)
await log_message(mc, sent)
return res
async def send_msg (mc, contact, msg) :
@ -1021,7 +1055,7 @@ async def get_contacts (mc, anim=False, lastomod=0, timeout=5) :
done, pending = await asyncio.wait(
futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
)
# Check if any future completed successfully
if len(done) == 0:
logger.debug("Timeout while getting contacts")
@ -1041,7 +1075,7 @@ async def get_contacts (mc, anim=False, lastomod=0, timeout=5) :
if anim:
if event.type == EventType.CONTACTS:
print ((len(event.payload)-contact_nb)*"." + " Done")
else :
else :
print(" Error")
for future in pending:
future.cancel()
@ -1703,7 +1737,7 @@ async def next_cmd(mc, cmds, json_output=False):
res = await set_channel(mc, cmds[1], cmds[2])
elif len(cmds[3]) != 32:
res = None
else:
else:
res = await set_channel(mc, cmds[1], cmds[2], bytes.fromhex(cmds[3]))
if res is None:
print("Error setting channel")
@ -1723,8 +1757,8 @@ async def next_cmd(mc, cmds, json_output=False):
case "msg" | "m" | "{" : # sends to a contact from name
argnum = 2
dest = None
if len(cmds[1]) == 12: # possibly an hex prefix
if len(cmds[1]) == 12: # possibly an hex prefix
try:
dest = bytes.fromhex(cmds[1])
except ValueError:
@ -1801,11 +1835,18 @@ async def next_cmd(mc, cmds, json_output=False):
case "trace" | "tr":
argnum = 1
res = await mc.commands.send_trace(path=cmds[1])
path = cmds[1]
plen = int(len(path)/2)
if plen > 1 and path.count(",") == 0:
path = cmds[1][0:2]
for i in range(1, plen):
path = path + "," + cmds[1][2*i:2*i+2]
res = await mc.commands.send_trace(path=path)
if res and res.type != EventType.ERROR:
tag= int.from_bytes(res.payload['expected_ack'], byteorder="little")
timeout = res.payload["suggested_timeout"] / 1000 * 1.2
ev = await mc.wait_for_event(EventType.TRACE_DATA,
ev = await mc.wait_for_event(EventType.TRACE_DATA,
attribute_filters={"tag": tag},
timeout=timeout)
if ev is None:
@ -1939,7 +1980,7 @@ async def next_cmd(mc, cmds, json_output=False):
print("Timeout waiting telemetry")
else :
print(json.dumps(res.payload, indent=4))
case "disc_path" | "dp" :
argnum = 1
await mc.ensure_contacts()
@ -2609,7 +2650,7 @@ async def main(argv):
if not d.name is None and d.name.startswith("MeshCore-"):
print(f" {d.address} {d.name}")
except BleakError:
print(" No BLE HW")
print(" No BLE HW")
print("\nSerial ports:")
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
@ -2650,7 +2691,7 @@ async def main(argv):
else:
logger.error("Invalid choice")
return
if (debug==True):
logger.setLevel(logging.DEBUG)
elif (json_output) :