diff --git a/rzip.c b/rzip.c index ea2d424..49ec57b 100644 --- a/rzip.c +++ b/rzip.c @@ -100,13 +100,6 @@ struct sliding_buffer { int fd; /* The fd of the mmap */ } sb; /* Sliding buffer */ -static void round_to_page(i64 *size) -{ - *size -= *size % control.page_size; - if (unlikely(!*size)) - *size = control.page_size; -} - static void remap_low_sb(void) { i64 new_offset; diff --git a/rzip.h b/rzip.h index f73b634..349d6e7 100644 --- a/rzip.h +++ b/rzip.h @@ -301,9 +301,11 @@ struct stream_info { int num_streams; int fd; i64 bufsize; + i64 max_bufsize; i64 cur_pos; i64 initial_pos; i64 total_read; + i64 ram_alloced; long thread_no; long next_thread; int chunks; @@ -328,6 +330,7 @@ void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, const i64 two_gig; void prepare_streamout_threads(void); void close_streamout_threads(void); +void round_to_page(i64 *size); #define print_err(format, args...) do {\ fprintf(stderr, format, ##args); \ diff --git a/stream.c b/stream.c index 5efe422..1fef9fa 100644 --- a/stream.c +++ b/stream.c @@ -718,6 +718,11 @@ void prepare_streamout_threads(void) { int i; + /* As we serialise the generation of threads during the rzip + * pre-processing stage, it's faster to have one more thread available + * to keep all CPUs busy. */ + if (control.threads > 1) + ++control.threads; threads = calloc(sizeof(pthread_t), control.threads); if (unlikely(!threads)) fatal("Unable to calloc threads in prepare_streamout_threads\n"); @@ -797,16 +802,21 @@ retest_malloc: free(testmalloc); print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize); - sinfo->bufsize = limit; - - /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the - * bufsize to fit X threads into it */ - sinfo->bufsize = MIN(sinfo->bufsize, - MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE)); + /* We start with slightly smaller buffers to start loading CPUs as soon + * as possible and make them exponentially larger approaching the + * tested maximum size. We ensure the buffers are of a minimum size, + * though, as compression efficency drops off dramatically with tiny + * buffers. */ + if (control.threads > 1) { + sinfo->max_bufsize = limit / control.threads; + sinfo->bufsize = sinfo->max_bufsize * 63 / 100; + round_to_page(&sinfo->bufsize); + sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE); + } if (control.threads > 1) - print_maxverbose("Using %d threads to compress up to %lld bytes each.\n", - control.threads, sinfo->bufsize); + print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n", + control.threads, sinfo->max_bufsize); else print_maxverbose("Using 1 thread to compress up to %lld bytes\n", sinfo->bufsize); @@ -831,7 +841,9 @@ void *open_stream_in(int f, int n) if (unlikely(!sinfo)) return NULL; - total_threads = control.threads + 1; + /* We have one thread dedicated to stream 0, and one more thread than + * CPUs to keep them busy. */ + total_threads = control.threads + 2; threads = calloc(sizeof(pthread_t), total_threads); if (unlikely(!threads)) return NULL; @@ -851,7 +863,7 @@ void *open_stream_in(int f, int n) } sinfo->s[0].total_threads = 1; - sinfo->s[1].total_threads = control.threads; + sinfo->s[1].total_threads = total_threads - 1; for (i = 0; i < n; i++) { uchar c; @@ -1042,6 +1054,12 @@ static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf) i, cthread[i].s_len, stream); create_pthread(&threads[i], NULL, compthread, (void *)i); + if (control.threads > 1) { + sinfo->bufsize += (sinfo->max_bufsize - sinfo->bufsize) * 63 / 100; + round_to_page(&sinfo->bufsize); + sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE); + } + if (newbuf) { /* The stream buffer has been given to the thread, allocate a new one */ sinfo->s[stream].buf = malloc(sinfo->bufsize); @@ -1167,6 +1185,7 @@ fill_another: s_buf = malloc(u_len); if (unlikely(u_len && !s_buf)) fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len); + sinfo->ram_alloced += u_len; if (unlikely(read_buf(sinfo->fd, s_buf, c_len))) return -1; @@ -1190,11 +1209,14 @@ fill_another: s->uthread_no = s->base_thread; /* Reached the end of this stream, no more data to read in, otherwise - * see if the next thread is free to grab more data */ + * see if the next thread is free to grab more data. We also check that + * we're not going to be allocating too much ram to generate all these + * threads. */ if (!last_head) s->eos = 1; - else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) - goto fill_another; + else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy && + sinfo->ram_alloced < control.ramsize / 3) + goto fill_another; out: lock_mutex(&output_lock); output_thread = s->unext_thread; @@ -1208,6 +1230,7 @@ out: print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread); s->buf = ucthread[s->unext_thread].s_buf; s->buflen = ucthread[s->unext_thread].u_len; + sinfo->ram_alloced -= s->buflen; s->bufp = 0; if (++s->unext_thread == s->base_thread + s->total_threads) diff --git a/util.c b/util.c index 27b8da6..66f890e 100644 --- a/util.c +++ b/util.c @@ -83,6 +83,13 @@ void sighandler() exit(0); } +void round_to_page(i64 *size) +{ + *size -= *size % control.page_size; + if (unlikely(!*size)) + *size = control.page_size; +} + void read_config( struct rzip_control *control ) { /* check for lrzip.conf in ., $HOME/.lrzip and /etc/lrzip */