offload channel messages decoding to meshcore_py

This commit is contained in:
Florent 2026-03-05 11:53:10 -04:00
parent ed1c1d5897
commit 7caca2db79
2 changed files with 28 additions and 49 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore-cli"
version = "1.4.5"
version = "1.4.6"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
]
@ -17,7 +17,7 @@ classifiers = [
]
license = "MIT"
license-files = ["LICEN[CS]E*"]
dependencies = [ "meshcore >= 2.2.15",
dependencies = [ "meshcore >= 2.2.16",
"bleak >= 0.22",
"prompt_toolkit >= 3.0.50",
"requests >= 2.28.0" ]

View file

@ -22,8 +22,6 @@ from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.shortcuts import radiolist_dialog
from prompt_toolkit.completion.word_completer import WordCompleter
from prompt_toolkit.document import Document
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
try:
from bleak import BleakScanner, BleakClient
@ -37,7 +35,7 @@ import re
from meshcore import MeshCore, EventType, logger
# Version
VERSION = "v1.4.5"
VERSION = "v1.4.6"
# default ble address is stored in a config file
MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/meshcore/"
@ -236,37 +234,21 @@ async def handle_log_rx(event):
if payload_type == 0x05: # flood msg / channel
if handle_log_rx.channel_echoes:
pk_buf = io.BytesIO(event.payload["pkt_payload"])
chan_hash = pk_buf.read(1).hex()
cipher_mac = pk_buf.read(2)
msg = pk_buf.read() # until the end of buffer
channel = None
for c in await get_channels(mc):
if c["channel_hash"] == chan_hash : # validate against MAC
h = HMAC.new(bytes.fromhex(c["channel_secret"]), digestmod=SHA256)
h.update(msg)
if h.digest()[0:2] == cipher_mac:
channel = c
break
chan_name = ""
if channel is None :
if handle_log_rx.echo_unk_chans:
chan_name = chan_hash
message = msg.hex()
else:
chan_name = channel["channel_name"]
aes_key = bytes.fromhex(channel["channel_secret"])
cipher = AES.new(aes_key, AES.MODE_ECB)
message = cipher.decrypt(msg)[5:].decode("utf-8", "ignore").strip("\x00")
if "message" in event.payload :
chan_name = event.payload["chan_name"]
message = event.payload["message"]
elif handle_log_rx.echo_unk_chans:
chan_name = event.payload["chan_hash"]
message = event.payload["crypted"]
if chan_name != "" :
width = os.get_terminal_size().columns
cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1
dispmsg = message.replace("\n","")[0:cars]
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}"
if handle_message.above:
print_above(txt)
else:
@ -274,25 +256,24 @@ async def handle_log_rx(event):
elif payload_type == 0x04: # Advert
if handle_log_rx.advert_echoes:
pk_buf = io.BytesIO(event.payload['pkt_payload'])
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
if adv_name is None:
adv_key = event.payload["adv_key"]
adv_timestamp = event.payload["adv_timestamp"]
signature = event.payload["signature"]
flags = event.payload["adv_flags"]
adv_type = event.payload["adv_type"]
if "adv_lat" in event.payload:
adv_lat = event.payload["adv_lat"]
if "adv_lon" in event.payload:
adv_lon = event.payload["adv_lon"]
if "adv_feat1" in event.payload:
adv_feat1 = event.payload["adv_feat1"]
if "adv_feat2" in event.payload:
adv_feat2 = event.payload["adv_feat2"]
if "adv_name" in event.payload:
adv_name = event.payload["adv_name"]
else:
# try to get the name from the contact
ct = handle_log_rx.mc.get_contact_by_key_prefix(adv_key)
if ct is None:
@ -1671,8 +1652,6 @@ async def set_channel (mc, chan, name, key=None):
return None
info = res.payload
if not "channel_hash" in info:
info["channel_hash"] = SHA256.new(info["channel_secret"]).hexdigest()[0:2]
info["channel_secret"] = info["channel_secret"].hex()
if hasattr(mc,'channels') :
@ -1761,7 +1740,6 @@ async def get_channels (mc, anim=False) :
if res.type == EventType.ERROR:
break
info = res.payload
info["channel_hash"] = SHA256.new(info["channel_secret"]).hexdigest()[0:2]
info["channel_secret"] = info["channel_secret"].hex()
mc.channels.append(info)
ch = ch + 1
@ -4540,6 +4518,7 @@ async def main(argv):
mc.subscribe(EventType.RX_LOG_DATA, handle_log_rx)
mc.auto_update_contacts = True
mc.set_decrypt_channel_logs = True
res = await mc.commands.send_device_query()
if res.type == EventType.ERROR :