diff --git a/orbis-kernel/include/orbis/utils/SharedCV.hpp b/orbis-kernel/include/orbis/utils/SharedCV.hpp index 4aa494a38..14f0f5dd2 100644 --- a/orbis-kernel/include/orbis/utils/SharedCV.hpp +++ b/orbis-kernel/include/orbis/utils/SharedCV.hpp @@ -10,7 +10,9 @@ inline namespace utils { class shared_cv { enum : unsigned { c_waiter_mask = 0xffff, - c_signal_mask = 0xffffffff & ~c_waiter_mask, + c_signal_mask = 0x7fff0000, + c_locked_mask = 0x80000000, + c_signal_one = c_waiter_mask + 1, }; std::atomic m_value{0}; diff --git a/orbis-kernel/include/orbis/utils/SharedMutex.hpp b/orbis-kernel/include/orbis/utils/SharedMutex.hpp index 4feef1f96..d96fa8c82 100644 --- a/orbis-kernel/include/orbis/utils/SharedMutex.hpp +++ b/orbis-kernel/include/orbis/utils/SharedMutex.hpp @@ -7,7 +7,9 @@ namespace orbis { inline namespace utils { // IPC-ready shared mutex, using only writer lock is recommended -struct shared_mutex final { +class shared_mutex final { + friend class shared_cv; + enum : unsigned { c_one = 1u << 14, // Fixed-point 1.0 value (one writer) c_sig = 1u << 30, @@ -102,8 +104,9 @@ public: // Check whether can immediately obtain a shared (reader) lock bool is_lockable() const { return m_value.load() < c_one - 1; } +private: // For CV - unsigned lock_forced(); + bool lock_forced(int count = 1); }; // Simplified shared (reader) lock implementation. diff --git a/orbis-kernel/src/utils/SharedCV.cpp b/orbis-kernel/src/utils/SharedCV.cpp index e9d826818..80af6b38d 100644 --- a/orbis-kernel/src/utils/SharedCV.cpp +++ b/orbis-kernel/src/utils/SharedCV.cpp @@ -19,19 +19,29 @@ void shared_cv::impl_wait(shared_mutex &mutex, unsigned _old, // Cleanup const auto old = atomic_fetch_op(m_value, [](unsigned &value) { - // Remove waiter (c_waiter_mask) - value -= 1; + // Remove waiter if no signals + if (!(value & ~c_waiter_mask)) + value -= 1; // Try to remove signal if (value & c_signal_mask) - value -= c_signal_mask & (0 - c_signal_mask); + value -= c_signal_one; + if (value | c_locked_mask) + value -= c_locked_mask; }); // Lock is already acquired - if (old & c_signal_mask) { + if (old & c_locked_mask) { return; } + // Wait directly (waiter has been added) + if (old & c_signal_mask) { + mutex.impl_wait(); + return; + } + + // Possibly spurious wakeup mutex.lock(); } @@ -45,34 +55,38 @@ void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept { return; // Try to lock the mutex - unsigned _m_locks = mutex.lock_forced(); + const bool locked = mutex.lock_forced(_count); const int max_sig = atomic_op(m_value, [&](unsigned &value) { // Verify the number of waiters int max_sig = std::min(_count, value & c_waiter_mask); // Add lock signal (mutex was immediately locked) - if (_m_locks == 0) - value += c_signal_mask & (0 - c_signal_mask); + if (locked && max_sig) + value |= c_locked_mask; + + // Add normal signals + value += c_signal_one * max_sig; + + // Remove waiters + value -= max_sig; _old = value; return max_sig; }); if (max_sig < _count) { + // Fixup mutex + mutex.lock_forced(max_sig - _count); _count = max_sig; } if (_count) { // Wake up one thread + requeue remaining waiters - unsigned do_wake = _m_locks == 0; - if (auto r = syscall(SYS_futex, &m_value, FUTEX_CMP_REQUEUE, do_wake, - &mutex, _count - do_wake, _old); + if (auto r = syscall(SYS_futex, &m_value, FUTEX_CMP_REQUEUE, +locked, + &mutex, _count - +locked, _old); r < _count) { // Keep awaking waiters return impl_wake(mutex, is_one ? 1 : INT_MAX); - } else if (mutex.is_free()) { - // Avoid deadlock (TODO: proper fix?) - syscall(SYS_futex, &mutex, FUTEX_WAKE, 1, nullptr, 0, 0); } } } diff --git a/orbis-kernel/src/utils/SharedMutex.cpp b/orbis-kernel/src/utils/SharedMutex.cpp index ff8a105eb..fd63eba52 100644 --- a/orbis-kernel/src/utils/SharedMutex.cpp +++ b/orbis-kernel/src/utils/SharedMutex.cpp @@ -149,20 +149,34 @@ void shared_mutex::impl_lock_upgrade() { impl_wait(); } -unsigned shared_mutex::lock_forced() { - return atomic_op(m_value, [](unsigned &v) { - if (v & c_sig) { - v -= c_sig; - v += c_one; - return 0u; - } +bool shared_mutex::lock_forced(int count) { + if (count == 0) + std::abort(); + if (count > 0) { + // Lock + return atomic_op(m_value, [&](unsigned &v) { + if (v & c_sig) { + v -= c_sig; + v += c_one * (count - 1); + return true; + } - if (v == 0) { - v += c_one; - return 0u; - } + if (v == 0) { + v += c_one * count; + return true; + } - return v; - }); + v += c_one * count; + return false; + }); + } + + // Unlock + const unsigned value = m_value.fetch_add(c_one * count); + if (value != c_one) [[unlikely]] { + impl_unlock(value); + } + + return false; } } // namespace orbis::utils