add the ability to subscribe to LOGIN_SUCCESS event

This commit is contained in:
Alex Wolden 2025-05-12 22:35:53 -07:00
parent b3cf2ba7b4
commit 8d805de78d
5 changed files with 117 additions and 9 deletions

View file

@ -0,0 +1,91 @@
#!/usr/bin/python
import asyncio
import argparse
import logging
from math import log
from meshcore import MeshCore
from meshcore.events import EventType
# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description='Get status from a repeater via TCP connection')
parser.add_argument('-host', '--hostname', required=True, help='TCP hostname or IP address')
parser.add_argument('-port', '--port', type=int, required=True, help='TCP port number')
parser.add_argument('-r', '--repeater', required=True, help='Repeater name')
parser.add_argument('-pw', '--password', required=True, help='Password for login')
args = parser.parse_args()
# Connect to the device
print(f"Connecting to TCP {args.hostname}:{args.port}...")
mc = await MeshCore.create_tcp(args.hostname, args.port, debug=True)
try:
# Set up a simple event handler to log all events
async def log_event(event):
print(f"EVENT: {event.type.name} - Payload: {event.payload}")
# Subscribe to login events
mc.subscribe(EventType.LOGIN_SUCCESS, log_event)
mc.subscribe(EventType.LOGIN_FAILED, log_event)
mc.subscribe(EventType.STATUS_RESPONSE, log_event)
# Get contacts
await mc.ensure_contacts()
repeater = mc.get_contact_by_name(args.repeater)
if repeater is None:
print(f"Repeater '{args.repeater}' not found in contacts.")
print(f"Available contacts: {mc.contacts}")
return
print(f"Found repeater: {repeater}")
# Send login request
print(f"Sending login request to '{args.repeater}'...")
login_cmd = await mc.commands.send_login(repeater, args.password)
if login_cmd.type == EventType.ERROR:
print(f"Login failed: {login_cmd.payload}")
return
filter = {"pubkey_prefix": repeater["public_key"][0:12]}
login_result = await mc.wait_for_event(EventType.LOGIN_SUCCESS, filter, timeout=10)
print(f"Login result: {login_result}")
# Wait a bit for the login response
print("Waiting for login events...")
await asyncio.sleep(3)
# Send status request
print("Sending status request...")
await mc.commands.send_statusreq(repeater)
# Wait for status response
print("Waiting for status response event...")
status_event = await mc.wait_for_event(EventType.STATUS_RESPONSE, timeout=5.0)
if status_event:
print(f"Status response received: {status_event.payload}")
else:
print("No status response received within timeout")
finally:
# Always disconnect properly
print("Disconnecting...")
await mc.disconnect()
print("Disconnected from device")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nOperation cancelled by user")
except Exception as e:
print(f"Error: {e}")

View file

@ -96,4 +96,4 @@ class BLEConnection:
"""Disconnect from the BLE device."""
if self.client and self.client.is_connected:
await self.client.disconnect()
logger.info("BLE Connection closed")
logger.debug("BLE Connection closed")

View file

@ -251,14 +251,31 @@ class MessageReader:
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
logger.debug("Login success")
# TODO: Read login attributes
await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, {}))
res = {}
if len(data) > 1:
res["permissions"] = data[1]
res["is_admin"] = (data[1] & 1) == 1 # Check if admin bit is set
if len(data) > 7:
res["pubkey_prefix"] = data[2:8].hex()
attributes = {
"pubkey_prefix": res.get("pubkey_prefix")
}
await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, res, attributes))
elif packet_type_value == PacketType.LOGIN_FAILED.value:
logger.debug("Login failed")
# TODO: Read login attributes
await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, {}))
res = {}
if len(data) > 7:
res["pubkey_prefix"] = data[2:8].hex()
attributes = {
"pubkey_prefix": res.get("pubkey_prefix")
}
await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, res, attributes))
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
res = {}

View file

@ -93,4 +93,4 @@ class SerialConnection:
if self.transport:
self.transport.close()
self.transport = None
logger.info("Serial Connection closed")
logger.debug("Serial Connection closed")

View file

@ -91,4 +91,4 @@ class TCPConnection:
if self.transport:
self.transport.close()
self.transport = None
logger.info("TCP Connection closed")
logger.debug("TCP Connection closed")