diff --git a/main.c b/main.c index 072af80..8b02e7e 100644 --- a/main.c +++ b/main.c @@ -755,8 +755,15 @@ int main(int argc, char *argv[]) } } - if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) - print_err("Warning, unable to set nice value\n"); + /* Set the main nice value to half that of the backend threads since + * the rzip stage is usually the rate limiting step */ + if (control.nice_val > 0) { + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val / 2) == -1)) + print_err("Warning, unable to set nice value\n"); + } else { + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value\n"); + } /* One extra iteration for the case of no parameters means we will default to stdin/out */ for (i = 0; i <= argc; i++) { diff --git a/rzip.h b/rzip.h index 6f2c269..4f4448a 100644 --- a/rzip.h +++ b/rzip.h @@ -248,6 +248,7 @@ struct stream { uchar *buf; i64 buflen; i64 bufp; + int eos; }; struct stream_info { @@ -260,8 +261,6 @@ struct stream_info { i64 total_read; long thread_no; long next_thread; - int uncomp_stream; - int eos; /* End of streams */ }; void fatal(const char *format, ...); diff --git a/stream.c b/stream.c index 0f678c1..c3635af 100644 --- a/stream.c +++ b/stream.c @@ -38,6 +38,7 @@ struct compress_thread{ struct uncomp_thread{ uchar *s_buf; i64 u_len, c_len; + i64 last_head; uchar c_type; sem_t complete; sem_t ready; /* Taken this thread's data so it can die */ @@ -713,10 +714,6 @@ retest_malloc: free(testmalloc); print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", limit * (n + 1)); - /* Largest window supported by lzma is 300MB */ - if (LZMA_COMPRESS) - limit = MIN(limit, 3 * STREAM_BUFSIZE * 10); - sinfo->bufsize = limit; /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the @@ -724,6 +721,10 @@ retest_malloc: sinfo->bufsize = MIN(sinfo->bufsize, MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE)); + /* Largest window supported by lzma on 32 bits is 300MB */ + if (BITS32 && LZMA_COMPRESS) + sinfo->bufsize = MIN(sinfo->bufsize, 3 * STREAM_BUFSIZE * 10); + if (control.threads > 1) print_maxverbose("Using %d threads to compress up to %lld bytes each.\n", control.threads, sinfo->bufsize); @@ -785,6 +786,9 @@ void *open_stream_in(int f, int n) return NULL; } + /* Flag that this is the start and stream 0 should be read first */ + sinfo->s[0].eos = -1; + for (i = 0; i < n; i++) { uchar c; i64 v1, v2; @@ -855,6 +859,9 @@ static void *compthread(void *t) long i = (long)t; struct compress_thread *cti = &cthread[i]; + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value on thread\n"); + cti->c_type = CTYPE_NONE; cti->c_len = cti->s_len; @@ -940,6 +947,9 @@ static void *ucompthread(void *t) long i = (long)t; struct uncomp_thread *uci = &ucthread[i]; + if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) + print_err("Warning, unable to set nice value on thread\n"); + if (uci->c_type != CTYPE_NONE) { if (uci->c_type == CTYPE_LZMA) { if (unlikely(lzma_decompress_buf(uci))) @@ -960,29 +970,46 @@ static void *ucompthread(void *t) } post_sem(&uci->complete); wait_sem(&uci->ready); - print_maxverbose("Thread %ld returning %lld uncompressed bytes\n", i, uci->u_len); + print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); post_sem(&uci->free); return 0; } /* fill a buffer from a stream - return -1 on failure */ -static int fill_buffer(struct stream_info *sinfo, int stream) +static int fill_buffer(struct stream_info *sinfo, int ret_stream) { i64 header_length, u_len, c_len, last_head; uchar c_type, *s_buf; - int *ucs = &sinfo->uncomp_stream; + static int stream = 0, last_block = 0; - if (sinfo->s[stream].buf) - free(sinfo->s[stream].buf); - if (sinfo->eos) - goto out; + if (sinfo->s[ret_stream].buf) + free(sinfo->s[ret_stream].buf); fill_another: - if (unlikely(seekto(sinfo, sinfo->s[*ucs].last_head))) + /* The first time we starting filling the buffer, the first two chunks + * will be stream 0, then 1. After that we keep getting stream 1 until + * stream 1 has last_head == total_read, then there will be one more + * block of stream 1. If the next last_head is 0, the stream has ended + * otherwise we get one block of stream 0 before going back to stream + * 1 */ + if (sinfo->s[0].eos == 1 && sinfo->s[1].eos == 1) + goto out; + if (sinfo->s[0].eos == -1) { + last_block = 0; + sinfo->s[0].eos = 0; + stream = 0; + } else if (sinfo->s[0].eos == 1) { + stream = 1; + } else if (sinfo->s[1].eos == 1) { + stream = 0; + } + + if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) return -1; if (unlikely(read_u8(sinfo->fd, &c_type))) return -1; + /* Compatibility crap for versions < 0.4 */ if (control.major_version == 0 && control.minor_version < 4) { u32 c_len32, u_len32, last_head32; @@ -1025,32 +1052,41 @@ fill_another: ucthread[sinfo->thread_no].c_len = c_len; ucthread[sinfo->thread_no].u_len = u_len; ucthread[sinfo->thread_no].c_type = c_type; - ucthread[sinfo->thread_no].stream = *ucs; - sinfo->s[*ucs].last_head = last_head; + ucthread[sinfo->thread_no].stream = stream; + sinfo->s[stream].last_head = last_head; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", - sinfo->thread_no, c_len, *ucs); + sinfo->thread_no, c_len, stream); create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no); - if (!last_head) { - if (sinfo->uncomp_stream < NUM_STREAMS - 1) - sinfo->uncomp_stream++; - else - sinfo->eos = 1; - } + if (!last_head) + sinfo->s[stream].eos = 1; + + if (stream == 1) { + if (!last_block) { + if (last_head == sinfo->total_read) + last_block = 1; + } else { + last_block = 0; + stream = 0; + } + } else + stream = 1; if (++sinfo->thread_no == control.threads) sinfo->thread_no = 0; - if (!sinfo->eos && !trywait_sem(&ucthread[sinfo->thread_no].free)) { + if (!trywait_sem(&ucthread[sinfo->thread_no].free)) { post_sem(&ucthread[sinfo->thread_no].free); goto fill_another; } out: wait_sem(&ucthread[sinfo->next_thread].complete); - sinfo->s[stream].buf = ucthread[sinfo->next_thread].s_buf; - sinfo->s[stream].buflen = ucthread[sinfo->next_thread].u_len; - sinfo->s[stream].bufp = 0; + if (ret_stream != ucthread[sinfo->next_thread].stream) + fatal("Uncompressed stream doesn't match! Should be %d but is %d\n", ret_stream, ucthread[sinfo->next_thread].stream); + sinfo->s[ret_stream].buf = ucthread[sinfo->next_thread].s_buf; + sinfo->s[ret_stream].buflen = ucthread[sinfo->next_thread].u_len; + sinfo->s[ret_stream].bufp = 0; post_sem(&ucthread[sinfo->next_thread].ready);