diff --git a/rzip.c b/rzip.c index 452364a..bc0303c 100644 --- a/rzip.c +++ b/rzip.c @@ -221,16 +221,15 @@ int write_sbstream(void *ss, int stream, i64 p, i64 len) n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len); for (i = 0; i < n; i++) { - memcpy(sinfo->s[stream].buf+sinfo->s[stream].buflen + i, + memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen + i, get_sb(p + i), 1); } sinfo->s[stream].buflen += n; p += n; len -= n; - if (sinfo->s[stream].buflen == sinfo->bufsize) { - if (unlikely(flush_buffer(sinfo, stream))) - return -1; - } + + if (sinfo->s[stream].buflen == sinfo->bufsize) + flush_buffer(sinfo, stream); } return 0; } @@ -350,7 +349,7 @@ static tag clean_one_from_hash(struct rzip_state *st) again: better_than_min = increase_mask(st->minimum_tag_mask); if (!st->tag_clean_ptr) - print_maxverbose("\nStarting sweep for mask %u\n", (unsigned int)st->minimum_tag_mask); + print_maxverbose("Starting sweep for mask %u\n", (unsigned int)st->minimum_tag_mask); for (; st->tag_clean_ptr < (1U << st->hash_bits); st->tag_clean_ptr++) { if (empty_hash(st, st->tag_clean_ptr)) @@ -478,9 +477,9 @@ static void show_distrib(struct rzip_state *st) } if (total != st->hash_count) - print_err("/tWARNING: hash_count says total %lld\n", st->hash_count); + print_err("WARNING: hash_count says total %lld\n", st->hash_count); - print_output("\t%lld total hashes -- %lld in primary bucket (%-2.3f%%)\n", total, primary, + print_output("%lld total hashes -- %lld in primary bucket (%-2.3f%%)\n", total, primary, primary*100.0/total); } @@ -711,8 +710,6 @@ static void rzip_chunk(struct rzip_state *st, int fd_in, int fd_out, i64 offset, fatal("Failed to munmap in rzip_chunk\n"); } - if (!NO_COMPRESS) - print_verbose("Performing backend compression phase\n"); if (unlikely(close_stream_out(st->ss))) fatal("Failed to flush/close streams in rzip_chunk\n"); } diff --git a/rzip.h b/rzip.h index 70ced40..b2f4fb6 100644 --- a/rzip.h +++ b/rzip.h @@ -258,6 +258,7 @@ struct stream_info { i64 cur_pos; i64 initial_pos; i64 total_read; + long thread_no; }; void fatal(const char *format, ...); @@ -270,7 +271,7 @@ int write_stream(void *ss, int stream, uchar *p, i64 len); i64 read_stream(void *ss, int stream, uchar *p, i64 len); int close_stream_out(void *ss); int close_stream_in(void *ss); -int flush_buffer(struct stream_info *sinfo, int stream); +void flush_buffer(struct stream_info *sinfo, int stream); void read_config(struct rzip_control *s); ssize_t write_1g(int fd, void *buf, i64 len); ssize_t read_1g(int fd, void *buf, i64 len); diff --git a/stream.c b/stream.c index cb2660e..69c597b 100644 --- a/stream.c +++ b/stream.c @@ -24,13 +24,15 @@ #define STREAM_BUFSIZE (1024 * 1024 * 10) struct compress_thread{ - uchar *s_buf; /* Uncompressed buffer */ - uchar *c_buf; /* Compressed buffer */ + uchar *s_buf; /* Uncompressed buffer -> Compressed buffer */ uchar c_type; /* Compression type */ i64 s_len; /* Data length uncompressed */ i64 c_len; /* Data length compressed */ sem_t complete; /* Signal when this thread has finished */ - struct stream *s; + sem_t free; /* This thread no longer exists */ + int wait_on; /* Which thread has to complete before this can write its data */ + struct stream_info *sinfo; + int stream; } *cthread; void init_sem(sem_t *sem) @@ -39,7 +41,6 @@ void init_sem(sem_t *sem) fatal("sem_init\n"); } - static inline void post_sem(sem_t *s) { retry: @@ -141,23 +142,23 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_ static void zpaq_compress_buf(struct compress_thread *cthread) { + uchar *c_buf = NULL; size_t dlen = 0; FILE *in, *out; if (!lzo_compresses(cthread->s_buf, cthread->s_len)) return; - free(cthread->c_buf); /* Will be reallocated in memopen */ in = fmemopen(cthread->s_buf, cthread->s_len, "r"); if (unlikely(!in)) fatal("Failed to fmemopen in zpaq_compress_buf\n"); - out = open_memstream((char **)&cthread->c_buf, &dlen); + out = open_memstream((char **)&c_buf, &dlen); if (unlikely(!out)) fatal("Failed to open_memstream in zpaq_compress_buf\n"); zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS)); - if (unlikely(memstream_update_buffer(out, &cthread->c_buf, &dlen))) + if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) fatal("Failed to memstream_update_buffer in zpaq_compress_buf"); fclose(in); @@ -166,61 +167,80 @@ static void zpaq_compress_buf(struct compress_thread *cthread) if ((i64)dlen >= cthread->c_len) { print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ + free(c_buf); return; } cthread->c_len = dlen; - cthread->s_buf = cthread->c_buf; + free(cthread->s_buf); + cthread->s_buf = c_buf; cthread->c_type = CTYPE_ZPAQ; } static void bzip2_compress_buf(struct compress_thread *cthread) { u32 dlen = cthread->s_len; + uchar *c_buf; if (!lzo_compresses(cthread->s_buf, cthread->s_len)) return; - if (BZ2_bzBuffToBuffCompress((char *)cthread->c_buf, &dlen, + c_buf = malloc(dlen); + if (!c_buf) + return; + + if (BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen, (char *)cthread->s_buf, cthread->s_len, control.compression_level, 0, control.compression_level * 10) != BZ_OK) { + free(c_buf); return; } if (dlen >= cthread->c_len) { print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ + free(c_buf); return; } cthread->c_len = dlen; - cthread->s_buf = cthread->c_buf; + free(cthread->s_buf); + cthread->s_buf = c_buf; cthread->c_type = CTYPE_BZIP2; } static void gzip_compress_buf(struct compress_thread *cthread) { unsigned long dlen = cthread->s_len; + uchar *c_buf; - if (compress2(cthread->c_buf, &dlen, cthread->s_buf, cthread->s_len, + c_buf = malloc(dlen); + if (!c_buf) + return; + + if (compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len, control.compression_level) != Z_OK) { + free(c_buf); return; } if ((i64)dlen >= cthread->c_len) { print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ + free(c_buf); return; } cthread->c_len = dlen; - cthread->s_buf = cthread->c_buf; + free(cthread->s_buf); + cthread->s_buf = c_buf; cthread->c_type = CTYPE_GZIP; } static void lzma_compress_buf(struct compress_thread *cthread) { size_t prop_size = 5; /* return value for lzma_properties */ + uchar *c_buf; size_t dlen; int lzma_ret; @@ -228,16 +248,19 @@ static void lzma_compress_buf(struct compress_thread *cthread) return; dlen = cthread->s_len; + c_buf = malloc(dlen); + if (!c_buf) + return; - print_progress("\r\tProgress percentage pausing during lzma compression..."); + print_progress("Starting lzma back end compression thread...\n"); /* with LZMA SDK 4.63, we pass compression level and threads only * and receive properties in control->lzma_properties */ - lzma_ret = LzmaCompress(cthread->c_buf, &dlen, cthread->s_buf, - (size_t)cthread->c_len, control.lzma_properties, &prop_size, control.compression_level, + lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf, + (size_t)cthread->s_len, control.lzma_properties, &prop_size, control.compression_level, 0, /* dict size. set default */ -1, -1, -1, -1, /* lc, lp, pb, fb */ - 0); /* Threads - no threading since we do it globally */ + control.threads); if (lzma_ret != SZ_OK) { switch (lzma_ret) { case SZ_ERROR_MEM: @@ -258,46 +281,50 @@ static void lzma_compress_buf(struct compress_thread *cthread) } /* can pass -1 if not compressible! Thanks Lasse Collin */ print_maxverbose("Incompressible block\n"); - goto out; + free(c_buf); + return; } if ((i64)dlen >= cthread->c_len) { /* Incompressible, leave as CTYPE_NONE */ print_maxverbose("Incompressible block\n"); - goto out; + free(c_buf); + return; } cthread->c_len = dlen; - cthread->s_buf = cthread->c_buf; + free(cthread->s_buf); + cthread->s_buf = c_buf; cthread->c_type = CTYPE_LZMA; -out: - if (MAX_VERBOSE) - print_output("\n"); - else - print_progress("\r\t \r"); } static void lzo_compress_buf(struct compress_thread *cthread) { lzo_uint in_len = cthread->s_len; - lzo_uint dlen = in_len; + lzo_uint dlen = in_len + in_len / 16 + 64 + 3; lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */ lzo_bytep wrkmem; + uchar *c_buf; wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS); if (wrkmem == NULL) return; - return_var = lzo1x_1_compress(cthread->s_buf, in_len, cthread->c_buf, - &dlen, wrkmem); + c_buf = malloc(dlen); + if (!c_buf) + goto out_free; + + return_var = lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem); if (dlen >= in_len){ /* Incompressible, leave as CTYPE_NONE */ print_maxverbose("Incompressible block\n"); + free(c_buf); goto out_free; } cthread->c_len = dlen; - cthread->s_buf = cthread->c_buf; + free(cthread->s_buf); + cthread->s_buf = c_buf; cthread->c_type = CTYPE_LZO; out_free: free(wrkmem); @@ -604,7 +631,29 @@ void *open_stream_out(int f, int n, i64 limit) if (unlikely(!sinfo)) return NULL; - sinfo->bufsize = 0; + sinfo->bufsize = limit; + sinfo->thread_no = 0; + + cthread = calloc(sizeof(struct compress_thread), control.threads); + if (unlikely(!cthread)) + fatal("Unable to calloc cthread in open_stream_out\n"); + + for (i = 0; i < control.threads; i++) { + init_sem(&cthread[i].complete); + init_sem(&cthread[i].free); + post_sem(&cthread[i].free); + } + + /* Threads need to wait on the thread before them before dumping their + * data. This is done in a circle up to control.threads */ + cthread[0].wait_on = control.threads - 1; + for (i = 1; i < control.threads; i++) + cthread[i].wait_on = i - 1; + + /* Signal thread 0 that it can start */ + if (control.threads > 1) + post_sem(&cthread[control.threads - 1].complete); + sinfo->num_streams = n; sinfo->cur_pos = 0; sinfo->fd = f; @@ -639,7 +688,18 @@ retest_malloc: goto retest_malloc; } free(testmalloc); - print_maxverbose("Succeeded to malloc for compression bufsize of %lld\n", sinfo->bufsize); + + /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the + * bufsize to fit X threads into it */ + sinfo->bufsize = MIN(sinfo->bufsize, + MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE)); + + if (control.threads > 1) + print_maxverbose("Using %d threads to compress up to %lld bytes each.\n", + control.threads, sinfo->bufsize); + else + print_maxverbose("Using 1 thread to compress up to %lld bytes\n", + sinfo->bufsize); for (i = 0; i < n; i++) { sinfo->s[i].buf = malloc(sinfo->bufsize); @@ -743,113 +803,92 @@ failed: return NULL; } +/* Enter with s_buf allocated,s_buf points to the compressed data after the + * backend compression and is then freed here */ void *compthread(void *t) { long i = (long)t; + struct compress_thread *cti = &cthread[i]; - /* Must allocate extra for lzo's sake */ - cthread[i].c_buf = malloc(cthread[i].c_len + cthread[i].c_len / 16 + 64 + 3); - if (unlikely(cthread[i].c_len && !cthread[i].c_buf)) - fatal("Failed to malloc c_buf in compthread\n"); + cti->c_type = CTYPE_NONE; + cti->c_len = cti->s_len; - cthread[i].c_type = CTYPE_NONE; - - if (!NO_COMPRESS && cthread[i].c_len) { + if (!NO_COMPRESS && cti->c_len) { if (LZMA_COMPRESS) - lzma_compress_buf(&cthread[i]); + lzma_compress_buf(cti); else if (LZO_COMPRESS) - lzo_compress_buf(&cthread[i]); + lzo_compress_buf(cti); else if (BZIP2_COMPRESS) - bzip2_compress_buf(&cthread[i]); + bzip2_compress_buf(cti); else if (ZLIB_COMPRESS) - gzip_compress_buf(&cthread[i]); + gzip_compress_buf(cti); else if (ZPAQ_COMPRESS) - zpaq_compress_buf(&cthread[i]); + zpaq_compress_buf(cti); else fatal("Dunno wtf compression to use!\n"); } - post_sem(&cthread[i].complete); + if (control.threads > 1) + wait_sem(&cthread[cti->wait_on].complete); + + if (unlikely(seekto(cti->sinfo, cti->sinfo->s[cti->stream].last_head))) + fatal("Failed to seekto in compthread %d\n", i); + + if (unlikely(write_i64(cti->sinfo->fd, cti->sinfo->cur_pos))) + fatal("Failed to write_i64 in compthread %d\n", i); + + cti->sinfo->s[cti->stream].last_head = cti->sinfo->cur_pos + 17; + if (unlikely(seekto(cti->sinfo, cti->sinfo->cur_pos))) + fatal("Failed to seekto cur_pos in compthread %d\n", i); + + print_maxverbose("Writing %lld compressed bytes from thread %ld\n", cti->c_len, i); + if (unlikely(write_u8(cti->sinfo->fd, cti->c_type) || + write_i64(cti->sinfo->fd, cti->c_len) || + write_i64(cti->sinfo->fd, cti->s_len) || + write_i64(cti->sinfo->fd, 0))) { + fatal("Failed write in compthread %d\n", i); + } + cti->sinfo->cur_pos += 25; + + if (unlikely(write_buf(cti->sinfo->fd, cti->s_buf, cti->c_len))) + fatal("Failed to write_buf in compthread %d\n", i); + + cti->sinfo->cur_pos += cti->c_len; + free(cti->s_buf); + + fsync(cti->sinfo->fd); + + post_sem(&cti->complete); + post_sem(&cti->free); + return 0; } -/* flush out any data in a stream buffer. Return -1 on failure */ -int flush_buffer(struct stream_info *sinfo, int stream) +/* flush out any data in a stream buffer */ +void flush_buffer(struct stream_info *sinfo, int stream) { - i64 bufsize, buflen = sinfo->s[stream].buflen, offset = 0; pthread_t threads[control.threads]; - int chunks = 1; - long i; + long i = sinfo->thread_no; - bufsize = buflen; + /* Make sure this thread doesn't already exist */ + wait_sem(&cthread[i].free); - /* Increase the number of threads but don't let each buffer get - * smaller than STREAM_BUFSIZE as the backend compression efficency - * significantly drops off. - */ - while (chunks < control.threads && bufsize > STREAM_BUFSIZE) { - chunks++; - bufsize = buflen / chunks + 1; - } + cthread[i].sinfo = sinfo; + cthread[i].stream = stream; + cthread[i].s_buf = sinfo->s[stream].buf; + cthread[i].s_len = sinfo->s[stream].buflen; - cthread = calloc(sizeof(struct compress_thread), chunks); - if (unlikely(!cthread)) - fatal("Unable to calloc cthread in flush_buffer\n"); - - - for (i = 0; i < chunks; i++) { - init_sem(&cthread[i].complete); - - if (offset + bufsize > buflen) - bufsize = buflen - offset; - cthread[i].s_buf = sinfo->s[stream].buf + offset; - cthread[i].s_len = cthread[i].c_len = bufsize; - cthread[i].s = sinfo->s; - - print_maxverbose("\rStarting thread %lu to compress %lld bytes at offset %lld\n", i, bufsize, offset); - create_pthread(&threads[i], NULL, compthread, (void *)i); - offset += bufsize; - } - - for (i = 0; i < chunks; i++) { - if (unlikely(seekto(sinfo, sinfo->s[stream].last_head))) - return -1; - - if (unlikely(write_i64(sinfo->fd, sinfo->cur_pos))) - return -1; - - sinfo->s[stream].last_head = sinfo->cur_pos + 17; - if (unlikely(seekto(sinfo, sinfo->cur_pos))) - return -1; - - wait_sem(&cthread[i].complete); - - print_maxverbose("Reading thread %ld to write %lld compressed bytes\n", i, cthread[i].c_len); - if (unlikely(write_u8(sinfo->fd, cthread[i].c_type) || - write_i64(sinfo->fd, cthread[i].c_len) || - write_i64(sinfo->fd, cthread[i].s_len) || - write_i64(sinfo->fd, 0))) { - return -1; - } - sinfo->cur_pos += 25; - - if (unlikely(write_buf(sinfo->fd, cthread[i].s_buf, cthread[i].c_len))) - return -1; - - fsync(sinfo->fd); - sinfo->cur_pos += cthread[i].c_len; - - destroy_sem(&cthread[i].complete); - free(cthread[i].c_buf); - } - free(cthread); + print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n", + i, cthread[i].s_len, stream); + create_pthread(&threads[i], NULL, compthread, (void *)i); + /* The stream buffer has been given to the thread, allocate a new one */ + sinfo->s[stream].buf = malloc(sinfo->bufsize); + if (unlikely(!sinfo->s[stream].buf)) + fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize); sinfo->s[stream].buflen = 0; - sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, sinfo->bufsize); - if (unlikely(!sinfo->s[stream].buf)) - fatal("Failed to realloc in flush_buffer\n"); - - return 0; + if (++sinfo->thread_no == control.threads) + sinfo->thread_no = 0; } /* fill a buffer from a stream - return -1 on failure */ @@ -895,6 +934,7 @@ static int fill_buffer(struct stream_info *sinfo, int stream) sinfo->s[stream].buf = malloc(u_len); if (unlikely(u_len && !sinfo->s[stream].buf)) fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len); + if (unlikely(read_buf(sinfo->fd, sinfo->s[stream].buf, c_len))) return -1; @@ -940,10 +980,9 @@ int write_stream(void *ss, int stream, uchar *p, i64 len) p += n; len -= n; - if (sinfo->s[stream].buflen == sinfo->bufsize) { - if (unlikely(flush_buffer(sinfo, stream))) - return -1; - } + /* Flush the buffer every sinfo->bufsize into one thread */ + if (sinfo->s[stream].buflen == sinfo->bufsize) + flush_buffer(sinfo, stream); } return 0; } @@ -985,21 +1024,24 @@ int close_stream_out(void *ss) struct stream_info *sinfo = ss; int i; - /* reallocate buffers to try and save space */ for (i = 0; i < sinfo->num_streams; i++) { - if (sinfo->s[i].buflen) { - if (unlikely(!realloc(sinfo->s[i].buf, sinfo->s[i].buflen))) - fatal("Error Reallocating Output Buffer %d\n", i); - } + if (sinfo->s[i].buflen) + flush_buffer(sinfo, i); } - for (i = 0; i < sinfo->num_streams; i++) { - if (unlikely(sinfo->s[i].buflen && flush_buffer(sinfo, i))) - return -1; - if (sinfo->s[i].buf) - free(sinfo->s[i].buf); + + for (i = 0; i < control.threads; i++) { + wait_sem(&cthread[i].free); + destroy_sem(&cthread[i].complete); + destroy_sem(&cthread[i].free); } + + for (i = 0; i < sinfo->num_streams; i++) + free(sinfo->s[i].buf); + + free(cthread); free(sinfo->s); free(sinfo); + return 0; } @@ -1012,10 +1054,8 @@ int close_stream_in(void *ss) if (unlikely(lseek(sinfo->fd, sinfo->initial_pos + sinfo->total_read, SEEK_SET) != sinfo->initial_pos + sinfo->total_read)) return -1; - for (i = 0; i < sinfo->num_streams; i++) { - if (sinfo->s[i].buf) - free(sinfo->s[i].buf); - } + for (i = 0; i < sinfo->num_streams; i++) + free(sinfo->s[i].buf); free(sinfo->s); free(sinfo); @@ -1052,7 +1092,7 @@ static int lzo_compresses(uchar *s_buf, i64 s_len) if (unlikely(!c_buf)) fatal("Unable to allocate c_buf in lzo_compresses\n"); - print_progress("\tlzo testing for incompressible data..."); + print_progress("lzo testing for incompressible data...\n"); /* Test progressively larger blocks at a time and as soon as anything compressible is found, jump out as a success */ @@ -1081,9 +1121,7 @@ static int lzo_compresses(uchar *s_buf, i64 s_len) (ret == 0? "FAILED - below threshold" : "OK"), save_len, 100 * ((double) best_dlen / (double) in_len), workcounter); else if (VERBOSE) - print_output("%s\r", (ret == 0? "FAILED - below threshold" : "OK")); - else - print_progress("\r\t \r"); + print_output("%s\n", (ret == 0? "FAILED - below threshold" : "OK")); free(wrkmem); free(c_buf); diff --git a/zpipe.cpp b/zpipe.cpp index 1663972..cb47d1e 100644 --- a/zpipe.cpp +++ b/zpipe.cpp @@ -1738,7 +1738,7 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, i len++; pct = (len * 100 / buf_len); if (pct != last_pct) { - fprintf(msgout, "\r\tZPAQ Chunk %i of 2 compress: %i%% \r", (chunk + 1), pct); + fprintf(msgout, "\r\t\t\tZPAQ Chunk %i of 2 compress: %i%% \r", (chunk + 1), pct); fflush(msgout); last_pct = pct; }