diff --git a/orbis-kernel/include/orbis/evf.hpp b/orbis-kernel/include/orbis/evf.hpp index 5f88c1245..8ad5bce46 100644 --- a/orbis-kernel/include/orbis/evf.hpp +++ b/orbis-kernel/include/orbis/evf.hpp @@ -1,6 +1,7 @@ #pragma once #include "KernelAllocator.hpp" #include "thread/Thread.hpp" +#include "utils/SharedCV.hpp" #include "utils/SharedMutex.hpp" #include #include @@ -31,7 +32,7 @@ struct EventFlag final { struct WaitingThread { Thread *thread; - shared_mutex *mtx; + utils::shared_cv *cv; std::uint64_t *patternSet; bool *isCanceled; std::uint64_t bitPattern; diff --git a/orbis-kernel/src/evf.cpp b/orbis-kernel/src/evf.cpp index 0634df182..e45b2578e 100644 --- a/orbis-kernel/src/evf.cpp +++ b/orbis-kernel/src/evf.cpp @@ -1,31 +1,45 @@ #include "evf.hpp" #include "error/ErrorCode.hpp" #include "utils/Logs.hpp" +#include "utils/SharedCV.hpp" #include orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, std::uint64_t bitPattern, std::uint64_t *patternSet, std::uint32_t *timeout) { - if (timeout != nullptr) { - ORBIS_LOG_FATAL("evf: timeout is not implemented"); - std::abort(); - } - - shared_mutex mtx; - writer_lock lock(mtx); + utils::shared_cv cv; bool isCanceled = false; - { - writer_lock lock(queueMtx); + std::chrono::steady_clock::time_point start{}; + if (timeout) + start = std::chrono::steady_clock::now(); + auto update_timeout = [&] { + if (!timeout) + return; + auto now = std::chrono::steady_clock::now(); + auto dif = (now - start).count() / 1000; + if (*timeout > dif) { + *timeout -= dif; + start += std::chrono::microseconds(dif); + return; + } + *timeout = 0; + }; + + std::unique_lock lock(queueMtx); + while (true) { if (isDeleted) { return ErrorCode::ACCES; } + if (isCanceled) { + return ErrorCode::CANCELED; + } auto waitingThread = WaitingThread{.thread = thread, - .mtx = &mtx, + .cv = &cv, .patternSet = patternSet, .isCanceled = &isCanceled, .bitPattern = bitPattern, @@ -38,7 +52,12 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, if (patternSet != nullptr) { *patternSet = resultValue; } - return {}; + // Success + break; + } + + if (timeout && *timeout == 0) { + return ErrorCode::TIMEDOUT; } std::size_t position; @@ -64,20 +83,12 @@ orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode, } waitingThreads[position] = waitingThread; + + cv.wait(queueMtx, timeout ? *timeout : -1ul); + update_timeout(); } // TODO: update thread state - - mtx.lock(); // HACK: lock self to wait unlock from another thread :) - - if (isCanceled) { - return ErrorCode::CANCELED; - } - - if (isDeleted) { - return ErrorCode::ACCES; - } - return {}; } @@ -133,9 +144,8 @@ std::size_t orbis::EventFlag::notify(NotifyType type, std::uint64_t bits) { } // TODO: update thread state - if (thread->mtx->is_free()) - std::abort(); - thread->mtx->unlock(); // release wait on waiter thread + // release wait on waiter thread + thread->cv->notify_one(queueMtx); waitingThreadsCount.fetch_sub(1, std::memory_order::relaxed); std::memmove(thread, thread + 1,