add unittest

This commit is contained in:
Bastian Schroll 2018-09-25 18:50:52 +02:00
parent 26af2ef18b
commit 22588caac5
2 changed files with 79 additions and 9 deletions

View file

@ -17,7 +17,6 @@
import logging
import socket
import threading
import time
logging.debug("- %s loaded", __name__)
@ -40,7 +39,7 @@ class BroadcastClient:
self._socket.settimeout(3)
def getConnInfo(self, retry=0):
"""!Send broadcastpackets
"""!Get the connection info from server over udp broadcast
This function will send broadcast package(s)
to get connection info from the server.
@ -52,11 +51,12 @@ class BroadcastClient:
@param retry: Count of retry - 0 is infinite (0)
@return True or False"""
sendPackages = 1
while sendPackages <= retry or retry == 0:
sendPackages = 0
while sendPackages < retry or retry == 0:
try:
logging.debug("send magic <BW3-Request> as broadcast - Try: %d", sendPackages)
self._socket.sendto("<BW3-Request>".encode(), ('255.255.255.255', self._broadcastPort))
sendPackages += 1
payload, address = self._socket.recvfrom(1024)
payload = str(payload, "UTF-8")
@ -68,9 +68,9 @@ class BroadcastClient:
return True
except socket.timeout: # nothing received - retry
logging.debug("no magic packet received")
sendPackages += 1
except:
except: # pragma: no cover
logging.exception("error on getting connection info")
logging.warning("cannot fetch connection info after %d tries", sendPackages)
return False
@ -117,7 +117,7 @@ class BroadcastServer:
else:
logging.warning("udp broadcast server always started")
return True
except:
except: # pragma: no cover
logging.exception("cannot start udp broadcast server thread")
return False
@ -137,7 +137,7 @@ class BroadcastServer:
else:
logging.warning("udp broadcast server always stopped")
return True
except:
except: # pragma: no cover
logging.exception("cannot stop udp broadcast server thread")
return False
@ -160,7 +160,7 @@ class BroadcastServer:
self._socket.sendto("<BW3-Result>;".encode() + str(self._servePort).encode(), address)
except socket.timeout:
continue # timeout is accepted (not block at recvfrom())
except:
except: # pragma: no cover
logging.exception("error while listening for clients")
self._serverThread = None
logging.debug("udp broadcast server stopped")

70
test/test_broadcast.py Normal file
View file

@ -0,0 +1,70 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: test_broadcast.py
@date: 25.09.2018
@author: Bastian Schroll
@description: Unittests for BOSWatch. File must be _run as "pytest" unittest
"""
import logging
import time
import pytest
from boswatch.network.broadcast import BroadcastServer
from boswatch.network.broadcast import BroadcastClient
class Test_Timer:
"""!Unittest for the timer class"""
def setup_method(self, method):
logging.debug("[TEST] %s.%s", type(self).__name__, method.__name__)
@pytest.fixture(scope="function")
def useBroadcastServer(self):
"""!Server a BroadcastServer instance"""
self.broadcastServer = BroadcastServer()
yield 1 # server the server instance
if self.broadcastServer.isRunning:
assert self.broadcastServer.stop()
while self.broadcastServer.isRunning:
pass
@pytest.fixture(scope="function")
def useBroadcastClient(self):
"""!Server a BroadcastClient instance"""
self.broadcastClient = BroadcastClient()
yield 1 # server the server instance
# tests start here
def test_serverStartStop(self, useBroadcastServer):
assert self.broadcastServer.start()
assert self.broadcastServer.isRunning
assert self.broadcastServer.stop()
def test_serverDoubleStart(self, useBroadcastServer):
assert self.broadcastServer.start()
assert self.broadcastServer.start()
assert self.broadcastServer.stop()
def test_serverStopNotStarted(self, useBroadcastServer):
assert self.broadcastServer.stop()
def test_clientWithoutServer(self, useBroadcastClient):
assert not self.broadcastClient.getConnInfo(1)
def test_serverClientFetchConnInfo(self, useBroadcastServer, useBroadcastClient):
assert self.broadcastServer.start()
assert self.broadcastClient.getConnInfo()
assert self.broadcastServer.stop()
assert self.broadcastClient.serverIP
assert self.broadcastClient.serverPort