meshcore-cli/mccli.py

358 lines
13 KiB
Python
Raw Normal View History

2025-02-02 17:51:09 +01:00
#!/usr/bin/python
2025-02-04 15:03:44 +01:00
"""
mccli.py : CLI interface to MeshCore BLE companion app
"""
2025-02-02 17:51:09 +01:00
import asyncio
2025-02-04 15:03:44 +01:00
import os
2025-02-02 17:51:09 +01:00
import sys
2025-02-04 15:03:44 +01:00
import getopt
2025-02-02 17:51:09 +01:00
import json
2025-02-04 09:53:07 +01:00
import datetime
2025-02-04 15:03:44 +01:00
import time
from pathlib import Path
2025-02-02 17:51:09 +01:00
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
2025-02-04 15:03:44 +01:00
from bleak.exc import BleakDeviceNotFoundError
2025-02-02 17:51:09 +01:00
# The three UART UUIDs only differ by their first group
_UART_UUID_SUFFIX = "-B5A3-F393-E0A9-E50E24DCCA9E"
# BLE UART service exposed by the device
UART_SERVICE_UUID = "6E400001" + _UART_UUID_SUFFIX
# characteristic commands are written to
UART_RX_CHAR_UUID = "6E400002" + _UART_UUID_SUFFIX
# characteristic the device notifies its answers on
UART_TX_CHAR_UUID = "6E400003" + _UART_UUID_SUFFIX

# Per-user configuration directory, and the file in it that
# remembers the address of the last device used
MCCLI_CONFIG_DIR = f"{Path.home()}/.config/mc-cli/"
MCCLI_ADDRESS = f"{MCCLI_CONFIG_DIR}default_address"

# Address used when the config file is missing;
# leave it to None or "" to trigger a scan instead
ADDRESS = ""
2025-02-02 17:51:09 +01:00
2025-02-14 13:19:01 +01:00
def printerr (str) :
    """
    Write a diagnostic message to stderr, one message per line

    The message is terminated by a newline and flushed immediately so
    that consecutive messages do not run together and show up in order
    with what is printed on stdout.
    The parameter keeps its historical name (it shadows the builtin
    inside this function only); any object is accepted and formatted.
    """
    sys.stderr.write(f"{str}\n")
    sys.stderr.flush()
2025-02-02 17:51:09 +01:00
class MeshCore:
"""
Interface to a BLE MeshCore device
"""
self_info={}
contacts={}
def __init__(self, address):
2025-02-04 15:03:44 +01:00
""" Constructor : specify address """
self.address = address
self.client = None
2025-02-04 15:03:44 +01:00
self.rx_char = None
self.time = 0
self.result = asyncio.Future()
self.contact_nb = 0
2025-02-10 09:55:56 +01:00
self.rx_sem = asyncio.Semaphore(0)
self.ack_ev = asyncio.Event()
2025-02-02 17:51:09 +01:00
async def connect(self):
2025-02-04 15:03:44 +01:00
"""
Connects to the device
Returns : the address used for connection
"""
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """
2025-02-14 07:26:34 +01:00
if adv.local_name.startswith("MeshCore") :
return True
return False
if self.address is None or self.address == "" :
scanner = BleakScanner()
2025-02-14 13:19:01 +01:00
printerr("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
2025-02-04 15:03:44 +01:00
if device is None :
return None
2025-02-14 13:19:01 +01:00
printerr(f"Found device : {device}")
self.client = BleakClient(device)
2025-02-04 15:03:44 +01:00
self.address = self.client.address
else:
self.client = BleakClient(self.address)
2025-02-04 15:03:44 +01:00
try:
await self.client.connect(disconnected_callback=self.handle_disconnect)
except BleakDeviceNotFoundError:
return None
except TimeoutError:
return None
2025-02-02 17:51:09 +01:00
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
2025-02-04 15:03:44 +01:00
nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
2025-02-02 17:51:09 +01:00
await self.send_appstart()
2025-02-14 13:19:01 +01:00
printerr("Connexion started")
2025-02-04 15:03:44 +01:00
return self.address
2025-02-02 17:51:09 +01:00
2025-02-04 15:03:44 +01:00
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
""" Callback to handle received data """
2025-02-02 17:51:09 +01:00
match data[0]:
case 0: # ok
2025-02-04 13:02:00 +01:00
if len(data) == 5 : # an integer
2025-02-02 17:51:09 +01:00
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
else:
self.result.set_result(True)
case 1: # error
self.result.set_result(False)
case 2: # contact start
self.contact_nb = int.from_bytes(data[1:5], byteorder='little')
self.contacts={}
2025-02-04 13:02:00 +01:00
case 3: # contact
2025-02-02 17:51:09 +01:00
c = {}
2025-02-02 21:58:59 +01:00
c["public_key"] = data[1:33].hex()
2025-02-02 17:51:09 +01:00
c["type"] = data[33]
c["flags"] = data[34]
c["out_path_len"] = data[35]
2025-02-02 21:58:59 +01:00
c["out_path"] = data[36:100].hex()
2025-02-02 17:51:09 +01:00
c["adv_name"] = data[100:132].decode().replace("\0","")
c["last_advert"] = int.from_bytes(data[132:136], byteorder='little')
c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little')
c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little')
c["lastmod"] = int.from_bytes(data[144:148], byteorder='little')
self.contacts[c["adv_name"]]=c
case 4: # end of contacts
self.result.set_result(self.contacts)
case 5: # self info
self.self_info["adv_type"] = data[1]
2025-02-02 21:58:59 +01:00
self.self_info["public_key"] = data[4:36].hex()
2025-02-13 22:56:24 +01:00
self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)
self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)
self.self_info["reserved_44:48"] = data[44:48].hex()
2025-02-02 17:51:09 +01:00
self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little')
self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little')
self.self_info["radio_sf"] = data[56]
self.self_info["radio_cr"] = data[57]
self.self_info["name"] = data[58:].decode()
self.result.set_result(True)
case 6: # msg sent
res = {}
res["type"] = data[1]
res["expected_ack"] = bytes(data[2:6])
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little')
self.result.set_result(res)
case 7: # contact msg recv
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex()
2025-02-02 17:51:09 +01:00
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
res["text"] = data[13:].decode()
self.result.set_result(res)
case 8 : # chanel msg recv
res = {}
res["type"] = "CHAN"
res["pubkey_prefix"] = data[1:7].hex()
2025-02-02 17:51:09 +01:00
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
res["text"] = data[13:].decode()
self.result.set_result(res)
2025-02-06 17:06:16 +01:00
case 9: # current time
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
case 10: # no more msgs
self.result.set_result(False)
2025-02-02 17:51:09 +01:00
# push notifications
case 0x80:
2025-02-14 13:19:01 +01:00
printerr ("Advertisment received")
case 0x81:
2025-02-14 13:19:01 +01:00
printerr ("Code path update")
case 0x82:
2025-02-10 09:55:56 +01:00
self.ack_ev.set()
2025-02-14 13:19:01 +01:00
printerr ("Received ACK")
case 0x83:
2025-02-10 09:55:56 +01:00
self.rx_sem.release()
2025-02-14 13:19:01 +01:00
printerr ("Msgs are waiting")
2025-02-02 17:51:09 +01:00
# unhandled
case _:
2025-02-14 13:19:01 +01:00
printerr (f"Unhandled data received {data}")
2025-02-02 17:51:09 +01:00
def handle_disconnect(self, _: BleakClient):
2025-02-04 15:03:44 +01:00
""" Callback to handle disconnection """
2025-02-14 13:19:01 +01:00
printerr ("Device was disconnected, goodbye.")
2025-02-02 17:51:09 +01:00
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
2025-02-04 11:04:23 +01:00
async def send(self, data, timeout = 5):
2025-02-04 15:03:44 +01:00
""" Helper function to synchronously send (and receive) data to the node """
2025-02-02 17:51:09 +01:00
self.result = asyncio.Future()
2025-02-04 11:04:23 +01:00
try:
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)
res = await asyncio.wait_for(self.result, timeout)
return res
except TimeoutError :
2025-02-14 13:19:01 +01:00
printerr ("Timeout ...")
2025-02-04 11:04:23 +01:00
return False
2025-02-02 17:51:09 +01:00
async def send_appstart(self):
2025-02-04 15:03:44 +01:00
""" Send APPSTART to the node """
2025-02-07 12:20:06 +01:00
b1 = bytearray(b'\x01\x03 mccli')
2025-02-02 17:51:09 +01:00
return await self.send(b1)
async def send_advert(self):
2025-02-04 15:03:44 +01:00
""" Make the node send an advertisement """
2025-02-02 17:51:09 +01:00
return await self.send(b"\x07")
async def set_name(self, name):
2025-02-04 15:03:44 +01:00
""" Changes the name of the node """
2025-02-02 17:51:09 +01:00
return await self.send(b'\x08' + name.encode("ascii"))
async def get_time(self):
2025-02-04 15:03:44 +01:00
""" Get the time (epoch) of the node """
2025-02-02 17:51:09 +01:00
self.time = await self.send(b"\x05")
return self.time
2025-02-04 09:53:07 +01:00
async def set_time(self, val):
2025-02-04 15:03:44 +01:00
""" Sets a new epoch """
2025-02-04 10:05:37 +01:00
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'))
2025-02-04 09:53:07 +01:00
2025-02-02 17:51:09 +01:00
async def get_contacts(self):
2025-02-04 15:03:44 +01:00
""" Starts retreiving contacts """
2025-02-02 17:51:09 +01:00
return await self.send(b"\x04")
async def send_msg(self, dst, msg):
2025-02-04 15:03:44 +01:00
""" Send a message to a node """
2025-02-02 17:51:09 +01:00
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
2025-02-10 09:55:56 +01:00
self.ack_ev.clear()
2025-02-02 17:51:09 +01:00
return await self.send(data)
async def get_msg(self):
2025-02-04 15:03:44 +01:00
""" Get message from the node (stored in queue) """
2025-02-10 09:55:56 +01:00
res = await self.send(b"\x0A", 1)
if res is False :
self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue
return res
async def wait_msg(self):
""" Wait for a message """
await self.rx_sem.acquire()
async def wait_ack(self):
""" Wait ack """
await self.ack_ev.wait()
2025-02-02 17:51:09 +01:00
2025-02-04 10:05:37 +01:00
async def next_cmd(mc, cmds):
2025-02-04 15:03:44 +01:00
""" process next command """
2025-02-04 10:05:37 +01:00
argnum = 0
match cmds[0] :
2025-02-04 09:53:07 +01:00
case "get_time" :
timestamp = await mc.get_time()
print('Current time :'
f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}'
f' ({timestamp})')
case "sync_time" :
print(await mc.set_time(int(time.time())))
case "set_time" :
2025-02-04 10:05:37 +01:00
argnum = 1
print(await mc.set_time(cmds[1]))
2025-02-04 13:02:00 +01:00
case "send" :
2025-02-04 10:05:37 +01:00
argnum = 2
print(await mc.send_msg(bytes.fromhex(cmds[1]), cmds[2]))
2025-02-04 15:03:44 +01:00
case "sendto" : # sends to a contact from name
2025-02-04 10:05:37 +01:00
argnum = 2
2025-02-02 17:51:09 +01:00
await mc.get_contacts()
2025-02-04 15:03:44 +01:00
print(await mc.send_msg(bytes.fromhex(mc.contacts[cmds[1]]["public_key"])[0:6],
cmds[2]))
2025-02-02 18:12:26 +01:00
case "contacts" :
2025-02-02 21:58:59 +01:00
print(json.dumps(await mc.get_contacts(),indent=4))
2025-02-02 18:12:26 +01:00
case "recv" :
2025-02-02 17:51:09 +01:00
print(await mc.get_msg())
case "sync_msgs" :
res=True
while res:
res = await mc.get_msg()
print (res)
2025-02-10 09:55:56 +01:00
case "wait_msg" :
await mc.wait_msg()
res = await mc.get_msg()
print (res)
case "wait_ack" :
print (await mc.wait_ack())
2025-02-02 18:12:26 +01:00
case "infos" :
2025-02-02 17:51:09 +01:00
print(mc.self_info)
2025-02-02 18:12:26 +01:00
case "advert" :
print(await mc.send_advert())
case "set_name" :
2025-02-04 10:05:37 +01:00
argnum = 1
print(await mc.set_name(cmds[1]))
case "sleep" :
2025-02-04 10:05:37 +01:00
argnum = 1
await asyncio.sleep(int(cmds[1]))
2025-02-14 13:19:01 +01:00
printerr (f"cmd {cmds[0:argnum+1]} processed ...")
2025-02-04 10:05:37 +01:00
return cmds[argnum+1:]
2025-02-04 13:02:00 +01:00
def usage () :
    """ Prints some help (options and available commands) on stdout """
    print("""mccli.py : CLI interface to MeshCore BLE companion app

   Usage : mccli.py <args> <commands>

 Arguments :
    -h : prints this help
    -a <address> : specifies device address
    -s : forces ble scan for a MeshCore device

 Available Commands (can be chained) :
    infos : print informations about the node
    send <key> <msg> : sends msg to the node with pubkey starting by key
    sendto <name> <msg> : sends msg to the node with given name
    wait_ack : wait an ack for last sent msg
    recv : reads next msg
    sync_msgs : gets all unread msgs from the node
    wait_msg : wait for a message
    advert : sends advert
    contacts : gets contact list
    sync_time : sync time with system
    set_time <epoch> : sets time to given epoch
    get_time : gets current time
    set_name <name> : sets node name
    sleep <secs> : sleeps for a given amount of secs""")
2025-02-04 13:02:00 +01:00
async def main(argv):
2025-02-04 15:03:44 +01:00
""" Do the job """
address = ADDRESS
2025-02-04 15:03:44 +01:00
# If there is an address in config file, use it by default
# unless an arg is explicitely given
if os.path.exists(MCCLI_ADDRESS) :
with open(MCCLI_ADDRESS, encoding="utf-8") as f :
address = f.readline().strip()
2025-02-04 15:03:44 +01:00
opts, args = getopt.getopt(argv, "a:sh")
for opt, arg in opts :
match opt:
case "-a" : # address specified on cmdline
address = arg
case "-s" : # explicitely ask to scan address
address = None
2025-02-04 10:05:37 +01:00
2025-02-04 15:03:44 +01:00
if len(args) == 0 : # no args, no action
2025-02-04 13:02:00 +01:00
usage()
2025-02-04 10:05:37 +01:00
return
mc = MeshCore(address)
2025-02-04 15:03:44 +01:00
address = await mc.connect()
if address is None or address == "" : # no device, no action
2025-02-14 13:19:01 +01:00
printerr ("No device found, exiting ...")
2025-02-04 15:03:44 +01:00
return
# Store device address in configuration
if os.path.isdir(MCCLI_CONFIG_DIR) :
with open(MCCLI_ADDRESS, "w", encoding="utf-8") as f :
f.write(address)
2025-02-04 10:05:37 +01:00
cmds = args
while len(cmds) > 0 :
2025-02-04 10:05:37 +01:00
cmds = await next_cmd(mc, cmds)
2025-02-02 17:51:09 +01:00
2025-02-04 15:03:44 +01:00
# Only run when executed as a script, so that importing the module
# (to reuse MeshCore for instance) does not start a session
if __name__ == "__main__":
    try:
        asyncio.run(main(sys.argv[1:]))
    except asyncio.CancelledError:
        # handle_disconnect cancels every task, main included, when the
        # device goes away : exit with an error status, without a traceback
        sys.exit(1)