This commit is contained in:
mark 2023-01-23 09:11:29 -06:00 committed by GitHub
commit 63e068bb8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 145 additions and 52 deletions

View file

@ -97,7 +97,7 @@ protected:
CPacketQueue m_LocalQueue; CPacketQueue m_LocalQueue;
// thread // thread
bool m_bStopThread; std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
CTimePoint m_TimeoutTimer; CTimePoint m_TimeoutTimer;
CTimePoint m_StatsTimer; CTimePoint m_StatsTimer;

View file

@ -29,6 +29,8 @@
#include "cdmriddirfile.h" #include "cdmriddirfile.h"
#include "cdmriddirhttp.h" #include "cdmriddirhttp.h"
#include <chrono>
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// constructor & destructor // constructor & destructor
@ -42,6 +44,7 @@ CDmridDir::~CDmridDir()
{ {
// kill threads // kill threads
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -55,12 +58,12 @@ CDmridDir::~CDmridDir()
bool CDmridDir::Init(void) bool CDmridDir::Init(void)
{ {
// load content // load content
Reload(); Reload();
// reset stop flag // reset stop flag
m_bStopThread = false; m_bStopThread = false;
// start thread; // start thread;
m_pThread = new std::thread(CDmridDir::Thread, this); m_pThread = new std::thread(CDmridDir::Thread, this);
@ -70,6 +73,7 @@ bool CDmridDir::Init(void)
void CDmridDir::Close(void) void CDmridDir::Close(void)
{ {
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -83,15 +87,13 @@ void CDmridDir::Close(void)
void CDmridDir::Thread(CDmridDir *This) void CDmridDir::Thread(CDmridDir *This)
{ {
while ( !This->m_bStopThread ) std::chrono::minutes timeout(DMRIDDB_REFRESH_RATE);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{ {
// Wait 30 seconds
CTimePoint::TaskSleepFor(DMRIDDB_REFRESH_RATE * 60000);
// have lists files changed ? // have lists files changed ?
if ( This->NeedReload() ) if ( This->NeedReload() )
{ {
This->Reload(); This->Reload();
} }
} }
} }

View file

@ -31,6 +31,7 @@
#include <netdb.h> #include <netdb.h>
#include "cbuffer.h" #include "cbuffer.h"
#include "ccallsign.h" #include "ccallsign.h"
#include "csimplecondition.h"
// compare function for std::map::find // compare function for std::map::find
@ -84,9 +85,10 @@ protected:
// Lock() // Lock()
std::mutex m_Mutex; std::mutex m_Mutex;
CSimpleCondition m_cv;
// thread // thread
bool m_bStopThread; std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
}; };

View file

@ -80,8 +80,8 @@ bool CG3Protocol::Init(void)
{ {
// start helper threads // start helper threads
m_pPresenceThread = new std::thread(PresenceThread, this); m_pPresenceThread = new std::thread(PresenceThread, this);
m_pPresenceThread = new std::thread(ConfigThread, this); m_pConfigThread = new std::thread(ConfigThread, this);
m_pPresenceThread = new std::thread(IcmpThread, this); m_pIcmpThread = new std::thread(IcmpThread, this);
} }
#endif #endif
@ -94,6 +94,7 @@ bool CG3Protocol::Init(void)
void CG3Protocol::Close(void) void CG3Protocol::Close(void)
{ {
m_bStopThread = true;
if (m_pPresenceThread != NULL) if (m_pPresenceThread != NULL)
{ {
m_pPresenceThread->join(); m_pPresenceThread->join();

View file

@ -64,7 +64,13 @@ class CG3Protocol : public CProtocol
{ {
public: public:
// constructor // constructor
CG3Protocol() : m_GwAddress(0u), m_Modules("*"), m_LastModTime(0) {}; CG3Protocol() :
m_pPresenceThread(nullptr),
m_pConfigThread(nullptr),
m_pIcmpThread(nullptr),
m_GwAddress(0u),
m_Modules("*"),
m_LastModTime(0) {}
// destructor // destructor
virtual ~CG3Protocol() {}; virtual ~CG3Protocol() {};

View file

@ -25,6 +25,9 @@
#include "main.h" #include "main.h"
#include "ctimepoint.h" #include "ctimepoint.h"
#include "cgatekeeper.h" #include "cgatekeeper.h"
#include "csimplecondition.h"
#include <chrono>
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -47,6 +50,7 @@ CGateKeeper::~CGateKeeper()
{ {
// kill threads // kill threads
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -78,6 +82,7 @@ bool CGateKeeper::Init(void)
void CGateKeeper::Close(void) void CGateKeeper::Close(void)
{ {
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -179,11 +184,9 @@ bool CGateKeeper::MayTransmit(const CCallsign &callsign, const CIp &ip, int prot
void CGateKeeper::Thread(CGateKeeper *This) void CGateKeeper::Thread(CGateKeeper *This)
{ {
while ( !This->m_bStopThread ) std::chrono::seconds timeout(30);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{ {
// Wait 30 seconds
CTimePoint::TaskSleepFor(30000);
// have lists files changed ? // have lists files changed ?
if ( This->m_NodeWhiteList.NeedReload() ) if ( This->m_NodeWhiteList.NeedReload() )
{ {

View file

@ -30,6 +30,7 @@
#include "cip.h" #include "cip.h"
#include "ccallsignlist.h" #include "ccallsignlist.h"
#include "cpeercallsignlist.h" #include "cpeercallsignlist.h"
#include "csimplecondition.h"
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// class // class
@ -71,8 +72,10 @@ protected:
CPeerCallsignList m_PeerList; CPeerCallsignList m_PeerList;
// thread // thread
bool m_bStopThread; CSimpleCondition m_cv;
std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
}; };

View file

@ -132,7 +132,7 @@ protected:
CPacketQueue m_Queue; CPacketQueue m_Queue;
// thread // thread
bool m_bStopThread; std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
// identity // identity

View file

@ -32,6 +32,8 @@
#include "cysfnodedirfile.h" #include "cysfnodedirfile.h"
#include "cysfnodedirhttp.h" #include "cysfnodedirhttp.h"
#include <chrono>
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// constructor // constructor
@ -72,6 +74,7 @@ CReflector::CReflector(const CCallsign &callsign)
CReflector::~CReflector() CReflector::~CReflector()
{ {
m_bStopThreads = true; m_bStopThreads = true;
m_cv.signal();
if ( m_XmlReportThread != NULL ) if ( m_XmlReportThread != NULL )
{ {
m_XmlReportThread->join(); m_XmlReportThread->join();
@ -146,6 +149,7 @@ void CReflector::Stop(void)
{ {
// stop & delete all threads // stop & delete all threads
m_bStopThreads = true; m_bStopThreads = true;
m_cv.signal();
// stop & delete report threads // stop & delete report threads
if ( m_XmlReportThread != NULL ) if ( m_XmlReportThread != NULL )
@ -388,6 +392,7 @@ void CReflector::RouterThread(CReflector *This, CPacketStream *streamIn)
void CReflector::XmlReportThread(CReflector *This) void CReflector::XmlReportThread(CReflector *This)
{ {
const std::chrono::minutes timeout(XML_UPDATE_PERIOD);
while ( !This->m_bStopThreads ) while ( !This->m_bStopThreads )
{ {
// report to xml file // report to xml file
@ -408,8 +413,7 @@ void CReflector::XmlReportThread(CReflector *This)
} }
#endif #endif
// and wait a bit This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;});
CTimePoint::TaskSleepFor(XML_UPDATE_PERIOD * 1000);
} }
} }
@ -493,7 +497,8 @@ void CReflector::JsonReportThread(CReflector *This)
case NOTIFICATION_NONE: case NOTIFICATION_NONE:
default: default:
// nothing to do, just sleep a bit // nothing to do, just sleep a bit
CTimePoint::TaskSleepFor(250); std::chrono::milliseconds timeout(250);
This->m_cv.wait(timeout, [&]{return This->m_bStopThreads==true;});
break; break;
} }
} }

View file

@ -31,6 +31,8 @@
#include "cprotocols.h" #include "cprotocols.h"
#include "cpacketstream.h" #include "cpacketstream.h"
#include "cnotificationqueue.h" #include "cnotificationqueue.h"
#include "cysfnodedir.h"
#include "csimplecondition.h"
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -137,7 +139,8 @@ protected:
std::array<CPacketStream, NB_OF_MODULES> m_Streams; std::array<CPacketStream, NB_OF_MODULES> m_Streams;
// threads // threads
bool m_bStopThreads; CSimpleCondition m_cv;
std::atomic_bool m_bStopThreads;
std::array<std::thread *, NB_OF_MODULES> m_RouterThreads; std::array<std::thread *, NB_OF_MODULES> m_RouterThreads;
std::thread *m_XmlReportThread; std::thread *m_XmlReportThread;
std::thread *m_JsonReportThread; std::thread *m_JsonReportThread;

41
src/csimplecondition.h Normal file
View file

@ -0,0 +1,41 @@
#pragma once
#include <condition_variable>
#include <mutex>
class CSimpleCondition final
{
public:
CSimpleCondition() : m_Mutex(), m_Condition() {}
CSimpleCondition(const CSimpleCondition&) = delete;
CSimpleCondition& operator=(const CSimpleCondition&) = delete;
CSimpleCondition(CSimpleCondition&&) = delete;
~CSimpleCondition() {};
// Wait up to @duration to be signaled, or until @predicate is true.
// Returns result of predicate after timing out or being signaled.
template<typename Duration, typename Predicate>
bool wait(Duration, Predicate);
// Signal waiters. If @all is true, all waiters will be woken up.
void signal(bool all=true)
{
if (all)
m_Condition.notify_all();
else
m_Condition.notify_one();
}
private:
std::mutex m_Mutex;
std::condition_variable m_Condition;
};
// Note: @timeout is a relative duration, e.g., "30s".
template<typename Duration, typename Predicate>
bool CSimpleCondition::wait(Duration timeout, Predicate predicate)
{
std::unique_lock<std::mutex> lock(m_Mutex);
auto bound = std::chrono::system_clock::now() + timeout;
return m_Condition.wait_until(lock, bound, predicate);
}

View file

@ -92,7 +92,7 @@ protected:
uint16 m_PortOpenStream; uint16 m_PortOpenStream;
// thread // thread
bool m_bStopThread; std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
// socket // socket

View file

@ -86,7 +86,7 @@ protected:
CWiresxPacketQueue m_PacketQueue; CWiresxPacketQueue m_PacketQueue;
// thread // thread
bool m_bStopThread; std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
}; };

View file

@ -26,7 +26,7 @@
#include "main.h" #include "main.h"
#include "creflector.h" #include "creflector.h"
#include "cysfnodedir.h" #include "cysfnodedir.h"
#include <chrono>
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// constructor & destructor // constructor & destructor
@ -41,6 +41,7 @@ CYsfNodeDir::~CYsfNodeDir()
{ {
// kill threads // kill threads
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -69,6 +70,7 @@ bool CYsfNodeDir::Init(void)
void CYsfNodeDir::Close(void) void CYsfNodeDir::Close(void)
{ {
m_bStopThread = true; m_bStopThread = true;
m_cv.signal();
if ( m_pThread != NULL ) if ( m_pThread != NULL )
{ {
m_pThread->join(); m_pThread->join();
@ -82,11 +84,9 @@ void CYsfNodeDir::Close(void)
void CYsfNodeDir::Thread(CYsfNodeDir *This) void CYsfNodeDir::Thread(CYsfNodeDir *This)
{ {
while ( !This->m_bStopThread ) const std::chrono::minutes timeout(YSFNODEDB_REFRESH_RATE);
while (!This->m_cv.wait(timeout, [&]{return This->m_bStopThread==true;}))
{ {
// Wait 30 seconds
CTimePoint::TaskSleepFor(YSFNODEDB_REFRESH_RATE * 60000);
// have lists files changed ? // have lists files changed ?
if ( This->NeedReload() ) if ( This->NeedReload() )
{ {

View file

@ -32,6 +32,7 @@
#include "cbuffer.h" #include "cbuffer.h"
#include "ccallsign.h" #include "ccallsign.h"
#include "cysfnode.h" #include "cysfnode.h"
#include "csimplecondition.h"
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
// define // define
@ -84,9 +85,10 @@ protected:
protected: protected:
// Lock() // Lock()
std::mutex m_Mutex; std::mutex m_Mutex;
// thread // thread
bool m_bStopThread; CSimpleCondition m_cv;
std::atomic_bool m_bStopThread;
std::thread *m_pThread; std::thread *m_pThread;
}; };

View file

@ -26,6 +26,7 @@
#include "creflector.h" #include "creflector.h"
#include "syslog.h" #include "syslog.h"
#include <csignal>
#include <sys/stat.h> #include <sys/stat.h>
@ -39,6 +40,37 @@ CReflector g_Reflector;
#include "cusers.h" #include "cusers.h"
// Returns caught termination signal or -1 on error
static int wait_for_termination()
{
sigset_t waitset;
siginfo_t siginfo;
sigemptyset(&waitset);
sigaddset(&waitset, SIGTERM);
sigaddset(&waitset, SIGINT);
sigaddset(&waitset, SIGQUIT);
sigaddset(&waitset, SIGHUP);
pthread_sigmask(SIG_BLOCK, &waitset, nullptr);
// Wait for a termination signal
int result = -1;
while (result < 0)
{
result = sigwaitinfo(&waitset, &siginfo);
if (result == -1 && errno == EINTR)
{
// try again
if (errno == EINTR)
continue;
// an unexpected error occurred, consider it fatal
break;
}
}
return result;
}
int main(int argc, const char * argv[]) int main(int argc, const char * argv[])
{ {
#ifdef RUN_AS_DAEMON #ifdef RUN_AS_DAEMON
@ -101,36 +133,29 @@ int main(int argc, const char * argv[])
g_Reflector.SetCallsign(argv[1]); g_Reflector.SetCallsign(argv[1]);
g_Reflector.SetListenIp(CIp(argv[2])); g_Reflector.SetListenIp(CIp(argv[2]));
g_Reflector.SetTranscoderIp(CIp(CIp(argv[3]))); g_Reflector.SetTranscoderIp(CIp(CIp(argv[3])));
// Block all signals while starting up the reflector -- we don't
// want any of the worker threads handling them.
sigset_t sigblockall, sigorig;
sigfillset(&sigblockall);
pthread_sigmask(SIG_SETMASK, &sigblockall, &sigorig);
// and let it run // and let it run
if ( !g_Reflector.Start() ) if ( !g_Reflector.Start() )
{ {
std::cout << "Error starting reflector" << std::endl; std::cout << "Error starting reflector" << std::endl;
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// Restore main thread default signal state
pthread_sigmask(SIG_SETMASK, &sigorig, nullptr);
std::cout << "Reflector " << g_Reflector.GetCallsign() std::cout << "Reflector " << g_Reflector.GetCallsign()
<< "started and listening on " << g_Reflector.GetListenIp() << std::endl; << "started and listening on " << g_Reflector.GetListenIp() << std::endl;
#ifdef RUN_AS_DAEMON
// run forever
while ( true )
{
// sleep 60 seconds
CTimePoint::TaskSleepFor(60000);
}
#else
// wait any key
for (;;)
{
// sleep 60 seconds
CTimePoint::TaskSleepFor(60000);
#ifdef DEBUG_DUMPFILE
g_Reflector.m_DebugFile.close();
#endif
}
#endif
// and wait for end // and wait for end
wait_for_termination();
g_Reflector.Stop(); g_Reflector.Stop();
std::cout << "Reflector stopped" << std::endl; std::cout << "Reflector stopped" << std::endl;