#!/usr/bin/python """ mccli.py : CLI interface to MeschCore BLE companion app """ import asyncio import os import sys import getopt import json import datetime import time from pathlib import Path import socket # default address is stored in a config file MCCLI_CONFIG_DIR = str(Path.home()) + "/.config/mc-cli/" MCCLI_ADDRESS = MCCLI_CONFIG_DIR + "default_address" # Fallback address if config file not found # if None or "" then a scan is performed ADDRESS = "" def printerr (str) : sys.stderr.write(str) sys.stderr.write("\n") sys.stderr.flush() class MeshCore: """ Interface to a BLE MeshCore device """ self_info={} contacts={} def __init__(self, address): """ Constructor : specify address """ self.address = address self.client = None self.rx_char = None self.time = 0 self.result = asyncio.Future() self.contact_nb = 0 self.rx_sem = asyncio.Semaphore(0) self.ack_ev = asyncio.Event() async def connect(self): """ Connects to the device """ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(('192.168.1.1', 5000)) await self.send_appstart() printerr("Connexion started") return self.address def handle_rx(self, data: bytearray): """ Callback to handle received data """ match data[0]: case 0: # ok if len(data) == 5 : # an integer self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) else: self.result.set_result(True) case 1: # error self.result.set_result(False) case 2: # contact start self.contact_nb = int.from_bytes(data[1:5], byteorder='little') self.contacts={} case 3: # contact c = {} c["public_key"] = data[1:33].hex() c["type"] = data[33] c["flags"] = data[34] c["out_path_len"] = data[35] c["out_path"] = data[36:100].hex() c["adv_name"] = data[100:132].decode().replace("\0","") c["last_advert"] = int.from_bytes(data[132:136], byteorder='little') c["adv_lat"] = int.from_bytes(data[136:140], byteorder='little',signed=True) c["adv_lon"] = int.from_bytes(data[140:144], byteorder='little',signed=True) c["lastmod"] = int.from_bytes(data[144:148], byteorder='little') self.contacts[c["adv_name"]]=c case 4: # end of contacts self.result.set_result(self.contacts) case 5: # self info self.self_info["adv_type"] = data[1] self.self_info["tx_power"] = data[2] self.self_info["max_tx_power"] = data[3] self.self_info["public_key"] = data[4:36].hex() self.self_info["adv_lat"] = int.from_bytes(data[36:40], byteorder='little', signed=True)/1e6 self.self_info["adv_lon"] = int.from_bytes(data[40:44], byteorder='little', signed=True)/1e6 #self.self_info["reserved_44:48"] = data[44:48].hex() self.self_info["radio_freq"] = int.from_bytes(data[48:52], byteorder='little') self.self_info["radio_bw"] = int.from_bytes(data[52:56], byteorder='little') self.self_info["radio_sf"] = data[56] self.self_info["radio_cr"] = data[57] self.self_info["name"] = data[58:].decode() self.result.set_result(True) case 6: # msg sent res = {} res["type"] = data[1] res["expected_ack"] = bytes(data[2:6]) res["suggested_timeout"] = int.from_bytes(data[6:10], byteorder='little') self.result.set_result(res) case 7: # contact msg recv res = {} res["type"] = "PRIV" res["pubkey_prefix"] = data[1:7].hex() res["path_len"] = data[7] res["txt_type"] = data[8] res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') res["text"] = data[13:].decode() self.result.set_result(res) case 8 : # chanel msg recv res = {} res["type"] = "CHAN" res["pubkey_prefix"] = data[1:7].hex() res["path_len"] = data[7] res["txt_type"] = data[8] res["sender_timestamp"] = int.from_bytes(data[9:13], byteorder='little') res["text"] = data[13:].decode() self.result.set_result(res) case 9: # current time self.result.set_result(int.from_bytes(data[1:5], byteorder='little')) case 10: # no more msgs self.result.set_result(False) # push notifications case 0x80: printerr ("Advertisment received") case 0x81: printerr ("Code path update") case 0x82: self.ack_ev.set() printerr ("Received ACK") case 0x83: self.rx_sem.release() printerr ("Msgs are waiting") # unhandled case _: printerr (f"Unhandled data received {data}") async def send(self, data, timeout = 5): """ Helper function to synchronously send (and receive) data to the node """ self.result = asyncio.Future() self.sock.send(data) frame = self.sock.recv(1000) self.handle_rx(frame) try: res = await asyncio.wait_for(self.result, timeout) return res except TimeoutError : printerr ("Timeout ...") return False async def send_appstart(self): """ Send APPSTART to the node """ b1 = bytearray(b'\x01\x03 mccli') return await self.send(b1) async def send_advert(self): """ Make the node send an advertisement """ return await self.send(b"\x07") async def set_name(self, name): """ Changes the name of the node """ return await self.send(b'\x08' + name.encode("ascii")) async def get_time(self): """ Get the time (epoch) of the node """ self.time = await self.send(b"\x05") return self.time async def set_time(self, val): """ Sets a new epoch """ return await self.send(b"\x06" + int(val).to_bytes(4, 'little')) async def get_contacts(self): """ Starts retreiving contacts """ return await self.send(b"\x04") async def send_msg(self, dst, msg): """ Send a message to a node """ timestamp = (await self.get_time()).to_bytes(4, 'little') data = b"\x02\x00\x00" + timestamp + dst + msg.encode("ascii") self.ack_ev.clear() return await self.send(data) async def get_msg(self): """ Get message from the node (stored in queue) """ res = await self.send(b"\x0A", 1) if res is False : self.rx_sem=asyncio.Semaphore(0) # reset semaphore as there are no msgs in queue return res async def wait_msg(self): """ Wait for a message """ await self.rx_sem.acquire() async def wait_ack(self): """ Wait ack """ await self.ack_ev.wait() async def next_cmd(mc, cmds): """ process next command """ argnum = 0 match cmds[0] : case "get_time" : timestamp = await mc.get_time() print('Current time :' f' {datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")}' f' ({timestamp})') case "sync_time" : print(await mc.set_time(int(time.time()))) case "set_time" : argnum = 1 print(await mc.set_time(cmds[1])) case "send" : argnum = 2 print(await mc.send_msg(bytes.fromhex(cmds[1]), cmds[2])) case "sendto" : # sends to a contact from name argnum = 2 await mc.get_contacts() print(await mc.send_msg(bytes.fromhex(mc.contacts[cmds[1]]["public_key"])[0:6], cmds[2])) case "contacts" : print(json.dumps(await mc.get_contacts(),indent=4)) case "recv" : print(await mc.get_msg()) case "sync_msgs" : res=True while res: res = await mc.get_msg() print (res) case "wait_msg" : await mc.wait_msg() res = await mc.get_msg() print (res) case "wait_ack" : print (await mc.wait_ack()) case "infos" : print(json.dumps(mc.self_info,indent=4)) case "advert" : print(await mc.send_advert()) case "set_name" : argnum = 1 print(await mc.set_name(cmds[1])) case "sleep" : argnum = 1 await asyncio.sleep(int(cmds[1])) printerr (f"cmd {cmds[0:argnum+1]} processed ...") return cmds[argnum+1:] def usage () : """ Prints some help """ print("""mccli.py : CLI interface to MeschCore BLE companion app Usage : mccli.py Arguments : -h : prints this help -a
: specifies device address (can be a name) -d : filter meshcore devices with name or address -s : forces ble scan for a MeshCore device Available Commands (can be chained) : infos : print informations about the node send : sends msg to the node with pubkey starting by key sendto : sends msg to the node with given name wait_ack : wait an ack for last sent msg recv : reads next msg sync_msgs : gets all unread msgs from the node wait_msg : wait for a message advert : sends advert contacts : gets contact list sync_time : sync time with system set_time : sets time to given epoch get_time : gets current time set_name : sets node name sleep : sleeps for a given amount of secs""") async def main(argv): """ Do the job """ address = ADDRESS # If there is an address in config file, use it by default # unless an arg is explicitely given if os.path.exists(MCCLI_ADDRESS) : with open(MCCLI_ADDRESS, encoding="utf-8") as f : address = f.readline().strip() opts, args = getopt.getopt(argv, "a:d:sh") for opt, arg in opts : match opt: case "-d" : # name specified on cmdline address = arg case "-a" : # address specified on cmdline address = arg case "-s" : # explicitely ask to scan address address = None if len(args) == 0 : # no args, no action usage() return mc = MeshCore(address) await mc.connect() cmds = args while len(cmds) > 0 : cmds = await next_cmd(mc, cmds) asyncio.run(main(sys.argv[1:]))