#include "evf.hpp" #include "error/ErrorCode.hpp" #include "utils/Logs.hpp" #include orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, std::uint64_t bitPattern, std::uint64_t *patternSet, std::uint32_t *timeout) { if (timeout != nullptr) { ORBIS_LOG_FATAL("evf: timeout is not implemented"); std::abort(); } shared_mutex mtx; writer_lock lock(mtx); bool isCanceled = false; { writer_lock lock(queueMtx); if (isDeleted) { return ErrorCode::ACCES; } auto waitingThread = WaitingThread{.thread = thread, .mtx = &mtx, .patternSet = patternSet, .isCanceled = &isCanceled, .bitPattern = bitPattern, .waitMode = waitMode}; if (auto patValue = value.load(std::memory_order::relaxed); waitingThread.test(patValue)) { auto resultValue = waitingThread.applyClear(patValue); value.store(resultValue, std::memory_order::relaxed); if (patternSet != nullptr) { *patternSet = resultValue; } return {}; } std::size_t position; if (attrs & kEvfAttrSingle) { if (waitingThreadsCount.fetch_add(1, std::memory_order::relaxed) != 0) { waitingThreadsCount.store(1, std::memory_order::relaxed); return ErrorCode::PERM; } position = 0; } else { if (attrs & kEvfAttrThFifo) { position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed); } else { // FIXME: sort waitingThreads by priority position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed); } } if (position >= std::size(waitingThreads)) { std::abort(); } waitingThreads[position] = waitingThread; } // TODO: update thread state mtx.lock(); // HACK: lock self to wait unlock from another thread :) if (isCanceled) { return ErrorCode::CANCELED; } if (isDeleted) { return ErrorCode::ACCES; } return {}; } orbis::ErrorCode orbis::EventFlag::tryWait(Thread *, std::uint8_t waitMode, std::uint64_t bitPattern, std::uint64_t *patternSet) { writer_lock lock(queueMtx); if (isDeleted) { return ErrorCode::ACCES; } auto waitingThread = WaitingThread{ .bitPattern = bitPattern, .waitMode = waitMode}; if (auto patValue = value.load(std::memory_order::relaxed); waitingThread.test(patValue)) { auto resultValue = waitingThread.applyClear(patValue); value.store(resultValue, std::memory_order::relaxed); if (patternSet != nullptr) { *patternSet = resultValue; } return {}; } return ErrorCode::BUSY; } std::size_t orbis::EventFlag::notify(bool isCancel, std::uint64_t cancelValue) { writer_lock lock(queueMtx); auto count = waitingThreadsCount.load(std::memory_order::relaxed); auto patValue = value.load(std::memory_order::relaxed); auto testThread = [&](WaitingThread *thread) { if (!isCancel && !isDeleted && !thread->test(patValue)) { return false; } auto resultValue = thread->applyClear(patValue); patValue = resultValue; if (thread->patternSet != nullptr) { *thread->patternSet = resultValue; } if (isCancel) { *thread->isCanceled = true; } else { value.store(resultValue, std::memory_order::relaxed); } // TODO: update thread state thread->mtx->unlock(); // release wait on waiter thread waitingThreadsCount.fetch_sub(1, std::memory_order::relaxed); --count; std::memmove(thread, thread + 1, waitingThreads + count - (thread + 1)); return true; }; std::size_t result = 0; if (attrs & kEvfAttrThFifo) { for (std::size_t i = count; i > 0;) { if (!testThread(waitingThreads + i - 1)) { --i; continue; } ++result; } } else { for (std::size_t i = 0; i < count;) { if (!testThread(waitingThreads + i)) { ++i; continue; } ++result; } } if (isCancel) { value.store(cancelValue, std::memory_order::relaxed); } return result; }