diff --git a/main.c b/main.c index 1681a2d..50ee6cc 100644 --- a/main.c +++ b/main.c @@ -334,7 +334,8 @@ static void decompress_file(void) static void get_fileinfo(void) { - int fd_in, ctype = 0; + int fd_in; + uchar ctype = 0; long double cratio; i64 expected_size; i64 infile_size; @@ -704,7 +705,7 @@ int main(int argc, char *argv[]) print_verbose("The following options are in effect for this %s.\n", DECOMPRESS ? "DECOMPRESSION" : "COMPRESSION"); if (LZMA_COMPRESS) - print_verbose("Threading is %s. Number of CPUs detected: %lu\n", control.threads > 1? "ENABLED" : "DISABLED", + print_verbose("Threading is %s. Number of CPUs detected: %d\n", control.threads > 1? "ENABLED" : "DISABLED", control.threads); print_verbose("Detected %lld bytes ram\n", control.ramsize); print_verbose("Nice Value: %d\n", control.nice_val); diff --git a/rzip.h b/rzip.h index a5a8b41..70ced40 100644 --- a/rzip.h +++ b/rzip.h @@ -35,6 +35,8 @@ #include #include #include +#include +#include #include #include @@ -233,7 +235,7 @@ struct rzip_control { unsigned long long window; unsigned long flags; unsigned long long ramsize; - unsigned long threads; + int threads; int nice_val; // added for consistency int major_version; int minor_version; diff --git a/stream.c b/stream.c index 187137c..cb2660e 100644 --- a/stream.c +++ b/stream.c @@ -23,11 +23,66 @@ #define STREAM_BUFSIZE (1024 * 1024 * 10) +struct compress_thread{ + uchar *s_buf; /* Uncompressed buffer */ + uchar *c_buf; /* Compressed buffer */ + uchar c_type; /* Compression type */ + i64 s_len; /* Data length uncompressed */ + i64 c_len; /* Data length compressed */ + sem_t complete; /* Signal when this thread has finished */ + struct stream *s; +} *cthread; + +void init_sem(sem_t *sem) +{ + if (unlikely(sem_init(sem, 0, 0))) + fatal("sem_init\n"); +} + + +static inline void post_sem(sem_t *s) +{ +retry: + if (unlikely((sem_post(s)) == -1)) { + if (errno == EINTR) + goto retry; + fatal("sem_post failed"); + } +} + +static inline void wait_sem(sem_t *s) +{ +retry: + if (unlikely((sem_wait(s)) == -1)) { + if (errno == EINTR) + goto retry; + fatal("sem_wait failed"); + } +} + +static inline void destroy_sem(sem_t *s) +{ + if (unlikely(sem_destroy(s))) + fatal("sem_destroy failed\n"); +} + +void create_pthread(pthread_t * thread, pthread_attr_t * attr, + void * (*start_routine)(void *), void *arg) +{ + if (pthread_create(thread, attr, start_routine, arg)) + fatal("pthread_create"); +} + +void join_pthread(pthread_t th, void **thread_return) +{ + if (pthread_join(th, thread_return)) + fatal("pthread_join"); +} + /* just to keep things clean, declare function here * but move body to the end since it's a work function */ - -static int lzo_compresses(struct stream *s); +static int lzo_compresses(uchar *s_buf, i64 s_len); static inline FILE *fake_fmemopen(void *buf, size_t buflen, char *mode) { @@ -84,122 +139,105 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_ length in c_len */ -static void zpaq_compress_buf(struct stream *s, int *c_type, i64 *c_len) +static void zpaq_compress_buf(struct compress_thread *cthread) { - uchar *c_buf = NULL; size_t dlen = 0; FILE *in, *out; - if (!lzo_compresses(s)) + if (!lzo_compresses(cthread->s_buf, cthread->s_len)) return; - in = fmemopen(s->buf, s->buflen, "r"); + free(cthread->c_buf); /* Will be reallocated in memopen */ + in = fmemopen(cthread->s_buf, cthread->s_len, "r"); if (unlikely(!in)) fatal("Failed to fmemopen in zpaq_compress_buf\n"); - out = open_memstream((char **)&c_buf, &dlen); + out = open_memstream((char **)&cthread->c_buf, &dlen); if (unlikely(!out)) fatal("Failed to open_memstream in zpaq_compress_buf\n"); - zpipe_compress(in, out, control.msgout, s->buflen, (int)(SHOW_PROGRESS)); + zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS)); - if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) + if (unlikely(memstream_update_buffer(out, &cthread->c_buf, &dlen))) fatal("Failed to memstream_update_buffer in zpaq_compress_buf"); fclose(in); fclose(out); - if ((i64)dlen >= *c_len) { + if ((i64)dlen >= cthread->c_len) { + print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ - free(c_buf); return; } - *c_len = dlen; - free(s->buf); - s->buf = c_buf; - *c_type = CTYPE_ZPAQ; + cthread->c_len = dlen; + cthread->s_buf = cthread->c_buf; + cthread->c_type = CTYPE_ZPAQ; } -static void bzip2_compress_buf(struct stream *s, int *c_type, i64 *c_len) +static void bzip2_compress_buf(struct compress_thread *cthread) { - u32 dlen = s->buflen; - uchar *c_buf; + u32 dlen = cthread->s_len; - if (!lzo_compresses(s)) + if (!lzo_compresses(cthread->s_buf, cthread->s_len)) return; - c_buf = malloc(dlen); - if (!c_buf) - return; - - if (BZ2_bzBuffToBuffCompress((char*)c_buf, &dlen, (char*)s->buf, s->buflen, - control.compression_level, 0, - control.compression_level * 10) != BZ_OK) { - free(c_buf); - return; + if (BZ2_bzBuffToBuffCompress((char *)cthread->c_buf, &dlen, + (char *)cthread->s_buf, cthread->s_len, + control.compression_level, 0, control.compression_level * 10) != BZ_OK) { + return; } - if (dlen >= *c_len) { + if (dlen >= cthread->c_len) { + print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ - free(c_buf); return; } - *c_len = dlen; - free(s->buf); - s->buf = c_buf; - *c_type = CTYPE_BZIP2; + cthread->c_len = dlen; + cthread->s_buf = cthread->c_buf; + cthread->c_type = CTYPE_BZIP2; } -static void gzip_compress_buf(struct stream *s, int *c_type, i64 *c_len) +static void gzip_compress_buf(struct compress_thread *cthread) { - unsigned long dlen = s->buflen; - uchar *c_buf; + unsigned long dlen = cthread->s_len; - c_buf = malloc(dlen); - if (!c_buf) - return; - - if (compress2(c_buf, &dlen, s->buf, s->buflen, control.compression_level) != Z_OK) { - free(c_buf); - return; + if (compress2(cthread->c_buf, &dlen, cthread->s_buf, cthread->s_len, + control.compression_level) != Z_OK) { + return; } - if ((i64)dlen >= *c_len) { + if ((i64)dlen >= cthread->c_len) { + print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ - free(c_buf); return; } - *c_len = dlen; - free(s->buf); - s->buf = c_buf; - *c_type = CTYPE_GZIP; + cthread->c_len = dlen; + cthread->s_buf = cthread->c_buf; + cthread->c_type = CTYPE_GZIP; } -static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len) +static void lzma_compress_buf(struct compress_thread *cthread) { size_t prop_size = 5; /* return value for lzma_properties */ - uchar *c_buf; size_t dlen; int lzma_ret; - if (!lzo_compresses(s)) - goto out; - - dlen = s->buflen; - c_buf = malloc(dlen); - if (!c_buf) + if (!lzo_compresses(cthread->s_buf, cthread->s_len)) return; - print_progress("\tProgress percentage pausing during lzma compression..."); + dlen = cthread->s_len; + + print_progress("\r\tProgress percentage pausing during lzma compression..."); /* with LZMA SDK 4.63, we pass compression level and threads only * and receive properties in control->lzma_properties */ - lzma_ret = LzmaCompress(c_buf, &dlen, s->buf, (size_t)s->buflen, control.lzma_properties, &prop_size, control.compression_level, + lzma_ret = LzmaCompress(cthread->c_buf, &dlen, cthread->s_buf, + (size_t)cthread->c_len, control.lzma_properties, &prop_size, control.compression_level, 0, /* dict size. set default */ -1, -1, -1, -1, /* lc, lp, pb, fb */ - control.threads); + 0); /* Threads - no threading since we do it globally */ if (lzma_ret != SZ_OK) { switch (lzma_ret) { case SZ_ERROR_MEM: @@ -209,7 +247,7 @@ static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len) print_err("\nLZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM); break; case SZ_ERROR_OUTPUT_EOF: - print_err("\nHarmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF); + print_maxverbose("\nHarmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF); break; case SZ_ERROR_THREAD: print_err("\nLZMA Multi Thread ERROR: %d. This should not happen.\n", SZ_ERROR_THREAD); @@ -219,19 +257,18 @@ static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len) break; } /* can pass -1 if not compressible! Thanks Lasse Collin */ - free(c_buf); + print_maxverbose("Incompressible block\n"); goto out; } - if ((i64)dlen >= *c_len) { + if ((i64)dlen >= cthread->c_len) { /* Incompressible, leave as CTYPE_NONE */ - free(c_buf); + print_maxverbose("Incompressible block\n"); goto out; } - *c_len = dlen; - free(s->buf); - s->buf = c_buf; - *c_type = CTYPE_LZMA; + cthread->c_len = dlen; + cthread->s_buf = cthread->c_buf; + cthread->c_type = CTYPE_LZMA; out: if (MAX_VERBOSE) print_output("\n"); @@ -239,35 +276,29 @@ out: print_progress("\r\t \r"); } -static void lzo_compress_buf(struct stream *s, int *c_type, i64 *c_len) +static void lzo_compress_buf(struct compress_thread *cthread) { - lzo_uint in_len = s->buflen; - lzo_uint dlen = in_len + in_len / 16 + 64 + 3; + lzo_uint in_len = cthread->s_len; + lzo_uint dlen = in_len; lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */ lzo_bytep wrkmem; - uchar *c_buf; wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS); if (wrkmem == NULL) return; - c_buf = malloc(dlen); - if (!c_buf) - goto out_free; - - return_var = lzo1x_1_compress((uchar *)s->buf, in_len, (uchar *)c_buf, - &dlen,wrkmem); + return_var = lzo1x_1_compress(cthread->s_buf, in_len, cthread->c_buf, + &dlen, wrkmem); if (dlen >= in_len){ /* Incompressible, leave as CTYPE_NONE */ - free(c_buf); + print_maxverbose("Incompressible block\n"); goto out_free; } - *c_len = dlen; - free(s->buf); - s->buf = c_buf; - *c_type = CTYPE_LZO; + cthread->c_len = dlen; + cthread->s_buf = cthread->c_buf; + cthread->c_type = CTYPE_LZO; out_free: free(wrkmem); } @@ -578,27 +609,18 @@ void *open_stream_out(int f, int n, i64 limit) sinfo->cur_pos = 0; sinfo->fd = f; - /* 10MB streams for non lzma compress. There is virtually no gain - in lzo, gzip and bzip2 with larger streams. With lzma and zpaq, - however, the larger the buffer, the better the compression so we - make it as large as the window up to the limit the compressor - will take */ if (BITS32) { /* Largest window supported on 32bit is 600MB */ if (!cwindow || cwindow > 6) cwindow = 6; } - if (LZMA_COMPRESS || ZPAQ_COMPRESS) - sinfo->bufsize = STREAM_BUFSIZE * 10 * cwindow; - else - sinfo->bufsize = STREAM_BUFSIZE; - /* No point making the stream larger than the amount of data */ - if (sinfo->bufsize) - sinfo->bufsize = MIN(sinfo->bufsize, limit); + if (cwindow) + sinfo->bufsize = MIN(STREAM_BUFSIZE * 10 * cwindow, limit); else sinfo->bufsize = limit; + sinfo->initial_pos = lseek(f, 0, SEEK_CUR); sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n); @@ -721,53 +743,112 @@ failed: return NULL; } -/* flush out any data in a stream buffer. Return -1 on failure */ -int flush_buffer(struct stream_info *sinfo, int stream) +void *compthread(void *t) { - i64 c_len = sinfo->s[stream].buflen; - int c_type = CTYPE_NONE; + long i = (long)t; - if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) - return -1; - if (unlikely(write_i64(sinfo->fd, sinfo->cur_pos))) - return -1; + /* Must allocate extra for lzo's sake */ + cthread[i].c_buf = malloc(cthread[i].c_len + cthread[i].c_len / 16 + 64 + 3); + if (unlikely(cthread[i].c_len && !cthread[i].c_buf)) + fatal("Failed to malloc c_buf in compthread\n"); - sinfo->s[stream].last_head = sinfo->cur_pos + 17; - if (unlikely(seekto(sinfo, sinfo->cur_pos))) - return -1; + cthread[i].c_type = CTYPE_NONE; - if (!(NO_COMPRESS)) { + if (!NO_COMPRESS && cthread[i].c_len) { if (LZMA_COMPRESS) - lzma_compress_buf(&sinfo->s[stream], &c_type, &c_len); + lzma_compress_buf(&cthread[i]); else if (LZO_COMPRESS) - lzo_compress_buf(&sinfo->s[stream], &c_type, &c_len); + lzo_compress_buf(&cthread[i]); else if (BZIP2_COMPRESS) - bzip2_compress_buf(&sinfo->s[stream], &c_type, &c_len); + bzip2_compress_buf(&cthread[i]); else if (ZLIB_COMPRESS) - gzip_compress_buf(&sinfo->s[stream], &c_type, &c_len); + gzip_compress_buf(&cthread[i]); else if (ZPAQ_COMPRESS) - zpaq_compress_buf(&sinfo->s[stream], &c_type, &c_len); + zpaq_compress_buf(&cthread[i]); else fatal("Dunno wtf compression to use!\n"); } - if (unlikely(write_u8(sinfo->fd, c_type) || - write_i64(sinfo->fd, c_len) || - write_i64(sinfo->fd, sinfo->s[stream].buflen) || - write_i64(sinfo->fd, 0))) { - return -1; - } - sinfo->cur_pos += 25; + post_sem(&cthread[i].complete); + return 0; +} - if (unlikely(write_buf(sinfo->fd, sinfo->s[stream].buf, c_len))) - return -1; - fsync(sinfo->fd); - sinfo->cur_pos += c_len; +/* flush out any data in a stream buffer. Return -1 on failure */ +int flush_buffer(struct stream_info *sinfo, int stream) +{ + i64 bufsize, buflen = sinfo->s[stream].buflen, offset = 0; + pthread_t threads[control.threads]; + int chunks = 1; + long i; + + bufsize = buflen; + + /* Increase the number of threads but don't let each buffer get + * smaller than STREAM_BUFSIZE as the backend compression efficency + * significantly drops off. + */ + while (chunks < control.threads && bufsize > STREAM_BUFSIZE) { + chunks++; + bufsize = buflen / chunks + 1; + } + + cthread = calloc(sizeof(struct compress_thread), chunks); + if (unlikely(!cthread)) + fatal("Unable to calloc cthread in flush_buffer\n"); + + + for (i = 0; i < chunks; i++) { + init_sem(&cthread[i].complete); + + if (offset + bufsize > buflen) + bufsize = buflen - offset; + cthread[i].s_buf = sinfo->s[stream].buf + offset; + cthread[i].s_len = cthread[i].c_len = bufsize; + cthread[i].s = sinfo->s; + + print_maxverbose("\rStarting thread %lu to compress %lld bytes at offset %lld\n", i, bufsize, offset); + create_pthread(&threads[i], NULL, compthread, (void *)i); + offset += bufsize; + } + + for (i = 0; i < chunks; i++) { + if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) + return -1; + + if (unlikely(write_i64(sinfo->fd, sinfo->cur_pos))) + return -1; + + sinfo->s[stream].last_head = sinfo->cur_pos + 17; + if (unlikely(seekto(sinfo, sinfo->cur_pos))) + return -1; + + wait_sem(&cthread[i].complete); + + print_maxverbose("Reading thread %ld to write %lld compressed bytes\n", i, cthread[i].c_len); + if (unlikely(write_u8(sinfo->fd, cthread[i].c_type) || + write_i64(sinfo->fd, cthread[i].c_len) || + write_i64(sinfo->fd, cthread[i].s_len) || + write_i64(sinfo->fd, 0))) { + return -1; + } + sinfo->cur_pos += 25; + + if (unlikely(write_buf(sinfo->fd, cthread[i].s_buf, cthread[i].c_len))) + return -1; + + fsync(sinfo->fd); + sinfo->cur_pos += cthread[i].c_len; + + destroy_sem(&cthread[i].complete); + free(cthread[i].c_buf); + } + free(cthread); sinfo->s[stream].buflen = 0; sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, sinfo->bufsize); if (unlikely(!sinfo->s[stream].buf)) fatal("Failed to realloc in flush_buffer\n"); + return 0; } @@ -812,7 +893,7 @@ static int fill_buffer(struct stream_info *sinfo, int stream) sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, u_len); else sinfo->s[stream].buf = malloc(u_len); - if (unlikely(!sinfo->s[stream].buf)) + if (unlikely(u_len && !sinfo->s[stream].buf)) fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len); if (unlikely(read_buf(sinfo->fd, sinfo->s[stream].buf, c_len))) return -1; @@ -854,7 +935,7 @@ int write_stream(void *ss, int stream, uchar *p, i64 len) n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len); - memcpy(sinfo->s[stream].buf+sinfo->s[stream].buflen, p, n); + memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen, p, n); sinfo->s[stream].buflen += n; p += n; len -= n; @@ -880,7 +961,7 @@ i64 read_stream(void *ss, int stream, uchar *p, i64 len) n = MIN(sinfo->s[stream].buflen-sinfo->s[stream].bufp, len); if (n > 0) { - memcpy(p, sinfo->s[stream].buf+sinfo->s[stream].bufp, n); + memcpy(p, sinfo->s[stream].buf + sinfo->s[stream].bufp, n); sinfo->s[stream].bufp += n; p += n; len -= n; @@ -945,13 +1026,13 @@ int close_stream_in(void *ss) to see if there is any compression at all with lzo first. It is unlikely that others will be able to compress if lzo is unable to drop a single byte so do not compress any block that is incompressible by lzo. */ -static int lzo_compresses(struct stream *s) +static int lzo_compresses(uchar *s_buf, i64 s_len) { - lzo_bytep wrkmem=NULL; - lzo_uint in_len, test_len = s->buflen, save_len = s->buflen; + lzo_bytep wrkmem = NULL; + lzo_uint in_len, test_len = s_len, save_len = s_len; lzo_uint dlen; lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */ - uchar *c_buf = NULL, *test_buf = s->buf; + uchar *c_buf = NULL, *test_buf = s_buf; /* set minimum buffer test size based on the length of the test stream */ unsigned long buftest_size = (test_len > 5 * STREAM_BUFSIZE ? STREAM_BUFSIZE : STREAM_BUFSIZE / 4096); int ret = 0;