rpcsx/orbis-kernel/src/evf.cpp

174 lines
4.5 KiB
C++
Raw Normal View History

2023-07-05 00:43:47 +02:00
#include "evf.hpp"
#include "error/ErrorCode.hpp"
#include "utils/Logs.hpp"
#include <atomic>
orbis::ErrorCode orbis::EventFlag::wait(Thread *thread, std::uint8_t waitMode,
std::uint64_t bitPattern,
std::uint64_t *patternSet,
std::uint32_t *timeout) {
if (timeout != nullptr) {
ORBIS_LOG_FATAL("evf: timeout is not implemented");
std::abort();
}
shared_mutex mtx;
writer_lock lock(mtx);
bool isCanceled = false;
{
writer_lock lock(queueMtx);
if (isDeleted) {
return ErrorCode::ACCES;
}
auto waitingThread = WaitingThread{.thread = thread,
.mtx = &mtx,
.patternSet = patternSet,
.isCanceled = &isCanceled,
.bitPattern = bitPattern,
.waitMode = waitMode};
if (auto patValue = value.load(std::memory_order::relaxed);
waitingThread.test(patValue)) {
auto resultValue = waitingThread.applyClear(patValue);
value.store(resultValue, std::memory_order::relaxed);
if (patternSet != nullptr) {
*patternSet = resultValue;
}
return {};
}
std::size_t position;
if (attrs & kEvfAttrSingle) {
if (waitingThreadsCount.fetch_add(1, std::memory_order::relaxed) != 0) {
waitingThreadsCount.store(1, std::memory_order::relaxed);
return ErrorCode::PERM;
}
position = 0;
} else {
if (attrs & kEvfAttrThFifo) {
position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed);
} else {
// FIXME: sort waitingThreads by priority
position = waitingThreadsCount.fetch_add(1, std::memory_order::relaxed);
}
}
if (position >= std::size(waitingThreads)) {
std::abort();
}
waitingThreads[position] = waitingThread;
}
// TODO: update thread state
mtx.lock(); // HACK: lock self to wait unlock from another thread :)
if (isCanceled) {
return ErrorCode::CANCELED;
}
if (isDeleted) {
return ErrorCode::ACCES;
}
return {};
}
2023-07-06 18:16:25 +02:00
orbis::ErrorCode orbis::EventFlag::tryWait(Thread *, std::uint8_t waitMode,
2023-07-05 00:43:47 +02:00
std::uint64_t bitPattern,
std::uint64_t *patternSet) {
writer_lock lock(queueMtx);
if (isDeleted) {
return ErrorCode::ACCES;
}
2023-07-06 18:16:25 +02:00
auto waitingThread =
WaitingThread{.bitPattern = bitPattern, .waitMode = waitMode};
2023-07-05 00:43:47 +02:00
if (auto patValue = value.load(std::memory_order::relaxed);
waitingThread.test(patValue)) {
auto resultValue = waitingThread.applyClear(patValue);
value.store(resultValue, std::memory_order::relaxed);
if (patternSet != nullptr) {
*patternSet = resultValue;
}
return {};
}
return ErrorCode::BUSY;
}
std::size_t orbis::EventFlag::notify(NotifyType type, std::uint64_t bits) {
2023-07-05 00:43:47 +02:00
writer_lock lock(queueMtx);
auto count = waitingThreadsCount.load(std::memory_order::relaxed);
auto patValue = value.load(std::memory_order::relaxed);
if (type == NotifyType::Destroy) {
isDeleted = true;
} else if (type == NotifyType::Set) {
patValue |= bits;
}
2023-07-05 00:43:47 +02:00
auto testThread = [&](WaitingThread *thread) {
if (type != NotifyType::Set && !thread->test(patValue)) {
2023-07-05 00:43:47 +02:00
return false;
}
auto resultValue = thread->applyClear(patValue);
patValue = resultValue;
if (thread->patternSet != nullptr) {
*thread->patternSet = resultValue;
}
if (type == NotifyType::Cancel) {
2023-07-05 00:43:47 +02:00
*thread->isCanceled = true;
}
// TODO: update thread state
thread->mtx->unlock(); // release wait on waiter thread
waitingThreadsCount.fetch_sub(1, std::memory_order::relaxed);
2023-07-06 18:16:25 +02:00
std::memmove(thread, thread + 1,
(waitingThreads + count - (thread + 1)) *
sizeof(*waitingThreads));
2023-07-05 00:43:47 +02:00
--count;
return true;
};
std::size_t result = 0;
if (attrs & kEvfAttrThFifo) {
for (std::size_t i = count; i > 0;) {
if (!testThread(waitingThreads + i - 1)) {
--i;
continue;
}
++result;
}
} else {
for (std::size_t i = 0; i < count;) {
if (!testThread(waitingThreads + i)) {
++i;
continue;
}
++result;
}
}
if (type == NotifyType::Cancel) {
value.store(bits, std::memory_order::relaxed);
} else {
value.store(patValue, std::memory_order::relaxed);
2023-07-05 00:43:47 +02:00
}
return result;
}