diff --git a/pyproject.toml b/pyproject.toml index 5822d37..a42c885 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "meshcore-cli" -version = "1.3.20" +version = "1.3.21" authors = [ { name="Florent de Lamotte", email="florent@frizoncorrea.fr" }, ] diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 227c25c..7428c38 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -32,7 +32,7 @@ import re from meshcore import MeshCore, EventType, logger # Version -VERSION = "v1.3.20" +VERSION = "v1.3.21" # default ble address is stored in a config file MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/meshcore/" @@ -3628,7 +3628,8 @@ def command_usage() : -h : prints help for arguments and commands -v : prints version -j : json output (disables init file) - -D : debug + -D : debug (sets logging to DEBUG) + -q : quiet (sets logging to ERROR) -S : scan for devices and show a selector -l : list available ble/serial devices and exit -T : timeout for the ble scan (-S and -l) default 2s @@ -3938,6 +3939,7 @@ REPEATER_HELP = f""" {ANSI_BGREEN}System:{ANSI_END} reboot - Reboot device erase - Erase filesystem (serial only) + script - Execute script {ANSI_BYELLOW}Type 'quit' or 'q' to exit, Ctrl+C to abort{ANSI_END} """ @@ -3976,10 +3978,31 @@ async def prompt_for_file(): return file_path -async def process_repeater_line(ser, cmd) : +async def process_repeater_script(ser, file): + if not os.path.exists(file) : + logger.info(f"file {file} not found") + return + + with open(file, "r") as f : + lines=f.readlines() + + for line in lines: + line = line.strip() + if not (line == "" or line[0] == "#"): + logger.debug(f"processing {line}") + try : + res = await process_repeater_line(ser, line, echo=True) + if not res: + logger.info("Error during script execution, exiting") + break + except ValueError: + logger.error(f"Error processing {line}") + break + +async def process_repeater_line(ser, cmd, echo=False) : if cmd.lower() == "help": print(REPEATER_HELP) - return + return True if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time": cur_time = int(time.time()) @@ -4007,8 +4030,10 @@ async def process_repeater_line(ser, cmd) : except FileNotFoundError: logger.error("File not found") + return False except (EOFError, KeyboardInterrupt): logger.info("Region upload canceled") + return False # in any case, send an empty line and clean buffer cmd = "" @@ -4035,13 +4060,32 @@ async def process_repeater_line(ser, cmd) : while line.rstrip() != "": file.write(line) line = ser.readline().decode(errors='ignore') + except FileNotFoundError: + logger.error("File not found") + return False + except (EOFError, KeyboardInterrupt): + logger.info("Region download canceled") + return False + + return True + + if cmd.lower().startswith("script"): + try: + if cmd.lower() == "script": # prompt for a filename + file_path = await prompt_for_file() + else : + file_path = cmd.lower().split(" ", 2)[1] + + file_path = file_path.replace("~", str(Path.home())) + + return await process_repeater_script(ser, file_path) except FileNotFoundError: logger.error("File not found") + return False except (EOFError, KeyboardInterrupt): - logger.info("Region download canceled") - - return + logger.info("Script canceled") + return False # Send command with CR terminator if cmd != "": @@ -4049,36 +4093,41 @@ async def process_repeater_line(ser, cmd) : await asyncio.sleep(0.3) # Read response + result = True response = ser.read(ser.in_waiting or 4096).decode(errors='ignore') if response: # Clean up echo and format response lines = response.rstrip().split('\n') for line in lines: line = line.rstrip() - if line and line != cmd: # Skip echo of command + if line and (echo or line != cmd): # Skip echo of command # Color code certain responses if line.strip().startswith("OK") or line.strip().startswith("ok"): print(f"{ANSI_GREEN}{line}{ANSI_END}") - elif line.strip().startswith("Error") or line.startswith("ERR"): + elif line.strip().startswith("Error") or line.strip().startswith("ERR"): print(f"{ANSI_RED}{line}{ANSI_END}") elif line.strip().startswith("->"): print(f"{ANSI_CYAN}{line}{ANSI_END}") else: print(line) + if "-> Unknown command" in line or \ + "-> Error" in line : + result = False + return result async def setup_repeater_serial(port, baudrate): """Interactive loop for repeater text CLI (raw serial commands)""" import serial as pyserial - print(f"{ANSI_BCYAN}Connecting to repeater at {port} ({baudrate} baud)...{ANSI_END}") + logger.info(f"{ANSI_BCYAN}Connecting to repeater at {port} ({baudrate} baud)...{ANSI_END}") try: ser = pyserial.Serial(port, baudrate, timeout=1) except PermissionError: print(f"{ANSI_BRED}Error: Permission denied. Try running with sudo or add user to dialout group.{ANSI_END}") - return + return None except Exception as e: print(f"{ANSI_BRED}Error opening serial port: {e}{ANSI_END}") - return + return None await asyncio.sleep(0.5) # Wait for connection to stabilize ser.reset_input_buffer() @@ -4182,6 +4231,7 @@ async def main(argv): timeout = 2 pin = None first_device = False + quiet = False # If there is an address in config file, use it by default # unless an arg is explicitely given if os.path.exists(MCCLI_ADDRESS) : @@ -4189,7 +4239,7 @@ async def main(argv): address = f.readline().strip() try: - opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:fjDhvSlT:Pc:Cr") + opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:fjDhvSlT:Pc:Crq") except getopt.GetoptError: print("Unrecognized option, use -h to get more help") command_usage() @@ -4233,6 +4283,8 @@ async def main(argv): case "-f": # connect to first encountered device address = "" first_device = True + case "-q": # quiet (turns logger to ERROR only) + quiet = True case "-l" : print("BLE devices:") try : @@ -4287,19 +4339,19 @@ async def main(argv): if (debug==True): logger.setLevel(logging.DEBUG) - elif (json_output) : + elif (json_output or quiet) : logger.setLevel(logging.ERROR) # Repeater mode - raw text CLI over serial if repeater_mode: if serial_port is None: - print("Error: Repeater mode (-r) requires serial port (-s)") + logger.error("Repeater mode (-r) requires serial port (-s)") command_usage() return ser = await setup_repeater_serial(serial_port, baudrate) - print(ser) + logger.debug(f"Serial port opened: {ser}") if (len(args) > 0) : await process_repeater_line(ser, " ".join(args)) @@ -4307,7 +4359,7 @@ async def main(argv): await repeater_loop(ser) ser.close() - print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}") + logger.info(f"{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}") return mc = None @@ -4419,7 +4471,7 @@ async def main(argv): if os.path.isdir(MCCLI_CONFIG_DIR) : log_message.file = MCCLI_CONFIG_DIR + mc.self_info["name"] + ".msgs" - if (json_output) : + if (json_output or quiet) : logger.setLevel(logging.ERROR) else : if res.payload["fw ver"] > 2 :