Decompress more than one stream at a time if there are threads free and the end of one stream is reached.

Still limit total threads running to control.threads.
This affords a small speedup on decompression.
This commit is contained in:
Con Kolivas 2011-02-08 11:58:01 +11:00
parent aa00c29fba
commit 8ee9ef64f5
2 changed files with 30 additions and 12 deletions

1
rzip.h
View file

@ -278,7 +278,6 @@ struct stream {
long uthread_no; long uthread_no;
long unext_thread; long unext_thread;
long base_thread; long base_thread;
int total_threads;
}; };
struct stream_info { struct stream_info {

View file

@ -843,7 +843,7 @@ void *open_stream_in(int f, int n)
if (unlikely(!sinfo)) if (unlikely(!sinfo))
return NULL; return NULL;
total_threads = control.threads + 1; total_threads = control.threads * 2;
threads = calloc(sizeof(pthread_t), total_threads); threads = calloc(sizeof(pthread_t), total_threads);
if (unlikely(!threads)) if (unlikely(!threads))
return NULL; return NULL;
@ -867,14 +867,11 @@ void *open_stream_in(int f, int n)
return NULL; return NULL;
} }
sinfo->s[0].total_threads = 1;
sinfo->s[1].total_threads = control.threads;
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
uchar c; uchar c;
i64 v1, v2; i64 v1, v2;
sinfo->s[i].base_thread = i; sinfo->s[i].base_thread = control.threads * i;
sinfo->s[i].uthread_no = sinfo->s[i].base_thread; sinfo->s[i].uthread_no = sinfo->s[i].base_thread;
sinfo->s[i].unext_thread = sinfo->s[i].base_thread; sinfo->s[i].unext_thread = sinfo->s[i].base_thread;
@ -1126,17 +1123,27 @@ retry:
return 0; return 0;
} }
static int threads_busy;
/* fill a buffer from a stream - return -1 on failure */ /* fill a buffer from a stream - return -1 on failure */
static int fill_buffer(struct stream_info *sinfo, int stream) static int fill_buffer(struct stream_info *sinfo, int stream)
{ {
i64 header_length, u_len, c_len, last_head; i64 header_length, u_len, c_len, last_head;
struct stream *s = &sinfo->s[stream]; struct stream *ret_s, *s;
uchar c_type, *s_buf; uchar c_type, *s_buf;
ret_s = s = &sinfo->s[stream];
fill_different_stream:
if (s->buf) if (s->buf)
free(s->buf); free(s->buf);
if (s->eos) if (s->eos) {
goto out; stream ^= 1;
if (sinfo->s[stream].eos)
goto out;
s = &sinfo->s[stream];
goto fill_different_stream;
}
fill_another: fill_another:
if (unlikely(seekto(sinfo, s->last_head))) if (unlikely(seekto(sinfo, s->last_head)))
return -1; return -1;
@ -1190,20 +1197,31 @@ fill_another:
/* List this thread as busy */ /* List this thread as busy */
ucthread[s->uthread_no].busy = 1; ucthread[s->uthread_no].busy = 1;
threads_busy++;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
s->uthread_no, c_len, stream); s->uthread_no, c_len, stream);
create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no);
if (++s->uthread_no == s->base_thread + s->total_threads) if (++s->uthread_no == s->base_thread + control.threads)
s->uthread_no = s->base_thread; s->uthread_no = s->base_thread;
/* Reached the end of this stream, no more data to read in, otherwise /* Reached the end of this stream, no more data to read in, otherwise
* see if the next thread is free to grab more data */ * see if the next thread is free to grab more data */
if (!last_head) if (!last_head)
s->eos = 1; s->eos = 1;
else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) if (s->eos) {
stream ^= 1;
if (sinfo->s[stream].eos)
goto out;
s = &sinfo->s[stream];
goto fill_different_stream;
}
if (s->uthread_no != s->unext_thread &&
!ucthread[s->uthread_no].busy && threads_busy < control.threads)
goto fill_another; goto fill_another;
out: out:
s = ret_s;
/* "ready" tells the decompression thread we're ready for its data */ /* "ready" tells the decompression thread we're ready for its data */
post_sem(&ucthread[s->unext_thread].ready); post_sem(&ucthread[s->unext_thread].ready);
/* This "complete" is the deco thread telling us it's finished /* This "complete" is the deco thread telling us it's finished
@ -1217,11 +1235,12 @@ out:
join_pthread(threads[s->unext_thread], NULL); join_pthread(threads[s->unext_thread], NULL);
ucthread[s->unext_thread].busy = 0; ucthread[s->unext_thread].busy = 0;
threads_busy--;
/* As the ready semaphore may or may not have been waited on in /* As the ready semaphore may or may not have been waited on in
* ucompthread, we reset it regardless. */ * ucompthread, we reset it regardless. */
init_sem(&ucthread[s->unext_thread].ready); init_sem(&ucthread[s->unext_thread].ready);
if (++s->unext_thread == s->base_thread + s->total_threads) if (++s->unext_thread == s->base_thread + control.threads)
s->unext_thread = s->base_thread; s->unext_thread = s->base_thread;
return 0; return 0;