diff --git a/stream.c b/stream.c index 808402f..91ae25e 100644 --- a/stream.c +++ b/stream.c @@ -1129,9 +1129,10 @@ retry: } /* First ready, are up to this thread yet? */ - if (!waited) + if (!waited) { wait_sem(&uci->ready); - waited = 1; + waited = 1; + } if (ret) goto retry; @@ -1143,7 +1144,6 @@ retry: wait_sem(&uci->ready); print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); - post_sem(&uci->free); return 0; } @@ -1157,10 +1157,9 @@ static int fill_buffer(struct stream_info *sinfo, int stream) if (s->buf) free(s->buf); -fill_another: if (s->eos) goto out; - +fill_another: if (unlikely(seekto(sinfo, s->last_head))) return -1; @@ -1217,22 +1216,18 @@ fill_another: s->uthread_no, c_len, stream); create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); - if (stream == 0 && last_head && control.threads > 1) { - int i; - - print_verbose("Interleaved streams detected, dropping to single threaded decompression as a precaution.\n"); - for (i = 0; i < NUM_STREAMS; i++) - sinfo->s[i].total_threads = 1; - control.threads = 1; - } else if (!last_head) - s->eos = 1; - if (++s->uthread_no == s->base_thread + s->total_threads) s->uthread_no = s->base_thread; - if (s->uthread_no != s->unext_thread && - !trywait_sem(&ucthread[s->uthread_no].free)) { - post_sem(&ucthread[s->uthread_no].free); - goto fill_another; + + /* Reached the end of this stream */ + if (!last_head) + s->eos = 1; + else if (s->uthread_no != s->unext_thread) { + /* See if the next thread is free as well */ + if (!trywait_sem(&ucthread[s->uthread_no].free)) { + post_sem(&ucthread[s->uthread_no].free); + goto fill_another; + } } out: /* This "ready" tells the decompression thread we're up to it */ @@ -1248,6 +1243,8 @@ out: /* This "ready" tells the decompression thread we've taken the buffer so * it can exit */ post_sem(&ucthread[s->unext_thread].ready); + join_pthread(threads[s->unext_thread], NULL); + post_sem(&ucthread[s->unext_thread].free); if (++s->unext_thread == s->base_thread + s->total_threads) s->unext_thread = s->base_thread;