ZSTD: Fixup threads terminations

This commit is contained in:
Elad 2025-11-10 20:27:20 +02:00
parent be411d2232
commit 6fd5b0fe70
2 changed files with 16 additions and 24 deletions

View file

@ -1132,48 +1132,39 @@ void compressed_zstd_serialization_file_handler::finalize(utils::serial& ar)
const stx::shared_ptr<std::vector<u8>> empty_data = stx::make_single<std::vector<u8>>();
const stx::shared_ptr<std::vector<u8>> null_ptr = stx::null_ptr;
for (auto& context : m_compression_threads)
for (bool has_pending_threads = true; has_pending_threads; thread_ctrl::wait_for(500))
{
// Try to notify all on the first iteration
if (context.m_input.compare_and_swap_test(null_ptr, empty_data))
{
context.notified = true;
context.m_input.notify_one();
}
}
usz pending_threads = 0;
do
{
pending_threads = 0;
has_pending_threads = false;
// Try to notify all in bulk
for (auto& context : m_compression_threads)
{
// Try to notify all in bulk
if (!context.notified && !context.m_input && context.m_input.compare_and_swap_test(null_ptr, empty_data))
{
context.notified = true;
context.m_input.notify_one();
context.notify_pending = true;
}
}
for (auto& context : m_compression_threads)
{
// Wait for notification to be sent
// And wait for data to be written to be read by the thread
if (context.m_output || !context.notified)
if (context.notify_pending)
{
pending_threads++;
context.notified = true;
context.notify_pending = false;
context.m_input.notify_all();
}
}
if (pending_threads)
for (auto& context : m_compression_threads)
{
thread_ctrl::wait_for(500);
// Wait for notification to be sent and received
// And wait for data to be written to be read by the thread
if (!context.notified || context.m_input || context.m_output)
{
has_pending_threads = true;
}
}
}
while (pending_threads);
for (usz idx = m_output_buffer_index;;)
{

View file

@ -175,6 +175,7 @@ private:
{
atomic_ptr<std::vector<u8>> m_input;
atomic_ptr<std::vector<u8>> m_output;
bool notify_pending = false;
bool notified = false;
std::unique_ptr<named_thread<std::function<void()>>> m_thread;
};