From c60e5cc48a356f6307effd4f65261b8e5e6aa171 Mon Sep 17 00:00:00 2001 From: Elad <18193363+elad335@users.noreply.github.com> Date: Mon, 10 Nov 2025 20:27:20 +0200 Subject: [PATCH] ZSTD: Fixup threads terminations --- rpcs3/util/serialization_ext.cpp | 39 ++++++++++++-------------------- rpcs3/util/serialization_ext.hpp | 1 + 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/rpcs3/util/serialization_ext.cpp b/rpcs3/util/serialization_ext.cpp index 71e3439556..5e861d97d0 100644 --- a/rpcs3/util/serialization_ext.cpp +++ b/rpcs3/util/serialization_ext.cpp @@ -1132,48 +1132,39 @@ void compressed_zstd_serialization_file_handler::finalize(utils::serial& ar) const stx::shared_ptr> empty_data = stx::make_single>(); const stx::shared_ptr> null_ptr = stx::null_ptr; - for (auto& context : m_compression_threads) + for (bool has_pending_threads = true; has_pending_threads; thread_ctrl::wait_for(500)) { - // Try to notify all on the first iteration - if (context.m_input.compare_and_swap_test(null_ptr, empty_data)) - { - context.notified = true; - context.m_input.notify_one(); - } - } - - usz pending_threads = 0; - - do - { - pending_threads = 0; + has_pending_threads = false; + // Try to notify all in bulk for (auto& context : m_compression_threads) { - // Try to notify all in bulk if (!context.notified && !context.m_input && context.m_input.compare_and_swap_test(null_ptr, empty_data)) { - context.notified = true; - context.m_input.notify_one(); + context.notify_pending = true; } } for (auto& context : m_compression_threads) { - // Wait for notification to be sent - // And wait for data to be written to be read by the thread - if (context.m_output || !context.notified) + if (context.notify_pending) { - pending_threads++; + context.notified = true; + context.notify_pending = false; + context.m_input.notify_all(); } } - if (pending_threads) + for (auto& context : m_compression_threads) { - thread_ctrl::wait_for(500); + // Wait for notification to be sent and received + // And wait for data to be written to be read by the thread + if (!context.notified || context.m_input || context.m_output) + { + has_pending_threads = true; + } } } - while (pending_threads); for (usz idx = m_output_buffer_index;;) { diff --git a/rpcs3/util/serialization_ext.hpp b/rpcs3/util/serialization_ext.hpp index 51071e7188..7f575c088b 100644 --- a/rpcs3/util/serialization_ext.hpp +++ b/rpcs3/util/serialization_ext.hpp @@ -175,6 +175,7 @@ private: { atomic_ptr> m_input; atomic_ptr> m_output; + bool notify_pending = false; bool notified = false; std::unique_ptr>> m_thread; };