color in incoming messages

This commit is contained in:
Florent 2025-04-18 22:24:47 +02:00
parent 708c7c54e2
commit c15ddd9f1a

View file

@ -9,6 +9,7 @@ import getopt, json, shlex
import logging
from pathlib import Path
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.shortcuts import CompleteStyle
from prompt_toolkit.completion import NestedCompleter
from prompt_toolkit.history import FileHistory
@ -28,6 +29,15 @@ JSON = False
PS = None
CS = None
# Ansi colors
ANSI_END = "\033[0m"
ANSI_GREEN = "\033[0;32m"
ANSI_BLUE = "\033[0;34m"
ANSI_YELLOW = "\033[1;33m"
ANSI_RED = "\033[0;31m"
ANSI_LIGHT_BLUE = "\033[1;34m"
ANSI_LIGHT_GREEN = "\033[1;32m"
def print_above(str):
""" prints a string above current line """
width = os.get_terminal_size().columns
@ -70,11 +80,11 @@ async def process_event_message(mc, ev, json_output, end="\n", above=False):
else :
path_str = str(data['path_len'])
disp = f" {name}"
disp = f" {ANSI_GREEN}{name}"
if 'signature' in data:
sender = mc.get_contact_by_key_prefix(data['signature'])
disp = disp + f"/{sender['adv_name']}"
disp = disp + f"({path_str}): "
disp = disp + f"{ANSI_YELLOW}({path_str}){ANSI_END}: "
disp = disp + f"{data['text']}"
if above:
@ -85,9 +95,10 @@ async def process_event_message(mc, ev, json_output, end="\n", above=False):
if data['path_len'] == 255 :
path_str = "D"
else :
path_str = str(data['path_len'])
path_str = f"{ANSI_YELLOW}({data['path_len']}){ANSI_END}"
if above:
print_above(f" ch{data['channel_idx']}({path_str}): {data['text']}")
print_above(f" {ANSI_GREEN}ch{data['channel_idx']}({path_str}): {data['text']}")
else:
print(f"ch{data['channel_idx']}({path_str}): {data['text']}")
else:
@ -133,7 +144,7 @@ def make_completion_dict(contacts):
"$login" : contact_list,
"$logout" : contact_list,
"$wl" : contact_list,
"$ec" : contact_list
"quit" : None
}
return completion_list
@ -179,13 +190,17 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
else:
our_history = None
session = PromptSession(completer=completer, history=our_history)
session = PromptSession(completer=completer,
history=our_history,
wrap_lines=False,
mouse_support=True,
complete_style=CompleteStyle.MULTI_COLUMN)
last_ack = True
while True:
prompt = ""
if not last_ack:
prompt = prompt + "!"
prompt = prompt + f"!"
prompt = prompt + f"{contact['adv_name']}> "
line = await session.prompt_async(prompt, complete_while_typing=False)
@ -1052,8 +1067,18 @@ async def main(argv):
MC = MeshCore(con, debug=debug)
await MC.connect()
res = await MC.commands.send_device_query()
if res.type == EventType.ERROR :
logger.error(f"Error while querying device: {res}")
return
if (json_output) :
logger.setLevel(logging.ERROR)
else :
if res.payload["fw ver"] > 2 :
logger.info(f"Connected to {MC.self_info['name']} running on a {res.payload["ver"]} fw.")
else :
logger.info(f"Connected to {MC.self_info['name']}.")
if len(args) == 0 : # no args, run in chat mode
await process_cmds(MC, ["chat"], json_output)