From 6f2b94be3be4981e55d68e6a0993189c3d58f2f1 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 24 Nov 2010 20:12:19 +1100 Subject: [PATCH] Fix the case where a compressed file has more than one stream 0 entry per block. Limit lzma windows to 300MB in the right place on 32 bit only. Make the main process less nice than the backend threads since it tends to be the rate limiting step. --- main.c | 11 ++++++-- rzip.h | 3 +- stream.c | 86 ++++++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 71 insertions(+), 29 deletions(-) diff --git a/main.c b/main.c index 072af80..8b02e7e 100644 --- a/main.c +++ b/main.c @@ -755,8 +755,15 @@ int main(int argc, char *argv[]) } } - if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) - print_err("Warning, unable to set nice value\n"); + /* Set the main nice value to half that of the backend threads since + * the rzip stage is usually the rate limiting step */ + if (control.nice_val > 0) { + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val / 2) == -1)) + print_err("Warning, unable to set nice value\n"); + } else { + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value\n"); + } /* One extra iteration for the case of no parameters means we will default to stdin/out */ for (i = 0; i <= argc; i++) { diff --git a/rzip.h b/rzip.h index 6f2c269..4f4448a 100644 --- a/rzip.h +++ b/rzip.h @@ -248,6 +248,7 @@ struct stream { uchar *buf; i64 buflen; i64 bufp; + int eos; }; struct stream_info { @@ -260,8 +261,6 @@ struct stream_info { i64 total_read; long thread_no; long next_thread; - int uncomp_stream; - int eos; /* End of streams */ }; void fatal(const char *format, ...); diff --git a/stream.c b/stream.c index 0f678c1..c3635af 100644 --- a/stream.c +++ b/stream.c @@ -38,6 +38,7 @@ struct compress_thread{ struct uncomp_thread{ uchar *s_buf; i64 u_len, c_len; + i64 last_head; uchar c_type; sem_t complete; sem_t ready; /* Taken this 
thread's data so it can die */ @@ -713,10 +714,6 @@ retest_malloc: free(testmalloc); print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", limit * (n + 1)); - /* Largest window supported by lzma is 300MB */ - if (LZMA_COMPRESS) - limit = MIN(limit, 3 * STREAM_BUFSIZE * 10); - sinfo->bufsize = limit; /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the @@ -724,6 +721,10 @@ retest_malloc: sinfo->bufsize = MIN(sinfo->bufsize, MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE)); + /* Largest window supported by lzma on 32 bits is 300MB */ + if (BITS32 && LZMA_COMPRESS) + sinfo->bufsize = MIN(sinfo->bufsize, 3 * STREAM_BUFSIZE * 10); + if (control.threads > 1) print_maxverbose("Using %d threads to compress up to %lld bytes each.\n", control.threads, sinfo->bufsize); @@ -785,6 +786,9 @@ void *open_stream_in(int f, int n) return NULL; } + /* Flag that this is the start and stream 0 should be read first */ + sinfo->s[0].eos = -1; + for (i = 0; i < n; i++) { uchar c; i64 v1, v2; @@ -855,6 +859,9 @@ static void *compthread(void *t) long i = (long)t; struct compress_thread *cti = &cthread[i]; + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value on thread\n"); + cti->c_type = CTYPE_NONE; cti->c_len = cti->s_len; @@ -940,6 +947,9 @@ static void *ucompthread(void *t) long i = (long)t; struct uncomp_thread *uci = &ucthread[i]; + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value on thread\n"); + if (uci->c_type != CTYPE_NONE) { if (uci->c_type == CTYPE_LZMA) { if (unlikely(lzma_decompress_buf(uci))) @@ -960,29 +970,46 @@ static void *ucompthread(void *t) } post_sem(&uci->complete); wait_sem(&uci->ready); - print_maxverbose("Thread %ld returning %lld uncompressed bytes\n", i, uci->u_len); + print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, 
uci->stream); post_sem(&uci->free); return 0; } /* fill a buffer from a stream - return -1 on failure */ -static int fill_buffer(struct stream_info *sinfo, int stream) +static int fill_buffer(struct stream_info *sinfo, int ret_stream) { i64 header_length, u_len, c_len, last_head; uchar c_type, *s_buf; - int *ucs = &sinfo->uncomp_stream; + static int stream = 0, last_block = 0; - if (sinfo->s[stream].buf) - free(sinfo->s[stream].buf); - if (sinfo->eos) - goto out; + if (sinfo->s[ret_stream].buf) + free(sinfo->s[ret_stream].buf); fill_another: - if (unlikely(seekto(sinfo, sinfo->s[*ucs].last_head))) + /* The first time we start filling the buffer, the first two chunks + * will be stream 0, then 1. After that we keep getting stream 1 until + * stream 1 has last_head == total_read, then there will be one more + * block of stream 1. If the next last_head is 0, the stream has ended; + * otherwise we get one block of stream 0 before going back to stream + * 1 */ + if (sinfo->s[0].eos == 1 && sinfo->s[1].eos == 1) + goto out; + if (sinfo->s[0].eos == -1) { + last_block = 0; + sinfo->s[0].eos = 0; + stream = 0; + } else if (sinfo->s[0].eos == 1) { + stream = 1; + } else if (sinfo->s[1].eos == 1) { + stream = 0; + } + + if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) return -1; if (unlikely(read_u8(sinfo->fd, &c_type))) return -1; + /* Compatibility crap for versions < 0.4 */ if (control.major_version == 0 && control.minor_version < 4) { u32 c_len32, u_len32, last_head32; @@ -1025,32 +1052,41 @@ fill_another: ucthread[sinfo->thread_no].c_len = c_len; ucthread[sinfo->thread_no].u_len = u_len; ucthread[sinfo->thread_no].c_type = c_type; - ucthread[sinfo->thread_no].stream = *ucs; - sinfo->s[*ucs].last_head = last_head; + ucthread[sinfo->thread_no].stream = stream; + sinfo->s[stream].last_head = last_head; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", - sinfo->thread_no, c_len, *ucs); + sinfo->thread_no, c_len, stream);
create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no); - if (!last_head) { - if (sinfo->uncomp_stream < NUM_STREAMS - 1) - sinfo->uncomp_stream++; - else - sinfo->eos = 1; - } + if (!last_head) + sinfo->s[stream].eos = 1; + + if (stream == 1) { + if (!last_block) { + if (last_head == sinfo->total_read) + last_block = 1; + } else { + last_block = 0; + stream = 0; + } + } else + stream = 1; if (++sinfo->thread_no == control.threads) sinfo->thread_no = 0; - if (!sinfo->eos && !trywait_sem(&ucthread[sinfo->thread_no].free)) { + if (!trywait_sem(&ucthread[sinfo->thread_no].free)) { post_sem(&ucthread[sinfo->thread_no].free); goto fill_another; } out: wait_sem(&ucthread[sinfo->next_thread].complete); - sinfo->s[stream].buf = ucthread[sinfo->next_thread].s_buf; - sinfo->s[stream].buflen = ucthread[sinfo->next_thread].u_len; - sinfo->s[stream].bufp = 0; + if (ret_stream != ucthread[sinfo->next_thread].stream) + fatal("Uncompressed stream doesn't match! Should be %d but is %d\n", ret_stream, ucthread[sinfo->next_thread].stream); + sinfo->s[ret_stream].buf = ucthread[sinfo->next_thread].s_buf; + sinfo->s[ret_stream].buflen = ucthread[sinfo->next_thread].u_len; + sinfo->s[ret_stream].bufp = 0; post_sem(&ucthread[sinfo->next_thread].ready);