From 2fb8cbda385028ab5cf465623badd5dd08b25032 Mon Sep 17 00:00:00 2001 From: Elad <18193363+elad335@users.noreply.github.com> Date: Mon, 10 Nov 2025 12:38:57 +0200 Subject: [PATCH] SaveStates/ZSTD: Improve termination and file handling --- rpcs3/util/serialization_ext.cpp | 44 +++++++++++++++----------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/rpcs3/util/serialization_ext.cpp b/rpcs3/util/serialization_ext.cpp index 508cd79168..71e3439556 100644 --- a/rpcs3/util/serialization_ext.cpp +++ b/rpcs3/util/serialization_ext.cpp @@ -814,7 +814,7 @@ void compressed_zstd_serialization_file_handler::initialize(utils::serial& ar) // Make sure at least one thread is free // Limit thread count in order to make sure memory limits are under control (TODO: scale with RAM size) - const usz thread_count = std::min(std::max(utils::get_thread_count(), 2) - 1, 16); + const usz thread_count = std::min(std::max(utils::get_thread_count(), 2) - 1, 32); for (usz i = 0; i < thread_count; i++) { @@ -1142,42 +1142,38 @@ void compressed_zstd_serialization_file_handler::finalize(utils::serial& ar) } } - for (auto& context : m_compression_threads) - { - // Notify to abort - while (!context.notified) - { - const auto data = context.m_input.compare_and_swap(null_ptr, empty_data); + usz pending_threads = 0; - if (!data) + do + { + pending_threads = 0; + + for (auto& context : m_compression_threads) + { + // Try to notify all in bulk + if (!context.notified && !context.m_input && context.m_input.compare_and_swap_test(null_ptr, empty_data)) { context.notified = true; context.m_input.notify_one(); - break; } - - // Wait until valid input is processed - thread_ctrl::wait_for(1000); } - } - for (auto& context : m_compression_threads) - { - // Wait for notification to be consumed - while (context.m_input) + for (auto& context : m_compression_threads) { - thread_ctrl::wait_for(1000); + // Wait for notification to be sent + // And wait for data to be written to be read by the thread + if (context.m_output || !context.notified) + { + pending_threads++; + } } - } - for (auto& context : m_compression_threads) - { - // Wait for data to be writen to be read by the thread - while (context.m_output) + if (pending_threads) { - thread_ctrl::wait_for(1000); + thread_ctrl::wait_for(500); } } + while (pending_threads); for (usz idx = m_output_buffer_index;;) {