mirror of
https://github.com/RPCSX/rpcsx.git
synced 2026-01-04 15:50:10 +01:00
[orbis-kernel] Implement shared_cv
Shared (IPC-ready) condition variable. Relicensed and improved from RPCS3.
This commit is contained in:
parent
c269d23665
commit
5bb820084e
|
|
@ -62,6 +62,7 @@ add_library(obj.orbis-kernel OBJECT
|
|||
|
||||
src/utils/Logs.cpp
|
||||
src/utils/SharedMutex.cpp
|
||||
src/utils/SharedCV.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(obj.orbis-kernel PUBLIC orbis::kernel::config)
|
||||
|
|
|
|||
69
orbis-kernel/include/orbis/utils/SharedCV.hpp
Normal file
69
orbis-kernel/include/orbis/utils/SharedCV.hpp
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <orbis/utils/AtomicOp.hpp>
|
||||
#include <orbis/utils/SharedMutex.hpp>
|
||||
|
||||
namespace orbis {
|
||||
inline namespace utils {
|
||||
// IPC-ready lightweight condition variable
|
||||
class shared_cv {
|
||||
enum : unsigned {
|
||||
c_waiter_mask = 0xffff,
|
||||
c_signal_mask = 0xffffffff & ~c_waiter_mask,
|
||||
};
|
||||
|
||||
std::atomic<unsigned> m_value{0};
|
||||
|
||||
protected:
|
||||
// Increment waiter count
|
||||
unsigned add_waiter() noexcept {
|
||||
return atomic_op(m_value, [](unsigned &value) -> unsigned {
|
||||
if ((value & c_signal_mask) == c_signal_mask ||
|
||||
(value & c_waiter_mask) == c_waiter_mask) {
|
||||
// Signal or waiter overflow, return immediately
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Add waiter (c_waiter_mask)
|
||||
value += 1;
|
||||
return value;
|
||||
});
|
||||
}
|
||||
|
||||
// Internal waiting function
|
||||
void impl_wait(shared_mutex &mutex, unsigned _old,
|
||||
std::uint64_t usec_timeout) noexcept;
|
||||
|
||||
// Try to notify up to _count threads
|
||||
void impl_wake(shared_mutex &mutex, int _count) noexcept;
|
||||
|
||||
public:
|
||||
constexpr shared_cv() = default;
|
||||
|
||||
void wait(shared_mutex &mutex, std::uint64_t usec_timeout = -1) noexcept {
|
||||
const unsigned _old = add_waiter();
|
||||
if (!_old) {
|
||||
return;
|
||||
}
|
||||
|
||||
mutex.unlock();
|
||||
impl_wait(mutex, _old, usec_timeout);
|
||||
}
|
||||
|
||||
// Wake one thread
|
||||
void notify_one(shared_mutex &mutex) noexcept {
|
||||
if (m_value) {
|
||||
impl_wake(mutex, 1);
|
||||
}
|
||||
}
|
||||
|
||||
// Wake all threads
|
||||
void notify_all(shared_mutex &mutex) noexcept {
|
||||
if (m_value) {
|
||||
impl_wake(mutex, INT_MAX);
|
||||
}
|
||||
}
|
||||
};
|
||||
} // namespace utils
|
||||
} // namespace orbis
|
||||
|
|
@ -101,6 +101,9 @@ public:
|
|||
|
||||
// Check whether can immediately obtain a shared (reader) lock
|
||||
bool is_lockable() const { return m_value.load() < c_one - 1; }
|
||||
|
||||
// For CV
|
||||
unsigned lock_forced();
|
||||
};
|
||||
|
||||
// Simplified shared (reader) lock implementation.
|
||||
|
|
|
|||
82
orbis-kernel/src/utils/SharedCV.cpp
Normal file
82
orbis-kernel/src/utils/SharedCV.cpp
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
#include "orbis/utils/SharedCV.hpp"
|
||||
#include <linux/futex.h>
|
||||
#include <syscall.h>
|
||||
#include <unistd.h>
|
||||
|
||||
namespace orbis::utils {
|
||||
void shared_cv::impl_wait(shared_mutex &mutex, unsigned _old,
|
||||
std::uint64_t usec_timeout) noexcept {
|
||||
// Not supposed to fail
|
||||
if (!_old)
|
||||
std::abort();
|
||||
|
||||
// Wait with timeout
|
||||
struct timespec timeout {};
|
||||
timeout.tv_nsec = (usec_timeout % 1000'000) * 1000;
|
||||
timeout.tv_sec = (usec_timeout / 1000'000);
|
||||
syscall(SYS_futex, &m_value, FUTEX_WAIT, _old,
|
||||
usec_timeout + 1 ? &timeout : nullptr, 0, 0);
|
||||
|
||||
// Cleanup
|
||||
const auto old = atomic_fetch_op(m_value, [](unsigned &value) {
|
||||
// Remove waiter (c_waiter_mask)
|
||||
value -= 1;
|
||||
|
||||
// Try to remove signal
|
||||
if (value & c_signal_mask)
|
||||
value -= c_signal_mask & (0 - c_signal_mask);
|
||||
});
|
||||
|
||||
// Lock is already acquired
|
||||
if (old & c_signal_mask) {
|
||||
return;
|
||||
}
|
||||
|
||||
mutex.lock();
|
||||
}
|
||||
|
||||
void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept {
|
||||
unsigned _old = m_value.load();
|
||||
const bool is_one = _count == 1;
|
||||
|
||||
// Enqueue _count waiters
|
||||
_count = std::min<int>(_count, _old & c_waiter_mask);
|
||||
if (_count <= 0)
|
||||
return;
|
||||
|
||||
// Try to lock the mutex
|
||||
unsigned _m_locks = mutex.lock_forced();
|
||||
|
||||
const int max_sig = atomic_op(m_value, [&](unsigned &value) {
|
||||
// Verify the number of waiters
|
||||
int max_sig = std::min<int>(_count, value & c_waiter_mask);
|
||||
|
||||
// Add lock signal (mutex was immediately locked)
|
||||
if (_m_locks == 0)
|
||||
value += c_signal_mask & (0 - c_signal_mask);
|
||||
_old = value;
|
||||
return max_sig;
|
||||
});
|
||||
|
||||
if (max_sig < _count) {
|
||||
_count = max_sig;
|
||||
}
|
||||
|
||||
if (_count) {
|
||||
// Wake up one thread + requeue remaining waiters
|
||||
unsigned do_wake = _m_locks == 0;
|
||||
if (auto r = syscall(SYS_futex, &m_value, FUTEX_CMP_REQUEUE, do_wake,
|
||||
&mutex, _count - do_wake, _old);
|
||||
r > 0) {
|
||||
if (mutex.is_free()) // Avoid deadlock (TODO: proper fix?)
|
||||
syscall(SYS_futex, &mutex, FUTEX_WAKE, 1, nullptr, 0, 0);
|
||||
if (!is_one) // Keep awaking waiters
|
||||
return impl_wake(mutex, INT_MAX);
|
||||
} else if (r == EAGAIN) {
|
||||
// Retry if something has changed (TODO: is it necessary?)
|
||||
return impl_wake(mutex, is_one ? 1 : INT_MAX);
|
||||
} else if (r < 0)
|
||||
std::abort(); // Unknown error
|
||||
}
|
||||
}
|
||||
} // namespace orbis::utils
|
||||
|
|
@ -149,4 +149,20 @@ void shared_mutex::impl_lock_upgrade() {
|
|||
|
||||
impl_wait();
|
||||
}
|
||||
unsigned shared_mutex::lock_forced() {
|
||||
return atomic_op(m_value, [](unsigned &v) {
|
||||
if (v & c_sig) {
|
||||
v -= c_sig;
|
||||
v += c_one;
|
||||
return 0u;
|
||||
}
|
||||
|
||||
if (v == 0) {
|
||||
v += c_one;
|
||||
return 0u;
|
||||
}
|
||||
|
||||
return v;
|
||||
});
|
||||
}
|
||||
} // namespace orbis::utils
|
||||
|
|
|
|||
Loading…
Reference in a new issue