diff --git a/stream.c b/stream.c index 4b9c16c..4ed7e72 100644 --- a/stream.c +++ b/stream.c @@ -960,7 +960,7 @@ retry: print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n"); } - if (control.threads > 1) + if (control.threads > 1 && !waited) wait_sem(&cthread[cti->wait_on].complete); waited = 1; if (ret) @@ -1093,12 +1093,20 @@ retry: print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n"); } - post_sem(&uci->complete); - wait_sem(&uci->ready); + /* First ready, are up to this thread yet? */ + if (!waited) + wait_sem(&uci->ready); waited = 1; if (ret) goto retry; + /* This complete tells the main thread that this thread has its + * decompressed data ready */ + post_sem(&uci->complete); + /* Second ready, the buffer has been grabbed by the main thread so this + * thread can exit */ + wait_sem(&uci->ready); + print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); post_sem(&uci->free); @@ -1184,12 +1192,18 @@ fill_another: goto fill_another; } out: + /* This ready tells the decompression thread we're up to it */ + post_sem(&ucthread[s->unext_thread].ready); + /* This complete is the deco thread telling us it's finished + * decompressing data */ wait_sem(&ucthread[s->unext_thread].complete); s->buf = ucthread[s->unext_thread].s_buf; s->buflen = ucthread[s->unext_thread].u_len; s->bufp = 0; + /* This ready tells the decompression thread we've taken the buffer so + * it can exit */ post_sem(&ucthread[s->unext_thread].ready); if (++s->unext_thread == s->base_thread + s->total_threads) @@ -1284,6 +1298,13 @@ int close_stream_in(void *ss) for (i = 0; i < sinfo->num_streams; i++) free(sinfo->s[i].buf); + for (i = 0; i < control.threads + 1; i++) { + wait_sem(&ucthread[i].free); + destroy_sem(&ucthread[i].complete); + destroy_sem(&ucthread[i].free); + destroy_sem(&ucthread[i].ready); + } + free(ucthread); free(threads); free(sinfo->s);