diff --git a/rzip.h b/rzip.h index 2f6dd79..38a7a86 100644 --- a/rzip.h +++ b/rzip.h @@ -278,7 +278,6 @@ struct stream { long uthread_no; long unext_thread; long base_thread; - int total_threads; }; struct stream_info { diff --git a/stream.c b/stream.c index 1cd18ac..363c1bd 100644 --- a/stream.c +++ b/stream.c @@ -843,7 +843,7 @@ void *open_stream_in(int f, int n) if (unlikely(!sinfo)) return NULL; - total_threads = control.threads + 1; + total_threads = control.threads * 2; threads = calloc(sizeof(pthread_t), total_threads); if (unlikely(!threads)) return NULL; @@ -867,14 +867,11 @@ void *open_stream_in(int f, int n) return NULL; } - sinfo->s[0].total_threads = 1; - sinfo->s[1].total_threads = control.threads; - for (i = 0; i < n; i++) { uchar c; i64 v1, v2; - sinfo->s[i].base_thread = i; + sinfo->s[i].base_thread = control.threads * i; sinfo->s[i].uthread_no = sinfo->s[i].base_thread; sinfo->s[i].unext_thread = sinfo->s[i].base_thread; @@ -1126,17 +1123,27 @@ retry: return 0; } +static int threads_busy; + /* fill a buffer from a stream - return -1 on failure */ static int fill_buffer(struct stream_info *sinfo, int stream) { i64 header_length, u_len, c_len, last_head; - struct stream *s = &sinfo->s[stream]; + struct stream *ret_s, *s; uchar c_type, *s_buf; + ret_s = s = &sinfo->s[stream]; + +fill_different_stream: if (s->buf) free(s->buf); - if (s->eos) - goto out; + if (s->eos) { + stream ^= 1; + if (sinfo->s[stream].eos) + goto out; + s = &sinfo->s[stream]; + goto fill_different_stream; + } fill_another: if (unlikely(seekto(sinfo, s->last_head))) return -1; @@ -1190,20 +1197,31 @@ fill_another: /* List this thread as busy */ ucthread[s->uthread_no].busy = 1; + threads_busy++; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", s->uthread_no, c_len, stream); create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); - if (++s->uthread_no == s->base_thread + s->total_threads) + if (++s->uthread_no == s->base_thread + control.threads) s->uthread_no = s->base_thread; /* Reached the end of this stream, no more data to read in, otherwise * see if the next thread is free to grab more data */ if (!last_head) s->eos = 1; - else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) + if (s->eos) { + stream ^= 1; + if (sinfo->s[stream].eos) + goto out; + s = &sinfo->s[stream]; + goto fill_different_stream; + } + if (s->uthread_no != s->unext_thread && + !ucthread[s->uthread_no].busy && threads_busy < control.threads) goto fill_another; out: + s = ret_s; + /* "ready" tells the decompression thread we're ready for its data */ post_sem(&ucthread[s->unext_thread].ready); /* This "complete" is the deco thread telling us it's finished @@ -1217,11 +1235,12 @@ out: join_pthread(threads[s->unext_thread], NULL); ucthread[s->unext_thread].busy = 0; + threads_busy--; /* As the ready semaphore may or may not have been waited on in * ucompthread, we reset it regardless. */ init_sem(&ucthread[s->unext_thread].ready); - if (++s->unext_thread == s->base_thread + s->total_threads) + if (++s->unext_thread == s->base_thread + control.threads) s->unext_thread = s->base_thread; return 0;