diff --git a/src/ccodecstream.h b/src/ccodecstream.h index c22e864..e11f520 100644 --- a/src/ccodecstream.h +++ b/src/ccodecstream.h @@ -97,7 +97,7 @@ protected: CPacketQueue m_LocalQueue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; CTimePoint m_TimeoutTimer; CTimePoint m_StatsTimer; diff --git a/src/cdmriddir.cpp b/src/cdmriddir.cpp index 5d62db3..17b5400 100644 --- a/src/cdmriddir.cpp +++ b/src/cdmriddir.cpp @@ -29,6 +29,8 @@ #include "cdmriddirfile.h" #include "cdmriddirhttp.h" +#include + //////////////////////////////////////////////////////////////////////////////////////// // constructor & destructor @@ -42,6 +44,7 @@ CDmridDir::~CDmridDir() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -55,12 +58,12 @@ CDmridDir::~CDmridDir() bool CDmridDir::Init(void) { - // load content - Reload(); + // load content + Reload(); // reset stop flag m_bStopThread = false; - + // start thread; m_pThread = new std::thread(CDmridDir::Thread, this); @@ -70,6 +73,7 @@ bool CDmridDir::Init(void) void CDmridDir::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -83,15 +87,13 @@ void CDmridDir::Close(void) void CDmridDir::Thread(CDmridDir *This) { - while ( !This->m_bStopThread ) + std::chrono::minutes timeout(DMRIDDB_REFRESH_RATE); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000); - // have lists files changed ? if ( This->NeedReload() ) { - This->Reload(); + This->Reload(); } } } diff --git a/src/cdmriddir.h b/src/cdmriddir.h index 23334e3..f98b9fd 100644 --- a/src/cdmriddir.h +++ b/src/cdmriddir.h @@ -31,6 +31,7 @@ #include #include "cbuffer.h" #include "ccallsign.h" +#include "csimplecondition.h" // compare function for std::map::find @@ -84,9 +85,10 @@ protected: // Lock() std::mutex m_Mutex; + CSimpleCondition m_cv; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/cg3protocol.cpp b/src/cg3protocol.cpp index 9f85420..3ae72eb 100755 --- a/src/cg3protocol.cpp +++ b/src/cg3protocol.cpp @@ -80,8 +80,8 @@ bool CG3Protocol::Init(void) { // start helper threads m_pPresenceThread = new std::thread(PresenceThread, this); - m_pPresenceThread = new std::thread(ConfigThread, this); - m_pPresenceThread = new std::thread(IcmpThread, this); + m_pConfigThread = new std::thread(ConfigThread, this); + m_pIcmpThread = new std::thread(IcmpThread, this); } #endif @@ -94,6 +94,7 @@ bool CG3Protocol::Init(void) void CG3Protocol::Close(void) { + m_bStopThread = true; if (m_pPresenceThread != NULL) { m_pPresenceThread->join(); diff --git a/src/cg3protocol.h b/src/cg3protocol.h index 5ffdb79..53d5d6a 100644 --- a/src/cg3protocol.h +++ b/src/cg3protocol.h @@ -64,7 +64,13 @@ class CG3Protocol : public CProtocol { public: // constructor - CG3Protocol() : m_GwAddress(0u), m_Modules("*"), m_LastModTime(0) {}; + CG3Protocol() : + m_pPresenceThread(nullptr), + m_pConfigThread(nullptr), + m_pIcmpThread(nullptr), + m_GwAddress(0u), + m_Modules("*"), + m_LastModTime(0) {} // destructor virtual ~CG3Protocol() {}; diff --git a/src/cgatekeeper.cpp b/src/cgatekeeper.cpp index 6bbacca..2bc4eb9 100644 --- a/src/cgatekeeper.cpp +++ b/src/cgatekeeper.cpp @@ -25,6 +25,9 @@ #include "main.h" #include "ctimepoint.h" #include "cgatekeeper.h" +#include "csimplecondition.h" + +#include //////////////////////////////////////////////////////////////////////////////////////// @@ -47,6 +50,7 @@ CGateKeeper::~CGateKeeper() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -78,6 +82,7 @@ bool CGateKeeper::Init(void) void CGateKeeper::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -179,11 +184,9 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot void CGateKeeper::Thread(CGateKeeper *This) { - while ( !This->m_bStopThread ) + std::chrono::seconds timeout(30); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(30000); - // have lists files changed ? if ( This->m_NodeWhiteList.NeedReload() ) { diff --git a/src/cgatekeeper.h b/src/cgatekeeper.h index 9ee7504..fe57456 100644 --- a/src/cgatekeeper.h +++ b/src/cgatekeeper.h @@ -30,6 +30,7 @@ #include "cip.h" #include "ccallsignlist.h" #include "cpeercallsignlist.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// // class @@ -71,8 +72,10 @@ protected: CPeerCallsignList m_PeerList; // thread - bool m_bStopThread; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThread; std::thread *m_pThread; + }; diff --git a/src/cprotocol.h b/src/cprotocol.h index 1b43371..a6365c1 100644 --- a/src/cprotocol.h +++ b/src/cprotocol.h @@ -132,7 +132,7 @@ protected: CPacketQueue m_Queue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; // identity diff --git a/src/creflector.cpp b/src/creflector.cpp index a813df9..7e6ef29 100644 --- a/src/creflector.cpp +++ b/src/creflector.cpp @@ -32,6 +32,8 @@ #include "cysfnodedirfile.h" #include "cysfnodedirhttp.h" +#include + //////////////////////////////////////////////////////////////////////////////////////// // constructor @@ -72,6 +74,7 @@ CReflector::CReflector(const CCallsign &callsign) CReflector::~CReflector() { m_bStopThreads = true; + m_cv.signal(); if ( m_XmlReportThread != NULL ) { m_XmlReportThread->join(); @@ -146,6 +149,7 @@ void CReflector::Stop(void) { // stop & delete all threads m_bStopThreads = true; + m_cv.signal(); // stop & delete report threads if ( m_XmlReportThread != NULL ) @@ -388,6 +392,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn) void CReflector::XmlReportThread(CReflector *This) { + const std::chrono::minutes timeout(XML_UPDATE_PERIOD); while ( !This->m_bStopThreads ) { // report to xml file @@ -408,8 +413,7 @@ void CReflector::XmlReportThread(CReflector *This) } #endif - // and wait a bit - CTimePoint::TaskSleepFor(XML_UPDATE_PERIOD * 1000); + This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;}); } } @@ -493,7 +497,8 @@ void CReflector::JsonReportThread(CReflector *This) case NOTIFICATION_NONE: default: // nothing to do, just sleep a bit - CTimePoint::TaskSleepFor(250); + std::chrono::milliseconds timeout(250); + This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;}); break; } } diff --git a/src/creflector.h b/src/creflector.h index ece983e..e809328 100644 --- a/src/creflector.h +++ b/src/creflector.h @@ -31,6 +31,8 @@ #include "cprotocols.h" #include "cpacketstream.h" #include "cnotificationqueue.h" +#include "cysfnodedir.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// @@ -137,7 +139,8 @@ protected: std::array m_Streams; // threads - bool m_bStopThreads; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThreads; std::array m_RouterThreads; std::thread *m_XmlReportThread; std::thread *m_JsonReportThread; diff --git a/src/csimplecondition.h b/src/csimplecondition.h new file mode 100644 index 0000000..0845c06 --- /dev/null +++ b/src/csimplecondition.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +class CSimpleCondition final +{ +public: + CSimpleCondition() : m_Mutex(), m_Condition() {} + CSimpleCondition(const CSimpleCondition&) = delete; + CSimpleCondition& operator=(const CSimpleCondition&) = delete; + CSimpleCondition(CSimpleCondition&&) = delete; + ~CSimpleCondition() {}; + + // Wait up to @duration to be signaled, or until @predicate is true. + // Returns result of predicate after timing out or being signaled. + template + bool wait(Duration, Predicate); + + // Signal waiters. If @all is true, all waiters will be woken up. + void signal(bool all=true) + { + if (all) + m_Condition.notify_all(); + else + m_Condition.notify_one(); + } + +private: + std::mutex m_Mutex; + std::condition_variable m_Condition; +}; + +// Note: @timeout is a relative duration, e.g., "30s". +template +bool CSimpleCondition::wait(Duration timeout, Predicate predicate) +{ + std::unique_lock lock(m_Mutex); + auto bound = std::chrono::system_clock::now() + timeout; + return m_Condition.wait_until(lock, bound, predicate); +} diff --git a/src/ctranscoder.h b/src/ctranscoder.h index 7d54c58..470e9bf 100644 --- a/src/ctranscoder.h +++ b/src/ctranscoder.h @@ -92,7 +92,7 @@ protected: uint16 m_PortOpenStream; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; // socket diff --git a/src/cwiresxcmdhandler.h b/src/cwiresxcmdhandler.h index 3a3b7a3..429bc08 100644 --- a/src/cwiresxcmdhandler.h +++ b/src/cwiresxcmdhandler.h @@ -86,7 +86,7 @@ protected: CWiresxPacketQueue m_PacketQueue; // thread - bool m_bStopThread; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/cysfnodedir.cpp b/src/cysfnodedir.cpp index a8722af..74e50d9 100644 --- a/src/cysfnodedir.cpp +++ b/src/cysfnodedir.cpp @@ -26,7 +26,7 @@ #include "main.h" #include "creflector.h" #include "cysfnodedir.h" - +#include //////////////////////////////////////////////////////////////////////////////////////// // constructor & destructor @@ -41,6 +41,7 @@ CYsfNodeDir::~CYsfNodeDir() { // kill threads m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -69,6 +70,7 @@ bool CYsfNodeDir::Init(void) void CYsfNodeDir::Close(void) { m_bStopThread = true; + m_cv.signal(); if ( m_pThread != NULL ) { m_pThread->join(); @@ -82,11 +84,9 @@ void CYsfNodeDir::Close(void) void CYsfNodeDir::Thread(CYsfNodeDir *This) { - while ( !This->m_bStopThread ) + const std::chrono::minutes timeout(YSFNODEDB_REFRESH_RATE); + while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;})) { - // Wait 30 seconds - CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000); - // have lists files changed ? if ( This->NeedReload() ) { diff --git a/src/cysfnodedir.h b/src/cysfnodedir.h index b4e3804..916b2c2 100644 --- a/src/cysfnodedir.h +++ b/src/cysfnodedir.h @@ -32,6 +32,7 @@ #include "cbuffer.h" #include "ccallsign.h" #include "cysfnode.h" +#include "csimplecondition.h" //////////////////////////////////////////////////////////////////////////////////////// // define @@ -84,9 +85,10 @@ protected: protected: // Lock() std::mutex m_Mutex; - + // thread - bool m_bStopThread; + CSimpleCondition m_cv; + std::atomic_bool m_bStopThread; std::thread *m_pThread; }; diff --git a/src/main.cpp b/src/main.cpp index 723ab5b..6d8a17e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,6 +26,7 @@ #include "creflector.h" #include "syslog.h" +#include #include @@ -39,6 +40,37 @@ CReflector g_Reflector; #include "cusers.h" +// Returns caught termination signal or -1 on error +static int wait_for_termination() +{ + sigset_t waitset; + siginfo_t siginfo; + + sigemptyset(&waitset); + sigaddset(&waitset, SIGTERM); + sigaddset(&waitset, SIGINT); + sigaddset(&waitset, SIGQUIT); + sigaddset(&waitset, SIGHUP); + pthread_sigmask(SIG_BLOCK, &waitset, nullptr); + + // Wait for a termination signal + int result = -1; + while (result < 0) + { + result = sigwaitinfo(&waitset, &siginfo); + if (result == -1 && errno == EINTR) + { + // try again + if (errno == EINTR) + continue; + + // an unexpected error occurred, consider it fatal + break; + } + } + return result; +} + int main(int argc, const char * argv[]) { #ifdef RUN_AS_DAEMON @@ -101,36 +133,29 @@ int main(int argc, const char * argv[]) g_Reflector.SetCallsign(argv[1]); g_Reflector.SetListenIp(CIp(argv[2])); g_Reflector.SetTranscoderIp(CIp(CIp(argv[3]))); - + + // Block all signals while starting up the reflector -- we don't + // want any of the worker threads handling them. + sigset_t sigblockall, sigorig; + sigfillset(&sigblockall); + pthread_sigmask(SIG_SETMASK, &sigblockall, &sigorig); + // and let it run if ( !g_Reflector.Start() ) { std::cout << "Error starting reflector" << std::endl; exit(EXIT_FAILURE); } + + // Restore main thread default signal state + pthread_sigmask(SIG_SETMASK, &sigorig, nullptr); + std::cout << "Reflector " << g_Reflector.GetCallsign() << "started and listening on " << g_Reflector.GetListenIp() << std::endl; -#ifdef RUN_AS_DAEMON - // run forever - while ( true ) - { - // sleep 60 seconds - CTimePoint::TaskSleepFor(60000); - } -#else - // wait any key - for (;;) - { - // sleep 60 seconds - CTimePoint::TaskSleepFor(60000); -#ifdef DEBUG_DUMPFILE - g_Reflector.m_DebugFile.close(); -#endif - } -#endif - // and wait for end + wait_for_termination(); + g_Reflector.Stop(); std::cout << "Reflector stopped" << std::endl;