From 89fa2e96e27de61357fc8f0c1e64d12aee2c3405 Mon Sep 17 00:00:00 2001 From: DH Date: Sun, 28 Dec 2025 19:24:03 +0300 Subject: [PATCH] rx/SharedAtomic: fix win32 IPC implementation, wait & notify_one is only supported apis --- rx/src/SharedAtomic.cpp | 78 ++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/rx/src/SharedAtomic.cpp b/rx/src/SharedAtomic.cpp index 6f9ca99c4..e4214200d 100644 --- a/rx/src/SharedAtomic.cpp +++ b/rx/src/SharedAtomic.cpp @@ -143,6 +143,27 @@ int shared_atomic32::notify_n(int count) const { #include #include +static auto g_events = [] { + std::array result; + SECURITY_ATTRIBUTES securityAttr{ + .nLength = sizeof(SECURITY_ATTRIBUTES), + .lpSecurityDescriptor = nullptr, + .bInheritHandle = true, + }; + + for (auto &handle : result) { + handle = CreateEvent(&securityAttr, false, false, nullptr); + } + return result; +}(); + +static HANDLE getEventFor(const shared_atomic32 *atomic) { + auto hash = (2654404609 * std::bit_cast(atomic)) >> 26; + hash ^= hash >> 16; + hash ^= hash >> 8; + return g_events[hash & 0xff]; +} + std::errc shared_atomic32::wait_impl(std::uint32_t oldValue, std::chrono::microseconds usec_timeout) { @@ -151,57 +172,48 @@ std::errc shared_atomic32::wait_impl(std::uint32_t oldValue, bool unblock = (!useTimeout || usec_timeout.count() > 1000) && g_scopedUnblock != nullptr; + auto event = getEventFor(this); + if (unblock) { if (!g_scopedUnblock(true)) { return std::errc::interrupted; } } - BOOL result = WaitOnAddress( - this, &oldValue, sizeof(std::uint32_t), - useTimeout - ? std::chrono::duration_cast(usec_timeout) - .count() - : INFINITY); - - DWORD error = 0; - if (!result) { - error = GetLastError(); - } else { - if (load(std::memory_order::relaxed) == oldValue) { - error = ERROR_ALERTED; // dummy error - } + if (load(std::memory_order::relaxed) != oldValue) { + return {}; } - if (unblock) { - if (!g_scopedUnblock(false)) { - if (result != TRUE) { - return std::errc::interrupted; - } + auto timeoutMs = INFINITE; - return {}; - } + if (useTimeout) { + timeoutMs = + std::chrono::duration_cast(usec_timeout) + .count(); } - if (error == ERROR_TIMEOUT) { + auto result = WaitForSingleObject(event, timeoutMs); + + bool unblockInterrupted = unblock && !g_scopedUnblock(false); + + if (result == WAIT_OBJECT_0) { + return {}; + } + + if (result == WAIT_TIMEOUT) { return std::errc::timed_out; } + if (unblockInterrupted) { + return std::errc::interrupted; + } + return std::errc::resource_unavailable_try_again; } int shared_atomic32::notify_n(int count) const { - if (count == 1) { - WakeByAddressSingle(const_cast(this)); - } else if (count == std::numeric_limits::max()) { - WakeByAddressAll(const_cast(this)); - } else { - for (int i = 0; i < count; ++i) { - WakeByAddressSingle(const_cast(this)); - } - } - - return 1; // FIXME + SetEvent(getEventFor(this)); + return 1; } #else #error Unimplemented atomic for this platform