diff --git a/orbis-kernel/CMakeLists.txt b/orbis-kernel/CMakeLists.txt index daf17846a..aecc11b24 100644 --- a/orbis-kernel/CMakeLists.txt +++ b/orbis-kernel/CMakeLists.txt @@ -62,6 +62,7 @@ add_library(obj.orbis-kernel OBJECT src/utils/Logs.cpp src/utils/SharedMutex.cpp + src/utils/SharedCV.cpp ) target_link_libraries(obj.orbis-kernel PUBLIC orbis::kernel::config) diff --git a/orbis-kernel/include/orbis/utils/SharedCV.hpp b/orbis-kernel/include/orbis/utils/SharedCV.hpp new file mode 100644 index 000000000..4aa494a38 --- /dev/null +++ b/orbis-kernel/include/orbis/utils/SharedCV.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include +#include +#include + +namespace orbis { +inline namespace utils { +// IPC-ready lightweight condition variable +class shared_cv { + enum : unsigned { + c_waiter_mask = 0xffff, + c_signal_mask = 0xffffffff & ~c_waiter_mask, + }; + + std::atomic m_value{0}; + +protected: + // Increment waiter count + unsigned add_waiter() noexcept { + return atomic_op(m_value, [](unsigned &value) -> unsigned { + if ((value & c_signal_mask) == c_signal_mask || + (value & c_waiter_mask) == c_waiter_mask) { + // Signal or waiter overflow, return immediately + return 0; + } + + // Add waiter (c_waiter_mask) + value += 1; + return value; + }); + } + + // Internal waiting function + void impl_wait(shared_mutex &mutex, unsigned _old, + std::uint64_t usec_timeout) noexcept; + + // Try to notify up to _count threads + void impl_wake(shared_mutex &mutex, int _count) noexcept; + +public: + constexpr shared_cv() = default; + + void wait(shared_mutex &mutex, std::uint64_t usec_timeout = -1) noexcept { + const unsigned _old = add_waiter(); + if (!_old) { + return; + } + + mutex.unlock(); + impl_wait(mutex, _old, usec_timeout); + } + + // Wake one thread + void notify_one(shared_mutex &mutex) noexcept { + if (m_value) { + impl_wake(mutex, 1); + } + } + + // Wake all threads + void notify_all(shared_mutex &mutex) noexcept { + if (m_value) { + impl_wake(mutex, INT_MAX); + } + } +}; +} // namespace utils +} // namespace orbis diff --git a/orbis-kernel/include/orbis/utils/SharedMutex.hpp b/orbis-kernel/include/orbis/utils/SharedMutex.hpp index eddc5cc4a..4feef1f96 100644 --- a/orbis-kernel/include/orbis/utils/SharedMutex.hpp +++ b/orbis-kernel/include/orbis/utils/SharedMutex.hpp @@ -101,6 +101,9 @@ public: // Check whether can immediately obtain a shared (reader) lock bool is_lockable() const { return m_value.load() < c_one - 1; } + + // For CV + unsigned lock_forced(); }; // Simplified shared (reader) lock implementation. diff --git a/orbis-kernel/src/utils/SharedCV.cpp b/orbis-kernel/src/utils/SharedCV.cpp new file mode 100644 index 000000000..0f8a9dfb8 --- /dev/null +++ b/orbis-kernel/src/utils/SharedCV.cpp @@ -0,0 +1,82 @@ +#include "orbis/utils/SharedCV.hpp" +#include +#include +#include + +namespace orbis::utils { +void shared_cv::impl_wait(shared_mutex &mutex, unsigned _old, + std::uint64_t usec_timeout) noexcept { + // Not supposed to fail + if (!_old) + std::abort(); + + // Wait with timeout + struct timespec timeout {}; + timeout.tv_nsec = (usec_timeout % 1000'000) * 1000; + timeout.tv_sec = (usec_timeout / 1000'000); + syscall(SYS_futex, &m_value, FUTEX_WAIT, _old, + usec_timeout + 1 ? &timeout : nullptr, 0, 0); + + // Cleanup + const auto old = atomic_fetch_op(m_value, [](unsigned &value) { + // Remove waiter (c_waiter_mask) + value -= 1; + + // Try to remove signal + if (value & c_signal_mask) + value -= c_signal_mask & (0 - c_signal_mask); + }); + + // Lock is already acquired + if (old & c_signal_mask) { + return; + } + + mutex.lock(); +} + +void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept { + unsigned _old = m_value.load(); + const bool is_one = _count == 1; + + // Enqueue _count waiters + _count = std::min(_count, _old & c_waiter_mask); + if (_count <= 0) + return; + + // Try to lock the mutex + unsigned _m_locks = mutex.lock_forced(); + + const int max_sig = atomic_op(m_value, [&](unsigned &value) { + // Verify the number of waiters + int max_sig = std::min(_count, value & c_waiter_mask); + + // Add lock signal (mutex was immediately locked) + if (_m_locks == 0) + value += c_signal_mask & (0 - c_signal_mask); + _old = value; + return max_sig; + }); + + if (max_sig < _count) { + _count = max_sig; + } + + if (_count) { + // Wake up one thread + requeue remaining waiters + unsigned do_wake = _m_locks == 0; + if (auto r = syscall(SYS_futex, &m_value, FUTEX_CMP_REQUEUE, do_wake, + &mutex, _count - do_wake, _old); + r > 0) { + if (mutex.is_free()) // Avoid deadlock (TODO: proper fix?) + syscall(SYS_futex, &mutex, FUTEX_WAKE, 1, nullptr, 0, 0); + if (!is_one) // Keep awaking waiters + return impl_wake(mutex, INT_MAX); + } else if (r == EAGAIN) { + // Retry if something has changed (TODO: is it necessary?) + return impl_wake(mutex, is_one ? 1 : INT_MAX); + } else if (r < 0) + std::abort(); // Unknown error + } +} +} // namespace orbis::utils diff --git a/orbis-kernel/src/utils/SharedMutex.cpp b/orbis-kernel/src/utils/SharedMutex.cpp index 1afdaf790..ff8a105eb 100644 --- a/orbis-kernel/src/utils/SharedMutex.cpp +++ b/orbis-kernel/src/utils/SharedMutex.cpp @@ -149,4 +149,20 @@ void shared_mutex::impl_lock_upgrade() { impl_wait(); } +unsigned shared_mutex::lock_forced() { + return atomic_op(m_value, [](unsigned &v) { + if (v & c_sig) { + v -= c_sig; + v += c_one; + return 0u; + } + + if (v == 0) { + v += c_one; + return 0u; + } + + return v; + }); +} } // namespace orbis::utils