diff --git a/stream.c b/stream.c index 91ae25e..fad5886 100644 --- a/stream.c +++ b/stream.c @@ -1126,24 +1126,20 @@ retry: if (waited) fatal("Failed to decompress in ucompthread\n"); print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n"); - } - - /* First ready, are up to this thread yet? */ - if (!waited) { + /* This "ready" tells this thread that we're ready to receive + * its data. We do not strictly need to wait for this, so it's + * used when decompression fails due to inadequate memory to + * try again serialised. */ wait_sem(&uci->ready); waited = 1; - } - if (ret) goto retry; + } - /* This complete tells the main thread that this thread has its + /* This "complete" tells the main thread that this thread has its * decompressed data ready */ post_sem(&uci->complete); - /* Second ready, the buffer has been grabbed by the main thread so this - * thread can exit */ - wait_sem(&uci->ready); - print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); + print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->stream); return 0; } @@ -1230,21 +1226,22 @@ fill_another: } } out: - /* This "ready" tells the decompression thread we're up to it */ + /* "ready" tells the decompression thread we're ready for its data */ post_sem(&ucthread[s->unext_thread].ready); /* This "complete" is the deco thread telling us it's finished * decompressing data */ wait_sem(&ucthread[s->unext_thread].complete); + print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread); s->buf = ucthread[s->unext_thread].s_buf; s->buflen = ucthread[s->unext_thread].u_len; s->bufp = 0; - /* This "ready" tells the decompression thread we've taken the buffer so - * it can exit */ - post_sem(&ucthread[s->unext_thread].ready); join_pthread(threads[s->unext_thread], NULL); post_sem(&ucthread[s->unext_thread].free); + /* As the ready semaphore may or may not have been waited on in + * ucompthread, we reset it regardless. */ + init_sem(&ucthread[s->unext_thread].ready); if (++s->unext_thread == s->base_thread + s->total_threads) s->unext_thread = s->base_thread;