This commit is contained in:
Nekotekina 2016-05-13 17:01:48 +03:00
parent e2d82394f6
commit 266db1336d
81 changed files with 2247 additions and 1731 deletions

View file

@ -6,98 +6,70 @@
#include "Utilities/Thread.h"
#include "Utilities/SharedMutex.h"
extern std::condition_variable& get_current_thread_cv();
extern std::mutex& get_current_thread_mutex();
#include <unordered_set>
namespace vm
{
static shared_mutex s_mutex;
static std::unordered_set<waiter*> s_waiters(256);
static std::unordered_set<waiter_base*, pointer_hash<waiter_base>> s_waiters(256);
bool waiter::try_notify()
void waiter_base::initialize(u32 addr, u32 size)
{
{
std::lock_guard<mutex_t> lock(*mutex);
EXPECTS(addr && (size & (~size + 1)) == size && (addr & (size - 1)) == 0);
try
{
// Test predicate
if (!pred || !pred())
{
return false;
}
// Clear predicate
pred = nullptr;
}
catch (...)
{
// Capture any exception possibly thrown by predicate
pred = [exception = std::current_exception()]() -> bool
{
// New predicate will throw the captured exception from the original thread
std::rethrow_exception(exception);
};
}
// Set addr and mask to invalid values to prevent further polling
addr = 0;
mask = ~0;
}
// Signal thread
cond->notify_one();
return true;
}
waiter::~waiter()
{
}
waiter_lock::waiter_lock(u32 addr, u32 size)
: m_lock(get_current_thread_mutex(), std::defer_lock)
{
Expects(addr && (size & (~size + 1)) == size && (addr & (size - 1)) == 0);
m_waiter.mutex = m_lock.mutex();
m_waiter.cond = &get_current_thread_cv();
m_waiter.addr = addr;
m_waiter.mask = ~(size - 1);
this->addr = addr;
this->mask = ~(size - 1);
this->thread = thread_ctrl::get_current();
{
writer_lock lock(s_mutex);
s_waiters.emplace(&m_waiter);
s_waiters.emplace(this);
}
m_lock.lock();
// Wait until thread == nullptr
thread_lock(), thread_ctrl::wait(WRAP_EXPR(!thread || test()));
}
void waiter_lock::wait()
bool waiter_base::try_notify()
{
// If another thread successfully called pred(), it must be set to null
while (m_waiter.pred)
const auto _t = atomic_storage<thread_ctrl*>::load(thread);
if (UNLIKELY(!_t))
{
// If pred() called by another thread threw an exception, it'll be rethrown
if (m_waiter.pred())
{
return;
}
CHECK_EMU_STATUS;
m_waiter.cond->wait(m_lock);
// Return if thread not found
return false;
}
// Lock the thread
_t->lock();
try
{
// Test predicate
if (UNLIKELY(!thread || !test()))
{
_t->unlock();
return false;
}
}
catch (...)
{
// Capture any exception thrown by the predicate
_t->set_exception(std::current_exception());
}
// Signal the thread with nullptr
atomic_storage<thread_ctrl*>::store(thread, nullptr);
_t->unlock();
_t->notify();
return true;
}
waiter_lock::~waiter_lock()
waiter_base::~waiter_base()
{
if (m_lock) m_lock.unlock();
writer_lock lock(s_mutex);
s_waiters.erase(&m_waiter);
s_waiters.erase(this);
}
void notify_at(u32 addr, u32 size)
@ -114,44 +86,37 @@ namespace vm
}
}
static bool notify_all()
// Return amount of threads which are not notified
static std::size_t notify_all()
{
reader_lock lock(s_mutex);
std::size_t waiters = 0;
std::size_t signaled = 0;
for (const auto _w : s_waiters)
{
if (_w->addr)
if (_w->try_notify())
{
waiters++;
if (_w->try_notify())
{
signaled++;
}
signaled++;
}
}
// return true if waiter list is empty or all available waiters were signaled
return waiters == signaled;
return s_waiters.size() - signaled;
}
void start()
{
// start notification thread
thread_ctrl::spawn("vm::start thread", []()
thread_ctrl::spawn("vm::wait", []()
{
while (!Emu.IsStopped())
{
// poll waiters periodically (TODO)
while (!notify_all() && !Emu.IsPaused())
// Poll waiters periodically (TODO)
while (notify_all() && !Emu.IsPaused())
{
std::this_thread::yield();
thread_ctrl::sleep(50);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
thread_ctrl::sleep(1000);
}
});
}