Cope with multithreaded memory failures better.

Instead of failing completely, detect when a failure has occurred, and serialise work for that thread to decrease the memory required to complete compression / decompression.
Do that by waiting for the previous thread to complete before trying to work on that block again.
Detect internally when lzma compression has failed and try a lower compression level before falling back to bzip2 compression.
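
The mechanism is simple enough to show in isolation. Below is a minimal sketch of the serialise-and-retry pattern described above, not the actual lrzip code: compress_block(), prev_done and fail() are hypothetical stand-ins for the real back-end call, the previous thread's completion semaphore and the fatal error path.

/* Minimal sketch of the serialise-and-retry idea; NOT the lrzip code.
 * compress_block(), prev_done and fail() are hypothetical stand-ins. */
#include <semaphore.h>

extern sem_t prev_done;               /* posted when the previous thread finishes */
extern int compress_block(void *blk); /* returns non-zero on (memory) failure */
extern void fail(const char *msg);    /* abort the whole run */

static void try_compress(void *blk, int nthreads)
{
	int waited = 0, ret;

retry:
	ret = compress_block(blk);

	if (ret) {
		/* Already serialised and it still failed: give up for real. */
		if (waited)
			fail("Failed to compress block\n");
		/* Otherwise wait below for the previous thread to release
		 * its memory, then try this block once more. */
	}

	if (nthreads > 1)
		sem_wait(&prev_done);
	waited = 1;

	if (ret)
		goto retry;

	/* ... block is now compressed (or left uncompressed), write it out ... */
}

The lzma path in the diff adds a second, inner retry on SZ_ERROR_MEM that steps lzma_level down first, and only hands the block to bzip2_compress_buf() once no window size fits in memory.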
Author: Con Kolivas
Date:   2010-12-11 13:19:34 +11:00
parent 4331ca4058
commit aeeeedcab2

stream.c (172 lines changed)

@@ -165,21 +165,26 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_
length in c_len
*/
static void zpaq_compress_buf(struct compress_thread *cthread, long thread)
static int zpaq_compress_buf(struct compress_thread *cthread, long thread)
{
uchar *c_buf = NULL;
size_t dlen = 0;
FILE *in, *out;
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
return 0;
in = fmemopen(cthread->s_buf, cthread->s_len, "r");
if (unlikely(!in))
fatal("Failed to fmemopen in zpaq_compress_buf\n");
if (unlikely(!in)) {
print_maxverbose("Failed to fmemopen in zpaq_compress_buf\n");
return -1;
}
out = open_memstream((char **)&c_buf, &dlen);
if (unlikely(!out))
fatal("Failed to open_memstream in zpaq_compress_buf\n");
if (unlikely(!out)) {
fclose(in);
print_maxverbose("Failed to open_memstream in zpaq_compress_buf\n");
return -1;
}
zpipe_compress(in, out, control.msgout, cthread->s_len,
(int)(SHOW_PROGRESS), thread);
@@ -194,104 +199,116 @@ static void zpaq_compress_buf(struct compress_thread *cthread, long thread)
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
return 0;
}
cthread->c_len = dlen;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_ZPAQ;
return 0;
}
static void bzip2_compress_buf(struct compress_thread *cthread)
static int bzip2_compress_buf(struct compress_thread *cthread)
{
u32 dlen = cthread->s_len;
uchar *c_buf;
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
return 0;
c_buf = malloc(dlen);
if (!c_buf)
return;
if (!c_buf) {
print_maxverbose("Unable to allocate c_buf\n");
return -1;
}
if (BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen,
(char *)cthread->s_buf, cthread->s_len,
control.compression_level, 0, control.compression_level * 10) != BZ_OK) {
free(c_buf);
return;
print_maxverbose("BZ2 compress failed\n");
return -1;
}
if (dlen >= cthread->c_len) {
if (unlikely(dlen >= cthread->c_len)) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
return 0;
}
cthread->c_len = dlen;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_BZIP2;
return 0;
}
static void gzip_compress_buf(struct compress_thread *cthread)
static int gzip_compress_buf(struct compress_thread *cthread)
{
unsigned long dlen = cthread->s_len;
uchar *c_buf;
c_buf = malloc(dlen);
if (!c_buf)
return;
if (!c_buf) {
print_maxverbose("Unable to allocate c_buf\n");
return -1;
}
if (compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len,
control.compression_level) != Z_OK) {
free(c_buf);
return;
print_maxverbose("compress2 failed\n");
return -1;
}
if ((i64)dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
return 0;
}
cthread->c_len = dlen;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_GZIP;
return 0;
}
static void lzma_compress_buf(struct compress_thread *cthread)
static int lzma_compress_buf(struct compress_thread *cthread)
{
int lzma_level, lzma_ret;
size_t prop_size = 5; /* return value for lzma_properties */
uchar *c_buf;
size_t dlen;
int lzma_ret;
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
return 0;
/* only 7 levels with lzma, scale them */
lzma_level = control.compression_level * 7 / 9 ? : 1;
print_verbose("Starting lzma back end compression thread...\n");
retry:
dlen = cthread->s_len;
c_buf = malloc(dlen);
if (!c_buf)
return;
return -1;
print_verbose("Starting lzma back end compression thread...\n");
/* with LZMA SDK 4.63, we pass compression level and threads only
* and receive properties in control->lzma_properties */
lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf,
(size_t)cthread->s_len, control.lzma_properties, &prop_size,
control.compression_level * 7 / 9 ? : 1, /* only 7 levels with lzma, scale them */
lzma_level,
0, /* dict size. set default, choose by level */
-1, -1, -1, -1, /* lc, lp, pb, fb */
control.threads);
if (lzma_ret != SZ_OK) {
switch (lzma_ret) {
case SZ_ERROR_MEM:
print_verbose("LZMA ERROR: %d. Can't allocate enough RAM for compression window.\n", SZ_ERROR_MEM);
break;
case SZ_ERROR_PARAM:
print_err("LZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM);
@@ -309,44 +326,55 @@ static void lzma_compress_buf(struct compress_thread *cthread)
/* can pass -1 if not compressible! Thanks Lasse Collin */
free(c_buf);
if (lzma_ret == SZ_ERROR_MEM) {
if (lzma_level > 1) {
lzma_level--;
print_verbose("LZMA Warning: %d. Can't allocate enough RAM for compression window, trying smaller.\n", SZ_ERROR_MEM);
goto retry;
}
/* lzma compress can be fragile on 32 bit. If it fails,
* fall back to bzip2 compression so the block doesn't
* remain uncompressed */
print_verbose("Falling back to bzip2 compression.\n");
bzip2_compress_buf(cthread);
print_verbose("Unable to allocate enough RAM for any sized compression window, falling back to bzip2 compression.\n");
return bzip2_compress_buf(cthread);
}
return;
return -1;
}
if ((i64)dlen >= cthread->c_len) {
if (unlikely((i64)dlen >= cthread->c_len)) {
/* Incompressible, leave as CTYPE_NONE */
print_maxverbose("Incompressible block\n");
free(c_buf);
return;
return 0;
}
cthread->c_len = dlen;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_LZMA;
return 0;
}
static void lzo_compress_buf(struct compress_thread *cthread)
static int lzo_compress_buf(struct compress_thread *cthread)
{
lzo_uint in_len = cthread->s_len;
lzo_uint dlen = in_len + in_len / 16 + 64 + 3;
lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */
lzo_bytep wrkmem;
uchar *c_buf;
int ret = -1;
wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
if (wrkmem == NULL)
return;
if (unlikely(wrkmem == NULL)) {
print_maxverbose("Failed to malloc wkmem\n");
return ret;
}
c_buf = malloc(dlen);
if (!c_buf)
goto out_free;
return_var = lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem);
ret = 0;
if (dlen >= in_len){
/* Incompressible, leave as CTYPE_NONE */
@@ -361,6 +389,7 @@ static void lzo_compress_buf(struct compress_thread *cthread)
cthread->c_type = CTYPE_LZO;
out_free:
free(wrkmem);
return ret;
}
/*
@@ -874,6 +903,7 @@ static void *compthread(void *t)
long i = (long)t;
struct compress_thread *cti = &cthread[i];
struct stream_info *ctis = cti->sinfo;
int waited = 0, ret = 0;
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
@@ -881,22 +911,35 @@ static void *compthread(void *t)
cti->c_type = CTYPE_NONE;
cti->c_len = cti->s_len;
retry:
if (!NO_COMPRESS && cti->c_len) {
if (LZMA_COMPRESS)
lzma_compress_buf(cti);
ret = lzma_compress_buf(cti);
else if (LZO_COMPRESS)
lzo_compress_buf(cti);
ret = lzo_compress_buf(cti);
else if (BZIP2_COMPRESS)
bzip2_compress_buf(cti);
ret = bzip2_compress_buf(cti);
else if (ZLIB_COMPRESS)
gzip_compress_buf(cti);
ret = gzip_compress_buf(cti);
else if (ZPAQ_COMPRESS)
zpaq_compress_buf(cti, i);
ret = zpaq_compress_buf(cti, i);
else fatal("Dunno wtf compression to use!\n");
}
/* If compression fails for whatever reason multithreaded, then wait
* for the previous thread to finish, serialising the work to decrease
* the memory requirements, increasing the chance of success */
if (ret){
if (waited)
fatal("Failed to compress in compthread\n");
print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
}
if (control.threads > 1)
wait_sem(&cthread[cti->wait_on].complete);
waited = 1;
if (ret)
goto retry;
if (!ctis->chunks++) {
int j;
@@ -988,30 +1031,49 @@ static void *ucompthread(void *t)
{
long i = (long)t;
struct uncomp_thread *uci = &ucthread[i];
int waited = 0, ret = 0;
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
retry:
if (uci->c_type != CTYPE_NONE) {
if (uci->c_type == CTYPE_LZMA) {
if (unlikely(lzma_decompress_buf(uci)))
fatal("Failed to lzma_decompress_buf in ucompthread\n");
} else if (uci->c_type == CTYPE_LZO) {
if (unlikely(lzo_decompress_buf(uci)))
fatal("Failed to lzo_decompress_buf in ucompthread\n");
} else if (uci->c_type == CTYPE_BZIP2) {
if (unlikely(bzip2_decompress_buf(uci)))
fatal("Failed to bzip2_decompress_buf in ucompthread\n");
} else if (uci->c_type == CTYPE_GZIP) {
if (unlikely(gzip_decompress_buf(uci)))
fatal("Failed to gzip_decompress_buf in ucompthread\n");
} else if (uci->c_type == CTYPE_ZPAQ) {
if (unlikely(zpaq_decompress_buf(uci, i)))
fatal("Failed to zpaq_decompress_buf in ucompthread\n");
} else fatal("Dunno wtf decompression type to use!\n");
switch (uci->c_type) {
case CTYPE_LZMA:
ret = lzma_decompress_buf(uci);
break;
case CTYPE_LZO:
ret = lzo_decompress_buf(uci);
break;
case CTYPE_BZIP2:
ret = bzip2_decompress_buf(uci);
break;
case CTYPE_GZIP:
ret = gzip_decompress_buf(uci);
break;
case CTYPE_ZPAQ:
ret = zpaq_decompress_buf(uci, i);
break;
default:
fatal("Dunno wtf decompression type to use!\n");
break;
}
}
/* As per compression, serialise the decompression if it fails in
* parallel */
if (ret) {
if (waited)
fatal("Failed to decompress in ucompthread\n");
print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
}
post_sem(&uci->complete);
wait_sem(&uci->ready);
waited = 1;
if (ret)
goto retry;
print_maxverbose("Thread %ld returning %lld uncompressed bytes from stream %d\n", i, uci->u_len, uci->stream);
post_sem(&uci->free);