extract some info from log_rx

This commit is contained in:
Florent 2026-03-05 09:32:19 -04:00
parent 3716ebf77e
commit b1abb8e4d3
2 changed files with 67 additions and 4 deletions

View file

@ -18,7 +18,7 @@ classifiers = [
]
license = "MIT"
license-files = ["LICEN[CS]E*"]
dependencies = [ "bleak", "pyserial-asyncio-fast", "pycayennelpp" ]
dependencies = [ "bleak", "pyserial-asyncio-fast", "pycayennelpp", "pycryptodome" ]
[project.optional-dependencies]
dev = ["pytest", "pytest-asyncio", "black", "ruff"]

View file

@ -9,9 +9,14 @@ from .packets import BinaryReqType, PacketType, ControlType
from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status
from cayennelpp import LppFrame, LppData
from meshcore.lpp_json_encoder import lpp_json_encoder
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
logger = logging.getLogger("meshcore")
PAYLOAD_TYPENAMES = ["REQ", "RESPONSE", "TEXT_MSG", "ACK", "ADVERT", "GRP_TXT", "GRP_DATA", "ANON_REQ", "PATH", "TRACE", "MULTIPART", "CONTROL"]
ROUTE_TYPENAMES = ["TC_FLOOD", "FLOOD", "DIRECT", "TC_DIRECT"]
CONTACT_TYPENAMES = ["NONE","CLI","REP","ROOM","SENS"]
class MessageReader:
def __init__(self, dispatcher: EventDispatcher):
@ -404,6 +409,7 @@ class MessageReader:
res["channel_name"] = name_bytes.decode("utf-8", "ignore")
res["channel_secret"] = dbuf.read(16)
res["channel_hash"] = SHA256.new(res["channel_secret"]).hexdigest()[0:2]
await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res))
# Push notifications
@ -492,6 +498,11 @@ class MessageReader:
# Parse as raw RX data
log_data: Dict[str, Any] = {"raw_hex": data[1:].hex()}
attributes = {}
recv_time = time.time()
log_data["recv_time"] = recv_time
attributes["recv_time"] = recv_time
# First byte is SNR (signed byte, multiplied by 4)
if len(data) > 1:
@ -508,14 +519,66 @@ class MessageReader:
log_data["rssi"] = rssi
# Remaining bytes are the raw data payload
payload = None
if len(data) > 3:
payload=dbuf.read()
log_data["payload"] = payload.hex()
log_data["payload_length"] = len(payload)
attributes = {
"pubkey_prefix": log_data["raw_hex"],
}
# Parse payload and get some info from it
if not payload is None:
pbuf = io.BytesIO(payload)
header = pbuf.read(1)[0]
route_type = header & 0x03
payload_type = (header & 0x3c) >> 2
payload_ver = (header & 0xc0) >> 6
transport_code = None
if route_type == 0x00 or route_type == 0x03: # has transport code
transport_code = pbuf.read(4) # discard transport code
path_byte = pbuf.read(1)[0]
path_hash_size = ((path_byte & 0xC0) >> 6) + 1
path_len = (path_byte & 0x3F)
# here path_len is number of hops, not number of bytes
path = pbuf.read(path_len*path_hash_size).hex() # Beware of traces where pathes are mixed
try :
route_typename = ROUTE_TYPENAMES[route_type]
except IndexError:
logger.debug(f"Unknown route type {route_type}")
route_typename = "UNK"
try :
payload_typename = PAYLOAD_TYPENAMES[payload_type]
except IndexError:
logger.debug(f"Unknown payload type {payload_type}")
payload_typename = "UNK"
pkt_payload = pbuf.read()
log_data["header"] = header
log_data["route_type"] = route_type
attributes["route_type"] = route_type
log_data["route_typename"] = route_typename
log_data["payload_type"] = payload_type
attributes["payload_type"] = payload_type
log_data["payload_typename"]= payload_typename
log_data["payload_ver"] = payload_ver
if not transport_code is None:
log_data["transport_code"] = transport_code.hex()
log_data["path_len"] = path_len
attributes["path_len"] = path_len
log_data["path_hash_size"] = path_hash_size
log_data["path"] = path
attributes["path"] = path
log_data["pkt_payload"] = pkt_payload
# Dispatch as RF log data
await self.dispatcher.dispatch(