diff --git a/README.md b/README.md index 6c1aff3..21738f6 100644 --- a/README.md +++ b/README.md @@ -399,6 +399,182 @@ async def channel_handler(event): meshcore.subscribe(EventType.CHANNEL_MSG_RECV, channel_handler) ``` +## API Reference + +### Event Types + +All events in MeshCore are represented by the `EventType` enum. These events are dispatched by the library and can be subscribed to: + +| Event Type | String Value | Description | Typical Payload | +|------------|-------------|-------------|-----------------| +| **Device & Status Events** ||| +| `SELF_INFO` | `"self_info"` | Device's own information after appstart | Device configuration, public key, coordinates | +| `DEVICE_INFO` | `"device_info"` | Device capabilities and firmware info | Firmware version, model, max contacts/channels | +| `BATTERY` | `"battery_info"` | Battery level and storage info | Battery level, used/total storage | +| `CURRENT_TIME` | `"time_update"` | Device time response | Current timestamp | +| `STATUS_RESPONSE` | `"status_response"` | Device status statistics | Battery, TX queue, noise floor, packet counts | +| `CUSTOM_VARS` | `"custom_vars"` | Custom variable responses | Key-value pairs of custom variables | +| **Contact Events** ||| +| `CONTACTS` | `"contacts"` | Contact list response | Dictionary of contacts by public key | +| `NEW_CONTACT` | `"new_contact"` | New contact discovered | Contact information | +| `CONTACT_URI` | `"contact_uri"` | Contact export URI | Shareable contact URI | +| **Messaging Events** ||| +| `CONTACT_MSG_RECV` | `"contact_message"` | Direct message received | Message text, sender prefix, timestamp | +| `CHANNEL_MSG_RECV` | `"channel_message"` | Channel message received | Message text, channel index, timestamp | +| `MSG_SENT` | `"message_sent"` | Message send confirmation | Expected ACK code, suggested timeout | +| `NO_MORE_MSGS` | `"no_more_messages"` | No pending messages | Empty payload | +| `MESSAGES_WAITING` | `"messages_waiting"` | Messages available notification | Empty payload | +| **Network Events** ||| +| `ADVERTISEMENT` | `"advertisement"` | Node advertisement detected | Public key of advertising node | +| `PATH_UPDATE` | `"path_update"` | Routing path update | Public key and path information | +| `ACK` | `"acknowledgement"` | Message acknowledgment | ACK code | +| `PATH_RESPONSE` | `"path_response"` | Path discovery response | Inbound/outbound path data | +| `TRACE_DATA` | `"trace_data"` | Route trace information | Path with SNR data for each hop | +| **Telemetry Events** ||| +| `TELEMETRY_RESPONSE` | `"telemetry_response"` | Telemetry data response | LPP-formatted sensor data | +| `MMA_RESPONSE` | `"mma_response"` | Memory Management Area data | Min/max/avg telemetry over time range | +| `ACL_RESPONSE` | `"acl_response"` | Access Control List data | List of keys and permissions | +| **Channel Events** ||| +| `CHANNEL_INFO` | `"channel_info"` | Channel configuration | Channel name, secret, index | +| **Raw Data Events** ||| +| `RAW_DATA` | `"raw_data"` | Raw radio data | SNR, RSSI, payload hex | +| `RX_LOG_DATA` | `"rx_log_data"` | RF log data | SNR, RSSI, raw payload | +| `LOG_DATA` | `"log_data"` | Generic log data | Various log information | +| **Binary Protocol Events** ||| +| `BINARY_RESPONSE` | `"binary_response"` | Generic binary response | Tag and hex data | +| **Authentication Events** ||| +| `LOGIN_SUCCESS` | `"login_success"` | Successful login | Permissions, admin status, pubkey prefix | +| `LOGIN_FAILED` | `"login_failed"` | Failed login attempt | Pubkey prefix | +| **Command Response Events** ||| +| `OK` | `"command_ok"` | Command successful | Success confirmation, optional value | +| `ERROR` | `"command_error"` | Command failed | Error reason or code | +| **Connection Events** ||| +| `CONNECTED` | `"connected"` | Connection established | Connection details, reconnection status | +| `DISCONNECTED` | `"disconnected"` | Connection lost | Disconnection reason | + +### Available Commands + +All commands are async methods that return `Event` objects. Commands are organized into functional groups: + +#### Device Commands (`meshcore.commands.*`) + +| Command | Parameters | Returns | Description | +|---------|------------|---------|-------------| +| **Device Information** |||| +| `send_appstart()` | None | `SELF_INFO` | Get device self-information and configuration | +| `send_device_query()` | None | `DEVICE_INFO` | Query device capabilities and firmware info | +| `get_bat()` | None | `BATTERY` | Get battery level and storage information | +| `get_time()` | None | `CURRENT_TIME` | Get current device time | +| `get_self_telemetry()` | None | `TELEMETRY_RESPONSE` | Get device's own telemetry data | +| `get_custom_vars()` | None | `CUSTOM_VARS` | Retrieve all custom variables | +| **Device Configuration** |||| +| `set_name(name)` | `name: str` | `OK` | Set device name/identifier | +| `set_coords(lat, lon)` | `lat: float, lon: float` | `OK` | Set device GPS coordinates | +| `set_time(val)` | `val: int` | `OK` | Set device time (Unix timestamp) | +| `set_tx_power(val)` | `val: int` | `OK` | Set radio transmission power level | +| `set_devicepin(pin)` | `pin: int` | `OK` | Set device PIN for security | +| `set_custom_var(key, value)` | `key: str, value: str` | `OK` | Set custom variable | +| **Radio Configuration** |||| +| `set_radio(freq, bw, sf, cr)` | `freq: float, bw: float, sf: int, cr: int` | `OK` | Configure radio (freq MHz, bandwidth kHz, spreading factor, coding rate 5-8) | +| `set_tuning(rx_dly, af)` | `rx_dly: int, af: int` | `OK` | Set radio tuning parameters | +| **Telemetry Configuration** |||| +| `set_telemetry_mode_base(mode)` | `mode: int` | `OK` | Set base telemetry mode | +| `set_telemetry_mode_loc(mode)` | `mode: int` | `OK` | Set location telemetry mode | +| `set_telemetry_mode_env(mode)` | `mode: int` | `OK` | Set environmental telemetry mode | +| `set_manual_add_contacts(enabled)` | `enabled: bool` | `OK` | Enable/disable manual contact addition | +| `set_advert_loc_policy(policy)` | `policy: int` | `OK` | Set location advertisement policy | +| **Channel Management** |||| +| `get_channel(channel_idx)` | `channel_idx: int` | `CHANNEL_INFO` | Get channel configuration | +| `set_channel(channel_idx, name, secret)` | `channel_idx: int, name: str, secret: bytes` | `OK` | Configure channel (secret must be 16 bytes) | +| **Device Actions** |||| +| `send_advert(flood=False)` | `flood: bool` | `OK` | Send advertisement (optionally flood network) | +| `reboot()` | None | None | Reboot device (no response expected) | + +#### Contact Commands (`meshcore.commands.*`) + +| Command | Parameters | Returns | Description | +|---------|------------|---------|-------------| +| **Contact Management** |||| +| `get_contacts(lastmod=0)` | `lastmod: int` | `CONTACTS` | Get contact list (filter by last modification time) | +| `add_contact(contact)` | `contact: dict` | `OK` | Add new contact to device | +| `update_contact(contact, path, flags)` | `contact: dict, path: bytes, flags: int` | `OK` | Update existing contact | +| `remove_contact(key)` | `key: str/bytes` | `OK` | Remove contact by public key | +| **Contact Operations** |||| +| `reset_path(key)` | `key: str/bytes` | `OK` | Reset routing path for contact | +| `share_contact(key)` | `key: str/bytes` | `OK` | Share contact with network | +| `export_contact(key=None)` | `key: str/bytes/None` | `CONTACT_URI` | Export contact as URI (None exports node) | +| `import_contact(card_data)` | `card_data: bytes` | `OK` | Import contact from card data | +| **Contact Modification** |||| +| `change_contact_path(contact, path)` | `contact: dict, path: bytes` | `OK` | Change routing path for contact | +| `change_contact_flags(contact, flags)` | `contact: dict, flags: int` | `OK` | Change contact flags/settings | + +#### Messaging Commands (`meshcore.commands.*`) + +| Command | Parameters | Returns | Description | +|---------|------------|---------|-------------| +| **Message Handling** |||| +| `get_msg(timeout=None)` | `timeout: float` | `CONTACT_MSG_RECV/CHANNEL_MSG_RECV/NO_MORE_MSGS` | Get next pending message | +| `send_msg(dst, msg, timestamp=None)` | `dst: contact/str/bytes, msg: str, timestamp: int` | `MSG_SENT` | Send direct message | +| `send_cmd(dst, cmd, timestamp=None)` | `dst: contact/str/bytes, cmd: str, timestamp: int` | `MSG_SENT` | Send command message | +| `send_chan_msg(chan, msg, timestamp=None)` | `chan: int, msg: str, timestamp: int` | `MSG_SENT` | Send channel message | +| **Authentication** |||| +| `send_login(dst, pwd)` | `dst: contact/str/bytes, pwd: str` | `MSG_SENT` | Send login request | +| `send_logout(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Send logout request | +| **Information Requests** |||| +| `send_statusreq(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Request status from contact | +| `send_telemetry_req(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Request telemetry from contact | +| **Advanced Messaging** |||| +| `send_binary_req(dst, bin_data)` | `dst: contact/str/bytes, bin_data: bytes` | `MSG_SENT` | Send binary data request | +| `send_path_discovery(dst)` | `dst: contact/str/bytes` | `MSG_SENT` | Initiate path discovery | +| `send_trace(auth_code, tag, flags, path=None)` | `auth_code: int, tag: int, flags: int, path: list` | `MSG_SENT` | Send route trace packet | + +#### Binary Protocol Commands (`meshcore.commands.*`) + +| Command | Parameters | Returns | Description | +|---------|------------|---------|-------------| +| `req_status(contact, timeout=0)` | `contact: dict, timeout: float` | `STATUS_RESPONSE` | Get detailed status via binary protocol | +| `req_telemetry(contact, timeout=0)` | `contact: dict, timeout: float` | `TELEMETRY_RESPONSE` | Get telemetry via binary protocol | +| `req_mma(contact, start, end, timeout=0)` | `contact: dict, start: int, end: int, timeout: float` | `MMA_RESPONSE` | Get historical telemetry data | +| `req_acl(contact, timeout=0)` | `contact: dict, timeout: float` | `ACL_RESPONSE` | Get access control list | + +### Helper Methods + +| Method | Returns | Description | +|--------|---------|-------------| +| `get_contact_by_name(name)` | `dict/None` | Find contact by advertisement name | +| `get_contact_by_key_prefix(prefix)` | `dict/None` | Find contact by partial public key | +| `is_connected` | `bool` | Check if device is currently connected | +| `subscribe(event_type, callback, filters=None)` | `Subscription` | Subscribe to events with optional filtering | +| `unsubscribe(subscription)` | None | Remove event subscription | +| `wait_for_event(event_type, filters=None, timeout=None)` | `Event/None` | Wait for specific event | + +### Event Filtering + +Events can be filtered by their attributes when subscribing: + +```python +# Filter by public key prefix +meshcore.subscribe( + EventType.CONTACT_MSG_RECV, + handler, + attribute_filters={"pubkey_prefix": "a1b2c3d4e5f6"} +) + +# Filter by channel index +meshcore.subscribe( + EventType.CHANNEL_MSG_RECV, + handler, + attribute_filters={"channel_idx": 0} +) + +# Filter acknowledgments by code +meshcore.subscribe( + EventType.ACK, + handler, + attribute_filters={"code": "12345678"} +) +``` + ## Examples in the Repo Check the `examples/` directory for more: diff --git a/src/meshcore/binary_parsing.py b/src/meshcore/binary_parsing.py deleted file mode 100644 index 56531ac..0000000 --- a/src/meshcore/binary_parsing.py +++ /dev/null @@ -1,71 +0,0 @@ -import logging -from enum import Enum -import json -from cayennelpp import LppFrame, LppData -from cayennelpp.lpp_type import LppType -from .lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val - -logger = logging.getLogger("meshcore") - - -class BinaryReqType(Enum): - STATUS = 0x01 - KEEP_ALIVE = 0x02 - TELEMETRY = 0x03 - MMA = 0x04 - ACL = 0x05 - - -def lpp_parse(buf): - """Parse a given byte string and return as a LppFrame object.""" - i = 0 - lpp_data_list = [] - while i < len(buf) and buf[i] != 0: - lppdata = LppData.from_bytes(buf[i:]) - lpp_data_list.append(lppdata) - i = i + len(lppdata) - - return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)) - - -def lpp_parse_mma(buf): - i = 0 - res = [] - while i < len(buf) and buf[i] != 0: - chan = buf[i] - i = i + 1 - type = buf[i] - lpp_type = LppType.get_lpp_type(type) - if lpp_type is None: - logger.error(f"Unknown LPP type: {type}") - return None - size = lpp_type.size - i = i + 1 - min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) - i = i + size - res.append( - { - "channel": chan, - "type": my_lpp_types[type][0], - "min": min, - "max": max, - "avg": avg, - } - ) - return res - - -def parse_acl(buf): - i = 0 - res = [] - while i + 7 <= len(buf): - key = buf[i : i + 6].hex() - perm = buf[i + 6] - if key != "000000000000": - res.append({"key": key, "perm": perm}) - i = i + 7 - return res \ No newline at end of file diff --git a/src/meshcore/commands/base.py b/src/meshcore/commands/base.py index e16df57..8f416b1 100644 --- a/src/meshcore/commands/base.py +++ b/src/meshcore/commands/base.py @@ -1,7 +1,10 @@ import asyncio import logging +import random from typing import Any, Callable, Coroutine, Dict, List, Optional, Union +from meshcore.packets import BinaryReqType + from ..events import Event, EventDispatcher, EventType from ..reader import MessageReader @@ -52,21 +55,14 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes class CommandHandlerBase: DEFAULT_TIMEOUT = 5.0 - MAX_QUEUE_SIZE = 100 - def __init__(self, default_timeout: Optional[float] = None, max_queue_size: Optional[int] = None): + def __init__(self, default_timeout: Optional[float] = None): self._sender_func: Optional[Callable[[bytes], Coroutine[Any, Any, None]]] = None self._reader: Optional[MessageReader] = None self.dispatcher: Optional[EventDispatcher] = None self.default_timeout = ( default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT ) - - max_size = max_queue_size if max_queue_size is not None else self.MAX_QUEUE_SIZE - self._command_queue = asyncio.Queue(maxsize=max_size) - self._start_lock = asyncio.Lock() # Only for start/stop operations - self._queue_processor_task: Optional[asyncio.Task] = None - self._is_running = False def set_connection(self, connection: Any) -> None: async def sender(data: bytes) -> None: @@ -87,48 +83,7 @@ class CommandHandlerBase: timeout: Optional[float] = None, ) -> Event: """ - Queue a command for execution and wait for the response. - - Args: - data: The data to send - expected_events: EventType or list of EventTypes to wait for - timeout: Timeout in seconds, or None to use default_timeout - - Returns: - Event: The full event object that was received in response to the command - - Raises: - RuntimeError: If the command queue is full - """ - async with self._start_lock: - if not self._is_running: - await self._start_queue_processor() - - future = asyncio.Future() - - try: - await asyncio.wait_for( - self._command_queue.put((data, expected_events, timeout, future)), - timeout=1.0 - ) - except asyncio.TimeoutError: - future.set_exception(RuntimeError( - f"Command queue is full ({self._command_queue.maxsize} commands pending)" - )) - except Exception as e: - future.set_exception(e) - - return await future - - async def _send_internal( - self, - data: bytes, - expected_events: Optional[Union[EventType, List[EventType]]] = None, - timeout: Optional[float] = None, - ) -> Event: - """ - Internal method that does the actual sending and waiting for events. - This runs inside the queue processor with lock protection. + Send a command and wait for expected event responses. Args: data: The data to send @@ -141,6 +96,7 @@ class CommandHandlerBase: if not self.dispatcher: raise RuntimeError("Dispatcher not set, cannot send commands") + # Use the provided timeout or fall back to default_timeout timeout = timeout if timeout is not None else self.default_timeout if self._sender_func: @@ -151,11 +107,13 @@ class CommandHandlerBase: if expected_events: try: + # Convert single event to list if needed if not isinstance(expected_events, list): expected_events = [expected_events] logger.debug(f"Waiting for events {expected_events}, timeout={timeout}") + # Create futures for all expected events futures = [] for event_type in expected_events: future = asyncio.create_task( @@ -163,18 +121,22 @@ class CommandHandlerBase: ) futures.append(future) + # Wait for the first event to complete or all to timeout done, pending = await asyncio.wait( futures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED ) + # Cancel all pending futures for future in pending: future.cancel() + # Check if any future completed successfully for future in done: event = await future if event: return event + # Create an error event when no event is received return Event(EventType.ERROR, {"reason": "no_event_received"}) except asyncio.TimeoutError: logger.debug(f"Command timed out {data}") @@ -182,107 +144,26 @@ class CommandHandlerBase: except Exception as e: logger.debug(f"Command error: {e}") return Event(EventType.ERROR, {"error": str(e)}) + # For commands that don't expect events, return a success event return Event(EventType.OK, {}) - async def start_queue_processor(self): - """ - Start the command queue processor. - This should be called once when the connection is established. - """ - async with self._start_lock: - if not self._is_running: - await self._start_queue_processor() - - async def _start_queue_processor(self): - """Internal method to start the background queue processor.""" - if not self._queue_processor_task or self._queue_processor_task.done(): - self._is_running = True - self._queue_processor_task = asyncio.create_task(self._process_queue()) - logger.debug("Started command queue processor") + # attached at base because its a common method + async def send_binary_req(self, dst: DestinationType, request_type: BinaryReqType, data: Optional[bytes] = None, timeout=None) -> Event: + dst_bytes = _validate_destination(dst, prefix_length=32) + pubkey_prefix = _validate_destination(dst, prefix_length=6) + logger.debug(f"Binary request to {dst_bytes.hex()}") + data = b"\x32" + dst_bytes + request_type.value.to_bytes(1, "little", signed=False) + (data if data else b"") - async def _process_queue(self): - """Process commands from the queue sequentially.""" - logger.debug("Command queue processor started") - while self._is_running: - try: - item = await self._command_queue.get() - - # kill queue signal - if item is None: - logger.debug("Received shutdown sentinel") - break - - data, expected_events, timeout, future = item - - if future.cancelled(): - continue - - try: - logger.debug(f"Processing queued command: {data.hex() if isinstance(data, bytes) else data}") - result = await self._send_internal(data, expected_events, timeout) - - if not future.cancelled(): - future.set_result(result) - except Exception as e: - logger.error(f"Error processing command: {e}") - if not future.cancelled(): - future.set_exception(e) - - # Small delay between commands to avoid overwhelming the device - await asyncio.sleep(0.01) - - except asyncio.CancelledError: - logger.debug("Queue processor cancelled") - break - except Exception as e: - logger.error(f"Queue processor error: {e}") - # Continue processing even if there was an error + result = await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) - logger.debug("Command queue processor stopped") - - async def stop_queue_processor(self): - """Stop the queue processor gracefully.""" - logger.debug("Stopping command queue processor") - - if not self._is_running: - return + # Register the request with the reader if we have both reader and request_type + if (result.type == EventType.MSG_SENT and + self._reader is not None and + request_type is not None): - self._is_running = False - - try: - # send kill signal and wait for it to be processed - await asyncio.wait_for(self._command_queue.put(None), timeout=1.0) - except asyncio.TimeoutError: - logger.warning("Could not send shutdown sentinel (queue may be full)") - - if self._queue_processor_task: - try: - await asyncio.wait_for(self._queue_processor_task, timeout=2.0) - except asyncio.TimeoutError: - logger.warning("Queue processor did not stop gracefully, cancelling") - self._queue_processor_task.cancel() - try: - await self._queue_processor_task - except asyncio.CancelledError: - pass - self._queue_processor_task = None - - cancelled_count = 0 - while not self._command_queue.empty(): - try: - item = self._command_queue.get_nowait() - if item is None: - continue - if isinstance(item, tuple) and len(item) == 4: - _, _, _, future = item - if not future.cancelled(): - future.cancel() - cancelled_count += 1 - except Exception as e: - logger.debug(f"Error during cleanup: {e}") - break - - if cancelled_count > 0: - logger.debug(f"Cancelled {cancelled_count} pending commands") - - logger.debug("Command queue processor stopped") + exp_tag = result.payload["expected_ack"].hex() + # Use provided timeout or fallback to suggested timeout (with 5s default) + actual_timeout = timeout if timeout is not None and timeout > 0 else result.payload.get("suggested_timeout", 4000) / 800.0 + self._reader.register_binary_request(pubkey_prefix.hex(), exp_tag, request_type, actual_timeout) + + return result \ No newline at end of file diff --git a/src/meshcore/commands/binary.py b/src/meshcore/commands/binary.py index 2f456d6..2f3138a 100644 --- a/src/meshcore/commands/binary.py +++ b/src/meshcore/commands/binary.py @@ -1,20 +1,21 @@ import logging -from mailbox import Message -from meshcore.commands.messaging import MessagingCommands from .base import CommandHandlerBase from ..events import EventType -from ..binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl +from ..packets import BinaryReqType logger = logging.getLogger("meshcore") -class BinaryCommandHandler(MessagingCommands): +class BinaryCommandHandler(CommandHandlerBase): """Helper functions to handle binary requests through binary commands""" - async def req_status(self, contact, timeout=0): - res = await self.send_binary_req(contact, BinaryReqType.STATUS.value.to_bytes(1, "little")) + res = await self.send_binary_req( + contact, + BinaryReqType.STATUS, + timeout=timeout + ) if res.type == EventType.ERROR: return None @@ -24,31 +25,32 @@ class BinaryCommandHandler(MessagingCommands): if self.dispatcher is None: return None - # Listen for STATUS_RESPONSE event with matching pubkey - contact_pubkey_prefix = contact["public_key"][0:12] status_event = await self.dispatcher.wait_for_event( EventType.STATUS_RESPONSE, - attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + attribute_filters={"tag": exp_tag}, timeout=timeout, ) return status_event.payload if status_event else None async def req_telemetry(self, contact, timeout=0): - res = await self.send_binary_req(contact, BinaryReqType.TELEMETRY.value.to_bytes(1, "little")) + res = await self.send_binary_req( + contact, + BinaryReqType.TELEMETRY, + timeout=timeout + ) if res.type == EventType.ERROR: return None timeout = res.payload["suggested_timeout"] / 800 if timeout == 0 else timeout - + if self.dispatcher is None: return None - # Listen for TELEMETRY_RESPONSE event with matching pubkey - contact_pubkey_prefix = contact["public_key"][0:12] + # Listen for TELEMETRY_RESPONSE event telem_event = await self.dispatcher.wait_for_event( EventType.TELEMETRY_RESPONSE, - attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, timeout=timeout, ) @@ -56,12 +58,16 @@ class BinaryCommandHandler(MessagingCommands): async def req_mma(self, contact, start, end, timeout=0): req = ( - BinaryReqType.MMA.value.to_bytes(1, "little", signed=False) - + start.to_bytes(4, "little", signed=False) + start.to_bytes(4, "little", signed=False) + end.to_bytes(4, "little", signed=False) + b"\0\0" ) - res = await self.send_binary_req(contact, req) + res = await self.send_binary_req( + contact, + BinaryReqType.MMA, + data=req, + timeout=timeout + ) if res.type == EventType.ERROR: return None @@ -70,19 +76,23 @@ class BinaryCommandHandler(MessagingCommands): if self.dispatcher is None: return None - # Listen for MMA_RESPONSE event with matching pubkey - contact_pubkey_prefix = contact["public_key"][0:12] + # Listen for MMA_RESPONSE mma_event = await self.dispatcher.wait_for_event( EventType.MMA_RESPONSE, - attribute_filters={"pubkey_prefix": contact_pubkey_prefix}, + attribute_filters={"tag": res.payload["expected_ack"].hex()}, timeout=timeout, ) return mma_event.payload["mma_data"] if mma_event else None async def req_acl(self, contact, timeout=0): - req = BinaryReqType.ACL.value.to_bytes(1, "little", signed=False) + b"\0\0" - res = await self.send_binary_req(contact, req) + req = b"\0\0" + res = await self.send_binary_req( + contact, + BinaryReqType.ACL, + data=req, + timeout=timeout + ) if res.type == EventType.ERROR: return None diff --git a/src/meshcore/commands/messaging.py b/src/meshcore/commands/messaging.py index 898e78f..f511aa0 100644 --- a/src/meshcore/commands/messaging.py +++ b/src/meshcore/commands/messaging.py @@ -97,12 +97,6 @@ class MessagingCommands(CommandHandlerBase): data = b"\x27\x00\x00\x00" + dst_bytes return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) - async def send_binary_req(self, dst: DestinationType, bin_data) -> Event: - dst_bytes = _validate_destination(dst, prefix_length=32) - logger.debug(f"Binary request to {dst_bytes.hex()}") - data = b"\x32" + dst_bytes + bin_data - return await self.send(data, [EventType.MSG_SENT, EventType.ERROR]) - async def send_path_discovery(self, dst: DestinationType) -> Event: dst_bytes = _validate_destination(dst, prefix_length=32) logger.debug(f"Path discovery request for {dst_bytes.hex()}") diff --git a/src/meshcore/connection_manager.py b/src/meshcore/connection_manager.py index 4b46403..c95ec37 100644 --- a/src/meshcore/connection_manager.py +++ b/src/meshcore/connection_manager.py @@ -21,7 +21,7 @@ class ConnectionProtocol(Protocol): """Disconnect from the device/server.""" ... - async def send(self, data) -> Any: + async def send(self, data): """Send data through the connection.""" ... diff --git a/src/meshcore/events.py b/src/meshcore/events.py index 7feaa5f..cdc5d9b 100644 --- a/src/meshcore/events.py +++ b/src/meshcore/events.py @@ -1,4 +1,3 @@ -from collections.abc import Coroutine from enum import Enum import inspect import logging @@ -116,7 +115,7 @@ class EventDispatcher: def subscribe( self, event_type: Union[EventType, None], - callback: Callable[[Event], Coroutine[Any, Any, None]], + callback: Callable[[Event], Union[None, asyncio.Future]], attribute_filters: Optional[Dict[str, Any]] = None, ) -> Subscription: """ @@ -229,7 +228,7 @@ class EventDispatcher: """ future = asyncio.Future() - async def event_handler(event: Event): + def event_handler(event: Event): if not future.done(): future.set_result(event) diff --git a/src/meshcore/meshcore.py b/src/meshcore/meshcore.py index 2314da0..84e533f 100644 --- a/src/meshcore/meshcore.py +++ b/src/meshcore/meshcore.py @@ -162,10 +162,6 @@ class MeshCore: result = await self.connection_manager.connect() if result is None: raise ConnectionError("Failed to connect to device") - - # Start the command queue processor after successful connection - await self.commands.start_queue_processor() - return await self.commands.send_appstart() async def disconnect(self): @@ -177,9 +173,6 @@ class MeshCore: if hasattr(self, "_auto_fetch_subscription") and self._auto_fetch_subscription: await self.stop_auto_message_fetching() - # Stop the command queue processor - await self.commands.stop_queue_processor() - # Disconnect the connection object await self.connection_manager.disconnect() diff --git a/src/meshcore/packets.py b/src/meshcore/packets.py index 00b5b27..ce0e797 100644 --- a/src/meshcore/packets.py +++ b/src/meshcore/packets.py @@ -1,5 +1,11 @@ from enum import Enum +class BinaryReqType(Enum): + STATUS = 0x01 + KEEP_ALIVE = 0x02 + TELEMETRY = 0x03 + MMA = 0x04 + ACL = 0x05 # Packet prefixes for the protocol class PacketType(Enum): diff --git a/src/meshcore/parsing.py b/src/meshcore/parsing.py new file mode 100644 index 0000000..e1e0b2f --- /dev/null +++ b/src/meshcore/parsing.py @@ -0,0 +1,109 @@ +import logging +from enum import Enum +import json +from cayennelpp import LppFrame, LppData +from cayennelpp.lpp_type import LppType +from .lpp_json_encoder import lpp_json_encoder, my_lpp_types, lpp_format_val + +logger = logging.getLogger("meshcore") + + +def lpp_parse(buf): + """Parse a given byte string and return as a LppFrame object.""" + i = 0 + lpp_data_list = [] + while i < len(buf) and buf[i] != 0: + lppdata = LppData.from_bytes(buf[i:]) + lpp_data_list.append(lppdata) + i = i + len(lppdata) + + return json.loads(json.dumps(LppFrame(lpp_data_list), default=lpp_json_encoder)) + + +def lpp_parse_mma(buf): + i = 0 + res = [] + while i < len(buf) and buf[i] != 0: + chan = buf[i] + i = i + 1 + type = buf[i] + lpp_type = LppType.get_lpp_type(type) + if lpp_type is None: + logger.error(f"Unknown LPP type: {type}") + return None + size = lpp_type.size + i = i + 1 + min = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + max = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + avg = lpp_format_val(lpp_type, lpp_type.decode(buf[i : i + size])) + i = i + size + res.append( + { + "channel": chan, + "type": my_lpp_types[type][0], + "min": min, + "max": max, + "avg": avg, + } + ) + return res + + +def parse_acl(buf): + i = 0 + res = [] + while i + 7 <= len(buf): + key = buf[i : i + 6].hex() + perm = buf[i + 6] + if key != "000000000000": + res.append({"key": key, "perm": perm}) + i = i + 7 + return res + + +def parse_status(data, pubkey_prefix=None, offset=0): + """ + Parse binary data into a dictionary of fields. + + Args: + data: bytes object containing the data to parse + pubkey_prefix: Either a string prefix or None (if None, extract from data) + offset: Starting offset for field parsing (0 or 8) + + Returns: + Dictionary with parsed fields + """ + res = {} + + # Handle pubkey + if pubkey_prefix is None: + # Extract from data (format 1) + res["pubkey_pre"] = data[2:8].hex() + offset = 8 # Fields start at offset 8 + else: + # Use provided prefix (format 2) + res["pubkey_pre"] = pubkey_prefix + # offset stays as provided (typically 0) + + # Parse all fields with the given offset + res["bat"] = int.from_bytes(data[offset:offset+2], byteorder="little") + res["tx_queue_len"] = int.from_bytes(data[offset+2:offset+4], byteorder="little") + res["noise_floor"] = int.from_bytes(data[offset+4:offset+6], byteorder="little", signed=True) + res["last_rssi"] = int.from_bytes(data[offset+6:offset+8], byteorder="little", signed=True) + res["nb_recv"] = int.from_bytes(data[offset+8:offset+12], byteorder="little", signed=False) + res["nb_sent"] = int.from_bytes(data[offset+12:offset+16], byteorder="little", signed=False) + res["airtime"] = int.from_bytes(data[offset+16:offset+20], byteorder="little") + res["uptime"] = int.from_bytes(data[offset+20:offset+24], byteorder="little") + res["sent_flood"] = int.from_bytes(data[offset+24:offset+28], byteorder="little") + res["sent_direct"] = int.from_bytes(data[offset+28:offset+32], byteorder="little") + res["recv_flood"] = int.from_bytes(data[offset+32:offset+36], byteorder="little") + res["recv_direct"] = int.from_bytes(data[offset+36:offset+40], byteorder="little") + res["full_evts"] = int.from_bytes(data[offset+40:offset+42], byteorder="little") + res["last_snr"] = int.from_bytes(data[offset+42:offset+44], byteorder="little", signed=True) / 4 + res["direct_dups"] = int.from_bytes(data[offset+44:offset+46], byteorder="little") + res["flood_dups"] = int.from_bytes(data[offset+46:offset+48], byteorder="little") + res["rx_airtime"] = int.from_bytes(data[offset+48:offset+52], byteorder="little") + + return res \ No newline at end of file diff --git a/src/meshcore/reader.py b/src/meshcore/reader.py index 1b54f60..1ea8587 100644 --- a/src/meshcore/reader.py +++ b/src/meshcore/reader.py @@ -1,9 +1,10 @@ import logging import json +import time from typing import Any, Dict from .events import Event, EventType, EventDispatcher -from .packets import PacketType -from .binary_parsing import BinaryReqType, lpp_parse, lpp_parse_mma, parse_acl +from .packets import BinaryReqType, PacketType +from .parsing import lpp_parse, lpp_parse_mma, parse_acl, parse_status from cayennelpp import LppFrame, LppData from meshcore.lpp_json_encoder import lpp_json_encoder @@ -17,7 +18,35 @@ class MessageReader: # before events are dispatched self.contacts = {} # Temporary storage during contact list building self.contact_nb = 0 # Used for contact processing + + # Track pending binary requests by tag for proper response parsing + self.pending_binary_requests: Dict[str, Dict[str, Any]] = {} # tag -> {request_type, expires_at} + def register_binary_request(self, prefix: str, tag: str, request_type: BinaryReqType, timeout_seconds: float): + """Register a pending binary request for proper response parsing""" + # Clean up expired requests before adding new one + self.cleanup_expired_requests() + + expires_at = time.time() + timeout_seconds + self.pending_binary_requests[tag] = { + "request_type": request_type, + "pubkey_prefix": prefix, + "expires_at": expires_at + } + logger.debug(f"Registered binary request: tag={tag}, type={request_type}, expires in {timeout_seconds}s") + + def cleanup_expired_requests(self): + """Remove expired binary requests""" + current_time = time.time() + expired_tags = [ + tag for tag, info in self.pending_binary_requests.items() + if current_time > info["expires_at"] + ] + + for tag in expired_tags: + logger.debug(f"Cleaning up expired binary request: tag={tag}") + del self.pending_binary_requests[tag] + async def handle_rx(self, data: bytearray): packet_type_value = data[0] logger.debug(f"Received data: {data.hex()}") @@ -331,36 +360,13 @@ class MessageReader: ) elif packet_type_value == PacketType.STATUS_RESPONSE.value: - res = {} - res["pubkey_pre"] = data[2:8].hex() - res["bat"] = int.from_bytes(data[8:10], byteorder="little") - res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder="little") - res["noise_floor"] = int.from_bytes( - data[12:14], byteorder="little", signed=True - ) - res["last_rssi"] = int.from_bytes( - data[14:16], byteorder="little", signed=True - ) - res["nb_recv"] = int.from_bytes( - data[16:20], byteorder="little", signed=False - ) - res["nb_sent"] = int.from_bytes( - data[20:24], byteorder="little", signed=False - ) - res["airtime"] = int.from_bytes(data[24:28], byteorder="little") - res["uptime"] = int.from_bytes(data[28:32], byteorder="little") - res["sent_flood"] = int.from_bytes(data[32:36], byteorder="little") - res["sent_direct"] = int.from_bytes(data[36:40], byteorder="little") - res["recv_flood"] = int.from_bytes(data[40:44], byteorder="little") - res["recv_direct"] = int.from_bytes(data[44:48], byteorder="little") - res["full_evts"] = int.from_bytes(data[48:50], byteorder="little") - res["last_snr"] = ( - int.from_bytes(data[50:52], byteorder="little", signed=True) / 4 - ) - res["direct_dups"] = int.from_bytes(data[52:54], byteorder="little") - res["flood_dups"] = int.from_bytes(data[54:56], byteorder="little") - res["rx_airtime"] = int.from_bytes(data[56:60], byteorder="little") + res = parse_status(data, offset=8) + data_hex = data[8:].hex() + logger.debug(f"Status response: {data_hex}") + attributes = { + "pubkey_prefix": res["pubkey_pre"], + } data_hex = data[8:].hex() logger.debug(f"Status response: {data_hex}") @@ -491,112 +497,60 @@ class MessageReader: elif packet_type_value == PacketType.BINARY_RESPONSE.value: logger.debug(f"Received binary data: {data.hex()}") - res = {} - - res["tag"] = data[2:6].hex() - res["data"] = data[6:].hex() - - attributes = {"tag": res["tag"]} - - # Always dispatch the generic BINARY_RESPONSE event first - await self.dispatcher.dispatch( - Event(EventType.BINARY_RESPONSE, res, attributes) - ) - - # Parse the request type from the response data and dispatch specific events + tag = data[2:6].hex() response_data = data[6:] - if response_data: # Check if there's response data - request_type = response_data[0] + + # Always dispatch generic BINARY_RESPONSE + binary_res = {"tag": tag, "data": response_data.hex()} + await self.dispatcher.dispatch( + Event(EventType.BINARY_RESPONSE, binary_res, {"tag": tag}) + ) + + # Check for tracked request type and dispatch specific response + if tag in self.pending_binary_requests: + request_type = self.pending_binary_requests[tag]["request_type"] + pubkey_prefix = self.pending_binary_requests[tag]["pubkey_prefix"] + del self.pending_binary_requests[tag] + logger.debug(f"Processing binary response for tag {tag}, type {request_type}, pubkey_prefix {pubkey_prefix}") - if request_type == BinaryReqType.STATUS.value: - # Parse as status response - use same parsing as STATUS_RESPONSE - if len(response_data) >= 53: # Minimum size for status data - status_res = {} - status_res["pubkey_pre"] = data[2:8].hex() # Use pubkey from tag area - status_data = response_data[1:] # Skip the request type byte - - status_res["bat"] = int.from_bytes(status_data[0:2], byteorder="little") - status_res["tx_queue_len"] = int.from_bytes(status_data[2:4], byteorder="little") - status_res["noise_floor"] = int.from_bytes(status_data[4:6], byteorder="little", signed=True) - status_res["last_rssi"] = int.from_bytes(status_data[6:8], byteorder="little", signed=True) - status_res["nb_recv"] = int.from_bytes(status_data[8:12], byteorder="little", signed=False) - status_res["nb_sent"] = int.from_bytes(status_data[12:16], byteorder="little", signed=False) - status_res["airtime"] = int.from_bytes(status_data[16:20], byteorder="little") - status_res["uptime"] = int.from_bytes(status_data[20:24], byteorder="little") - status_res["sent_flood"] = int.from_bytes(status_data[24:28], byteorder="little") - status_res["sent_direct"] = int.from_bytes(status_data[28:32], byteorder="little") - status_res["recv_flood"] = int.from_bytes(status_data[32:36], byteorder="little") - status_res["recv_direct"] = int.from_bytes(status_data[36:40], byteorder="little") - status_res["full_evts"] = int.from_bytes(status_data[40:42], byteorder="little") - status_res["last_snr"] = int.from_bytes(status_data[42:44], byteorder="little", signed=True) / 4 - status_res["direct_dups"] = int.from_bytes(status_data[44:46], byteorder="little") - status_res["flood_dups"] = int.from_bytes(status_data[46:48], byteorder="little") - status_res["rx_airtime"] = int.from_bytes(status_data[48:52], byteorder="little") - - status_attributes = {"pubkey_prefix": status_res["pubkey_pre"]} - await self.dispatcher.dispatch( - Event(EventType.STATUS_RESPONSE, status_res, status_attributes) - ) + if request_type == BinaryReqType.STATUS and len(response_data) >= 52: + res = {} + res = parse_status(response_data, pubkey_prefix=pubkey_prefix) + await self.dispatcher.dispatch( + Event(EventType.STATUS_RESPONSE, res, {"pubkey_prefix": res["pubkey_pre"], "tag": tag}) + ) - elif request_type == BinaryReqType.TELEMETRY.value: - # Parse as telemetry response + elif request_type == BinaryReqType.TELEMETRY: try: - telemetry_data = response_data[1:] # Skip the request type byte - lpp = lpp_parse(telemetry_data) - - telem_res = { - "pubkey_pre": data[2:8].hex(), - "lpp": lpp - } - - telem_attributes = { - "raw": telemetry_data.hex(), - "pubkey_prefix": telem_res["pubkey_pre"] - } - + lpp = lpp_parse(response_data) + telem_res = {"tag": tag, "lpp": lpp, "pubkey_prefix": pubkey_prefix} await self.dispatcher.dispatch( - Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_attributes) + Event(EventType.TELEMETRY_RESPONSE, telem_res, telem_res) ) except Exception as e: logger.error(f"Error parsing binary telemetry response: {e}") - elif request_type == BinaryReqType.MMA.value: - # Parse as MMA response + elif request_type == BinaryReqType.MMA: try: - mma_data = response_data[5:] # Skip request type + 4 bytes header - mma_result = lpp_parse_mma(mma_data) - - mma_res = { - "pubkey_pre": data[2:8].hex(), - "mma_data": mma_result - } - - mma_attributes = {"pubkey_prefix": mma_res["pubkey_pre"]} - + mma_result = lpp_parse_mma(response_data[4:]) # Skip 4-byte header + mma_res = {"tag": tag, "mma_data": mma_result, "pubkey_prefix": pubkey_prefix} await self.dispatcher.dispatch( - Event(EventType.MMA_RESPONSE, mma_res, mma_attributes) + Event(EventType.MMA_RESPONSE, mma_res, mma_res) ) except Exception as e: logger.error(f"Error parsing binary MMA response: {e}") - elif request_type == BinaryReqType.ACL.value: - # Parse as ACL response + elif request_type == BinaryReqType.ACL: try: - acl_data = response_data[1:] # Skip the request type byte - acl_result = parse_acl(acl_data) - - acl_res = { - "pubkey_pre": data[2:8].hex(), - "acl_data": acl_result - } - - acl_attributes = {"pubkey_prefix": acl_res["pubkey_pre"]} - + acl_result = parse_acl(response_data) + acl_res = {"tag": tag, "acl_data": acl_result, "pubkey_prefix": pubkey_prefix} await self.dispatcher.dispatch( - Event(EventType.ACL_RESPONSE, acl_res, acl_attributes) + Event(EventType.ACL_RESPONSE, acl_res, {"tag": tag, "pubkey_prefix": pubkey_prefix}) ) except Exception as e: logger.error(f"Error parsing binary ACL response: {e}") + else: + logger.debug(f"No tracked request found for binary response tag {tag}") elif packet_type_value == PacketType.PATH_DISCOVERY_RESPONSE.value: logger.debug(f"Received path discovery response: {data.hex()}") diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py new file mode 100644 index 0000000..088d400 --- /dev/null +++ b/tests/unit/test_reader.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +import asyncio +from unittest.mock import AsyncMock +from meshcore.events import EventType +from meshcore.reader import MessageReader + +class MockDispatcher: + def __init__(self): + self.dispatched_events = [] + + async def dispatch(self, event): + self.dispatched_events.append(event) + print(f"Dispatched: {event.type} with payload keys: {list(event.payload.keys()) if hasattr(event.payload, 'keys') else event.payload}") + +import pytest + +@pytest.mark.asyncio +async def test_binary_response(): + mock_dispatcher = MockDispatcher() + reader = MessageReader(mock_dispatcher) + + packet_hex = "8c00417db968993acd42fc77c3bbd1f08b9b84c39756410c58cd03077162bcb489031869586ab4b103000000000000000000" + packet_data = bytearray.fromhex(packet_hex) + + print(f"Testing packet: {packet_hex}") + print(f"Packet type: 0x{packet_data[0]:02x} (should be 0x8c for BINARY_RESPONSE)") + + # Register the binary request first + tag = "417db968" + from meshcore.parsing import BinaryReqType + reader.register_binary_request(tag, BinaryReqType.ACL, 10.0) + print(f"Registered ACL request with tag {tag}") + + await reader.handle_rx(packet_data) + + # Check what was dispatched + print(f"\nTotal events dispatched: {len(mock_dispatcher.dispatched_events)}") + + # Verify BINARY_RESPONSE was dispatched + binary_responses = [e for e in mock_dispatcher.dispatched_events if e.type == EventType.BINARY_RESPONSE] + assert len(binary_responses) == 1, f"Expected 1 BINARY_RESPONSE, got {len(binary_responses)}" + print("✅ BINARY_RESPONSE event dispatched correctly") + + # Check the binary response payload + binary_event = binary_responses[0] + assert "tag" in binary_event.payload, "BINARY_RESPONSE should have 'tag' in payload" + assert "data" in binary_event.payload, "BINARY_RESPONSE should have 'data' in payload" + print(f"✅ Binary response tag: {binary_event.payload['tag']}") + print(f"✅ Binary response data: {binary_event.payload['data']}") + + # Check if a specific parsed event was also dispatched + other_events = [e for e in mock_dispatcher.dispatched_events if e.type != EventType.BINARY_RESPONSE] + if other_events: + print(f"✅ Additional parsed event dispatched: {other_events[0].type}") + print(f" Payload keys: {list(other_events[0].payload.keys()) if hasattr(other_events[0].payload, 'keys') else other_events[0].payload}") + else: + print("⚠️ No additional parsed event dispatched") + + # Parse the response data to see what request type it is + response_data = packet_data[6:] + if response_data: + request_type = response_data[0] + print(f"Request type in response: 0x{request_type:02x} ({request_type})") + + # Map request types to expected events + from meshcore.parsing import BinaryReqType + if request_type == BinaryReqType.STATUS.value: + expected_event = EventType.STATUS_RESPONSE + elif request_type == BinaryReqType.TELEMETRY.value: + expected_event = EventType.TELEMETRY_RESPONSE + elif request_type == BinaryReqType.MMA.value: + expected_event = EventType.MMA_RESPONSE + elif request_type == BinaryReqType.ACL.value: + expected_event = EventType.ACL_RESPONSE + else: + expected_event = None + + if expected_event: + specific_events = [e for e in mock_dispatcher.dispatched_events if e.type == expected_event] + if specific_events: + print(f"✅ Expected {expected_event} event was dispatched") + else: + print(f"❌ Expected {expected_event} event was NOT dispatched") + else: + print(f"⚠️ Unknown request type {request_type}, no specific event expected") + +if __name__ == "__main__": + asyncio.run(test_binary_response()) \ No newline at end of file