This commit is contained in:
Florent de Lamotte 2025-02-04 15:03:44 +01:00
parent a62cf6431a
commit 85e9593333
2 changed files with 85 additions and 29 deletions

View file

@ -1,11 +1,11 @@
# mc-cli
# mccli
mc-cli.py : CLI interface to MeschCore BLE companion app
mccli.py : CLI interface to MeschCore BLE companion app
## Usage
<pre>
$ mc-cli.py &lt;args&gt; &lt;commands&gt;
$ mccli.py &lt;args&gt; &lt;commands&gt;
</pre>
### Arguments
@ -40,14 +40,14 @@ Commands are given after arguments, they can be chained.
### Examples
<pre>
$ ./mc-cli.py -s infos
$ ./mccli.py -s infos
Scanning for devices
Found device : F0:F5:BD:4F:9B:AD: MeshCore
Connexion started
{'adv_type': 1, 'public_key': '54c11cff0c2a861cfc5b0bd6e4b81cd5e6ca85e058bf53932d86c87dc7a20011', 'device_loc': '000000000000000000000000', 'radio_freq': 867500, 'radio_bw': 250000, 'radio_sf': 10, 'radio_cr': 5, 'name': 'toto'}
cmd ['infos'] processed ...
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD get_time
$ ./mccli.py -a F0:F5:BD:4F:9B:AD get_time
Connexion started
Current time : 2024-05-15 12:52:53 (1715770373)
cmd ['get_time'] processed ...
@ -55,19 +55,19 @@ cmd ['get_time'] processed ...
$ date
Tue Feb 4 12:55:05 CET 2025
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD sync_time get_time
$ ./mccli.py -a F0:F5:BD:4F:9B:AD sync_time get_time
Connexion started
True
cmd ['sync_time'] processed ...
Current time : 2025-02-04 12:55:24 (1738670124)
cmd ['get_time'] processed ...
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD contacts
$ ./mccli.py -a F0:F5:BD:4F:9B:AD contacts
Connexion started
{}
cmd ['contacts'] processed ...
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD sleep 10 contacts
$ ./mccli.py -a F0:F5:BD:4F:9B:AD sleep 10 contacts
Connexion started
Advertisment received
cmd ['sleep', '10'] processed ...
@ -87,7 +87,7 @@ cmd ['sleep', '10'] processed ...
}
cmd ['contacts'] processed ...
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD sendto flo2 "Hello flo2" sleep 10
$ ./mccli.py -a F0:F5:BD:4F:9B:AD sendto flo2 "Hello flo2" sleep 10
Connexion started
{'type': 1, 'expected_ack': b'9\x05\x0c\x12', 'suggested_timeout': 3260}
cmd ['sendto', 'flo2', 'Hello flo2'] processed ...
@ -96,7 +96,7 @@ Received ACK
Msgs are waiting
cmd ['sleep', '10'] processed ...
$ ./mc-cli.py -a F0:F5:BD:4F:9B:AD recv
$ ./mccli.py -a F0:F5:BD:4F:9B:AD recv
Connexion started
{'type': 'PRIV', 'pubkey_prefix': 'd6e43f8e9ef2', 'path_len': 255, 'txt_type': 0, 'sender_timestamp': 1738670421, 'text': 'hi'}
cmd ['recv'] processed ...

View file

@ -1,23 +1,32 @@
#!/usr/bin/python
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import os
import sys
import json
import time
import datetime
import getopt
import json
import datetime
import time
from pathlib import Path
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.exc import BleakDeviceNotFoundError
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
# BLE adress of the device
# default address is stored in a config file
MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/mc-cli/"
MCCLI_ADDRESS = MCCLI_CONFIG_DIR + "default_address"
# Fallback address if config file not found
# if None or "" then a scan is performed
#ADDRESS = "F0:F5:BD:4F:9B:AD"
ADDRESS = ""
class MeshCore:
@ -28,11 +37,22 @@ class MeshCore:
contacts={}
def __init__(self, address):
""" Constructor : specify address """
self.address = address
self.client = None
self.rx_char = None
self.time = 0
self.result = asyncio.Future()
self.contact_nb = 0
async def connect(self):
def match_meshcore_device(device: BLEDevice, adv: AdvertisementData):
"""
Connects to the device
Returns : the address used for connection
"""
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """
if adv.local_name == "MeshCore" :
return True
return False
@ -41,24 +61,33 @@ class MeshCore:
scanner = BleakScanner()
print("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None :
return None
print(f"Found device : {device}")
self.client = BleakClient(device)
self.address = self.client.address
else:
self.client = BleakClient(self.address)
result = asyncio.Future()
await self.client.connect(disconnected_callback=self.handle_disconnect)
try:
await self.client.connect(disconnected_callback=self.handle_disconnect)
except BleakDeviceNotFoundError:
return None
except TimeoutError:
return None
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
self.loop = asyncio.get_running_loop()
self.nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = self.nus.get_characteristic(UART_RX_CHAR_UUID)
nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
await self.send_appstart()
print("Connexion started")
return self.address
def handle_rx(self, charac: BleakGATTCharacteristic, data: bytearray):
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
""" Callback to handle received data """
match data[0]:
case 0: # ok
if len(data) == 5 : # an integer
@ -133,12 +162,14 @@ class MeshCore:
print(f"Unhandled data received {data}")
def handle_disconnect(self, _: BleakClient):
""" Callback to handle disconnection """
print("Device was disconnected, goodbye.")
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
async def send(self, data, timeout = 5):
""" Helper function to synchronously send (and receive) data to the node """
self.result = asyncio.Future()
try:
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)
@ -149,34 +180,43 @@ class MeshCore:
return False
async def send_appstart(self):
""" Send APPSTART to the node """
b1 = bytearray(b'\x01\x03 TEST')
return await self.send(b1)
async def send_advert(self):
""" Make the node send an advertisement """
return await self.send(b"\x07")
async def set_name(self, name):
""" Changes the name of the node """
return await self.send(b'\x08' + name.encode("ascii"))
async def get_time(self):
""" Get the time (epoch) of the node """
self.time = await self.send(b"\x05")
return self.time
async def set_time(self, val):
""" Sets a new epoch """
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'))
async def get_contacts(self):
""" Starts retreiving contacts """
return await self.send(b"\x04")
async def send_msg(self, dst, msg):
""" Send a message to a node """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
return await self.send(data)
async def get_msg(self):
""" Get message from the node (stored in queue) """
return await self.send(b"\x0A", 1)
async def next_cmd(mc, cmds):
""" process next command """
argnum = 0
match cmds[0] :
case "get_time" :
@ -192,10 +232,11 @@ async def next_cmd(mc, cmds):
case "send" :
argnum = 2
print(await mc.send_msg(bytes.fromhex(cmds[1]), cmds[2]))
case "sendto" : # sends to a name (need to get contacts first so can take time, contacts should be cached to file ...)
case "sendto" : # sends to a contact from name
argnum = 2
await mc.get_contacts()
print(await mc.send_msg(bytes.fromhex(mc.contacts[cmds[1]]["public_key"])[0:6], cmds[2]))
print(await mc.send_msg(bytes.fromhex(mc.contacts[cmds[1]]["public_key"])[0:6],
cmds[2]))
case "contacts" :
print(json.dumps(await mc.get_contacts(),indent=4))
case "recv" :
@ -220,6 +261,7 @@ async def next_cmd(mc, cmds):
return cmds[argnum+1:]
def usage () :
""" Prints some help """
print("""mc-cli.py : CLI interface to MeschCore BLE companion app
Usage : mc-cli.py <args> <commands>
@ -241,12 +283,18 @@ def usage () :
set_time <epoch> : sets time to given epoch
get_time : gets current time
set_name <name> : sets node name
sleep <secs> : sleeps for a given amount of secs""");
sleep <secs> : sleeps for a given amount of secs""")
async def main(argv):
""" Do the job """
address = ADDRESS
# If there is an address in config file, use it by default
# unless an arg is explicitely given
if os.path.exists(MCCLI_ADDRESS) :
with open(MCCLI_ADDRESS, encoding="utf-8") as f :
address = f.readline().strip()
opts, args = getopt.getopt(sys.argv[1:], "a:sh")
opts, args = getopt.getopt(argv, "a:sh")
for opt, arg in opts :
match opt:
case "-a" : # address specified on cmdline
@ -254,15 +302,23 @@ async def main(argv):
case "-s" : # explicitely ask to scan address
address = None
if len(args) == 0 :
if len(args) == 0 : # no args, no action
usage()
return
mc = MeshCore(address)
await mc.connect()
address = await mc.connect()
if address is None or address == "" : # no device, no action
print("No device found, exiting ...")
return
# Store device address in configuration
if os.path.isdir(MCCLI_CONFIG_DIR) :
with open(MCCLI_ADDRESS, "w", encoding="utf-8") as f :
f.write(address)
cmds = args
while len(cmds) > 0 :
cmds = await next_cmd(mc, cmds)
asyncio.run(main(sys.argv))
asyncio.run(main(sys.argv[1:]))