diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 4debe58..4e80806 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -3931,7 +3931,131 @@ REPEATER_HELP = f""" {ANSI_BYELLOW}Type 'quit' or 'q' to exit, Ctrl+C to abort{ANSI_END} """ -async def repeater_loop(port, baudrate): +async def prompt_for_file(): + try: + if os.path.isDIR(MCCLI_CONFIG_DIR): + region_files_history = FileHistory(MCCLI_REGION_FILES_HISTORY) + else: + region_files_history = None + except Exception: + region_files_history = None + + file_session = PromptSession( + history=region_files_history, + wrap_lines=False, + mouse_support=False, + complete_style=CompleteStyle.MULTI_COLUMN + ) + + # Setup key bindings + bindings = KeyBindings() + + @bindings.add("escape") + def _(event): + event.app.current_buffer.cancel_completion() + + path_completer = PathCompleter(expanduser=True) + + file_path = await file_session.prompt_async( + "Enter filename (Tab to complete CTRL+C to cancel): ", + completer=path_completer, + complete_while_typing=False, + key_bindings=bindings + ) + + return file_path + +async def process_repeater_line(ser, cmd) : + if cmd.lower() == "help": + print(REPEATER_HELP) + return + + if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time": + cur_time = int(time.time()) + print(f'{ANSI_GREEN}Syncing clock to' + f' {datetime.datetime.fromtimestamp(cur_time).strftime("%Y-%m-%d %H:%M:%S")}' + f' ({cur_time}){ANSI_END}') + cmd = f"time {cur_time}" + + if cmd.lower().startswith("region upload"): + try: + if cmd.lower() == "region upload": # prompt for a filename + file_path = await prompt_for_file() + else : + file_path = cmd.lower().split(" ", 3)[2] + + file_path = file_path.replace("~", str(Path.home())) + with open(file_path, "r") as file: + ser.write("region load\r".encode()) + for line in file: + if line.strip() == "": # terminate on empty line + break + if not line.startswith(";"): # don't send lines starting with ; + ser.write(f"{line.rstrip()}\r".encode()) + ser.write("\r".encode()) + + except FileNotFoundError: + logger.error("File not found") + except (EOFError, KeyboardInterrupt): + logger.info("Region upload canceled") + + # in any case, send an empty line and clean buffer + cmd = "" + + if cmd.lower().startswith("region download"): + try: + if cmd.lower() == "region download": # prompt for a filename + file_path = await prompt_for_file() + else : + file_path = cmd.lower().split(" ", 3)[2] + + file_path = file_path.replace("~", str(Path.home())) + + with open(file_path, "w") as file: + ser.write("region\r".encode()) # send regions command + + # seek start of regions description + line = ser.readline().decode(errors='ignore') + while not line.startswith(" ->") : + line = ser.readline().decode(errors='ignore') + + line = line[5:] + + while line.rstrip() != "": + file.write(line) + line = ser.readline().decode(errors='ignore') + + except FileNotFoundError: + logger.error("File not found") + except (EOFError, KeyboardInterrupt): + logger.info("Region download canceled") + + return + + # Send command with CR terminator + if cmd != "": + ser.write(f"{cmd}\r".encode()) + await asyncio.sleep(0.3) + + # Read response + response = ser.read(ser.in_waiting or 4096).decode(errors='ignore') + if response: + # Clean up echo and format response + lines = response.rstrip().split('\n') + for line in lines: + line = line.rstrip() + if line and line != cmd: # Skip echo of command + # Color code certain responses + if line.strip().startswith("OK") or line.strip().startswith("ok"): + print(f"{ANSI_GREEN}{line}{ANSI_END}") + elif line.strip().startswith("Error") or line.startswith("ERR"): + print(f"{ANSI_RED}{line}{ANSI_END}") + elif line.strip().startswith("->"): + print(f"{ANSI_CYAN}{line}{ANSI_END}") + else: + print(line) + +async def setup_repeater_serial(port, baudrate): """Interactive loop for repeater text CLI (raw serial commands)""" import serial as pyserial @@ -3953,18 +4077,32 @@ async def repeater_loop(port, baudrate): await asyncio.sleep(0.2) ser.reset_input_buffer() - # Try to get device info - ser.write(b"ver\r") - await asyncio.sleep(0.3) - ver_response = ser.read(ser.in_waiting or 256).decode(errors='ignore').strip() - device_name = "Repeater" - for line in ver_response.split('\n'): - line = line.strip() - if line and not line.startswith("ver") and ">" not in line[:3]: - device_name = line.split('(')[0].strip() if '(' in line else line - break + return ser - print(f"{ANSI_BGREEN}Connected!{ANSI_END} Device: {ANSI_BMAGENTA}{device_name}{ANSI_END}") +async def repeater_loop(ser): + # Try to get device name + ser.write(b"get name\r") + line = "" + while not line.startswith("get name"): + line = ser.readline().decode(errors="ignore").rstrip() + + device_name = "Repeater" + line = ser.readline().decode(errors="ignore").rstrip() + if (line.startswith(" -> > ")): + device_name = line[7:] + + # Getting version + ser.write(b"ver\r") + line = ser.readline().decode(errors="ignore").rstrip() + while not line == "ver": + line = ser.readline().decode(errors="ignore").rstrip() + + device_version = "unknown" + line = ser.readline().decode(errors="ignore").rstrip() + if (line.startswith(" -> ")): + device_version = line[6:] + + print(f"{ANSI_BGREEN}Connected!{ANSI_END} Device: {ANSI_BMAGENTA}{device_name}{ANSI_END} version {ANSI_BMAGENTA}{device_version}{ANSI_END}") print(f"Type {ANSI_BCYAN}help{ANSI_END} for commands, {ANSI_BCYAN}quit{ANSI_END} to exit, {ANSI_BCYAN}Tab{ANSI_END} for completion") print("-" * 50) @@ -3984,21 +4122,6 @@ async def repeater_loop(port, baudrate): complete_style=CompleteStyle.MULTI_COLUMN ) - try: - if os.path.isDIR(MCCLI_CONFIG_DIR): - region_files_history = FileHistory(MCCLI_REGION_FILES_HISTORY) - else: - region_files_history = None - except Exception: - region_files_history = None - - file_session = PromptSession( - history=region_files_history, - wrap_lines=False, - mouse_support=False, - complete_style=CompleteStyle.MULTI_COLUMN - ) - # Setup key bindings bindings = KeyBindings() @@ -4031,110 +4154,8 @@ async def repeater_loop(port, baudrate): if cmd.lower() in ("quit", "exit", "q"): break - if cmd.lower() == "help": - print(REPEATER_HELP) - continue + await process_repeater_line(ser, cmd) - if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time": - cur_time = int(time.time()) - print(f'{ANSI_GREEN}Syncing clock to' - f' {datetime.datetime.fromtimestamp(cur_time).strftime("%Y-%m-%d %H:%M:%S")}' - f' ({cur_time}){ANSI_END}') - cmd = f"time {cur_time}" - - if cmd.lower().startswith("region upload"): - try: - if cmd.lower() == "region upload": # prompt for a filename - path_completer = PathCompleter(expanduser=True) - file_path = await session.prompt_async( - "Filename: ", - completer=path_completer, - complete_while_typing=False, - key_bindings=bindings - ) - else : - file_path = cmd.lower().split(" ", 3)[2] - - file_path = file_path.replace("~", str(Path.home())) - with open(file_path, "r") as file: - ser.write("region load\r".encode()) - for line in file: - if line.strip() == "": # terminate on empty line - break - if not line.startswith(";"): # don't send lines starting with ; - ser.write(f"{line.rstrip()}\r".encode()) - ser.write("\r".encode()) - - except FileNotFoundError: - logger.error("File not found") - except (EOFError, KeyboardInterrupt): - logger.info("Region upload canceled") - - # in any case, send an empty line and clean buffer - cmd = "" - - if cmd.lower().startswith("region download"): - try: - if cmd.lower() == "region download": # prompt for a filename - path_completer = PathCompleter(expanduser=True) - file_path = await session.prompt_async( - "Filename: ", - completer=path_completer, - complete_while_typing=False, - key_bindings=bindings - ) - else : - file_path = cmd.lower().split(" ", 3)[2] - - file_path = file_path.replace("~", str(Path.home())) - - with open(file_path, "w") as file: - ser.write("region\r".encode()) # send regions command - - # seek start of regions description - line = ser.readline().decode(errors='ignore') - while not line.startswith(" ->") : - line = ser.readline().decode(errors='ignore') - - line = line[5:] - - while line.rstrip() != "": - file.write(line) - line = ser.readline().decode(errors='ignore') - - except FileNotFoundError: - logger.error("File not found") - except (EOFError, KeyboardInterrupt): - logger.info("Region download canceled") - - # in any case, send an empty line to clean buffer - continue - - # Send command with CR terminator - if cmd != "": - ser.write(f"{cmd}\r".encode()) - await asyncio.sleep(0.3) - - # Read response - response = ser.read(ser.in_waiting or 4096).decode(errors='ignore') - if response: - # Clean up echo and format response - lines = response.rstrip().split('\n') - for line in lines: - line = line.rstrip() - if line and line != cmd: # Skip echo of command - # Color code certain responses - if line.strip().startswith("OK") or line.strip().startswith("ok"): - print(f"{ANSI_GREEN}{line}{ANSI_END}") - elif line.strip().startswith("Error") or line.startswith("ERR"): - print(f"{ANSI_RED}{line}{ANSI_END}") - elif line.strip().startswith("->"): - print(f"{ANSI_CYAN}{line}{ANSI_END}") - else: - print(line) - - ser.close() - print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}") async def main(argv): """ Do the job """ @@ -4264,7 +4285,18 @@ async def main(argv): print("Error: Repeater mode (-r) requires serial port (-s)") command_usage() return - await repeater_loop(serial_port, baudrate) + + ser = await setup_repeater_serial(serial_port, baudrate) + + print(ser) + + if (len(args) > 0) : + await process_repeater_line(ser, " ".join(args)) + else: + await repeater_loop(ser) + + ser.close() + print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}") return mc = None