From b8885e3015c3f1bdf42cf31781801740c050b6ae Mon Sep 17 00:00:00 2001 From: Florent de Lamotte Date: Wed, 5 Nov 2025 13:11:22 +0100 Subject: [PATCH] starting rewriting of reader using io.BytesIO instead of fixed indexes --- src/meshcore/reader.py | 212 +++++++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 102 deletions(-) diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 43031f0..c1d82bd 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -1,6 +1,7 @@ import logging import json import time +import io from typing import Any, Dict from .events import Event, EventType, EventDispatcher from .packets import BinaryReqType, PacketType @@ -18,7 +19,7 @@ class MessageReader: # before events are dispatched self.contacts = {} # Temporary storage during contact list building self.contact_nb = 0 # Used for contact processing - + # Track pending binary requests by tag for proper response parsing self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at} @@ -26,7 +27,7 @@ class MessageReader: """Register a pending binary request for proper response parsing""" # Clean up expired requests before adding new one self.cleanup_expired_requests() - + expires_at = time.time() + timeout_seconds self.pending_binary_requests[tag] = { "request_type": request_type, @@ -42,13 +43,14 @@ class MessageReader: tag for tag, info in self.pending_binary_requests.items() if current_time > info["expires_at"] ] - + for tag in expired_tags: logger.debug(f"Cleaning up expired binary request: tag={tag}") del self.pending_binary_requests[tag] - + async def handle_rx(self, data: bytearray): - packet_type_value = data[0] + dbuf = io.BytesIO(data) + packet_type_value = dbuf.read(1)[0] logger.debug(f"Received data: {data.hex()}") # Handle command responses @@ -78,23 +80,24 @@ class MessageReader: or packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value ): c = {} - c["public_key"] = data[1:33].hex() - c["type"] = data[33] - c["flags"] = data[34] - c["out_path_len"] = int.from_bytes(data[35:36], signed=True, byteorder="little") - plen = int.from_bytes(data[35:36], signed=True, byteorder="little") + c["public_key"] = dbuf.read(32).hex() + c["type"] = dbuf.read(1)[0] + c["flags"] = dbuf.read(1)[0] + plen = int.from_bytes(dbuf.read(1), signed=True, byteorder="little") + c["out_path_len"] = plen if plen == -1: plen = 0 - c["out_path"] = data[36 : 36 + plen].hex() - c["adv_name"] = data[100:132].decode("utf-8", "ignore").replace("\0", "") - c["last_advert"] = int.from_bytes(data[132:136], byteorder="little") + path = dbuf.read(64) + c["out_path"] = path[0:plen].hex() + c["adv_name"] = dbuf.read(32).decode("utf-8", "ignore").replace("\0", "") + c["last_advert"] = int.from_bytes(dbuf.read(4), byteorder="little") c["adv_lat"] = ( - int.from_bytes(data[136:140], byteorder="little", signed=True) / 1e6 + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 ) c["adv_lon"] = ( - int.from_bytes(data[140:144], byteorder="little", signed=True) / 1e6 + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 ) - c["lastmod"] = int.from_bytes(data[144:148], byteorder="little") + c["lastmod"] = int.from_bytes(dbuf.read(4), byteorder="little") if packet_type_value == PacketType.PUSH_CODE_NEW_ADVERT.value: await self.dispatcher.dispatch(Event(EventType.NEW_CONTACT, c)) @@ -103,7 +106,7 @@ class MessageReader: self.contacts[c["public_key"]] = c elif packet_type_value == PacketType.CONTACT_END.value: - lastmod = int.from_bytes(data[1:5], byteorder="little") + lastmod = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { "lastmod": lastmod, } @@ -113,38 +116,39 @@ class MessageReader: elif packet_type_value == PacketType.SELF_INFO.value: self_info = {} - self_info["adv_type"] = data[1] - self_info["tx_power"] = data[2] - self_info["max_tx_power"] = data[3] - self_info["public_key"] = data[4:36].hex() + self_info["adv_type"] = dbuf.read(1)[0] + self_info["tx_power"] = dbuf.read(1)[0] + self_info["max_tx_power"] = dbuf.read(1)[0] + self_info["public_key"] = dbuf.read(32).hex() self_info["adv_lat"] = ( - int.from_bytes(data[36:40], byteorder="little", signed=True) / 1e6 + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 ) self_info["adv_lon"] = ( - int.from_bytes(data[40:44], byteorder="little", signed=True) / 1e6 + int.from_bytes(dbuf.read(4), byteorder="little", signed=True) / 1e6 ) - self_info["multi_acks"] = data[44] - self_info["adv_loc_policy"] = data[45] - self_info["telemetry_mode_env"] = (data[46] >> 4) & 0b11 - self_info["telemetry_mode_loc"] = (data[46] >> 2) & 0b11 - self_info["telemetry_mode_base"] = (data[46]) & 0b11 - self_info["manual_add_contacts"] = data[47] > 0 + self_info["multi_acks"] = dbuf.read(1)[0] + self_info["adv_loc_policy"] = dbuf.read(1)[0] + telemetry_mode = dbuf.read(1)[0] + self_info["telemetry_mode_env"] = (telemetry_mode >> 4) & 0b11 + self_info["telemetry_mode_loc"] = (telemetry_mode >> 2) & 0b11 + self_info["telemetry_mode_base"] = (telemetry_mode) & 0b11 + self_info["manual_add_contacts"] = dbuf.read(1)[0] > 0 self_info["radio_freq"] = ( - int.from_bytes(data[48:52], byteorder="little") / 1000 + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 ) self_info["radio_bw"] = ( - int.from_bytes(data[52:56], byteorder="little") / 1000 + int.from_bytes(dbuf.read(4), byteorder="little") / 1000 ) - self_info["radio_sf"] = data[56] - self_info["radio_cr"] = data[57] - self_info["name"] = data[58:].decode("utf-8", "ignore") + self_info["radio_sf"] = dbuf.read(1)[0] + self_info["radio_cr"] = dbuf.read(1)[0] + self_info["name"] = dbuf.read().decode("utf-8", "ignore") await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info)) elif packet_type_value == PacketType.MSG_SENT.value: res = {} - res["type"] = data[1] - res["expected_ack"] = bytes(data[2:6]) - res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder="little") + res["type"] = dbuf.read(1)[0] + res["expected_ack"] = dbuf.read(4) + res["suggested_timeout"] = int.from_bytes(dbuf.read(4), byteorder="little") attributes = { "type": res["type"], @@ -156,15 +160,14 @@ class MessageReader: elif packet_type_value == PacketType.CONTACT_MSG_RECV.value: res = {} res["type"] = "PRIV" - res["pubkey_prefix"] = data[1:7].hex() - res["path_len"] = data[7] - res["txt_type"] = data[8] - res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder="little") - if data[8] == 2: - res["signature"] = data[13:17].hex() - res["text"] = data[17:].decode("utf-8", "ignore") - else: - res["text"] = data[13:].decode("utf-8", "ignore") + res["pubkey_prefix"] = dbuf.read(6).hex() + res["path_len"] = dbuf.read(1)[0] + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") attributes = { "pubkey_prefix": res["pubkey_prefix"], @@ -178,16 +181,16 @@ class MessageReader: elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} res["type"] = "PRIV" - res["SNR"] = int.from_bytes(data[1:2], byteorder="little", signed=True) / 4 - res["pubkey_prefix"] = data[4:10].hex() - res["path_len"] = data[10] - res["txt_type"] = data[11] - res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder="little") - if data[11] == 2: - res["signature"] = data[16:20].hex() - res["text"] = data[20:].decode("utf-8", "ignore") - else: - res["text"] = data[16:].decode("utf-8", "ignore") + res["SNR"] = int.from_bytes(dbuf.read(2), byteorder="little", signed=True) / 4 + dbuf.read(1) # reserved + res["pubkey_prefix"] = dbuf.read(6).hex() + res["path_len"] = dbuf.read(1)[0] + txt_type = dbuf.read(1)[0] + res["txt_type"] = txt_type + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + if txt_type == 2: + res["signature"] = dbuf.read(4).hex() + res["text"] = dbuf.read().decode("utf-8", "ignore") attributes = { "pubkey_prefix": res["pubkey_prefix"], @@ -201,11 +204,11 @@ class MessageReader: elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value: res = {} res["type"] = "CHAN" - res["channel_idx"] = data[1] - res["path_len"] = data[2] - res["txt_type"] = data[3] - res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder="little") - res["text"] = data[8:].decode("utf-8", "ignore") + res["channel_idx"] = dbuf.read(1)[0] + res["path_len"] = dbuf.read(1)[0] + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["text"] = dbuf.read().decode("utf-8", "ignore") attributes = { "channel_idx": res["channel_idx"], @@ -219,12 +222,13 @@ class MessageReader: elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3) res = {} res["type"] = "CHAN" - res["SNR"] = int.from_bytes(data[1:2], byteorder="little", signed=True) / 4 - res["channel_idx"] = data[4] - res["path_len"] = data[5] - res["txt_type"] = data[6] - res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder="little") - res["text"] = data[11:].decode("utf-8", "ignore") + res["SNR"] = int.from_bytes(dbuf.read(2), byteorder="little", signed=True) / 4 + dbuf.read(1) # reserved + res["channel_idx"] = dbuf.read(1)[0] + res["path_len"] = dbuf.read(1)[0] + res["txt_type"] = dbuf.read(1)[0] + res["sender_timestamp"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["text"] = dbuf.read().decode("utf-8", "ignore") attributes = { "channel_idx": res["channel_idx"], @@ -236,7 +240,7 @@ class MessageReader: ) elif packet_type_value == PacketType.CURRENT_TIME.value: - time_value = int.from_bytes(data[1:5], byteorder="little") + time_value = int.from_bytes(dbuf.read(4), byteorder="little") result = {"time": time_value} await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result)) @@ -245,34 +249,34 @@ class MessageReader: await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, result)) elif packet_type_value == PacketType.CONTACT_URI.value: - contact_uri = "meshcore://" + data[1:].hex() + contact_uri = "meshcore://" + dbuf.read().hex() result = {"uri": contact_uri} await self.dispatcher.dispatch(Event(EventType.CONTACT_URI, result)) elif packet_type_value == PacketType.BATTERY.value: - battery_level = int.from_bytes(data[1:3], byteorder="little") + battery_level = int.from_bytes(dbuf.read(2), byteorder="little") result = {"level": battery_level} if len(data) > 3: # has storage info as well - result["used_kb"] = int.from_bytes(data[3:7], byteorder="little") - result["total_kb"] = int.from_bytes(data[7:11], byteorder="little") + result["used_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") + result["total_kb"] = int.from_bytes(dbuf.read(4), byteorder="little") await self.dispatcher.dispatch(Event(EventType.BATTERY, result)) elif packet_type_value == PacketType.DEVICE_INFO.value: res = {} - res["fw ver"] = data[1] + res["fw ver"] = dbuf.read(1)[0] if data[1] >= 3: - res["max_contacts"] = data[2] * 2 - res["max_channels"] = data[3] - res["ble_pin"] = int.from_bytes(data[4:8], byteorder="little") - res["fw_build"] = data[8:20].decode("utf-8", "ignore").replace("\0", "") - res["model"] = data[20:60].decode("utf-8", "ignore").replace("\0", "") - res["ver"] = data[60:80].decode("utf-8", "ignore").replace("\0", "") + res["max_contacts"] = dbuf.read(1)[0] * 2 + res["max_channels"] = dbuf.read(1)[0] + res["ble_pin"] = int.from_bytes(dbuf.read(4), byteorder="little") + res["fw_build"] = dbuf.read(12).decode("utf-8", "ignore").replace("\0", "") + res["model"] = dbuf.read(40).decode("utf-8", "ignore").replace("\0", "") + res["ver"] = dbuf.read(20).decode("utf-8", "ignore").replace("\0", "") await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res)) elif packet_type_value == PacketType.CUSTOM_VARS.value: logger.debug(f"received custom vars response: {data.hex()}") res = {} - rawdata = data[1:].decode("utf-8", "ignore") + rawdata = dbuf.read().decode("utf-8", "ignore") if not rawdata == "": pairs = rawdata.split(",") for p in pairs: @@ -284,30 +288,30 @@ class MessageReader: elif packet_type_value == PacketType.CHANNEL_INFO.value: logger.debug(f"received channel info response: {data.hex()}") res = {} - res["channel_idx"] = data[1] + res["channel_idx"] = dbuf.read(1)[0] # Channel name is null-terminated, so find the first null byte - name_bytes = data[2:34] + name_bytes = dbuf.read(32) null_pos = name_bytes.find(0) if null_pos >= 0: res["channel_name"] = name_bytes[:null_pos].decode("utf-8", "ignore") else: res["channel_name"] = name_bytes.decode("utf-8", "ignore") - res["channel_secret"] = data[34:50] + res["channel_secret"] = dbuf.read(16) await self.dispatcher.dispatch(Event(EventType.CHANNEL_INFO, res, res)) # Push notifications elif packet_type_value == PacketType.ADVERTISEMENT.value: logger.debug("Advertisement received") res = {} - res["public_key"] = data[1:33].hex() + res["public_key"] = dbuf.read(32).hex() await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, res, res)) elif packet_type_value == PacketType.PATH_UPDATE.value: logger.debug("Code path update") res = {} - res["public_key"] = data[1:33].hex() + res["public_key"] = dbuf.read(32).hex() await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, res, res)) elif packet_type_value == PacketType.ACK.value: @@ -315,7 +319,7 @@ class MessageReader: ack_data = {} if len(data) >= 5: - ack_data["code"] = bytes(data[1:5]).hex() + ack_data["code"] = dbuf.read(4).hex() attributes = {"code": ack_data.get("code", "")} @@ -327,23 +331,24 @@ class MessageReader: elif packet_type_value == PacketType.RAW_DATA.value: res = {} - res["SNR"] = data[1] / 4 - res["RSSI"] = data[2] - res["payload"] = data[4:].hex() + res["SNR"] = dbuf.read(1)[0] / 4 + res["RSSI"] = dbuf.read(1)[0] + res["payload"] = dbuf.read(4).hex() logger.debug("Received raw data") print(res) await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res)) elif packet_type_value == PacketType.LOGIN_SUCCESS.value: res = {} + attributes = {} if len(data) > 1: - res["permissions"] = data[1] - res["is_admin"] = (data[1] & 1) == 1 # Check if admin bit is set + perms = dbuf.read(1)[0] + res["permissions"] = perms + res["is_admin"] = (perms & 1) == 1 # Check if admin bit is set - if len(data) > 7: - res["pubkey_prefix"] = data[2:8].hex() + res["pubkey_prefix"] = dbuf.read(6).hex() - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} await self.dispatcher.dispatch( Event(EventType.LOGIN_SUCCESS, res, attributes) @@ -351,11 +356,14 @@ class MessageReader: elif packet_type_value == PacketType.LOGIN_FAILED.value: res = {} + attributes = {} + + pbuf.read(1) if len(data) > 7: - res["pubkey_prefix"] = data[2:8].hex() + res["pubkey_prefix"] = pbuf.read(6).hex() - attributes = {"pubkey_prefix": res.get("pubkey_prefix")} + attributes = {"pubkey_prefix": res.get("pubkey_prefix")} await self.dispatcher.dispatch( Event(EventType.LOGIN_FAILED, res, attributes) @@ -501,27 +509,27 @@ class MessageReader: logger.debug(f"Received binary data: {data.hex()}") tag = data[2:6].hex() response_data = data[6:] - + # Always dispatch generic BINARY_RESPONSE binary_res = {"tag": tag, "data": response_data.hex()} await self.dispatcher.dispatch( Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) ) - + # Check for tracked request type and dispatch specific response if tag in self.pending_binary_requests: request_type = self.pending_binary_requests[tag]["request_type"] pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] del self.pending_binary_requests[tag] logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: res = {} res = parse_status(response_data, pubkey_prefix=pubkey_prefix) await self.dispatcher.dispatch( Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) ) - + elif request_type == BinaryReqType.TELEMETRY: try: lpp = lpp_parse(response_data) @@ -531,7 +539,7 @@ class MessageReader: ) except Exception as e: logger.error(f"Error parsing binary telemetry response: {e}") - + elif request_type == BinaryReqType.MMA: try: mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header @@ -541,7 +549,7 @@ class MessageReader: ) except Exception as e: logger.error(f"Error parsing binary MMA response: {e}") - + elif request_type == BinaryReqType.ACL: try: acl_result = parse_acl(response_data)