diff --git a/boswatch/network/broadcast.py b/boswatch/network/broadcast.py index 27ab130..3cf98b9 100644 --- a/boswatch/network/broadcast.py +++ b/boswatch/network/broadcast.py @@ -17,7 +17,6 @@ import logging import socket import threading -import time logging.debug("- %s loaded", __name__) @@ -40,7 +39,7 @@ class BroadcastClient: self._socket.settimeout(3) def getConnInfo(self, retry=0): - """!Send broadcastpackets + """!Get the connection info from server over udp broadcast This function will send broadcast package(s) to get connection info from the server. @@ -52,11 +51,12 @@ class BroadcastClient: @param retry: Count of retry - 0 is infinite (0) @return True or False""" - sendPackages = 1 - while sendPackages <= retry or retry == 0: + sendPackages = 0 + while sendPackages < retry or retry == 0: try: logging.debug("send magic as broadcast - Try: %d", sendPackages) self._socket.sendto("".encode(), ('255.255.255.255', self._broadcastPort)) + sendPackages += 1 payload, address = self._socket.recvfrom(1024) payload = str(payload, "UTF-8") @@ -68,9 +68,9 @@ class BroadcastClient: return True except socket.timeout: # nothing received - retry logging.debug("no magic packet received") - sendPackages += 1 - except: + except: # pragma: no cover logging.exception("error on getting connection info") + logging.warning("cannot fetch connection info after %d tries", sendPackages) return False @@ -117,7 +117,7 @@ class BroadcastServer: else: logging.warning("udp broadcast server always started") return True - except: + except: # pragma: no cover logging.exception("cannot start udp broadcast server thread") return False @@ -137,7 +137,7 @@ class BroadcastServer: else: logging.warning("udp broadcast server always stopped") return True - except: + except: # pragma: no cover logging.exception("cannot stop udp broadcast server thread") return False @@ -160,7 +160,7 @@ class BroadcastServer: self._socket.sendto(";".encode() + str(self._servePort).encode(), address) except socket.timeout: continue # timeout is accepted (not block at recvfrom()) - except: + except: # pragma: no cover logging.exception("error while listening for clients") self._serverThread = None logging.debug("udp broadcast server stopped") diff --git a/test/test_broadcast.py b/test/test_broadcast.py new file mode 100644 index 0000000..c8e561b --- /dev/null +++ b/test/test_broadcast.py @@ -0,0 +1,70 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: test_broadcast.py +@date: 25.09.2018 +@author: Bastian Schroll +@description: Unittests for BOSWatch. File must be _run as "pytest" unittest +""" +import logging +import time +import pytest + +from boswatch.network.broadcast import BroadcastServer +from boswatch.network.broadcast import BroadcastClient + + +class Test_Timer: + """!Unittest for the timer class""" + + def setup_method(self, method): + logging.debug("[TEST] %s.%s", type(self).__name__, method.__name__) + + @pytest.fixture(scope="function") + def useBroadcastServer(self): + """!Server a BroadcastServer instance""" + self.broadcastServer = BroadcastServer() + yield 1 # server the server instance + if self.broadcastServer.isRunning: + assert self.broadcastServer.stop() + while self.broadcastServer.isRunning: + pass + + @pytest.fixture(scope="function") + def useBroadcastClient(self): + """!Server a BroadcastClient instance""" + self.broadcastClient = BroadcastClient() + yield 1 # server the server instance + + # tests start here + + def test_serverStartStop(self, useBroadcastServer): + assert self.broadcastServer.start() + assert self.broadcastServer.isRunning + assert self.broadcastServer.stop() + + def test_serverDoubleStart(self, useBroadcastServer): + assert self.broadcastServer.start() + assert self.broadcastServer.start() + assert self.broadcastServer.stop() + + def test_serverStopNotStarted(self, useBroadcastServer): + assert self.broadcastServer.stop() + + def test_clientWithoutServer(self, useBroadcastClient): + assert not self.broadcastClient.getConnInfo(1) + + def test_serverClientFetchConnInfo(self, useBroadcastServer, useBroadcastClient): + assert self.broadcastServer.start() + assert self.broadcastClient.getConnInfo() + assert self.broadcastServer.stop() + assert self.broadcastClient.serverIP + assert self.broadcastClient.serverPort