This commit is contained in:
David van Leeuwen 2026-04-15 10:46:47 +02:00 committed by GitHub
commit e933420b77
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -12,8 +12,10 @@ import requests
import serial.tools.list_ports import serial.tools.list_ports
from pathlib import Path from pathlib import Path
import traceback import traceback
from prompt_toolkit.application.current import get_app_or_none
from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.shortcuts import CompleteStyle from prompt_toolkit.shortcuts import CompleteStyle
from prompt_toolkit.shortcuts import print_formatted_text
from prompt_toolkit.completion import NestedCompleter, PathCompleter from prompt_toolkit.completion import NestedCompleter, PathCompleter
from prompt_toolkit.completion import CompleteEvent, Completer, Completion from prompt_toolkit.completion import CompleteEvent, Completer, Completion
from prompt_toolkit.history import FileHistory from prompt_toolkit.history import FileHistory
@ -22,6 +24,7 @@ from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import radiolist_dialog from prompt_toolkit.shortcuts import radiolist_dialog
from prompt_toolkit.completion.word_completer import WordCompleter from prompt_toolkit.completion.word_completer import WordCompleter
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
from prompt_toolkit.utils import get_cwidth
try: try:
from bleak import BleakScanner, BleakClient from bleak import BleakScanner, BleakClient
@ -98,26 +101,55 @@ def escape_ansi(line):
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line) return ansi_escape.sub('', line)
def print_one_line_above(str): def truncate_to_display_width(text, max_width):
""" prints a string above current line """ """Return the longest prefix of text whose terminal display width is <= max_width."""
width = os.get_terminal_size().columns if max_width <= 0:
stringlen = len(escape_ansi(str))-1 return ""
lines = divmod(stringlen, width)[0] + 1 lo, hi = 0, len(text)
print("\u001B[s", end="") # Save current cursor position while lo < hi:
print("\u001B[A", end="") # Move cursor up one line mid = (lo + hi + 1) // 2
print("\u001B[999D", end="") # Move cursor to beginning of line if get_cwidth(text[:mid]) <= max_width:
for _ in range(lines): lo = mid
print("\u001B[S", end="") # Scroll up/pan window down 1 line else:
print("\u001B[L", end="") # Insert new line hi = mid - 1
for _ in range(lines - 1): return text[:lo]
print("\u001B[A", end="") # Move cursor up one line
print(str, end="") # Print output status msg
print("\u001B[u", end="", flush=True) # Jump back to saved cursor position
def print_above(str): def print_one_line_above(text):
lines = str.split('\n') """Status line(s) above the prompt; delegates to print_above."""
for l in lines: print_above(text)
print_one_line_above(l)
def print_above(text):
"""
Show text above the current input line.
When prompt_toolkit has a running Application (interactive prompt), plain
print() and DEC save/restore fight the renderer use print_formatted_text,
which suspends the UI, writes via the Output layer (UTF-8), and redraws.
With no active app (scripts, pipes), use CSI save/move/clear/restore only.
"""
if text == "":
return
if get_app_or_none() is not None:
print_formatted_text(ANSI(text), end="\n")
return
lines = text.split("\n")
n = len(lines)
chunks = ["\033[s", f"\033[{n}A"]
for i, line in enumerate(lines):
chunks.append("\033[1G\033[2K")
chunks.append(line)
if i < n - 1:
chunks.append("\033[B")
chunks.append("\033[u")
combined = "".join(chunks)
enc = getattr(sys.stdout, "encoding", None) or "utf-8"
try:
sys.stdout.buffer.write(combined.encode(enc, errors="replace"))
sys.stdout.buffer.flush()
except (AttributeError, TypeError):
sys.stdout.write(combined)
sys.stdout.flush()
async def process_event_message(mc, ev, json_output, end="\n", above=False): async def process_event_message(mc, ev, json_output, end="\n", above=False):
""" display incoming message """ """ display incoming message """
@ -252,9 +284,30 @@ async def handle_log_rx(event):
if chan_name != "" : if chan_name != "" :
width = os.get_terminal_size().columns width = os.get_terminal_size().columns
cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1 path = event.payload["path"]
dispmsg = message.replace("\n","")[0:cars] snr_str = f"{event.payload['snr']:6,.2f}"
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_START}{width-11-len(event.payload['path'])}G{ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}" rssi_str = f"{event.payload['rssi']:4}"
rhs_plain = f"[{path}]{snr_str}{rssi_str}"
rhs_w = get_cwidth(rhs_plain)
rhs_col = max(1, width - rhs_w + 1)
prefix_plain = chan_name + " "
prefix_w = get_cwidth(prefix_plain)
trailing_before_csi_w = 1
max_msg_w = max(0, rhs_col - 1 - prefix_w - trailing_before_csi_w)
raw_msg = message.replace("\n", "")
dispmsg = truncate_to_display_width(raw_msg, max_msg_w)
while True:
left_w = get_cwidth(prefix_plain + dispmsg + " ")
if left_w >= rhs_col - 1:
break
nxt = dispmsg + " "
if get_cwidth(prefix_plain + nxt + " ") > rhs_col - 1:
break
dispmsg = nxt
txt = (
f"{ANSI_LIGHT_GRAY}{prefix_plain}{ANSI_DGREEN}{dispmsg} "
f"{ANSI_START}{rhs_col}G{ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{snr_str}{rssi_str}{ANSI_END}"
)
if handle_message.above: if handle_message.above:
print_above(txt) print_above(txt)
@ -3373,7 +3426,7 @@ async def next_cmd(mc, cmds, json_output=False):
else: else:
if json_output: if json_output:
print(json.dumps(res.payload)) print(json.dumps(res.payload))
else : else:
path_len = res.payload['path_len'] path_len = res.payload['path_len']
if (path_len == 0) : if (path_len == 0) :
print("0 hop") print("0 hop")