implement apply_to command that runs a command to a set of contacts specified with a filter

This commit is contained in:
Florent 2025-11-04 22:18:22 +01:00
parent 04f2f3ced0
commit 47c855ee7c
2 changed files with 119 additions and 10 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore-cli"
version = "1.2.5"
version = "1.2.6"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
]

View file

@ -33,7 +33,7 @@ import re
from meshcore import MeshCore, EventType, logger
# Version
VERSION = "v1.2.5"
VERSION = "v1.2.6"
# default ble address is stored in a config file
MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/meshcore/"
@ -210,7 +210,7 @@ async def handle_log_rx(event):
pkt = bytes().fromhex(event.payload["payload"])
if handle_log_rx.channel_echoes:
if pkt[0] == 0x15:
if pkt[0] == 0x15:
chan_name = ""
path_len = pkt[1]
path = pkt[2:path_len+2].hex()
@ -235,7 +235,7 @@ async def handle_log_rx(event):
aes_key = bytes.fromhex(channel["channel_secret"])
cipher = AES.new(aes_key, AES.MODE_ECB)
message = cipher.decrypt(msg)[5:].decode("utf-8").strip("\x00")
if chan_name != "" :
width = os.get_terminal_size().columns
cars = width - 13 - 2 * path_len - len(chan_name) - 1
@ -245,7 +245,7 @@ async def handle_log_rx(event):
print_above(txt)
else:
print(txt)
handle_log_rx.json_log_rx = False
handle_log_rx.channel_echoes = False
handle_log_rx.mc = None
@ -366,7 +366,7 @@ async def subscribe_to_msgs(mc, json_output=False, above=False):
class MyNestedCompleter(NestedCompleter):
def get_completions( self, document, complete_event):
txt = document.text_before_cursor.lstrip()
if not " " in txt:
if not " " in txt:
if txt != "" and txt[0] == "/" and txt.count("/") == 1:
opts = []
for k in self.options.keys():
@ -465,6 +465,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
"set_channel": None,
"get_channels": None,
"remove_channel": None,
"apply_to": None,
"at": None,
"set" : {
"name" : None,
"pin" : None,
@ -531,6 +533,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
contact_completion_list = {
"contact_info": None,
"contact_name": None,
"contact_lastmod": None,
"export_contact" : None,
"share_contact" : None,
"upload_contact" : None,
@ -652,7 +656,7 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
slash_root_completion_list = {}
for k,v in root_completion_list.items():
slash_root_completion_list["/"+k]=v
completion_list.update(slash_root_completion_list)
slash_contacts_completion_list = {}
@ -905,6 +909,13 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
if last_ack == False :
contact = ln
elif contact is None and\
(line.startswith("apply_to ") or line.startswith("at ")):
try:
await apply_command_to_contacts(mc, line.split(" ",2)[1], line.split(" ",2)[2])
except IndexError:
logger.error(f"Error with apply_to command parameters")
# commands are passed through if at root
elif contact is None or line.startswith(".") :
try:
@ -966,6 +977,23 @@ async def process_contact_chat_line(mc, contact, line):
await process_cmds(mc, args)
return True
if line.startswith("contact_name") or line.startswith("cn"):
print(contact['adv_name'],end="")
if " " in line:
print(" ", end="")
secline = line.split(" ", 1)[1]
await process_contact_chat_line(mc, contact, secline)
else:
print("")
return True
if line == "contact_lastmod":
timestamp = contact["lastmod"]
print(f"{contact['adv_name']} updated"
f" {datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d at %H:%M:%S')}"
f" ({timestamp})")
return True
# commands that take contact as second arg will be sent to recipient
if line == "sc" or line == "share_contact" or\
line == "ec" or line == "export_contact" or\
@ -1053,7 +1081,7 @@ async def process_contact_chat_line(mc, contact, line):
return True
# same but for commands with a parameter
if line.startswith("cmd ") or\
if line.startswith("cmd ") or line.startswith("msg ") or\
line.startswith("cp ") or line.startswith("change_path ") or\
line.startswith("cf ") or line.startswith("change_flags ") or\
line.startswith("req_binary ") or\
@ -1085,7 +1113,7 @@ async def process_contact_chat_line(mc, contact, line):
if password == "":
try:
sess = PromptSession("Password: ", is_password=True)
sess = PromptSession(f"Password for {contact['adv_name']}: ", is_password=True)
password = await sess.prompt_async()
except EOFError:
logger.info("Canceled")
@ -1124,6 +1152,87 @@ async def process_contact_chat_line(mc, contact, line):
return False
async def apply_command_to_contacts(mc, contact_filter, line):
upd_before = None
upd_after = None
contact_type = None
min_hops = None
max_hops = None
filters = contact_filter.split(",")
for f in filters:
if f == "all":
pass
elif f[0] == "u": #updated
val_str = f[2:]
t = time.time()
if val_str[-1] == "d": # value in days
t = t - float(val_str[0:-1]) * 86400
elif val_str[-1] == "h": # value in hours
t = t - float(val_str[0:-1]) * 3600
elif val_str[-1] == "m": # value in minutes
t = t - float(val_str[0:-1]) * 60
else:
t = int(val_str)
if f[1] == "<": #before
upd_before = t
elif f[1] == ">":
upd_after = t
else:
logger.error(f"Time filter can only be < or >")
return
elif f[0] == "t": # type
if f[1] == "=":
contact_type = int(f[2:])
else:
logger.error(f"Type can only be equals to a value")
return
elif f[0] == "d": # direct
min_hops=0
elif f[0] == "f": # flood
max_hops=-1
elif f[0] == "h": # hop number
if f[1] == ">":
min_hops = int(f[2:])+1
elif f[1] == "<":
max_hops = int(f[2:])-1
elif f[1] == "=":
min_hops = int(f[2:])
max_hops = int(f[2:])
else:
logger.error(f"Unknown filter {f}")
return
for c in dict(mc._contacts).items():
contact = c[1]
if (contact_type is None or contact["type"] == contact_type) and\
(upd_before is None or contact["lastmod"] < upd_before) and\
(upd_after is None or contact["lastmod"] > upd_after) and\
(min_hops is None or contact["out_path_len"] >= min_hops) and\
(max_hops is None or contact["out_path_len"] <= max_hops):
if await process_contact_chat_line(mc, contact, line):
pass
elif line == "remove_contact":
args = [line, contact['adv_name']]
await process_cmds(mc, args)
elif line.startswith("send") or line.startswith("\"") :
if line.startswith("send") :
line = line[5:]
if line.startswith("\"") :
line = line[1:]
await msg_ack(mc, contact, line)
elif contact["type"] == 2 or\
contact["type"] == 3 or\
contact["type"] == 4 : # repeater, room, sensor send cmd
await process_cmds(mc, ["cmd", contact["adv_name"], line])
# wait for a reply from cmd
await mc.wait_for_event(EventType.MESSAGES_WAITING)
else:
logger.error(f"Can't send {line} to {contact['adv_name']}")
async def send_cmd (mc, contact, cmd) :
res = await mc.commands.send_cmd(contact, cmd)
@ -2718,7 +2827,7 @@ async def next_cmd(mc, cmds, json_output=False):
await mc.ensure_contacts()
contact = mc.get_contact_by_name(cmds[0])
if contact is None:
logger.error(f"Unknown command : {cmd}. {cmds} not executed ...")
logger.error(f"Unknown command : {cmd}, {cmds} not executed ...")
return None
await interactive_loop(mc, to=contact)