rpcsx/orbis-kernel/src/evf.cpp

192 lines
4.9 KiB
C++
Raw Normal View History

2023-07-05 00:43:47 +02:00
#include "evf.hpp"
#include "error/ErrorCode.hpp"
#include "utils/Logs.hpp"
#include "utils/SharedCV.hpp"
2023-07-05 00:43:47 +02:00
#include <atomic>
orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode,
std::uint64_t bitPattern,
std::uint64_t *patternSet,
std::uint32_t *timeout) {
utils::shared_cv cv;
2023-07-05 00:43:47 +02:00
bool isCanceled = false;
using namespace std::chrono;
steady_clock::time_point start{};
if (timeout)
start = steady_clock::now();
uint64_t elapsed = 0;
auto update_timeout = [&] {
if (!timeout)
return;
auto now = steady_clock::now();
elapsed = duration_cast<microseconds>(now - start).count();
if (*timeout >= elapsed) {
return;
}
*timeout = 0;
};
2023-07-05 00:43:47 +02:00
std::unique_lock lock(queueMtx);
while (true) {
2023-07-05 00:43:47 +02:00
if (isDeleted) {
return ErrorCode::ACCES;
}
if (isCanceled) {
return ErrorCode::CANCELED;
}
2023-07-05 00:43:47 +02:00
auto waitingThread = WaitingThread{.thread = thread,
.cv = &cv,
2023-07-05 00:43:47 +02:00
.patternSet = patternSet,
.isCanceled = &isCanceled,
.bitPattern = bitPattern,
.waitMode = waitMode};
if (auto patValue = value.load(std::memory_order::relaxed);
waitingThread.test(patValue)) {
auto resultValue = waitingThread.applyClear(patValue);
value.store(resultValue, std::memory_order::relaxed);
if (patternSet != nullptr) {
*patternSet = resultValue;
}
// Success
break;
}
if (timeout && *timeout == 0) {
return ErrorCode::TIMEDOUT;
2023-07-05 00:43:47 +02:00
}
std::size_t position;
if (attrs & kEvfAttrSingle) {
if (waitingThreadsCount.fetch_add(1, std::memory_order::relaxed) != 0) {
waitingThreadsCount.store(1, std::memory_order::relaxed);
return ErrorCode::PERM;
}
position = 0;
} else {
if (attrs & kEvfAttrThFifo) {
position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed);
} else {
// FIXME: sort waitingThreads by priority
position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed);
}
}
if (position >= std::size(waitingThreads)) {
std::abort();
}
waitingThreads[position] = waitingThread;
if (timeout) {
cv.wait(queueMtx, *timeout - elapsed);
update_timeout();
continue;
}
cv.wait(queueMtx);
2023-07-05 00:43:47 +02:00
}
// TODO: update thread state
2023-07-05 00:43:47 +02:00
return {};
}
2023-07-06 18:16:25 +02:00
orbis::ErrorCode orbis::EventFlag::tryWait(Thread *, std::uint8_t waitMode,
2023-07-05 00:43:47 +02:00
std::uint64_t bitPattern,
std::uint64_t *patternSet) {
writer_lock lock(queueMtx);
if (isDeleted) {
return ErrorCode::ACCES;
}
2023-07-06 18:16:25 +02:00
auto waitingThread =
WaitingThread{.bitPattern = bitPattern, .waitMode = waitMode};
2023-07-05 00:43:47 +02:00
if (auto patValue = value.load(std::memory_order::relaxed);
waitingThread.test(patValue)) {
auto resultValue = waitingThread.applyClear(patValue);
value.store(resultValue, std::memory_order::relaxed);
if (patternSet != nullptr) {
*patternSet = resultValue;
}
return {};
}
return ErrorCode::BUSY;
}
std::size_t orbis::EventFlag::notify(NotifyType type, std::uint64_t bits) {
2023-07-05 00:43:47 +02:00
writer_lock lock(queueMtx);
auto count = waitingThreadsCount.load(std::memory_order::relaxed);
auto patValue = value.load(std::memory_order::relaxed);
if (type == NotifyType::Destroy) {
isDeleted = true;
} else if (type == NotifyType::Set) {
patValue |= bits;
}
2023-07-05 00:43:47 +02:00
auto testThread = [&](WaitingThread *thread) {
if (type == NotifyType::Set && !thread->test(patValue)) {
2023-07-05 00:43:47 +02:00
return false;
}
auto resultValue = thread->applyClear(patValue);
patValue = resultValue;
if (thread->patternSet != nullptr) {
*thread->patternSet = resultValue;
}
if (type == NotifyType::Cancel) {
2023-07-05 00:43:47 +02:00
*thread->isCanceled = true;
}
// TODO: update thread state
// release wait on waiter thread
thread->cv->notify_one(queueMtx);
2023-07-05 00:43:47 +02:00
waitingThreadsCount.fetch_sub(1, std::memory_order::relaxed);
2023-07-06 18:16:25 +02:00
std::memmove(thread, thread + 1,
(waitingThreads + count - (thread + 1)) *
sizeof(*waitingThreads));
2023-07-05 00:43:47 +02:00
--count;
return true;
};
std::size_t result = 0;
if (attrs & kEvfAttrThFifo) {
for (std::size_t i = count; i > 0;) {
if (!testThread(waitingThreads + i - 1)) {
--i;
continue;
}
++result;
}
} else {
for (std::size_t i = 0; i < count;) {
if (!testThread(waitingThreads + i)) {
++i;
continue;
}
++result;
}
}
if (type == NotifyType::Cancel) {
value.store(bits, std::memory_order::relaxed);
} else {
value.store(patValue, std::memory_order::relaxed);
2023-07-05 00:43:47 +02:00
}
return result;
}