mirror of
https://github.com/meshcore-dev/meshcore-cli.git
synced 2026-04-20 22:13:48 +00:00
let user send rpt cmd from cmdline
This commit is contained in:
parent
439324b325
commit
3bd01ae578
1 changed files with 163 additions and 131 deletions
|
|
@ -3931,7 +3931,131 @@ REPEATER_HELP = f"""
|
||||||
{ANSI_BYELLOW}Type 'quit' or 'q' to exit, Ctrl+C to abort{ANSI_END}
|
{ANSI_BYELLOW}Type 'quit' or 'q' to exit, Ctrl+C to abort{ANSI_END}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def repeater_loop(port, baudrate):
|
async def prompt_for_file():
|
||||||
|
try:
|
||||||
|
if os.path.isDIR(MCCLI_CONFIG_DIR):
|
||||||
|
region_files_history = FileHistory(MCCLI_REGION_FILES_HISTORY)
|
||||||
|
else:
|
||||||
|
region_files_history = None
|
||||||
|
except Exception:
|
||||||
|
region_files_history = None
|
||||||
|
|
||||||
|
file_session = PromptSession(
|
||||||
|
history=region_files_history,
|
||||||
|
wrap_lines=False,
|
||||||
|
mouse_support=False,
|
||||||
|
complete_style=CompleteStyle.MULTI_COLUMN
|
||||||
|
)
|
||||||
|
|
||||||
|
# Setup key bindings
|
||||||
|
bindings = KeyBindings()
|
||||||
|
|
||||||
|
@bindings.add("escape")
|
||||||
|
def _(event):
|
||||||
|
event.app.current_buffer.cancel_completion()
|
||||||
|
|
||||||
|
path_completer = PathCompleter(expanduser=True)
|
||||||
|
|
||||||
|
file_path = await file_session.prompt_async(
|
||||||
|
"Enter filename (Tab to complete CTRL+C to cancel): ",
|
||||||
|
completer=path_completer,
|
||||||
|
complete_while_typing=False,
|
||||||
|
key_bindings=bindings
|
||||||
|
)
|
||||||
|
|
||||||
|
return file_path
|
||||||
|
|
||||||
|
async def process_repeater_line(ser, cmd) :
|
||||||
|
if cmd.lower() == "help":
|
||||||
|
print(REPEATER_HELP)
|
||||||
|
return
|
||||||
|
|
||||||
|
if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time":
|
||||||
|
cur_time = int(time.time())
|
||||||
|
print(f'{ANSI_GREEN}Syncing clock to'
|
||||||
|
f' {datetime.datetime.fromtimestamp(cur_time).strftime("%Y-%m-%d %H:%M:%S")}'
|
||||||
|
f' ({cur_time}){ANSI_END}')
|
||||||
|
cmd = f"time {cur_time}"
|
||||||
|
|
||||||
|
if cmd.lower().startswith("region upload"):
|
||||||
|
try:
|
||||||
|
if cmd.lower() == "region upload": # prompt for a filename
|
||||||
|
file_path = await prompt_for_file()
|
||||||
|
else :
|
||||||
|
file_path = cmd.lower().split(" ", 3)[2]
|
||||||
|
|
||||||
|
file_path = file_path.replace("~", str(Path.home()))
|
||||||
|
with open(file_path, "r") as file:
|
||||||
|
ser.write("region load\r".encode())
|
||||||
|
for line in file:
|
||||||
|
if line.strip() == "": # terminate on empty line
|
||||||
|
break
|
||||||
|
if not line.startswith(";"): # don't send lines starting with ;
|
||||||
|
ser.write(f"{line.rstrip()}\r".encode())
|
||||||
|
ser.write("\r".encode())
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
logger.error("File not found")
|
||||||
|
except (EOFError, KeyboardInterrupt):
|
||||||
|
logger.info("Region upload canceled")
|
||||||
|
|
||||||
|
# in any case, send an empty line and clean buffer
|
||||||
|
cmd = ""
|
||||||
|
|
||||||
|
if cmd.lower().startswith("region download"):
|
||||||
|
try:
|
||||||
|
if cmd.lower() == "region download": # prompt for a filename
|
||||||
|
file_path = await prompt_for_file()
|
||||||
|
else :
|
||||||
|
file_path = cmd.lower().split(" ", 3)[2]
|
||||||
|
|
||||||
|
file_path = file_path.replace("~", str(Path.home()))
|
||||||
|
|
||||||
|
with open(file_path, "w") as file:
|
||||||
|
ser.write("region\r".encode()) # send regions command
|
||||||
|
|
||||||
|
# seek start of regions description
|
||||||
|
line = ser.readline().decode(errors='ignore')
|
||||||
|
while not line.startswith(" ->") :
|
||||||
|
line = ser.readline().decode(errors='ignore')
|
||||||
|
|
||||||
|
line = line[5:]
|
||||||
|
|
||||||
|
while line.rstrip() != "":
|
||||||
|
file.write(line)
|
||||||
|
line = ser.readline().decode(errors='ignore')
|
||||||
|
|
||||||
|
except FileNotFoundError:
|
||||||
|
logger.error("File not found")
|
||||||
|
except (EOFError, KeyboardInterrupt):
|
||||||
|
logger.info("Region download canceled")
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send command with CR terminator
|
||||||
|
if cmd != "":
|
||||||
|
ser.write(f"{cmd}\r".encode())
|
||||||
|
await asyncio.sleep(0.3)
|
||||||
|
|
||||||
|
# Read response
|
||||||
|
response = ser.read(ser.in_waiting or 4096).decode(errors='ignore')
|
||||||
|
if response:
|
||||||
|
# Clean up echo and format response
|
||||||
|
lines = response.rstrip().split('\n')
|
||||||
|
for line in lines:
|
||||||
|
line = line.rstrip()
|
||||||
|
if line and line != cmd: # Skip echo of command
|
||||||
|
# Color code certain responses
|
||||||
|
if line.strip().startswith("OK") or line.strip().startswith("ok"):
|
||||||
|
print(f"{ANSI_GREEN}{line}{ANSI_END}")
|
||||||
|
elif line.strip().startswith("Error") or line.startswith("ERR"):
|
||||||
|
print(f"{ANSI_RED}{line}{ANSI_END}")
|
||||||
|
elif line.strip().startswith("->"):
|
||||||
|
print(f"{ANSI_CYAN}{line}{ANSI_END}")
|
||||||
|
else:
|
||||||
|
print(line)
|
||||||
|
|
||||||
|
async def setup_repeater_serial(port, baudrate):
|
||||||
"""Interactive loop for repeater text CLI (raw serial commands)"""
|
"""Interactive loop for repeater text CLI (raw serial commands)"""
|
||||||
import serial as pyserial
|
import serial as pyserial
|
||||||
|
|
||||||
|
|
@ -3953,18 +4077,32 @@ async def repeater_loop(port, baudrate):
|
||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.2)
|
||||||
ser.reset_input_buffer()
|
ser.reset_input_buffer()
|
||||||
|
|
||||||
# Try to get device info
|
return ser
|
||||||
ser.write(b"ver\r")
|
|
||||||
await asyncio.sleep(0.3)
|
|
||||||
ver_response = ser.read(ser.in_waiting or 256).decode(errors='ignore').strip()
|
|
||||||
device_name = "Repeater"
|
|
||||||
for line in ver_response.split('\n'):
|
|
||||||
line = line.strip()
|
|
||||||
if line and not line.startswith("ver") and ">" not in line[:3]:
|
|
||||||
device_name = line.split('(')[0].strip() if '(' in line else line
|
|
||||||
break
|
|
||||||
|
|
||||||
print(f"{ANSI_BGREEN}Connected!{ANSI_END} Device: {ANSI_BMAGENTA}{device_name}{ANSI_END}")
|
async def repeater_loop(ser):
|
||||||
|
# Try to get device name
|
||||||
|
ser.write(b"get name\r")
|
||||||
|
line = ""
|
||||||
|
while not line.startswith("get name"):
|
||||||
|
line = ser.readline().decode(errors="ignore").rstrip()
|
||||||
|
|
||||||
|
device_name = "Repeater"
|
||||||
|
line = ser.readline().decode(errors="ignore").rstrip()
|
||||||
|
if (line.startswith(" -> > ")):
|
||||||
|
device_name = line[7:]
|
||||||
|
|
||||||
|
# Getting version
|
||||||
|
ser.write(b"ver\r")
|
||||||
|
line = ser.readline().decode(errors="ignore").rstrip()
|
||||||
|
while not line == "ver":
|
||||||
|
line = ser.readline().decode(errors="ignore").rstrip()
|
||||||
|
|
||||||
|
device_version = "unknown"
|
||||||
|
line = ser.readline().decode(errors="ignore").rstrip()
|
||||||
|
if (line.startswith(" -> ")):
|
||||||
|
device_version = line[6:]
|
||||||
|
|
||||||
|
print(f"{ANSI_BGREEN}Connected!{ANSI_END} Device: {ANSI_BMAGENTA}{device_name}{ANSI_END} version {ANSI_BMAGENTA}{device_version}{ANSI_END}")
|
||||||
print(f"Type {ANSI_BCYAN}help{ANSI_END} for commands, {ANSI_BCYAN}quit{ANSI_END} to exit, {ANSI_BCYAN}Tab{ANSI_END} for completion")
|
print(f"Type {ANSI_BCYAN}help{ANSI_END} for commands, {ANSI_BCYAN}quit{ANSI_END} to exit, {ANSI_BCYAN}Tab{ANSI_END} for completion")
|
||||||
print("-" * 50)
|
print("-" * 50)
|
||||||
|
|
||||||
|
|
@ -3984,21 +4122,6 @@ async def repeater_loop(port, baudrate):
|
||||||
complete_style=CompleteStyle.MULTI_COLUMN
|
complete_style=CompleteStyle.MULTI_COLUMN
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
|
||||||
if os.path.isDIR(MCCLI_CONFIG_DIR):
|
|
||||||
region_files_history = FileHistory(MCCLI_REGION_FILES_HISTORY)
|
|
||||||
else:
|
|
||||||
region_files_history = None
|
|
||||||
except Exception:
|
|
||||||
region_files_history = None
|
|
||||||
|
|
||||||
file_session = PromptSession(
|
|
||||||
history=region_files_history,
|
|
||||||
wrap_lines=False,
|
|
||||||
mouse_support=False,
|
|
||||||
complete_style=CompleteStyle.MULTI_COLUMN
|
|
||||||
)
|
|
||||||
|
|
||||||
# Setup key bindings
|
# Setup key bindings
|
||||||
bindings = KeyBindings()
|
bindings = KeyBindings()
|
||||||
|
|
||||||
|
|
@ -4031,110 +4154,8 @@ async def repeater_loop(port, baudrate):
|
||||||
if cmd.lower() in ("quit", "exit", "q"):
|
if cmd.lower() in ("quit", "exit", "q"):
|
||||||
break
|
break
|
||||||
|
|
||||||
if cmd.lower() == "help":
|
await process_repeater_line(ser, cmd)
|
||||||
print(REPEATER_HELP)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if cmd.lower() == "clock sync" or cmd.lower() == "st" or cmd.lower() == "sync_time":
|
|
||||||
cur_time = int(time.time())
|
|
||||||
print(f'{ANSI_GREEN}Syncing clock to'
|
|
||||||
f' {datetime.datetime.fromtimestamp(cur_time).strftime("%Y-%m-%d %H:%M:%S")}'
|
|
||||||
f' ({cur_time}){ANSI_END}')
|
|
||||||
cmd = f"time {cur_time}"
|
|
||||||
|
|
||||||
if cmd.lower().startswith("region upload"):
|
|
||||||
try:
|
|
||||||
if cmd.lower() == "region upload": # prompt for a filename
|
|
||||||
path_completer = PathCompleter(expanduser=True)
|
|
||||||
file_path = await session.prompt_async(
|
|
||||||
"Filename: ",
|
|
||||||
completer=path_completer,
|
|
||||||
complete_while_typing=False,
|
|
||||||
key_bindings=bindings
|
|
||||||
)
|
|
||||||
else :
|
|
||||||
file_path = cmd.lower().split(" ", 3)[2]
|
|
||||||
|
|
||||||
file_path = file_path.replace("~", str(Path.home()))
|
|
||||||
with open(file_path, "r") as file:
|
|
||||||
ser.write("region load\r".encode())
|
|
||||||
for line in file:
|
|
||||||
if line.strip() == "": # terminate on empty line
|
|
||||||
break
|
|
||||||
if not line.startswith(";"): # don't send lines starting with ;
|
|
||||||
ser.write(f"{line.rstrip()}\r".encode())
|
|
||||||
ser.write("\r".encode())
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
logger.error("File not found")
|
|
||||||
except (EOFError, KeyboardInterrupt):
|
|
||||||
logger.info("Region upload canceled")
|
|
||||||
|
|
||||||
# in any case, send an empty line and clean buffer
|
|
||||||
cmd = ""
|
|
||||||
|
|
||||||
if cmd.lower().startswith("region download"):
|
|
||||||
try:
|
|
||||||
if cmd.lower() == "region download": # prompt for a filename
|
|
||||||
path_completer = PathCompleter(expanduser=True)
|
|
||||||
file_path = await session.prompt_async(
|
|
||||||
"Filename: ",
|
|
||||||
completer=path_completer,
|
|
||||||
complete_while_typing=False,
|
|
||||||
key_bindings=bindings
|
|
||||||
)
|
|
||||||
else :
|
|
||||||
file_path = cmd.lower().split(" ", 3)[2]
|
|
||||||
|
|
||||||
file_path = file_path.replace("~", str(Path.home()))
|
|
||||||
|
|
||||||
with open(file_path, "w") as file:
|
|
||||||
ser.write("region\r".encode()) # send regions command
|
|
||||||
|
|
||||||
# seek start of regions description
|
|
||||||
line = ser.readline().decode(errors='ignore')
|
|
||||||
while not line.startswith(" ->") :
|
|
||||||
line = ser.readline().decode(errors='ignore')
|
|
||||||
|
|
||||||
line = line[5:]
|
|
||||||
|
|
||||||
while line.rstrip() != "":
|
|
||||||
file.write(line)
|
|
||||||
line = ser.readline().decode(errors='ignore')
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
logger.error("File not found")
|
|
||||||
except (EOFError, KeyboardInterrupt):
|
|
||||||
logger.info("Region download canceled")
|
|
||||||
|
|
||||||
# in any case, send an empty line to clean buffer
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Send command with CR terminator
|
|
||||||
if cmd != "":
|
|
||||||
ser.write(f"{cmd}\r".encode())
|
|
||||||
await asyncio.sleep(0.3)
|
|
||||||
|
|
||||||
# Read response
|
|
||||||
response = ser.read(ser.in_waiting or 4096).decode(errors='ignore')
|
|
||||||
if response:
|
|
||||||
# Clean up echo and format response
|
|
||||||
lines = response.rstrip().split('\n')
|
|
||||||
for line in lines:
|
|
||||||
line = line.rstrip()
|
|
||||||
if line and line != cmd: # Skip echo of command
|
|
||||||
# Color code certain responses
|
|
||||||
if line.strip().startswith("OK") or line.strip().startswith("ok"):
|
|
||||||
print(f"{ANSI_GREEN}{line}{ANSI_END}")
|
|
||||||
elif line.strip().startswith("Error") or line.startswith("ERR"):
|
|
||||||
print(f"{ANSI_RED}{line}{ANSI_END}")
|
|
||||||
elif line.strip().startswith("->"):
|
|
||||||
print(f"{ANSI_CYAN}{line}{ANSI_END}")
|
|
||||||
else:
|
|
||||||
print(line)
|
|
||||||
|
|
||||||
ser.close()
|
|
||||||
print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}")
|
|
||||||
|
|
||||||
async def main(argv):
|
async def main(argv):
|
||||||
""" Do the job """
|
""" Do the job """
|
||||||
|
|
@ -4264,7 +4285,18 @@ async def main(argv):
|
||||||
print("Error: Repeater mode (-r) requires serial port (-s)")
|
print("Error: Repeater mode (-r) requires serial port (-s)")
|
||||||
command_usage()
|
command_usage()
|
||||||
return
|
return
|
||||||
await repeater_loop(serial_port, baudrate)
|
|
||||||
|
ser = await setup_repeater_serial(serial_port, baudrate)
|
||||||
|
|
||||||
|
print(ser)
|
||||||
|
|
||||||
|
if (len(args) > 0) :
|
||||||
|
await process_repeater_line(ser, " ".join(args))
|
||||||
|
else:
|
||||||
|
await repeater_loop(ser)
|
||||||
|
|
||||||
|
ser.close()
|
||||||
|
print(f"\n{ANSI_BGRAY}Disconnected from repeater.{ANSI_END}")
|
||||||
return
|
return
|
||||||
|
|
||||||
mc = None
|
mc = None
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue