From bb33f7571cca2d151192bcb7bb60f53f2e40447b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 22 Feb 2011 00:49:50 +1100 Subject: [PATCH] Multi-threading speed ups. Add one more thread on compression and decompression to account for the staggered nature of thread recruitment. Make the initial buffer slightly smaller and make it progressively larger, thus recruiting threads sooner and more evenly. This also speeds up decompression for the same reason. Check the amount of memory being used by each thread on decompression to ensure we don't try to recruit too much ram. --- rzip.c | 7 ------- rzip.h | 3 +++ stream.c | 49 ++++++++++++++++++++++++++++++++++++------------- util.c | 7 +++++++ 4 files changed, 46 insertions(+), 20 deletions(-) diff --git a/rzip.c b/rzip.c index ea2d424..49ec57b 100644 --- a/rzip.c +++ b/rzip.c @@ -100,13 +100,6 @@ struct sliding_buffer { int fd; /* The fd of the mmap */ } sb; /* Sliding buffer */ -static void round_to_page(i64 *size) -{ - *size -= *size % control.page_size; - if (unlikely(!*size)) - *size = control.page_size; -} - static void remap_low_sb(void) { i64 new_offset; diff --git a/rzip.h b/rzip.h index f73b634..349d6e7 100644 --- a/rzip.h +++ b/rzip.h @@ -301,9 +301,11 @@ struct stream_info { int num_streams; int fd; i64 bufsize; + i64 max_bufsize; i64 cur_pos; i64 initial_pos; i64 total_read; + i64 ram_alloced; long thread_no; long next_thread; int chunks; @@ -328,6 +330,7 @@ void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, const i64 two_gig; void prepare_streamout_threads(void); void close_streamout_threads(void); +void round_to_page(i64 *size); #define print_err(format, args...) do {\ fprintf(stderr, format, ##args); \ diff --git a/stream.c b/stream.c index 5efe422..1fef9fa 100644 --- a/stream.c +++ b/stream.c @@ -718,6 +718,11 @@ void prepare_streamout_threads(void) { int i; + /* As we serialise the generation of threads during the rzip + * pre-processing stage, it's faster to have one more thread available + * to keep all CPUs busy. */ + if (control.threads > 1) + ++control.threads; threads = calloc(sizeof(pthread_t), control.threads); if (unlikely(!threads)) fatal("Unable to calloc threads in prepare_streamout_threads\n"); @@ -797,16 +802,21 @@ retest_malloc: free(testmalloc); print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize); - sinfo->bufsize = limit; - - /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the - * bufsize to fit X threads into it */ - sinfo->bufsize = MIN(sinfo->bufsize, - MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE)); + /* We start with slightly smaller buffers to start loading CPUs as soon + * as possible and make them exponentially larger approaching the + * tested maximum size. We ensure the buffers are of a minimum size, + * though, as compression efficency drops off dramatically with tiny + * buffers. */ + if (control.threads > 1) { + sinfo->max_bufsize = limit / control.threads; + sinfo->bufsize = sinfo->max_bufsize * 63 / 100; + round_to_page(&sinfo->bufsize); + sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE); + } if (control.threads > 1) - print_maxverbose("Using %d threads to compress up to %lld bytes each.\n", - control.threads, sinfo->bufsize); + print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n", + control.threads, sinfo->max_bufsize); else print_maxverbose("Using 1 thread to compress up to %lld bytes\n", sinfo->bufsize); @@ -831,7 +841,9 @@ void *open_stream_in(int f, int n) if (unlikely(!sinfo)) return NULL; - total_threads = control.threads + 1; + /* We have one thread dedicated to stream 0, and one more thread than + * CPUs to keep them busy. */ + total_threads = control.threads + 2; threads = calloc(sizeof(pthread_t), total_threads); if (unlikely(!threads)) return NULL; @@ -851,7 +863,7 @@ void *open_stream_in(int f, int n) } sinfo->s[0].total_threads = 1; - sinfo->s[1].total_threads = control.threads; + sinfo->s[1].total_threads = total_threads - 1; for (i = 0; i < n; i++) { uchar c; @@ -1042,6 +1054,12 @@ static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf) i, cthread[i].s_len, stream); create_pthread(&threads[i], NULL, compthread, (void *)i); + if (control.threads > 1) { + sinfo->bufsize += (sinfo->max_bufsize - sinfo->bufsize) * 63 / 100; + round_to_page(&sinfo->bufsize); + sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE); + } + if (newbuf) { /* The stream buffer has been given to the thread, allocate a new one */ sinfo->s[stream].buf = malloc(sinfo->bufsize); @@ -1167,6 +1185,7 @@ fill_another: s_buf = malloc(u_len); if (unlikely(u_len && !s_buf)) fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len); + sinfo->ram_alloced += u_len; if (unlikely(read_buf(sinfo->fd, s_buf, c_len))) return -1; @@ -1190,11 +1209,14 @@ fill_another: s->uthread_no = s->base_thread; /* Reached the end of this stream, no more data to read in, otherwise - * see if the next thread is free to grab more data */ + * see if the next thread is free to grab more data. We also check that + * we're not going to be allocating too much ram to generate all these + * threads. */ if (!last_head) s->eos = 1; - else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) - goto fill_another; + else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy && + sinfo->ram_alloced < control.ramsize / 3) + goto fill_another; out: lock_mutex(&output_lock); output_thread = s->unext_thread; @@ -1208,6 +1230,7 @@ out: print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread); s->buf = ucthread[s->unext_thread].s_buf; s->buflen = ucthread[s->unext_thread].u_len; + sinfo->ram_alloced -= s->buflen; s->bufp = 0; if (++s->unext_thread == s->base_thread + s->total_threads) diff --git a/util.c b/util.c index 27b8da6..66f890e 100644 --- a/util.c +++ b/util.c @@ -83,6 +83,13 @@ void sighandler() exit(0); } +void round_to_page(i64 *size) +{ + *size -= *size % control.page_size; + if (unlikely(!*size)) + *size = control.page_size; +} + void read_config( struct rzip_control *control ) { /* check for lrzip.conf in ., $HOME/.lrzip and /etc/lrzip */