diff --git a/meshcore_multitcp.py b/meshcore_multitcp.py index 351a51c..d4cb89b 100755 --- a/meshcore_multitcp.py +++ b/meshcore_multitcp.py @@ -11,14 +11,17 @@ import socket import os import sys import signal - import argparse import traceback import json import time +import sqlite3 from typing import Any, Dict from meshcore_multitcp_packets import BinaryReqType, PacketType, CmdPacketType +APP = "MeshCore multitcp-proxy" +VERSION = "1.1" + import logging FORMAT = '%(asctime)-15s %(levelname)-10s %(message)s' logging.basicConfig(format=FORMAT, level=logging.INFO) @@ -27,18 +30,27 @@ logger = logging.getLogger(__name__) client_host = "0.0.0.0" client_port = 5000 +forward_host = "0.0.0.0" +forward_port = 5001 + device_host = "192.168.5.62" device_port = 5000 +sqlite_active = False +sqlite_file = 'meshcore_multitcp.db' + clients = [] client_names = {} threads = [] +client_fwd = {} +client_fwd_active = {} stop_device = False device = None #ignore_packets_device = [PacketType.BATTERY.value, PacketType.ADVERTISEMENT.value] -ignore_packets_device = [PacketType.NO_MORE_MSGS.value, PacketType.OK.value, PacketType.BATTERY.value, PacketType.DEVICE_INFO.value, PacketType.LOG_DATA.value] +#ignore_packets_device = [PacketType.NO_MORE_MSGS.value, PacketType.OK.value, PacketType.BATTERY.value, PacketType.DEVICE_INFO.value, PacketType.LOG_DATA.value] +ignore_packets_device = [PacketType.OK.value, PacketType.BATTERY.value, PacketType.DEVICE_INFO.value, PacketType.LOG_DATA.value] #ignore_packets_app = [CmdPacketType.GET_BATT_AND_STORAGE.value] ignore_packets_app = [] @@ -57,6 +69,70 @@ contact_nb = 0 contacts = {} channels = {} +def db_create(): + global sqlite_file + + logger.info(f"SQLITE: creating {sqlite_file} ...") + con = sqlite3.connect(sqlite_file) + cur = con.cursor() + cur.execute('CREATE TABLE packets (timestamp INTEGER, type INTEGER, data BLOB)') + cur.execute('CREATE TABLE clients (hash TEXT, ip TEXT, client TEXT, last_timestamp INTEGER)') + con.commit() + con.close() + +def db_store_packet(packet_timestamp, packet_type, data): + global sqlite_file + + logger.debug(f"SQLITE: storing message to {sqlite_file} ...") + con = sqlite3.connect(sqlite_file) + cur = con.cursor() + cur.execute(f"INSERT INTO packets (timestamp, type, data) VALUES ({packet_timestamp}, {packet_type}, ?)", (data,)) + con.commit() + con.close() + +def db_load_packet(ip,clientname): + global sqlite_file, client_fwd + + if client_fwd[ip]: + logger.debug(f"SQLITE: {ip} ({clientname}) messages already synced") + return False + + logger.info(f"SQLITE: restoring data from {sqlite_file} for {clientname}@{ip}...") + + con = sqlite3.connect(sqlite_file) + con.text_factory = bytes + cur = con.cursor() + res = cur.execute(f"SELECT last_timestamp FROM clients WHERE ip = ? AND client = ?",(ip,clientname)) + row = res.fetchone() + if row: + timestamp = row[0] + else: + cur.execute(f"INSERT INTO clients (hash, ip, client, last_timestamp) VALUES ('',?,?,0)",(ip,clientname)) + con.commit() + timestamp = 0 + + logger.debug(f"SQLITE: {ip} ({clientname}) requests stored messages after {timestamp}") + res = cur.execute(f"SELECT data FROM packets WHERE timestamp > {timestamp}") + rows = res.fetchall() + + data = [] + for row in rows: + data.append(row[0]) + + con.close() + return data + +def db_set_client_time(ip,clientname): + global sqlite_file + + logger.debug(f"SQLITE: set timestamp for {clientname}@{ip} ...") + + con = sqlite3.connect(sqlite_file) + cur = con.cursor() + cur.execute(f"UPDATE clients SET last_timestamp = ? WHERE client = ? AND ip = ?", (time.time(),clientname,ip)) + con.commit() + con.close() + def get_ip(sock_handle): try: host, port = sock_handle.getpeername() @@ -73,6 +149,21 @@ def get_ip_port(sock_handle): logger.error(f"[get_ip_port] while getpeername()") return False +def get_ip_client_name(sock_handle): + global client_names + try: + host, port = sock_handle.getpeername() + except: + logger.error(f"[get_ip_client_name] while getpeername()") + return False + + try: + client_name = host+' ('+client_names[host+':'+str(port)]+')' + except: + client_name = host+':'+str(port) + + return client_name + def get_client_name(sock_handle): global client_names try: @@ -82,7 +173,7 @@ def get_client_name(sock_handle): return False try: - client_name = host+' ('+client_names[host+':'+str(port)]+')' + client_name = client_names[host+':'+str(port)] except: client_name = host+':'+str(port) @@ -100,6 +191,7 @@ def handle_app_data(data: bytearray): logger.debug(f"[handle_app_data] header: {frame_header} size: {frame_size} data: {frame_data}") packet_type_value = data[3] + if packet_type_value in ignore_packets_app: logger.debug(f"[handle_app_data] packet {CmdPacketType(packet_type_value).name} ignored") return False @@ -123,17 +215,21 @@ def handle_device_data(data: bytearray): frame_size = int.from_bytes(frame_header[1:], byteorder="little") frame_data = data[3:] logger.debug(f"[handle_device_data] header: {frame_header} size: {frame_size} data: {frame_data}") - + packet_type_value = data[3] logger.debug(f"[parse_device_data] raw-data: {data.hex()}") - + if packet_type_value in ignore_packets_device: logger.debug(f"[parse_device_data] packet {PacketType(packet_type_value).name} ignored") return False - + if PacketType.exists(packet_type_value): packet_type = PacketType(packet_type_value).name logger.debug(f"[parse_device_data] packet {packet_type} found ...") + + if packet_type_value in [7,8,16,17] and sqlite_active: + db_store_packet(time.time(),packet_type_value,data) + return packet_type elif len(data) == 0: @@ -211,13 +307,15 @@ def device_handle(): message_type = handle_device_data(message) if message_type == -1: - print('device lost') raise socket.error if message_type and message_type != 'NONE': logger.info(f"DEVICE {message_type}") client_forward(message, message_type) + + if stop_device: + return except socket.error: logger.error(f"DEVICE: lost connection on read {get_ip(device)}") @@ -251,67 +349,124 @@ def device_write(message): sys.exit(1) def client_forward(message, message_type): + global client_fwd, client_fwd_active + for client in clients: + ip = get_ip(client) + ipclientname = get_ip_client_name(client) if message_type == -1: - logger.debug(f"CLIENT {get_client_name(client)} lost connection on Forward") + logger.debug(f"CLIENT {ipclientname} lost connection on Forward") raise socket.error elif message_type and message_type != 'NONE': - logger.debug(f"CLIENT {get_client_name(client)} Forward {message_type}") + logger.debug(f"CLIENT {ipclientname} Forward {message_type}") else: - logger.debug(f"CLIENT {get_client_name(client)} Forward raw-data: {message}") + logger.debug(f"CLIENT {ipclientname} Forward raw-data: {message}") try: + count_wait = 0 + while client_fwd_active[ip]: + count_wait += 1 + logger.debug(f"CLIENT {ipclientname} Forward is waiting while syncing stored messages ({count_wait}/3) ...") + + if count_wait == 3: + client_fwd[ip] = True + client_fwd_active[ip] = False + logger.info(f"CLIENT {ipclientname} possible crash of message-sync! forward anyway ...") + + time.sleep(5) + client.send(message) except (socket.error, BrokenPipeError): - logger.error(f"CLIENT {get_client_name(client)} lost connection on write") + logger.error(f"CLIENT {get_ip_client_name(client)} lost connection on write") if client in clients: index = clients.index(client) clients.remove(client) client.close() continue -def client_receive(client): - while True: +def client_receive(client, forwarding = False): + global client_fwd, client_fwd_active, stop_device + + while not stop_device: try: + ip = get_ip(client) + ipclientname = get_ip_client_name(client) + message = client.recv(1024) message_type = handle_app_data(message) if message_type == -1: - logger.debug(f"CLIENT {get_client_name(client)} lost connection on send") + logger.debug(f"CLIENT {ipclientname} lost connection on send") raise socket.error elif message_type: - logger.info(f"CLIENT {get_client_name(client)} Sent {message_type}") + logger.info(f"CLIENT {ipclientname} Sent {message_type}") else: - logger.debug(f"CLIENT {get_client_name(client)} Sent raw-data: {message}") + logger.debug(f"CLIENT {ipclientname} Sent raw-data: {message}") if message_type == CmdPacketType.APP_START.name: app_ver = message[4] app_name = message[11:].decode('utf-8') client_names[get_ip_port(client)] = app_name logger.debug(f"CLIENT {get_ip(client)} app_name: {app_name}") + + if message_type == CmdPacketType.SYNC_NEXT_MESSAGE.name and not client_fwd[ip] and forwarding: + logger.debug(f"CLIENT {ipclientname} starting message-sync ...") + client_fwd_active[ip] = True + + data = db_load_packet(ip, get_client_name(client)) + if data: + msg_count = len(data) + logger.info(f"CLIENT {ipclientname} Forward {msg_count} stored messages") + for packet in data: + logger.debug(f"CLIENT {ipclientname} Forward stored data: {packet}") + client.send(packet) + + message = client.recv(1024) + logger.debug(f"CLIENT {ipclientname} responds: {message}") + else: + logger.info(f"CLIENT {ipclientname} no stored messages to forward") + + client_fwd[ip] = True + client_fwd_active[ip] = False + logger.info(f"CLIENT {ipclientname} finished message-sync") + + if client_fwd[ip] and forwarding: + db_set_client_time(ip, get_client_name(client)) device_write(message) except (socket.error, BrokenPipeError): - logger.error(f"CLIENT {get_client_name(device)} lost connection on read") + logger.error(f"CLIENT {get_ip_client_name(device)} lost connection on read") if client in clients: index = clients.index(client) clients.remove(client) client.close() break -def client_connect(): - global clients, client_names, threads +def client_connect(server_sock,forwarding = False): + global clients, client_names, threads, client_fwd, stop_device - while True: + if forwarding: + info = 'for store-forward ' + else: + info = '' + + while not stop_device: + ip, port = server_sock.getsockname() + logger.info(f"CLIENT: listening at {ip}:{port} {info}...") + try: - client, address = server.accept() - logger.info(f"CLIENT: new connection from {get_ip(client)}") + client, address = server_sock.accept() + ip = get_ip(client) + logger.info(f"CLIENT: new connection from {ip}") clients.append(client) - thread = threading.Thread(target=client_receive, args=(client,), name="CLIENT "+get_ip(client)) + client_fwd[ip] = False + client_fwd_active[ip] = False + + thread = threading.Thread(target=client_receive, args=(client, forwarding, ), name="CLIENT "+ip) thread.start() threads.append(thread) @@ -327,8 +482,6 @@ def client_connect(): def device_state(running_threads): while True: - #print(running_threads) - for thread in running_threads: if thread.name == "DEVICE_HANDLE": print('device_state',thread.is_alive()) @@ -340,7 +493,10 @@ def device_state(running_threads): parser = argparse.ArgumentParser(description='TCP meshcore proxy') parser.add_argument('-s', '--server', required=True, help='Server IP and port, i.e.: 127.0.0.1:5000') +parser.add_argument('-f', '--forward', required=False, help='Server IP and port for clients using message storage, i.e.: 127.0.0.1:5001') parser.add_argument('-d', '--device', required=True, help='Device IP and port, i.e.: 127.0.0.2:5000') +parser.add_argument('-sql', '--sqlite', action='store_true', help='enable SQLite-function') +parser.add_argument('-ver', '--version', action='store_true', help='shows current app-version') output_group = parser.add_mutually_exclusive_group() output_group.add_argument('-q', '--quiet', action='store_true', help='minimal logging to CLI') output_group.add_argument('-v', '--verbose', action='store_true', help='maximum logging to CLI') @@ -349,12 +505,25 @@ args = parser.parse_args() client_host, client_port = ip_to_tuple(args.server) device_host, device_port = ip_to_tuple(args.device) +if args.forward: + forward_host, forward_port = ip_to_tuple(args.forward) + +logger.info(f"{APP} v{VERSION}") + +if args.version: + sys.exit() if args.quiet: logger.setLevel(logging.CRITICAL) if args.verbose: logger.setLevel(logging.DEBUG) +if args.sqlite: + logger.info('SQLITE: enable message storage') + sqlite_active = True -logger.info('Welcome') +if sqlite_active: + logger.debug(f"SQLITE: using local database file at {sqlite_file} ...") + if not os.path.exists(sqlite_file): + db_create() if logger.getEffectiveLevel() == logging.DEBUG: logger.info('LOGLEVEL is set to DEBUG') @@ -362,12 +531,24 @@ if logger.getEffectiveLevel() == logging.DEBUG: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((client_host,client_port)) -logger.info(f"CLIENT: listening at {client_host}:{client_port} ...") +#logger.info(f"CLIENT: listening at {client_host}:{client_port} ...") server.listen() +if sqlite_active: + server2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server2.bind((forward_host,forward_port)) + server2.listen() + device_handle_thread = threading.Thread(target=device_handle, name="DEVICE_HANDLE") device_handle_thread.start() threads.append(device_handle_thread) +if sqlite_active: + logger.debug(f"CLIENT: waiting for forward-clients to connect ...") + forward_handle_thread = threading.Thread(target=client_connect, args=(server2,True,), kwargs={}, name="FORWARD_HANDLE") + threads.append(forward_handle_thread) + forward_handle_thread.start() + logger.debug(f"CLIENT: waiting for clients to connect ...") -client_connect() +client_connect(server)