Multi-threading speed ups.

Add one more thread on compression and decompression to account for the staggered nature of thread recruitment.
Make the initial buffer slightly smaller and make it progressively larger, thus recruiting threads sooner and more evenly.
This also speeds up decompression for the same reason.
Check the amount of memory being used by each thread on decompression to ensure we don't try to recruit too much ram.
This commit is contained in:
Con Kolivas 2011-02-22 00:49:50 +11:00
parent 88e3df6af1
commit bb33f7571c
4 changed files with 46 additions and 20 deletions

7
rzip.c
View file

@ -100,13 +100,6 @@ struct sliding_buffer {
int fd; /* The fd of the mmap */
} sb; /* Sliding buffer */
static void round_to_page(i64 *size)
{
*size -= *size % control.page_size;
if (unlikely(!*size))
*size = control.page_size;
}
static void remap_low_sb(void)
{
i64 new_offset;

3
rzip.h
View file

@ -301,9 +301,11 @@ struct stream_info {
int num_streams;
int fd;
i64 bufsize;
i64 max_bufsize;
i64 cur_pos;
i64 initial_pos;
i64 total_read;
i64 ram_alloced;
long thread_no;
long next_thread;
int chunks;
@ -328,6 +330,7 @@ void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
const i64 two_gig;
void prepare_streamout_threads(void);
void close_streamout_threads(void);
void round_to_page(i64 *size);
#define print_err(format, args...) do {\
fprintf(stderr, format, ##args); \

View file

@ -718,6 +718,11 @@ void prepare_streamout_threads(void)
{
int i;
/* As we serialise the generation of threads during the rzip
* pre-processing stage, it's faster to have one more thread available
* to keep all CPUs busy. */
if (control.threads > 1)
++control.threads;
threads = calloc(sizeof(pthread_t), control.threads);
if (unlikely(!threads))
fatal("Unable to calloc threads in prepare_streamout_threads\n");
@ -797,16 +802,21 @@ retest_malloc:
free(testmalloc);
print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize);
sinfo->bufsize = limit;
/* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
* bufsize to fit X threads into it */
sinfo->bufsize = MIN(sinfo->bufsize,
MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE));
/* We start with slightly smaller buffers to start loading CPUs as soon
* as possible and make them exponentially larger approaching the
* tested maximum size. We ensure the buffers are of a minimum size,
* though, as compression efficency drops off dramatically with tiny
* buffers. */
if (control.threads > 1) {
sinfo->max_bufsize = limit / control.threads;
sinfo->bufsize = sinfo->max_bufsize * 63 / 100;
round_to_page(&sinfo->bufsize);
sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE);
}
if (control.threads > 1)
print_maxverbose("Using %d threads to compress up to %lld bytes each.\n",
control.threads, sinfo->bufsize);
print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n",
control.threads, sinfo->max_bufsize);
else
print_maxverbose("Using 1 thread to compress up to %lld bytes\n",
sinfo->bufsize);
@ -831,7 +841,9 @@ void *open_stream_in(int f, int n)
if (unlikely(!sinfo))
return NULL;
total_threads = control.threads + 1;
/* We have one thread dedicated to stream 0, and one more thread than
* CPUs to keep them busy. */
total_threads = control.threads + 2;
threads = calloc(sizeof(pthread_t), total_threads);
if (unlikely(!threads))
return NULL;
@ -851,7 +863,7 @@ void *open_stream_in(int f, int n)
}
sinfo->s[0].total_threads = 1;
sinfo->s[1].total_threads = control.threads;
sinfo->s[1].total_threads = total_threads - 1;
for (i = 0; i < n; i++) {
uchar c;
@ -1042,6 +1054,12 @@ static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf)
i, cthread[i].s_len, stream);
create_pthread(&threads[i], NULL, compthread, (void *)i);
if (control.threads > 1) {
sinfo->bufsize += (sinfo->max_bufsize - sinfo->bufsize) * 63 / 100;
round_to_page(&sinfo->bufsize);
sinfo->bufsize = MAX(sinfo->bufsize, STREAM_BUFSIZE);
}
if (newbuf) {
/* The stream buffer has been given to the thread, allocate a new one */
sinfo->s[stream].buf = malloc(sinfo->bufsize);
@ -1167,6 +1185,7 @@ fill_another:
s_buf = malloc(u_len);
if (unlikely(u_len && !s_buf))
fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len);
sinfo->ram_alloced += u_len;
if (unlikely(read_buf(sinfo->fd, s_buf, c_len)))
return -1;
@ -1190,11 +1209,14 @@ fill_another:
s->uthread_no = s->base_thread;
/* Reached the end of this stream, no more data to read in, otherwise
* see if the next thread is free to grab more data */
* see if the next thread is free to grab more data. We also check that
* we're not going to be allocating too much ram to generate all these
* threads. */
if (!last_head)
s->eos = 1;
else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy)
goto fill_another;
else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy &&
sinfo->ram_alloced < control.ramsize / 3)
goto fill_another;
out:
lock_mutex(&output_lock);
output_thread = s->unext_thread;
@ -1208,6 +1230,7 @@ out:
print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
s->buf = ucthread[s->unext_thread].s_buf;
s->buflen = ucthread[s->unext_thread].u_len;
sinfo->ram_alloced -= s->buflen;
s->bufp = 0;
if (++s->unext_thread == s->base_thread + s->total_threads)

7
util.c
View file

@ -83,6 +83,13 @@ void sighandler()
exit(0);
}
void round_to_page(i64 *size)
{
*size -= *size % control.page_size;
if (unlikely(!*size))
*size = control.page_size;
}
void read_config( struct rzip_control *control )
{
/* check for lrzip.conf in ., $HOME/.lrzip and /etc/lrzip */