Fix the case where a compressed file has more than one stream 0 entry per block.

Limit lzma windows to 300MB in the right place on 32 bit only.
Make the main process less nice than the backend threads since it tends to be the rate limiting step.
This commit is contained in:
Con Kolivas 2010-11-24 20:12:19 +11:00
parent 180f5340fe
commit 6f2b94be3b
3 changed files with 71 additions and 29 deletions

11
main.c
View file

@ -755,8 +755,15 @@ int main(int argc, char *argv[])
}
}
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value\n");
/* Set the main nice value to half that of the backend threads since
* the rzip stage is usually the rate limiting step */
if (control.nice_val > 0) {
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val / 2) == -1))
print_err("Warning, unable to set nice value\n");
} else {
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value\n");
}
/* One extra iteration for the case of no parameters means we will default to stdin/out */
for (i = 0; i <= argc; i++) {

3
rzip.h
View file

@ -248,6 +248,7 @@ struct stream {
uchar *buf;
i64 buflen;
i64 bufp;
int eos;
};
struct stream_info {
@ -260,8 +261,6 @@ struct stream_info {
i64 total_read;
long thread_no;
long next_thread;
int uncomp_stream;
int eos; /* End of streams */
};
void fatal(const char *format, ...);

View file

@ -38,6 +38,7 @@ struct compress_thread{
struct uncomp_thread{
uchar *s_buf;
i64 u_len, c_len;
i64 last_head;
uchar c_type;
sem_t complete;
sem_t ready; /* Taken this thread's data so it can die */
@ -713,10 +714,6 @@ retest_malloc:
free(testmalloc);
print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", limit * (n + 1));
/* Largest window supported by lzma is 300MB */
if (LZMA_COMPRESS)
limit = MIN(limit, 3 * STREAM_BUFSIZE * 10);
sinfo->bufsize = limit;
/* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
@ -724,6 +721,10 @@ retest_malloc:
sinfo->bufsize = MIN(sinfo->bufsize,
MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE));
/* Largest window supported by lzma on 32 bits is 300MB */
if (BITS32 && LZMA_COMPRESS)
sinfo->bufsize = MIN(sinfo->bufsize, 3 * STREAM_BUFSIZE * 10);
if (control.threads > 1)
print_maxverbose("Using %d threads to compress up to %lld bytes each.\n",
control.threads, sinfo->bufsize);
@ -785,6 +786,9 @@ void *open_stream_in(int f, int n)
return NULL;
}
/* Flag that this is the start and stream 0 should be read first */
sinfo->s[0].eos = -1;
for (i = 0; i < n; i++) {
uchar c;
i64 v1, v2;
@ -855,6 +859,9 @@ static void *compthread(void *t)
long i = (long)t;
struct compress_thread *cti = &cthread[i];
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
cti->c_type = CTYPE_NONE;
cti->c_len = cti->s_len;
@ -940,6 +947,9 @@ static void *ucompthread(void *t)
long i = (long)t;
struct uncomp_thread *uci = &ucthread[i];
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
if (uci->c_type != CTYPE_NONE) {
if (uci->c_type == CTYPE_LZMA) {
if (unlikely(lzma_decompress_buf(uci)))
@ -960,29 +970,46 @@ static void *ucompthread(void *t)
}
post_sem(&uci->complete);
wait_sem(&uci->ready);
print_maxverbose("Thread %ld returning %lld uncompressed bytes\n", i, uci->u_len);
print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream);
post_sem(&uci->free);
return 0;
}
/* fill a buffer from a stream - return -1 on failure */
static int fill_buffer(struct stream_info *sinfo, int stream)
static int fill_buffer(struct stream_info *sinfo, int ret_stream)
{
i64 header_length, u_len, c_len, last_head;
uchar c_type, *s_buf;
int *ucs = &sinfo->uncomp_stream;
static int stream = 0, last_block = 0;
if (sinfo->s[stream].buf)
free(sinfo->s[stream].buf);
if (sinfo->eos)
goto out;
if (sinfo->s[ret_stream].buf)
free(sinfo->s[ret_stream].buf);
fill_another:
if (unlikely(seekto(sinfo, sinfo->s[*ucs].last_head)))
/* The first time we starting filling the buffer, the first two chunks
* will be stream 0, then 1. After that we keep getting stream 1 until
* stream 1 has last_head == total_read, then there will be one more
* block of stream 1. If the next last_head is 0, the stream has ended
* otherwise we get one block of stream 0 before going back to stream
* 1 */
if (sinfo->s[0].eos == 1 && sinfo->s[1].eos == 1)
goto out;
if (sinfo->s[0].eos == -1) {
last_block = 0;
sinfo->s[0].eos = 0;
stream = 0;
} else if (sinfo->s[0].eos == 1) {
stream = 1;
} else if (sinfo->s[1].eos == 1) {
stream = 0;
}
if (unlikely(seekto(sinfo, sinfo->s[stream].last_head)))
return -1;
if (unlikely(read_u8(sinfo->fd, &c_type)))
return -1;
/* Compatibility crap for versions < 0.4 */
if (control.major_version == 0 && control.minor_version < 4) {
u32 c_len32, u_len32, last_head32;
@ -1025,32 +1052,41 @@ fill_another:
ucthread[sinfo->thread_no].c_len = c_len;
ucthread[sinfo->thread_no].u_len = u_len;
ucthread[sinfo->thread_no].c_type = c_type;
ucthread[sinfo->thread_no].stream = *ucs;
sinfo->s[*ucs].last_head = last_head;
ucthread[sinfo->thread_no].stream = stream;
sinfo->s[stream].last_head = last_head;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
sinfo->thread_no, c_len, *ucs);
sinfo->thread_no, c_len, stream);
create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no);
if (!last_head) {
if (sinfo->uncomp_stream < NUM_STREAMS - 1)
sinfo->uncomp_stream++;
else
sinfo->eos = 1;
}
if (!last_head)
sinfo->s[stream].eos = 1;
if (stream == 1) {
if (!last_block) {
if (last_head == sinfo->total_read)
last_block = 1;
} else {
last_block = 0;
stream = 0;
}
} else
stream = 1;
if (++sinfo->thread_no == control.threads)
sinfo->thread_no = 0;
if (!sinfo->eos && !trywait_sem(&ucthread[sinfo->thread_no].free)) {
if (!trywait_sem(&ucthread[sinfo->thread_no].free)) {
post_sem(&ucthread[sinfo->thread_no].free);
goto fill_another;
}
out:
wait_sem(&ucthread[sinfo->next_thread].complete);
sinfo->s[stream].buf = ucthread[sinfo->next_thread].s_buf;
sinfo->s[stream].buflen = ucthread[sinfo->next_thread].u_len;
sinfo->s[stream].bufp = 0;
if (ret_stream != ucthread[sinfo->next_thread].stream)
fatal("Uncompressed stream doesn't match! Should be %d but is %d\n", ret_stream, ucthread[sinfo->next_thread].stream);
sinfo->s[ret_stream].buf = ucthread[sinfo->next_thread].s_buf;
sinfo->s[ret_stream].buflen = ucthread[sinfo->next_thread].u_len;
sinfo->s[ret_stream].bufp = 0;
post_sem(&ucthread[sinfo->next_thread].ready);