From a40375c442b250b8756d2498ff75e73cdbd1fd14 Mon Sep 17 00:00:00 2001 From: Florent de Lamotte Date: Tue, 4 Feb 2025 11:58:06 +0100 Subject: [PATCH] Add options for specifying address or scan for device --- mc-cli.py | 48 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/mc-cli.py b/mc-cli.py index d27296b..0cf26ef 100755 --- a/mc-cli.py +++ b/mc-cli.py @@ -1,13 +1,10 @@ #!/usr/bin/python - import asyncio import sys import json import time import datetime - -from itertools import count, takewhile -from typing import Iterator +import getopt from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic @@ -18,7 +15,10 @@ UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E" UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E" -ADDRESS = "F0:F5:BD:4F:9B:AD" +# BLE adress of the device +# if None or "" then a scan is performed +#ADDRESS = "F0:F5:BD:4F:9B:AD" +ADDRESS = "" class MeshCore: """ @@ -28,9 +28,26 @@ class MeshCore: contacts={} def __init__(self, address): - self.client = BleakClient(address) + self.address = address + self.client = None async def connect(self): + + + def match_meshcore_device(device: BLEDevice, adv: AdvertisementData): + if adv.local_name == "MeshCore" : + return True + return False + + if self.address is None or self.address == "" : + scanner = BleakScanner() + print("Scanning for devices") + device = await scanner.find_device_by_filter(match_meshcore_device) + print(f"Found device : {device}") + self.client = BleakClient(device) + else: + self.client = BleakClient(self.address) + result = asyncio.Future() await self.client.connect(disconnected_callback=self.handle_disconnect) await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx) @@ -220,17 +237,26 @@ async def next_cmd(mc, cmds): print (f"cmd {cmds[0:argnum+1]} processed ...") return cmds[argnum+1:] -async def main(args): +async def main(argv): + address = ADDRESS - if len(args) < 2: + opts, args = getopt.getopt(sys.argv[1:], "a:s") + for opt, arg in opts : + match opt: + case "-a" : # address specified on cmdline + address = arg + case "-s" : # explicitely ask to scan address + address = None + + if len(args) == 0 : print("Commands : send, sendto, recv, contacts, infos") return - mc = MeshCore(ADDRESS) + mc = MeshCore(address) await mc.connect() - cmds = args[1:] - while len(cmds)>0 : + cmds = args + while len(cmds) > 0 : cmds = await next_cmd(mc, cmds) asyncio.run(main(sys.argv))