Refactor to event system

This commit is contained in:
Alex Wolden 2025-04-08 22:56:16 -07:00
parent 5fd4db660d
commit 8f0ecd7d75
24 changed files with 1625 additions and 500 deletions

View file

@ -1,5 +1,10 @@
from meshcore.meshcore import printerr
from meshcore.meshcore import MeshCore
import logging
# Setup default logger
logging.basicConfig(level=logging.INFO)
from meshcore.events import EventType
from meshcore.meshcore import MeshCore, logger
from meshcore.tcp_cx import TCPConnection
from meshcore.ble_cx import BLEConnection
from meshcore.serial_cx import SerialConnection

View file

@ -3,8 +3,10 @@
"""
import asyncio
import sys
import logging
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
@ -40,11 +42,11 @@ class BLEConnection:
if self.address is None or self.address == "" or len(self.address.split(":")) != 6 :
scanner = BleakScanner()
printerr("Scanning for devices")
logger.info("Scanning for devices")
device = await scanner.find_device_by_filter(match_meshcore_device)
if device is None :
return None
printerr(f"Found device : {device}")
logger.info(f"Found device : {device}")
self.client = BleakClient(device)
self.address = self.client.address
else:
@ -62,22 +64,22 @@ class BLEConnection:
nus = self.client.services.get_service(UART_SERVICE_UUID)
self.rx_char = nus.get_characteristic(UART_RX_CHAR_UUID)
printerr("BLE Connexion started")
logger.info("BLE Connection started")
return self.address
def handle_disconnect(self, _: BleakClient):
""" Callback to handle disconnection """
printerr ("Device was disconnected, goodbye.")
logger.info("Device was disconnected, goodbye.")
# cancelling all tasks effectively ends the program
for task in asyncio.all_tasks():
task.cancel()
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray):
if not self.mc is None:
self.mc.handle_rx(data)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(data))
async def send(self, data):
await self.client.write_gatt_char(self.rx_char, bytes(data), response=False)

216
src/meshcore/commands.py Normal file
View file

@ -0,0 +1,216 @@
import functools
import asyncio
import logging
import warnings
import time
from typing import Any, Callable, Awaitable, Optional, Union
from .events import EventType
logger = logging.getLogger("meshcore")
class CommandError(Exception):
def __init__(self, details=None):
self.details = details
super().__init__(f"Command error: {details}")
def deprecated(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
warnings.warn(
f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.",
DeprecationWarning,
stacklevel=2
)
return await func(*args, **kwargs)
return wrapper
class CommandHandler:
def __init__(self):
self._sender_func = None
self._reader = None
self.dispatcher = None
def set_connection(self, connection):
async def sender(data):
await connection.send(data)
self._sender_func = sender
def set_reader(self, reader):
self._reader = reader
def set_dispatcher(self, dispatcher):
self.dispatcher = dispatcher
async def send(self, data, expected_events=None, timeout=5.0):
if not self.dispatcher:
raise RuntimeError("Dispatcher not set, cannot send commands")
if self._sender_func:
logger.debug(f"Sending raw data: {data.hex() if isinstance(data, bytes) else data}")
await self._sender_func(data)
if expected_events:
try:
# Convert single event to list if needed
if not isinstance(expected_events, list):
expected_events = [expected_events]
logger.debug(f"Waiting for events {expected_events}, timeout={timeout}")
for event_type in expected_events:
event = await self.dispatcher.wait_for_event(event_type, timeout)
if event:
return event.payload
return False
except asyncio.TimeoutError:
logger.debug(f"Command timed out {data}")
return False
except Exception as e:
logger.debug(f"Command error: {e}")
return {"error": str(e)}
return True
async def send_appstart(self):
logger.debug("Sending appstart command")
b1 = bytearray(b'\x01\x03 mccli')
return await self.send(b1, [EventType.SELF_INFO])
async def send_device_query(self):
logger.debug("Sending device query command")
return await self.send(b"\x16\x03", [EventType.DEVICE_INFO, EventType.ERROR])
async def send_advert(self, flood=False):
logger.debug(f"Sending advertisement command (flood={flood})")
if flood:
return await self.send(b"\x07\x01", [EventType.OK, EventType.ERROR])
else:
return await self.send(b"\x07", [EventType.OK, EventType.ERROR])
async def set_name(self, name):
logger.debug(f"Setting device name to: {name}")
return await self.send(b'\x08' + name.encode("ascii"), [EventType.OK, EventType.ERROR])
async def set_coords(self, lat, lon):
logger.debug(f"Setting coordinates to: lat={lat}, lon={lon}")
return await self.send(b'\x0e'\
+ int(lat*1e6).to_bytes(4, 'little', signed=True)\
+ int(lon*1e6).to_bytes(4, 'little', signed=True)\
+ int(0).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def reboot(self):
logger.debug("Sending reboot command")
return await self.send(b'\x13reboot')
async def get_bat(self):
logger.debug("Getting battery information")
return await self.send(b'\x14', [EventType.BATTERY, EventType.ERROR])
async def get_time(self):
logger.debug("Getting device time")
return await self.send(b"\x05", [EventType.CURRENT_TIME, EventType.ERROR])
async def set_time(self, val):
logger.debug(f"Setting device time to: {val}")
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_tx_power(self, val):
logger.debug(f"Setting TX power to: {val}")
return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def set_radio(self, freq, bw, sf, cr):
logger.debug(f"Setting radio params: freq={freq}, bw={bw}, sf={sf}, cr={cr}")
return await self.send(b"\x0b" \
+ int(float(freq)*1000).to_bytes(4, 'little')\
+ int(float(bw)*1000).to_bytes(4, 'little')\
+ int(sf).to_bytes(1, 'little')\
+ int(cr).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_tuning(self, rx_dly, af):
logger.debug(f"Setting tuning params: rx_dly={rx_dly}, af={af}")
return await self.send(b"\x15" \
+ int(rx_dly).to_bytes(4, 'little')\
+ int(af).to_bytes(4, 'little')\
+ int(0).to_bytes(1, 'little')\
+ int(0).to_bytes(1, 'little'), [EventType.OK, EventType.ERROR])
async def set_devicepin(self, pin):
logger.debug(f"Setting device PIN to: {pin}")
return await self.send(b"\x25" \
+ int(pin).to_bytes(4, 'little'), [EventType.OK, EventType.ERROR])
async def get_contacts(self):
logger.debug("Getting contacts")
return await self.send(b"\x04", [EventType.CONTACTS, EventType.ERROR])
async def reset_path(self, key):
logger.debug(f"Resetting path for contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0D" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def share_contact(self, key):
logger.debug(f"Sharing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x10" + key
return await self.send(data, [EventType.CONTACT_SHARE, EventType.ERROR])
async def export_contact(self, key=b""):
logger.debug(f"Exporting contact: {key.hex() if key else 'all'}")
data = b"\x11" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def remove_contact(self, key):
logger.debug(f"Removing contact: {key.hex() if isinstance(key, bytes) else key}")
data = b"\x0f" + key
return await self.send(data, [EventType.OK, EventType.ERROR])
async def get_msg(self):
logger.debug("Requesting pending messages")
return await self.send(b"\x0A", [EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV, EventType.ERROR], 1)
async def send_login(self, dst, pwd):
logger.debug(f"Sending login request to: {dst.hex() if isinstance(dst, bytes) else dst}")
data = b"\x1a" + dst + pwd.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_statusreq(self, dst):
logger.debug(f"Sending status request to: {dst.hex() if isinstance(dst, bytes) else dst}")
data = b"\x1b" + dst
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_cmd(self, dst, cmd, timestamp=None):
logger.debug(f"Sending command to {dst.hex() if isinstance(dst, bytes) else dst}: {cmd}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii")
return await self.send(data, [EventType.OK, EventType.ERROR])
async def send_msg(self, dst, msg, timestamp=None):
logger.debug(f"Sending message to {dst.hex() if isinstance(dst, bytes) else dst}: {msg}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_chan_msg(self, chan, msg, timestamp=None):
logger.debug(f"Sending channel message to channel {chan}: {msg}")
# Default to current time if timestamp not provided
if timestamp is None:
import time
timestamp = int(time.time()).to_bytes(4, 'little')
data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii")
return await self.send(data, [EventType.MSG_SENT, EventType.ERROR])
async def send_cli(self, cmd):
logger.debug(f"Sending CLI command: {cmd}")
data = b"\x32" + cmd.encode('ascii')
return await self.send(data, [EventType.CLI_RESPONSE, EventType.ERROR])

123
src/meshcore/events.py Normal file
View file

@ -0,0 +1,123 @@
from enum import Enum
import logging
from typing import Any, Dict, Optional, Callable, List, Union
import asyncio
from dataclasses import dataclass
logger = logging.getLogger("meshcore")
# Public event types for users to subscribe to
class EventType(Enum):
CONTACTS = "contacts"
SELF_INFO = "self_info"
CONTACT_MSG_RECV = "contact_message"
CHANNEL_MSG_RECV = "channel_message"
CURRENT_TIME = "time_update"
NO_MORE_MSGS = "no_more_messages"
CONTACT_SHARE = "contact_share"
BATTERY = "battery_info"
DEVICE_INFO = "device_info"
CLI_RESPONSE = "cli_response"
MSG_SENT = "message_sent"
# Push notifications
ADVERTISEMENT = "advertisement"
PATH_UPDATE = "path_update"
ACK = "acknowledgement"
MESSAGES_WAITING = "messages_waiting"
RAW_DATA = "raw_data"
LOGIN_SUCCESS = "login_success"
LOGIN_FAILED = "login_failed"
STATUS_RESPONSE = "status_response"
LOG_DATA = "log_data"
# Command response types
OK = "command_ok"
ERROR = "command_error"
@dataclass
class Event:
type: EventType
payload: Any
attributes: Dict[str, Any] = None
def __post_init__(self):
if self.attributes is None:
self.attributes = {}
class Subscription:
def __init__(self, dispatcher, event_type, callback):
self.dispatcher = dispatcher
self.event_type = event_type
self.callback = callback
def unsubscribe(self):
self.dispatcher._remove_subscription(self)
class EventDispatcher:
def __init__(self):
self.queue = asyncio.Queue()
self.subscriptions: List[Subscription] = []
self.running = False
self._task = None
def subscribe(self, event_type: Union[EventType, None], callback: Callable[[Event], None]) -> Subscription:
subscription = Subscription(self, event_type, callback)
self.subscriptions.append(subscription)
return subscription
def _remove_subscription(self, subscription: Subscription):
if subscription in self.subscriptions:
self.subscriptions.remove(subscription)
async def dispatch(self, event: Event):
await self.queue.put(event)
async def _process_events(self):
while self.running:
event = await self.queue.get()
logger.debug(f"Dispatching event: {event.type}, {event.payload}")
for subscription in self.subscriptions.copy():
if subscription.event_type is None or subscription.event_type == event.type:
try:
await subscription.callback(event)
except Exception as e:
print(f"Error in event handler: {e}")
self.queue.task_done()
async def start(self):
if not self.running:
self.running = True
self._task = asyncio.create_task(self._process_events())
async def stop(self):
if self.running:
self.running = False
if self._task:
await self.queue.join()
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def wait_for_event(self, event_type: EventType, timeout: float = None) -> Optional[Event]:
future = asyncio.Future()
async def event_handler(event: Event):
if not future.done():
future.set_result(event)
subscription = self.subscribe(event_type, event_handler)
try:
return await asyncio.wait_for(future, timeout)
except asyncio.TimeoutError:
return None
finally:
subscription.unsubscribe()

View file

@ -1,414 +1,288 @@
"""
mccli.py : CLI interface to MeschCore BLE companion app
"""
import asyncio
import sys
import functools
import warnings
import logging
from typing import Optional
def printerr (s) :
sys.stderr.write(str(s))
sys.stderr.write("\n")
sys.stderr.flush()
from .events import EventDispatcher, EventType
from .reader import MessageReader
from .commands import CommandHandler
# Setup default logger
logger = logging.getLogger("meshcore")
def deprecated(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
warnings.warn(
f"Method {func.__name__} is deprecated. Use commands.{func.__name__} instead.",
DeprecationWarning,
stacklevel=2
)
return await func(*args, **kwargs)
return wrapper
class MeshCore:
"""
Interface to a BLE MeshCore device
Interface to a MeshCore device
"""
self_info={}
contacts={}
def __init__(self, cx):
""" Constructor : specify address """
self.time = 0
self.result = asyncio.Future()
self.contact_nb = 0
self.rx_sem = asyncio.Semaphore(0)
self.ack_ev = asyncio.Event()
self.login_resp = asyncio.Future()
self.status_resp = asyncio.Future()
def __init__(self, cx, debug=False):
self.cx = cx
cx.set_mc(self)
async def connect(self) :
await self.send_appstart()
def handle_rx(self, data: bytearray):
""" Callback to handle received data """
match data[0]:
case 0: # ok
if len(data) == 5 : # an integer
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
else:
self.result.set_result(True)
case 1: # error
if len(data) > 1:
res = {}
res["error_code"] = data[1]
self.result.set_result(res) # error code if fw > 1.4
else:
self.result.set_result(False)
case 2: # contact start
self.contact_nb = int.from_bytes(data[1:5], byteorder='little')
self.contacts={}
case 3: # contact
c = {}
c["public_key"] = data[1:33].hex()
c["type"] = data[33]
c["flags"] = data[34]
c["out_path_len"] = int.from_bytes(data[35:36], signed=True)
plen = int.from_bytes(data[35:36], signed=True)
if plen == -1 :
plen = 0
c["out_path"] = data[36:36+plen].hex()
c["adv_name"] = data[100:132].decode().replace("\0","")
c["last_advert"] = int.from_bytes(data[132:136], byteorder='little')
c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6
c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6
c["lastmod"] = int.from_bytes(data[144:148], byteorder='little')
self.contacts[c["adv_name"]]=c
case 4: # end of contacts
self.result.set_result(self.contacts)
case 5: # self info
self.self_info["adv_type"] = data[1]
self.self_info["tx_power"] = data[2]
self.self_info["max_tx_power"] = data[3]
self.self_info["public_key"] = data[4:36].hex()
self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6
self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6
#self.self_info["reserved_44:48"] = data[44:48].hex()
self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000
self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000
self.self_info["radio_sf"] = data[56]
self.self_info["radio_cr"] = data[57]
self.self_info["name"] = data[58:].decode()
self.result.set_result(True)
case 6: # msg sent
res = {}
res["type"] = data[1]
res["expected_ack"] = bytes(data[2:6])
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little')
self.result.set_result(res)
case 7: # contact msg recv
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex()
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
if data[8] == 2 : # signed packet
res["signature"] = data[13:17].hex()
res["text"] = data[17:].decode()
else :
res["text"] = data[13:].decode()
self.result.set_result(res)
case 16: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "PRIV"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4;
res["pubkey_prefix"] = data[4:10].hex()
res["path_len"] = data[10]
res["txt_type"] = data[11]
res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little')
if data[11] == 2 : # signed packet
res["signature"] = data[16:20].hex()
res["text"] = data[20:].decode()
else :
res["text"] = data[16:].decode()
self.result.set_result(res)
case 8 : # chanel msg recv
res = {}
res["type"] = "CHAN"
res["channel_idx"] = data[1]
res["path_len"] = data[2]
res["txt_type"] = data[3]
res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little')
res["text"] = data[8:].decode()
self.result.set_result(res)
case 17: # a reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "CHAN"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4;
res["channel_idx"] = data[4]
res["path_len"] = data[5]
res["txt_type"] = data[6]
res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little')
res["text"] = data[11:].decode()
self.result.set_result(res)
case 9: # current time
self.result.set_result(int.from_bytes(data[1:5], byteorder='little'))
case 10: # no more msgs
self.result.set_result(False)
case 11: # contact
self.result.set_result("meshcore://" + data[1:].hex())
case 12: # battery voltage
self.result.set_result(int.from_bytes(data[1:3], byteorder='little'))
case 13: # device info
res = {}
res["fw ver"] = data[1]
if data[1] >= 3:
res["max_contacts"] = data[2] * 2
res["max_channels"] = data[3]
res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little')
res["fw_build"] = data[8:20].decode().replace("\0","")
res["model"] = data[20:60].decode().replace("\0","")
res["ver"] = data[60:80].decode().replace("\0","")
self.result.set_result(res)
case 50: # cli response
res = {}
res["response"] = data[1:].decode()
self.result.set_result(res)
# push notifications
case 0x80:
printerr ("Advertisment received")
case 0x81:
printerr ("Code path update")
case 0x82:
self.ack_ev.set()
printerr ("Received ACK")
case 0x83:
self.rx_sem.release()
printerr ("Msgs are waiting")
case 0x84:
printerr ("Received raw data")
res = {}
res["SNR"] = data[1] / 4
res["RSSI"] = data[2]
res["payload"] = data[4:].hex()
print(res)
case 0x85:
self.login_resp.set_result(True)
printerr ("Login success")
case 0x86:
self.login_resp.set_result(False)
printerr ("Login failed")
case 0x87:
res = {}
res["pubkey_pre"] = data[2:8].hex()
res["bat"] = int.from_bytes(data[8:10], byteorder='little')
res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little')
res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little')
res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True)
res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False)
res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False)
res["airtime"] = int.from_bytes(data[24:28], byteorder='little')
res["uptime"] = int.from_bytes(data[28:32], byteorder='little')
res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little')
res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little')
res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little')
res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little')
res["full_evts"] = int.from_bytes(data[48:50], byteorder='little')
res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4
res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little')
res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little')
self.status_resp.set_result(res)
data_hex = data[8:].hex()
printerr (f"Status response: {data_hex}")
#printerr(res)
case 0x88:
printerr ("Received log data")
# unhandled
case _:
printerr (f"Unhandled data received {data}")
async def send(self, data, timeout = 5):
""" Helper function to synchronously send (and receive) data to the node """
self.result = asyncio.Future()
try:
await self.cx.send(data)
res = await asyncio.wait_for(self.result, timeout)
return res
except TimeoutError :
printerr ("Timeout while sending message ...")
return False
async def send_only(self, data): # don't wait reply
await self.cx.send(data)
async def send_appstart(self):
""" Send APPSTART to the node """
b1 = bytearray(b'\x01\x03 mccli')
return await self.send(b1)
async def send_device_qeury(self):
return await self.send(b"\x16\x03");
async def send_advert(self, flood=False):
""" Make the node send an advertisement """
if flood :
return await self.send(b"\x07\x01")
else :
return await self.send(b"\x07")
async def set_name(self, name):
""" Changes the name of the node """
return await self.send(b'\x08' + name.encode("ascii"))
async def set_coords(self, lat, lon):
return await self.send(b'\x0e'\
+ int(lat*1e6).to_bytes(4, 'little', signed=True)\
+ int(lon*1e6).to_bytes(4, 'little', signed=True)\
+ int(0).to_bytes(4, 'little'))
async def reboot(self):
await self.send_only(b'\x13reboot')
return True
async def get_bat(self):
return await self.send(b'\x14')
async def get_time(self):
""" Get the time (epoch) of the node """
self.time = await self.send(b"\x05")
return self.time
async def set_time(self, val):
""" Sets a new epoch """
return await self.send(b"\x06" + int(val).to_bytes(4, 'little'))
async def set_tx_power(self, val):
""" Sets tx power """
return await self.send(b"\x0c" + int(val).to_bytes(4, 'little'))
async def set_radio (self, freq, bw, sf, cr):
""" Sets radio params """
return await self.send(b"\x0b" \
+ int(float(freq)*1000).to_bytes(4, 'little')\
+ int(float(bw)*1000).to_bytes(4, 'little')\
+ int(sf).to_bytes(1, 'little')\
+ int(cr).to_bytes(1, 'little'))
async def set_tuning (self, rx_dly, af):
""" Sets radio params """
return await self.send(b"\x15" \
+ int(rx_dly).to_bytes(4, 'little')\
+ int(af).to_bytes(4, 'little')\
+ int(0).to_bytes(1, 'little')\
+ int(0).to_bytes(1, 'little'))
async def set_devicepin (self, pin):
return await self.send(b"\x25" \
+ int(pin).to_bytes(4, 'little'))
async def get_contacts(self):
""" Starts retreiving contacts """
return await self.send(b"\x04")
self.dispatcher = EventDispatcher()
self._reader = MessageReader(self.dispatcher)
self.commands = CommandHandler()
# Set up logger
if debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
# Set up connections
self.commands.set_connection(cx)
# Set the dispatcher in the command handler
self.commands.set_dispatcher(self.dispatcher)
self.commands.set_reader(self._reader)
# Initialize state (private)
self._contacts = {}
self._self_info = {}
self._time = 0
# Set up event subscriptions to track data
self._setup_data_tracking()
cx.set_reader(self._reader)
@classmethod
async def create_tcp(cls, host: str, port: int, debug: bool = False) -> 'MeshCore':
"""Create and connect a MeshCore instance using TCP connection"""
from .tcp_cx import TCPConnection
connection = TCPConnection(host, port)
await connection.connect()
mc = cls(connection, debug=debug)
await mc.connect()
return mc
@classmethod
async def create_serial(cls, port: str, baudrate: int = 115200, debug: bool = False) -> 'MeshCore':
"""Create and connect a MeshCore instance using serial connection"""
from .serial_cx import SerialConnection
import asyncio
connection = SerialConnection(port, baudrate)
await connection.connect()
await asyncio.sleep(0.1) # Time for transport to establish
mc = cls(connection, debug=debug)
await mc.connect()
return mc
@classmethod
async def create_ble(cls, address: Optional[str] = None, debug: bool = False) -> 'MeshCore':
"""Create and connect a MeshCore instance using BLE connection
If address is None, it will scan for and connect to the first available MeshCore device.
"""
from .ble_cx import BLEConnection
connection = BLEConnection(address)
result = await connection.connect()
if result is None:
raise ConnectionError("Failed to connect to BLE device")
mc = cls(connection, debug=debug)
await mc.connect()
return mc
async def connect(self):
await self.dispatcher.start()
return await self.commands.send_appstart()
async def disconnect(self):
await self.dispatcher.stop()
def stop(self):
"""Synchronously stop the event dispatcher task"""
if self.dispatcher._task and not self.dispatcher._task.done():
self.dispatcher.running = False
self.dispatcher._task.cancel()
def subscribe(self, event_type: EventType, callback):
"""
Subscribe to events using EventType enum
Args:
event_type: Type of event to subscribe to, from EventType enum
callback: Async function to call when event occurs
Returns:
Subscription object that can be used to unsubscribe
"""
return self.dispatcher.subscribe(event_type, callback)
def unsubscribe(self, subscription):
"""
Unsubscribe from events using a subscription object
Args:
subscription: Subscription object returned from subscribe()
"""
if subscription:
subscription.unsubscribe()
async def wait_for_event(self, event_type: EventType, timeout=None):
"""
Wait for an event using EventType enum
Args:
event_type: Type of event to wait for, from EventType enum
timeout: Maximum time to wait in seconds, or None for no timeout
Returns:
Event object or None if timeout
"""
return await self.dispatcher.wait_for_event(event_type, timeout)
def _setup_data_tracking(self):
"""Set up event subscriptions to track data internally"""
async def _update_contacts(event):
self._contacts = event.payload
async def _update_self_info(event):
self._self_info = event.payload
async def _update_time(event):
self._time = event.payload
# Subscribe to events to update internal state
self.subscribe(EventType.CONTACTS, _update_contacts)
self.subscribe(EventType.SELF_INFO, _update_self_info)
self.subscribe(EventType.CURRENT_TIME, _update_time)
# Getter methods for state
@property
def contacts(self):
"""Get the current contacts"""
return self._contacts
@property
def self_info(self):
"""Get device self info"""
return self._self_info
@property
def time(self):
"""Get the current device time"""
return self._time
def get_contact_by_name(self, name):
"""
Find a contact by its name (adv_name field)
Args:
name: The name to search for
Returns:
Contact dictionary or None if not found
"""
if not self._contacts:
return None
for contact_id, contact in self._contacts.items():
if contact.get("adv_name", "").lower() == name.lower():
return contact
return None
def get_contact_by_key_prefix(self, prefix):
"""
Find a contact by its public key prefix
Args:
prefix: The public key prefix to search for (can be a partial prefix)
Returns:
Contact dictionary or None if not found
"""
if not self._contacts or not prefix:
return None
# Convert the prefix to lowercase for case-insensitive matching
prefix = prefix.lower()
for contact_id, contact in self._contacts.items():
public_key = contact.get("public_key", "").lower()
if public_key.startswith(prefix):
return contact
return None
async def start_auto_message_fetching(self):
"""
Start automatically fetching messages when messages_waiting events are received.
This will continuously check for new messages when the device indicates
messages are waiting.
"""
self._auto_fetch_task = None
self._auto_fetch_running = True
async def _handle_messages_waiting(event):
# Only start a new fetch task if one isn't already running
if not self._auto_fetch_task or self._auto_fetch_task.done():
self._auto_fetch_task = asyncio.create_task(_fetch_messages_loop())
async def _fetch_messages_loop():
while self._auto_fetch_running:
try:
# Request the next message
result = await self.commands.get_msg()
# If we got a NO_MORE_MSGS event or an error, stop fetching
if not result or isinstance(result, dict) and "error" in result:
break
# Small delay to prevent overwhelming the device
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"Error fetching messages: {e}")
break
# Subscribe to MESSAGES_WAITING events
self._auto_fetch_subscription = self.subscribe(EventType.MESSAGES_WAITING, _handle_messages_waiting)
# Check for any pending messages immediately
await self.commands.get_msg()
return self._auto_fetch_subscription
async def stop_auto_message_fetching(self):
"""
Stop automatically fetching messages when messages_waiting events are received.
"""
if hasattr(self, '_auto_fetch_subscription') and self._auto_fetch_subscription:
self.unsubscribe(self._auto_fetch_subscription)
self._auto_fetch_subscription = None
if hasattr(self, '_auto_fetch_running'):
self._auto_fetch_running = False
if hasattr(self, '_auto_fetch_task') and self._auto_fetch_task and not self._auto_fetch_task.done():
self._auto_fetch_task.cancel()
try:
await self._auto_fetch_task
except asyncio.CancelledError:
pass
self._auto_fetch_task = None
async def ensure_contacts(self):
if len(self.contacts) == 0 :
await self.get_contacts()
async def reset_path(self, key):
data = b"\x0D" + key
return await self.send(data)
async def share_contact(self, key):
data = b"\x10" + key
return await self.send(data)
async def export_contact(self, key=b""):
data = b"\x11" + key
return await self.send(data)
async def remove_contact(self, key):
data = b"\x0f" + key
return await self.send(data)
async def set_out_path(self, contact, path):
contact["out_path"] = path
contact["out_path_len"] = -1
contact["out_path_len"] = int(len(path) / 2)
async def update_contact(self, contact):
out_path_hex = contact["out_path"]
out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0"
adv_name_hex = contact["adv_name"].encode().hex()
adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0"
data = b"\x09" \
+ bytes.fromhex(contact["public_key"])\
+ contact["type"].to_bytes(1)\
+ contact["flags"].to_bytes(1)\
+ contact["out_path_len"].to_bytes(1, 'little', signed=True)\
+ bytes.fromhex(out_path_hex)\
+ bytes.fromhex(adv_name_hex)\
+ contact["last_advert"].to_bytes(4, 'little')\
+ int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\
+ int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True)
return await self.send(data)
async def send_login(self, dst, pwd):
self.login_resp = asyncio.Future()
data = b"\x1a" + dst + pwd.encode("ascii")
return await self.send(data)
async def wait_login(self, timeout = 5):
try :
return await asyncio.wait_for(self.login_resp, timeout)
except TimeoutError :
printerr ("Timeout ...")
return False
async def send_statusreq(self, dst):
self.status_resp = asyncio.Future()
data = b"\x1b" + dst
return await self.send(data)
async def wait_status(self, timeout = 5):
try :
return await asyncio.wait_for(self.status_resp, timeout)
except TimeoutError :
printerr ("Timeout...")
return False
async def send_cmd(self, dst, cmd):
""" Send a cmd to a node """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x01\x00" + timestamp + dst + cmd.encode("ascii")
#self.ack_ev.clear() # no ack ?
return await self.send(data)
async def send_msg(self, dst, msg):
""" Send a message to a node """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii")
self.ack_ev.clear()
return await self.send(data)
async def send_chan_msg(self, chan, msg):
""" Send a message to a public channel """
timestamp = (await self.get_time()).to_bytes(4, 'little')
data = b"\x03\x00" + chan.to_bytes(1, 'little') + timestamp + msg.encode("ascii")
return await self.send(data)
async def get_msg(self):
""" Get message from the node (stored in queue) """
res = await self.send(b"\x0A", 1)
if res is False :
self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue
return res
async def wait_msg(self, timeout=-1):
""" Wait for a message """
if timeout == -1 :
await self.rx_sem.acquire()
"""Ensure contacts are fetched"""
if not self._contacts:
await self.commands.get_contacts()
return True
try:
await asyncio.wait_for(self.rx_sem.acquire(), timeout)
return True
except TimeoutError :
printerr("Timeout waiting msg")
return False
async def wait_ack(self, timeout=6):
""" Wait ack """
try:
await asyncio.wait_for(self.ack_ev.wait(), timeout)
return True
except TimeoutError :
printerr("Timeout waiting ack")
return False
async def send_cli(self, cmd):
data = b"\x32" + cmd.encode('ascii')
return await self.send(data)
return False

View file

@ -0,0 +1,249 @@
import asyncio
from typing import Dict, Any, Optional, Callable
from .events import EventDispatcher, MessageType, Event
from .reader import MessageReader
from .commands import CommandHandler, deprecated
class MeshCore:
def __init__(self, cx):
self.cx = cx
self.dispatcher = EventDispatcher()
self._reader = MessageReader(self.dispatcher)
self.commands = CommandHandler()
# Set up connections
self.commands.set_connection(cx)
# Initialize state
self.contacts = {}
self.self_info = {}
self.time = 0
# Set the message handler in the connection
cx.set_mc(self)
async def connect(self):
# Start the event dispatcher
await self.dispatcher.start()
# Start the command handler
await self.commands.start()
# Send the initial app start
return await self.commands.send_appstart()
async def disconnect(self):
# Stop the event dispatcher
await self.dispatcher.stop()
# Stop the command handler
await self.commands.stop()
# Internal method - called by the connection
def handle_rx(self, data: bytearray):
asyncio.create_task(self._reader.handle_rx(data))
# Expose subscribe/wait capabilities from the event system
def subscribe(self, message_type, callback):
return self.dispatcher.subscribe(message_type, callback)
async def wait_for_event(self, message_type, timeout=None):
return await self.dispatcher.wait_for_event(message_type, timeout)
# Legacy method implementations that delegate to the command handler
# using the deprecated decorator from commands.py
@deprecated
async def send(self, data, timeout=5):
return await self.commands.send(data, timeout)
@deprecated
async def send_only(self, data):
await self.commands.send_only(data)
@deprecated
async def send_appstart(self):
return await self.commands.send_appstart()
@deprecated
async def send_device_query(self):
return await self.commands.send_device_query()
@deprecated
async def send_advert(self, flood=False):
return await self.commands.send_advert(flood)
@deprecated
async def set_name(self, name):
return await self.commands.set_name(name)
@deprecated
async def set_coords(self, lat, lon):
return await self.commands.set_coords(lat, lon)
@deprecated
async def reboot(self):
return await self.commands.reboot()
@deprecated
async def get_bat(self):
return await self.commands.get_bat()
@deprecated
async def get_time(self):
time_result = await self.commands.get_time()
if isinstance(time_result, int):
self.time = time_result
return self.time
@deprecated
async def set_time(self, val):
return await self.commands.set_time(val)
@deprecated
async def set_tx_power(self, val):
return await self.commands.set_tx_power(val)
@deprecated
async def set_radio(self, freq, bw, sf, cr):
return await self.commands.set_radio(freq, bw, sf, cr)
@deprecated
async def set_tuning(self, rx_dly, af):
return await self.commands.set_tuning(rx_dly, af)
@deprecated
async def set_devicepin(self, pin):
return await self.commands.set_devicepin(pin)
@deprecated
async def get_contacts(self):
await self.commands.get_contacts()
contact_end = await self.dispatcher.wait_for_event(MessageType.CONTACT_END)
if contact_end:
self.contacts = contact_end.payload
return self.contacts
@deprecated
async def ensure_contacts(self):
if not self.contacts:
await self.get_contacts()
@deprecated
async def reset_path(self, key):
return await self.commands.reset_path(key)
@deprecated
async def share_contact(self, key):
return await self.commands.share_contact(key)
@deprecated
async def export_contact(self, key=b""):
return await self.commands.export_contact(key)
@deprecated
async def remove_contact(self, key):
return await self.commands.remove_contact(key)
@deprecated
async def set_out_path(self, contact, path):
contact["out_path"] = path
contact["out_path_len"] = -1
contact["out_path_len"] = int(len(path) / 2)
@deprecated
async def update_contact(self, contact):
out_path_hex = contact["out_path"]
out_path_hex = out_path_hex + (128-len(out_path_hex)) * "0"
adv_name_hex = contact["adv_name"].encode().hex()
adv_name_hex = adv_name_hex + (64-len(adv_name_hex)) * "0"
data = b"\x09" \
+ bytes.fromhex(contact["public_key"])\
+ contact["type"].to_bytes(1)\
+ contact["flags"].to_bytes(1)\
+ contact["out_path_len"].to_bytes(1, 'little', signed=True)\
+ bytes.fromhex(out_path_hex)\
+ bytes.fromhex(adv_name_hex)\
+ contact["last_advert"].to_bytes(4, 'little')\
+ int(contact["adv_lat"]*1e6).to_bytes(4, 'little', signed=True)\
+ int(contact["adv_lon"]*1e6).to_bytes(4, 'little', signed=True)
return await self.send(data)
@deprecated
async def send_login(self, dst, pwd):
await self.commands.send_login(dst, pwd)
login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, 0.1)
if login_event:
return True
return await self.commands.send_login(dst, pwd)
@deprecated
async def wait_login(self, timeout=5):
login_event = await self.dispatcher.wait_for_event(MessageType.LOGIN_SUCCESS, timeout)
if login_event:
return True
login_failed = await self.dispatcher.wait_for_event(MessageType.LOGIN_FAILED, 0)
if login_failed:
return False
return False
@deprecated
async def send_statusreq(self, dst):
await self.commands.send_statusreq(dst)
@deprecated
async def wait_status(self, timeout=5):
status_event = await self.dispatcher.wait_for_event(MessageType.STATUS_RESPONSE, timeout)
if status_event:
return status_event.payload
return False
@deprecated
async def send_cmd(self, dst, cmd):
timestamp = await self.get_time()
return await self.commands.send_cmd(dst, cmd, timestamp.to_bytes(4, 'little'))
@deprecated
async def send_msg(self, dst, msg):
timestamp = await self.get_time()
result = await self.commands.send_msg(dst, msg, timestamp.to_bytes(4, 'little'))
return result
@deprecated
async def send_chan_msg(self, chan, msg):
timestamp = await self.get_time()
return await self.commands.send_chan_msg(chan, msg, timestamp.to_bytes(4, 'little'))
@deprecated
async def get_msg(self):
await self.commands.get_msg()
# Wait for any message type that could be received
message_types = [
MessageType.CONTACT_MSG_RECV,
MessageType.CHANNEL_MSG_RECV,
MessageType.NO_MORE_MSGS
]
for msg_type in message_types:
event = await self.dispatcher.wait_for_event(msg_type, 0)
if event:
return event.payload
return False
@deprecated
async def wait_msg(self, timeout=-1):
msg_event = await self.dispatcher.wait_for_event(MessageType.MESSAGES_WAITING, timeout)
return msg_event is not None
@deprecated
async def wait_ack(self, timeout=6):
ack_event = await self.dispatcher.wait_for_event(MessageType.ACK, timeout)
return ack_event is not None
@deprecated
async def send_cli(self, cmd):
return await self.commands.send_cli(cmd)

30
src/meshcore/packets.py Normal file
View file

@ -0,0 +1,30 @@
from enum import Enum
# Packet prefixes for the protocol
class PacketType(Enum):
OK = 0
ERROR = 1
CONTACT_START = 2
CONTACT = 3
CONTACT_END = 4
SELF_INFO = 5
MSG_SENT = 6
CONTACT_MSG_RECV = 7
CHANNEL_MSG_RECV = 8
CURRENT_TIME = 9
NO_MORE_MSGS = 10
CONTACT_SHARE = 11
BATTERY = 12
DEVICE_INFO = 13
CLI_RESPONSE = 50
# Push notifications
ADVERTISEMENT = 0x80
PATH_UPDATE = 0x81
ACK = 0x82
MESSAGES_WAITING = 0x83
RAW_DATA = 0x84
LOGIN_SUCCESS = 0x85
LOGIN_FAILED = 0x86
STATUS_RESPONSE = 0x87
LOG_DATA = 0x88

235
src/meshcore/reader.py Normal file
View file

@ -0,0 +1,235 @@
import sys
import logging
import asyncio
from typing import Any, Optional, Dict
from .events import Event, EventType, EventDispatcher
from .packets import PacketType
logger = logging.getLogger("meshcore")
class MessageReader:
def __init__(self, dispatcher: EventDispatcher):
self.dispatcher = dispatcher
# We're only keeping state here that's needed for processing
# before events are dispatched
self.contacts = {} # Temporary storage during contact list building
self.contact_nb = 0 # Used for contact processing
async def handle_rx(self, data: bytearray):
packet_type_value = data[0]
logger.debug(f"Received data: {data.hex()}")
# Handle command responses
if packet_type_value == PacketType.OK.value:
result = None
if len(data) == 5:
result = int.from_bytes(data[1:5], byteorder='little')
else:
result = True
# Dispatch event for the OK response
await self.dispatcher.dispatch(Event(EventType.OK, result))
elif packet_type_value == PacketType.ERROR.value:
result = False
if len(data) > 1:
result = {"error_code": data[1]}
# Dispatch event for the ERROR response
await self.dispatcher.dispatch(Event(EventType.ERROR, result))
elif packet_type_value == PacketType.CONTACT_START.value:
self.contact_nb = int.from_bytes(data[1:5], byteorder='little')
self.contacts = {}
elif packet_type_value == PacketType.CONTACT.value:
c = {}
c["public_key"] = data[1:33].hex()
c["type"] = data[33]
c["flags"] = data[34]
c["out_path_len"] = int.from_bytes(data[35:36], signed=True)
plen = int.from_bytes(data[35:36], signed=True)
if plen == -1:
plen = 0
c["out_path"] = data[36:36+plen].hex()
c["adv_name"] = data[100:132].decode().replace("\0","")
c["last_advert"] = int.from_bytes(data[132:136], byteorder='little')
c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True)/1e6
c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True)/1e6
c["lastmod"] = int.from_bytes(data[144:148], byteorder='little')
self.contacts[c["public_key"]] = c
elif packet_type_value == PacketType.CONTACT_END.value:
await self.dispatcher.dispatch(Event(EventType.CONTACTS, self.contacts))
elif packet_type_value == PacketType.SELF_INFO.value:
self_info = {}
self_info["adv_type"] = data[1]
self_info["tx_power"] = data[2]
self_info["max_tx_power"] = data[3]
self_info["public_key"] = data[4:36].hex()
self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6
self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6
self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') / 1000
self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') / 1000
self_info["radio_sf"] = data[56]
self_info["radio_cr"] = data[57]
self_info["name"] = data[58:].decode()
await self.dispatcher.dispatch(Event(EventType.SELF_INFO, self_info))
elif packet_type_value == PacketType.MSG_SENT.value:
res = {}
res["type"] = data[1]
res["expected_ack"] = bytes(data[2:6])
res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little')
await self.dispatcher.dispatch(Event(EventType.MSG_SENT, res))
elif packet_type_value == PacketType.CONTACT_MSG_RECV.value:
res = {}
res["type"] = "PRIV"
res["pubkey_prefix"] = data[1:7].hex()
res["path_len"] = data[7]
res["txt_type"] = data[8]
res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little')
if data[8] == 2:
res["signature"] = data[13:17].hex()
res["text"] = data[17:].decode()
else:
res["text"] = data[13:].decode()
await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res))
elif packet_type_value == 16: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "PRIV"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4
res["pubkey_prefix"] = data[4:10].hex()
res["path_len"] = data[10]
res["txt_type"] = data[11]
res["sender_timestamp"] = int.from_bytes(data[12:16], byteorder='little')
if data[11] == 2:
res["signature"] = data[16:20].hex()
res["text"] = data[20:].decode()
else:
res["text"] = data[16:].decode()
await self.dispatcher.dispatch(Event(EventType.CONTACT_MSG_RECV, res, {"extended": True}))
elif packet_type_value == PacketType.CHANNEL_MSG_RECV.value:
res = {}
res["type"] = "CHAN"
res["channel_idx"] = data[1]
res["path_len"] = data[2]
res["txt_type"] = data[3]
res["sender_timestamp"] = int.from_bytes(data[4:8], byteorder='little')
res["text"] = data[8:].decode()
await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res))
elif packet_type_value == 17: # A reply to CMD_SYNC_NEXT_MESSAGE (ver >= 3)
res = {}
res["type"] = "CHAN"
res["SNR"] = int.from_bytes(data[1:2], byteorder='little', signed=True) * 4
res["channel_idx"] = data[4]
res["path_len"] = data[5]
res["txt_type"] = data[6]
res["sender_timestamp"] = int.from_bytes(data[7:11], byteorder='little')
res["text"] = data[11:].decode()
await self.dispatcher.dispatch(Event(EventType.CHANNEL_MSG_RECV, res, {"extended": True}))
elif packet_type_value == PacketType.CURRENT_TIME.value:
result = int.from_bytes(data[1:5], byteorder='little')
await self.dispatcher.dispatch(Event(EventType.CURRENT_TIME, result))
elif packet_type_value == PacketType.NO_MORE_MSGS.value:
await self.dispatcher.dispatch(Event(EventType.NO_MORE_MSGS, False))
elif packet_type_value == PacketType.CONTACT_SHARE.value:
result = "meshcore://" + data[1:].hex()
await self.dispatcher.dispatch(Event(EventType.CONTACT_SHARE, result))
elif packet_type_value == PacketType.BATTERY.value:
result = int.from_bytes(data[1:3], byteorder='little')
await self.dispatcher.dispatch(Event(EventType.BATTERY, result))
elif packet_type_value == PacketType.DEVICE_INFO.value:
res = {}
res["fw ver"] = data[1]
if data[1] >= 3:
res["max_contacts"] = data[2] * 2
res["max_channels"] = data[3]
res["ble_pin"] = int.from_bytes(data[4:8], byteorder='little')
res["fw_build"] = data[8:20].decode().replace("\0","")
res["model"] = data[20:60].decode().replace("\0","")
res["ver"] = data[60:80].decode().replace("\0","")
await self.dispatcher.dispatch(Event(EventType.DEVICE_INFO, res))
elif packet_type_value == PacketType.CLI_RESPONSE.value:
res = {}
res["response"] = data[1:].decode()
await self.dispatcher.dispatch(Event(EventType.CLI_RESPONSE, res))
# Push notifications
elif packet_type_value == PacketType.ADVERTISEMENT.value:
logger.debug("Advertisement received")
# todo: Read advertisement?
await self.dispatcher.dispatch(Event(EventType.ADVERTISEMENT, None))
elif packet_type_value == PacketType.PATH_UPDATE.value:
logger.debug("Code path update")
await self.dispatcher.dispatch(Event(EventType.PATH_UPDATE, None))
elif packet_type_value == PacketType.ACK.value:
logger.debug("Received ACK")
await self.dispatcher.dispatch(Event(EventType.ACK, None))
elif packet_type_value == PacketType.MESSAGES_WAITING.value:
logger.debug("Msgs are waiting")
await self.dispatcher.dispatch(Event(EventType.MESSAGES_WAITING, None))
elif packet_type_value == PacketType.RAW_DATA.value:
res = {}
res["SNR"] = data[1] / 4
res["RSSI"] = data[2]
res["payload"] = data[4:].hex()
logger.debug("Received raw data")
print(res)
await self.dispatcher.dispatch(Event(EventType.RAW_DATA, res))
elif packet_type_value == PacketType.LOGIN_SUCCESS.value:
logger.debug("Login success")
await self.dispatcher.dispatch(Event(EventType.LOGIN_SUCCESS, None))
elif packet_type_value == PacketType.LOGIN_FAILED.value:
logger.debug("Login failed")
await self.dispatcher.dispatch(Event(EventType.LOGIN_FAILED, None))
elif packet_type_value == PacketType.STATUS_RESPONSE.value:
res = {}
res["pubkey_pre"] = data[2:8].hex()
res["bat"] = int.from_bytes(data[8:10], byteorder='little')
res["tx_queue_len"] = int.from_bytes(data[10:12], byteorder='little')
res["free_queue_len"] = int.from_bytes(data[12:14], byteorder='little')
res["last_rssi"] = int.from_bytes(data[14:16], byteorder='little', signed=True)
res["nb_recv"] = int.from_bytes(data[16:20], byteorder='little', signed=False)
res["nb_sent"] = int.from_bytes(data[20:24], byteorder='little', signed=False)
res["airtime"] = int.from_bytes(data[24:28], byteorder='little')
res["uptime"] = int.from_bytes(data[28:32], byteorder='little')
res["sent_flood"] = int.from_bytes(data[32:36], byteorder='little')
res["sent_direct"] = int.from_bytes(data[36:40], byteorder='little')
res["recv_flood"] = int.from_bytes(data[40:44], byteorder='little')
res["recv_direct"] = int.from_bytes(data[44:48], byteorder='little')
res["full_evts"] = int.from_bytes(data[48:50], byteorder='little')
res["last_snr"] = int.from_bytes(data[50:52], byteorder='little', signed=True) / 4
res["direct_dups"] = int.from_bytes(data[52:54], byteorder='little')
res["flood_dups"] = int.from_bytes(data[54:56], byteorder='little')
data_hex = data[8:].hex()
logger.debug(f"Status response: {data_hex}")
await self.dispatcher.dispatch(Event(EventType.STATUS_RESPONSE, res))
elif packet_type_value == PacketType.LOG_DATA.value:
logger.debug("Received log data")
await self.dispatcher.dispatch(Event(EventType.LOG_DATA, data[1:].decode('utf-8', errors='replace')))
else:
logger.debug(f"Unhandled data received {data}")
logger.debug(f"Unhandled packet type: {packet_type_value}")

View file

@ -3,9 +3,11 @@
"""
import asyncio
import sys
import logging
import serial_asyncio
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
class SerialConnection:
def __init__(self, port, baudrate):
@ -22,21 +24,20 @@ class SerialConnection:
def connection_made(self, transport):
self.cx.transport = transport
# printerr('port opened')
logger.debug('port opened')
transport.serial.rts = False # You can manipulate Serial object via transport
def data_received(self, data):
# printerr('data received')
self.cx.handle_rx(data)
def connection_lost(self, exc):
printerr('port closed')
logger.info('port closed')
def pause_writing(self):
printerr('pause writing')
logger.debug('pause writing')
def resume_writing(self):
printerr('resume writing')
logger.debug('resume writing')
async def connect(self):
"""
@ -47,11 +48,11 @@ class SerialConnection:
loop, lambda: self.MCSerialClientProtocol(self),
self.port, baudrate=self.baudrate)
printerr("Serial Connexion started")
logger.info("Serial Connection started")
return self.port
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
@ -69,8 +70,8 @@ class SerialConnection:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(self.inframe))
self.frame_started = False
self.header = b""
self.inframe = b""
@ -80,5 +81,5 @@ class SerialConnection:
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
# printerr(f"sending pkt : {pkt}")
self.transport.write(pkt)
logger.debug(f"sending pkt : {pkt}")
self.transport.write(pkt)

View file

@ -3,8 +3,10 @@
"""
import asyncio
import sys
import logging
from meshcore import printerr
# Get logger
logger = logging.getLogger("meshcore")
class TCPConnection:
def __init__(self, host, port):
@ -22,15 +24,17 @@ class TCPConnection:
def connection_made(self, transport):
self.cx.transport = transport
logger.debug('connection established')
def data_received(self, data):
logger.debug('data received')
self.cx.handle_rx(data)
def error_received(self, exc):
printerr(f'Error received: {exc}')
logger.error(f'Error received: {exc}')
def connection_lost(self, exc):
printerr('The server closed the connection')
logger.info('The server closed the connection')
async def connect(self):
"""
@ -41,11 +45,11 @@ class TCPConnection:
lambda: self.MCClientProtocol(self),
self.host, self.port)
printerr("TCP Connexion started")
logger.info("TCP Connection started")
return self.host
def set_mc(self, mc) :
self.mc = mc
def set_reader(self, reader) :
self.reader = reader
def handle_rx(self, data: bytearray):
headerlen = len(self.header)
@ -63,8 +67,8 @@ class TCPConnection:
self.inframe = self.inframe + data
else:
self.inframe = self.inframe + data[:self.frame_size-framelen]
if not self.mc is None:
self.mc.handle_rx(self.inframe)
if not self.reader is None:
asyncio.create_task(self.reader.handle_rx(self.inframe))
self.frame_started = False
self.header = b""
self.inframe = b""
@ -74,4 +78,5 @@ class TCPConnection:
async def send(self, data):
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + data
logger.debug(f"sending pkt : {pkt}")
self.transport.write(pkt)