diff --git a/pyproject.toml b/pyproject.toml index 5a05215..2bf2112 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore-cli" -version = "1.2.12" +version = "1.2.13" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, ] diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 40ccb1b..c249371 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -4,7 +4,7 @@ """ import asyncio -import os, sys, io, platform +import os, sys, io import time, datetime import getopt, json, shlex, re import logging @@ -32,7 +32,7 @@ import re from meshcore import MeshCore, EventType, logger # Version -VERSION = "v1.2.12" +VERSION = "v1.2.13" # default ble address is stored in a config file MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/meshcore/" @@ -76,13 +76,11 @@ ANSI_YELLOW = "\033[0;33m" ANSI_BYELLOW = "\033[1;33m" #Unicode chars -# some possible symbols for prompts 🭬🬛🬗🭬🬛🬃🬗🭬🬛🬃🬗🬏🭀🭋🭨🮋 -ARROW_TAIL = "🭨" -ARROW_HEAD = "🭬" - -if platform.system() == 'Windows' or platform.system() == 'Darwin': - ARROW_TAIL = "" - ARROW_HEAD = " " +# some possible symbols for prompts 🭬🬛🬗🭬🬛🬃🬗🭬🬛🬃🬗🬏🭀🭋🭨🮋 +ARROW_HEAD = "" +SLASH_END = "" +SLASH_START = "" +INVERT_SLASH = False def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') @@ -470,7 +468,7 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None): "login" : contact_list, "cmd" : contact_list, "req_status" : contact_list, - "req_bstatus" : contact_list, + "req_neighbours": contact_list, "logout" : contact_list, "req_telemetry" : contact_list, "req_binary" : contact_list, @@ -495,7 +493,6 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None): "print_snr" : {"on":None, "off": None}, "json_msgs" : {"on":None, "off": None}, "color" : {"on":None, "off":None}, - "print_name" : {"on":None, "off":None}, "print_adverts" : {"on":None, "off":None}, "json_log_rx" : {"on":None, "off":None}, "channel_echoes" : {"on":None, "off":None}, @@ -525,7 +522,6 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None): "print_snr":None, "json_msgs":None, "color":None, - "print_name":None, "print_adverts":None, "json_log_rx":None, "channel_echoes":None, @@ -577,7 +573,6 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None): "login" : None, "logout" : None, "req_status" : None, - "req_bstatus" : None, "req_neighbours": None, "cmd" : None, "ver" : None, @@ -765,26 +760,32 @@ Line starting with \"$\" or \".\" will issue a meshcli command. color = process_event_message.color classic = interactive_loop.classic or not color - print_name = interactive_loop.print_name if classic: prompt = "" else: prompt = f"{ANSI_INVERT}" - if print_name or contact is None : - if color: - prompt = prompt + f"{ANSI_BGRAY}" - prompt = prompt + f"{mc.self_info['name']}" - if contact is None: # display scope - if not scope is None: - prompt = prompt + f"|{scope}" - if classic : - prompt = prompt + "> " - else : - prompt = prompt + f"{ANSI_NORMAL}{ARROW_HEAD}{ANSI_INVERT}" + prompt = prompt + f"{ANSI_BGRAY}" + prompt = prompt + f"{mc.self_info['name']}" + if contact is None: # display scope + if not scope is None: + prompt = prompt + f"|{scope}" - if not contact is None : + if contact is None : + if classic : + prompt = prompt + ">" + else : + prompt = prompt + f"{ANSI_NORMAL}{ARROW_HEAD}" + else: + if classic : + prompt = prompt + "/" + else : + if INVERT_SLASH: + prompt = prompt + f"{ANSI_INVERT}" + else: + prompt = prompt + f"{ANSI_NORMAL}" + prompt = prompt + f"{SLASH_START}" if not last_ack: prompt = prompt + f"{ANSI_BRED}" if classic : @@ -800,11 +801,9 @@ Line starting with \"$\" or \".\" will issue a meshcli command. else : prompt = prompt + f"{ANSI_BBLUE}" if not classic: + prompt = prompt + f"{SLASH_END}" prompt = prompt + f"{ANSI_INVERT}" - if print_name and not classic : - prompt = prompt + f"{ANSI_NORMAL}{ARROW_TAIL}{ANSI_INVERT}" - prompt = prompt + f"{contact['adv_name']}" if contact["type"] == 0 or contact["out_path_len"]==-1: if scope is None: @@ -818,14 +817,15 @@ Line starting with \"$\" or \".\" will issue a meshcli command. prompt = prompt + "|" + contact["out_path"] if classic : - prompt = prompt + f"{ANSI_NORMAL}> " + prompt = prompt + f"{ANSI_NORMAL}>" else: prompt = prompt + f"{ANSI_NORMAL}{ARROW_HEAD}" prompt = prompt + f"{ANSI_END}" - if not color : - prompt=escape_ansi(prompt) + prompt = prompt + " " + if not color : + prompt=escape_ansi(prompt) session.app.ttimeoutlen = 0.2 session.app.timeoutlen = 0.2 @@ -1037,7 +1037,6 @@ Line starting with \"$\" or \".\" will issue a meshcli command. # Handle task cancellation from KeyboardInterrupt in asyncio.run() print("Exiting cli") interactive_loop.classic = False -interactive_loop.print_name = True async def process_contact_chat_line(mc, contact, line): if contact["type"] == 0: @@ -1085,7 +1084,6 @@ async def process_contact_chat_line(mc, contact, line): line == "contact_info" or line == "ci" or\ line == "req_status" or line == "rs" or\ line == "req_neighbours" or line == "rn" or\ - line == "req_bstatus" or line == "rbs" or\ line == "req_telemetry" or line == "rt" or\ line == "req_acl" or\ line == "path" or\ @@ -1615,7 +1613,7 @@ async def print_disc_trace_to (mc, contact): async def next_cmd(mc, cmds, json_output=False): """ process next command """ - global ARROW_TAIL, ARROW_HEAD + global ARROW_HEAD, SLASH_START, SLASH_END, INVERT_SLASH try : argnum = 0 @@ -1741,18 +1739,18 @@ async def next_cmd(mc, cmds, json_output=False): msg_ack.max_attempts=int(cmds[2]) case "flood_after": msg_ack.flood_after=int(cmds[2]) - case "print_name": - interactive_loop.print_name = (cmds[2] == "on") - if json_output : - print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]})) case "classic_prompt": interactive_loop.classic = (cmds[2] == "on") if json_output : print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]})) - case "arrow_tail": - ARROW_TAIL = cmds[2] case "arrow_head": ARROW_HEAD = cmds[2] + case "slash_start": + SLASH_START = cmds[2] + case "slash_end": + SLASH_END = cmds[2] + case "invert_slash": + INVERT_SLASH = cmds[2] == "on" case "color" : process_event_message.color = (cmds[2] == "on") if json_output : @@ -1983,11 +1981,6 @@ async def next_cmd(mc, cmds, json_output=False): print(json.dumps({"flood_after" : msg_ack.flood_after})) else: print(f"flood_after: {msg_ack.flood_after}") - case "print_name": - if json_output : - print(json.dumps({"print_name" : interactive_loop.print_name})) - else: - print(f"{'on' if interactive_loop.print_name else 'off'}") case "classic_prompt": if json_output : print(json.dumps({"classic_prompt" : interactive_loop.classic})) @@ -2362,7 +2355,7 @@ async def next_cmd(mc, cmds, json_output=False): if classic : print("→",end="") else : - print(f"{ANSI_NORMAL}{ARROW_HEAD}",end="") + print(f"{ANSI_NORMAL}{ARROW_HEAD} ",end="") if color: print(ANSI_END, end="") if "hash" in t: @@ -2428,46 +2421,6 @@ async def next_cmd(mc, cmds, json_output=False): contact = mc.get_contact_by_name(cmds[1]) contact["timeout"] = float(cmds[2]) - case "req_status" | "rs" : - argnum = 1 - await mc.ensure_contacts() - contact = mc.get_contact_by_name(cmds[1]) - res = await mc.commands.send_statusreq(contact) - logger.debug(res) - if res.type == EventType.ERROR: - print(f"Error while requesting status: {res}") - else : - timeout = res.payload["suggested_timeout"]/800 if not "timeout" in contact or contact['timeout']==0 else contact["timeout"] - res = await mc.wait_for_event(EventType.STATUS_RESPONSE, timeout=timeout) - logger.debug(res) - if res is None: - if json_output : - print(json.dumps({"error" : "Timeout waiting status"})) - else: - print("Timeout waiting status") - else : - print(json.dumps(res.payload, indent=4)) - - case "req_telemetry" | "rt" : - argnum = 1 - await mc.ensure_contacts() - contact = mc.get_contact_by_name(cmds[1]) - res = await mc.commands.send_telemetry_req(contact) - logger.debug(res) - if res.type == EventType.ERROR: - print(f"Error while requesting telemetry") - else: - timeout = res.payload["suggested_timeout"]/800 if not "timeout" in contact or contact['timeout']==0 else contact["timeout"] - res = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=timeout) - logger.debug(res) - if res is None: - if json_output : - print(json.dumps({"error" : "Timeout waiting telemetry"})) - else: - print("Timeout waiting telemetry") - else : - print(json.dumps(res.payload, indent=4)) - case "disc_path" | "dp" : argnum = 1 await mc.ensure_contacts() @@ -2553,7 +2506,7 @@ async def next_cmd(mc, cmds, json_output=False): print(f" {name:16} {type:>4} SNR: {n['SNR_in']:6,.2f}->{n['SNR']:6,.2f} RSSI: ->{n['RSSI']:4}") - case "req_btelemetry"|"rbt" : + case "req_telemetry"|"rt" : argnum = 1 await mc.ensure_contacts() contact = mc.get_contact_by_name(cmds[1]) @@ -2565,9 +2518,13 @@ async def next_cmd(mc, cmds, json_output=False): else: print("Error getting data") else : - print(json.dumps(res)) + print(json.dumps({ + "name": contact["adv_name"], + "pubkey_pre": contact["public_key"][0:12], + "lpp": res, + }, indent = 4)) - case "req_bstatus"|"rbs" : + case "req_status"|"rs" : argnum = 1 await mc.ensure_contacts() contact = mc.get_contact_by_name(cmds[1])