ThreadBase rewritten (wip)

This commit is contained in:
Nekotekina 2015-07-01 01:25:52 +03:00
parent b7a320fbbd
commit 3aefa2b4e1
85 changed files with 1960 additions and 2183 deletions

View file

@ -2,6 +2,11 @@
#include <emmintrin.h>
// temporarily (until noexcept is available); use `noexcept(true)` instead of `noexcept` if necessary
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define noexcept _NOEXCEPT_OP
#endif
#if defined(_MSC_VER)
#define thread_local __declspec(thread)
#elif __APPLE__

View file

@ -14,7 +14,7 @@ std::unique_ptr<LogManager> g_log_manager;
u32 LogMessage::size() const
{
//1 byte for NULL terminator
return (u32)(sizeof(LogMessage::size_type) + sizeof(LogType) + sizeof(LogSeverity) + sizeof(std::string::value_type) * mText.size() + 1);
return (u32)(sizeof(LogMessage::size_type) + sizeof(LogType) + sizeof(Severity) + sizeof(std::string::value_type) * mText.size() + 1);
}
void LogMessage::serialize(char *output) const
@ -24,8 +24,8 @@ void LogMessage::serialize(char *output) const
output += sizeof(LogMessage::size_type);
memcpy(output, &mType, sizeof(LogType));
output += sizeof(LogType);
memcpy(output, &mServerity, sizeof(LogSeverity));
output += sizeof(LogSeverity);
memcpy(output, &mServerity, sizeof(Severity));
output += sizeof(Severity);
memcpy(output, mText.c_str(), mText.size() );
output += sizeof(std::string::value_type)*mText.size();
*output = '\0';
@ -38,13 +38,13 @@ LogMessage LogMessage::deserialize(char *input, u32* size_out)
input += sizeof(LogMessage::size_type);
msg.mType = *(reinterpret_cast<LogType*>(input));
input += sizeof(LogType);
msg.mServerity = *(reinterpret_cast<LogSeverity*>(input));
input += sizeof(LogSeverity);
msg.mServerity = *(reinterpret_cast<Severity*>(input));
input += sizeof(Severity);
if (msgSize > 9000)
{
int wtf = 6;
}
msg.mText.append(input, msgSize - 1 - sizeof(LogSeverity) - sizeof(LogType));
msg.mText.append(input, msgSize - 1 - sizeof(Severity) - sizeof(LogType));
if (size_out){(*size_out) = msgSize;}
return msg;
}
@ -57,7 +57,7 @@ LogChannel::LogChannel() : LogChannel("unknown")
LogChannel::LogChannel(const std::string& name) :
name(name)
, mEnabled(true)
, mLogLevel(LogSeverityWarning)
, mLogLevel(Severity::Warning)
{}
void LogChannel::log(const LogMessage &msg)
@ -186,22 +186,22 @@ void LogManager::log(LogMessage msg)
std::string prefix;
switch (msg.mServerity)
{
case LogSeveritySuccess:
case Severity::Success:
prefix = "S ";
break;
case LogSeverityNotice:
case Severity::Notice:
prefix = "! ";
break;
case LogSeverityWarning:
case Severity::Warning:
prefix = "W ";
break;
case LogSeverityError:
case Severity::Error:
prefix = "E ";
break;
}
if (NamedThreadBase* thr = GetCurrentNamedThread())
if (auto thr = get_current_thread_ctrl())
{
prefix += "{" + thr->GetThreadName() + "} ";
prefix += "{" + thr->get_name() + "} ";
}
msg.mText.insert(0, prefix);
msg.mText.append(1,'\n');
@ -248,12 +248,12 @@ LogChannel &LogManager::getChannel(LogType type)
return mChannels[static_cast<u32>(type)];
}
void log_message(Log::LogType type, Log::LogSeverity sev, const char* text)
void log_message(Log::LogType type, Log::Severity sev, const char* text)
{
log_message(type, sev, std::string(text));
}
void log_message(Log::LogType type, Log::LogSeverity sev, std::string text)
void log_message(Log::LogType type, Log::Severity sev, std::string text)
{
if (g_log_manager)
{
@ -265,12 +265,12 @@ void log_message(Log::LogType type, Log::LogSeverity sev, std::string text)
else
{
rMessageBox(text,
sev == LogSeverityNotice ? "Notice" :
sev == LogSeverityWarning ? "Warning" :
sev == LogSeveritySuccess ? "Success" :
sev == LogSeverityError ? "Error" : "Unknown",
sev == LogSeverityNotice ? rICON_INFORMATION :
sev == LogSeverityWarning ? rICON_EXCLAMATION :
sev == LogSeverityError ? rICON_ERROR : rICON_INFORMATION);
sev == Severity::Notice ? "Notice" :
sev == Severity::Warning ? "Warning" :
sev == Severity::Success ? "Success" :
sev == Severity::Error ? "Error" : "Unknown",
sev == Severity::Notice ? rICON_INFORMATION :
sev == Severity::Warning ? rICON_EXCLAMATION :
sev == Severity::Error ? rICON_ERROR : rICON_INFORMATION);
}
}

View file

@ -5,10 +5,10 @@
//first parameter is of type Log::LogType and text is of type std::string
#define LOG_SUCCESS(logType, text, ...) log_message(logType, Log::LogSeveritySuccess, text, ##__VA_ARGS__)
#define LOG_NOTICE(logType, text, ...) log_message(logType, Log::LogSeverityNotice, text, ##__VA_ARGS__)
#define LOG_WARNING(logType, text, ...) log_message(logType, Log::LogSeverityWarning, text, ##__VA_ARGS__)
#define LOG_ERROR(logType, text, ...) log_message(logType, Log::LogSeverityError, text, ##__VA_ARGS__)
#define LOG_SUCCESS(logType, text, ...) log_message(logType, Log::Severity::Success, text, ##__VA_ARGS__)
#define LOG_NOTICE(logType, text, ...) log_message(logType, Log::Severity::Notice, text, ##__VA_ARGS__)
#define LOG_WARNING(logType, text, ...) log_message(logType, Log::Severity::Warning, text, ##__VA_ARGS__)
#define LOG_ERROR(logType, text, ...) log_message(logType, Log::Severity::Error, text, ##__VA_ARGS__)
namespace Log
{
@ -48,19 +48,19 @@ namespace Log
{ TTY, "TTY: " }
} };
enum LogSeverity : u32
enum class Severity : u32
{
LogSeverityNotice = 0,
LogSeverityWarning,
LogSeveritySuccess,
LogSeverityError,
Notice = 0,
Warning,
Success,
Error,
};
struct LogMessage
{
using size_type = u32;
LogType mType;
LogSeverity mServerity;
Severity mServerity;
std::string mText;
u32 size() const;
@ -86,7 +86,7 @@ namespace Log
std::string name;
private:
bool mEnabled;
LogSeverity mLogLevel;
Severity mLogLevel;
std::mutex mListenerLock;
std::set<std::shared_ptr<LogListener>> mListeners;
};
@ -126,10 +126,10 @@ static struct { inline operator Log::LogType() { return Log::LogType::SPU; } } S
static struct { inline operator Log::LogType() { return Log::LogType::ARMv7; } } ARMv7;
static struct { inline operator Log::LogType() { return Log::LogType::TTY; } } TTY;
void log_message(Log::LogType type, Log::LogSeverity sev, const char* text);
void log_message(Log::LogType type, Log::LogSeverity sev, std::string text);
void log_message(Log::LogType type, Log::Severity sev, const char* text);
void log_message(Log::LogType type, Log::Severity sev, std::string text);
template<typename... Args> never_inline void log_message(Log::LogType type, Log::LogSeverity sev, const char* fmt, Args... args)
template<typename... Args> never_inline void log_message(Log::LogType type, Log::Severity sev, const char* fmt, Args... args)
{
log_message(type, sev, fmt::Format(fmt, fmt::do_unveil(args)...));
}

View file

@ -277,6 +277,39 @@ namespace fmt
return Format(fmt, do_unveil(args)...);
}
struct exception
{
std::unique_ptr<char[]> message;
template<typename... Args> never_inline safe_buffers exception(const char* file, int line, const char* func, const char* text, Args... args)
{
const std::string data = format(text, args...) + format("\n(in file %s:%d, in function %s)", file, line, func);
message = std::make_unique<char[]>(data.size() + 1);
std::memcpy(message.get(), data.c_str(), data.size() + 1);
}
exception(const exception& other)
{
const std::size_t size = std::strlen(other);
message = std::make_unique<char[]>(size + 1);
std::memcpy(message.get(), other, size + 1);
}
exception(exception&& other)
{
message = std::move(other.message);
}
operator const char*() const
{
return message.get();
}
};
//convert a wxString to a std::string encoded in utf8
//CAUTION, only use this to interface with wxWidgets classes
std::string ToUTF8(const wxString& right);

View file

@ -1108,7 +1108,7 @@ const PVOID exception_handler = (atexit([]{ RemoveVectoredExceptionHandler(excep
if (pExp->ExceptionRecord->ExceptionCode == EXCEPTION_ACCESS_VIOLATION &&
(u32)addr64 == addr64 &&
GetCurrentNamedThread() &&
get_current_thread_ctrl() &&
handle_access_violation((u32)addr64, is_writing, pExp->ContextRecord))
{
return EXCEPTION_CONTINUE_EXECUTION;
@ -1119,6 +1119,13 @@ const PVOID exception_handler = (atexit([]{ RemoveVectoredExceptionHandler(excep
}
}));
const auto exception_filter = SetUnhandledExceptionFilter([](PEXCEPTION_POINTERS pExp) -> LONG
{
_se_translator(pExp->ExceptionRecord->ExceptionCode, pExp);
return EXCEPTION_CONTINUE_SEARCH;
});
#else
void signal_handler(int sig, siginfo_t* info, void* uct)
@ -1131,7 +1138,7 @@ void signal_handler(int sig, siginfo_t* info, void* uct)
const bool is_writing = ((ucontext_t*)uct)->uc_mcontext.gregs[REG_ERR] & 0x2;
#endif
if ((u32)addr64 == addr64 && GetCurrentNamedThread())
if ((u32)addr64 == addr64 && get_current_thread_ctrl())
{
if (handle_access_violation((u32)addr64, is_writing, (ucontext_t*)uct))
{
@ -1158,19 +1165,23 @@ const int sigaction_result = []() -> int
#endif
thread_local NamedThreadBase* g_tls_this_thread = nullptr;
std::atomic<u32> g_thread_count(0);
thread_local thread_ctrl_t* g_tls_this_thread = nullptr;
NamedThreadBase* GetCurrentNamedThread()
const thread_ctrl_t* get_current_thread_ctrl()
{
return g_tls_this_thread;
}
void SetCurrentNamedThread(NamedThreadBase* value)
std::string thread_ctrl_t::get_name() const
{
return name();
}
void thread_ctrl_t::set_current()
{
const auto old_value = g_tls_this_thread;
if (old_value == value)
if (old_value == this)
{
return;
}
@ -1180,76 +1191,82 @@ void SetCurrentNamedThread(NamedThreadBase* value)
vm::reservation_free();
}
if (value && value->m_tls_assigned.exchange(true))
if (true && assigned.exchange(true))
{
LOG_ERROR(GENERAL, "Thread '%s' was already assigned to g_tls_this_thread of another thread", value->GetThreadName());
LOG_ERROR(GENERAL, "Thread '%s' was already assigned to g_tls_this_thread of another thread", get_name());
g_tls_this_thread = nullptr;
}
else
{
g_tls_this_thread = value;
g_tls_this_thread = this;
}
if (old_value)
{
old_value->m_tls_assigned = false;
old_value->assigned = false;
}
}
std::string NamedThreadBase::GetThreadName() const
thread_t::thread_t(std::function<std::string()> name, std::function<void()> func)
{
return m_name;
start(std::move(name), func);
}
void NamedThreadBase::SetThreadName(const std::string& name)
thread_t::~thread_t()
{
m_name = name;
}
void NamedThreadBase::WaitForAnySignal(u64 time) // wait for Notify() signal or sleep
{
std::unique_lock<std::mutex> lock(m_signal_mtx);
m_signal_cv.wait_for(lock, std::chrono::milliseconds(time));
}
void NamedThreadBase::Notify() // wake up waiting thread or nothing
{
m_signal_cv.notify_one();
}
ThreadBase::ThreadBase(const std::string& name)
: NamedThreadBase(name)
, m_executor(nullptr)
, m_destroy(false)
, m_alive(false)
{
}
ThreadBase::~ThreadBase()
{
if(IsAlive())
Stop(false);
delete m_executor;
m_executor = nullptr;
}
void ThreadBase::Start()
{
if(m_executor) Stop();
std::lock_guard<std::mutex> lock(m_main_mutex);
m_destroy = false;
m_alive = true;
m_executor = new std::thread([this]()
if (m_thread)
{
SetCurrentThreadDebugName(GetThreadName().c_str());
if (g_tls_this_thread != m_thread.get())
{
m_thread->m_thread.join();
}
else
{
m_thread->m_thread.detach();
}
}
}
std::string thread_t::get_name() const
{
if (!m_thread)
{
throw EXCEPTION("Invalid thread");
}
if (!m_thread->name)
{
throw EXCEPTION("Invalid name getter");
}
return m_thread->name();
}
std::atomic<u32> g_thread_count{ 0 };
void thread_t::start(std::function<std::string()> name, std::function<void()> func)
{
if (m_thread)
{
throw EXCEPTION("Thread already exists");
}
// create new ctrl and assign it
auto ctrl = std::make_shared<thread_ctrl_t>(std::move(name));
// start thread
ctrl->m_thread = std::thread([ctrl, func]()
{
g_thread_count++;
SetCurrentThreadDebugName(ctrl->get_name().c_str());
#if defined(_MSC_VER)
auto old_se_translator = _set_se_translator(_se_translator);
#endif
#ifdef _WIN32
auto old_se_translator = _set_se_translator(_se_translator);
if (!exception_handler)
if (!exception_handler || !exception_filter)
{
LOG_ERROR(GENERAL, "exception_handler not set");
return;
@ -1262,238 +1279,139 @@ void ThreadBase::Start()
}
#endif
SetCurrentNamedThread(this);
g_thread_count++;
// error handler
const auto error = [&](const char* text)
{
log_message(GENERAL, Emu.IsStopped() ? Log::Severity::Warning : Log::Severity::Error, "Exception: %s", text);
Emu.Pause();
};
try
{
Task();
}
catch (const char* e)
{
LOG_ERROR(GENERAL, "Exception: %s", e);
DumpInformation();
Emu.Pause();
}
catch (const std::string& e)
{
LOG_ERROR(GENERAL, "Exception: %s", e);
DumpInformation();
Emu.Pause();
}
ctrl->set_current();
m_alive = false;
SetCurrentNamedThread(nullptr);
g_thread_count--;
if (Ini.HLELogging.GetValue())
{
LOG_NOTICE(GENERAL, "Thread started");
}
#ifdef _WIN32
_set_se_translator(old_se_translator);
#endif
});
}
void ThreadBase::Stop(bool wait, bool send_destroy)
{
std::lock_guard<std::mutex> lock(m_main_mutex);
if (send_destroy)
m_destroy = true;
if(!m_executor)
return;
if(wait && m_executor->joinable() && m_alive)
{
m_executor->join();
}
else
{
m_executor->detach();
}
delete m_executor;
m_executor = nullptr;
}
bool ThreadBase::Join() const
{
std::lock_guard<std::mutex> lock(m_main_mutex);
if(m_executor->joinable() && m_alive && m_executor != nullptr)
{
m_executor->join();
return true;
}
return false;
}
bool ThreadBase::IsAlive() const
{
std::lock_guard<std::mutex> lock(m_main_mutex);
return m_alive;
}
bool ThreadBase::TestDestroy() const
{
return m_destroy;
}
thread_t::thread_t(const std::string& name, bool autojoin, std::function<void()> func)
: m_name(name)
, m_state(TS_NON_EXISTENT)
, m_autojoin(autojoin)
{
start(func);
}
thread_t::thread_t(const std::string& name, std::function<void()> func)
: m_name(name)
, m_state(TS_NON_EXISTENT)
, m_autojoin(false)
{
start(func);
}
thread_t::thread_t(const std::string& name)
: m_name(name)
, m_state(TS_NON_EXISTENT)
, m_autojoin(false)
{
}
thread_t::thread_t()
: m_state(TS_NON_EXISTENT)
, m_autojoin(false)
{
}
void thread_t::set_name(const std::string& name)
{
m_name = name;
}
thread_t::~thread_t()
{
if (m_state == TS_JOINABLE)
{
if (m_autojoin)
{
m_thr.join();
}
else
{
m_thr.detach();
}
}
}
void thread_t::start(std::function<void()> func)
{
if (m_state.exchange(TS_NON_EXISTENT) == TS_JOINABLE)
{
m_thr.join(); // forcefully join previously created thread
}
std::string name = m_name;
m_thr = std::thread([func, name]()
{
SetCurrentThreadDebugName(name.c_str());
#ifdef _WIN32
auto old_se_translator = _set_se_translator(_se_translator);
#endif
NamedThreadBase info(name);
SetCurrentNamedThread(&info);
g_thread_count++;
if (Ini.HLELogging.GetValue())
{
LOG_NOTICE(HLE, name + " started");
}
try
{
func();
}
catch (const char* e)
{
LOG_ERROR(GENERAL, "Exception: %s", e);
Emu.Pause();
error(e);
}
catch (const std::string& e)
{
LOG_ERROR(GENERAL, "Exception: %s", e.c_str());
Emu.Pause();
error(e.c_str());
}
catch (const fmt::exception& e)
{
error(e);
}
if (Emu.IsStopped())
{
LOG_NOTICE(HLE, name + " aborted");
LOG_NOTICE(GENERAL, "Thread aborted");
}
else if (Ini.HLELogging.GetValue())
{
LOG_NOTICE(HLE, name + " ended");
LOG_NOTICE(GENERAL, "Thread ended");
}
SetCurrentNamedThread(nullptr);
//ctrl->set_current(false);
g_thread_count--;
#ifdef _WIN32
ctrl->joinable = false;
ctrl->join_cv.notify_all();
#if defined(_MSC_VER)
_set_se_translator(old_se_translator);
#endif
});
if (m_state.exchange(TS_JOINABLE) == TS_JOINABLE)
{
assert(!"thread_t::start() failed"); // probably started from another thread
}
// set
m_thread = std::move(ctrl);
}
void thread_t::detach()
{
if (m_state.exchange(TS_NON_EXISTENT) == TS_JOINABLE)
if (!m_thread)
{
m_thr.detach();
throw EXCEPTION("Invalid thread");
}
else
const auto ctrl = std::move(m_thread);
ctrl->m_thread.detach();
}
void thread_t::join(std::unique_lock<std::mutex>& lock)
{
if (!m_thread)
{
assert(!"thread_t::detach() failed"); // probably joined or detached
throw EXCEPTION("Invalid thread");
}
if (g_tls_this_thread == m_thread.get())
{
throw EXCEPTION("Deadlock");
}
const auto ctrl = std::move(m_thread);
// wait for completion
while (ctrl->joinable)
{
CHECK_EMU_STATUS;
ctrl->join_cv.wait_for(lock, std::chrono::milliseconds(1));
}
ctrl->m_thread.join();
}
void thread_t::join()
{
if (m_state.exchange(TS_NON_EXISTENT) == TS_JOINABLE)
if (!m_thread)
{
m_thr.join();
throw EXCEPTION("Invalid thread");
}
else
if (g_tls_this_thread == m_thread.get())
{
assert(!"thread_t::join() failed"); // probably joined or detached
throw EXCEPTION("Deadlock");
}
const auto ctrl = std::move(m_thread);
ctrl->m_thread.join();
}
bool thread_t::joinable() const
bool thread_t::is_current() const
{
//return m_thr.joinable();
return m_state == TS_JOINABLE;
if (!m_thread)
{
throw EXCEPTION("Invalid thread");
}
return g_tls_this_thread == m_thread.get();
}
bool waiter_map_t::is_stopped(u32 addr)
void waiter_map_t::check_emu_status(u32 addr)
{
if (Emu.IsStopped())
{
LOG_WARNING(Log::HLE, "%s: waiter_op() aborted (addr=0x%x)", name.c_str(), addr);
return true;
throw EXCEPTION("Aborted (emulation stopped) (%s, addr=0x%x)", name, addr);
}
return false;
}
void waiter_map_t::notify(u32 addr)
{
// signal appropriate condition variable
cv[get_hash(addr)].notify_all();
// signal an appropriate condition variable
cvs[get_hash(addr)].notify_all();
}
const std::function<bool()> SQUEUE_ALWAYS_EXIT = [](){ return true; };

View file

@ -1,112 +1,95 @@
#pragma once
class NamedThreadBase
const class thread_ctrl_t* get_current_thread_ctrl();
// named thread control class
class thread_ctrl_t final
{
std::string m_name;
std::condition_variable m_signal_cv;
std::mutex m_signal_mtx;
friend class thread_t;
// thread handler
std::thread m_thread;
// name getter
const std::function<std::string()> name;
// condition variable, notified before thread exit
std::condition_variable join_cv;
// thread status (set to false after execution)
std::atomic<bool> joinable{ true };
// true if TLS of some thread points to owner
std::atomic<bool> assigned{ false };
// assign TLS
void set_current();
public:
std::atomic<bool> m_tls_assigned;
NamedThreadBase(const std::string& name) : m_name(name), m_tls_assigned(false)
thread_ctrl_t(std::function<std::string()> name)
: name(std::move(name))
{
}
NamedThreadBase() : m_tls_assigned(false)
{
}
virtual std::string GetThreadName() const;
virtual void SetThreadName(const std::string& name);
void WaitForAnySignal(u64 time = 1);
void Notify();
virtual void DumpInformation() {}
};
NamedThreadBase* GetCurrentNamedThread();
void SetCurrentNamedThread(NamedThreadBase* value);
class ThreadBase : public NamedThreadBase
{
protected:
std::atomic<bool> m_destroy;
std::atomic<bool> m_alive;
std::thread* m_executor;
mutable std::mutex m_main_mutex;
ThreadBase(const std::string& name);
~ThreadBase();
public:
void Start();
void Stop(bool wait = true, bool send_destroy = true);
bool Join() const;
bool IsAlive() const;
bool TestDestroy() const;
virtual void Task() = 0;
// get thread name
std::string get_name() const;
};
class thread_t
{
enum thread_state_t
{
TS_NON_EXISTENT,
TS_JOINABLE,
};
std::atomic<thread_state_t> m_state;
std::string m_name;
std::thread m_thr;
bool m_autojoin;
// pointer to managed resource (shared with actual thread)
std::shared_ptr<thread_ctrl_t> m_thread;
public:
thread_t(const std::string& name, bool autojoin, std::function<void()> func);
thread_t(const std::string& name, std::function<void()> func);
thread_t(const std::string& name);
thread_t();
~thread_t();
// thread mutex for external use
std::mutex mutex;
thread_t(const thread_t& right) = delete;
thread_t(thread_t&& right) = delete;
thread_t& operator =(const thread_t& right) = delete;
thread_t& operator =(thread_t&& right) = delete;
// thread condition variable for external use
std::condition_variable cv;
public:
void set_name(const std::string& name);
void start(std::function<void()> func);
// initialize in empty state
thread_t() = default;
// create named thread
thread_t(std::function<std::string()> name, std::function<void()> func);
// destructor, joins automatically
virtual ~thread_t();
thread_t(const thread_t&) = delete;
thread_t& operator =(const thread_t&) = delete;
public:
// get thread name
std::string get_name() const;
// create named thread (current state must be empty)
void start(std::function<std::string()> name, std::function<void()> func);
// detach thread -> empty state
void detach();
// join thread (provide locked unique_lock, for example, lv2_lock, for interruptibility) -> empty state
void join(std::unique_lock<std::mutex>& lock);
// join thread -> empty state
void join();
bool joinable() const;
};
class slw_mutex_t
{
};
class slw_recursive_mutex_t
{
};
class slw_shared_mutex_t
{
// check if not empty
bool joinable() const { return m_thread.operator bool(); }
// check whether it is the current running thread
bool is_current() const;
};
struct waiter_map_t
{
static const size_t size = 16;
std::array<std::mutex, size> mutex;
std::array<std::condition_variable, size> cv;
std::array<std::mutex, size> mutexes;
std::array<std::condition_variable, size> cvs;
const std::string name;
@ -124,33 +107,32 @@ struct waiter_map_t
return addr % size;
}
// check emu status
bool is_stopped(u32 addr);
void check_emu_status(u32 addr);
// wait until waiter_func() returns true, signal_id is an arbitrary number
// wait until pred() returns true, `addr` is an arbitrary number
template<typename F, typename... Args> safe_buffers auto wait_op(u32 addr, F pred, Args&&... args) -> decltype(static_cast<void>(pred(args...)))
{
const u32 hash = get_hash(addr);
// set mutex locker
std::unique_lock<std::mutex> lock(mutex[hash], std::defer_lock);
std::unique_lock<std::mutex> lock(mutexes[hash], std::defer_lock);
while (true)
{
// check the condition
if (pred(args...)) return;
check_emu_status(addr);
// lock the mutex and initialize waiter (only once)
if (!lock) lock.lock();
// wait on appropriate condition variable for 1 ms or until signal arrived
cv[hash].wait_for(lock, std::chrono::milliseconds(1));
if (is_stopped(addr)) return;
// wait on an appropriate cond var for 1 ms or until a signal arrived
cvs[hash].wait_for(lock, std::chrono::milliseconds(1));
}
}
// signal all threads waiting on waiter_op() with the same signal_id (signaling only hints those threads that corresponding conditions are *probably* met)
// signal all threads waiting on wait_op() with the same `addr` (signaling only hints those threads that corresponding conditions are *probably* met)
void notify(u32 addr);
};