added scripts to repeater mode

This commit is contained in:
Florent 2026-02-06 09:05:10 -04:00
parent 3804f382f1
commit f76be35e88
2 changed files with 71 additions and 19 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore-cli"
version = "1.3.20"
version = "1.3.21"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
]

View file

@ -32,7 +32,7 @@ import re
from meshcore import MeshCore, EventType, logger
# Version
VERSION = "v1.3.20"
VERSION = "v1.3.21"
# default ble address is stored in a config file
MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/meshcore/"
@ -3628,7 +3628,8 @@ def command_usage() :
-h : prints help for arguments and commands
-v : prints version
-j : json output (disables init file)
-D : debug
-D : debug (sets logging to DEBUG)
-q : quiet (sets logging to ERROR)
-S : scan for devices and show a selector
-l : list available ble/serial devices and exit
-T <timeout> : timeout for the ble scan (-S and -l) default 2s
@ -3938,6 +3939,7 @@ REPEATER_HELP = f"""
{ANSI_BGREEN}System:{ANSI_END}
reboot - Reboot device
erase - Erase filesystem (serial only)
script <file> - Execute script
{ANSI_BYELLOW}Type 'quit' or 'q' to exit, Ctrl+C to abort{ANSI_END}
"""
@ -3976,10 +3978,31 @@ async def prompt_for_file():
return file_path
async def process_repeater_line(ser, cmd) :
async def process_repeater_script(ser, file):
if not os.path.exists(file) :
logger.info(f"file {file} not found")
return
with open(file, "r") as f :
lines=f.readlines()
for line in lines:
line = line.strip()
if not (line == "" or line[0] == "#"):
logger.debug(f"processing {line}")
try :
res = await process_repeater_line(ser, line, echo=True)
if not res:
logger.info("Error during script execution, exiting")
break
except ValueError:
logger.error(f"Error processing {line}")
break
async def process_repeater_line(ser, cmd, echo=False) :
if cmd.lower() == "help":
print(REPEATER_HELP)
return
return True
if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time":
cur_time = int(time.time())
@ -4007,8 +4030,10 @@ async def process_repeater_line(ser, cmd) :
except FileNotFoundError:
logger.error("File not found")
return False
except (EOFError, KeyboardInterrupt):
logger.info("Region upload canceled")
return False
# in any case, send an empty line and clean buffer
cmd = ""
@ -4035,13 +4060,32 @@ async def process_repeater_line(ser, cmd) :
while line.rstrip() != "":
file.write(line)
line = ser.readline().decode(errors='ignore')
except FileNotFoundError:
logger.error("File not found")
return False
except (EOFError, KeyboardInterrupt):
logger.info("Region download canceled")
return False
return True
if cmd.lower().startswith("script"):
try:
if cmd.lower() == "script": # prompt for a filename
file_path = await prompt_for_file()
else :
file_path = cmd.lower().split(" ", 2)[1]
file_path = file_path.replace("~", str(Path.home()))
return await process_repeater_script(ser, file_path)
except FileNotFoundError:
logger.error("File not found")
return False
except (EOFError, KeyboardInterrupt):
logger.info("Region download canceled")
return
logger.info("Script canceled")
return False
# Send command with CR terminator
if cmd != "":
@ -4049,36 +4093,41 @@ async def process_repeater_line(ser, cmd) :
await asyncio.sleep(0.3)
# Read response
result = True
response = ser.read(ser.in_waiting or 4096).decode(errors='ignore')
if response:
# Clean up echo and format response
lines = response.rstrip().split('\n')
for line in lines:
line = line.rstrip()
if line and line != cmd: # Skip echo of command
if line and (echo or line != cmd): # Skip echo of command
# Color code certain responses
if line.strip().startswith("OK") or line.strip().startswith("ok"):
print(f"{ANSI_GREEN}{line}{ANSI_END}")
elif line.strip().startswith("Error") or line.startswith("ERR"):
elif line.strip().startswith("Error") or line.strip().startswith("ERR"):
print(f"{ANSI_RED}{line}{ANSI_END}")
elif line.strip().startswith("->"):
print(f"{ANSI_CYAN}{line}{ANSI_END}")
else:
print(line)
if "-> Unknown command" in line or \
"-> Error" in line :
result = False
return result
async def setup_repeater_serial(port, baudrate):
"""Interactive loop for repeater text CLI (raw serial commands)"""
import serial as pyserial
print(f"{ANSI_BCYAN}Connecting to repeater at {port} ({baudrate} baud)...{ANSI_END}")
logger.info(f"{ANSI_BCYAN}Connecting to repeater at {port} ({baudrate} baud)...{ANSI_END}")
try:
ser = pyserial.Serial(port, baudrate, timeout=1)
except PermissionError:
print(f"{ANSI_BRED}Error: Permission denied. Try running with sudo or add user to dialout group.{ANSI_END}")
return
return None
except Exception as e:
print(f"{ANSI_BRED}Error opening serial port: {e}{ANSI_END}")
return
return None
await asyncio.sleep(0.5) # Wait for connection to stabilize
ser.reset_input_buffer()
@ -4182,6 +4231,7 @@ async def main(argv):
timeout = 2
pin = None
first_device = False
quiet = False
# If there is an address in config file, use it by default
# unless an arg is explicitely given
if os.path.exists(MCCLI_ADDRESS) :
@ -4189,7 +4239,7 @@ async def main(argv):
address = f.readline().strip()
try:
opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:fjDhvSlT:Pc:Cr")
opts, args = getopt.getopt(argv, "a:d:s:ht:p:b:fjDhvSlT:Pc:Crq")
except getopt.GetoptError:
print("Unrecognized option, use -h to get more help")
command_usage()
@ -4233,6 +4283,8 @@ async def main(argv):
case "-f": # connect to first encountered device
address = ""
first_device = True
case "-q": # quiet (turns logger to ERROR only)
quiet = True
case "-l" :
print("BLE devices:")
try :
@ -4287,19 +4339,19 @@ async def main(argv):
if (debug==True):
logger.setLevel(logging.DEBUG)
elif (json_output) :
elif (json_output or quiet) :
logger.setLevel(logging.ERROR)
# Repeater mode - raw text CLI over serial
if repeater_mode:
if serial_port is None:
print("Error: Repeater mode (-r) requires serial port (-s)")
logger.error("Repeater mode (-r) requires serial port (-s)")
command_usage()
return
ser = await setup_repeater_serial(serial_port, baudrate)
print(ser)
logger.debug(f"Serial port opened: {ser}")
if (len(args) > 0) :
await process_repeater_line(ser, " ".join(args))
@ -4307,7 +4359,7 @@ async def main(argv):
await repeater_loop(ser)
ser.close()
print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}")
logger.info(f"{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}")
return
mc = None
@ -4419,7 +4471,7 @@ async def main(argv):
if os.path.isdir(MCCLI_CONFIG_DIR) :
log_message.file = MCCLI_CONFIG_DIR + mc.self_info["name"] + ".msgs"
if (json_output) :
if (json_output or quiet) :
logger.setLevel(logging.ERROR)
else :
if res.payload["fw ver"] > 2 :