orbis-kernel: Add SharedAtomic utility

Initial shared atomic implementation for Darwin
This commit is contained in:
DH 2024-10-31 22:54:16 +03:00
parent d83a0723a7
commit 7d0f277ad5
10 changed files with 578 additions and 97 deletions

View file

@ -33,7 +33,7 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode,
thread->evfIsCancelled = -1;
std::unique_lock lock(queueMtx);
int result = 0;
orbis::ErrorCode result = {};
while (true) {
if (isDeleted) {
if (thread->evfIsCancelled == UINT64_MAX)
@ -78,10 +78,10 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode,
waitingThreads.emplace_back(waitingThread);
if (timeout) {
result = thread->sync_cv.wait(queueMtx, *timeout);
result = toErrorCode(thread->sync_cv.wait(queueMtx, *timeout));
update_timeout();
} else {
result = thread->sync_cv.wait(queueMtx);
result = toErrorCode(thread->sync_cv.wait(queueMtx));
}
if (thread->evfIsCancelled == UINT64_MAX) {

View file

@ -1,9 +1,8 @@
#include "umtx.hpp"
#include "error.hpp"
#include "orbis/KernelContext.hpp"
#include "orbis/thread.hpp"
#include "orbis/utils/AtomicOp.hpp"
#include "orbis/utils/Logs.hpp"
#include "time.hpp"
#include <limits>
namespace orbis {
@ -41,7 +40,7 @@ uint UmtxChain::notify_n(const UmtxKey &key, sint count) {
return 0;
uint n = 0;
while (count > 0) {
while (count > 0) {
it->second.thr = nullptr;
it->second.cv.notify_all(mtx);
it = erase(it);
@ -57,9 +56,7 @@ uint UmtxChain::notify_n(const UmtxKey &key, sint count) {
return n;
}
uint UmtxChain::notify_one(const UmtxKey &key) {
return notify_n(key, 1);
}
uint UmtxChain::notify_one(const UmtxKey &key) { return notify_n(key, 1); }
uint UmtxChain::notify_all(const UmtxKey &key) {
return notify_n(key, std::numeric_limits<sint>::max());
@ -94,7 +91,7 @@ orbis::ErrorCode orbis::umtx_wait(Thread *thread, ptr<void> addr, ulong id,
if (val == id) {
if (ut + 1 == 0) {
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx)};
result = orbis::toErrorCode(node->second.cv.wait(chain.mtx));
if (result != ErrorCode{} || node->second.thr != thread)
break;
}
@ -102,7 +99,8 @@ orbis::ErrorCode orbis::umtx_wait(Thread *thread, ptr<void> addr, ulong id,
auto start = std::chrono::steady_clock::now();
std::uint64_t udiff = 0;
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)};
result =
orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff));
if (node->second.thr != thread)
break;
udiff = std::chrono::duration_cast<std::chrono::microseconds>(
@ -193,7 +191,7 @@ static ErrorCode do_lock_normal(Thread *thread, ptr<umutex> m, uint flags,
auto [chain, key, lock] = g_context.getUmtxChain1(thread, flags, m);
auto node = chain.enqueue(key, thread);
if (m->owner.compare_exchange_strong(owner, owner | kUmutexContested)) {
error = ErrorCode{node->second.cv.wait(chain.mtx, ut)};
error = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut));
if (error == ErrorCode{} && node->second.thr == thread) {
error = ErrorCode::TIMEDOUT;
}
@ -323,7 +321,6 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr<ucond> cv,
ORBIS_LOG_WARNING("umtx_cv_wait: CLOCK_ID", wflags, cv->clockid);
// std::abort();
return ErrorCode::NOSYS;
}
if ((wflags & kCvWaitAbsTime) != 0 && ut + 1) {
ORBIS_LOG_WARNING("umtx_cv_wait: ABSTIME unimplemented", wflags);
@ -353,7 +350,7 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr<ucond> cv,
if (result == ErrorCode{}) {
if (ut + 1 == 0) {
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut)};
result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut));
if (result != ErrorCode{} || node->second.thr != thread) {
break;
}
@ -362,7 +359,8 @@ orbis::ErrorCode orbis::umtx_cv_wait(Thread *thread, ptr<ucond> cv,
auto start = std::chrono::steady_clock::now();
std::uint64_t udiff = 0;
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)};
result =
orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff));
if (node->second.thr != thread) {
break;
}
@ -457,7 +455,7 @@ orbis::ErrorCode orbis::umtx_rw_rdlock(Thread *thread, ptr<urwlock> rwlock,
if (ut + 1 == 0) {
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut)};
result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut));
if (result != ErrorCode{} || node->second.thr != thread) {
break;
}
@ -466,7 +464,8 @@ orbis::ErrorCode orbis::umtx_rw_rdlock(Thread *thread, ptr<urwlock> rwlock,
auto start = std::chrono::steady_clock::now();
std::uint64_t udiff = 0;
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)};
result =
orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff));
if (node->second.thr != thread)
break;
udiff = std::chrono::duration_cast<std::chrono::microseconds>(
@ -557,7 +556,7 @@ orbis::ErrorCode orbis::umtx_rw_wrlock(Thread *thread, ptr<urwlock> rwlock,
if (ut + 1 == 0) {
while (true) {
error = ErrorCode{node->second.cv.wait(chain.mtx, ut)};
error = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut));
if (error != ErrorCode{} || node->second.thr != thread) {
break;
}
@ -566,7 +565,8 @@ orbis::ErrorCode orbis::umtx_rw_wrlock(Thread *thread, ptr<urwlock> rwlock,
auto start = std::chrono::steady_clock::now();
std::uint64_t udiff = 0;
while (true) {
error = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)};
error =
orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff));
if (node->second.thr != thread)
break;
udiff = std::chrono::duration_cast<std::chrono::microseconds>(
@ -729,7 +729,7 @@ orbis::ErrorCode orbis::umtx_sem_wait(Thread *thread, ptr<usem> sem,
if (!sem->count) {
if (ut + 1 == 0) {
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut)};
result = orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut));
if (result != ErrorCode{} || node->second.thr != thread)
break;
}
@ -737,7 +737,8 @@ orbis::ErrorCode orbis::umtx_sem_wait(Thread *thread, ptr<usem> sem,
auto start = std::chrono::steady_clock::now();
std::uint64_t udiff = 0;
while (true) {
result = ErrorCode{node->second.cv.wait(chain.mtx, ut - udiff)};
result =
orbis::toErrorCode(node->second.cv.wait(chain.mtx, ut - udiff));
if (node->second.thr != thread)
break;
udiff = std::chrono::duration_cast<std::chrono::microseconds>(

View file

@ -0,0 +1,95 @@
#include "utils/SharedAtomic.hpp"
#include <cerrno>
using namespace orbis;
#ifdef ORBIS_HAS_FUTEX
#include <linux/futex.h>
/// Blocks the calling thread on this 32-bit word using FUTEX_WAIT.
///
/// @param oldValue     value passed to the kernel as the expected content of
///                     the word.
/// @param usec_timeout relative timeout in microseconds.
///                     `std::chrono::microseconds::max()` requests an
///                     unbounded wait; so does any negative duration (see the
///                     comment in the body).
/// @return `std::errc{}` when the syscall succeeds, otherwise the `errno` set
///         by the failed syscall converted to `std::errc`.
std::errc shared_atomic32::wait_impl(std::uint32_t oldValue,
                                     std::chrono::microseconds usec_timeout) {
  const auto usec_timeout_count = usec_timeout.count();

  // Callers that keep their timeout as an unsigned microsecond count use the
  // all-ones value for "wait forever"; converted to std::chrono::microseconds
  // that arrives here as -1us. A negative duration would otherwise produce a
  // timespec with a negative tv_nsec, which futex rejects with EINVAL, so
  // treat it the same as microseconds::max().
  const bool hasTimeout =
      usec_timeout != std::chrono::microseconds::max() &&
      usec_timeout_count >= 0;

  struct timespec timeout{};
  if (hasTimeout) {
    timeout.tv_sec = usec_timeout_count / 1000'000;
    timeout.tv_nsec = (usec_timeout_count % 1000'000) * 1000;
  }

  const long result = syscall(SYS_futex, this, FUTEX_WAIT, oldValue,
                              hasTimeout ? &timeout : nullptr, 0, 0);

  if (result < 0) {
    return static_cast<std::errc>(errno);
  }

  return {};
}
/// Issues FUTEX_WAKE on this word for at most `count` waiters and hands the
/// syscall's return value back to the caller, narrowed to int.
int shared_atomic32::notify_n(int count) const {
  const auto wakeResult = syscall(SYS_futex, this, FUTEX_WAKE, count);
  return static_cast<int>(wakeResult);
}
#elif defined(ORBIS_HAS_ULOCK)
#include <limits>
// Operation codes and flag bits combined into the `operation` argument of
// __ulock_wait / __ulock_wake (declared below).
// NOTE(review): the values presumably mirror XNU's private <sys/ulock.h>;
// confirm against the targeted SDK.

// Operation codes; all of them fit in the low byte covered by UL_OPCODE_MASK.
#define UL_COMPARE_AND_WAIT 1
#define UL_UNFAIR_LOCK 2
#define UL_COMPARE_AND_WAIT_SHARED 3
#define UL_UNFAIR_LOCK64_SHARED 4
#define UL_COMPARE_AND_WAIT64 5
#define UL_COMPARE_AND_WAIT64_SHARED 6
// Wake-side flag bits (ULF_WAKE_ALL is what notify_n uses to wake everyone).
#define ULF_WAKE_ALL 0x00000100
#define ULF_WAKE_THREAD 0x00000200
#define ULF_WAKE_ALLOW_NON_OWNER 0x00000400
// Wait-side flag bits; unused by this file.
#define ULF_WAIT_WORKQ_DATA_CONTENTION 0x00010000
#define ULF_WAIT_CANCEL_POINT 0x00020000
#define ULF_WAIT_ADAPTIVE_SPIN 0x00040000
// NOTE(review): expected to make the call return -errno directly instead of
// setting errno; confirm against XNU's sys_ulock.c.
#define ULF_NO_ERRNO 0x01000000
// Masks splitting `operation` into its opcode byte and its flag bits.
#define UL_OPCODE_MASK 0x000000FF
#define UL_FLAGS_MASK 0xFFFFFF00
#define ULF_GENERIC_MASK 0xFFFF0000
// Darwin's ulock entry points. They are plain C symbols exported by the system
// library, so they must be declared with C linkage; without it this C++
// translation unit would reference mangled names that do not exist at link
// time.
//
// operation: one UL_* opcode optionally OR-ed with ULF_* flags.
// addr:      address of the word being waited on / woken.
// NOTE(review): `timeout` is expected to be in microseconds with 0 meaning
// "no timeout" (per XNU's sys_ulock.c) - not verifiable from this file.
extern "C" {
int __ulock_wait(uint32_t operation, void *addr, uint64_t value,
                 uint32_t timeout);
int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
}
/// Blocks the calling thread on this 32-bit word using
/// __ulock_wait(UL_COMPARE_AND_WAIT_SHARED).
///
/// @param oldValue     value passed to the kernel as the expected content of
///                     the word.
/// @param usec_timeout relative timeout in microseconds.
///                     `std::chrono::microseconds::max()` or any negative
///                     duration requests an unbounded wait. Finite values are
///                     clamped to the range the 32-bit ulock timeout can
///                     carry, so very long waits may report a timeout early.
/// @return `std::errc{}` when the call succeeds, otherwise the `errno` set by
///         the failed call converted to `std::errc`.
std::errc shared_atomic32::wait_impl(std::uint32_t oldValue,
                                     std::chrono::microseconds usec_timeout) {
  using rep = std::chrono::microseconds::rep;
  constexpr rep kMaxUlockTimeout = std::numeric_limits<std::uint32_t>::max();

  const rep usec_timeout_count = usec_timeout.count();

  // __ulock_wait takes a 32-bit microsecond timeout where 0 means "no
  // timeout". Passing the 64-bit count straight through would truncate it:
  // microseconds::max() became 0xFFFFFFFF (~71 minutes instead of forever),
  // larger values wrapped around, and a 0us request turned into an unbounded
  // wait.
  std::uint32_t timeout = 0; // unbounded
  if (usec_timeout != std::chrono::microseconds::max() &&
      usec_timeout_count >= 0) {
    if (usec_timeout_count == 0) {
      timeout = 1; // shortest finite wait; 0 would mean "forever"
    } else if (usec_timeout_count >= kMaxUlockTimeout) {
      timeout = std::numeric_limits<std::uint32_t>::max();
    } else {
      timeout = static_cast<std::uint32_t>(usec_timeout_count);
    }
  }

  const int result = __ulock_wait(UL_COMPARE_AND_WAIT_SHARED,
                                  static_cast<void *>(this), oldValue, timeout);

  if (result < 0) {
    return static_cast<std::errc>(errno);
  }

  return {};
}
int shared_atomic32::notify_n(int count) const {
int result = 0;
uint32_t operation = UL_COMPARE_AND_WAIT_SHARED | ULF_NO_ERRNO;
if (count == 1) {
result = __ulock_wake(operation, (void *)this, 0);
} else if (count == std::numeric_limits<int>::max()) {
result = __ulock_wake(ULF_WAKE_ALL | operation, (void *)this, 0);
} else {
for (int i = 0; i < count; ++i) {
auto ret = __ulock_wake(operation, (void *)this, 0);
if (ret != 0) {
if (result == 0) {
result = ret;
}
break;
}
result++;
}
}
return result;
}
#else
#error Unimplemented atomic for this platform
#endif

View file

@ -1,35 +1,30 @@
#include "orbis/utils/SharedCV.hpp"
#include "orbis/utils/Logs.hpp"
#include <chrono>
#ifdef ORBIS_HAS_FUTEX
#include <linux/futex.h>
#include <syscall.h>
#include <unistd.h>
#endif
namespace orbis::utils {
int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val,
std::uint64_t usec_timeout) noexcept {
std::errc shared_cv::impl_wait(shared_mutex &mutex, unsigned _val,
std::uint64_t usec_timeout) noexcept {
// Not supposed to fail
if (!_val) {
std::abort();
}
// Wait with timeout
struct timespec timeout {};
timeout.tv_nsec = (usec_timeout % 1000'000) * 1000;
timeout.tv_sec = (usec_timeout / 1000'000);
int result = 0;
std::errc result = {};
while (true) {
result = syscall(SYS_futex, &m_value, FUTEX_WAIT, _val,
usec_timeout + 1 ? &timeout : nullptr, 0, 0);
if (result < 0) {
result = errno;
}
result = m_value.wait(_val, std::chrono::microseconds(usec_timeout));
// Cleanup
const auto old = atomic_fetch_op(m_value, [&](unsigned &value) {
const auto old = m_value.fetch_op([&](unsigned &value) {
// Remove waiter if no signals
if (!(value & ~c_waiter_mask) && result != EAGAIN) {
if (!(value & ~c_waiter_mask) &&
result != std::errc::resource_unavailable_try_again) {
value -= 1;
}
@ -38,23 +33,32 @@ int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val,
value -= c_signal_one;
}
#ifdef ORBIS_HAS_FUTEX
if (value & c_locked_mask) {
value -= c_locked_mask;
}
#endif
});
#ifdef ORBIS_HAS_FUTEX
// Lock is already acquired
if (old & c_locked_mask) {
return 0;
return {};
}
// Wait directly (waiter has been added)
if (old & c_signal_mask) {
return mutex.impl_wait();
}
#else
if (old & c_signal_mask) {
result = {};
break;
}
#endif
// Possibly spurious wakeup
if (result != EAGAIN) {
if (result != std::errc::resource_unavailable_try_again) {
break;
}
@ -66,27 +70,69 @@ int shared_cv::impl_wait(shared_mutex &mutex, unsigned _val,
}
void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept {
unsigned _old = m_value.load();
const bool is_one = _count == 1;
#ifdef ORBIS_HAS_FUTEX
while (true) {
unsigned _old = m_value.load();
const bool is_one = _count == 1;
// Enqueue _count waiters
// Enqueue _count waiters
_count = std::min<int>(_count, _old & c_waiter_mask);
if (_count <= 0)
return;
// Try to lock the mutex
const bool locked = mutex.lock_forced(_count);
const int max_sig = m_value.op([&](unsigned &value) {
// Verify the number of waiters
int max_sig = std::min<int>(_count, value & c_waiter_mask);
// Add lock signal (mutex was immediately locked)
if (locked && max_sig)
value |= c_locked_mask;
else if (locked)
std::abort();
// Add normal signals
value += c_signal_one * max_sig;
// Remove waiters
value -= max_sig;
_old = value;
return max_sig;
});
if (max_sig < _count) {
// Fixup mutex
mutex.lock_forced(max_sig - _count);
_count = max_sig;
}
if (_count) {
// Wake up one thread + requeue remaining waiters
unsigned awake_count = locked ? 1 : 0;
if (auto r = syscall(SYS_futex, &m_value, FUTEX_REQUEUE, awake_count,
_count - awake_count, &mutex, 0);
r < _count) {
// Keep awaking waiters
_count = is_one ? 1 : INT_MAX;
continue;
}
}
break;
}
#else
unsigned _old = m_value.load();
_count = std::min<int>(_count, _old & c_waiter_mask);
if (_count <= 0)
return;
// Try to lock the mutex
const bool locked = mutex.lock_forced(_count);
mutex.lock_forced(1);
const int max_sig = atomic_op(m_value, [&](unsigned &value) {
// Verify the number of waiters
const int wakeupWaiters = m_value.op([&](unsigned &value) {
int max_sig = std::min<int>(_count, value & c_waiter_mask);
// Add lock signal (mutex was immediately locked)
if (locked && max_sig)
value |= c_locked_mask;
else if (locked)
std::abort();
// Add normal signals
value += c_signal_one * max_sig;
@ -96,21 +142,11 @@ void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept {
return max_sig;
});
if (max_sig < _count) {
// Fixup mutex
mutex.lock_forced(max_sig - _count);
_count = max_sig;
if (wakeupWaiters > 0) {
m_value.notify_n(wakeupWaiters);
}
if (_count) {
// Wake up one thread + requeue remaining waiters
unsigned awake_count = locked ? 1 : 0;
if (auto r = syscall(SYS_futex, &m_value, FUTEX_REQUEUE, awake_count,
_count - awake_count, &mutex, 0);
r < _count) {
// Keep awaking waiters
return impl_wake(mutex, is_one ? 1 : INT_MAX);
}
}
mutex.unlock();
#endif
}
} // namespace orbis::utils

View file

@ -1,6 +1,5 @@
#include "utils/SharedMutex.hpp"
#include "utils/Logs.hpp"
#include <linux/futex.h>
#include <syscall.h>
#include <unistd.h>
#include <xmmintrin.h>
@ -48,7 +47,7 @@ void shared_mutex::impl_lock_shared(unsigned val) {
if ((old % c_sig) + c_one >= c_sig)
std::abort(); // "shared_mutex overflow"
while (impl_wait() != 0) {}
while (impl_wait() != std::errc{}) {}
lock_downgrade();
}
void shared_mutex::impl_unlock_shared(unsigned old) {
@ -60,9 +59,9 @@ void shared_mutex::impl_unlock_shared(unsigned old) {
impl_signal();
}
}
int shared_mutex::impl_wait() {
std::errc shared_mutex::impl_wait() {
while (true) {
const auto [old, ok] = atomic_fetch_op(m_value, [](unsigned &value) {
const auto [old, ok] = m_value.fetch_op([](unsigned &value) {
if (value >= c_sig) {
value -= c_sig;
return true;
@ -75,19 +74,17 @@ int shared_mutex::impl_wait() {
break;
}
int result = syscall(SYS_futex, &m_value, FUTEX_WAIT, old, 0, 0, 0);
if (result < 0) {
result = errno;
}
if (result == EINTR) {
return EINTR;
auto result = m_value.wait(old);
if (result == std::errc::interrupted) {
return result;
}
}
return{};
}
void shared_mutex::impl_signal() {
m_value += c_sig;
syscall(SYS_futex, &m_value, FUTEX_WAKE, 1, 0, 0, 0);
m_value.notify_one();
}
void shared_mutex::impl_lock(unsigned val) {
if (val >= c_err)
@ -123,7 +120,7 @@ void shared_mutex::impl_lock(unsigned val) {
if ((old % c_sig) + c_one >= c_sig)
std::abort(); // "shared_mutex overflow"
while (impl_wait() != 0) {}
while (impl_wait() != std::errc{}) {}
}
void shared_mutex::impl_unlock(unsigned old) {
if (old - c_one >= c_err)
@ -155,27 +152,23 @@ void shared_mutex::impl_lock_upgrade() {
return;
}
while (impl_wait() != 0) {}
while (impl_wait() != std::errc{}) {}
}
bool shared_mutex::lock_forced(int count) {
if (count == 0)
return false;
if (count > 0) {
// Lock
return atomic_op(m_value, [&](unsigned &v) {
return m_value.op([&](std::uint32_t &v) {
if (v & c_sig) {
v -= c_sig;
v += c_one * count;
return true;
}
if (v == 0) {
v += c_one * count;
return true;
}
bool firstLock = v == 0;
v += c_one * count;
return false;
return firstLock;
});
}