if possible, add path and rssi to channel messages

This commit is contained in:
Florent 2026-03-05 11:50:41 -04:00
parent b1abb8e4d3
commit 3d47d6d8b2
3 changed files with 118 additions and 5 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore"
version = "2.2.15"
version = "2.2.16"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
{ name="Alex Wolden", email="awolden@gmail.com" },

View file

@ -360,6 +360,9 @@ class MeshCore:
"""Get pending contacts"""
return self._pending_contacts
def set_decrypt_channel_logs(self, v):
self._reader.decrypt_channels = v
def pop_pending_contact(self, key: str) -> Optional[Dict[str, Any]]:
return self._pending_contacts.pop(key, None)

View file

@ -29,6 +29,10 @@ class MessageReader:
# Track pending binary requests by tag for proper response parsing
self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at}
self.channels = [{} for _ in range(20)] # keep our own copy of channels, 20 elements by default
self.decrypt_channels = True
self.channels_log = [] # stores the channel msg events
def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float, context={}, is_anon=False):
"""Register a pending binary request for proper response parsing"""
# Clean up expired requests before adding new one
@ -216,7 +220,17 @@ class MessageReader:
res["path_len"] = dbuf.read(1)[0]
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["text"] = dbuf.read().decode("utf-8", "ignore")
text = dbuf.read().strip(b"\0")
res["text"] = text.decode("utf-8", "ignore")
# search for text in log_channels
txt_hash = int.from_bytes(SHA256.new(text).digest()[0:4], "little", signed=False)
if self.decrypt_channels:
logged = next((l for l in self.channels_log if l['msg_hash'] == txt_hash), None)
if not logged is None:
res["path"] = logged["path"]
res["RSSI"] = logged["rssi"]
res["SNR"] = logged["snr"]
attributes = {
"channel_idx": res["channel_idx"],
@ -236,7 +250,18 @@ class MessageReader:
res["path_len"] = dbuf.read(1)[0]
res["txt_type"] = dbuf.read(1)[0]
res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little")
res["text"] = dbuf.read().decode("utf-8", "ignore")
text = dbuf.read()
res["text"] = text.decode("utf-8", "ignore")
# search for text in log_channels
txt_hash = int.from_bytes(SHA256.new(text).digest()[0:4], "little", signed=False)
res["txt_hash"] = txt_hash
logged = next((l for l in self.channels_log if l['msg_hash'] == txt_hash), None)
if self.decrypt_channels:
if not logged is None:
res["path"] = logged["path"]
res["RSSI"] = logged["rssi"]
attributes = {
"channel_idx": res["channel_idx"],
@ -398,7 +423,12 @@ class MessageReader:
elif packet_type_value == PacketType.CHANNEL_INFO.value:
logger.debug(f"received channel info response: {data.hex()}")
res = {}
res["channel_idx"] = dbuf.read(1)[0]
idx = dbuf.read(1)[0]
res["channel_idx"] = idx
if idx >= len(self.channels):
self.channels = self.channels.extend(
[{} for _ in range(idx - len(self.channels) + 1)])
# Channel name is null-terminated, so find the first null byte
name_bytes = dbuf.read(32)
@ -410,6 +440,9 @@ class MessageReader:
res["channel_secret"] = dbuf.read(16)
res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2]
self.channels[idx] = res
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
# Push notifications
@ -500,7 +533,7 @@ class MessageReader:
log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()}
attributes = {}
recv_time = time.time()
recv_time = int(time.time())
log_data["recv_time"] = recv_time
attributes["recv_time"] = recv_time
@ -558,6 +591,7 @@ class MessageReader:
payload_typename = "UNK"
pkt_payload = pbuf.read()
pkt_hash = int.from_bytes(SHA256.new(pkt_payload).digest()[0:4], "little", signed=False)
log_data["header"] = header
log_data["route_type"] = route_type
@ -579,6 +613,82 @@ class MessageReader:
attributes["path"] = path
log_data["pkt_payload"] = pkt_payload
log_data["pkt_hash"] = pkt_hash
self.channels_log.append(log_data)
if len(self.channels_log) > 150:
self.channels_log = self.channels_log[:-100]
if not payload is None and payload_type == 0x05: # flood msg / channel
pk_buf = io.BytesIO(pkt_payload)
chan_hash = pk_buf.read(1).hex()
cipher_mac = pk_buf.read(2)
msg = pk_buf.read() # until the end of buffer
channel = None
for c in self.channels:
if c["channel_hash"] == chan_hash : # validate against MAC
h = HMAC.new(c["channel_secret"], digestmod=SHA256)
h.update(msg)
if h.digest()[0:2] == cipher_mac:
channel = c
break
chan_name = ""
if channel is None :
chan_name = chan_hash
else:
chan_name = channel["channel_name"]
log_data["chan_hash"] = chan_hash
log_data["cipher_mac"] = cipher_mac.hex()
log_data["crypted"] = msg.hex()
log_data["chan_name"] = chan_name
if not channel is None and self.decrypt_channels:
aes_key = channel["channel_secret"]
cipher = AES.new(aes_key, AES.MODE_ECB)
message = cipher.decrypt(msg)[5:].strip(b"\0")
msg_hash = int.from_bytes(SHA256.new(message).digest()[0:4], "little", signed=False)
log_data["message"] = message.decode("utf-8", "ignore")
log_data["msg_hash"] = msg_hash
elif not payload is None and payload_type == 0x04: # Advert
pk_buf = io.BytesIO(pkt_payload)
adv_key = pk_buf.read(32).hex()
adv_timestamp = int.from_bytes(pk_buf.read(4), "little", signed=False)
signature = pk_buf.read(64).hex()
flags = pk_buf.read(1)[0]
adv_type = flags & 0x0F
adv_lat = None
adv_lon = None
adv_feat1 = None
adv_feat2 = None
if flags & 0x10 > 0: #has location
adv_lat = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
adv_lon = int.from_bytes(pk_buf.read(4), "little", signed=True)/1000000.0
if flags & 0x20 > 0: #has feature1
adv_feat1 = pk_buf.read(2).hex()
if flags & 0x40 > 0: #has feature2
adv_feat2 = pk_buf.read(2).hex()
if flags & 0x80 > 0: #has name
adv_name = pk_buf.read().decode("utf-8", "ignore").strip("\x00")
log_data["adv_name"] = adv_name
log_data["adv_key"] = adv_key
log_data["adv_timestamp"] = adv_timestamp
log_data["signature"] = signature
log_data["adv_flags"] = flags
log_data["adv_type"] = adv_type
if not adv_lat is None :
log_data["adv_lat"] = adv_lat
if not adv_lon is None :
log_data["adv_lon"] = adv_lon
if not adv_feat1 is None:
log_data["adv_feat1"] = adv_feat1
if not adv_feat2 is None:
log_data["adv_feat2"] = adv_feat2
# Dispatch as RF log data
await self.dispatcher.dispatch(