diff --git a/examples/ble_t1000_chan_msg.py b/examples/ble_t1000_chan_msg.py index 996f718..0eacf7a 100755 --- a/examples/ble_t1000_chan_msg.py +++ b/examples/ble_t1000_chan_msg.py @@ -15,6 +15,6 @@ async def main () : mc = MeshCore(con) await mc.connect() - await mc.send_chan_msg(0, MSG) + await mc.commands.send_chan_msg(0, MSG) asyncio.run(main()) diff --git a/examples/ble_t1000_msg.py b/examples/ble_t1000_msg.py index c9ec543..d190cca 100755 --- a/examples/ble_t1000_msg.py +++ b/examples/ble_t1000_msg.py @@ -14,7 +14,11 @@ async def main () : mc = MeshCore(con) await mc.connect() - await mc.get_contacts() - await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG) + await mc.ensure_contacts() + contact = mc.get_contact_by_name(DEST) + if contact is None: + print(f"Contact '{DEST}' not found in contacts.") + return + await mc.commands.send_msg(contact,MSG) asyncio.run(main()) diff --git a/src/meshcore/commands.py b/src/meshcore/commands.py index 1532f25..85d9f9e 100644 --- a/src/meshcore/commands.py +++ b/src/meshcore/commands.py @@ -48,24 +48,25 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes class CommandHandler: DEFAULT_TIMEOUT = 5.0 - def __init__(self, default_timeout=None): + def __init__(self, default_timeout: Optional[float] = None): self._sender_func = None self._reader = None self.dispatcher = None self.default_timeout = default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT - def set_connection(self, connection): - async def sender(data): + def set_connection(self, connection: Any) -> None: + async def sender(data: bytes) -> None: await connection.send(data) self._sender_func = sender - def set_reader(self, reader): + def set_reader(self, reader: Any) -> None: self._reader = reader - def set_dispatcher(self, dispatcher): + def set_dispatcher(self, dispatcher: Any) -> None: self.dispatcher = dispatcher - async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]: + async def send(self, data: bytes, expected_events: Optional[Union[EventType, List[EventType]]] = None, + timeout: Optional[float] = None) -> Dict[str, Any]: """ Send a command and wait for expected event responses. @@ -109,54 +110,54 @@ class CommandHandler: return {"success": True} - async def send_appstart(self): + async def send_appstart(self) -> Dict[str, Any]: logger.debug("Sending appstart command") b1 = bytearray(b'\x01\x03 mccli') return await self.send(b1, [EventType.SELF_INFO]) - async def send_device_query(self): + async def send_device_query(self) -> Dict[str, Any]: logger.debug("Sending device query command") return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR]) - async def send_advert(self, flood=False): + async def send_advert(self, flood: bool = False) -> Dict[str, Any]: logger.debug(f"Sending advertisement command (flood={flood})") if flood: return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR]) else: return await self.send(b"\x07", [EventType.OK, EventType.ERROR]) - async def set_name(self, name): + async def set_name(self, name: str) -> Dict[str, Any]: logger.debug(f"Setting device name to: {name}") return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR]) - async def set_coords(self, lat, lon): + async def set_coords(self, lat: float, lon: float) -> Dict[str, Any]: logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}") return await self.send(b'\x0e'\ + int(lat*1e6).to_bytes(4, 'little', signed=True)\ + int(lon*1e6).to_bytes(4, 'little', signed=True)\ + int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) - async def reboot(self): + async def reboot(self) -> Dict[str, Any]: logger.debug("Sending reboot command") return await self.send(b'\x13reboot') - async def get_bat(self): + async def get_bat(self) -> Dict[str, Any]: logger.debug("Getting battery information") return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR]) - async def get_time(self): + async def get_time(self) -> Dict[str, Any]: logger.debug("Getting device time") return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR]) - async def set_time(self, val): + async def set_time(self, val: int) -> Dict[str, Any]: logger.debug(f"Setting device time to: {val}") return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) - async def set_tx_power(self, val): + async def set_tx_power(self, val: int) -> Dict[str, Any]: logger.debug(f"Setting TX power to: {val}") return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) - async def set_radio(self, freq, bw, sf, cr): + async def set_radio(self, freq: float, bw: float, sf: int, cr: int) -> Dict[str, Any]: logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}") return await self.send(b"\x0b" \ + int(float(freq)*1000).to_bytes(4, 'little')\ @@ -164,7 +165,7 @@ class CommandHandler: + int(sf).to_bytes(1, 'little')\ + int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) - async def set_tuning(self, rx_dly, af): + async def set_tuning(self, rx_dly: int, af: int) -> Dict[str, Any]: logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}") return await self.send(b"\x15" \ + int(rx_dly).to_bytes(4, 'little')\ @@ -172,36 +173,44 @@ class CommandHandler: + int(0).to_bytes(1, 'little')\ + int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR]) - async def set_devicepin(self, pin): + async def set_devicepin(self, pin: int) -> Dict[str, Any]: logger.debug(f"Setting device PIN to: {pin}") return await self.send(b"\x25" \ + int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR]) - async def get_contacts(self): + async def get_contacts(self) -> Dict[str, Any]: logger.debug("Getting contacts") return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR]) - async def reset_path(self, key): - logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}") - data = b"\x0D" + key + async def reset_path(self, key: DestinationType) -> Dict[str, Any]: + key_bytes = _validate_destination(key) + logger.debug(f"Resetting path for contact: {key_bytes.hex()}") + data = b"\x0D" + key_bytes return await self.send(data, [EventType.OK, EventType.ERROR]) - async def share_contact(self, key): - logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}") - data = b"\x10" + key + async def share_contact(self, key: DestinationType) -> Dict[str, Any]: + key_bytes = _validate_destination(key) + logger.debug(f"Sharing contact: {key_bytes.hex()}") + data = b"\x10" + key_bytes return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR]) - async def export_contact(self, key=b""): - logger.debug(f"Exporting contact: {key.hex() if key else 'all'}") - data = b"\x11" + key + async def export_contact(self, key: Optional[DestinationType] = None) -> Dict[str, Any]: + if key: + key_bytes = _validate_destination(key) + logger.debug(f"Exporting contact: {key_bytes.hex()}") + data = b"\x11" + key_bytes + else: + logger.debug("Exporting all contacts") + data = b"\x11" return await self.send(data, [EventType.OK, EventType.ERROR]) - async def remove_contact(self, key): - logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}") - data = b"\x0f" + key + async def remove_contact(self, key: DestinationType) -> Dict[str, Any]: + key_bytes = _validate_destination(key) + logger.debug(f"Removing contact: {key_bytes.hex()}") + data = b"\x0f" + key_bytes return await self.send(data, [EventType.OK, EventType.ERROR]) - async def get_msg(self, timeout=1): + async def get_msg(self, timeout: Optional[float] = 1) -> Dict[str, Any]: logger.debug("Requesting pending messages") return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], timeout) @@ -261,7 +270,8 @@ class CommandHandler: data = b"\x32" + cmd.encode('ascii') return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR]) - async def send_trace(self, auth_code=0, tag=None, flags=0, path=None): + async def send_trace(self, auth_code: int = 0, tag: Optional[int] = None, + flags: int = 0, path: Optional[Union[str, bytes, bytearray]] = None) -> Dict[str, Any]: """ Send a trace packet to test routing through specific repeaters