Added additional typing and typing fixes

This commit is contained in:
Alex Wolden 2025-04-13 22:37:00 -07:00
parent a1fb931200
commit 84de232f4b
3 changed files with 51 additions and 37 deletions

View file

@ -15,6 +15,6 @@ async def main () :
mc = MeshCore(con)
await mc.connect()
await mc.send_chan_msg(0, MSG)
await mc.commands.send_chan_msg(0, MSG)
asyncio.run(main())

View file

@ -14,7 +14,11 @@ async def main () :
mc = MeshCore(con)
await mc.connect()
await mc.get_contacts()
await mc.commands.send_msg(bytes.fromhex(mc.contacts[DEST]["public_key"])[0:6],MSG)
await mc.ensure_contacts()
contact = mc.get_contact_by_name(DEST)
if contact is None:
print(f"Contact '{DEST}' not found in contacts.")
return
await mc.commands.send_msg(contact,MSG)
asyncio.run(main())

View file

@ -48,24 +48,25 @@ def _validate_destination(dst: DestinationType, prefix_length: int = 6) -> bytes
class CommandHandler:
DEFAULT_TIMEOUT = 5.0
def __init__(self, default_timeout=None):
def __init__(self, default_timeout: Optional[float] = None):
self._sender_func = None
self._reader = None
self.dispatcher = None
self.default_timeout = default_timeout if default_timeout is not None else self.DEFAULT_TIMEOUT
def set_connection(self, connection):
async def sender(data):
def set_connection(self, connection: Any) -> None:
async def sender(data: bytes) -> None:
await connection.send(data)
self._sender_func = sender
def set_reader(self, reader):
def set_reader(self, reader: Any) -> None:
self._reader = reader
def set_dispatcher(self, dispatcher):
def set_dispatcher(self, dispatcher: Any) -> None:
self.dispatcher = dispatcher
async def send(self, data, expected_events=None, timeout=None) -> Dict[str, Any]:
async def send(self, data: bytes, expected_events: Optional[Union[EventType, List[EventType]]] = None,
timeout: Optional[float] = None) -> Dict[str, Any]:
"""
Send a command and wait for expected event responses.
@ -109,54 +110,54 @@ class CommandHandler:
return {"success": True}
async def send_appstart(self):
async def send_appstart(self) -> Dict[str, Any]:
logger.debug("Sending appstart command")
b1 = bytearray(b'\x01\x03 mccli')
return await self.send(b1, [EventType.SELF_INFO])
async def send_device_query(self):
async def send_device_query(self) -> Dict[str, Any]:
logger.debug("Sending device query command")
return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR])
async def send_advert(self, flood=False):
async def send_advert(self, flood: bool = False) -> Dict[str, Any]:
logger.debug(f"Sending advertisement command (flood={flood})")
if flood:
return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR])
else:
return await self.send(b"\x07", [EventType.OK, EventType.ERROR])
async def set_name(self, name):
async def set_name(self, name: str) -> Dict[str, Any]:
logger.debug(f"Setting device name to: {name}")
return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR])
async def set_coords(self, lat, lon):
async def set_coords(self, lat: float, lon: float) -> Dict[str, Any]:
logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}")
return await self.send(b'\x0e'\
+ int(lat*1e6).to_bytes(4, 'little', signed=True)\
+ int(lon*1e6).to_bytes(4, 'little', signed=True)\
+ int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def reboot(self):
async def reboot(self) -> Dict[str, Any]:
logger.debug("Sending reboot command")
return await self.send(b'\x13reboot')
async def get_bat(self):
async def get_bat(self) -> Dict[str, Any]:
logger.debug("Getting battery information")
return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR])
async def get_time(self):
async def get_time(self) -> Dict[str, Any]:
logger.debug("Getting device time")
return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR])
async def set_time(self, val):
async def set_time(self, val: int) -> Dict[str, Any]:
logger.debug(f"Setting device time to: {val}")
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_tx_power(self, val):
async def set_tx_power(self, val: int) -> Dict[str, Any]:
logger.debug(f"Setting TX power to: {val}")
return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_radio(self, freq, bw, sf, cr):
async def set_radio(self, freq: float, bw: float, sf: int, cr: int) -> Dict[str, Any]:
logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}")
return await self.send(b"\x0b" \
+ int(float(freq)*1000).to_bytes(4, 'little')\
@ -164,7 +165,7 @@ class CommandHandler:
+ int(sf).to_bytes(1, 'little')\
+ int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_tuning(self, rx_dly, af):
async def set_tuning(self, rx_dly: int, af: int) -> Dict[str, Any]:
logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}")
return await self.send(b"\x15" \
+ int(rx_dly).to_bytes(4, 'little')\
@ -172,36 +173,44 @@ class CommandHandler:
+ int(0).to_bytes(1, 'little')\
+ int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_devicepin(self, pin):
async def set_devicepin(self, pin: int) -> Dict[str, Any]:
logger.debug(f"Setting device PIN to: {pin}")
return await self.send(b"\x25" \
+ int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def get_contacts(self):
async def get_contacts(self) -> Dict[str, Any]:
logger.debug("Getting contacts")
return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR])
async def reset_path(self, key):
logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0D" + key
async def reset_path(self, key: DestinationType) -> Dict[str, Any]:
key_bytes = _validate_destination(key)
logger.debug(f"Resetting path for contact: {key_bytes.hex()}")
data = b"\x0D" + key_bytes
return await self.send(data, [EventType.OK, EventType.ERROR])
async def share_contact(self, key):
logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x10" + key
async def share_contact(self, key: DestinationType) -> Dict[str, Any]:
key_bytes = _validate_destination(key)
logger.debug(f"Sharing contact: {key_bytes.hex()}")
data = b"\x10" + key_bytes
return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR])
async def export_contact(self, key=b""):
logger.debug(f"Exporting contact: {key.hex() if key else 'all'}")
data = b"\x11" + key
async def export_contact(self, key: Optional[DestinationType] = None) -> Dict[str, Any]:
if key:
key_bytes = _validate_destination(key)
logger.debug(f"Exporting contact: {key_bytes.hex()}")
data = b"\x11" + key_bytes
else:
logger.debug("Exporting all contacts")
data = b"\x11"
return await self.send(data, [EventType.OK, EventType.ERROR])
async def remove_contact(self, key):
logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0f" + key
async def remove_contact(self, key: DestinationType) -> Dict[str, Any]:
key_bytes = _validate_destination(key)
logger.debug(f"Removing contact: {key_bytes.hex()}")
data = b"\x0f" + key_bytes
return await self.send(data, [EventType.OK, EventType.ERROR])
async def get_msg(self, timeout=1):
async def get_msg(self, timeout: Optional[float] = 1) -> Dict[str, Any]:
logger.debug("Requesting pending messages")
return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], timeout)
@ -261,7 +270,8 @@ class CommandHandler:
data = b"\x32" + cmd.encode('ascii')
return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR])
async def send_trace(self, auth_code=0, tag=None, flags=0, path=None):
async def send_trace(self, auth_code: int = 0, tag: Optional[int] = None,
flags: int = 0, path: Optional[Union[str, bytes, bytearray]] = None) -> Dict[str, Any]:
"""
Send a trace packet to test routing through specific repeaters