From 2da407a178e83a4c8cfbd19806573a892afd8f7b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 3 Dec 2010 19:35:48 +1100 Subject: [PATCH] Change decompression threading to have a group of threads for each stream (2 in total), thus making mulithreaded decompression more robust. --- rzip.h | 3 ++ stream.c | 98 +++++++++++++++++++++----------------------------------- 2 files changed, 39 insertions(+), 62 deletions(-) diff --git a/rzip.h b/rzip.h index 1dd6442..4517bbd 100644 --- a/rzip.h +++ b/rzip.h @@ -270,6 +270,9 @@ struct stream { i64 buflen; i64 bufp; int eos; + long uthread_no; + long unext_thread; + long base_thread; }; struct stream_info { diff --git a/stream.c b/stream.c index 0c07309..2adbb83 100644 --- a/stream.c +++ b/stream.c @@ -751,22 +751,23 @@ retest_malloc: void *open_stream_in(int f, int n) { struct stream_info *sinfo; + int total_threads, i; i64 header_length; - int i; sinfo = calloc(sizeof(*sinfo), 1); if (unlikely(!sinfo)) return NULL; - threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads); + total_threads = control.threads * n; + threads = (pthread_t *)calloc(sizeof(pthread_t), total_threads); if (unlikely(!threads)) return NULL; - ucthread = calloc(sizeof(struct uncomp_thread), control.threads); + ucthread = calloc(sizeof(struct uncomp_thread), total_threads); if (unlikely(!ucthread)) fatal("Unable to calloc cthread in open_stream_out\n"); - for (i = 0; i < control.threads; i++) { + for (i = 0; i < total_threads; i++) { init_sem(&ucthread[i].complete); init_sem(&ucthread[i].free); post_sem(&ucthread[i].free); @@ -783,13 +784,14 @@ void *open_stream_in(int f, int n) return NULL; } - /* Flag that this is the start and stream 0 should be read first */ - sinfo->s[0].eos = -1; - for (i = 0; i < n; i++) { uchar c; i64 v1, v2; + sinfo->s[i].base_thread = control.threads * i; + sinfo->s[i].uthread_no = sinfo->s[i].base_thread; + sinfo->s[i].unext_thread = sinfo->s[i].base_thread; + again: if (unlikely(read_u8(f, &c))) goto failed; @@ -976,34 +978,19 @@ static void *ucompthread(void *t) } /* fill a buffer from a stream - return -1 on failure */ -static int fill_buffer(struct stream_info *sinfo, int ret_stream) +static int fill_buffer(struct stream_info *sinfo, int stream) { i64 header_length, u_len, c_len, last_head; + struct stream *s = &sinfo->s[stream]; uchar c_type, *s_buf; - static int stream = 0, last_block = 0; - if (sinfo->s[ret_stream].buf) - free(sinfo->s[ret_stream].buf); + if (s->buf) + free(s->buf); fill_another: - /* The first time we starting filling the buffer, the first two chunks - * will be stream 0, then 1. After that we keep getting stream 1 until - * stream 1 has last_head == total_read, then there will be one more - * block of stream 1. If the next last_head is 0, the stream has ended - * otherwise we get one block of stream 0 before going back to stream - * 1 */ - if (sinfo->s[0].eos == 1 && sinfo->s[1].eos == 1) + if (s->eos) goto out; - if (sinfo->s[0].eos == -1) { - last_block = 0; - sinfo->s[0].eos = 0; - stream = 0; - } else if (sinfo->s[0].eos == 1) { - stream = 1; - } else if (sinfo->s[1].eos == 1) { - stream = 0; - } - if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) + if (unlikely(seekto(sinfo, s->last_head))) return -1; if (unlikely(read_u8(sinfo->fd, &c_type))) @@ -1036,7 +1023,7 @@ fill_another: sinfo->total_read += header_length; /* Wait till the next thread is free */ - wait_sem(&ucthread[sinfo->thread_no].free); + wait_sem(&ucthread[s->uthread_no].free); s_buf = malloc(u_len); if (unlikely(u_len && !s_buf)) @@ -1047,50 +1034,37 @@ fill_another: sinfo->total_read += c_len; - ucthread[sinfo->thread_no].s_buf = s_buf; - ucthread[sinfo->thread_no].c_len = c_len; - ucthread[sinfo->thread_no].u_len = u_len; - ucthread[sinfo->thread_no].c_type = c_type; - ucthread[sinfo->thread_no].stream = stream; - sinfo->s[stream].last_head = last_head; + ucthread[s->uthread_no].s_buf = s_buf; + ucthread[s->uthread_no].c_len = c_len; + ucthread[s->uthread_no].u_len = u_len; + ucthread[s->uthread_no].c_type = c_type; + ucthread[s->uthread_no].stream = stream; + s->last_head = last_head; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", - sinfo->thread_no, c_len, stream); - create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no); + s->uthread_no, c_len, stream); + create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); if (!last_head) - sinfo->s[stream].eos = 1; + s->eos = 1; - if (stream == 1) { - if (!last_block) { - if (last_head == sinfo->total_read) - last_block = 1; - } else { - last_block = 0; - stream = 0; - } - } else - stream = 1; - - if (++sinfo->thread_no == control.threads) - sinfo->thread_no = 0; - if (!trywait_sem(&ucthread[sinfo->thread_no].free)) { - post_sem(&ucthread[sinfo->thread_no].free); + if (++s->uthread_no == s->base_thread + control.threads) + s->uthread_no = s->base_thread; + if (!trywait_sem(&ucthread[s->uthread_no].free)) { + post_sem(&ucthread[s->uthread_no].free); goto fill_another; } out: - wait_sem(&ucthread[sinfo->next_thread].complete); + wait_sem(&ucthread[s->unext_thread].complete); - if (ret_stream != ucthread[sinfo->next_thread].stream) - fatal("Uncompressed stream doesn't match! Should be %d but is %d\n", ret_stream, ucthread[sinfo->next_thread].stream); - sinfo->s[ret_stream].buf = ucthread[sinfo->next_thread].s_buf; - sinfo->s[ret_stream].buflen = ucthread[sinfo->next_thread].u_len; - sinfo->s[ret_stream].bufp = 0; + s->buf = ucthread[s->unext_thread].s_buf; + s->buflen = ucthread[s->unext_thread].u_len; + s->bufp = 0; - post_sem(&ucthread[sinfo->next_thread].ready); + post_sem(&ucthread[s->unext_thread].ready); - if (++sinfo->next_thread == control.threads) - sinfo->next_thread = 0; + if (++s->unext_thread == s->base_thread + control.threads) + s->unext_thread = s->base_thread; return 0; }