From 6f364b8e6a4b9e1096ac3093ee363d4e2e8bbf85 Mon Sep 17 00:00:00 2001 From: Bastian Schroll Date: Sun, 20 Oct 2019 17:11:20 +0200 Subject: [PATCH] fix server client issues - add test for server stops until clients connected - add header for data packets - check header on read and read only data size in - make readings non blocking --- boswatch/network/client.py | 27 ++++++++++++++----- boswatch/network/server.py | 43 ++++++++++++++++++++++-------- bw_client.py | 6 ++--- test/boswatch/test_ServerClient.py | 9 +++++++ 4 files changed, 65 insertions(+), 20 deletions(-) diff --git a/boswatch/network/client.py b/boswatch/network/client.py index 7f9417a..b045d24 100644 --- a/boswatch/network/client.py +++ b/boswatch/network/client.py @@ -16,9 +16,12 @@ """ import logging import socket +import select logging.debug("- %s loaded", __name__) +HEADERSIZE = 10 + class TCPClient: """!TCP client class""" @@ -61,7 +64,7 @@ class TCPClient: self._sock = None logging.debug("disconnected") return True - logging.warning("client not connected") + logging.warning("client always disconnected") return True except AttributeError: logging.error("cannot disconnect - no connection established") @@ -74,7 +77,8 @@ class TCPClient: @return True or False""" try: logging.debug("transmitting: %s", data) - self._sock.sendall(bytes(data + "\n", "utf-8")) + header = str(len(data)).ljust(HEADERSIZE) + self._sock.sendall(bytes(header + data, "utf-8")) logging.debug("transmitted...") return True except AttributeError: @@ -88,8 +92,17 @@ class TCPClient: @return received data""" try: - received = str(self._sock.recv(1024), "utf-8") - logging.debug("received: %s", received) + if not self._sock: # check if socket is still available + return False + read, _, _ = select.select([self._sock], [], [], 1) + if not read: # check if there is something to read + return False + header = self._sock.recv(HEADERSIZE) + if not len(header): # check if there data + return False + length = int(header.decode("utf-8").strip()) + received = self._sock.recv(length).decode("utf-8") + logging.debug("received %d bytes: %s", length, received) return received except AttributeError: logging.error("cannot receive - no connection established") @@ -104,8 +117,10 @@ class TCPClient: """!Property of client connected state""" if self._sock: try: - self._sock.sendall(bytes("", "utf-8")) + aliveMsg = "" + header = str(len(aliveMsg)).ljust(HEADERSIZE) + self._sock.sendall(bytes(header + aliveMsg, "utf-8")) return True - except AttributeError: + except (AttributeError, BrokenPipeError): pass return False diff --git a/boswatch/network/server.py b/boswatch/network/server.py index 3001725..04c1fa5 100644 --- a/boswatch/network/server.py +++ b/boswatch/network/server.py @@ -15,14 +15,18 @@ @description: Class implementation for a threaded TCP socket server """ import logging +import socket import socketserver import threading import time +import select logging.debug("- %s loaded", __name__) +HEADERSIZE = 10 -class _ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRequestHandler): + +class _ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): """!ThreadedTCPRequestHandler class for our TCPServer class.""" def handle(self): @@ -33,22 +37,36 @@ class _ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseR self.server.clientsConnected[threading.current_thread().name] = {"address": self.client_address[0], "timestamp": time.time()} logging.info("Client connected: %s", self.client_address[0]) - data = 1 # to enter while loop cur_thread = threading.current_thread().name req_name = str(cur_thread) + " " + self.client_address[0] try: - while data: - data = str(self.request.recv(1024).strip(), 'utf-8') - if data != "": - logging.debug("%s recv: %s", req_name, data) + while self.server.isActive: + read, _, _ = select.select([self.request], [], [], 0.5) + if not read: + continue # nothing to read on the socket - # add a new entry and the decoded data dict as an string in utf-8 and an timestamp - self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) # queue is threadsafe - logging.debug("Add data to queue") + header = self.request.recv(HEADERSIZE) + if not len(header): + break # empty data -> socked closed - logging.debug("%s send: [ack]", req_name) - self.request.sendall(bytes("[ack]", "utf-8")) + length = int(header.decode("utf-8").strip()) + data = self.request.recv(length).decode("utf-8") + + if data == "": + continue + + logging.debug("%s recv %d bytes: %s", req_name, length, data) + + # add a new entry and the decoded data dict as an string in utf-8 and an timestamp + self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) # queue is threadsafe + logging.debug("Add data to queue") + + logging.debug("%s send: [ack]", req_name) + + data = "[ack]" + header = str(len(data)).ljust(HEADERSIZE) + self.request.sendall(bytes(header + data, "utf-8")) self.request.close() except (ConnectionResetError, ConnectionAbortedError): # pragma: no cover @@ -96,9 +114,11 @@ class TCPServer: @return True or False""" if not self.isRunning: try: + socketserver.TCPServer.allow_reuse_address = True self._server = _ThreadedTCPServer(("", port), _ThreadedTCPRequestHandler) self._server.timeout = self._timeout self._server.alarmQueue = self._alarmQueue + self._server.isActive = True self._server.clientsConnectedLock = self._clientsConnectedLock self._server.clientsConnected = self._clientsConnected @@ -122,6 +142,7 @@ class TCPServer: @return True or False""" if self.isRunning: self._server.shutdown() + self._server.isActive = False self._server_thread.join() self._server_thread = None self._server = None diff --git a/bw_client.py b/bw_client.py index d9644db..eb96ca7 100644 --- a/bw_client.py +++ b/bw_client.py @@ -83,7 +83,7 @@ try: inputThreadRunning = True # ========== INPUT CODE ========== - def handleSDRInput(dataQueue, sdrConfig, decoderConfig): + def handleSDRInput(dataQueue, sdrConfig, decoderConfig): # todo exception handling inside sdrProc = ProcessManager("/usr/bin/rtl_fm") sdrProc.addArgument("-d " + str(sdrConfig.get("device", default="0"))) # device id sdrProc.addArgument("-f " + sdrConfig.get("frequency")) # frequencies @@ -111,7 +111,7 @@ try: mmProc.addArgument("-t raw -") mmProc.setStdin(sdrProc.stdout) mmProc.start() - mmProc.skipLinesUntil("Enabled Demodulators:") + mmProc.skipLinesUntil("Available demodulators:") logging.info("start decoding") while inputThreadRunning: @@ -122,7 +122,7 @@ try: elif not mmProc.isRunning: logging.warning("multimon was down - try to restart") mmProc.start() - mmProc.skipLinesUntil("Enabled Demodulators:") # last line from mm before data + mmProc.skipLinesUntil("Available demodulators:") # last line from mm before data elif sdrProc.isRunning and mmProc.isRunning: line = mmProc.readline() if line: diff --git a/test/boswatch/test_ServerClient.py b/test/boswatch/test_ServerClient.py index f46182e..660e337 100644 --- a/test/boswatch/test_ServerClient.py +++ b/test/boswatch/test_ServerClient.py @@ -160,6 +160,15 @@ def test_serverDoubleStart(): assert testServer2.stop() +def test_serverStopsWhileConnected(getRunningServer, getClient): + """!Shutdown server while client is connected""" + getClient.connect() + getRunningServer.stop() + time.sleep(1) + assert getClient.isConnected # todo check why the first always return true here + assert not getClient.isConnected + + @pytest.mark.skip("needs fixture for more than one client") def test_serverGetOutput(getRunningServer): """!Send data to server with 2 clients, check '[ack]' and data on server queue"""