move IPC utilities from orbis-kernel to rx

This commit is contained in:
DH 2025-10-05 00:09:42 +03:00
parent 30469f7fb9
commit e73a0b962d
41 changed files with 558 additions and 172 deletions

204
rx/src/SharedAtomic.cpp Normal file
View file

@ -0,0 +1,204 @@
#include "SharedAtomic.hpp"
using namespace rx;
#ifdef __linux__
#include <linux/futex.h>
// Block until this atomic's value differs from `oldValue`, a signal/timeout
// occurs, or the process-wide unblock hook aborts the wait. Uses the raw
// futex syscall (non-PRIVATE form) so the word may live in memory shared
// between processes.
//
// Returns:
//   {}                                       - woken normally
//   std::errc::interrupted                   - g_scopedUnblock vetoed the wait
//   std::errc::resource_unavailable_try_again- value already changed (EAGAIN)
//                                              or EINTR (folded into
//                                              "spurious wakeup, retry")
//   std::errc::timed_out / other             - errno from the futex call
//
// NOTE(review): `syscall`/`SYS_futex`/`errno` need <unistd.h>,
// <sys/syscall.h> and <cerrno>; presumably pulled in via SharedAtomic.hpp —
// verify.
std::errc shared_atomic32::wait_impl(std::uint32_t oldValue,
std::chrono::microseconds usec_timeout) {
auto usec_timeout_count = usec_timeout.count();
// FUTEX_WAIT takes a *relative* timespec timeout.
struct timespec timeout{};
// microseconds::max() is the sentinel for "wait forever".
bool useTimeout = usec_timeout != std::chrono::microseconds::max();
if (useTimeout) {
timeout.tv_nsec = (usec_timeout_count % 1000'000) * 1000;
timeout.tv_sec = (usec_timeout_count / 1000'000);
}
// Only engage the unblock hook for long (>1ms) or infinite waits; short
// waits aren't worth the round trip through the hook.
bool unblock = (!useTimeout || usec_timeout.count() > 1000) &&
g_scopedUnblock != nullptr;
if (unblock) {
if (!g_scopedUnblock(true)) {
return std::errc::interrupted;
}
}
int result = syscall(SYS_futex, this, FUTEX_WAIT, oldValue,
useTimeout ? &timeout : nullptr);
// Capture errno immediately: the unblock callback below may clobber it.
auto errorCode = result < 0 ? static_cast<std::errc>(errno) : std::errc{};
if (unblock) {
// Re-arm blocking; if the hook reports cancellation, map a failed wait to
// "interrupted" and a successful one to plain success.
if (!g_scopedUnblock(false)) {
if (result < 0) {
return std::errc::interrupted;
}
return {};
}
}
if (result < 0) {
// EINTR is reported as a spurious wakeup so callers simply retry.
if (errorCode == std::errc::interrupted) {
return std::errc::resource_unavailable_try_again;
}
return errorCode;
}
return {};
}
// Wake at most `count` threads blocked in FUTEX_WAIT on this word.
// The kernel reports how many waiters were actually woken.
int shared_atomic32::notify_n(int count) const {
  long const woken = syscall(SYS_futex, this, FUTEX_WAKE, count);
  return static_cast<int>(woken);
}
#elif defined(__APPLE__)
#include <limits>
#define UL_COMPARE_AND_WAIT 1
#define UL_UNFAIR_LOCK 2
#define UL_COMPARE_AND_WAIT_SHARED 3
#define UL_UNFAIR_LOCK64_SHARED 4
#define UL_COMPARE_AND_WAIT64 5
#define UL_COMPARE_AND_WAIT64_SHARED 6
#define ULF_WAKE_ALL 0x00000100
#define ULF_WAKE_THREAD 0x00000200
#define ULF_WAKE_ALLOW_NON_OWNER 0x00000400
#define ULF_WAIT_WORKQ_DATA_CONTENTION 0x00010000
#define ULF_WAIT_CANCEL_POINT 0x00020000
#define ULF_WAIT_ADAPTIVE_SPIN 0x00040000
#define ULF_NO_ERRNO 0x01000000
#define UL_OPCODE_MASK 0x000000FF
#define UL_FLAGS_MASK 0xFFFFFF00
#define ULF_GENERIC_MASK 0xFFFF0000
extern int __ulock_wait(uint32_t operation, void *addr, uint64_t value,
uint32_t timeout);
extern int __ulock_wake(uint32_t operation, void *addr, uint64_t wake_value);
// Block until this atomic's value differs from `oldValue`, the timeout
// expires, or the process-wide unblock hook aborts the wait. Uses the XNU
// __ulock_wait syscall with the shared (cross-process) compare-and-wait op.
//
// Returns:
//   {}                     - woken normally
//   std::errc::interrupted - g_scopedUnblock vetoed/cancelled the wait
//   errno-derived errc     - __ulock_wait failed (e.g. timed_out)
std::errc shared_atomic32::wait_impl(std::uint32_t oldValue,
std::chrono::microseconds usec_timeout) {
  // microseconds::max() is the sentinel for "wait forever".
  bool useTimeout = usec_timeout != std::chrono::microseconds::max();
  // Only engage the unblock hook for long (>1ms) or infinite waits.
  bool unblock = (!useTimeout || usec_timeout.count() > 1000) &&
                 g_scopedUnblock != nullptr;
  if (unblock) {
    if (!g_scopedUnblock(true)) {
      return std::errc::interrupted;
    }
  }

  // __ulock_wait takes a 32-bit microsecond timeout where 0 means "wait
  // forever". Previously the untruncated microseconds::max() count was passed
  // and implicitly truncated to uint32_t, turning an "infinite" wait into a
  // ~71-minute one. Pass 0 for infinite and clamp (not truncate) finite
  // timeouts to the 32-bit range.
  std::uint32_t timeoutUs = 0; // 0 == infinite
  if (useTimeout) {
    auto const us = usec_timeout.count();
    timeoutUs =
        us > 0xFFFF'FFFFll ? 0xFFFF'FFFFu : static_cast<std::uint32_t>(us);
  }

  int result = __ulock_wait(UL_COMPARE_AND_WAIT_SHARED, (void *)this, oldValue,
                            timeoutUs);
  // Capture errno immediately: the unblock callback below may clobber it
  // (the Linux implementation already does this).
  auto errorCode = result < 0 ? static_cast<std::errc>(errno) : std::errc{};

  if (unblock) {
    // Re-arm blocking; if the hook reports cancellation, map a failed wait to
    // "interrupted" and a successful one to plain success.
    if (!g_scopedUnblock(false)) {
      if (result < 0) {
        return std::errc::interrupted;
      }
      return {};
    }
  }

  if (result < 0) {
    return errorCode;
  }
  return {};
}
// Wake up to `count` waiters blocked in __ulock_wait on this word.
// With ULF_NO_ERRNO the syscall returns negative error codes directly
// instead of setting errno.
// Returns: the raw __ulock_wake result for the single/all fast paths; for the
// loop path, the number of successful wake calls issued, or the first error
// code if no wake succeeded.
int shared_atomic32::notify_n(int count) const {
int result = 0;
uint32_t operation = UL_COMPARE_AND_WAIT_SHARED | ULF_NO_ERRNO;
if (count == 1) {
// Fast path: wake a single waiter.
result = __ulock_wake(operation, (void *)this, 0);
} else if (count == std::numeric_limits<int>::max()) {
// "Notify all" convention: INT_MAX requests a broadcast wake.
result = __ulock_wake(ULF_WAKE_ALL | operation, (void *)this, 0);
} else {
// No "wake N" primitive: issue single wakes until `count` is reached or a
// wake fails (e.g. no more waiters).
for (int i = 0; i < count; ++i) {
auto ret = __ulock_wake(operation, (void *)this, 0);
if (ret != 0) {
// Report the first error only if nothing was woken yet; otherwise keep
// the count of successful wakes accumulated so far.
if (result == 0) {
result = ret;
}
break;
}
result++;
}
}
return result;
}
#elif defined(_WIN32)
#include <cmath>
#include <limits>
#include <windows.h>
std::errc shared_atomic32::wait_impl(std::uint32_t oldValue,
std::chrono::microseconds usec_timeout) {
bool useTimeout = usec_timeout != std::chrono::microseconds::max();
bool unblock = (!useTimeout || usec_timeout.count() > 1000) &&
g_scopedUnblock != nullptr;
if (unblock) {
if (!g_scopedUnblock(true)) {
return std::errc::interrupted;
}
}
BOOL result = WaitOnAddress(
this, &oldValue, sizeof(std::uint32_t),
useTimeout
? std::chrono::duration_cast<std::chrono::milliseconds>(usec_timeout)
.count()
: INFINITY);
DWORD error = 0;
if (!result) {
error = GetLastError();
} else {
if (load(std::memory_order::relaxed) == oldValue) {
error = ERROR_ALERTED; // dummy error
}
}
if (unblock) {
if (!g_scopedUnblock(false)) {
if (result != TRUE) {
return std::errc::interrupted;
}
return {};
}
}
if (error == ERROR_TIMEOUT) {
return std::errc::timed_out;
}
return std::errc::resource_unavailable_try_again;
}
int shared_atomic32::notify_n(int count) const {
if (count == 1) {
WakeByAddressSingle(const_cast<shared_atomic32 *>(this));
} else if (count == std::numeric_limits<int>::max()) {
WakeByAddressAll(const_cast<shared_atomic32 *>(this));
} else {
for (int i = 0; i < count; ++i) {
WakeByAddressSingle(const_cast<shared_atomic32 *>(this));
}
}
}
#else
#error Unimplemented atomic for this platform
#endif

158
rx/src/SharedCV.cpp Normal file
View file

@ -0,0 +1,158 @@
#include "SharedCV.hpp"
#include <chrono>
#ifdef __linux
#include <linux/futex.h>
#include <syscall.h>
#include <unistd.h>
#endif
namespace rx {
// Core condition-variable wait. `_val` is the CV word value the caller
// observed after registering itself in the waiter count (must be non-zero —
// the low c_waiter_mask bits hold the waiter count). `usec_timeout` of
// uint64_t(-1) means "no timeout". The mutex is (re)acquired before
// returning.
// NOTE(review): the caller presumably releases `mutex` before calling this —
// verify against shared_cv's public wait().
std::errc shared_cv::impl_wait(shared_mutex &mutex, unsigned _val,
std::uint64_t usec_timeout) noexcept {
// Not supposed to fail
if (!_val) {
std::abort();
}
std::errc result = {};
bool useTimeout = usec_timeout != static_cast<std::uint64_t>(-1);
while (true) {
// Sleep until the CV word changes from `_val` (or timeout/interrupt).
result =
m_value.wait(_val, useTimeout ? std::chrono::microseconds(usec_timeout)
: std::chrono::microseconds::max());
// "try again" from the atomic wait means a spurious/value-changed wakeup,
// not a delivered signal.
bool spurious = result == std::errc::resource_unavailable_try_again;
// Cleanup
const auto old = m_value.fetch_op([&](unsigned &value) {
// Remove waiter if no signals
if ((value & ~c_waiter_mask) == 0) {
if (!spurious) {
value -= 1;
}
}
// Try to remove signal
if (value & c_signal_mask) {
value -= c_signal_one;
}
#ifdef __linux
// On Linux a wake may hand the mutex over directly (see impl_wake);
// consume that handoff bit here.
// NOTE(review): this file tests `__linux` while SharedAtomic.cpp tests
// `__linux__`; both are defined by GCC/Clang on Linux, but `__linux__`
// would be consistent.
if (value & c_locked_mask) {
value -= c_locked_mask;
}
#endif
});
#ifdef __linux
// Lock is already acquired
if (old & c_locked_mask) {
return {};
}
// Wait directly (waiter has been added)
if (old & c_signal_mask) {
return mutex.impl_wait();
}
#else
// Non-Linux: a pending signal means we were genuinely notified; take the
// mutex via the normal path below.
if (old & c_signal_mask) {
result = {};
break;
}
#endif
// Possibly spurious wakeup
if (!spurious) {
break;
}
// Retry the wait against the value we just observed.
_val = old;
}
mutex.lock();
return result;
}
// Wake up to `_count` waiters of this condition variable. On Linux the
// wakeup is fused with mutex acquisition: one waiter may receive the mutex
// directly (c_locked_mask handoff) and the rest are requeued from the CV word
// onto the mutex futex so they don't stampede. Elsewhere, waiters are woken
// on the CV word and re-acquire the mutex themselves.
// NOTE(review): uses std::min and INT_MAX — <algorithm>/<climits> are not
// included here; presumably provided by SharedCV.hpp — verify.
void shared_cv::impl_wake(shared_mutex &mutex, int _count) noexcept {
#ifdef __linux
while (true) {
unsigned _old = m_value.load();
// Remember whether this was a single-wake request so a requeue shortfall
// below retries with the same intent.
const bool is_one = _count == 1;
// Enqueue _count waiters
_count = std::min<int>(_count, _old & c_waiter_mask);
if (_count <= 0)
return;
// Try to lock the mutex
const bool locked = mutex.lock_forced(_count);
const int max_sig = m_value.op([&](unsigned &value) {
// Verify the number of waiters
int max_sig = std::min<int>(_count, value & c_waiter_mask);
// Add lock signal (mutex was immediately locked)
if (locked && max_sig)
value |= c_locked_mask;
else if (locked)
// Mutex was handed to us but there is nobody to pass it to.
std::abort();
// Add normal signals
value += c_signal_one * max_sig;
// Remove waiters
value -= max_sig;
_old = value;
return max_sig;
});
if (max_sig < _count) {
// Fixup mutex: undo the waiter slots we reserved but could not use
// (negative count removes forced waiters, see lock_forced).
mutex.lock_forced(max_sig - _count);
_count = max_sig;
}
if (_count) {
// Wake up one thread + requeue remaining waiters
unsigned awake_count = locked ? 1 : 0;
if (auto r = syscall(SYS_futex, &m_value, FUTEX_REQUEUE, awake_count,
_count - awake_count, &mutex, 0);
r < _count) {
// Fewer threads moved than expected (racing waiters left the futex):
// Keep awaking waiters
_count = is_one ? 1 : INT_MAX;
continue;
}
}
break;
}
#else
unsigned _old = m_value.load();
// Clamp to the actual number of registered waiters.
_count = std::min<int>(_count, _old & c_waiter_mask);
if (_count <= 0)
return;
// Hold the mutex while publishing signals so waiters can't miss them.
mutex.lock_forced(1);
const int wakeupWaiters = m_value.op([&](unsigned &value) {
int max_sig = std::min<int>(_count, value & c_waiter_mask);
// Add normal signals
value += c_signal_one * max_sig;
// Remove waiters
value -= max_sig;
_old = value;
return max_sig;
});
if (wakeupWaiters > 0) {
m_value.notify_n(wakeupWaiters);
}
mutex.unlock();
#endif
}
} // namespace rx

181
rx/src/SharedMutex.cpp Normal file
View file

@ -0,0 +1,181 @@
#include "SharedMutex.hpp"
#include <syscall.h>
#include <unistd.h>
#include <xmmintrin.h>
// Spin for roughly `cycles` TSC ticks, issuing the x86 PAUSE hint at least
// once so the sibling hyper-thread and the memory pipeline aren't starved.
static void busy_wait(unsigned long long cycles = 3000) {
  const unsigned long long deadline = __builtin_ia32_rdtsc() + cycles;
  for (;;) {
    _mm_pause();
    if (__builtin_ia32_rdtsc() >= deadline)
      break;
  }
}
namespace rx {
// Slow path for shared (reader) locking, entered when the fast path failed
// with observed value `val`. Strategy: steal a pending signal bit if one is
// up for grabs, spin briefly, then fall back to queueing as a writer and
// downgrading to a reader once acquired.
void shared_mutex::impl_lock_shared(unsigned val) {
if (val >= c_err)
std::abort(); // "shared_mutex underflow"
// Try to steal the notification bit
unsigned _old = val;
// Consuming c_sig and adding 1 converts a pending wakeup into a reader slot.
if (val & c_sig && m_value.compare_exchange_strong(_old, val - c_sig + 1)) {
return;
}
// Bounded spin before sleeping.
for (int i = 0; i < 10; i++) {
if (try_lock_shared()) {
return;
}
unsigned old = m_value;
if (old & c_sig && m_value.compare_exchange_strong(old, old - c_sig + 1)) {
return;
}
busy_wait();
}
// Acquire writer lock and downgrade
const unsigned old = m_value.fetch_add(c_one);
// Counter was zero: we own the lock outright; downgrade to shared.
if (old == 0) {
lock_downgrade();
return;
}
if ((old % c_sig) + c_one >= c_sig)
std::abort(); // "shared_mutex overflow"
// Sleep until a signal is handed to us; retry on interruption.
while (impl_wait() != std::errc{}) {
}
lock_downgrade();
}
// Slow path for shared (reader) unlock. `old` is the counter value observed
// before this reader's decrement was applied.
void shared_mutex::impl_unlock_shared(unsigned old) {
  const unsigned remaining = old - 1;
  if (remaining >= c_err)
    std::abort(); // "shared_mutex underflow"
  // This was the last reader of its c_one slot: hand a signal to a waiter.
  if (remaining % c_one == 0)
    impl_signal();
}
// Sleep until a signal token (c_sig) can be consumed from the counter.
// Returns {} once a token was taken, or std::errc::interrupted if the
// underlying wait was interrupted; other wait errors simply retry.
std::errc shared_mutex::impl_wait() {
  for (;;) {
    const auto [observed, consumed] = m_value.fetch_op([](unsigned &value) {
      if (value < c_sig)
        return false;
      value -= c_sig;
      return true;
    });
    if (consumed) {
      return {};
    }
    if (const auto err = m_value.wait(observed);
        err == std::errc::interrupted) {
      return err;
    }
  }
}
// Publish one signal token on the counter and wake a single sleeping waiter.
void shared_mutex::impl_signal() {
  m_value.fetch_add(c_sig);
  m_value.notify_one();
}
// Slow path for exclusive (writer) locking, entered when the fast path failed
// with observed value `val`. Mirrors impl_lock_shared but claims a full
// writer slot (c_one) instead of a single reader.
void shared_mutex::impl_lock(unsigned val) {
if (val >= c_err)
std::abort(); // "shared_mutex underflow"
// Try to steal the notification bit
unsigned _old = val;
// Consuming c_sig and adding c_one converts a pending wakeup into writer
// ownership.
if (val & c_sig &&
m_value.compare_exchange_strong(_old, val - c_sig + c_one)) {
return;
}
// Bounded spin before sleeping.
for (int i = 0; i < 10; i++) {
busy_wait();
unsigned old = m_value;
if (!old && try_lock()) {
return;
}
if (old & c_sig &&
m_value.compare_exchange_strong(old, old - c_sig + c_one)) {
return;
}
}
// Queue as a writer.
const unsigned old = m_value.fetch_add(c_one);
// Counter was zero: lock acquired outright.
if (old == 0) {
return;
}
if ((old % c_sig) + c_one >= c_sig)
std::abort(); // "shared_mutex overflow"
// Sleep until a signal is handed to us; retry on interruption.
while (impl_wait() != std::errc{}) {
}
}
// Slow path for exclusive unlock. `old` is the counter value observed before
// this writer's c_one was released.
// 1) Notify the next writer if necessary
// 2) Notify all readers otherwise if necessary (currently indistinguishable
// from writers)
void shared_mutex::impl_unlock(unsigned old) {
  const unsigned remaining = old - c_one;
  if (remaining >= c_err)
    std::abort(); // "shared_mutex underflow"
  // Someone is still queued: hand them a signal token.
  if (remaining != 0)
    impl_signal();
}
// Slow path for upgrading a shared (reader) lock to an exclusive (writer)
// lock: spin briefly, then convert this thread's reader slot (1) into a
// writer slot (c_one) and sleep until all other readers have drained.
void shared_mutex::impl_lock_upgrade() {
for (int i = 0; i < 10; i++) {
busy_wait();
if (try_lock_upgrade()) {
return;
}
}
// Convert to writer lock
// Adding (c_one - 1) swaps our reader contribution for a writer one.
const unsigned old = m_value.fetch_add(c_one - 1);
if ((old % c_sig) + c_one - 1 >= c_sig)
std::abort(); // "shared_mutex overflow"
// We were the only reader: the writer slot is already exclusively ours.
if (old % c_one == 1) {
return;
}
// Otherwise wait for the remaining readers to release; retry on
// interruption.
while (impl_wait() != std::errc{}) {
}
}
// Forcibly adjust the lock counter on behalf of waiters being transferred
// (used by shared_cv's wake path). A positive `count` reserves `count` writer
// slots, consuming a pending signal if one is set; a negative `count` removes
// previously reserved slots. Returns true when the caller now effectively
// owns the lock (counter went 0 -> non-zero, or a signal was consumed);
// count == 0 is a no-op returning false.
bool shared_mutex::lock_forced(int count) {
if (count == 0)
return false;
if (count > 0) {
// Lock
return m_value.op([&](std::uint32_t &v) {
// A pending signal means a wakeup was in flight: absorb it and take
// ownership on behalf of the transferred waiters.
if (v & c_sig) {
v -= c_sig;
v += c_one * count;
return true;
}
bool firstLock = v == 0;
v += c_one * count;
return firstLock;
});
}
// Remove waiters
// `count` is negative here: unsigned wraparound of c_one * count subtracts
// the slots that were reserved above.
m_value.fetch_add(c_one * count);
return true;
}
} // namespace rx