diff --git a/boswatch/network/client.py b/boswatch/network/client.py index a8604da..6c785db 100644 --- a/boswatch/network/client.py +++ b/boswatch/network/client.py @@ -26,7 +26,7 @@ class TCPClient: def __init__(self, timeout=3): """!Create a new instance - @param timeout: timout for the client in sec. (3)""" + @param timeout: timeout for the client in sec. (3)""" try: self._sock = None self._timeout = timeout @@ -36,7 +36,7 @@ class TCPClient: def connect(self, host="localhost", port=8080): """!Connect to the server - @param host: Server IP address (localhost) + @param host: Server IP address ("localhost") @param port: Server Port (8080) @return True or False""" try: @@ -97,7 +97,7 @@ class TCPClient: @return received data""" try: received = str(self._sock.recv(1024), "utf-8") - logging.debug("received: %d", received) + logging.debug("received: %s", received) return received except AttributeError: logging.error("cannot receive - no connection established") diff --git a/boswatch/network/server.py b/boswatch/network/server.py index 27df38c..9cd53c4 100644 --- a/boswatch/network/server.py +++ b/boswatch/network/server.py @@ -21,24 +21,16 @@ import time logging.debug("- %s loaded", __name__) -# module wide global list for received data sets -_dataPackets = [] -_lockDataPackets = threading.Lock() -# module wide global list for all currently connected clients -_clients = {} # _clients[ThreadName] = {"address", "timestamp"} -_lockClients = threading.Lock() - - -class TCPHandler(socketserver.BaseRequestHandler): - """!RequestHandler class for our TCPServer class.""" +class ThreadedTCPRequestHandler(socketserver.ThreadingMixIn, socketserver.BaseRequestHandler): + """!ThreadedTCPRequestHandler class for our TCPServer class.""" def handle(self): """!Handles the request from an single client in a own thread Insert a request in the clients[] list and send a [ack]""" - with _lockClients: - _clients[threading.current_thread().name] = {"address": self.client_address[0], "timestamp": time.time()} + with self.server.clientsConnctedLock: # because our list is not threadsafe + self.server.clientsConnected[threading.current_thread().name] = {"address": self.client_address[0], "timestamp": time.time()} logging.info("Client connected: %s", self.client_address[0]) data = 1 # to enter while loop @@ -51,10 +43,8 @@ class TCPHandler(socketserver.BaseRequestHandler): if data != "": logging.debug("%s recv: %s", req_name, data) - # add a new entry at first position (index 0) with client IP - # and the decoded data dict as an string in utf-8 and an timestamp - with _lockDataPackets: - _dataPackets.insert(0, (self.client_address[0], data, time.time())) # time() to calc time in queue + # add a new entry and the decoded data dict as an string in utf-8 and an timestamp + self.server.alarmQueue.put_nowait((self.client_address[0], data, time.time())) # queue is threadsafe logging.debug("Add data to queue") logging.debug("%s send: [ack]", req_name) @@ -66,21 +56,31 @@ class TCPHandler(socketserver.BaseRequestHandler): except: # pragma: no cover logging.exception("%s error while receiving", req_name) finally: - with _lockClients: - del _clients[threading.current_thread().name] + del self.server.clientsConnected[threading.current_thread().name] logging.info("Client disconnected: %s", self.client_address[0]) -class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """!ThreadedTCPServer class for our TCPServer class.""" + pass + + +class TCPServer: """!TCP server class""" - def __init__(self, timeout=3): + def __init__(self, alarmQueue, timeout=3): """!Create a new instance - @param timeout: timeout for the server in sec. (3)""" + @param alarmQueue: python queue instance + @param timeout: server timeout in sec (3) + """ self._server = None self._server_thread = None self._timeout = timeout + self._alarmQueue = alarmQueue + + self._clientsConnectedLock = threading.Lock() + self._clientsConnected = {} def start(self, port=8080): """!Start a threaded TCP socket server @@ -93,10 +93,12 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): @return True or False""" try: - self._server = socketserver.ThreadingTCPServer(("", port), TCPHandler) + self._server = ThreadedTCPServer(("", port), ThreadedTCPRequestHandler) self._server.timeout = self._timeout + self._server.alarmQueue = self._alarmQueue - self.flushQueue() + self._server.clientsConnctedLock = self._clientsConnectedLock + self._server.clientsConnected = self._clientsConnected self._server_thread = threading.Thread(target=self._server.serve_forever) self._server_thread.name = "Thread-BWServer" @@ -117,7 +119,9 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): try: self._server.shutdown() self._server_thread.join() + self._server_thread = None self._server.socket.close() + self._server = None logging.debug("TCPServer stopped") return True except AttributeError: @@ -127,47 +131,19 @@ class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): logging.exception("cannot stop the server") return False - @staticmethod - def countClientsConnected(): + def countClientsConnected(self): """!Number of currently connected Clients @return Connected clients""" - with _lockClients: - return len(_clients) + with self._clientsConnectedLock: # because our list is not threadsafe + return len(self._clientsConnected) - @staticmethod - def getClientsConnected(): + def getClientsConnected(self): """!A list of all connected clients with their IP address and last seen timestamp _clients[ThreadName] = {"address", "timestamp"} @return List of onnected clients""" # todo return full list or write a print/debug method? - return _clients - - @staticmethod - def getDataFromQueue(): - """!Function to get the data packages from server - must be polled by main program - - @return Next data packet.py from intern queue""" - if _dataPackets: - with _lockDataPackets: - message = _dataPackets.pop() - logging.debug("Get data from queue") - return message - return None - - @staticmethod - def countPacketsInQueue(): - """!Get packets waiting in queue - - @return Packets in queue""" - return len(_dataPackets) # no lock needed - only reading - - @staticmethod - def flushQueue(): - """!To flush all existing data in queue""" - logging.debug("Flush data queue") - with _lockDataPackets: - _dataPackets.clear() + with self._clientsConnectedLock: # because our list is not threadsafe + return self.clientsConnected diff --git a/bw_client.py b/bw_client.py index 530abe5..4cb4d56 100644 --- a/bw_client.py +++ b/bw_client.py @@ -95,7 +95,7 @@ try: failedTransmits = 0 while not bwClient.receive() == "[ack]": # wait for ack or timeout if failedTransmits >= 3: - logging.error("cannot transmit after 5 retires") + logging.error("cannot transmit after 3 retires") break failedTransmits += 1 logging.warning("attempt %d to resend packet", failedTransmits) diff --git a/bw_server.py b/bw_server.py index 8ab6b48..4814dcf 100644 --- a/bw_server.py +++ b/bw_server.py @@ -41,7 +41,7 @@ try: import time import sys import threading - import threading + import queue logging.debug("Import BOSWatch modules") from boswatch.config import Config @@ -123,7 +123,8 @@ try: # t2.start() # t3.start() - bwServer = TCPServer(bwConfig.getInt("Server", "PORT")) + incomingQueue = queue.Queue() + bwServer = TCPServer(incomingQueue) if bwServer.start(): while 1: @@ -133,7 +134,7 @@ try: packetsOld = 0 while serverPaused is True: time.sleep(0.2) # reduce cpu load (run all 200ms) - packetsNew = bwServer.countPacketsInQueue() + packetsNew = incomingQueue.qsize() if packetsNew is not packetsOld: logging.debug("%s packet(s) waiting in queue", packetsNew) packetsOld = packetsNew @@ -143,13 +144,13 @@ try: logging.warning("Server stop flag received ...") break - if not bwServer.countPacketsInQueue(): # pause only when no data + if incomingQueue.empty(): # pause only when no data time.sleep(0.1) # reduce cpu load (run all 100ms) - data = bwServer.getDataFromQueue() + data = incomingQueue.get() if data is not None: logging.info("get data from %s (waited in queue %0.3f sec.)", data[0], time.time() - data[2]) - logging.debug("%s packet(s) waiting in queue", bwServer.countPacketsInQueue()) + logging.debug("%s packet(s) waiting in queue", incomingQueue.qsize()) bwPacket = Packet((data[1])) if not bwDoubleFilter.filter(bwPacket): @@ -163,6 +164,7 @@ try: bwPluginManager.runAllPlugins(bwPacket) # print(bwPacket.get("clientVersion")["major"]) + incomingQueue.task_done() except KeyboardInterrupt: # pragma: no cover logging.warning("Keyboard interrupt") diff --git a/test/test_ServerClient.py b/test/test_ServerClient.py index a4398f5..fcba7a7 100644 --- a/test/test_ServerClient.py +++ b/test/test_ServerClient.py @@ -16,6 +16,7 @@ """ import logging import time +import queue import pytest from boswatch.network.server import TCPServer @@ -31,11 +32,19 @@ class Test_ServerClient: @pytest.fixture(scope="function") def useServer(self): """!Start and serve the sever for each functions where useServer is given""" - self.testServer = TCPServer() + self.dataQueue = queue.Queue() + self.testServer = TCPServer(self.dataQueue) + logging.debug("start server") assert self.testServer.start() time.sleep(0.1) # wait for server - yield self.testServer # server to all test where useServer is given - assert self.testServer.stop() + # serv the instances - created in self context + yield 1 + try: + logging.debug("stop server") + self.testServer.stop() + except: + logging.warning("server still stopped") + time.sleep(0.1) # wait for server def test_clientConnectFailed(self): @@ -80,7 +89,7 @@ class Test_ServerClient: assert self.testClient2.connect() time.sleep(0.1) # wait for all clients connected # check connected clients - assert useServer.countClientsConnected() == 2 + assert self.testServer.countClientsConnected() == 2 # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect() @@ -111,29 +120,28 @@ class Test_ServerClient: assert self.testClient2.receive() == "[ack]" assert self.testClient1.receive() == "[ack]" # check server msg queue - assert useServer.countPacketsInQueue() == 3 + assert self.dataQueue.qsize() == 3 # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect() assert self.testClient3.disconnect() - def test_serverRestart(self): - """!Test a restart of the server""" - self.testServer = TCPServer() - assert self.testServer.start() + def test_serverRestart(self, useServer): + """!Test a stop and restart of the server""" assert self.testServer.stop() assert self.testServer.start() assert self.testServer.stop() - def test_serverStopFailed(self): - """!Test to start the server twice""" - self.testServer = TCPServer() + def test_serverStopFailed(self, useServer): + """!Test to stop a stopped server""" + assert self.testServer.stop() assert not self.testServer.stop() def test_serverDoubleStart(self): """!Test to start the server twice""" - self.testServer1 = TCPServer() - self.testServer2 = TCPServer() + self.dataQueue = queue.Queue() + self.testServer1 = TCPServer(self.dataQueue) + self.testServer2 = TCPServer(self.dataQueue) assert self.testServer1.start() assert not self.testServer2.start() assert self.testServer1.stop() @@ -147,7 +155,6 @@ class Test_ServerClient: self.testClient2 = TCPClient() assert self.testClient2.connect() # send all - useServer.flushQueue() assert self.testClient1.transmit("test1") time.sleep(0.1) # wait for recv to prevent fail of false order assert self.testClient2.transmit("test2") @@ -155,10 +162,10 @@ class Test_ServerClient: assert self.testClient1.receive() == "[ack]" assert self.testClient2.receive() == "[ack]" # _check server output data - assert useServer.countPacketsInQueue() == 2 - assert useServer.getDataFromQueue()[1] == "test1" - assert useServer.getDataFromQueue()[1] == "test2" - assert useServer.getDataFromQueue() is None # Last _check must be None + assert self.dataQueue.qsize() == 2 + assert self.dataQueue.get(True, 1)[1] == "test1" + assert self.dataQueue.get(True, 1)[1] == "test2" + assert self.dataQueue.qsize() is 0 # Last _check must be None # disconnect all assert self.testClient1.disconnect() assert self.testClient2.disconnect()