serial interface added (beware -s is now for specifying com port)

This commit is contained in:
Florent de Lamotte 2025-03-04 10:45:22 +01:00
parent 8879eb253b
commit a1d7b2eb91

103
mccli.py
View file

@ -3,6 +3,7 @@
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import serial_asyncio
import os
import sys
import getopt
@ -34,6 +35,82 @@ def printerr (str) :
sys.stderr.write("\n")
sys.stderr.flush()
class SerialConnection:
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.frame_started = False
self.frame_size = 0
self.header = b""
self.inframe = b""
class MCSerialClientProtocol(asyncio.Protocol):
def __init__(self, cx):
self.cx = cx
def connection_made(self, transport):
self.cx.transport = transport
# printerr('port opened')
transport.serial.rts = False # You can manipulate Serial object via transport
def data_received(self, data):
# printerr('data received')
self.cx.handle_rx(data)
def connection_lost(self, exc):
printerr('port closed')
def pause_writing(self):
printerr('pause writing')
def resume_writing(self):
printerr('resume writing')
async def connect(self):
"""
Connects to the device
"""
loop = asyncio.get_running_loop()
await serial_asyncio.create_serial_connection(
loop, lambda: self.MCSerialClientProtocol(self),
self.port, baudrate=self.baudrate)
printerr("Serial Connexion started")
return self.port
def set_mc(self, mc) :
self.mc = mc
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
framelen = len(self.inframe)
if not self.frame_started : # wait start of frame
if len(data) >= 3 - headerlen:
self.header = self.header + data[:3-headerlen]
self.frame_started = True
self.frame_size = int.from_bytes(self.header[1:], byteorder='little')
self.handle_rx(data[3-headerlen:])
else:
self.header = self.header + data
else:
if framelen + len(data) < self.frame_size:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
self.frame_started = False
self.header = b""
self.inframe = b""
if framelen + len(data) > self.frame_size:
self.handle_rx(data[self.frame_size-framelen:])
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
# printerr(f"sending pkt : {pkt}")
self.transport.write(pkt)
class TCPConnection:
def __init__(self, host, port):
self.host = host
@ -74,7 +151,7 @@ class TCPConnection:
def handle_rx(self, data: bytearray):
cur_data = data
while (len(cur_data) > 0) :
if cur_data[0] != 62 :
if cur_data[0] != 0x3E :
printerr(f"Error with received frame trying anyway ... first byte is {cur_data[0]}")
frame_size = int.from_bytes(cur_data[1:2], byteorder='little')
@ -87,7 +164,7 @@ class TCPConnection:
async def send(self, data):
size = len(data)
pkt = b"<" + size.to_bytes(2, byteorder="little") + data
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
self.transport.write(pkt)
class BLEConnection:
@ -483,7 +560,7 @@ def usage () :
-d <name> : filter meshcore devices with name or address
-t <hostname> : connects via tcp/ip
-p <port> : specifies tcp port (default 5000)
-s : forces ble scan for a MeshCore device
-s <port> : use serial port <port>
Available Commands (can be chained) :
infos : print informations about the node
@ -515,21 +592,23 @@ async def main(argv):
address = ADDRESS
port = 5000
hostname = None
serial_port = None
baudrate = 115200
# If there is an address in config file, use it by default
# unless an arg is explicitely given
if os.path.exists(MCCLI_ADDRESS) :
with open(MCCLI_ADDRESS, encoding="utf-8") as f :
address = f.readline().strip()
opts, args = getopt.getopt(argv, "a:d:sht:p:")
opts, args = getopt.getopt(argv, "a:d:s:ht:p:")
for opt, arg in opts :
match opt:
case "-d" : # name specified on cmdline
address = arg
case "-a" : # address specified on cmdline
address = arg
case "-s" : # explicitely ask to scan address
address = None
case "-s" : # serial port
serial_port = arg
case "-t" :
hostname = arg
case "-p" :
@ -540,7 +619,14 @@ async def main(argv):
return
con = None
if hostname is None : # connect via ble
if not hostname is None : # connect via tcp
con = TCPConnection(hostname, port)
await con.connect()
elif not serial_port is None : # connect via serial port
con = SerialConnection(serial_port, baudrate)
await con.connect()
await asyncio.sleep(0.1)
else : #connect via ble
con = BLEConnection(address)
address = await con.connect()
if address is None or address == "" : # no device, no action
@ -551,9 +637,6 @@ async def main(argv):
if os.path.isdir(MCCLI_CONFIG_DIR) :
with open(MCCLI_ADDRESS, "w", encoding="utf-8") as f :
f.write(address)
else :
con = TCPConnection(hostname, port)
await con.connect()
mc = MeshCore(con)
await mc.connect()