implement log_channels

This commit is contained in:
Florent 2025-11-02 19:13:17 +01:00
parent 17a8ad4522
commit eca02c9621

View file

@ -24,6 +24,8 @@ from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import radiolist_dialog
from prompt_toolkit.completion.word_completer import WordCompleter
from prompt_toolkit.document import Document
from hashlib import sha256
from Crypto.Cipher import AES
import re
@ -194,17 +196,39 @@ process_event_message.color=True
process_event_message.last_node=None
async def handle_log_rx(event):
if not handle_log_rx.print_log_rx:
return
if handle_message.json_output: # json mode ... raw dump
mc = handle_log_rx.mc
if handle_log_rx.json_log_rx: # json mode ... raw dump
msg = json.dumps(event.payload)
if handle_message.above:
print_above(msg)
else :
print(msg)
return
handle_log_rx.print_log_rx = False
pkt = bytes().fromhex(event.payload["payload"])
if handle_log_rx.log_channels:
if pkt[0] == 0x15:
path_len = pkt[1]
path = pkt[2:path_len+2].hex()
chan_hash = pkt[path_len+2:path_len+3].hex()
cipher_mac = int.from_bytes(pkt[path_len+3:path_len+5], byteorder="little")
msg = pkt[path_len+5:]
channel = await get_channel_by_hash(mc, chan_hash)
if channel is None :
chan_name = chan_hash
message = msg.hex()
else:
chan_name = channel["channel_name"]
aes_key = bytes.fromhex(channel["channel_secret"])
cipher = AES.new(aes_key, AES.MODE_ECB)
message = cipher.decrypt(msg)[5:].decode("utf-8").strip("\x00")
print_above(f"{ANSI_LIGHT_GRAY}{chan_name:>10} {ANSI_GREEN}{message[0:25]:25} {ANSI_LIGHT_GRAY}({event.payload['snr']:7,.2f},{event.payload['rssi']:4}){ANSI_YELLOW} [{path}]{ANSI_END}")
handle_log_rx.json_log_rx = False
handle_log_rx.log_channels = False
handle_log_rx.mc = None
async def handle_advert(event):
if not handle_advert.print_adverts:
@ -333,7 +357,7 @@ class MyNestedCompleter(NestedCompleter):
opts = self.options.keys()
completer = WordCompleter(
opts, ignore_case=self.ignore_case,
pattern=re.compile(r"([a-zA-Z0-9_\\/]+|[^a-zA-Z0-9_\s]+)"))
pattern=re.compile(r"([a-zA-Z0-9_\\/\#]+|[^a-zA-Z0-9_\s\#]+)"))
yield from completer.get_completions(document, complete_event)
else: # normal behavior for remainder
yield from super().get_completions(document, complete_event)
@ -430,7 +454,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
"color" : {"on":None, "off":None},
"print_name" : {"on":None, "off":None},
"print_adverts" : {"on":None, "off":None},
"print_log_rx" : {"on":None, "off":None},
"json_log_rx" : {"on":None, "off":None},
"log_channels" : {"on":None, "off":None},
"print_new_contacts" : {"on": None, "off":None},
"print_path_updates" : {"on":None,"off":None},
"classic_prompt" : {"on" : None, "off":None},
@ -458,7 +483,8 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None):
"color":None,
"print_name":None,
"print_adverts":None,
"print_log_rx":None,
"json_log_rx":None,
"log_channels":None,
"print_path_updates":None,
"print_new_contacts":None,
"classic_prompt":None,
@ -698,7 +724,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
if classic :
prompt = prompt + " > "
else :
prompt = prompt + "🭨"
prompt = prompt + f"{ANSI_NORMAL}🭬{ANSI_INVERT}"
if not contact is None :
if not last_ack:
@ -719,7 +745,7 @@ Line starting with \"$\" or \".\" will issue a meshcli command.
prompt = prompt + f"{ANSI_INVERT}"
if print_name and not classic :
prompt = prompt + "🭬"
prompt = prompt + f"{ANSI_NORMAL}🭨{ANSI_INVERT}"
prompt = prompt + f"{contact['adv_name']}"
if classic :
@ -1178,6 +1204,16 @@ async def get_channel_by_name (mc, name):
return None
async def get_channel_by_hash (mc, hash):
if not hasattr(mc, 'channels') :
await_get_channels(mc)
for c in mc.channels:
if c['channel_hash'] == hash:
return c
return None
async def get_contacts (mc, anim=False, lastomod=0, timeout=5) :
if mc._contacts:
return
@ -1249,6 +1285,7 @@ async def get_channels (mc, anim=False) :
if res.type == EventType.ERROR:
break
info = res.payload
info["channel_hash"] = sha256(info["channel_secret"]).digest()[0:1].hex()
info["channel_secret"] = info["channel_secret"].hex()
mc.channels.append(info)
ch = ch + 1
@ -1460,8 +1497,12 @@ async def next_cmd(mc, cmds, json_output=False):
process_event_message.print_snr = (cmds[2] == "on")
if json_output :
print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]}))
case "print_log_rx" :
handle_log_rx.print_log_rx = (cmds[2] == "on")
case "json_log_rx" :
handle_log_rx.json_log_rx = (cmds[2] == "on")
if json_output :
print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]}))
case "log_channels" :
handle_log_rx.log_channels = (cmds[2] == "on")
if json_output :
print(json.dumps({"cmd" : cmds[1], "param" : cmds[2]}))
case "print_adverts" :
@ -1694,11 +1735,16 @@ async def next_cmd(mc, cmds, json_output=False):
print(json.dumps({"color" : process_event_message.color}))
else:
print(f"{'on' if process_event_message.color else 'off'}")
case "print_log_rx":
case "json_log_rx":
if json_output :
print(json.dumps({"print_log_rx" : handle_log_rx.print_log_rx}))
print(json.dumps({"json_log_rx" : handle_log_rx.json_log_rx}))
else:
print(f"{'on' if handle_log_rx.print_log_rx else 'off'}")
print(f"{'on' if handle_log_rx.json_log_rx else 'off'}")
case "log_channels":
if json_output :
print(json.dumps({"log_channels" : handle_log_rx.log_channels}))
else:
print(f"{'on' if handle_log_rx.log_channels else 'off'}")
case "print_adverts":
if json_output :
print(json.dumps({"print_adverts" : handle_advert.print_adverts}))
@ -2939,6 +2985,7 @@ async def main(argv):
handle_message.mc = mc # connect meshcore to handle_message
handle_advert.mc = mc
handle_path_update.mc = mc
handle_log_rx.mc = mc
mc.subscribe(EventType.ADVERTISEMENT, handle_advert)
mc.subscribe(EventType.PATH_UPDATE, handle_path_update)