Move binary process to reader for consistent eventing

This commit is contained in:
Alex Wolden 2025-08-29 11:54:21 -07:00
parent 28957a4b60
commit 9aeffb41a1
5 changed files with 257 additions and 165 deletions

View file

@ -9,22 +9,20 @@ from meshcore.events import EventType
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Get status from a repeater via serial connection')
# parser.add_argument('-p', '--port', required=True, help='Serial port')
# parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate')
parser.add_argument('-p', '--port', required=True, help='Serial port')
parser.add_argument('-b', '--baudrate', type=int, default=115200, help='Baud rate')
parser.add_argument('-r', '--repeater', required=True, help='Repeater name')
parser.add_argument('-pw', '--password', required=True, help='Password for login')
# parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds')
parser.add_argument('-t', '--timeout', type=float, default=10.0, help='Timeout for responses in seconds')
args = parser.parse_args()
# Connect to the device
mc = await MeshCore.create_ble("lora-py-tester")
534463
mc = await MeshCore.create_serial(args.port, args.baudrate, debug=True)
try:
# Get contacts
result = await mc.commands.get_contacts()
print(result)
print(mc._contacts)
repeater = mc.get_contact_by_key_prefix(args.repeater)
await mc.ensure_contacts()
repeater = mc.get_contact_by_name(args.repeater)
if repeater is None:
print(f"Repeater '{args.repeater}' not found in contacts.")
@ -37,25 +35,14 @@ async def main():
if login_event.type != EventType.ERROR:
print("Login successful")
# Continuously poll for telemetry every 60 seconds
print("Starting continuous telemetry polling every 60 seconds...")
while True:
try:
# Send status request
print("Sending status request...")
await mc.commands.send_telemetry_req(repeater)
# Wait for status response
telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=10)
print(telemetry_event)
# Wait 60 seconds before next poll
await asyncio.sleep(60)
except Exception as e:
print(f"Error during telemetry poll: {e}")
# Wait before retrying
await asyncio.sleep(60)
# Send status request
print("Sending status request...")
await mc.commands.send_telemetry_req(repeater)
# Wait for status response
telemetry_event = await mc.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=args.timeout)
print(telemetry_event.payload["lpp"])
else:
print("Login failed or timed out")

View file

@ -0,0 +1,71 @@
import logging
from enum import Enum
import json
from cayennelpp import LppFrame, LppData
from cayennelpp.lpp_type import LppType
from .lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val
logger = logging.getLogger("meshcore")
class BinaryReqType(Enum):
STATUS = 0x01
KEEP_ALIVE = 0x02
TELEMETRY = 0x03
MMA = 0x04
ACL = 0x05
def lpp_parse(buf):
"""Parse a given byte string and return as a LppFrame object."""
i = 0
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
lpp_data_list.append(lppdata)
i = i + len(lppdata)
return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder))
def lpp_parse_mma(buf):
i = 0
res = []
while i < len(buf) and buf[i] != 0:
chan = buf[i]
i = i + 1
type = buf[i]
lpp_type = LppType.get_lpp_type(type)
if lpp_type is None:
logger.error(f"Unknown LPP type: {type}")
return None
size = lpp_type.size
i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
res.append(
{
"channel": chan,
"type": my_lpp_types[type][0],
"min": min,
"max": max,
"avg": avg,
}
)
return res
def parse_acl(buf):
i = 0
res = []
while i + 7 <= len(buf):
key = buf[i : i + 6].hex()
perm = buf[i + 6]
if key != "000000000000":
res.append({"key": key, "perm": perm})
i = i + 7
return res

View file

@ -1,167 +1,102 @@
import logging
from enum import Enum
import json
from mailbox import Message
from meshcore.commands.messaging import MessagingCommands
from .base import CommandHandlerBase
from ..events import EventType
from cayennelpp import LppFrame, LppData
from cayennelpp.lpp_type import LppType
from ..lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val
from ..binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl
logger = logging.getLogger("meshcore")
class BinaryReqType(Enum):
STATUS = 0x01
KEEP_ALIVE = 0x02
TELEMETRY = 0x03
MMA = 0x04
ACL = 0x05
def lpp_parse(buf):
"""Parse a given byte string and return as a LppFrame object."""
i = 0
lpp_data_list = []
while i < len(buf) and buf[i] != 0:
lppdata = LppData.from_bytes(buf[i:])
lpp_data_list.append(lppdata)
i = i + len(lppdata)
return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder))
def lpp_parse_mma(buf):
i = 0
res = []
while i < len(buf) and buf[i] != 0:
chan = buf[i]
i = i + 1
type = buf[i]
lpp_type = LppType.get_lpp_type(type)
if lpp_type is None:
logger.error(f"Unknown LPP type: {type}")
return None
size = lpp_type.size
i = i + 1
min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size]))
i = i + size
res.append(
{
"channel": chan,
"type": my_lpp_types[type][0],
"min": min,
"max": max,
"avg": avg,
}
)
return res
def parse_acl(buf):
i = 0
res = []
while i + 7 <= len(buf):
key = buf[i : i + 6].hex()
perm = buf[i + 6]
if key != "000000000000":
res.append({"key": key, "perm": perm})
i = i + 7
return res
class BinaryCommandHandler(MessagingCommands):
"""Helper functions to handle binary requests through binary commands"""
async def req_binary(self, contact, request, timeout=0):
res = await self.send_binary_req(contact, request)
logger.debug(res)
if res.type == EventType.ERROR:
logger.error("Error while requesting binary data")
return None
else:
exp_tag = res.payload["expected_ack"].hex()
timeout = (
res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
)
if self.dispatcher is None:
logger.error("No dispatcher set, cannot wait for response")
return None
res2 = await self.dispatcher.wait_for_event(
EventType.BINARY_RESPONSE,
attribute_filters={"tag": exp_tag},
timeout=timeout,
)
logger.debug(res2)
if res2 is None:
return None
else:
return res2.payload
async def req_status(self, contact, timeout=0):
code = BinaryReqType.STATUS.value
req = code.to_bytes(1, "little", signed=False)
rep = await self.req_binary(contact, req, timeout)
if rep is None :
res = await self.send_binary_req(contact, BinaryReqType.STATUS.value.to_bytes(1, "little"))
if res.type == EventType.ERROR:
return None
else:
data=bytes.fromhex(rep["data"])
res = {}
res["pubkey_pre"] = contact["public_key"][0:12]
res["bat"] = int.from_bytes(data[0:2], byteorder="little")
res["tx_queue_len"] = int.from_bytes(data[2:4], byteorder="little")
res["noise_floor"] = int.from_bytes(data[4:6], byteorder="little", signed=True)
res["last_rssi"] = int.from_bytes(data[6:8], byteorder="little", signed=True)
res["nb_recv"] = int.from_bytes(data[8:12], byteorder="little", signed=False)
res["nb_sent"] = int.from_bytes(data[12:16], byteorder="little", signed=False)
res["airtime"] = int.from_bytes(data[16:20], byteorder="little")
res["uptime"] = int.from_bytes(data[20:24], byteorder="little")
res["sent_flood"] = int.from_bytes(data[24:28], byteorder="little")
res["sent_direct"] = int.from_bytes(data[28:32], byteorder="little")
res["recv_flood"] = int.from_bytes(data[32:36], byteorder="little")
res["recv_direct"] = int.from_bytes(data[36:40], byteorder="little")
res["full_evts"] = int.from_bytes(data[40:42], byteorder="little")
res["last_snr"] = (int.from_bytes(data[42:44], byteorder="little", signed=True) / 4)
res["direct_dups"] = int.from_bytes(data[44:46], byteorder="little")
res["flood_dups"] = int.from_bytes(data[46:48], byteorder="little")
res["rx_airtime"] = int.from_bytes(data[48:52], byteorder="little")
return res if res["uptime"] > 0 else None
exp_tag = res.payload["expected_ack"].hex()
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for STATUS_RESPONSE event with matching pubkey
contact_pubkey_prefix = contact["public_key"][0:12]
status_event = await self.dispatcher.wait_for_event(
EventType.STATUS_RESPONSE,
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
timeout=timeout,
)
return status_event.payload if status_event else None
async def req_telemetry(self, contact, timeout=0):
code = BinaryReqType.TELEMETRY.value
req = code.to_bytes(1, "little", signed=False)
res = await self.req_binary(contact, req, timeout)
if res is None:
res = await self.send_binary_req(contact, BinaryReqType.TELEMETRY.value.to_bytes(1, "little"))
if res.type == EventType.ERROR:
return None
else:
return lpp_parse(bytes.fromhex(res["data"]))
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for TELEMETRY_RESPONSE event with matching pubkey
contact_pubkey_prefix = contact["public_key"][0:12]
telem_event = await self.dispatcher.wait_for_event(
EventType.TELEMETRY_RESPONSE,
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
timeout=timeout,
)
return telem_event.payload["lpp"] if telem_event else None
async def req_mma(self, contact, start, end, timeout=0):
code = BinaryReqType.MMA.value
req = (
code.to_bytes(1, "little", signed=False)
BinaryReqType.MMA.value.to_bytes(1, "little", signed=False)
+ start.to_bytes(4, "little", signed=False)
+ end.to_bytes(4, "little", signed=False)
+ b"\0\0"
)
res = await self.req_binary(contact, req, timeout)
if res is None:
res = await self.send_binary_req(contact, req)
if res.type == EventType.ERROR:
return None
else:
return lpp_parse_mma(bytes.fromhex(res["data"])[4:])
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for MMA_RESPONSE event with matching pubkey
contact_pubkey_prefix = contact["public_key"][0:12]
mma_event = await self.dispatcher.wait_for_event(
EventType.MMA_RESPONSE,
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
timeout=timeout,
)
return mma_event.payload["mma_data"] if mma_event else None
async def req_acl(self, contact, timeout=0):
code = BinaryReqType.ACL.value
req = code.to_bytes(1, "little", signed=False) + b"\0\0"
res = await self.req_binary(contact, req, timeout)
if res is None:
req = BinaryReqType.ACL.value.to_bytes(1, "little", signed=False) + b"\0\0"
res = await self.send_binary_req(contact, req)
if res.type == EventType.ERROR:
return None
else:
return parse_acl(bytes.fromhex(res["data"]))
timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout
if self.dispatcher is None:
return None
# Listen for ACL_RESPONSE event with matching pubkey
contact_pubkey_prefix = contact["public_key"][0:12]
acl_event = await self.dispatcher.wait_for_event(
EventType.ACL_RESPONSE,
attribute_filters={"pubkey_prefix": contact_pubkey_prefix},
timeout=timeout,
)
return acl_event.payload["acl_data"] if acl_event else None

View file

@ -37,6 +37,8 @@ class EventType(Enum):
RX_LOG_DATA = "rx_log_data"
TELEMETRY_RESPONSE = "telemetry_response"
BINARY_RESPONSE = "binary_response"
MMA_RESPONSE = "mma_response"
ACL_RESPONSE = "acl_response"
CUSTOM_VARS = "custom_vars"
CHANNEL_INFO = "channel_info"
PATH_RESPONSE = "path_response"

View file

@ -3,6 +3,7 @@ import json
from typing import Any, Dict
from .events import Event, EventType, EventDispatcher
from .packets import PacketType
from .binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl
from cayennelpp import LppFrame, LppData
from meshcore.lpp_json_encoder import lpp_json_encoder
@ -497,10 +498,106 @@ class MessageReader:
attributes = {"tag": res["tag"]}
# Always dispatch the generic BINARY_RESPONSE event first
await self.dispatcher.dispatch(
Event(EventType.BINARY_RESPONSE, res, attributes)
)
# Parse the request type from the response data and dispatch specific events
response_data = data[6:]
if response_data: # Check if there's response data
request_type = response_data[0]
if request_type == BinaryReqType.STATUS.value:
# Parse as status response - use same parsing as STATUS_RESPONSE
if len(response_data) >= 53: # Minimum size for status data
status_res = {}
status_res["pubkey_pre"] = data[2:8].hex() # Use pubkey from tag area
status_data = response_data[1:] # Skip the request type byte
status_res["bat"] = int.from_bytes(status_data[0:2], byteorder="little")
status_res["tx_queue_len"] = int.from_bytes(status_data[2:4], byteorder="little")
status_res["noise_floor"] = int.from_bytes(status_data[4:6], byteorder="little", signed=True)
status_res["last_rssi"] = int.from_bytes(status_data[6:8], byteorder="little", signed=True)
status_res["nb_recv"] = int.from_bytes(status_data[8:12], byteorder="little", signed=False)
status_res["nb_sent"] = int.from_bytes(status_data[12:16], byteorder="little", signed=False)
status_res["airtime"] = int.from_bytes(status_data[16:20], byteorder="little")
status_res["uptime"] = int.from_bytes(status_data[20:24], byteorder="little")
status_res["sent_flood"] = int.from_bytes(status_data[24:28], byteorder="little")
status_res["sent_direct"] = int.from_bytes(status_data[28:32], byteorder="little")
status_res["recv_flood"] = int.from_bytes(status_data[32:36], byteorder="little")
status_res["recv_direct"] = int.from_bytes(status_data[36:40], byteorder="little")
status_res["full_evts"] = int.from_bytes(status_data[40:42], byteorder="little")
status_res["last_snr"] = int.from_bytes(status_data[42:44], byteorder="little", signed=True) / 4
status_res["direct_dups"] = int.from_bytes(status_data[44:46], byteorder="little")
status_res["flood_dups"] = int.from_bytes(status_data[46:48], byteorder="little")
status_res["rx_airtime"] = int.from_bytes(status_data[48:52], byteorder="little")
status_attributes = {"pubkey_prefix": status_res["pubkey_pre"]}
await self.dispatcher.dispatch(
Event(EventType.STATUS_RESPONSE, status_res, status_attributes)
)
elif request_type == BinaryReqType.TELEMETRY.value:
# Parse as telemetry response
try:
telemetry_data = response_data[1:] # Skip the request type byte
lpp = lpp_parse(telemetry_data)
telem_res = {
"pubkey_pre": data[2:8].hex(),
"lpp": lpp
}
telem_attributes = {
"raw": telemetry_data.hex(),
"pubkey_prefix": telem_res["pubkey_pre"]
}
await self.dispatcher.dispatch(
Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_attributes)
)
except Exception as e:
logger.error(f"Error parsing binary telemetry response: {e}")
elif request_type == BinaryReqType.MMA.value:
# Parse as MMA response
try:
mma_data = response_data[5:] # Skip request type + 4 bytes header
mma_result = lpp_parse_mma(mma_data)
mma_res = {
"pubkey_pre": data[2:8].hex(),
"mma_data": mma_result
}
mma_attributes = {"pubkey_prefix": mma_res["pubkey_pre"]}
await self.dispatcher.dispatch(
Event(EventType.MMA_RESPONSE, mma_res, mma_attributes)
)
except Exception as e:
logger.error(f"Error parsing binary MMA response: {e}")
elif request_type == BinaryReqType.ACL.value:
# Parse as ACL response
try:
acl_data = response_data[1:] # Skip the request type byte
acl_result = parse_acl(acl_data)
acl_res = {
"pubkey_pre": data[2:8].hex(),
"acl_data": acl_result
}
acl_attributes = {"pubkey_prefix": acl_res["pubkey_pre"]}
await self.dispatcher.dispatch(
Event(EventType.ACL_RESPONSE, acl_res, acl_attributes)
)
except Exception as e:
logger.error(f"Error parsing binary ACL response: {e}")
elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value:
logger.debug(f"Received path discovery response: {data.hex()}")
res = {}