BW3-Core/test/test_ServerClient.py

156 lines
5.8 KiB
Python
Raw Normal View History

2018-01-07 21:51:43 +01:00
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: test_ServerClient.py
@date: 10.12.2017
@author: Bastian Schroll
2018-01-08 23:41:33 +01:00
@description: Unittests for BOSWatch. File must be run as a "pytest" unittest
2018-01-07 21:51:43 +01:00
"""
import pytest # import the pytest framework
import time
2018-01-11 12:59:53 +01:00
from boswatch.network.server import TCPServer
from boswatch.network.client import TCPClient
2018-01-07 21:51:43 +01:00
class Test_ServerClient:
    """!Unittests for the TCP server/client environment

    Tests that need a running server request the useServer fixture.
    Wherever a test opens more than one resource or does work between
    connect and disconnect, the release calls live in a finally block, so a
    failing assertion cannot leave a connection or a running server behind
    for the tests that follow. On the passing path the order of calls is
    the same as in a plain linear test."""

    @pytest.fixture(scope="function")
    def useServer(self):
        """!Start a server for every test function that requests useServer

        The code after the yield is the teardown and stops the server again.

        @return the started TCPServer instance (yielded to the test)"""
        self.testServer = TCPServer()
        assert self.testServer.start()
        time.sleep(0.1)  # wait for server
        yield self.testServer  # hand the server to the requesting test
        assert self.testServer.stop()
        time.sleep(0.1)  # wait for server

    def test_clientConnectFailed(self):
        """!Connect to a non available server"""
        testClient = TCPClient()
        assert not testClient.connect()

    def test_clientDisconnectFailed(self):
        """!Disconnect while no connection is established"""
        testClient = TCPClient()
        assert not testClient.disconnect()

    def test_clientTransmitFailed(self):
        """!Transmit while no connection is established"""
        testClient = TCPClient()
        assert not testClient.transmit("test")

    def test_clientReceiveFailed(self):
        """!Receive while no connection is established"""
        testClient = TCPClient()
        assert not testClient.receive()

    def test_clientConnect(self, useServer):
        """!Connect to a server"""
        testClient = TCPClient()
        assert testClient.connect()
        assert testClient.disconnect()

    def test_clientReconnect(self, useServer):
        """!Try a reconnect after an established connection"""
        testClient = TCPClient()
        assert testClient.connect()
        assert testClient.disconnect()
        assert testClient.connect()
        assert testClient.disconnect()

    def test_clientMultiConnect(self, useServer):
        """!Connect with 2 clients to the server"""
        clients = []
        try:
            # create and connect one client after the other
            for _ in range(2):
                client = TCPClient()
                clients.append(client)
                assert client.connect()
        finally:
            # disconnect all - also when a connect above failed, so that no
            # already established connection is left open
            results = [client.disconnect() for client in clients]
        assert all(results)

    def test_clientCommunicate(self, useServer):
        """!Try to send data to the server and check on '[ack]'"""
        testClient = TCPClient()
        assert testClient.connect()
        try:
            assert testClient.transmit("test")
            assert testClient.receive() == "[ack]"
        finally:
            # always release the connection, even if the exchange failed
            disconnected = testClient.disconnect()
        assert disconnected

    def test_clientMultiCommunicate(self, useServer):
        """!Try to send data to the server with 3 clients and check on '[ack]'"""
        clients = []
        try:
            # connect all
            for _ in range(3):
                client = TCPClient()
                clients.append(client)
                assert client.connect()
            # send all
            for client in clients:
                assert client.transmit("test")
            # recv all - deliberately in the reverse order of sending
            for client in reversed(clients):
                assert client.receive() == "[ack]"
        finally:
            # disconnect all - runs on failure too
            results = [client.disconnect() for client in clients]
        assert all(results)

    def test_serverRestart(self):
        """!Test a restart of the server"""
        testServer = TCPServer()
        assert testServer.start()
        assert testServer.stop()
        assert testServer.start()
        assert testServer.stop()

    def test_serverStopFailed(self):
        """!Test to stop a server which was never started"""
        testServer = TCPServer()
        assert not testServer.stop()

    def test_serverDoubleStart(self):
        """!Test to start the server twice

        The second start is expected to fail while the first server is
        running, and stopping the never-started second server is expected to
        fail as well."""
        testServer1 = TCPServer()
        testServer2 = TCPServer()
        assert testServer1.start()
        started2 = False
        try:
            started2 = testServer2.start()
            assert not started2
        finally:
            # the first server must go down in any case, otherwise it would
            # still be running when the next test starts its own server
            stopped1 = testServer1.stop()
            if started2:
                # only reached when the test already failed: clean up the
                # second server that should never have started
                testServer2.stop()
        assert stopped1
        assert not testServer2.stop()

    def test_serverGetOutput(self, useServer):
        """!Send data to server with 2 clients, check '[ack]' and data on server queue"""
        clients = []
        try:
            # connect all
            for _ in range(2):
                client = TCPClient()
                clients.append(client)
                assert client.connect()
            testClient1, testClient2 = clients
            # send all
            useServer.flushData()  # start with an empty server queue
            assert testClient1.transmit("test1")
            time.sleep(0.1)  # let the server receive test1 first to keep the order stable
            assert testClient2.transmit("test2")
            # recv all
            assert testClient1.receive() == "[ack]"
            assert testClient2.receive() == "[ack]"
            # check server output data
            assert useServer.getData() == ("127.0.0.1", "test1")
            assert useServer.getData() == ("127.0.0.1", "test2")
            assert useServer.getData() is None  # last check must be None
        finally:
            # disconnect all - runs on failure too
            results = [client.disconnect() for client in clients]
        assert all(results)