BW3-Core/test/boswatch/test_ServerClient.py
Luflosi d4dcc75711
Avoid "DeprecationWarning: invalid escape sequence"
Without this change, many warnings like this will be generated while running pytest:
```
test/test_template.py:3
  /build/source/test/test_template.py:3: DeprecationWarning: invalid escape sequence '\/'
    """!
```
This can also be seen when manually running python with warnings enabled.

This happens because the comment uses a multiline string and Python interprets the backslash in the logo as an escape character and complains that \/ is not a valid escape sequence. To fix this, prepend the string with the letter r to indicate that the backslash should be treated as a literal character, see https://docs.python.org/3/reference/lexical_analysis.html#index-20.
I also applied this change to all the comment strings since that shouldn't break anything and to establish it as a pattern for the future so this problem hopefully never happens again.

This is what I did specifically:
- Change the comment at the top of bw_client.py and bw_server.py to start with `"""!` since that seems to be the pattern here
- Search-and-Replace all occurances of `"""!` with `r"""!`
- Manually change the strings in `logoToLog()` in boswatch/utils/header.py
2023-09-19 17:49:09 +02:00

248 lines
7.6 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
r"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: test_ServerClient.py
@date: 10.12.2017
@author: Bastian Schroll
@description: Unittests for BOSWatch. File have to run as "pytest" unittest
"""
# problem of the pytest fixtures
# pylint: disable=redefined-outer-name
import logging
import time
import queue
import pytest
from boswatch.network.server import TCPServer
from boswatch.network.client import TCPClient
import threading
def setup_function(function):
logging.debug("[TEST] %s.%s", function.__module__, function.__name__)
@pytest.fixture
def getClient():
r"""!Build and serve a TCPCLient"""
return TCPClient()
@pytest.fixture
def getServer():
r"""!Build and serve a TCPServer"""
dataQueue = queue.Queue()
testServer = TCPServer(dataQueue)
return testServer
@pytest.fixture
def getRunningServer(getServer):
r"""!Build and serve a still running TCPServer"""
logging.debug("start server")
assert getServer.start()
while not getServer.isRunning:
pass
yield getServer
logging.debug("stop server")
assert getServer.stop()
time.sleep(0.1) # wait for safe stopped
def test_clientConnectFailed(getClient):
r"""!Connect to a non available server"""
assert not getClient.connect()
def test_clientDisconnectFailed(getClient):
r"""!Disconnect while no connection is established"""
assert getClient.disconnect()
def test_clientTransmitFailed(getClient):
r"""!Transmit while no connection is established"""
assert not getClient.transmit("test")
def test_clientReceiveFailed(getClient):
r"""!Receive while no connection is established"""
assert not getClient.receive()
def test_clientConnect(getClient, getRunningServer):
r"""!Connect to a server"""
assert getClient.connect()
assert getClient.disconnect()
def test_doubleConnect(getClient, getRunningServer):
r"""!Connect to a server twice"""
assert getClient.connect()
assert getClient.connect()
assert getClient.disconnect()
def test_clientReconnect(getClient, getRunningServer):
r"""!Try a reconnect after a established connection"""
assert getClient.connect()
assert getClient.disconnect()
assert getClient.connect()
assert getClient.disconnect()
def test_clientMultiConnect(getClient, getRunningServer):
r"""!Connect with 2 clients to the server"""
assert getClient.connect()
testClient2 = TCPClient()
assert testClient2.connect()
time.sleep(0.1) # wait for all clients connected
# check connected clients
assert getRunningServer.countClientsConnected() == 2
# disconnect all
assert getClient.disconnect()
assert testClient2.disconnect()
def test_clientCommunicate(getClient, getRunningServer):
r"""!Try to send data to the server and check on '[ack]'"""
assert getClient.connect()
assert getClient.transmit("test")
assert getClient.receive() == "[ack]"
assert getClient.disconnect()
@pytest.mark.skip("needs fixture for more than one client")
def test_clientMultiCommunicate(getServer):
r"""!Try to send data to the server with 3 clients and check on '[ack]'"""
# connect all
testClient1 = TCPClient()
assert testClient1.connect()
testClient2 = TCPClient()
assert testClient2.connect()
testClient3 = TCPClient()
assert testClient3.connect()
# send all
assert testClient1.transmit("test")
assert testClient2.transmit("test")
assert testClient3.transmit("test")
# recv all
assert testClient3.receive() == "[ack]"
assert testClient2.receive() == "[ack]"
assert testClient1.receive() == "[ack]"
# check server msg queue
assert getRunningServer._alarmQueue.qsize() == 3
# disconnect all
assert testClient1.disconnect()
assert testClient2.disconnect()
assert testClient3.disconnect()
def test_serverRestart(getRunningServer):
r"""!Test a stop and restart of the server"""
assert getRunningServer.stop()
assert getRunningServer.start()
assert getRunningServer.stop()
def test_serverStopFailed(getServer):
r"""!Test to stop a stopped server"""
assert getServer.stop()
def test_serverDoubleStart(getServer):
r"""!Test to start the server twice"""
assert getServer.start()
assert getServer.start()
assert getServer.stop()
def test_serverStartTwoInstances():
r"""!Test to start two server different server instances"""
dataQueue = queue.Queue()
testServer1 = TCPServer(dataQueue)
testServer2 = TCPServer(dataQueue)
assert testServer1.start()
assert testServer1.isRunning
assert not testServer2.start()
assert testServer1.isRunning
assert not testServer2.isRunning
assert testServer1.stop()
assert testServer2.stop()
def test_serverStopsWhileConnected(getRunningServer, getClient):
r"""!Shutdown server while client is connected"""
getClient.connect()
getRunningServer.stop()
timeout = 5
while getClient.isConnected:
time.sleep(0.1)
timeout = timeout - 1
if timeout == 0:
break
assert timeout
@pytest.mark.skip("needs fixture for more than one client")
def test_serverGetOutput(getRunningServer):
r"""!Send data to server with 2 clients, check '[ack]' and data on server queue"""
# connect all
testClient1 = TCPClient()
assert testClient1.connect()
testClient2 = TCPClient()
assert testClient2.connect()
# send all
assert testClient1.transmit("test1")
time.sleep(0.1) # wait for recv to prevent fail of false order
assert testClient2.transmit("test2")
# recv all
assert testClient1.receive() == "[ack]"
assert testClient2.receive() == "[ack]"
# _check server output data
assert getRunningServer._alarmQueue.qsize() == 2
assert getRunningServer._alarmQueue.get(True, 1)[1] == "test1"
assert getRunningServer._alarmQueue.get(True, 1)[1] == "test2"
assert getRunningServer._alarmQueue.qsize() == 0 # Last _check must be None
# disconnect all
assert testClient1.disconnect()
assert testClient2.disconnect()
def test_serverHighLoad(getRunningServer):
r"""!High load server test with 10 send threads each will send 100 msg with 324 bytes size"""
logging.debug("start sendThreads")
threads = []
for thr_id in range(10):
thr = threading.Thread(target=sendThread, name="sendThread-" + str(thr_id))
thr.daemon = True
thr.start()
threads.append(thr)
for thread in threads:
thread.join()
logging.debug("finished sendThreads")
assert getRunningServer._alarmQueue.qsize() == 1000
def sendThread():
client = TCPClient()
client.connect()
time.sleep(0.1)
for i in range(100):
# actually this string is 324 bytes long
client.transmit("HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-"
"HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-"
"HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-HigLoadTestString-")
if not client.receive() == "[ack]":
logging.error("missing [ACK]")
time.sleep(0.1)
client.disconnect()