Split mclib into different modules

This commit is contained in:
Florent 2025-04-04 21:51:34 +02:00
parent fe0eec924e
commit 0e7c04b740
6 changed files with 250 additions and 229 deletions

View file

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "meshcore"
version = "0.3.3"
version = "0.4"
authors = [
{ name="Florent de Lamotte", email="florent@frizoncorrea.fr" },
]

View file

@ -1,5 +1,5 @@
from meshcore.mclib import printerr
from meshcore.mclib import MeshCore
from meshcore.mclib import TCPConnection
from meshcore.mclib import BLEConnection
from meshcore.mclib import SerialConnection
from meshcore.meshcore import printerr
from meshcore.meshcore import MeshCore
from meshcore.tcp_cx import TCPConnection
from meshcore.ble_cx import BLEConnection
from meshcore.serial_cx import SerialConnection

83
src/meshcore/ble_cx.py Normal file
View file

@ -0,0 +1,83 @@
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
from meshcore import printerr
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.exc import BleakDeviceNotFoundError
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
class BLEConnection:
def __init__(self, address):
""" Constructor : specify address """
self.address = address
self.client = None
self.rx_char = None
self.mc = None
async def connect(self):
"""
Connects to the device
Returns : the address used for connection
"""
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """
if not adv.local_name is None\
and adv.local_name.startswith("MeshCore")\
and (self.address is None or self.address in adv.local_name) :
return True
return False
if self.address is None or self.address == "" or len(self.address.split(":")) != 6 :
scanner = BleakScanner()
printerr("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None :
return None
printerr(f"Found device : {device}")
self.client = BleakClient(device)
self.address = self.client.address
else:
self.client = BleakClient(self.address)
try:
await self.client.connect(disconnected_callback=self.handle_disconnect)
except BleakDeviceNotFoundError:
return None
except TimeoutError:
return None
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
printerr("BLE Connexion started")
return self.address
def handle_disconnect(self, _: BleakClient):
""" Callback to handle disconnection """
printerr ("Device was disconnected, goodbye.")
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
if not self.mc is None:
self.mc.handle_rx(data)
async def send(self, data):
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)

View file

@ -3,235 +3,12 @@
"""
import asyncio
import sys
import serial_asyncio
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.exc import BleakDeviceNotFoundError
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
def printerr (s) :
sys.stderr.write(str(s))
sys.stderr.write("\n")
sys.stderr.flush()
class SerialConnection:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.frame_started = False
self.frame_size = 0
self.header = b""
self.inframe = b""
class MCSerialClientProtocol(asyncio.Protocol):
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
# printerr('port opened')
transport.serial.rts = False # You can manipulate Serial object via transport
def data_received(self, data):
# printerr('data received')
self.cx.handle_rx(data)
def connection_lost(self, exc):
printerr('port closed')
def pause_writing(self):
printerr('pause writing')
def resume_writing(self):
printerr('resume writing')
async def connect(self):
"""
Connects to the device
"""
loop = asyncio.get_running_loop()
await serial_asyncio.create_serial_connection(
loop, lambda: self.MCSerialClientProtocol(self),
self.port, baudrate=self.baudrate)
printerr("Serial Connexion started")
return self.port
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
framelen = len(self.inframe)
if not self.frame_started : # wait start of frame
if len(data) >= 3 - headerlen:
self.header = self.header + data[:3-headerlen]
self.frame_started = True
self.frame_size = int.from_bytes(self.header[1:], byteorder='little')
self.handle_rx(data[3-headerlen:])
else:
self.header = self.header + data
else:
if framelen + len(data) < self.frame_size:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
self.frame_started = False
self.header = b""
self.inframe = b""
if framelen + len(data) > self.frame_size:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
# printerr(f"sending pkt : {pkt}")
self.transport.write(pkt)
class TCPConnection:
def __init__(self, host, port):
self.host = host
self.port = port
self.transport = None
self.frame_started = False
self.frame_size = 0
self.header = b""
self.inframe = b""
class MCClientProtocol:
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
def data_received(self, data):
self.cx.handle_rx(data)
def error_received(self, exc):
printerr(f'Error received: {exc}')
def connection_lost(self, exc):
printerr('The server closed the connection')
async def connect(self):
"""
Connects to the device
"""
loop = asyncio.get_running_loop()
await loop.create_connection(
lambda: self.MCClientProtocol(self),
self.host, self.port)
printerr("TCP Connexion started")
return self.host
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
framelen = len(self.inframe)
if not self.frame_started : # wait start of frame
if len(data) >= 3 - headerlen:
self.header = self.header + data[:3-headerlen]
self.frame_started = True
self.frame_size = int.from_bytes(self.header[1:], byteorder='little')
self.handle_rx(data[3-headerlen:])
else:
self.header = self.header + data
else:
if framelen + len(data) < self.frame_size:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
self.frame_started = False
self.header = b""
self.inframe = b""
if framelen + len(data) > self.frame_size:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
self.transport.write(pkt)
class BLEConnection:
def __init__(self, address):
""" Constructor : specify address """
self.address = address
self.client = None
self.rx_char = None
self.mc = None
async def connect(self):
"""
Connects to the device
Returns : the address used for connection
"""
def match_meshcore_device(_: BLEDevice, adv: AdvertisementData):
""" Filter to mach MeshCore devices """
if not adv.local_name is None\
and adv.local_name.startswith("MeshCore")\
and (self.address is None or self.address in adv.local_name) :
return True
return False
if self.address is None or self.address == "" or len(self.address.split(":")) != 6 :
scanner = BleakScanner()
printerr("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None :
return None
printerr(f"Found device : {device}")
self.client = BleakClient(device)
self.address = self.client.address
else:
self.client = BleakClient(self.address)
try:
await self.client.connect(disconnected_callback=self.handle_disconnect)
except BleakDeviceNotFoundError:
return None
except TimeoutError:
return None
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_rx)
nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
printerr("BLE Connexion started")
return self.address
def handle_disconnect(self, _: BleakClient):
""" Callback to handle disconnection """
printerr ("Device was disconnected, goodbye.")
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
if not self.mc is None:
self.mc.handle_rx(data)
async def send(self, data):
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)
class MeshCore:
"""
Interface to a BLE MeshCore device

84
src/meshcore/serial_cx.py Normal file
View file

@ -0,0 +1,84 @@
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import serial_asyncio
from meshcore import printerr
class SerialConnection:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.frame_started = False
self.frame_size = 0
self.header = b""
self.inframe = b""
class MCSerialClientProtocol(asyncio.Protocol):
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
# printerr('port opened')
transport.serial.rts = False # You can manipulate Serial object via transport
def data_received(self, data):
# printerr('data received')
self.cx.handle_rx(data)
def connection_lost(self, exc):
printerr('port closed')
def pause_writing(self):
printerr('pause writing')
def resume_writing(self):
printerr('resume writing')
async def connect(self):
"""
Connects to the device
"""
loop = asyncio.get_running_loop()
await serial_asyncio.create_serial_connection(
loop, lambda: self.MCSerialClientProtocol(self),
self.port, baudrate=self.baudrate)
printerr("Serial Connexion started")
return self.port
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
framelen = len(self.inframe)
if not self.frame_started : # wait start of frame
if len(data) >= 3 - headerlen:
self.header = self.header + data[:3-headerlen]
self.frame_started = True
self.frame_size = int.from_bytes(self.header[1:], byteorder='little')
self.handle_rx(data[3-headerlen:])
else:
self.header = self.header + data
else:
if framelen + len(data) < self.frame_size:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
self.frame_started = False
self.header = b""
self.inframe = b""
if framelen + len(data) > self.frame_size:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
# printerr(f"sending pkt : {pkt}")
self.transport.write(pkt)

77
src/meshcore/tcp_cx.py Normal file
View file

@ -0,0 +1,77 @@
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
from meshcore import printerr
class TCPConnection:
def __init__(self, host, port):
self.host = host
self.port = port
self.transport = None
self.frame_started = False
self.frame_size = 0
self.header = b""
self.inframe = b""
class MCClientProtocol:
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
def data_received(self, data):
self.cx.handle_rx(data)
def error_received(self, exc):
printerr(f'Error received: {exc}')
def connection_lost(self, exc):
printerr('The server closed the connection')
async def connect(self):
"""
Connects to the device
"""
loop = asyncio.get_running_loop()
await loop.create_connection(
lambda: self.MCClientProtocol(self),
self.host, self.port)
printerr("TCP Connexion started")
return self.host
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
framelen = len(self.inframe)
if not self.frame_started : # wait start of frame
if len(data) >= 3 - headerlen:
self.header = self.header + data[:3-headerlen]
self.frame_started = True
self.frame_size = int.from_bytes(self.header[1:], byteorder='little')
self.handle_rx(data[3-headerlen:])
else:
self.header = self.header + data
else:
if framelen + len(data) < self.frame_size:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
self.frame_started = False
self.header = b""
self.inframe = b""
if framelen + len(data) > self.frame_size:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
self.transport.write(pkt)