Replace thread sleep with condition variables

Second patch in the improve-shutdown series. This replaces blocking
sleeps with the use of condition variables for signaling thread
shutdown. The details are:

(1) Create CSimpleCondition class to provide a very basic (simple)
    condition variable that can be used in situations where external
    mutex control is not required, i.e., the class contains both a
    managed mutex and condition variable. Instances of this class
    can be used for basic signaling, and waiters can specify user
    defined predicates.

(2) Replace instances of large sleeps in worker threads with use of
    CSimpleCondition. This allows for very quick response times from
    worker threads when a shutdown has been initiated.

(3) Change stop thread booleans to atomic_bool.

(4) Fixes small whitespace discprencies.
This commit is contained in:
Mark Landis (N6AZX) 2023-01-01 17:09:26 -08:00
parent 76497403e0
commit 9a37f2c680
16 changed files with 102 additions and 34 deletions

View file

@ -97,7 +97,7 @@ protected:
CPacketQueue m_LocalQueue;
// thread
bool m_bStopThread;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
CTimePoint m_TimeoutTimer;
CTimePoint m_StatsTimer;

View file

@ -29,6 +29,8 @@
#include "cdmriddirfile.h"
#include "cdmriddirhttp.h"
#include <chrono>
////////////////////////////////////////////////////////////////////////////////////////
// constructor & destructor
@ -42,6 +44,7 @@ CDmridDir::~CDmridDir()
{
// kill threads
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -55,12 +58,12 @@ CDmridDir::~CDmridDir()
bool CDmridDir::Init(void)
{
// load content
Reload();
// load content
Reload();
// reset stop flag
m_bStopThread = false;
// start thread;
m_pThread = new std::thread(CDmridDir::Thread, this);
@ -70,6 +73,7 @@ bool CDmridDir::Init(void)
void CDmridDir::Close(void)
{
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -83,15 +87,13 @@ void CDmridDir::Close(void)
void CDmridDir::Thread(CDmridDir *This)
{
while ( !This->m_bStopThread )
std::chrono::minutes timeout(DMRIDDB_REFRESH_RATE);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{
// Wait 30 seconds
CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000);
// have lists files changed ?
if ( This->NeedReload() )
{
This->Reload();
This->Reload();
}
}
}

View file

@ -31,6 +31,7 @@
#include <netdb.h>
#include "cbuffer.h"
#include "ccallsign.h"
#include "csimplecondition.h"
// compare function for std::map::find
@ -84,9 +85,10 @@ protected:
// Lock()
std::mutex m_Mutex;
CSimpleCondition m_cv;
// thread
bool m_bStopThread;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
};

View file

@ -80,8 +80,8 @@ bool CG3Protocol::Init(void)
{
// start helper threads
m_pPresenceThread = new std::thread(PresenceThread, this);
m_pPresenceThread = new std::thread(ConfigThread, this);
m_pPresenceThread = new std::thread(IcmpThread, this);
m_pConfigThread = new std::thread(ConfigThread, this);
m_pIcmpThread = new std::thread(IcmpThread, this);
}
#endif
@ -94,6 +94,7 @@ bool CG3Protocol::Init(void)
void CG3Protocol::Close(void)
{
m_bStopThread = true;
if (m_pPresenceThread != NULL)
{
m_pPresenceThread->join();

View file

@ -64,7 +64,13 @@ class CG3Protocol : public CProtocol
{
public:
// constructor
CG3Protocol() : m_GwAddress(0u), m_Modules("*"), m_LastModTime(0) {};
CG3Protocol() :
m_pPresenceThread(nullptr),
m_pConfigThread(nullptr),
m_pIcmpThread(nullptr),
m_GwAddress(0u),
m_Modules("*"),
m_LastModTime(0) {}
// destructor
virtual ~CG3Protocol() {};

View file

@ -25,6 +25,9 @@
#include "main.h"
#include "ctimepoint.h"
#include "cgatekeeper.h"
#include "csimplecondition.h"
#include <chrono>
////////////////////////////////////////////////////////////////////////////////////////
@ -47,6 +50,7 @@ CGateKeeper::~CGateKeeper()
{
// kill threads
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -78,6 +82,7 @@ bool CGateKeeper::Init(void)
void CGateKeeper::Close(void)
{
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -179,11 +184,9 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
void CGateKeeper::Thread(CGateKeeper *This)
{
while ( !This->m_bStopThread )
std::chrono::seconds timeout(30);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{
// Wait 30 seconds
CTimePoint::TaskSleepFor(30000);
// have lists files changed ?
if ( This->m_NodeWhiteList.NeedReload() )
{

View file

@ -30,6 +30,7 @@
#include "cip.h"
#include "ccallsignlist.h"
#include "cpeercallsignlist.h"
#include "csimplecondition.h"
////////////////////////////////////////////////////////////////////////////////////////
// class
@ -71,8 +72,10 @@ protected:
CPeerCallsignList m_PeerList;
// thread
bool m_bStopThread;
CSimpleCondition m_cv;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
};

View file

@ -132,7 +132,7 @@ protected:
CPacketQueue m_Queue;
// thread
bool m_bStopThread;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
// identity

View file

@ -32,6 +32,8 @@
#include "cysfnodedirfile.h"
#include "cysfnodedirhttp.h"
#include <chrono>
////////////////////////////////////////////////////////////////////////////////////////
// constructor
@ -72,6 +74,7 @@ CReflector::CReflector(const CCallsign &callsign)
CReflector::~CReflector()
{
m_bStopThreads = true;
m_cv.signal();
if ( m_XmlReportThread != NULL )
{
m_XmlReportThread->join();
@ -146,6 +149,7 @@ void CReflector::Stop(void)
{
// stop & delete all threads
m_bStopThreads = true;
m_cv.signal();
// stop & delete report threads
if ( m_XmlReportThread != NULL )
@ -388,6 +392,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
void CReflector::XmlReportThread(CReflector *This)
{
const std::chrono::minutes timeout(XML_UPDATE_PERIOD);
while ( !This->m_bStopThreads )
{
// report to xml file
@ -408,8 +413,7 @@ void CReflector::XmlReportThread(CReflector *This)
}
#endif
// and wait a bit
CTimePoint::TaskSleepFor(XML_UPDATE_PERIOD * 1000);
This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;});
}
}
@ -493,7 +497,8 @@ void CReflector::JsonReportThread(CReflector *This)
case NOTIFICATION_NONE:
default:
// nothing to do, just sleep a bit
CTimePoint::TaskSleepFor(250);
std::chrono::milliseconds timeout(250);
This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;});
break;
}
}

View file

@ -31,6 +31,8 @@
#include "cprotocols.h"
#include "cpacketstream.h"
#include "cnotificationqueue.h"
#include "cysfnodedir.h"
#include "csimplecondition.h"
////////////////////////////////////////////////////////////////////////////////////////
@ -137,7 +139,8 @@ protected:
std::array<CPacketStream, NB_OF_MODULES> m_Streams;
// threads
bool m_bStopThreads;
CSimpleCondition m_cv;
std::atomic_bool m_bStopThreads;
std::array<std::thread *, NB_OF_MODULES> m_RouterThreads;
std::thread *m_XmlReportThread;
std::thread *m_JsonReportThread;

41
src/csimplecondition.h Normal file
View file

@ -0,0 +1,41 @@
#pragma once
#include <condition_variable>
#include <mutex>
class CSimpleCondition final
{
public:
CSimpleCondition() : m_Mutex(), m_Condition() {}
CSimpleCondition(const CSimpleCondition&) = delete;
CSimpleCondition& operator=(const CSimpleCondition&) = delete;
CSimpleCondition(CSimpleCondition&&) = delete;
~CSimpleCondition() {};
// Wait up to @duration to be signaled, or until @predicate is true.
// Returns result of predicate after timing out or being signaled.
template<typename Duration, typename Predicate>
bool wait(Duration, Predicate);
// Signal waiters. If @all is true, all waiters will be woken up.
void signal(bool all=true)
{
if (all)
m_Condition.notify_all();
else
m_Condition.notify_one();
}
private:
std::mutex m_Mutex;
std::condition_variable m_Condition;
};
// Note: @timeout is a relative duration, e.g., "30s".
template<typename Duration, typename Predicate>
bool CSimpleCondition::wait(Duration timeout, Predicate predicate)
{
std::unique_lock<std::mutex> lock(m_Mutex);
auto bound = std::chrono::system_clock::now() + timeout;
return m_Condition.wait_until(lock, bound, predicate);
}

View file

@ -92,7 +92,7 @@ protected:
uint16 m_PortOpenStream;
// thread
bool m_bStopThread;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
// socket

View file

@ -86,7 +86,7 @@ protected:
CWiresxPacketQueue m_PacketQueue;
// thread
bool m_bStopThread;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
};

View file

@ -26,7 +26,7 @@
#include "main.h"
#include "creflector.h"
#include "cysfnodedir.h"
#include <chrono>
////////////////////////////////////////////////////////////////////////////////////////
// constructor & destructor
@ -41,6 +41,7 @@ CYsfNodeDir::~CYsfNodeDir()
{
// kill threads
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -69,6 +70,7 @@ bool CYsfNodeDir::Init(void)
void CYsfNodeDir::Close(void)
{
m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL )
{
m_pThread->join();
@ -82,11 +84,9 @@ void CYsfNodeDir::Close(void)
void CYsfNodeDir::Thread(CYsfNodeDir *This)
{
while ( !This->m_bStopThread )
const std::chrono::minutes timeout(YSFNODEDB_REFRESH_RATE);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{
// Wait 30 seconds
CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000);
// have lists files changed ?
if ( This->NeedReload() )
{

View file

@ -32,6 +32,7 @@
#include "cbuffer.h"
#include "ccallsign.h"
#include "cysfnode.h"
#include "csimplecondition.h"
////////////////////////////////////////////////////////////////////////////////////////
// define
@ -84,9 +85,10 @@ protected:
protected:
// Lock()
std::mutex m_Mutex;
// thread
bool m_bStopThread;
CSimpleCondition m_cv;
std::atomic_bool m_bStopThread;
std::thread *m_pThread;
};

View file

@ -53,7 +53,7 @@ static int wait_for_termination()
sigaddset(&waitset, SIGHUP);
pthread_sigmask(SIG_BLOCK, &waitset, nullptr);
// Now wait for termination signal
// Wait for a termination signal
int result = -1;
while (result < 0)
{
@ -156,7 +156,7 @@ int main(int argc, const char * argv[])
// and wait for end
wait_for_termination();
g_Reflector->Stop();
g_Reflector.Stop();
std::cout << "Reflector stopped" << std::endl;
// done