Change decompression threading to have a group of threads for each stream (2 in total), thus making mulithreaded decompression more robust.

This commit is contained in:
Con Kolivas 2010-12-03 19:35:48 +11:00
parent 688aa55c79
commit 2da407a178
2 changed files with 39 additions and 62 deletions

3
rzip.h
View file

@ -270,6 +270,9 @@ struct stream {
i64 buflen;
i64 bufp;
int eos;
long uthread_no;
long unext_thread;
long base_thread;
};
struct stream_info {

View file

@ -751,22 +751,23 @@ retest_malloc:
void *open_stream_in(int f, int n)
{
struct stream_info *sinfo;
int total_threads, i;
i64 header_length;
int i;
sinfo = calloc(sizeof(*sinfo), 1);
if (unlikely(!sinfo))
return NULL;
threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads);
total_threads = control.threads * n;
threads = (pthread_t *)calloc(sizeof(pthread_t), total_threads);
if (unlikely(!threads))
return NULL;
ucthread = calloc(sizeof(struct uncomp_thread), control.threads);
ucthread = calloc(sizeof(struct uncomp_thread), total_threads);
if (unlikely(!ucthread))
fatal("Unable to calloc cthread in open_stream_out\n");
for (i = 0; i < control.threads; i++) {
for (i = 0; i < total_threads; i++) {
init_sem(&ucthread[i].complete);
init_sem(&ucthread[i].free);
post_sem(&ucthread[i].free);
@ -783,13 +784,14 @@ void *open_stream_in(int f, int n)
return NULL;
}
/* Flag that this is the start and stream 0 should be read first */
sinfo->s[0].eos = -1;
for (i = 0; i < n; i++) {
uchar c;
i64 v1, v2;
sinfo->s[i].base_thread = control.threads * i;
sinfo->s[i].uthread_no = sinfo->s[i].base_thread;
sinfo->s[i].unext_thread = sinfo->s[i].base_thread;
again:
if (unlikely(read_u8(f, &c)))
goto failed;
@ -976,34 +978,19 @@ static void *ucompthread(void *t)
}
/* fill a buffer from a stream - return -1 on failure */
static int fill_buffer(struct stream_info *sinfo, int ret_stream)
static int fill_buffer(struct stream_info *sinfo, int stream)
{
i64 header_length, u_len, c_len, last_head;
struct stream *s = &sinfo->s[stream];
uchar c_type, *s_buf;
static int stream = 0, last_block = 0;
if (sinfo->s[ret_stream].buf)
free(sinfo->s[ret_stream].buf);
if (s->buf)
free(s->buf);
fill_another:
/* The first time we starting filling the buffer, the first two chunks
* will be stream 0, then 1. After that we keep getting stream 1 until
* stream 1 has last_head == total_read, then there will be one more
* block of stream 1. If the next last_head is 0, the stream has ended
* otherwise we get one block of stream 0 before going back to stream
* 1 */
if (sinfo->s[0].eos == 1 && sinfo->s[1].eos == 1)
if (s->eos)
goto out;
if (sinfo->s[0].eos == -1) {
last_block = 0;
sinfo->s[0].eos = 0;
stream = 0;
} else if (sinfo->s[0].eos == 1) {
stream = 1;
} else if (sinfo->s[1].eos == 1) {
stream = 0;
}
if (unlikely(seekto(sinfo, sinfo->s[stream].last_head)))
if (unlikely(seekto(sinfo, s->last_head)))
return -1;
if (unlikely(read_u8(sinfo->fd, &c_type)))
@ -1036,7 +1023,7 @@ fill_another:
sinfo->total_read += header_length;
/* Wait till the next thread is free */
wait_sem(&ucthread[sinfo->thread_no].free);
wait_sem(&ucthread[s->uthread_no].free);
s_buf = malloc(u_len);
if (unlikely(u_len && !s_buf))
@ -1047,50 +1034,37 @@ fill_another:
sinfo->total_read += c_len;
ucthread[sinfo->thread_no].s_buf = s_buf;
ucthread[sinfo->thread_no].c_len = c_len;
ucthread[sinfo->thread_no].u_len = u_len;
ucthread[sinfo->thread_no].c_type = c_type;
ucthread[sinfo->thread_no].stream = stream;
sinfo->s[stream].last_head = last_head;
ucthread[s->uthread_no].s_buf = s_buf;
ucthread[s->uthread_no].c_len = c_len;
ucthread[s->uthread_no].u_len = u_len;
ucthread[s->uthread_no].c_type = c_type;
ucthread[s->uthread_no].stream = stream;
s->last_head = last_head;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
sinfo->thread_no, c_len, stream);
create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no);
s->uthread_no, c_len, stream);
create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no);
if (!last_head)
sinfo->s[stream].eos = 1;
s->eos = 1;
if (stream == 1) {
if (!last_block) {
if (last_head == sinfo->total_read)
last_block = 1;
} else {
last_block = 0;
stream = 0;
}
} else
stream = 1;
if (++sinfo->thread_no == control.threads)
sinfo->thread_no = 0;
if (!trywait_sem(&ucthread[sinfo->thread_no].free)) {
post_sem(&ucthread[sinfo->thread_no].free);
if (++s->uthread_no == s->base_thread + control.threads)
s->uthread_no = s->base_thread;
if (!trywait_sem(&ucthread[s->uthread_no].free)) {
post_sem(&ucthread[s->uthread_no].free);
goto fill_another;
}
out:
wait_sem(&ucthread[sinfo->next_thread].complete);
wait_sem(&ucthread[s->unext_thread].complete);
if (ret_stream != ucthread[sinfo->next_thread].stream)
fatal("Uncompressed stream doesn't match! Should be %d but is %d\n", ret_stream, ucthread[sinfo->next_thread].stream);
sinfo->s[ret_stream].buf = ucthread[sinfo->next_thread].s_buf;
sinfo->s[ret_stream].buflen = ucthread[sinfo->next_thread].u_len;
sinfo->s[ret_stream].bufp = 0;
s->buf = ucthread[s->unext_thread].s_buf;
s->buflen = ucthread[s->unext_thread].u_len;
s->bufp = 0;
post_sem(&ucthread[sinfo->next_thread].ready);
post_sem(&ucthread[s->unext_thread].ready);
if (++sinfo->next_thread == control.threads)
sinfo->next_thread = 0;
if (++s->unext_thread == s->base_thread + control.threads)
s->unext_thread = s->base_thread;
return 0;
}