From 8d110e3366d50d07a0f660f5a6936299822415eb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 6 Feb 2011 11:21:36 +1100 Subject: [PATCH] Remove the check for interleaved streams as it wasn't achieving anything. Make sure the thread has really exited before moving on, and set the free semaphore from outside the thread once it has joined. --- stream.c | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/stream.c b/stream.c index 808402f..91ae25e 100644 --- a/stream.c +++ b/stream.c @@ -1129,9 +1129,10 @@ retry: } /* First ready, are up to this thread yet? */ - if (!waited) + if (!waited) { wait_sem(&uci->ready); - waited = 1; + waited = 1; + } if (ret) goto retry; @@ -1143,7 +1144,6 @@ retry: wait_sem(&uci->ready); print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); - post_sem(&uci->free); return 0; } @@ -1157,10 +1157,9 @@ static int fill_buffer(struct stream_info *sinfo, int stream) if (s->buf) free(s->buf); -fill_another: if (s->eos) goto out; - +fill_another: if (unlikely(seekto(sinfo, s->last_head))) return -1; @@ -1217,22 +1216,18 @@ fill_another: s->uthread_no, c_len, stream); create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); - if (stream == 0 && last_head && control.threads > 1) { - int i; - - print_verbose("Interleaved streams detected, dropping to single threaded decompression as a precaution.\n"); - for (i = 0; i < NUM_STREAMS; i++) - sinfo->s[i].total_threads = 1; - control.threads = 1; - } else if (!last_head) - s->eos = 1; - if (++s->uthread_no == s->base_thread + s->total_threads) s->uthread_no = s->base_thread; - if (s->uthread_no != s->unext_thread && - !trywait_sem(&ucthread[s->uthread_no].free)) { - post_sem(&ucthread[s->uthread_no].free); - goto fill_another; + + /* Reached the end of this stream */ + if (!last_head) + s->eos = 1; + else if (s->uthread_no != s->unext_thread) { + /* See if the next thread is free as well */ + if (!trywait_sem(&ucthread[s->uthread_no].free)) { + post_sem(&ucthread[s->uthread_no].free); + goto fill_another; + } } out: /* This "ready" tells the decompression thread we're up to it */ @@ -1248,6 +1243,8 @@ out: /* This "ready" tells the decompression thread we've taken the buffer so * it can exit */ post_sem(&ucthread[s->unext_thread].ready); + join_pthread(threads[s->unext_thread], NULL); + post_sem(&ucthread[s->unext_thread].free); if (++s->unext_thread == s->base_thread + s->total_threads) s->unext_thread = s->base_thread;