diff --git a/stream.c b/stream.c index 03961bb..51cba11 100644 --- a/stream.c +++ b/stream.c @@ -165,21 +165,26 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_ length in c_len */ -static void zpaq_compress_buf(struct compress_thread *cthread, long thread) +static int zpaq_compress_buf(struct compress_thread *cthread, long thread) { uchar *c_buf = NULL; size_t dlen = 0; FILE *in, *out; if (!lzo_compresses(cthread->s_buf, cthread->s_len)) - return; + return 0; in = fmemopen(cthread->s_buf, cthread->s_len, "r"); - if (unlikely(!in)) - fatal("Failed to fmemopen in zpaq_compress_buf\n"); + if (unlikely(!in)) { + print_maxverbose("Failed to fmemopen in zpaq_compress_buf\n"); + return -1; + } out = open_memstream((char **)&c_buf, &dlen); - if (unlikely(!out)) - fatal("Failed to open_memstream in zpaq_compress_buf\n"); + if (unlikely(!out)) { + fclose(in); + print_maxverbose("Failed to open_memstream in zpaq_compress_buf\n"); + return -1; + } zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS), thread); @@ -194,104 +199,116 @@ static void zpaq_compress_buf(struct compress_thread *cthread, long thread) print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ free(c_buf); - return; + return 0; } cthread->c_len = dlen; free(cthread->s_buf); cthread->s_buf = c_buf; cthread->c_type = CTYPE_ZPAQ; + return 0; } -static void bzip2_compress_buf(struct compress_thread *cthread) +static int bzip2_compress_buf(struct compress_thread *cthread) { u32 dlen = cthread->s_len; uchar *c_buf; if (!lzo_compresses(cthread->s_buf, cthread->s_len)) - return; + return 0; c_buf = malloc(dlen); - if (!c_buf) - return; + if (!c_buf) { + print_maxverbose("Unable to allocate c_buf\n"); + return -1; + } if (BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen, (char *)cthread->s_buf, cthread->s_len, control.compression_level, 0, control.compression_level * 10) != BZ_OK) { free(c_buf); - return; + print_maxverbose("BZ2 compress failed\n"); + return -1; } - if (dlen >= cthread->c_len) { + if (unlikely(dlen >= cthread->c_len)) { print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ free(c_buf); - return; + return 0; } cthread->c_len = dlen; free(cthread->s_buf); cthread->s_buf = c_buf; cthread->c_type = CTYPE_BZIP2; + return 0; } -static void gzip_compress_buf(struct compress_thread *cthread) +static int gzip_compress_buf(struct compress_thread *cthread) { unsigned long dlen = cthread->s_len; uchar *c_buf; c_buf = malloc(dlen); - if (!c_buf) - return; + if (!c_buf) { + print_maxverbose("Unable to allocate c_buf\n"); + return -1; + } if (compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len, control.compression_level) != Z_OK) { free(c_buf); - return; + print_maxverbose("compress2 failed\n"); + return -1; } if ((i64)dlen >= cthread->c_len) { print_maxverbose("Incompressible block\n"); /* Incompressible, leave as CTYPE_NONE */ free(c_buf); - return; + return 0; } cthread->c_len = dlen; free(cthread->s_buf); cthread->s_buf = c_buf; cthread->c_type = CTYPE_GZIP; + return 0; } -static void lzma_compress_buf(struct compress_thread *cthread) +static int lzma_compress_buf(struct compress_thread *cthread) { + int lzma_level, lzma_ret; size_t prop_size = 5; /* return value for lzma_properties */ uchar *c_buf; size_t dlen; - int lzma_ret; if (!lzo_compresses(cthread->s_buf, cthread->s_len)) - return; + return 0; + /* only 7 levels with lzma, scale them */ + lzma_level = control.compression_level * 7 / 9 ? : 1; + + print_verbose("Starting lzma back end compression thread...\n"); +retry: dlen = cthread->s_len; c_buf = malloc(dlen); if (!c_buf) - return; + return -1; - print_verbose("Starting lzma back end compression thread...\n"); /* with LZMA SDK 4.63, we pass compression level and threads only * and receive properties in control->lzma_properties */ lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf, (size_t)cthread->s_len, control.lzma_properties, &prop_size, - control.compression_level * 7 / 9 ? : 1, /* only 7 levels with lzma, scale them */ + lzma_level, 0, /* dict size. set default, choose by level */ -1, -1, -1, -1, /* lc, lp, pb, fb */ control.threads); if (lzma_ret != SZ_OK) { switch (lzma_ret) { case SZ_ERROR_MEM: - print_verbose("LZMA ERROR: %d. Can't allocate enough RAM for compression window.\n", SZ_ERROR_MEM); break; case SZ_ERROR_PARAM: print_err("LZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM); @@ -309,44 +326,55 @@ static void lzma_compress_buf(struct compress_thread *cthread) /* can pass -1 if not compressible! Thanks Lasse Collin */ free(c_buf); if (lzma_ret == SZ_ERROR_MEM) { + if (lzma_level > 1) { + lzma_level--; + print_verbose("LZMA Warning: %d. Can't allocate enough RAM for compression window, trying smaller.\n", SZ_ERROR_MEM); + goto retry; + } /* lzma compress can be fragile on 32 bit. If it fails, * fall back to bzip2 compression so the block doesn't * remain uncompressed */ - print_verbose("Falling back to bzip2 compression.\n"); - bzip2_compress_buf(cthread); + print_verbose("Unable to allocate enough RAM for any sized compression window, falling back to bzip2 compression.\n"); + return bzip2_compress_buf(cthread); } - return; + return -1; } - if ((i64)dlen >= cthread->c_len) { + + if (unlikely((i64)dlen >= cthread->c_len)) { /* Incompressible, leave as CTYPE_NONE */ print_maxverbose("Incompressible block\n"); free(c_buf); - return; + return 0; } cthread->c_len = dlen; free(cthread->s_buf); cthread->s_buf = c_buf; cthread->c_type = CTYPE_LZMA; + return 0; } -static void lzo_compress_buf(struct compress_thread *cthread) +static int lzo_compress_buf(struct compress_thread *cthread) { lzo_uint in_len = cthread->s_len; lzo_uint dlen = in_len + in_len / 16 + 64 + 3; lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */ lzo_bytep wrkmem; uchar *c_buf; + int ret = -1; wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS); - if (wrkmem == NULL) - return; + if (unlikely(wrkmem == NULL)) { + print_maxverbose("Failed to malloc wkmem\n"); + return ret; + } c_buf = malloc(dlen); if (!c_buf) goto out_free; return_var = lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem); + ret = 0; if (dlen >= in_len){ /* Incompressible, leave as CTYPE_NONE */ @@ -361,6 +389,7 @@ static void lzo_compress_buf(struct compress_thread *cthread) cthread->c_type = CTYPE_LZO; out_free: free(wrkmem); + return ret; } /* @@ -874,6 +903,7 @@ static void *compthread(void *t) long i = (long)t; struct compress_thread *cti = &cthread[i]; struct stream_info *ctis = cti->sinfo; + int waited = 0, ret = 0; if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) print_err("Warning, unable to set nice value on thread\n"); @@ -881,22 +911,35 @@ static void *compthread(void *t) cti->c_type = CTYPE_NONE; cti->c_len = cti->s_len; +retry: if (!NO_COMPRESS && cti->c_len) { if (LZMA_COMPRESS) - lzma_compress_buf(cti); + ret = lzma_compress_buf(cti); else if (LZO_COMPRESS) - lzo_compress_buf(cti); + ret = lzo_compress_buf(cti); else if (BZIP2_COMPRESS) - bzip2_compress_buf(cti); + ret = bzip2_compress_buf(cti); else if (ZLIB_COMPRESS) - gzip_compress_buf(cti); + ret = gzip_compress_buf(cti); else if (ZPAQ_COMPRESS) - zpaq_compress_buf(cti, i); + ret = zpaq_compress_buf(cti, i); else fatal("Dunno wtf compression to use!\n"); } + /* If compression fails for whatever reason multithreaded, then wait + * for the previous thread to finish, serialising the work to decrease + * the memory requirements, increasing the chance of success */ + if (ret){ + if (waited) + fatal("Failed to compress in compthread\n"); + print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n"); + } + if (control.threads > 1) wait_sem(&cthread[cti->wait_on].complete); + waited = 1; + if (ret) + goto retry; if (!ctis->chunks++) { int j; @@ -988,30 +1031,49 @@ static void *ucompthread(void *t) { long i = (long)t; struct uncomp_thread *uci = &ucthread[i]; + int waited = 0, ret = 0; if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) print_err("Warning, unable to set nice value on thread\n"); +retry: if (uci->c_type != CTYPE_NONE) { - if (uci->c_type == CTYPE_LZMA) { - if (unlikely(lzma_decompress_buf(uci))) - fatal("Failed to lzma_decompress_buf in ucompthread\n"); - } else if (uci->c_type == CTYPE_LZO) { - if (unlikely(lzo_decompress_buf(uci))) - fatal("Failed to lzo_decompress_buf in ucompthread\n"); - } else if (uci->c_type == CTYPE_BZIP2) { - if (unlikely(bzip2_decompress_buf(uci))) - fatal("Failed to bzip2_decompress_buf in ucompthread\n"); - } else if (uci->c_type == CTYPE_GZIP) { - if (unlikely(gzip_decompress_buf(uci))) - fatal("Failed to gzip_decompress_buf in ucompthread\n"); - } else if (uci->c_type == CTYPE_ZPAQ) { - if (unlikely(zpaq_decompress_buf(uci, i))) - fatal("Failed to zpaq_decompress_buf in ucompthread\n"); - } else fatal("Dunno wtf decompression type to use!\n"); + switch (uci->c_type) { + case CTYPE_LZMA: + ret = lzma_decompress_buf(uci); + break; + case CTYPE_LZO: + ret = lzo_decompress_buf(uci); + break; + case CTYPE_BZIP2: + ret = bzip2_decompress_buf(uci); + break; + case CTYPE_GZIP: + ret = gzip_decompress_buf(uci); + break; + case CTYPE_ZPAQ: + ret = zpaq_decompress_buf(uci, i); + break; + default: + fatal("Dunno wtf decompression type to use!\n"); + break; + } } + + /* As per compression, serialise the decompression if it fails in + * parallel */ + if (ret) { + if (waited) + fatal("Failed to decompress in ucompthread\n"); + print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n"); + } + post_sem(&uci->complete); wait_sem(&uci->ready); + waited = 1; + if (ret) + goto retry; + print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream); post_sem(&uci->free);