From 5c6039ef71da515e8f815ed449d6fdcbdaea6d7f Mon Sep 17 00:00:00 2001 From: Bastian Schroll Date: Thu, 20 Sep 2018 23:35:56 +0200 Subject: [PATCH 1/2] rework server class (added python queue support) --- boswatch/network/client.py | 9 +++-- boswatch/network/server.py | 69 +++++++++++++----------------------- bw_client.py | 2 +- bw_server.py | 18 +++++----- plugins/template/template.py | 2 +- test/test_ServerClient.py | 45 +++++++++++++---------- 6 files changed, 69 insertions(+), 76 deletions(-) diff --git a/boswatch/network/client.py b/boswatch/network/client.py index c0eac72..637a4ff 100644 --- a/boswatch/network/client.py +++ b/boswatch/network/client.py @@ -27,7 +27,10 @@ class TCPClient: """!Create a new instance Create a new instance of an TCP Client. - And set the timeout""" + And set the timeout + + @param timeout: client timeout in sec (3) + """ try: self._sock = None self._timeout = timeout @@ -37,7 +40,7 @@ class TCPClient: def connect(self, host="localhost", port=8080): """!Connect to the server - @param host: Server IP address (localhost) + @param host: Server IP address ("localhost") @param port: Server Port (8080) @return True or False""" try: @@ -98,7 +101,7 @@ class TCPClient: @return received data""" try: received = str(self._sock.recv(1024), "utf-8") - logging.debug("received: %d", received) + logging.debug("received: %s", received) return received except AttributeError: logging.error("cannot receive - no connection established") diff --git a/boswatch/network/server.py b/boswatch/network/server.py index 9dd6567..e59c177 100644 --- a/boswatch/network/server.py +++ b/boswatch/network/server.py @@ -21,17 +21,13 @@ import time logging.debug("- %s loaded", __name__) -# module wide global list for received data sets -_dataPackets = [] -_lockDataPackets = threading.Lock() - # module wide global list for all currently connected clients _clients = {} # _clients[ThreadName] = {"address", "timestamp"} _lockClients = threading.Lock() -class TCPHandler(socketserver.BaseRequestHandler): - """!RequestHandler class for our TCPServer class.""" +class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRequestHandler): + """!ThreadedTCPRequestHandler class for our TCPServer class.""" def handle(self): """!Handles the request from an single client in a own thread @@ -51,10 +47,8 @@ class TCPHandler(socketserver.BaseRequestHandler): if data != "": logging.debug("%s recv: %s", req_name, data) - # add a new entry at first position (index 0) with client IP - # and the decoded data dict as an string in utf-8 and an timestamp - with _lockDataPackets: - _dataPackets.insert(0, (self.client_address[0], data, time.time())) # time() to calc time in queue + # add a new entry and the decoded data dict as an string in utf-8 and an timestamp + self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) logging.debug("Add data to queue") logging.debug("%s send: [ack]", req_name) @@ -71,14 +65,24 @@ class TCPHandler(socketserver.BaseRequestHandler): logging.info("Client disconnected: %s", self.client_address[0]) -class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """!ThreadedTCPServer class for our TCPServer class.""" + pass + + +class TCPServer: """!TCP server class""" - def __init__(self, timeout=3): - """!Create a new instance""" + def __init__(self, alarmQueue, timeout=3): + """!Create a new instance + + @param alarmQueue: python queue instance + @param timeout: server timeout in sec (3) + """ self._server = None self._server_thread = None self._timeout = timeout + self._alarmQueue = alarmQueue def start(self, port=8080): """!Start a threaded TCP socket server @@ -91,10 +95,9 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): @return True or False""" try: - self._server = socketserver.ThreadingTCPServer(("", port), TCPHandler) + self._server = ThreadedTCPServer(("", port), ThreadedTCPRequestHandler) self._server.timeout = self._timeout - - self.flushQueue() + self._server.alarmQueue = self._alarmQueue self._server_thread = threading.Thread(target=self._server.serve_forever) self._server_thread.name = "Thread-BWServer" @@ -115,7 +118,9 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): try: self._server.shutdown() self._server_thread.join() + self._server_thread = None self._server.socket.close() + self._server = None logging.debug("TCPServer stopped") return True except AttributeError: @@ -135,33 +140,9 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): @staticmethod def getClientsConnected(): - # todo insert comment + """!List of currently connected Clients + _clients[ThreadName] = {"address", "timestamp"} + + @return List of connected clients""" # todo return full list or write a print/debug method? return _clients - - @staticmethod - def getDataFromQueue(): - """!Function to get the data packages from server - must be polled by main program - - @return Next data packet.py from intern queue""" - if _dataPackets: - with _lockDataPackets: - message = _dataPackets.pop() - logging.debug("Get data from queue") - return message - return None - - @staticmethod - def countPacketsInQueue(): - """!Get packets waiting in queue - - @return Packets in queue""" - return len(_dataPackets) # no lock needed - only reading - - @staticmethod - def flushQueue(): - """!To flush all existing data in queue""" - logging.debug("Flush data queue") - with _lockDataPackets: - _dataPackets.clear() diff --git a/bw_client.py b/bw_client.py index 29fee0f..05d3873 100644 --- a/bw_client.py +++ b/bw_client.py @@ -96,7 +96,7 @@ try: failedTransmits = 0 while not bwClient.receive() == "[ack]": # wait for ack or timeout if failedTransmits >= 3: - logging.error("cannot transmit after 5 retires") + logging.error("cannot transmit after 3 retires") break failedTransmits += 1 logging.warning("attempt %d to resend packet", failedTransmits) diff --git a/bw_server.py b/bw_server.py index f9853dd..6b215d6 100644 --- a/bw_server.py +++ b/bw_server.py @@ -41,7 +41,7 @@ try: import time import sys import threading - import threading + import queue logging.debug("Import BOSWatch modules") from boswatch.config import Config @@ -111,12 +111,13 @@ try: # # t1 = threading.Timer(1, eins) # t2 = threading.Timer(5, zwei) - t3 = threading.Timer(15, drei) + # t3 = threading.Timer(15, drei) # t1.start() # t2.start() - t3.start() + # t3.start() - bwServer = TCPServer(bwConfig.getInt("Server", "PORT")) + incomingQueue = queue.Queue() + bwServer = TCPServer(incomingQueue) if bwServer.start(): while 1: @@ -126,7 +127,7 @@ try: packetsOld = 0 while serverPaused is True: time.sleep(0.2) # reduce cpu load (run all 200ms) - packetsNew = bwServer.countPacketsInQueue() + packetsNew = incomingQueue.qsize() if packetsNew is not packetsOld: logging.debug("%s packet(s) waiting in queue", packetsNew) packetsOld = packetsNew @@ -136,13 +137,13 @@ try: logging.warning("Server stop flag received ...") break - if not bwServer.countPacketsInQueue(): # pause only when no data + if incomingQueue.empty(): # pause only when no data time.sleep(0.1) # reduce cpu load (run all 100ms) - data = bwServer.getDataFromQueue() + data = incomingQueue.get() if data is not None: logging.info("get data from %s (waited in queue %0.3f sec.)", data[0], time.time() - data[2]) - logging.debug("%s packet(s) waiting in queue", bwServer.countPacketsInQueue()) + logging.debug("%s packet(s) waiting in queue", incomingQueue.qsize()) bwPacket = Packet((data[1])) if not bwDoubleFilter.filter(bwPacket): @@ -156,6 +157,7 @@ try: bwPluginManager.runAllPlugins(bwPacket) # print(bwPacket.get("clientVersion")["major"]) + incomingQueue.task_done() except KeyboardInterrupt: # pragma: no cover logging.warning("Keyboard interrupt") diff --git a/plugins/template/template.py b/plugins/template/template.py index f570a52..de1b78a 100644 --- a/plugins/template/template.py +++ b/plugins/template/template.py @@ -30,7 +30,7 @@ class BoswatchPlugin(Plugin): def __init__(self): """!Do not change anything here except the PLUGIN NAME in the super() call""" # PLEASE SET YOU PLUGIN NAME HERE !!!! - Plugin.__init__("template") + super().__init__("template") def onLoad(self): """!Called by import of the plugin""" diff --git a/test/test_ServerClient.py b/test/test_ServerClient.py index 48b030c..6e59d7f 100644 --- a/test/test_ServerClient.py +++ b/test/test_ServerClient.py @@ -17,6 +17,7 @@ import pytest import logging import time +import queue from boswatch.network.server import TCPServer from boswatch.network.client import TCPClient @@ -31,11 +32,19 @@ class Test_ServerClient: @pytest.fixture(scope="function") def useServer(self): """!Start and serve the sever for each functions where useServer is given""" - self.testServer = TCPServer() + self.dataQueue = queue.Queue() + self.testServer = TCPServer(self.dataQueue) + logging.debug("start server") assert self.testServer.start() time.sleep(0.1) # wait for server - yield self.testServer # server to all test where useServer is given - assert self.testServer.stop() + # serv the instances - created in self context + yield 1 + try: + logging.debug("stop server") + self.testServer.stop() + except: + logging.warning("server still stopped") + time.sleep(0.1) # wait for server def test_clientConnectFailed(self): @@ -80,7 +89,7 @@ class Test_ServerClient: assert self.testClient2.connect() time.sleep(0.1) # wait for all clients connected # check connected clients - assert useServer.countClientsConnected() == 2 + assert self.testServer.countClientsConnected() == 2 # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect() @@ -111,29 +120,28 @@ class Test_ServerClient: assert self.testClient2.receive() == "[ack]" assert self.testClient1.receive() == "[ack]" # check server msg queue - assert useServer.countPacketsInQueue() == 3 + assert self.dataQueue.qsize() == 3 # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect() assert self.testClient3.disconnect() - def test_serverRestart(self): - """!Test a restart of the server""" - self.testServer = TCPServer() - assert self.testServer.start() + def test_serverRestart(self, useServer): + """!Test a stop and restart of the server""" assert self.testServer.stop() assert self.testServer.start() assert self.testServer.stop() - def test_serverStopFailed(self): - """!Test to start the server twice""" - self.testServer = TCPServer() + def test_serverStopFailed(self, useServer): + """!Test to stop a stopped server""" + assert self.testServer.stop() assert not self.testServer.stop() def test_serverDoubleStart(self): """!Test to start the server twice""" - self.testServer1 = TCPServer() - self.testServer2 = TCPServer() + self.dataQueue = queue.Queue() + self.testServer1 = TCPServer(self.dataQueue) + self.testServer2 = TCPServer(self.dataQueue) assert self.testServer1.start() assert not self.testServer2.start() assert self.testServer1.stop() @@ -147,7 +155,6 @@ class Test_ServerClient: self.testClient2 = TCPClient() assert self.testClient2.connect() # send all - useServer.flushQueue() assert self.testClient1.transmit("test1") time.sleep(0.1) # wait for recv to prevent fail of false order assert self.testClient2.transmit("test2") @@ -155,10 +162,10 @@ class Test_ServerClient: assert self.testClient1.receive() == "[ack]" assert self.testClient2.receive() == "[ack]" # _check server output data - assert useServer.countPacketsInQueue() == 2 - assert useServer.getDataFromQueue()[1] == "test1" - assert useServer.getDataFromQueue()[1] == "test2" - assert useServer.getDataFromQueue() is None # Last _check must be None + assert self.dataQueue.qsize() == 2 + assert self.dataQueue.get(True, 1)[1] == "test1" + assert self.dataQueue.get(True, 1)[1] == "test2" + assert self.dataQueue.qsize() is 0 # Last _check must be None # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect() From 24083ad6e8817abc3861deaca877919a5c5b1e68 Mon Sep 17 00:00:00 2001 From: Bastian Schroll Date: Fri, 21 Sep 2018 16:05:48 +0200 Subject: [PATCH 2/2] little improvements --- boswatch/network/server.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/boswatch/network/server.py b/boswatch/network/server.py index 31a7e3c..9cd53c4 100644 --- a/boswatch/network/server.py +++ b/boswatch/network/server.py @@ -21,10 +21,6 @@ import time logging.debug("- %s loaded", __name__) -# module wide global list for all currently connected clients -_clients = {} # _clients[ThreadName] = {"address", "timestamp"} -_lockClients = threading.Lock() - class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRequestHandler): """!ThreadedTCPRequestHandler class for our TCPServer class.""" @@ -33,8 +29,8 @@ class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRe """!Handles the request from an single client in a own thread Insert a request in the clients[] list and send a [ack]""" - with _lockClients: - _clients[threading.current_thread().name] = {"address": self.client_address[0], "timestamp": time.time()} + with self.server.clientsConnctedLock: # because our list is not threadsafe + self.server.clientsConnected[threading.current_thread().name] = {"address": self.client_address[0], "timestamp": time.time()} logging.info("Client connected: %s", self.client_address[0]) data = 1 # to enter while loop @@ -48,7 +44,7 @@ class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRe logging.debug("%s recv: %s", req_name, data) # add a new entry and the decoded data dict as an string in utf-8 and an timestamp - self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) + self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) # queue is threadsafe logging.debug("Add data to queue") logging.debug("%s send: [ack]", req_name) @@ -60,8 +56,7 @@ class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRe except: # pragma: no cover logging.exception("%s error while receiving", req_name) finally: - with _lockClients: - del _clients[threading.current_thread().name] + del self.server.clientsConnected[threading.current_thread().name] logging.info("Client disconnected: %s", self.client_address[0]) @@ -84,6 +79,9 @@ class TCPServer: self._timeout = timeout self._alarmQueue = alarmQueue + self._clientsConnectedLock = threading.Lock() + self._clientsConnected = {} + def start(self, port=8080): """!Start a threaded TCP socket server @@ -99,6 +97,9 @@ class TCPServer: self._server.timeout = self._timeout self._server.alarmQueue = self._alarmQueue + self._server.clientsConnctedLock = self._clientsConnectedLock + self._server.clientsConnected = self._clientsConnected + self._server_thread = threading.Thread(target=self._server.serve_forever) self._server_thread.name = "Thread-BWServer" self._server_thread.daemon = True @@ -130,20 +131,19 @@ class TCPServer: logging.exception("cannot stop the server") return False - @staticmethod - def countClientsConnected(): + def countClientsConnected(self): """!Number of currently connected Clients @return Connected clients""" - with _lockClients: - return len(_clients) + with self._clientsConnectedLock: # because our list is not threadsafe + return len(self._clientsConnected) - @staticmethod - def getClientsConnected(): + def getClientsConnected(self): """!A list of all connected clients with their IP address and last seen timestamp _clients[ThreadName] = {"address", "timestamp"} @return List of onnected clients""" # todo return full list or write a print/debug method? - return _clients + with self._clientsConnectedLock: # because our list is not threadsafe + return self.clientsConnected