Add options for specifying address or scan for device

This commit is contained in:
Florent de Lamotte 2025-02-04 11:58:06 +01:00
parent 00d3fb7c4a
commit a40375c442

View file

@ -1,13 +1,10 @@
#!/usr/bin/python
import asyncio
import sys
import json
import time
import datetime
from itertools import count, takewhile
from typing import Iterator
import getopt
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
@ -18,7 +15,10 @@ UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
ADDRESS = "F0:F5:BD:4F:9B:AD"
# BLE adress of the device
# if None or "" then a scan is performed
#ADDRESS = "F0:F5:BD:4F:9B:AD"
ADDRESS = ""
class MeshCore:
"""
@ -28,9 +28,26 @@ class MeshCore:
contacts={}
def __init__(self, address):
self.client = BleakClient(address)
self.address = address
self.client = None
async def connect(self):
def match_meshcore_device(device: BLEDevice, adv: AdvertisementData):
if adv.local_name == "MeshCore" :
return True
return False
if self.address is None or self.address == "" :
scanner = BleakScanner()
print("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
print(f"Found device : {device}")
self.client = BleakClient(device)
else:
self.client = BleakClient(self.address)
result = asyncio.Future()
await self.client.connect(disconnected_callback=self.handle_disconnect)
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
@ -220,17 +237,26 @@ async def next_cmd(mc, cmds):
print (f"cmd {cmds[0:argnum+1]} processed ...")
return cmds[argnum+1:]
async def main(args):
async def main(argv):
address = ADDRESS
if len(args) < 2:
opts, args = getopt.getopt(sys.argv[1:], "a:s")
for opt, arg in opts :
match opt:
case "-a" : # address specified on cmdline
address = arg
case "-s" : # explicitely ask to scan address
address = None
if len(args) == 0 :
print("Commands : send, sendto, recv, contacts, infos")
return
mc = MeshCore(ADDRESS)
mc = MeshCore(address)
await mc.connect()
cmds = args[1:]
while len(cmds)>0 :
cmds = args
while len(cmds) > 0 :
cmds = await next_cmd(mc, cmds)
asyncio.run(main(sys.argv))