Let the user manually add contacts (from a pending list)

This commit is contained in:
Florent de Lamotte 2025-07-11 16:57:35 +02:00
parent 83b119f198
commit bb1e152968

View file

@ -106,7 +106,7 @@ async def process_event_message(mc, ev, json_output, end="\n", above=False):
if data['path_len'] == 255 :
path_str = "D"
else :
path_str = f"{data['path_len']}"
path_str = f"{data['path_len']}"
if "SNR" in data and process_event_message.print_snr:
path_str = path_str + f",{data['SNR']}dB"
@ -204,7 +204,7 @@ async def handle_path_update(event):
if not handle_path_update.print_path_updates:
return
if handle_path_update.json_output:
if handle_message.json_output:
msg = json.dumps({"event": "path_update", "public_key" : event.payload["public_key"]})
else:
key = event.payload["public_key"]
@ -222,12 +222,29 @@ async def handle_path_update(event):
print(msg)
handle_path_update.print_path_updates=False
handle_path_update.mc=None
handle_path_update.json_output=False
async def handle_new_contact(event):
if not handle_new_contact.print_new_contacts:
return
if handle_message.json_output:
msg = json.dumps({"event": "new_contact", "contact" : event.payload})
else:
key = event.payload["public_key"]
name = event.payload["adv_name"]
msg = f"New pending contact {name} [{key}]"
if handle_message.above:
print_above(msg)
else :
print(msg)
handle_new_contact.print_new_contacts=False
async def handle_message(event):
""" Process incoming message events """
await process_event_message(handle_message.mc, event,
above=handle_message.above,
await process_event_message(handle_message.mc, event,
above=handle_message.above,
json_output=handle_message.json_output)
handle_message.json_output=False
handle_message.mc=None
@ -247,8 +264,9 @@ async def subscribe_to_msgs(mc, json_output=False, above=False):
CS = mc.subscribe(EventType.CHANNEL_MSG_RECV, handle_message)
await mc.start_auto_message_fetching()
def make_completion_dict(contacts, to=None):
contact_list = {}
def make_completion_dict(contacts, pending={}, to=None):
contact_list = {}
pending_list = {}
to_list = {}
to_list["~"] = None
@ -262,6 +280,10 @@ def make_completion_dict(contacts, to=None):
for c in it :
contact_list[c[1]['adv_name']] = None
pit = iter(pending.items())
for c in pit :
pending_list[c[1]['public_key']] = None
to_list.update(contact_list)
to_list["ch"] = None
@ -286,6 +308,9 @@ def make_completion_dict(contacts, to=None):
"reboot" : None,
"card" : None,
"upload_card" : None,
"contacts": None,
"pending_contacts": None,
"add_pending": pending_list,
"contact_info": contact_list,
"export_contact" : contact_list,
"upload_contact" : contact_list,
@ -305,19 +330,20 @@ def make_completion_dict(contacts, to=None):
"get_channel": None,
"set_channel": None,
"set" : {
"name" : None,
"pin" : None,
"radio" : {",,,":None, "f,bw,sf,cr":None},
"tx" : None,
"tuning" : {",", "af,tx_d"},
"lat" : None,
"lon" : None,
"coords" : None,
"name" : None,
"pin" : None,
"radio" : {",,,":None, "f,bw,sf,cr":None},
"tx" : None,
"tuning" : {",", "af,tx_d"},
"lat" : None,
"lon" : None,
"coords" : None,
"print_snr" : {"on":None, "off": None},
"json_msgs" : {"on":None, "off": None},
"color" : {"on":None, "off":None},
"print_name" : {"on":None, "off":None},
"print_adverts" : {"on":None, "off":None},
"print_new_contacts" : {"on": None, "off":None},
"print_path_updates" : {"on":None,"off":None},
"classic_prompt" : {"on" : None, "off":None},
"manual_add_contacts" : {"on" : None, "off":None},
@ -326,20 +352,21 @@ def make_completion_dict(contacts, to=None):
"telemetry_mode_env" : {"always" : None, "device":None, "never":None},
"advert_loc_policy" : {"none" : None, "share" : None},
},
"get" : {"name":None,
"bat":None,
"get" : {"name":None,
"bat":None,
"fstats": None,
"radio":None,
"tx":None,
"coords":None,
"radio":None,
"tx":None,
"coords":None,
"lat":None,
"lon":None,
"print_snr":None,
"json_msgs":None,
"print_snr":None,
"json_msgs":None,
"color":None,
"print_name":None,
"print_adverts":None,
"print_path_updates":None,
"print_name":None,
"print_adverts":None,
"print_path_updates":None,
"print_new_contacts":None,
"classic_prompt":None,
"manual_add_contacts":None,
"telemetry_mode_base":None,
@ -383,41 +410,41 @@ def make_completion_dict(contacts, to=None):
"start ota" : None,
"password" : None,
"neighbors" : None,
"get" : {"name" : None,
"get" : {"name" : None,
"role":None,
"radio" : None,
"freq":None,
"tx":None,
"af" : None,
"radio" : None,
"freq":None,
"tx":None,
"af" : None,
"repeat" : None,
"allow.read.only" : None,
"flood.advert.interval" : None,
"flood.max":None,
"allow.read.only" : None,
"flood.advert.interval" : None,
"flood.max":None,
"advert.interval" : None,
"guest.password" : None,
"rxdelay": None,
"txdelay": None,
"guest.password" : None,
"rxdelay": None,
"txdelay": None,
"direct.tx_delay":None,
"public.key":None,
"lat" : None,
"lon" : None,
"public.key":None,
"lat" : None,
"lon" : None,
},
"set" : {"name" : None,
"radio" : {",,,":None, "f,bw,sf,cr": None},
"freq" : None,
"tx" : None,
"af": None,
"repeat" : {"on": None, "off": None},
"flood.advert.interval" : None,
"flood.max" : None,
"advert.interval" : None,
"guest.password" : None,
"set" : {"name" : None,
"radio" : {",,,":None, "f,bw,sf,cr": None},
"freq" : None,
"tx" : None,
"af": None,
"repeat" : {"on": None, "off": None},
"flood.advert.interval" : None,
"flood.max" : None,
"advert.interval" : None,
"guest.password" : None,
"allow.read.only" : {"on": None, "off": None},
"rxdelay" : None,
"txdelay": None,
"direct.txdelay" : None,
"lat" : None,
"lon" : None,
"rxdelay" : None,
"txdelay": None,
"direct.txdelay" : None,
"lat" : None,
"lon" : None,
},
"erase": None,
"log" : {"start" : None, "stop" : None, "erase" : None}
@ -444,20 +471,22 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
await mc.ensure_contacts()
await subscribe_to_msgs(mc, above=True)
handle_new_contact.print_new_contacts = True
try:
while True: # purge msgs
res = await mc.commands.get_msg()
if res.type == EventType.NO_MORE_MSGS:
break
if os.path.isdir(MCCLI_CONFIG_DIR) :
our_history = FileHistory(MCCLI_HISTORY_FILE)
else:
our_history = None
# beware, mouse support breaks mouse scroll ...
session = PromptSession(history=our_history,
wrap_lines=False,
session = PromptSession(history=our_history,
wrap_lines=False,
mouse_support=False,
complete_style=CompleteStyle.MULTI_COLUMN)
@ -482,17 +511,17 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
if classic:
prompt = ""
else:
else:
prompt = f"{ANSI_INVERT}"
# some possible symbols for prompts 🭬🬛🬗🭬🬛🬃🬗🭬🬛🬃🬗🬏🭀🭋🭨🮋
if print_name or contact is None :
prompt = prompt + f"{ANSI_BGRAY}"
prompt = prompt + f"{mc.self_info['name']}"
if classic :
if classic :
prompt = prompt + " > "
else :
prompt = prompt + "🭨"
prompt = prompt + "🭨"
if not contact is None :
if not last_ack:
@ -530,9 +559,11 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
session.app.timeoutlen = 0.2
completer = NestedCompleter.from_nested_dict(
make_completion_dict(mc.contacts, to=contact))
make_completion_dict(mc.contacts,
mc.pending_contacts,
to=contact))
line = await session.prompt_async(ANSI(prompt),
line = await session.prompt_async(ANSI(prompt),
complete_while_typing=False,
completer=completer,
key_bindings=bindings)
@ -552,7 +583,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
elif line.startswith("to ") : # dest
dest = line[3:]
if dest.startswith("\"") or dest.startswith("\'") : # if name starts with a quote
dest = shlex.split(dest)[0] # use shlex.split to get contact name between quotes
dest = shlex.split(dest)[0] # use shlex.split to get contact name between quotes
nc = mc.get_contact_by_name(dest)
if nc is None:
if dest == "public" :
@ -560,7 +591,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
elif dest.startswith("ch"):
dest = int(dest[2:])
nc = {"adv_name" : "chan" + str(dest), "type" : 0, "chan_nb" : dest}
elif dest == ".." : # previous recipient
elif dest == ".." : # previous recipient
nc = prev_contact
elif dest == "~" or dest == "/" or dest == mc.self_info['name']:
nc = None
@ -646,7 +677,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
print(f"{c[1]['adv_name']}", end="")
print("")
elif line.startswith("send") or line.startswith("\"") :
elif line.startswith("send") or line.startswith("\"") :
if line.startswith("send") :
line = line[5:]
if line.startswith("\"") :
@ -775,15 +806,18 @@ async def next_cmd(mc, cmds, json_output=False):
case "help" :
argnum = 1
print("""Available parameters :
pin <pin> : ble pin
radio <freq,bw,sf,cr> : radio params
tuning <rx_dly,af> : tuning params
tx <dbm> : tx power
name <name> : node name
lat <lat> : latitude
lon <lon> : longitude
coords <lat,lon> : coordinates
print_snr <on/off> : toggle snr display in messages""")
pin <pin> : ble pin
radio <freq,bw,sf,cr> : radio params
tuning <rx_dly,af> : tuning params
tx <dbm> : tx power
name <name> : node name
lat <lat> : latitude
lon <lon> : longitude
coords <lat,lon> : coordinates
print_snr <on/off> : toggle snr display in messages
print_adverts <on/off> : display adverts as they come
print_new_contacts <on/off> : display new pending contacts when available
print_path_updates <on/off> : display path updates as they come""")
case "print_name":
interactive_loop.print_name = (cmds[2] == "on")
if json_output :
@ -808,6 +842,10 @@ async def next_cmd(mc, cmds, json_output=False):
handle_path_update.print_path_updates = (cmds[2] == "on")
if json_output :
print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]}))
case "print_new_contacts" :
handle_new_contact.print_new_contacts = (cmds[2] == "on")
if json_output :
print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]}))
case "json_msgs" :
handle_message.json_output = (cmds[2] == "on")
if json_output :
@ -972,16 +1010,19 @@ async def next_cmd(mc, cmds, json_output=False):
match cmds[1]:
case "help":
print("""Gets parameters from node
name : node name
bat : battery level in mV
fstats : fs statistics
coords : adv coordinates
lat : latitude
lon : longitude
radio : radio parameters
tx : tx power
print_snr : snr display in messages
custom : all custom variables in json format
name : node name
bat : battery level in mV
fstats : fs statistics
coords : adv coordinates
lat : latitude
lon : longitude
radio : radio parameters
tx : tx power
print_snr : snr display in messages
print_adverts : display adverts as they come
print_new_contacts : display new pending contacts when available
print_path_updates : display path updates as they come
custom : all custom variables in json format
each custom var can also be get/set directly""")
case "print_name":
if json_output :
@ -1003,6 +1044,21 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps({"color" : process_event_message.color}))
else:
print(f"{'on' if process_event_message.color else 'off'}")
case "print_adverts":
if json_output :
print(json.dumps({"print_adverts" : handle_advert.print_adverts}))
else:
print(f"{'on' if handle_advert.print_adverts else 'off'}")
case "print_path_updates":
if json_output :
print(json.dumps({"print_path_updates" : handle_path_update.print_path_updates}))
else:
print(f"{'on' if handle_path_update.print_path_updates else 'off'}")
case "print_new_contacts":
if json_output :
print(json.dumps({"print_new_contacts" : handle_new_contact.print_new_contacts}))
else:
print(f"{'on' if handle_new_contact.print_new_contacts else 'off'}")
case "print_snr":
if json_output :
print(json.dumps({"print_snr" : process_event_message.print_snr}))
@ -1308,16 +1364,39 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps(res.payload, indent=4))
case "contacts" | "list" | "lc":
res = await mc.commands.get_contacts()
logger.debug(json.dumps(res.payload,indent=4))
if res.type == EventType.ERROR:
print(f"Error asking for contacts: {res}")
elif json_output :
print(json.dumps(res.payload, indent=4))
await mc.ensure_contacts(follow=True)
res = mc.contacts
if json_output :
print(json.dumps(res, indent=4))
else :
for c in res.payload.items():
for c in res.items():
print(c[1]["adv_name"])
case "pending_contacts":
if json_output:
print(json.dumps(mc.pending_contacts, indent=4))
else:
for c in mc.pending_contacts.items():
print(f"{c[1]["adv_name"]}: {c[1]["public_key"]}")
case "add_pending":
argnum = 1
contact = mc.pending_contacts.pop(cmds[1], None)
if contact is None:
if json_output:
print(json.dumps({"error":"Contact does not exist"}))
else:
logger.error(f"Contact {cmds[1]} does not exist")
else:
res = await mc.commands.add_contact(contact)
logger.debug(res)
if res.type == EventType.ERROR:
print(f"Error adding contact: {res}")
else:
mc.contacts[contact["public_key"]]=contact
if json_output :
print(json.dumps(res.payload, indent=4))
case "path":
argnum = 1
res = await mc.ensure_contacts(follow=True)
@ -1329,7 +1408,7 @@ async def next_cmd(mc, cmds, json_output=False):
print(f"Unknown contact {cmds[1]}")
else:
path = contact["out_path"]
path_len = contact["out_path_len"]
path_len = contact["out_path_len"]
if json_output :
print(json.dumps({"adv_name" : contact["adv_name"],
"out_path_len" : path_len,
@ -1341,7 +1420,7 @@ async def next_cmd(mc, cmds, json_output=False):
print("Path not set")
else:
print(path)
case "contact_info" | "ci":
argnum = 1
res = await mc.ensure_contacts(follow=True)
@ -1355,7 +1434,7 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps(contact, indent=4))
case "change_path" | "cp":
argnum = 2
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
if contact is None:
@ -1372,7 +1451,7 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps(res.payload, indent=4))
case "change_flags" | "cf":
argnum = 2
argnum = 2
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[1])
if contact is None:
@ -1518,7 +1597,7 @@ async def next_cmd(mc, cmds, json_output=False):
else:
if json_output :
print(json.dumps(res.payload, indent=4))
await mc.commands.get_contacts()
del mc.contacts[contact["public_key"]]
case "recv" | "r" :
res = await mc.commands.get_msg()
@ -1663,7 +1742,7 @@ async def next_cmd(mc, cmds, json_output=False):
return None
await interactive_loop(mc, to=contact)
logger.debug(f"cmd {cmds[0:argnum+1]} processed ...")
return cmds[argnum+1:]
@ -1721,7 +1800,7 @@ def command_help():
advert : sends advert a
floodadv : flood advert
get <param> : gets a param, \"get help\" for more
set <param> <value> : sets a param, \"set help\" for more
set <param> <value> : sets a param, \"set help\" for more
time <epoch> : sets time to given epoch
clock : get current time
clock sync : sync device clock st
@ -1737,6 +1816,8 @@ def command_help():
change_path <ct> <pth> : change the path to a contact cp
change_flags <ct> <f> : change contact flags (tel_l|tel_a|star)cf
req_telemetry <ct> : prints telemetry data as json rt
pending_contacts : show pending contacts
add_pending <key> : manually add pending contact from key
Repeaters
login <name> <pwd> : log into a node (rep) with given pwd l
logout <name> : log out of a repeater
@ -1765,11 +1846,11 @@ def usage () :
-s <port> : use serial port <port>
-b <baudrate> : specify baudrate
Available Commands and shorcuts (can be chained) :""")
Available Commands and shorcuts (can be chained) :""")
command_help()
async def main(argv):
""" Do the job """
async def main(argv):
""" Do the job """
json_output = JSON
debug = False
address = ADDRESS
@ -1795,15 +1876,13 @@ async def main(argv):
serial_port = arg
case "-b" :
baudrate = int(arg)
case "-t" :
case "-t" :
hostname = arg
case "-p" :
port = int(arg)
case "-j" :
json_output=True
handle_message.json_output=True
handle_advert.json_output=True
handle_path_update.json_output=True
case "-D" :
debug=True
case "-h" :
@ -1820,23 +1899,23 @@ async def main(argv):
logger.error("No ble device found")
for d in devices :
if not d.name is None and d.name.startswith("MeshCore-"):
print(f"{d.address} {d.name}")
print(f"{d.address} {d.name}")
return
case "-S" :
devices = await BleakScanner.discover(timeout=timeout)
choices = []
for d in devices:
choices = []
for d in devices:
if not d.name is None and d.name.startswith("MeshCore-"):
choices.append((d.address, f"{d.address} {d.name}"))
if len(choices) == 0:
logger.error("No BLE device found, exiting")
return
result = await radiolist_dialog(
title="MeshCore-cli BLE device selector",
title="MeshCore-cli BLE device selector",
text="Chose the device to connect to :",
values=choices
).run_async()
values=choices
).run_async()
if result is None:
logger.info("No choice made, exiting")
@ -1855,11 +1934,11 @@ async def main(argv):
elif not serial_port is None : # connect via serial port
mc = await MeshCore.create_serial(port=serial_port, baudrate=baudrate, debug=debug)
else : #connect via ble
if address is None or address == "" or len(address.split(":")) != 6 :
if address is None or address == "" or len(address.split(":")) != 6 :
logger.info(f"Scanning BLE for device matching {address}")
devices = await BleakScanner.discover(timeout=timeout)
found = False
for d in devices:
for d in devices:
if not d.name is None and d.name.startswith("MeshCore-") and\
(address is None or address in d.name) :
address=d.address
@ -1883,6 +1962,7 @@ async def main(argv):
mc.subscribe(EventType.ADVERTISEMENT, handle_advert)
mc.subscribe(EventType.PATH_UPDATE, handle_path_update)
mc.subscribe(EventType.NEW_CONTACT, handle_new_contact)
res = await mc.commands.send_device_query()
if res.type == EventType.ERROR :
@ -1894,7 +1974,7 @@ async def main(argv):
else :
if res.payload["fw ver"] > 2 :
logger.info(f"Connected to {mc.self_info['name']} running on a {res.payload['ver']} fw.")
else :
else :
logger.info(f"Connected to {mc.self_info['name']}.")
if os.path.exists(MCCLI_INIT_SCRIPT) and not json_output :