diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 07ea6c9..a62d95b 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -34,10 +34,77 @@ JSON = False PS = None CS = None -# Subscribe to incoming messages +def print_above(str): + """ prints a string above current line """ + width = os.get_terminal_size().columns + lines = divmod(len(str), width)[0] + 1 + print("\u001B[s", end="") # Save current cursor position + print("\u001B[A", end="") # Move cursor up one line + print("\u001B[999D", end="") # Move cursor to beginning of line + for _ in range(lines): + print("\u001B[S", end="") # Scroll up/pan window down 1 line + print("\u001B[L", end="") # Insert new line + for _ in range(lines - 1): + print("\u001B[A", end="") # Move cursor up one line + print(str, end="") # Print output status msg + print("\u001B[u", end="", flush=True) # Jump back to saved cursor position + +async def process_event_message(mc, ev, json_output, end="\n", above=False): + """ display incoming message """ + if ev is None : + logger.error("Event does not contain message.") + elif ev.type == EventType.NO_MORE_MSGS: + logger.debug("No more messages") + return False + elif ev.type == EventType.ERROR: + logger.error(f"Error retrieving messages: {ev.payload}") + return False + elif json_output : + print(json.dumps(ev.payload, indent=4), end=end, flush=True) + else : + await mc.ensure_contacts() + data = ev.payload + if (data['type'] == "PRIV") : + ct = mc.get_contact_by_key_prefix(data['pubkey_prefix']) + if ct is None: + logger.info(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}") + name = data["pubkey_prefix"] + else: + name = ct["adv_name"] + if data['path_len'] == 255 : + path_str = "D" + else : + path_str = str(data['path_len']) + + disp = f" {name}" + if 'signature' in data: + sender = mc.get_contact_by_key_prefix(data['signature']) + disp = disp + f"/{sender['adv_name']}" + disp = disp + f"({path_str}): " + disp = disp + f"{data['text']}" + + if above: + print_above(disp) + else: + print(disp) + elif (data['type'] == "CHAN") : + if data['path_len'] == 255 : + path_str = "D" + else : + path_str = str(data['path_len']) + if above: + print_above(f" ch{data['channel_idx']}({path_str}): {data['text']}") + else: + print(f"ch{data['channel_idx']}({path_str}): {data['text']}") + else: + print(json.dumps(ev.payload)) + return True + async def handle_message(event): + """ Process incoming message events """ await process_event_message(MC, event, False, above=True) +# Subscribe to incoming messages async def subscribe_to_msgs(mc): global PS, CS await mc.ensure_contacts() @@ -75,10 +142,12 @@ Line starting with \"$\" or \".\" will issue a meshcli command. if line == "" : # blank line pass - elif line.startswith("$") : # command + # raw meshcli command as on command line + elif line.startswith("$") : args = shlex.split(line[1:]) await process_cmds(mc, args) + # commands that are passed through elif line.startswith(".") or\ line.startswith("set ") or\ line.startswith("get ") or\ @@ -89,7 +158,8 @@ Line starting with \"$\" or \".\" will issue a meshcli command. line.startswith("advert") or\ line.startswith("floodadv") or\ line.startswith("chan") or\ - line.startswith("card") : # commands are passed through + line.startswith("card") or \ + line == "infos" or line == "i" : args = shlex.split(line) await process_cmds(mc, args) @@ -100,22 +170,20 @@ Line starting with \"$\" or \".\" will issue a meshcli command. await process_cmds(mc, args) # commands that take contact as second arg will be sent to recipient + elif line == "sc" or line == "share_contact" or\ + line == "ec" or line == "export_contact" or\ + line == "rp" or line == "reset_path" or\ + line == "logout" : + args = [line, contact['adv_name']] + await process_cmds(mc, args) + + # same but for commands with a parameter elif line.startswith("cmd ") or line.startswith("cli ") or\ - line.startswith("logout ") or\ line.startswith("cp ") or line.startswith("change_path ") : cmds = line.split(" ", 1) args = [cmds[0], contact['adv_name'], cmds[1]] await process_cmds(mc, args) - # same but with no parameter - elif line.startswith("sc ") or line.startswith("share_contact ") or\ - line.startswith("ec ") or line.startswith("export_contact ") or\ - line.startswith("rp ") or line.startswith("reset_path ") or\ - line.startswith("remove_contact ") : - cmd = line.split(" ") - args = [cmd, contact['adv_name']] - await process_cmds(mc, args) - # special treatment for login (wait for login to complete) elif line.startswith("login ") : cmds = line.split(" ", 1) @@ -134,7 +202,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command. await process_cmds(mc, args) elif line.startswith("@") : # send a cli command that won't need quotes ! - args=["cmd", line[1:]] + args=["cli", contact['adv_name'], line[1:]] await process_cmds(mc, args) elif line.startswith("to ") : # dest @@ -191,60 +259,6 @@ Line starting with \"$\" or \".\" will issue a meshcli command. # Handle task cancellation from KeyboardInterrupt in asyncio.run() print("Exiting cli") -def print_above(str): - width = os.get_terminal_size().columns - lines = divmod(len(str), width)[0] + 1 - print("\u001B[s", end="") # Save current cursor position - print("\u001B[A", end="") # Move cursor up one line - print("\u001B[999D", end="") # Move cursor to beginning of line - for _ in range(lines): - print("\u001B[S", end="") # Scroll up/pan window down 1 line - print("\u001B[L", end="") # Insert new line - for _ in range(lines - 1): - print("\u001B[A", end="") # Move cursor up one line - print(str, end="") # Print output status msg - print("\u001B[u", end="", flush=True) # Jump back to saved cursor position - -async def process_event_message(mc, ev, json_output, end="\n", above=False): - if ev.type == EventType.NO_MORE_MSGS: - logger.debug("No more messages") - return False - elif ev.type == EventType.ERROR: - logger.error(f"Error retrieving messages: {ev.payload}") - return False - elif json_output : - print(json.dumps(ev.payload, indent=4), end=end, flush=True) - else : - await mc.ensure_contacts() - data = ev.payload - if (data['type'] == "PRIV") : - ct = mc.get_contact_by_key_prefix(data['pubkey_prefix']) - if ct is None: - logger.info(f"Unknown contact with pubkey prefix: {data['pubkey_prefix']}") - name = data["pubkey_prefix"] - else: - name = ct["adv_name"] - if data['path_len'] == 255 : - path_str = "D" - else : - path_str = str(data['path_len']) - if above: - print_above(f" {name}({path_str}): {data['text']}") - else: - print(f"{name}({path_str}): {data['text']}") - elif (data['type'] == "CHAN") : - if data['path_len'] == 255 : - path_str = "D" - else : - path_str = str(data['path_len']) - if above: - print_above(f" ch{data['channel_idx']}({path_str}): {data['text']}") - else: - print(f"ch{data['channel_idx']}({path_str}): {data['text']}") - else: - print(json.dumps(ev.payload)) - return True - async def next_cmd(mc, cmds, json_output=False): """ process next command """ argnum = 0 @@ -799,7 +813,7 @@ async def next_cmd(mc, cmds, json_output=False): case "interactive" | "im" | "chat" : await interactive_loop(mc) - case "chat_to" | "to" : + case "chat_to" | "imto" | "to" : argnum = 1 await mc.ensure_contacts() contact = mc.get_contact_by_name(cmds[1])