ask trace for a contact in chat mode

This commit is contained in:
Florent 2025-10-29 16:30:34 +01:00
parent f4671ff790
commit 4904e22ab8

View file

@ -445,6 +445,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
"upload_contact" : None,
"path": None,
"disc_path": None,
"trace": None,
"dtrace": None,
"reset_path" : None,
"change_path" : None,
"change_flags" : None,
@ -810,6 +812,15 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
await process_cmds(mc, ["cmd", contact["adv_name"], newline])
except IndexError:
print("Wrong number of parameters")
# trace called on a contact
elif contact["type"] > 0 and (
line == "trace" or line == "tr") :
await print_trace_to(mc, contact)
elif contact["type"] > 0 and (
line == "dtrace" or line == "dt") :
await print_disc_trace_to(mc, contact)
# same but for commands with a parameter
elif contact["type"] > 0 and (line.startswith("cmd ") or\
@ -1064,6 +1075,70 @@ async def get_channels (mc, anim=False) :
print (" Done")
return mc.channels
async def print_trace_to (mc, contact):
path = contact["out_path"]
path_len = contact["out_path_len"]
trace = ""
if path_len == -1:
print ("No path to destination")
return
if contact["type"] == 2 or contact["type"] == 3:
# repeater or room, can trace to the contact itself
trace = contact["public_key"][0:2]
for i in range(0, path_len):
elem = path[2*(path_len-i-1):2*(path_len-i)]
trace = elem if trace=="" else f"{elem},{trace},{elem}"
await next_cmd(mc, ["trace", trace])
async def discover_path(mc, contact):
await mc.ensure_contacts()
res = await mc.commands.send_path_discovery(contact)
if res.type == EventType.ERROR:
return None
else:
timeout = res.payload["suggested_timeout"]/600 if not "timeout" in contact or contact['timeout']==0 else contact["timeout"]
res = await mc.wait_for_event(EventType.PATH_RESPONSE, timeout=timeout)
if res is None:
return {"error": "timeout"}
else :
return res.payload
async def print_disc_trace_to (mc, contact):
p = await discover_path(mc, contact)
if p is None:
print("Error discovering path")
return
if "error" in p:
print("Timeout discovering path")
return
inp = p["in_path"]
outp = p["out_path"]
inp_l = int(len(inp)/2)
outp_l = int(len(outp)/2)
trace = ""
for i in range(0, outp_l):
elem = outp[2*i:2*(i+1)]
trace = elem if trace == "" else f"{trace},{elem}"
if contact["type"] == 2 or contact["type"] == 3:
# repeater or room, can trace to the contact itself
elem = contact["public_key"][0:2]
trace = elem if trace == "" else f"{trace},{elem}"
for i in range(0, inp_l):
elem = inp[2*i:2*(i+1)]
trace = elem if trace == "" else f"{trace},{elem}"
await next_cmd(mc, ["trace", trace])
async def next_cmd(mc, cmds, json_output=False):
""" process next command """
try :
@ -1703,7 +1778,7 @@ async def next_cmd(mc, cmds, json_output=False):
print(res.payload)
print(json.dumps(res.payload, indent=4))
case "trace" :
case "trace" | "tr":
argnum = 1
res = await mc.commands.send_trace(path=cmds[1])
if res and res.type != EventType.ERROR:
@ -1716,7 +1791,7 @@ async def next_cmd(mc, cmds, json_output=False):
if json_output:
print(json.dumps({"error" : "timeout waiting trace"}))
else :
print("Timeout waiting trace")
print(f"Timeout waiting trace for path {cmds[1]}")
elif ev.type == EventType.ERROR:
if json_output:
print(json.dumps(ev.payload))
@ -1840,26 +1915,19 @@ async def next_cmd(mc, cmds, json_output=False):
argnum = 1
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
res = await mc.commands.send_path_discovery(contact)
logger.debug(res)
if res.type == EventType.ERROR:
res = await discover_path(mc, contact)
if res is None:
print(f"Error while discovering path")
else:
timeout = res.payload["suggested_timeout"]/800 if not "timeout" in contact or contact['timeout']==0 else contact["timeout"]
res = await mc.wait_for_event(EventType.PATH_RESPONSE, timeout=timeout)
logger.debug(res)
if res is None:
if json_output :
print(json.dumps({"error" : "Timeout discovering path"}))
if json_output :
print(json.dumps(res, indent=4))
else:
if "error" in res :
print("Timeout while discovering path")
else:
print("Timeout discovering path")
else :
if json_output :
print(json.dumps(res.payload, indent=4))
else:
outp = res.payload['out_path']
outp = res['out_path']
outp = outp if outp != "" else "direct"
inp = res.payload['in_path']
inp = res['in_path']
inp = inp if inp != "" else "direct"
print(f"Path for {contact['adv_name']}: out {outp}, in {inp}")
@ -2054,12 +2122,16 @@ async def next_cmd(mc, cmds, json_output=False):
else:
print(f"Unknown contact {cmds[1]}")
else:
res = await mc.commands.change_contact_path(contact, cmds[2])
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting path: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
path = cmds[2].replace(",","") # we'll accept path with ,
try:
res = await mc.commands.change_contact_path(contact, path)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error setting path: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
except ValueError:
print(f"Bad path format {cmds[2]}")
case "change_flags" | "cf":
argnum = 2