Reworked the multithreading massively.

Place the data from each stream into a buffer that is then handed over to a single thread, which is allowed to begin the backend compression while the main rzip stream continues operating.
Fork up to as many threads as CPUs and feed data to them in a ring fashion, parallelising the workload as much as possible.
This causes a big speed-up on the compression side on SMP machines.
Each thread compresses a minimum of 10MB, to minimise the compression lost to using smaller windows.
Alter the progress output to match some of the changes in verbose modes.
This commit is contained in:
Con Kolivas 2010-11-13 01:26:09 +11:00
parent 5505097b2f
commit 02de002c58
4 changed files with 183 additions and 147 deletions

17
rzip.c
View file

@ -221,16 +221,15 @@ int write_sbstream(void *ss, int stream, i64 p, i64 len)
n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len);
for (i = 0; i < n; i++) {
memcpy(sinfo->s[stream].buf+sinfo->s[stream].buflen + i,
memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen + i,
get_sb(p + i), 1);
}
sinfo->s[stream].buflen += n;
p += n;
len -= n;
if (sinfo->s[stream].buflen == sinfo->bufsize) {
if (unlikely(flush_buffer(sinfo, stream)))
return -1;
}
if (sinfo->s[stream].buflen == sinfo->bufsize)
flush_buffer(sinfo, stream);
}
return 0;
}
@ -350,7 +349,7 @@ static tag clean_one_from_hash(struct rzip_state *st)
again:
better_than_min = increase_mask(st->minimum_tag_mask);
if (!st->tag_clean_ptr)
print_maxverbose("\nStarting sweep for mask %u\n", (unsigned int)st->minimum_tag_mask);
print_maxverbose("Starting sweep for mask %u\n", (unsigned int)st->minimum_tag_mask);
for (; st->tag_clean_ptr < (1U << st->hash_bits); st->tag_clean_ptr++) {
if (empty_hash(st, st->tag_clean_ptr))
@ -478,9 +477,9 @@ static void show_distrib(struct rzip_state *st)
}
if (total != st->hash_count)
print_err("/tWARNING: hash_count says total %lld\n", st->hash_count);
print_err("WARNING: hash_count says total %lld\n", st->hash_count);
print_output("\t%lld total hashes -- %lld in primary bucket (%-2.3f%%)\n", total, primary,
print_output("%lld total hashes -- %lld in primary bucket (%-2.3f%%)\n", total, primary,
primary*100.0/total);
}
@ -711,8 +710,6 @@ static void rzip_chunk(struct rzip_state *st, int fd_in, int fd_out, i64 offset,
fatal("Failed to munmap in rzip_chunk\n");
}
if (!NO_COMPRESS)
print_verbose("Performing backend compression phase\n");
if (unlikely(close_stream_out(st->ss)))
fatal("Failed to flush/close streams in rzip_chunk\n");
}

3
rzip.h
View file

@ -258,6 +258,7 @@ struct stream_info {
i64 cur_pos;
i64 initial_pos;
i64 total_read;
long thread_no;
};
void fatal(const char *format, ...);
@ -270,7 +271,7 @@ int write_stream(void *ss, int stream, uchar *p, i64 len);
i64 read_stream(void *ss, int stream, uchar *p, i64 len);
int close_stream_out(void *ss);
int close_stream_in(void *ss);
int flush_buffer(struct stream_info *sinfo, int stream);
void flush_buffer(struct stream_info *sinfo, int stream);
void read_config(struct rzip_control *s);
ssize_t write_1g(int fd, void *buf, i64 len);
ssize_t read_1g(int fd, void *buf, i64 len);

302
stream.c
View file

@ -24,13 +24,15 @@
#define STREAM_BUFSIZE (1024 * 1024 * 10)
struct compress_thread{
uchar *s_buf; /* Uncompressed buffer */
uchar *c_buf; /* Compressed buffer */
uchar *s_buf; /* Uncompressed buffer -> Compressed buffer */
uchar c_type; /* Compression type */
i64 s_len; /* Data length uncompressed */
i64 c_len; /* Data length compressed */
sem_t complete; /* Signal when this thread has finished */
struct stream *s;
sem_t free; /* This thread no longer exists */
int wait_on; /* Which thread has to complete before this can write its data */
struct stream_info *sinfo;
int stream;
} *cthread;
void init_sem(sem_t *sem)
@ -39,7 +41,6 @@ void init_sem(sem_t *sem)
fatal("sem_init\n");
}
static inline void post_sem(sem_t *s)
{
retry:
@ -141,23 +142,23 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_
static void zpaq_compress_buf(struct compress_thread *cthread)
{
uchar *c_buf = NULL;
size_t dlen = 0;
FILE *in, *out;
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
free(cthread->c_buf); /* Will be reallocated in memopen */
in = fmemopen(cthread->s_buf, cthread->s_len, "r");
if (unlikely(!in))
fatal("Failed to fmemopen in zpaq_compress_buf\n");
out = open_memstream((char **)&cthread->c_buf, &dlen);
out = open_memstream((char **)&c_buf, &dlen);
if (unlikely(!out))
fatal("Failed to open_memstream in zpaq_compress_buf\n");
zpipe_compress(in, out, control.msgout, cthread->s_len, (int)(SHOW_PROGRESS));
if (unlikely(memstream_update_buffer(out, &cthread->c_buf, &dlen)))
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen)))
fatal("Failed to memstream_update_buffer in zpaq_compress_buf");
fclose(in);
@ -166,61 +167,80 @@ static void zpaq_compress_buf(struct compress_thread *cthread)
if ((i64)dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_ZPAQ;
}
static void bzip2_compress_buf(struct compress_thread *cthread)
{
u32 dlen = cthread->s_len;
uchar *c_buf;
if (!lzo_compresses(cthread->s_buf, cthread->s_len))
return;
if (BZ2_bzBuffToBuffCompress((char *)cthread->c_buf, &dlen,
c_buf = malloc(dlen);
if (!c_buf)
return;
if (BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen,
(char *)cthread->s_buf, cthread->s_len,
control.compression_level, 0, control.compression_level * 10) != BZ_OK) {
free(c_buf);
return;
}
if (dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_BZIP2;
}
static void gzip_compress_buf(struct compress_thread *cthread)
{
unsigned long dlen = cthread->s_len;
uchar *c_buf;
if (compress2(cthread->c_buf, &dlen, cthread->s_buf, cthread->s_len,
c_buf = malloc(dlen);
if (!c_buf)
return;
if (compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len,
control.compression_level) != Z_OK) {
free(c_buf);
return;
}
if ((i64)dlen >= cthread->c_len) {
print_maxverbose("Incompressible block\n");
/* Incompressible, leave as CTYPE_NONE */
free(c_buf);
return;
}
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_GZIP;
}
static void lzma_compress_buf(struct compress_thread *cthread)
{
size_t prop_size = 5; /* return value for lzma_properties */
uchar *c_buf;
size_t dlen;
int lzma_ret;
@ -228,16 +248,19 @@ static void lzma_compress_buf(struct compress_thread *cthread)
return;
dlen = cthread->s_len;
c_buf = malloc(dlen);
if (!c_buf)
return;
print_progress("\r\tProgress percentage pausing during lzma compression...");
print_progress("Starting lzma back end compression thread...\n");
/* with LZMA SDK 4.63, we pass compression level and threads only
* and receive properties in control->lzma_properties */
lzma_ret = LzmaCompress(cthread->c_buf, &dlen, cthread->s_buf,
(size_t)cthread->c_len, control.lzma_properties, &prop_size, control.compression_level,
lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf,
(size_t)cthread->s_len, control.lzma_properties, &prop_size, control.compression_level,
0, /* dict size. set default */
-1, -1, -1, -1, /* lc, lp, pb, fb */
0); /* Threads - no threading since we do it globally */
control.threads);
if (lzma_ret != SZ_OK) {
switch (lzma_ret) {
case SZ_ERROR_MEM:
@ -258,46 +281,50 @@ static void lzma_compress_buf(struct compress_thread *cthread)
}
/* can pass -1 if not compressible! Thanks Lasse Collin */
print_maxverbose("Incompressible block\n");
goto out;
free(c_buf);
return;
}
if ((i64)dlen >= cthread->c_len) {
/* Incompressible, leave as CTYPE_NONE */
print_maxverbose("Incompressible block\n");
goto out;
free(c_buf);
return;
}
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_LZMA;
out:
if (MAX_VERBOSE)
print_output("\n");
else
print_progress("\r\t \r");
}
static void lzo_compress_buf(struct compress_thread *cthread)
{
lzo_uint in_len = cthread->s_len;
lzo_uint dlen = in_len;
lzo_uint dlen = in_len + in_len / 16 + 64 + 3;
lzo_int return_var; /* lzo1x_1_compress does not return anything but LZO_OK */
lzo_bytep wrkmem;
uchar *c_buf;
wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
if (wrkmem == NULL)
return;
return_var = lzo1x_1_compress(cthread->s_buf, in_len, cthread->c_buf,
&dlen, wrkmem);
c_buf = malloc(dlen);
if (!c_buf)
goto out_free;
return_var = lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem);
if (dlen >= in_len){
/* Incompressible, leave as CTYPE_NONE */
print_maxverbose("Incompressible block\n");
free(c_buf);
goto out_free;
}
cthread->c_len = dlen;
cthread->s_buf = cthread->c_buf;
free(cthread->s_buf);
cthread->s_buf = c_buf;
cthread->c_type = CTYPE_LZO;
out_free:
free(wrkmem);
@ -604,7 +631,29 @@ void *open_stream_out(int f, int n, i64 limit)
if (unlikely(!sinfo))
return NULL;
sinfo->bufsize = 0;
sinfo->bufsize = limit;
sinfo->thread_no = 0;
cthread = calloc(sizeof(struct compress_thread), control.threads);
if (unlikely(!cthread))
fatal("Unable to calloc cthread in open_stream_out\n");
for (i = 0; i < control.threads; i++) {
init_sem(&cthread[i].complete);
init_sem(&cthread[i].free);
post_sem(&cthread[i].free);
}
/* Threads need to wait on the thread before them before dumping their
* data. This is done in a circle up to control.threads */
cthread[0].wait_on = control.threads - 1;
for (i = 1; i < control.threads; i++)
cthread[i].wait_on = i - 1;
/* Signal thread 0 that it can start */
if (control.threads > 1)
post_sem(&cthread[control.threads - 1].complete);
sinfo->num_streams = n;
sinfo->cur_pos = 0;
sinfo->fd = f;
@ -639,7 +688,18 @@ retest_malloc:
goto retest_malloc;
}
free(testmalloc);
print_maxverbose("Succeeded to malloc for compression bufsize of %lld\n", sinfo->bufsize);
/* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
* bufsize to fit X threads into it */
sinfo->bufsize = MIN(sinfo->bufsize,
MAX((sinfo->bufsize + control.threads - 1) / control.threads, STREAM_BUFSIZE));
if (control.threads > 1)
print_maxverbose("Using %d threads to compress up to %lld bytes each.\n",
control.threads, sinfo->bufsize);
else
print_maxverbose("Using 1 thread to compress up to %lld bytes\n",
sinfo->bufsize);
for (i = 0; i < n; i++) {
sinfo->s[i].buf = malloc(sinfo->bufsize);
@ -743,113 +803,92 @@ failed:
return NULL;
}
/* Enter with s_buf allocated,s_buf points to the compressed data after the
* backend compression and is then freed here */
void *compthread(void *t)
{
long i = (long)t;
struct compress_thread *cti = &cthread[i];
/* Must allocate extra for lzo's sake */
cthread[i].c_buf = malloc(cthread[i].c_len + cthread[i].c_len / 16 + 64 + 3);
if (unlikely(cthread[i].c_len && !cthread[i].c_buf))
fatal("Failed to malloc c_buf in compthread\n");
cti->c_type = CTYPE_NONE;
cti->c_len = cti->s_len;
cthread[i].c_type = CTYPE_NONE;
if (!NO_COMPRESS && cthread[i].c_len) {
if (!NO_COMPRESS && cti->c_len) {
if (LZMA_COMPRESS)
lzma_compress_buf(&cthread[i]);
lzma_compress_buf(cti);
else if (LZO_COMPRESS)
lzo_compress_buf(&cthread[i]);
lzo_compress_buf(cti);
else if (BZIP2_COMPRESS)
bzip2_compress_buf(&cthread[i]);
bzip2_compress_buf(cti);
else if (ZLIB_COMPRESS)
gzip_compress_buf(&cthread[i]);
gzip_compress_buf(cti);
else if (ZPAQ_COMPRESS)
zpaq_compress_buf(&cthread[i]);
zpaq_compress_buf(cti);
else fatal("Dunno wtf compression to use!\n");
}
post_sem(&cthread[i].complete);
if (control.threads > 1)
wait_sem(&cthread[cti->wait_on].complete);
if (unlikely(seekto(cti->sinfo, cti->sinfo->s[cti->stream].last_head)))
fatal("Failed to seekto in compthread %d\n", i);
if (unlikely(write_i64(cti->sinfo->fd, cti->sinfo->cur_pos)))
fatal("Failed to write_i64 in compthread %d\n", i);
cti->sinfo->s[cti->stream].last_head = cti->sinfo->cur_pos + 17;
if (unlikely(seekto(cti->sinfo, cti->sinfo->cur_pos)))
fatal("Failed to seekto cur_pos in compthread %d\n", i);
print_maxverbose("Writing %lld compressed bytes from thread %ld\n", cti->c_len, i);
if (unlikely(write_u8(cti->sinfo->fd, cti->c_type) ||
write_i64(cti->sinfo->fd, cti->c_len) ||
write_i64(cti->sinfo->fd, cti->s_len) ||
write_i64(cti->sinfo->fd, 0))) {
fatal("Failed write in compthread %d\n", i);
}
cti->sinfo->cur_pos += 25;
if (unlikely(write_buf(cti->sinfo->fd, cti->s_buf, cti->c_len)))
fatal("Failed to write_buf in compthread %d\n", i);
cti->sinfo->cur_pos += cti->c_len;
free(cti->s_buf);
fsync(cti->sinfo->fd);
post_sem(&cti->complete);
post_sem(&cti->free);
return 0;
}
/* flush out any data in a stream buffer. Return -1 on failure */
int flush_buffer(struct stream_info *sinfo, int stream)
/* flush out any data in a stream buffer */
void flush_buffer(struct stream_info *sinfo, int stream)
{
i64 bufsize, buflen = sinfo->s[stream].buflen, offset = 0;
pthread_t threads[control.threads];
int chunks = 1;
long i;
long i = sinfo->thread_no;
bufsize = buflen;
/* Make sure this thread doesn't already exist */
wait_sem(&cthread[i].free);
/* Increase the number of threads but don't let each buffer get
* smaller than STREAM_BUFSIZE as the backend compression efficency
* significantly drops off.
*/
while (chunks < control.threads && bufsize > STREAM_BUFSIZE) {
chunks++;
bufsize = buflen / chunks + 1;
}
cthread[i].sinfo = sinfo;
cthread[i].stream = stream;
cthread[i].s_buf = sinfo->s[stream].buf;
cthread[i].s_len = sinfo->s[stream].buflen;
cthread = calloc(sizeof(struct compress_thread), chunks);
if (unlikely(!cthread))
fatal("Unable to calloc cthread in flush_buffer\n");
for (i = 0; i < chunks; i++) {
init_sem(&cthread[i].complete);
if (offset + bufsize > buflen)
bufsize = buflen - offset;
cthread[i].s_buf = sinfo->s[stream].buf + offset;
cthread[i].s_len = cthread[i].c_len = bufsize;
cthread[i].s = sinfo->s;
print_maxverbose("\rStarting thread %lu to compress %lld bytes at offset %lld\n", i, bufsize, offset);
print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n",
i, cthread[i].s_len, stream);
create_pthread(&threads[i], NULL, compthread, (void *)i);
offset += bufsize;
}
for (i = 0; i < chunks; i++) {
if (unlikely(seekto(sinfo, sinfo->s[stream].last_head)))
return -1;
if (unlikely(write_i64(sinfo->fd, sinfo->cur_pos)))
return -1;
sinfo->s[stream].last_head = sinfo->cur_pos + 17;
if (unlikely(seekto(sinfo, sinfo->cur_pos)))
return -1;
wait_sem(&cthread[i].complete);
print_maxverbose("Reading thread %ld to write %lld compressed bytes\n", i, cthread[i].c_len);
if (unlikely(write_u8(sinfo->fd, cthread[i].c_type) ||
write_i64(sinfo->fd, cthread[i].c_len) ||
write_i64(sinfo->fd, cthread[i].s_len) ||
write_i64(sinfo->fd, 0))) {
return -1;
}
sinfo->cur_pos += 25;
if (unlikely(write_buf(sinfo->fd, cthread[i].s_buf, cthread[i].c_len)))
return -1;
fsync(sinfo->fd);
sinfo->cur_pos += cthread[i].c_len;
destroy_sem(&cthread[i].complete);
free(cthread[i].c_buf);
}
free(cthread);
/* The stream buffer has been given to the thread, allocate a new one */
sinfo->s[stream].buf = malloc(sinfo->bufsize);
if (unlikely(!sinfo->s[stream].buf))
fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
sinfo->s[stream].buflen = 0;
sinfo->s[stream].buf = realloc(sinfo->s[stream].buf, sinfo->bufsize);
if (unlikely(!sinfo->s[stream].buf))
fatal("Failed to realloc in flush_buffer\n");
return 0;
if (++sinfo->thread_no == control.threads)
sinfo->thread_no = 0;
}
/* fill a buffer from a stream - return -1 on failure */
@ -895,6 +934,7 @@ static int fill_buffer(struct stream_info *sinfo, int stream)
sinfo->s[stream].buf = malloc(u_len);
if (unlikely(u_len && !sinfo->s[stream].buf))
fatal("Unable to malloc buffer of size %lld in fill_buffer\n", u_len);
if (unlikely(read_buf(sinfo->fd, sinfo->s[stream].buf, c_len)))
return -1;
@ -940,10 +980,9 @@ int write_stream(void *ss, int stream, uchar *p, i64 len)
p += n;
len -= n;
if (sinfo->s[stream].buflen == sinfo->bufsize) {
if (unlikely(flush_buffer(sinfo, stream)))
return -1;
}
/* Flush the buffer every sinfo->bufsize into one thread */
if (sinfo->s[stream].buflen == sinfo->bufsize)
flush_buffer(sinfo, stream);
}
return 0;
}
@ -985,21 +1024,24 @@ int close_stream_out(void *ss)
struct stream_info *sinfo = ss;
int i;
/* reallocate buffers to try and save space */
for (i = 0; i < sinfo->num_streams; i++) {
if (sinfo->s[i].buflen) {
if (unlikely(!realloc(sinfo->s[i].buf, sinfo->s[i].buflen)))
fatal("Error Reallocating Output Buffer %d\n", i);
if (sinfo->s[i].buflen)
flush_buffer(sinfo, i);
}
for (i = 0; i < control.threads; i++) {
wait_sem(&cthread[i].free);
destroy_sem(&cthread[i].complete);
destroy_sem(&cthread[i].free);
}
for (i = 0; i < sinfo->num_streams; i++) {
if (unlikely(sinfo->s[i].buflen && flush_buffer(sinfo, i)))
return -1;
if (sinfo->s[i].buf)
for (i = 0; i < sinfo->num_streams; i++)
free(sinfo->s[i].buf);
}
free(cthread);
free(sinfo->s);
free(sinfo);
return 0;
}
@ -1012,10 +1054,8 @@ int close_stream_in(void *ss)
if (unlikely(lseek(sinfo->fd, sinfo->initial_pos + sinfo->total_read,
SEEK_SET) != sinfo->initial_pos + sinfo->total_read))
return -1;
for (i = 0; i < sinfo->num_streams; i++) {
if (sinfo->s[i].buf)
for (i = 0; i < sinfo->num_streams; i++)
free(sinfo->s[i].buf);
}
free(sinfo->s);
free(sinfo);
@ -1052,7 +1092,7 @@ static int lzo_compresses(uchar *s_buf, i64 s_len)
if (unlikely(!c_buf))
fatal("Unable to allocate c_buf in lzo_compresses\n");
print_progress("\tlzo testing for incompressible data...");
print_progress("lzo testing for incompressible data...\n");
/* Test progressively larger blocks at a time and as soon as anything
compressible is found, jump out as a success */
@ -1081,9 +1121,7 @@ static int lzo_compresses(uchar *s_buf, i64 s_len)
(ret == 0? "FAILED - below threshold" : "OK"), save_len,
100 * ((double) best_dlen / (double) in_len), workcounter);
else if (VERBOSE)
print_output("%s\r", (ret == 0? "FAILED - below threshold" : "OK"));
else
print_progress("\r\t \r");
print_output("%s\n", (ret == 0? "FAILED - below threshold" : "OK"));
free(wrkmem);
free(c_buf);

View file

@ -1738,7 +1738,7 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, i
len++;
pct = (len * 100 / buf_len);
if (pct != last_pct) {
fprintf(msgout, "\r\tZPAQ Chunk %i of 2 compress: %i%% \r", (chunk + 1), pct);
fprintf(msgout, "\r\t\t\tZPAQ Chunk %i of 2 compress: %i%% \r", (chunk + 1), pct);
fflush(msgout);
last_pct = pct;
}