Implement multithreaded back end compression by splitting up the compression stream into multiple threads, dependent on the number of CPUs detected.

This facilitates a massive speed up on SMP machines proportional to the number of CPUs during the back end compression phase, but does so at some cost to the final size.
Limit the number of threads to ensure that each thread at least works on a window of STREAM_BUFSIZE.
Disable the lzma threading library as it does not contribute any more to the scalability of this new approach, yet compromises compression.
Increase the size of the windows passed to all the compression back end types, as they now need more data in order to split it up across multiple threads, and the larger number of blocks increases the compressed size slightly.
This commit is contained in:
Con Kolivas 2010-11-10 20:56:17 +11:00
parent b469e7b56c
commit 5505097b2f
3 changed files with 218 additions and 134 deletions

5
main.c
View file

@ -334,7 +334,8 @@ static void decompress_file(void)
static void get_fileinfo(void)
{
int fd_in, ctype = 0;
int fd_in;
uchar ctype = 0;
long double cratio;
i64 expected_size;
i64 infile_size;
@ -704,7 +705,7 @@ int main(int argc, char *argv[])
print_verbose("The following options are in effect for this %s.\n",
DECOMPRESS ? "DECOMPRESSION" : "COMPRESSION");
if (LZMA_COMPRESS)
print_verbose("Threading is %s. Number of CPUs detected: %lu\n", control.threads > 1? "ENABLED" : "DISABLED",
print_verbose("Threading is %s. Number of CPUs detected: %d\n", control.threads > 1? "ENABLED" : "DISABLED",
control.threads);
print_verbose("Detected %lld bytes ram\n", control.ramsize);
print_verbose("Nice Value: %d\n", control.nice_val);

4
rzip.h
View file

@ -35,6 +35,8 @@
#include <signal.h>
#include <bzlib.h>
#include <zlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/resource.h>
#include <netinet/in.h>
@ -233,7 +235,7 @@ struct rzip_control {
unsigned long long window;
unsigned long flags;
unsigned long long ramsize;
unsigned long threads;
int threads;
int nice_val; // added for consistency
int major_version;
int minor_version;

323
stream.c
View file

@ -23,11 +23,66 @@
#define STREAM_BUFSIZE (1024 * 1024 * 10)
/* Work item describing one chunk of a stream buffer that a compression
 * thread operates on. flush_buffer callocs an array of these (one entry per
 * chunk) into the file-scope pointer cthread, and compthread indexes that
 * array with the value passed as its thread argument.
 *
 * The back end compress functions point s_buf at c_buf and update c_len and
 * c_type when compression succeeds, so after compthread has run, s_buf/c_len
 * describe the data that flush_buffer writes out. */
struct compress_thread{
uchar *s_buf; /* Uncompressed buffer */
uchar *c_buf; /* Compressed buffer */
uchar c_type; /* Compression type */
i64 s_len; /* Data length uncompressed */
i64 c_len; /* Data length compressed */
sem_t complete; /* Signal when this thread has finished */
struct stream *s; /* Set from sinfo->s in flush_buffer */
} *cthread;
/* Set up *sem as a semaphore that is not shared between processes and
 * starts with a count of zero. Calls fatal() if sem_init reports an error. */
void init_sem(sem_t *sem)
{
	int rc = sem_init(sem, 0, 0);

	if (unlikely(rc))
		fatal("sem_init\n");
}
/* Increment the semaphore. A failure with errno == EINTR is retried; any
 * other failure is reported through fatal(). */
static inline void post_sem(sem_t *s)
{
	while (unlikely((sem_post(s)) == -1)) {
		if (errno != EINTR) {
			fatal("sem_post failed");
			return;
		}
	}
}
/* Block until the semaphore can be decremented. A wait interrupted by a
 * signal (errno == EINTR) is restarted; any other failure is reported
 * through fatal(). */
static inline void wait_sem(sem_t *s)
{
	while (unlikely((sem_wait(s)) == -1)) {
		if (errno != EINTR) {
			fatal("sem_wait failed");
			return;
		}
	}
}
/* Release a semaphore previously set up with init_sem. Calls fatal() if
 * sem_destroy reports an error. */
static inline void destroy_sem(sem_t *s)
{
	int rc = sem_destroy(s);

	if (unlikely(rc))
		fatal("sem_destroy failed\n");
}
/* Thin wrapper around pthread_create: start start_routine(arg) in a new
 * thread and call fatal() if pthread_create returns a non-zero value. */
void create_pthread(pthread_t * thread, pthread_attr_t * attr,
	void * (*start_routine)(void *), void *arg)
{
	int err = pthread_create(thread, attr, start_routine, arg);

	if (err != 0)
		fatal("pthread_create");
}
/* Thin wrapper around pthread_join: wait for thread th, optionally storing
 * its return value in *thread_return, and call fatal() if pthread_join
 * returns a non-zero value. */
void join_pthread(pthread_t th, void **thread_return)
{
	int err = pthread_join(th, thread_return);

	if (err != 0)
		fatal("pthread_join");
}
/* just to keep things clean, declare function here
* but move body to the end since it's a work function
*/
static int lzo_compresses(struct stream *s);
static int lzo_compresses(uchar *s_buf, i64 s_len);
static inline FILE *fake_fmemopen(void *buf, size_t buflen, char *mode)
{
@ -84,122 +139,105 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_
length in c_len
*/
/* NOTE(review): this listing is rendered from a diff with the +/- markers
 * lost, so lines of the previous `struct stream` based version and of the
 * new `struct compress_thread` based version appear interleaved below.
 *
 * In the compress_thread version: nothing is done when lzo_compresses()
 * returns 0 for the source buffer. Otherwise the existing c_buf is freed,
 * s_buf is opened for reading with fmemopen, c_buf is attached to a stream
 * from open_memstream, and zpipe_compress() is run from one to the other.
 * If the resulting length is not smaller than cthread->c_len the function
 * returns leaving c_len, s_buf and c_type as they were; otherwise c_len is
 * set to the new length, s_buf is pointed at c_buf and c_type becomes
 * CTYPE_ZPAQ. */
static void zpaq_compress_buf(struct stream *s, int *c_type, i64 *c_len)
static void zpaq_compress_buf(struct compress_thread *cthread)
{
uchar *c_buf = NULL;
size_t dlen = 0;
FILE *in, *out;
if (!lzo_compresses(s))
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
in = fmemopen(s->buf, s->buflen, "r");
free(cthread->c_buf); /* Will be reallocated in memopen */
in = fmemopen(cthread->s_buf, cthread->s_len, "r");
if (unlikely(!in))
fatal("Failed to fmemopen in zpaq_compress_buf\n");
out = open_memstream((char **)&c_buf, &dlen);
out = open_memstream((char **)&cthread->c_buf, &dlen);
if (unlikely(!out))
fatal("Failed to open_memstream in zpaq_compress_buf\n");
zpipe_compress(in, out, control.msgout, s->buflen, (int)(SHOW_PROGRESS));
zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS));
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen)))
if (unlikely(memstream_update_buffer(out, &cthread->c_buf, &dlen)))
fatal("Failed to memstream_update_buffer in zpaq_compress_buf");
fclose(in);
fclose(out);
if ((i64)dlen >= *c_len) {
if ((i64)dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
*c_len = dlen;
free(s->buf);
s->buf = c_buf;
*c_type = CTYPE_ZPAQ;
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
cthread->c_type = CTYPE_ZPAQ;
}
/* NOTE(review): this listing is rendered from a diff with the +/- markers
 * lost, so lines of the previous `struct stream` based version and of the
 * new `struct compress_thread` based version appear interleaved below.
 *
 * In the compress_thread version: nothing is done when lzo_compresses()
 * returns 0 for the source buffer. Otherwise BZ2_bzBuffToBuffCompress()
 * compresses s_buf (s_len bytes) into c_buf, with dlen initialised to
 * s_len as the available output size. If the call does not return BZ_OK,
 * or the output length is not smaller than cthread->c_len, the function
 * returns leaving c_len, s_buf and c_type as they were; otherwise c_len is
 * set to the new length, s_buf is pointed at c_buf and c_type becomes
 * CTYPE_BZIP2. */
static void bzip2_compress_buf(struct stream *s, int *c_type, i64 *c_len)
static void bzip2_compress_buf(struct compress_thread *cthread)
{
u32 dlen = s->buflen;
uchar *c_buf;
u32 dlen = cthread->s_len;
if (!lzo_compresses(s))
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
c_buf = malloc(dlen);
if (!c_buf)
return;
if (BZ2_bzBuffToBuffCompress((char*)c_buf, &dlen, (char*)s->buf, s->buflen,
control.compression_level, 0,
control.compression_level * 10) != BZ_OK) {
free(c_buf);
if (BZ2_bzBuffToBuffCompress((char *)cthread->c_buf, &dlen,
(char *)cthread->s_buf, cthread->s_len,
control.compression_level, 0, control.compression_level * 10) != BZ_OK) {
return;
}
if (dlen >= *c_len) {
if (dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
*c_len = dlen;
free(s->buf);
s->buf = c_buf;
*c_type = CTYPE_BZIP2;
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
cthread->c_type = CTYPE_BZIP2;
}
/* NOTE(review): this listing is rendered from a diff with the +/- markers
 * lost, so lines of the previous `struct stream` based version and of the
 * new `struct compress_thread` based version appear interleaved below.
 *
 * In the compress_thread version: compress2() compresses s_buf (s_len
 * bytes) into c_buf at control.compression_level, with dlen initialised to
 * s_len as the available output size. If the call does not return Z_OK, or
 * the output length is not smaller than cthread->c_len, the function
 * returns leaving c_len, s_buf and c_type as they were; otherwise c_len is
 * set to the new length, s_buf is pointed at c_buf and c_type becomes
 * CTYPE_GZIP. Unlike the zpaq and bzip2 paths, no lzo_compresses() pre-check
 * is made here. */
static void gzip_compress_buf(struct stream *s, int *c_type, i64 *c_len)
static void gzip_compress_buf(struct compress_thread *cthread)
{
unsigned long dlen = s->buflen;
uchar *c_buf;
unsigned long dlen = cthread->s_len;
c_buf = malloc(dlen);
if (!c_buf)
return;
if (compress2(c_buf, &dlen, s->buf, s->buflen, control.compression_level) != Z_OK) {
free(c_buf);
if (compress2(cthread->c_buf, &dlen, cthread->s_buf, cthread->s_len,
control.compression_level) != Z_OK) {
return;
}
if ((i64)dlen >= *c_len) {
if ((i64)dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
*c_len = dlen;
free(s->buf);
s->buf = c_buf;
*c_type = CTYPE_GZIP;
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
cthread->c_type = CTYPE_GZIP;
}
static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len)
static void lzma_compress_buf(struct compress_thread *cthread)
{
size_t prop_size = 5; /* return value for lzma_properties */
uchar *c_buf;
size_t dlen;
int lzma_ret;
if (!lzo_compresses(s))
goto out;
dlen = s->buflen;
c_buf = malloc(dlen);
if (!c_buf)
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
print_progress("\tProgress percentage pausing during lzma compression...");
dlen = cthread->s_len;
print_progress("\r\tProgress percentage pausing during lzma compression...");
/* with LZMA SDK 4.63, we pass compression level and threads only
* and receive properties in control->lzma_properties */
lzma_ret = LzmaCompress(c_buf, &dlen, s->buf, (size_t)s->buflen, control.lzma_properties, &prop_size, control.compression_level,
lzma_ret = LzmaCompress(cthread->c_buf, &dlen, cthread->s_buf,
(size_t)cthread->c_len, control.lzma_properties, &prop_size, control.compression_level,
0, /* dict size. set default */
-1, -1, -1, -1, /* lc, lp, pb, fb */
control.threads);
0); /* Threads - no threading since we do it globally */
if (lzma_ret != SZ_OK) {
switch (lzma_ret) {
case SZ_ERROR_MEM:
@ -209,7 +247,7 @@ static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len)
print_err("\nLZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM);
break;
case SZ_ERROR_OUTPUT_EOF:
print_err("\nHarmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF);
print_maxverbose("\nHarmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF);
break;
case SZ_ERROR_THREAD:
print_err("\nLZMA Multi Thread ERROR: %d. This should not happen.\n", SZ_ERROR_THREAD);
@ -219,19 +257,18 @@ static void lzma_compress_buf(struct stream *s, int *c_type, i64 *c_len)
break;
}
/* can pass -1 if not compressible! Thanks Lasse Collin */
free(c_buf);
print_maxverbose("Incompressible block\n");
goto out;
}
if ((i64)dlen >= *c_len) {
if ((i64)dlen >= cthread->c_len) {
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
print_maxverbose("Incompressible block\n");
goto out;
}
*c_len = dlen;
free(s->buf);
s->buf = c_buf;
*c_type = CTYPE_LZMA;
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
cthread->c_type = CTYPE_LZMA;
out:
if (MAX_VERBOSE)
print_output("\n");
@ -239,35 +276,29 @@ out:
print_progress("\r\t \r");
}
/* NOTE(review): this listing is rendered from a diff with the +/- markers
 * lost, so lines of the previous `struct stream` based version and of the
 * new `struct compress_thread` based version appear interleaved below.
 *
 * In the compress_thread version: a work area of LZO1X_1_MEM_COMPRESS bytes
 * is allocated (the function returns silently if that fails), then
 * lzo1x_1_compress() compresses s_buf (s_len bytes) into c_buf. The output
 * buffer is not allocated here; compthread allocates c_buf with extra
 * headroom before calling this function. If the output length is not
 * smaller than the input length the block is left as it was; otherwise
 * c_len is set to the new length, s_buf is pointed at c_buf and c_type
 * becomes CTYPE_LZO. The work area is freed on both paths. */
static void lzo_compress_buf(struct stream *s, int *c_type, i64 *c_len)
static void lzo_compress_buf(struct compress_thread *cthread)
{
lzo_uint in_len = s->buflen;
lzo_uint dlen = in_len + in_len / 16 + 64 + 3;
lzo_uint in_len = cthread->s_len;
lzo_uint dlen = in_len;
lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */
lzo_bytep wrkmem;
uchar *c_buf;
wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
if (wrkmem == NULL)
return;
c_buf = malloc(dlen);
if (!c_buf)
goto out_free;
return_var = lzo1x_1_compress((uchar *)s->buf, in_len, (uchar *)c_buf,
&dlen,wrkmem);
return_var = lzo1x_1_compress(cthread->s_buf, in_len, cthread->c_buf,
&dlen, wrkmem);
if (dlen >= in_len){
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
print_maxverbose("Incompressible block\n");
goto out_free;
}
*c_len = dlen;
free(s->buf);
s->buf = c_buf;
*c_type = CTYPE_LZO;
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
cthread->c_type = CTYPE_LZO;
out_free:
free(wrkmem);
}
@ -578,27 +609,18 @@ void *open_stream_out(int f, int n, i64 limit)
sinfo->cur_pos = 0;
sinfo->fd = f;
/* 10MB streams for non lzma compress. There is virtually no gain
in lzo, gzip and bzip2 with larger streams. With lzma and zpaq,
however, the larger the buffer, the better the compression so we
make it as large as the window up to the limit the compressor
will take */
if (BITS32) {
/* Largest window supported on 32bit is 600MB */
if (!cwindow || cwindow > 6)
cwindow = 6;
}
if (LZMA_COMPRESS || ZPAQ_COMPRESS)
sinfo->bufsize = STREAM_BUFSIZE * 10 * cwindow;
else
sinfo->bufsize = STREAM_BUFSIZE;
/* No point making the stream larger than the amount of data */
if (sinfo->bufsize)
sinfo->bufsize = MIN(sinfo->bufsize, limit);
if (cwindow)
sinfo->bufsize = MIN(STREAM_BUFSIZE * 10 * cwindow, limit);
else
sinfo->bufsize = limit;
sinfo->initial_pos = lseek(f, 0, SEEK_CUR);
sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n);
@ -721,14 +743,77 @@ failed:
return NULL;
}
void *compthread(void *t)
{
long i = (long)t;
/* Must allocate extra for lzo's sake */
cthread[i].c_buf = malloc(cthread[i].c_len + cthread[i].c_len / 16 + 64 + 3);
if (unlikely(cthread[i].c_len && !cthread[i].c_buf))
fatal("Failed to malloc c_buf in compthread\n");
cthread[i].c_type = CTYPE_NONE;
if (!NO_COMPRESS && cthread[i].c_len) {
if (LZMA_COMPRESS)
lzma_compress_buf(&cthread[i]);
else if (LZO_COMPRESS)
lzo_compress_buf(&cthread[i]);
else if (BZIP2_COMPRESS)
bzip2_compress_buf(&cthread[i]);
else if (ZLIB_COMPRESS)
gzip_compress_buf(&cthread[i]);
else if (ZPAQ_COMPRESS)
zpaq_compress_buf(&cthread[i]);
else fatal("Dunno wtf compression to use!\n");
}
post_sem(&cthread[i].complete);
return 0;
}
/* flush out any data in a stream buffer. Return -1 on failure */
int flush_buffer(struct stream_info *sinfo, int stream)
{
i64 c_len = sinfo->s[stream].buflen;
int c_type = CTYPE_NONE;
i64 bufsize, buflen = sinfo->s[stream].buflen, offset = 0;
pthread_t threads[control.threads];
int chunks = 1;
long i;
bufsize = buflen;
/* Increase the number of threads but don't let each buffer get
* smaller than STREAM_BUFSIZE as the backend compression efficiency
* significantly drops off.
*/
while (chunks < control.threads && bufsize > STREAM_BUFSIZE) {
chunks++;
bufsize = buflen / chunks + 1;
}
cthread = calloc(sizeof(struct compress_thread), chunks);
if (unlikely(!cthread))
fatal("Unable to calloc cthread in flush_buffer\n");
for (i = 0; i < chunks; i++) {
init_sem(&cthread[i].complete);
if (offset + bufsize > buflen)
bufsize = buflen - offset;
cthread[i].s_buf = sinfo->s[stream].buf + offset;
cthread[i].s_len = cthread[i].c_len = bufsize;
cthread[i].s = sinfo->s;
print_maxverbose("\rStarting thread %lu to compress %lld bytes at offset %lld\n", i, bufsize, offset);
create_pthread(&threads[i], NULL, compthread, (void *)i);
offset += bufsize;
}
for (i = 0; i < chunks; i++) {
if (unlikely(seekto(sinfo, sinfo->s[stream].last_head)))
return -1;
if (unlikely(write_i64(sinfo->fd, sinfo->cur_pos)))
return -1;
@ -736,38 +821,34 @@ int flush_buffer(struct stream_info *sinfo, int stream)
if (unlikely(seekto(sinfo, sinfo->cur_pos)))
return -1;
if (!(NO_COMPRESS)) {
if (LZMA_COMPRESS)
lzma_compress_buf(&sinfo->s[stream], &c_type, &c_len);
else if (LZO_COMPRESS)
lzo_compress_buf(&sinfo->s[stream], &c_type, &c_len);
else if (BZIP2_COMPRESS)
bzip2_compress_buf(&sinfo->s[stream], &c_type, &c_len);
else if (ZLIB_COMPRESS)
gzip_compress_buf(&sinfo->s[stream], &c_type, &c_len);
else if (ZPAQ_COMPRESS)
zpaq_compress_buf(&sinfo->s[stream], &c_type, &c_len);
else fatal("Dunno wtf compression to use!\n");
}
wait_sem(&cthread[i].complete);
if (unlikely(write_u8(sinfo->fd, c_type) ||
write_i64(sinfo->fd, c_len) ||
write_i64(sinfo->fd, sinfo->s[stream].buflen) ||
print_maxverbose("Reading thread %ld to write %lld compressed bytes\n", i, cthread[i].c_len);
if (unlikely(write_u8(sinfo->fd, cthread[i].c_type) ||
write_i64(sinfo->fd, cthread[i].c_len) ||
write_i64(sinfo->fd, cthread[i].s_len) ||
write_i64(sinfo->fd, 0))) {
return -1;
}
sinfo->cur_pos += 25;
if (unlikely(write_buf(sinfo->fd, sinfo->s[stream].buf, c_len)))
if (unlikely(write_buf(sinfo->fd, cthread[i].s_buf, cthread[i].c_len)))
return -1;
fsync(sinfo->fd);
sinfo->cur_pos += c_len;
sinfo->cur_pos += cthread[i].c_len;
destroy_sem(&cthread[i].complete);
free(cthread[i].c_buf);
}
free(cthread);
sinfo->s[stream].buflen = 0;
sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, sinfo->bufsize);
if (unlikely(!sinfo->s[stream].buf))
fatal("Failed to realloc in flush_buffer\n");
return 0;
}
@ -812,7 +893,7 @@ static int fill_buffer(struct stream_info *sinfo, int stream)
sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, u_len);
else
sinfo->s[stream].buf = malloc(u_len);
if (unlikely(!sinfo->s[stream].buf))
if (unlikely(u_len && !sinfo->s[stream].buf))
fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len);
if (unlikely(read_buf(sinfo->fd, sinfo->s[stream].buf, c_len)))
return -1;
@ -854,7 +935,7 @@ int write_stream(void *ss, int stream, uchar *p, i64 len)
n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len);
memcpy(sinfo->s[stream].buf+sinfo->s[stream].buflen, p, n);
memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen, p, n);
sinfo->s[stream].buflen += n;
p += n;
len -= n;
@ -880,7 +961,7 @@ i64 read_stream(void *ss, int stream, uchar *p, i64 len)
n = MIN(sinfo->s[stream].buflen-sinfo->s[stream].bufp, len);
if (n > 0) {
memcpy(p, sinfo->s[stream].buf+sinfo->s[stream].bufp, n);
memcpy(p, sinfo->s[stream].buf + sinfo->s[stream].bufp, n);
sinfo->s[stream].bufp += n;
p += n;
len -= n;
@ -945,13 +1026,13 @@ int close_stream_in(void *ss)
to see if there is any compression at all with lzo first. It is unlikely
that others will be able to compress if lzo is unable to drop a single byte
so do not compress any block that is incompressible by lzo. */
static int lzo_compresses(struct stream *s)
static int lzo_compresses(uchar *s_buf, i64 s_len)
{
lzo_bytep wrkmem=NULL;
lzo_uint in_len, test_len = s->buflen, save_len = s->buflen;
lzo_bytep wrkmem = NULL;
lzo_uint in_len, test_len = s_len, save_len = s_len;
lzo_uint dlen;
lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */
uchar *c_buf = NULL, *test_buf = s->buf;
uchar *c_buf = NULL, *test_buf = s_buf;
/* set minimum buffer test size based on the length of the test stream */
unsigned long buftest_size = (test_len > 5 * STREAM_BUFSIZE ? STREAM_BUFSIZE : STREAM_BUFSIZE / 4096);
int ret = 0;