Merge branch 'develop' into broadcast

This commit is contained in:
Bastian Schroll 2018-09-25 19:06:56 +02:00 committed by GitHub
commit bbf71a34a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 288 additions and 8 deletions

View file

@ -26,11 +26,7 @@ class TCPClient:
def __init__(self, timeout=3):
"""!Create a new instance
Create a new instance of an TCP Client.
And set the timeout
@param timeout: client timeout in sec (3)
"""
@param timeout: timeout for the client in sec. (3)"""
try:
self._sock = None
self._timeout = timeout

View file

@ -0,0 +1,53 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: netCheck.py
@date: 21.09.2018
@author: Bastian Schroll
@description: Worker to check internet connection
"""
import logging
from urllib.request import urlopen
logging.debug("- %s loaded", __name__)
class NetCheck:
"""!Worker class to check internet connection"""
def __init__(self, hostname="https://www.google.com/", timeout=1):
"""!Create a new NetCheck instance
@param hostname: host against connection check is running ("https://www.google.com/")
@param timout: timout for connection check in sec. (1)"""
self._hostname = hostname
self._timeout = timeout
self._connectionState = False
self.checkConn() # initiate a first check
def checkConn(self):
"""!Check the connection
@return True or False"""
try:
urlopen(self._hostname, timeout=self._timeout)
logging.debug("%s is reachable", self._hostname)
self._connectionState = True
return True
except:
logging.warning("%s is not reachable", self._hostname)
self._connectionState = False
return False
@property
def connectionState(self):
"""!Property for the last connection state from checkConn()"""
return self._connectionState

125
boswatch/utils/timer.py Normal file
View file

@ -0,0 +1,125 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: timer.py
@date: 21.09.2018
@author: Bastian Schroll
@description: Timer class for interval timed events
"""
import logging
import time
from threading import Thread, Event
logging.debug("- %s loaded", __name__)
class RepeatedTimer:
def __init__(self, interval, targetFunction, *args, **kwargs):
"""!Create a new instance of the RepeatedTimer
@param interval: interval in sec. to recall target function
@param targetFunction: function to call on timer event
@param *args: arguments for the called function
@param *kwargs: keyword arguments for the called function
"""
self._interval = interval
self._function = targetFunction
self._args = args
self._kwargs = kwargs
self._start = 0
self._overdueCount = 0
self._lostEvents = 0
self._isRunning = False
self._event = Event()
self._thread = None
def start(self):
"""!Start a new timer worker thread
@return True or False"""
try:
if self._thread is None:
self._event.clear()
self._thread = Thread(target=self._target)
self._thread.name = "RepTim(" + str(self._interval) + ")"
self._thread.daemon = True # start as daemon (thread dies if main program ends)
self._thread.start()
logging.debug("start repeatedTimer: %s", self._thread.name)
return True
else:
logging.debug("repeatedTimer always started")
return True
except: # pragma: no cover
logging.exception("cannot start timer worker thread")
return False
def stop(self):
"""!Stop the timer worker thread
@return True or False"""
try:
self._event.set()
if self._thread is not None:
logging.debug("stop repeatedTimer: %s", self._thread.name)
self._thread.join()
return True
logging.warning("repeatedTimer always stopped")
return True
except: # pragma: no cover
logging.exception("cannot stop repeatedTimer")
return False
def _target(self):
"""!Runs the target function with his arguments in own thread"""
self._start = time.time()
while not self._event.wait(self.restTime):
logging.debug("work")
startTime = time.time()
try:
self._function(*self._args, **self._kwargs)
except: # pragma: no cover
logging.exception("target throws an exception")
runTime = time.time() - startTime
if runTime < self._interval:
logging.debug("ready after: %0.3f sec. - next call in: %0.3f sec.", runTime, self.restTime)
else:
lostEvents = int(runTime / self._interval)
logging.warning("timer overdue! interval: %0.3f sec. - runtime: %0.3f sec. - "
"%d events lost - next call in: %0.3f sec.", self._interval, runTime, lostEvents, self.restTime)
self._lostEvents += lostEvents
self._overdueCount += 1
logging.debug("repeatedTimer thread stopped: %s", self._thread.name)
self._thread = None # set to none after leave teh thread (running recognize)
@property
def isRunning(self):
"""!Property for repeatedTimer running state"""
if self._thread:
return True
return False
@property
def restTime(self):
"""!Property to get remaining time till next call"""
return self._interval - ((time.time() - self._start) % self._interval)
@property
def overdueCount(self):
"""!Property to get a count over all overdues"""
return self._overdueCount
@property
def lostEvents(self):
"""!Property to get a count over all los events"""
return self._lostEvents

View file

@ -71,6 +71,16 @@ print(server.isRunning)
time.sleep(2)
print(server.isRunning)
# test for the timer class
from boswatch.utils.timer import RepeatedTimer
from boswatch.network.netCheck import NetCheck
net = NetCheck()
test = RepeatedTimer(3, net.checkConn)
test.start()
time.sleep(10)
print(net.connectionState)
test.stop()
try:
header.logoToLog()
header.infoToLog()
@ -125,7 +135,7 @@ try:
#
# t1 = threading.Timer(1, eins)
# t2 = threading.Timer(5, zwei)
# t3 = threading.Timer(15, drei)
# t3 = threading.Timer(600, drei)
# t1.start()
# t2.start()
# t3.start()

View file

@ -22,7 +22,7 @@ format=%(asctime)s,%(msecs)03d - %(module)-15s [%(levelname)-8s] %(message)s
datefmt=%d.%m.%Y %H:%M:%S
[formatter_complex]
format=%(asctime)s,%(msecs)03d - %(module)-15s %(funcName)-18s [%(levelname)-8s] %(message)s
format=%(asctime)s,%(msecs)03d - %(threadName)-15s %(module)-15s %(funcName)-18s [%(levelname)-8s] %(message)s
datefmt=%d.%m.%Y %H:%M:%S
[handlers]

View file

@ -22,7 +22,7 @@ format=%(asctime)s,%(msecs)03d - %(module)-15s [%(levelname)-8s] %(message)s
datefmt=%d.%m.%Y %H:%M:%S
[formatter_complex]
format=%(asctime)s,%(msecs)03d - %(module)-15s %(funcName)-18s [%(levelname)-8s] %(message)s
format=%(asctime)s,%(msecs)03d - %(threadName)-15s %(module)-15s %(funcName)-18s [%(levelname)-8s] %(message)s
datefmt=%d.%m.%Y %H:%M:%S
[handlers]

96
test/test_timer.py Normal file
View file

@ -0,0 +1,96 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: test_timer.py
@date: 21.09.2018
@author: Bastian Schroll
@description: Unittests for BOSWatch. File must be _run as "pytest" unittest
"""
import logging
import time
import pytest
from boswatch.utils.timer import RepeatedTimer
class Test_Timer:
"""!Unittest for the timer class"""
def setup_method(self, method):
logging.debug("[TEST] %s.%s", type(self).__name__, method.__name__)
@staticmethod
def testTargetFast():
"""!Fast worker thread"""
logging.debug("run testTargetFast")
@staticmethod
def testTargetSlow():
"""!Slow worker thread"""
logging.debug("run testTargetSlow start")
time.sleep(0.51)
logging.debug("run testTargetSlow end")
@pytest.fixture(scope="function")
def useTimerFast(self):
"""!Server a RepeatedTimer instance with fast worker"""
self.testTimer = RepeatedTimer(0.1, Test_Timer.testTargetFast)
yield 1 # server the timer instance
if self.testTimer.isRunning:
assert self.testTimer.stop()
@pytest.fixture(scope="function")
def useTimerSlow(self):
"""!Server a RepeatedTimer instance slow worker"""
self.testTimer = RepeatedTimer(0.1, Test_Timer.testTargetSlow)
yield 1 # server the timer instance
if self.testTimer.isRunning:
assert self.testTimer.stop()
# test cases starts here
def test_timerStartStop(self, useTimerFast):
assert self.testTimer.start()
assert self.testTimer.stop()
def test_timerDoubleStart(self, useTimerFast):
assert self.testTimer.start()
assert self.testTimer.start()
assert self.testTimer.stop()
def test_timerStopNotStarted(self, useTimerFast):
assert self.testTimer.stop()
def test_timerIsRunning(self, useTimerFast):
assert self.testTimer.start()
assert self.testTimer.isRunning
assert self.testTimer.stop()
def test_timerRun(self, useTimerFast):
assert self.testTimer.start()
time.sleep(0.2)
assert self.testTimer.stop()
assert self.testTimer.overdueCount == 0
assert self.testTimer.lostEvents == 0
def test_timerOverdue(self, useTimerSlow):
assert self.testTimer.start()
time.sleep(0.2)
assert self.testTimer.stop()
assert self.testTimer.overdueCount == 1
assert self.testTimer.lostEvents == 5
def test_timerOverdueLong(self, useTimerSlow):
assert self.testTimer.start()
time.sleep(1)
assert self.testTimer.stop()
assert self.testTimer.overdueCount == 2
assert self.testTimer.lostEvents == 10