let some extraction work to the lib

This commit is contained in:
Florent 2026-03-05 09:32:42 -04:00
parent db59c48b3f
commit ed1c1d5897
2 changed files with 10 additions and 56 deletions

View file

@ -20,8 +20,7 @@ license-files = ["LICEN[CS]E*"]
dependencies = [ "meshcore >= 2.2.15",
"bleak >= 0.22",
"prompt_toolkit >= 3.0.50",
"requests >= 2.28.0",
"pycryptodome" ]
"requests >= 2.28.0" ]
[project.urls]
Homepage = "https://github.com/fdlamotte/meshcore-cli"

View file

@ -232,58 +232,11 @@ process_event_message.timestamp=""
async def handle_log_rx(event):
mc = handle_log_rx.mc
pkt = bytes().fromhex(event.payload["payload"])
pbuf = io.BytesIO(pkt)
header = pbuf.read(1)[0]
route_type = header & 0x03
payload_type = (header & 0x3c) >> 2
payload_ver = (header & 0xc0) >> 6
transport_code = None
if route_type == 0x00 or route_type == 0x03: # has transport code
transport_code = pbuf.read(4) # discard transport code
path_byte = pbuf.read(1)[0]
path_hash_size = ((path_byte & 0xC0) >> 6) + 1
path_len = (path_byte & 0x3F)
# here path_len is number of hops, not number of bytes
path = pbuf.read(path_len*path_hash_size).hex() # Beware of traces where pathes are mixed
try :
route_typename = ROUTE_TYPENAMES[route_type]
except IndexError:
logger.debug(f"Unknown route type {route_type}")
route_typename = "UNK"
try :
payload_typename = PAYLOAD_TYPENAMES[payload_type]
except IndexError:
logger.debug(f"Unknown payload type {payload_type}")
payload_typename = "UNK"
pkt_payload = pbuf.read()
event.payload["header"] = header
event.payload["route_type"] = route_type
event.payload["route_typename"] = route_typename
event.payload["payload_type"] = payload_type
event.payload["payload_typename"]= payload_typename
event.payload["payload_ver"] = payload_ver
if not transport_code is None:
event.payload["transport_code"] = transport_code.hex()
event.payload["path_len"] = path_len
event.payload["path_hash_size"] = path_hash_size
event.payload["path"] = path
event.payload["pkt_payload"] = pkt_payload.hex()
payload_type = event.payload["payload_type"]
if payload_type == 0x05: # flood msg / channel
if handle_log_rx.channel_echoes:
pk_buf = io.BytesIO(pkt_payload)
pk_buf = io.BytesIO(event.payload["pkt_payload"])
chan_hash = pk_buf.read(1).hex()
cipher_mac = pk_buf.read(2)
msg = pk_buf.read() # until the end of buffer
@ -311,9 +264,9 @@ async def handle_log_rx(event):
if chan_name != "" :
width = os.get_terminal_size().columns
cars = width - 13 - len(path) - len(chan_name) - 1
cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1
dispmsg = message.replace("\n","")[0:cars]
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}"
txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}"
if handle_message.above:
print_above(txt)
else:
@ -321,7 +274,7 @@ async def handle_log_rx(event):
elif payload_type == 0x04: # Advert
if handle_log_rx.advert_echoes:
pk_buf = io.BytesIO(pkt_payload)
pk_buf = io.BytesIO(event.payload['pkt_payload'])
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
@ -362,13 +315,14 @@ async def handle_log_rx(event):
txt = f"{ANSI_LIGHT_GRAY}Advert for{ANSI_END} {adv_name}{ANSI_GREEN}/{CONTACT_TYPENAMES[adv_type]}{ts_str}{ANSI_END}"
if not adv_lat is None:
txt += f" {ANSI_LIGHT_GRAY}coords: {adv_lat},{adv_lon}"
txt += f" {ANSI_YELLOW}path: [{path}] {ANSI_LIGHT_GRAY}snr: {event.payload['snr']:.2f}dB{ANSI_END}"
txt += f" {ANSI_YELLOW}path: [{event.payload['path']}] {ANSI_LIGHT_GRAY}snr: {event.payload['snr']:.2f}dB{ANSI_END}"
if handle_message.above:
print_above(txt)
else:
print(txt)
event.payload["pkt_payload"] = event.payload["pkt_payload"].hex() # convert for json serialization
if handle_log_rx.json_log_rx: # json mode ... raw dump
msg = json.dumps(event.payload)
@ -1717,7 +1671,8 @@ async def set_channel (mc, chan, name, key=None):
return None
info = res.payload
info["channel_hash"] = SHA256.new(info["channel_secret"]).hexdigest()[0:2]
if not "channel_hash" in info:
info["channel_hash"] = SHA256.new(info["channel_secret"]).hexdigest()[0:2]
info["channel_secret"] = info["channel_secret"].hex()
if hasattr(mc,'channels') :