diff --git a/runzip.c b/runzip.c index d698dfa..93b2fca 100644 --- a/runzip.c +++ b/runzip.c @@ -197,7 +197,7 @@ static i64 runzip_chunk(int fd_in, int fd_out, int fd_hist, i64 expected_size, i break; } p = 100 * ((double)(tally + total) / (double)expected_size); - if (p != l) { + if (p / 10 != l / 10) { prog_done = (double)(tally + total) / (double)divisor[divisor_index]; print_progress("%3d%% %9.2f / %9.2f %s\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b", p, prog_done, prog_tsize, suffix[divisor_index] ); diff --git a/rzip.h b/rzip.h index 6fbabb3..3db3237 100644 --- a/rzip.h +++ b/rzip.h @@ -257,6 +257,9 @@ struct stream_info { i64 initial_pos; i64 total_read; long thread_no; + long next_thread; + int uncomp_stream; + int eos; /* End of streams */ }; void fatal(const char *format, ...); @@ -274,7 +277,7 @@ void read_config(struct rzip_control *s); ssize_t write_1g(int fd, void *buf, i64 len); ssize_t read_1g(int fd, void *buf, i64 len); void zpipe_compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread); -void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress); +void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread); const i64 two_gig; #define print_err(format, args...) do {\ diff --git a/stream.c b/stream.c index 0c6d618..d981cb4 100644 --- a/stream.c +++ b/stream.c @@ -35,6 +35,16 @@ struct compress_thread{ int stream; } *cthread; +struct uncomp_thread{ + uchar *s_buf; + i64 u_len, c_len; + uchar c_type; + sem_t complete; + sem_t ready; /* Taken this thread's data so it can die */ + sem_t free; + int stream; +} *ucthread; + void init_sem(sem_t *sem) { if (unlikely(sem_init(sem, 0, 0))) @@ -61,6 +71,20 @@ retry: } } +static inline int trywait_sem(sem_t *s) +{ + int ret; + +retry: + if ((ret = sem_trywait(s)) == -1) { + if (errno == EINTR) + goto retry; + if (errno != EAGAIN) + fatal("sem_trywait"); + } + return ret; +} + static inline void destroy_sem(sem_t *s) { if (unlikely(sem_destroy(s))) @@ -156,8 +180,8 @@ static void zpaq_compress_buf(struct compress_thread *cthread, long thread) if (unlikely(!out)) fatal("Failed to open_memstream in zpaq_compress_buf\n"); - zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS), - thread); + zpipe_compress(in, out, control.msgout, cthread->s_len, + (int)(SHOW_PROGRESS), thread); if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) fatal("Failed to memstream_update_buffer in zpaq_compress_buf"); @@ -253,7 +277,7 @@ static void lzma_compress_buf(struct compress_thread *cthread) if (!c_buf) return; - print_progress("Starting lzma back end compression thread...\n"); + print_verbose("Starting lzma back end compression thread...\n"); /* with LZMA SDK 4.63, we pass compression level and threads only * and receive properties in control->lzma_properties */ @@ -339,13 +363,13 @@ out_free: try to decompress a buffer. Return 0 on success and -1 on failure. */ -static int zpaq_decompress_buf(struct stream *s) +static int zpaq_decompress_buf(struct uncomp_thread *ucthread, long thread) { uchar *c_buf = NULL; size_t dlen = 0; FILE *in, *out; - in = fmemopen(s->buf, s->buflen, "r"); + in = fmemopen(ucthread->s_buf, ucthread->u_len, "r"); if (unlikely(!in)) { print_err("Failed to fmemopen in zpaq_decompress_buf\n"); return -1; @@ -356,45 +380,45 @@ static int zpaq_decompress_buf(struct stream *s) return -1; } - zpipe_decompress(in, out, control.msgout, s->buflen, (int)(SHOW_PROGRESS)); + zpipe_decompress(in, out, control.msgout, ucthread->u_len, (int)(SHOW_PROGRESS), thread); if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) fatal("Failed to memstream_update_buffer in zpaq_decompress_buf"); fclose(in); fclose(out); - free(s->buf); - s->buf = c_buf; + free(ucthread->s_buf); + ucthread->s_buf = c_buf; - if (unlikely((i64)dlen != s->buflen)) { - print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, s->buflen); + if (unlikely((i64)dlen != ucthread->u_len)) { + print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, ucthread->u_len); return -1; } return 0; } -static int bzip2_decompress_buf(struct stream *s, i64 c_len) +static int bzip2_decompress_buf(struct uncomp_thread *ucthread) { - u32 dlen = s->buflen; + u32 dlen = ucthread->u_len; uchar *c_buf; int bzerr; - c_buf = s->buf; - s->buf = malloc(dlen); - if (unlikely(!s->buf)) { + c_buf = ucthread->s_buf; + ucthread->s_buf = malloc(dlen); + if (unlikely(!ucthread->s_buf)) { print_err("Failed to allocate %d bytes for decompression\n", dlen); return -1; } - bzerr = BZ2_bzBuffToBuffDecompress((char*)s->buf, &dlen, (char*)c_buf, c_len, 0, 0); + bzerr = BZ2_bzBuffToBuffDecompress((char*)ucthread->s_buf, &dlen, (char*)c_buf, ucthread->c_len, 0, 0); if (unlikely(bzerr != BZ_OK)) { print_err("Failed to decompress buffer - bzerr=%d\n", bzerr); return -1; } - if (unlikely(dlen != s->buflen)) { - print_err("Inconsistent length after decompression. Got %d bytes, expected %lld\n", dlen, s->buflen); + if (unlikely(dlen != ucthread->u_len)) { + print_err("Inconsistent length after decompression. Got %d bytes, expected %lld\n", dlen, ucthread->u_len); return -1; } @@ -402,27 +426,27 @@ static int bzip2_decompress_buf(struct stream *s, i64 c_len) return 0; } -static int gzip_decompress_buf(struct stream *s, i64 c_len) +static int gzip_decompress_buf(struct uncomp_thread *ucthread) { - unsigned long dlen = s->buflen; + unsigned long dlen = ucthread->u_len; uchar *c_buf; int gzerr; - c_buf = s->buf; - s->buf = malloc(dlen); - if (unlikely(!s->buf)) { + c_buf = ucthread->s_buf; + ucthread->s_buf = malloc(dlen); + if (unlikely(!ucthread->s_buf)) { print_err("Failed to allocate %ld bytes for decompression\n", dlen); return -1; } - gzerr = uncompress(s->buf, &dlen, c_buf, c_len); + gzerr = uncompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len); if (unlikely(gzerr != Z_OK)) { print_err("Failed to decompress buffer - bzerr=%d\n", gzerr); return -1; } - if (unlikely((i64)dlen != s->buflen)) { - print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, s->buflen); + if (unlikely((i64)dlen != ucthread->u_len)) { + print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, ucthread->u_len); return -1; } @@ -430,29 +454,29 @@ static int gzip_decompress_buf(struct stream *s, i64 c_len) return 0; } -static int lzma_decompress_buf(struct stream *s, size_t c_len) +static int lzma_decompress_buf(struct uncomp_thread *ucthread) { - size_t dlen = (size_t)s->buflen; + size_t dlen = (size_t)ucthread->u_len; uchar *c_buf; int lzmaerr; - c_buf = s->buf; - s->buf = malloc(dlen); - if (unlikely(!s->buf)) { + c_buf = ucthread->s_buf; + ucthread->s_buf = malloc(dlen); + if (unlikely(!ucthread->s_buf)) { print_err("Failed to allocate %lldd bytes for decompression\n", (i64)dlen); return -1; } /* With LZMA SDK 4.63 we pass control.lzma_properties * which is needed for proper uncompress */ - lzmaerr = LzmaUncompress(s->buf, &dlen, c_buf, &c_len, control.lzma_properties, 5); + lzmaerr = LzmaUncompress(ucthread->s_buf, &dlen, c_buf, (SizeT *)&ucthread->c_len, control.lzma_properties, 5); if (unlikely(lzmaerr)) { print_err("Failed to decompress buffer - lzmaerr=%d\n", lzmaerr); return -1; } - if (unlikely((i64)dlen != s->buflen)) { - print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, s->buflen); + if (unlikely((i64)dlen != ucthread->u_len)) { + print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, ucthread->u_len); return -1; } @@ -460,27 +484,27 @@ static int lzma_decompress_buf(struct stream *s, size_t c_len) return 0; } -static int lzo_decompress_buf(struct stream *s, i64 c_len) +static int lzo_decompress_buf(struct uncomp_thread *ucthread) { - lzo_uint dlen = s->buflen; + lzo_uint dlen = ucthread->u_len; uchar *c_buf; int lzerr; - c_buf = s->buf; - s->buf = malloc(dlen); - if (unlikely(!s->buf)) { + c_buf = ucthread->s_buf; + ucthread->s_buf = malloc(dlen); + if (unlikely(!ucthread->s_buf)) { print_err("Failed to allocate %lu bytes for decompression\n", (unsigned long)dlen); return -1; } - lzerr = lzo1x_decompress((uchar*)c_buf,c_len,(uchar*)s->buf,&dlen,NULL); + lzerr = lzo1x_decompress((uchar*)c_buf, ucthread->c_len, (uchar*)ucthread->s_buf, &dlen,NULL); if (unlikely(lzerr != LZO_E_OK)) { print_err("Failed to decompress buffer - lzerr=%d\n", lzerr); return -1; } - if (unlikely((i64)dlen != s->buflen)) { - print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", (unsigned long)dlen, s->buflen); + if (unlikely((i64)dlen != ucthread->u_len)) { + print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", (unsigned long)dlen, ucthread->u_len); return -1; } @@ -619,6 +643,8 @@ static int seekto(struct stream_info *sinfo, i64 pos) return 0; } +static pthread_t *threads; + /* open a set of output streams, compressing with the given compression level and algorithm */ void *open_stream_out(int f, int n, i64 limit) @@ -632,6 +658,10 @@ void *open_stream_out(int f, int n, i64 limit) if (unlikely(!sinfo)) return NULL; + threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads); + if (unlikely(!threads)) + return NULL; + sinfo->bufsize = limit; sinfo->thread_no = 0; @@ -663,6 +693,7 @@ void *open_stream_out(int f, int n, i64 limit) /* Largest window supported on 32bit is 600MB */ if (!cwindow || cwindow > 6) cwindow = 6; + control.window = cwindow; } /* No point making the stream larger than the amount of data */ @@ -683,7 +714,7 @@ void *open_stream_out(int f, int n, i64 limit) * ram. We need enough for the 2 streams and for the compression * backend at most, being conservative. */ retest_malloc: - testmalloc = malloc(sinfo->bufsize * (n + 1)); + testmalloc = malloc(sinfo->bufsize * (n + 2)); if (!testmalloc) { sinfo->bufsize = sinfo->bufsize / 10 * 9; goto retest_malloc; @@ -731,6 +762,21 @@ void *open_stream_in(int f, int n) if (unlikely(!sinfo)) return NULL; + threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads); + if (unlikely(!threads)) + return NULL; + + ucthread = calloc(sizeof(struct uncomp_thread), control.threads); + if (unlikely(!ucthread)) + fatal("Unable to calloc cthread in open_stream_out\n"); + + for (i = 0; i < control.threads; i++) { + init_sem(&ucthread[i].complete); + init_sem(&ucthread[i].free); + post_sem(&ucthread[i].free); + init_sem(&ucthread[i].ready); + } + sinfo->num_streams = n; sinfo->fd = f; sinfo->initial_pos = lseek(f, 0, SEEK_CUR); @@ -806,7 +852,7 @@ failed: /* Enter with s_buf allocated,s_buf points to the compressed data after the * backend compression and is then freed here */ -void *compthread(void *t) +static void *compthread(void *t) { long i = (long)t; struct compress_thread *cti = &cthread[i]; @@ -867,7 +913,6 @@ void *compthread(void *t) /* flush out any data in a stream buffer */ void flush_buffer(struct stream_info *sinfo, int stream) { - pthread_t threads[control.threads]; long i = sinfo->thread_no; /* Make sure this thread doesn't already exist */ @@ -879,7 +924,7 @@ void flush_buffer(struct stream_info *sinfo, int stream) cthread[i].s_len = sinfo->s[stream].buflen; print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n", - i, cthread[i].s_len, stream); + i, cthread[i].s_len, stream); create_pthread(&threads[i], NULL, compthread, (void *)i); /* The stream buffer has been given to the thread, allocate a new one */ @@ -892,13 +937,50 @@ void flush_buffer(struct stream_info *sinfo, int stream) sinfo->thread_no = 0; } +static void *ucompthread(void *t) +{ + long i = (long)t; + struct uncomp_thread *uci = &ucthread[i]; + + if (uci->c_type != CTYPE_NONE) { + if (uci->c_type == CTYPE_LZMA) { + if (unlikely(lzma_decompress_buf(uci))) + fatal("Failed to lzma_decompress_buf in ucompthread\n"); + } else if (uci->c_type == CTYPE_LZO) { + if (unlikely(lzo_decompress_buf(uci))) + fatal("Failed to lzo_decompress_buf in ucompthread\n"); + } else if (uci->c_type == CTYPE_BZIP2) { + if (unlikely(bzip2_decompress_buf(uci))) + fatal("Failed to bzip2_decompress_buf in ucompthread\n"); + } else if (uci->c_type == CTYPE_GZIP) { + if (unlikely(gzip_decompress_buf(uci))) + fatal("Failed to gzip_decompress_buf in ucompthread\n"); + } else if (uci->c_type == CTYPE_ZPAQ) { + if (unlikely(zpaq_decompress_buf(uci, i))) + fatal("Failed to zpaq_decompress_buf in ucompthread\n"); + } else fatal("Dunno wtf decompression type to use!\n"); + } + post_sem(&uci->complete); + wait_sem(&uci->ready); + print_maxverbose("Thread %ld returning %lld uncompressed bytes\n", i, uci->u_len); + post_sem(&uci->free); + + return 0; +} + /* fill a buffer from a stream - return -1 on failure */ static int fill_buffer(struct stream_info *sinfo, int stream) { - i64 header_length, u_len, c_len; - uchar c_type; + i64 header_length, u_len, c_len, last_head; + uchar c_type, *s_buf; + int *ucs = &sinfo->uncomp_stream; - if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) + if (sinfo->s[stream].buf) + free(sinfo->s[stream].buf); + if (sinfo->eos) + goto out; +fill_another: + if (unlikely(seekto(sinfo, sinfo->s[*ucs].last_head))) return -1; if (unlikely(read_u8(sinfo->fd, &c_type))) @@ -915,53 +997,67 @@ static int fill_buffer(struct stream_info *sinfo, int stream) return -1; c_len = c_len32; u_len = u_len32; - sinfo->s[stream].last_head = last_head32; + last_head = last_head32; header_length = 13; } else { if (unlikely(read_i64(sinfo->fd, &c_len))) return -1; if (unlikely(read_i64(sinfo->fd, &u_len))) return -1; - if (unlikely(read_i64(sinfo->fd, &sinfo->s[stream].last_head))) + if (unlikely(read_i64(sinfo->fd, &last_head))) return -1; header_length = 25; } sinfo->total_read += header_length; - if (sinfo->s[stream].buf) - sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, u_len); - else - sinfo->s[stream].buf = malloc(u_len); - if (unlikely(u_len && !sinfo->s[stream].buf)) + /* Wait till the next thread is free */ + wait_sem(&ucthread[sinfo->thread_no].free); + + s_buf = malloc(u_len); + if (unlikely(u_len && !s_buf)) fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len); - if (unlikely(read_buf(sinfo->fd, sinfo->s[stream].buf, c_len))) + if (unlikely(read_buf(sinfo->fd, s_buf, c_len))) return -1; sinfo->total_read += c_len; - sinfo->s[stream].buflen = u_len; + ucthread[sinfo->thread_no].s_buf = s_buf; + ucthread[sinfo->thread_no].c_len = c_len; + ucthread[sinfo->thread_no].u_len = u_len; + ucthread[sinfo->thread_no].c_type = c_type; + ucthread[sinfo->thread_no].stream = *ucs; + sinfo->s[*ucs].last_head = last_head; + + print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", + sinfo->thread_no, c_len, *ucs); + create_pthread(&threads[sinfo->thread_no], NULL, ucompthread, (void *)sinfo->thread_no); + + if (!last_head) { + if (sinfo->uncomp_stream < NUM_STREAMS - 1) + sinfo->uncomp_stream++; + else + sinfo->eos = 1; + } + + if (++sinfo->thread_no == control.threads) + sinfo->thread_no = 0; + if (!sinfo->eos && !trywait_sem(&ucthread[sinfo->thread_no].free)) { + post_sem(&ucthread[sinfo->thread_no].free); + goto fill_another; + } +out: + wait_sem(&ucthread[sinfo->next_thread].complete); + + sinfo->s[stream].buf = ucthread[sinfo->next_thread].s_buf; + sinfo->s[stream].buflen = ucthread[sinfo->next_thread].u_len; sinfo->s[stream].bufp = 0; - if (c_type != CTYPE_NONE) { - if (c_type == CTYPE_LZMA) { - if (unlikely(lzma_decompress_buf(&sinfo->s[stream], (size_t)c_len))) - return -1; - } else if (c_type == CTYPE_LZO) { - if (unlikely(lzo_decompress_buf(&sinfo->s[stream], c_len))) - return -1; - } else if (c_type == CTYPE_BZIP2) { - if (unlikely(bzip2_decompress_buf(&sinfo->s[stream], c_len))) - return -1; - } else if (c_type == CTYPE_GZIP) { - if (unlikely(gzip_decompress_buf(&sinfo->s[stream], c_len))) - return -1; - } else if (c_type == CTYPE_ZPAQ) { - if (unlikely(zpaq_decompress_buf(&sinfo->s[stream]))) - return -1; - } else fatal("Dunno wtf decompression type to use!\n"); - } + post_sem(&ucthread[sinfo->next_thread].ready); + + if (++sinfo->next_thread == control.threads) + sinfo->next_thread = 0; return 0; } @@ -1040,6 +1136,7 @@ int close_stream_out(void *ss) free(sinfo->s[i].buf); free(cthread); + free(threads); free(sinfo->s); free(sinfo); @@ -1058,8 +1155,11 @@ int close_stream_in(void *ss) for (i = 0; i < sinfo->num_streams; i++) free(sinfo->s[i].buf); + free(ucthread); + free(threads); free(sinfo->s); free(sinfo); + return 0; } @@ -1093,7 +1193,7 @@ static int lzo_compresses(uchar *s_buf, i64 s_len) if (unlikely(!c_buf)) fatal("Unable to allocate c_buf in lzo_compresses\n"); - print_progress("lzo testing for incompressible data...\n"); + print_verbose("lzo testing for incompressible data...\n"); /* Test progressively larger blocks at a time and as soon as anything compressible is found, jump out as a success */ diff --git a/zpipe.cpp b/zpipe.cpp index 165e132..4facd6e 100644 --- a/zpipe.cpp +++ b/zpipe.cpp @@ -1568,11 +1568,12 @@ bool find_start(FILE *in) { } // Decompress to stdout -static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress) { +static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, + int progress, long thread) { long long int len = 0; - static int last_pct = 0, chunk = 0; - int pct = 0; + int last_pct = 0; + int i, pct = 0; // Find start of block while (find_start(in)) { if (getc(in)!=LEVEL || getc(in)!=1) @@ -1600,8 +1601,12 @@ static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, if (progress) { len++; pct = (len * 100 / buf_len); - if (pct != last_pct) { - fprintf(msgout, "\r ZPAQ Chunk %d of 2 Decompress: %i%% \r", (chunk + 1), pct); + if (pct / 10 != last_pct / 10) { + fprintf(msgout, "\r\t\t\t\tZPAQ\t"); + for (i = 0; i < thread; i++) + fprintf(msgout, "\t"); + fprintf(msgout, "%ld: %i%%\r", + thread + 1, pct); fflush(msgout); last_pct = pct; } @@ -1623,16 +1628,11 @@ static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, } if (c!=255) error("missing end of block marker"); } - if (progress) { - fprintf(msgout, "\t \r"); - fflush(msgout); - } - chunk ^= 1; } -extern "C" void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress) +extern "C" void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread) { - decompress(in, out, msgout, buf_len, progress); + decompress(in, out, msgout, buf_len, progress, thread); } //////////////////////////// Compressor //////////////////////////// @@ -1708,7 +1708,7 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, 207,8,112,56,0}; long long int len = 0; - static int last_pct = 0, chunk = 0; + int last_pct = 0; int i, pct = 0; // Initialize ZPAQL z; // model @@ -1738,12 +1738,12 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, if (progress) { len++; pct = (len * 100 / buf_len); - if (pct != last_pct && !(pct % 10)) { + if (pct / 10 != last_pct / 10) { fprintf(msgout, "\r\t\t\t\tZPAQ\t"); for (i = 0; i < thread; i++) - fprintf(msgout, "\t\t"); - fprintf(msgout, "%ld: %i/2: %i%%\r", - thread, (chunk + 1), pct); + fprintf(msgout, "\t"); + fprintf(msgout, "%ld: %i%%\r", + thread + 1, pct); fflush(msgout); last_pct = pct; } @@ -1751,10 +1751,6 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, enc.compress(c); sha1.put(c); } - if (progress) { - //fprintf(msgout, "\t \r"); - //fflush(msgout); - } enc.compress(-1); // end of segment // Write segment checksum and trailer @@ -1767,7 +1763,6 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, // Optional: append a byte not 'z' to allow non-ZPAQ data to be appended //putc(0, out); - chunk ^= 1; } extern "C" void zpipe_compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,