mirror of
https://github.com/ckolivas/lrzip.git
synced 2025-12-06 07:12:00 +01:00
Move the threading on compression to higher up in the code, allowing the next stream to start using compression threads before the previous stream has finished.
This overlapping of compressing streams means that when files are large enough to be split into multiple blocks, all CPUs will be used more effectively throughout the compression, affording a nice speedup. Move the writing of the chunk byte size and initial headers into the compthread to prevent any races occurring. Fix a few dodgy callocs that may have been overflowing! The previous commit reverts were done because the changes designed to speed it up actually slowed it down instead.
This commit is contained in:
parent
8dd9b00496
commit
50437a8447
10
rzip.c
10
rzip.c
|
|
@ -702,7 +702,7 @@ static void rzip_chunk(struct rzip_state *st, int fd_in, int fd_out, i64 offset,
|
|||
{
|
||||
init_sliding_mmap(st, fd_in, offset);
|
||||
|
||||
st->ss = open_stream_out(fd_out, NUM_STREAMS, st->chunk_size);
|
||||
st->ss = open_stream_out(fd_out, NUM_STREAMS, st->chunk_size, st->chunk_bytes);
|
||||
if (unlikely(!st->ss))
|
||||
fatal("Failed to open streams in rzip_chunk\n");
|
||||
|
||||
|
|
@ -803,6 +803,8 @@ void rzip_fd(int fd_in, int fd_out)
|
|||
last.tv_sec = last.tv_usec = 0;
|
||||
gettimeofday(&start, NULL);
|
||||
|
||||
prepare_streamout_threads();
|
||||
|
||||
while (len > 0 || (STDIN && !st->stdin_eof)) {
|
||||
double pct_base, pct_multiple;
|
||||
i64 offset = s.st_size - len;
|
||||
|
|
@ -859,7 +861,7 @@ retry:
|
|||
sb.orig_offset = offset;
|
||||
print_maxverbose("Chunk size: %lld\n", st->chunk_size);
|
||||
|
||||
/* Determine the chunk byte width and write it to the file
|
||||
/* Determine the chunk byte width to write to the file
|
||||
* This allows archives of different chunk sizes to have
|
||||
* optimal byte width entries. When working with stdin we
|
||||
* won't know in advance how big it is so it will always be
|
||||
|
|
@ -870,8 +872,6 @@ retry:
|
|||
if (bits % 8)
|
||||
st->chunk_bytes++;
|
||||
print_maxverbose("Byte width: %d\n", st->chunk_bytes);
|
||||
if (unlikely(write(fd_out, &st->chunk_bytes, 1) != 1))
|
||||
fatal("Failed to write chunk_bytes size in rzip_fd\n");
|
||||
|
||||
pct_base = (100.0 * (s.st_size - len)) / s.st_size;
|
||||
pct_multiple = ((double)st->chunk_size) / s.st_size;
|
||||
|
|
@ -905,6 +905,8 @@ retry:
|
|||
len -= st->chunk_size;
|
||||
}
|
||||
|
||||
close_streamout_threads();
|
||||
|
||||
gettimeofday(¤t, NULL);
|
||||
chunkmbs = (s.st_size / 1024 / 1024) / ((double)(current.tv_sec-start.tv_sec)? : 1);
|
||||
|
||||
|
|
|
|||
6
rzip.h
6
rzip.h
|
|
@ -286,12 +286,14 @@ struct stream_info {
|
|||
i64 total_read;
|
||||
long thread_no;
|
||||
long next_thread;
|
||||
int chunks;
|
||||
char chunk_bytes;
|
||||
};
|
||||
|
||||
void sighandler();
|
||||
i64 runzip_fd(int fd_in, int fd_out, int fd_hist, i64 expected_size);
|
||||
void rzip_fd(int fd_in, int fd_out);
|
||||
void *open_stream_out(int f, int n, i64 limit);
|
||||
void *open_stream_out(int f, int n, i64 limit, char cbytes);
|
||||
void *open_stream_in(int f, int n);
|
||||
int write_stream(void *ss, int stream, uchar *p, i64 len);
|
||||
i64 read_stream(void *ss, int stream, uchar *p, i64 len);
|
||||
|
|
@ -304,6 +306,8 @@ ssize_t read_1g(int fd, void *buf, i64 len);
|
|||
void zpipe_compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread);
|
||||
void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread);
|
||||
const i64 two_gig;
|
||||
void prepare_streamout_threads(void);
|
||||
void close_streamout_threads(void);
|
||||
|
||||
#define print_err(format, args...) do {\
|
||||
fprintf(stderr, format, ##args); \
|
||||
|
|
|
|||
155
stream.c
155
stream.c
|
|
@ -653,29 +653,17 @@ static int seekto(struct stream_info *sinfo, i64 pos)
|
|||
|
||||
static pthread_t *threads;
|
||||
|
||||
/* open a set of output streams, compressing with the given
|
||||
compression level and algorithm */
|
||||
void *open_stream_out(int f, int n, i64 limit)
|
||||
void prepare_streamout_threads(void)
|
||||
{
|
||||
struct stream_info *sinfo;
|
||||
uchar *testmalloc;
|
||||
i64 testsize;
|
||||
int i;
|
||||
|
||||
sinfo = malloc(sizeof(*sinfo));
|
||||
if (unlikely(!sinfo))
|
||||
return NULL;
|
||||
|
||||
threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads);
|
||||
threads = calloc(sizeof(pthread_t), control.threads);
|
||||
if (unlikely(!threads))
|
||||
return NULL;
|
||||
|
||||
sinfo->bufsize = limit;
|
||||
sinfo->thread_no = 0;
|
||||
fatal("Unable to calloc threads in prepare_streamout_threads\n");
|
||||
|
||||
cthread = calloc(sizeof(struct compress_thread), control.threads);
|
||||
if (unlikely(!cthread))
|
||||
fatal("Unable to calloc cthread in open_stream_out\n");
|
||||
fatal("Unable to calloc cthread in prepare_streamout_threads\n");
|
||||
|
||||
for (i = 0; i < control.threads; i++) {
|
||||
init_sem(&cthread[i].complete);
|
||||
|
|
@ -692,18 +680,45 @@ void *open_stream_out(int f, int n, i64 limit)
|
|||
/* Signal thread 0 that it can start */
|
||||
if (control.threads > 1)
|
||||
post_sem(&cthread[control.threads - 1].complete);
|
||||
}
|
||||
|
||||
void close_streamout_threads(void)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < control.threads; i++) {
|
||||
wait_sem(&cthread[i].free);
|
||||
destroy_sem(&cthread[i].complete);
|
||||
destroy_sem(&cthread[i].free);
|
||||
}
|
||||
free(cthread);
|
||||
free(threads);
|
||||
}
|
||||
|
||||
/* open a set of output streams, compressing with the given
|
||||
compression level and algorithm */
|
||||
void *open_stream_out(int f, int n, i64 limit, char cbytes)
|
||||
{
|
||||
struct stream_info *sinfo;
|
||||
uchar *testmalloc;
|
||||
i64 testsize;
|
||||
int i;
|
||||
|
||||
sinfo = calloc(sizeof(struct stream_info), 1);
|
||||
if (unlikely(!sinfo))
|
||||
return NULL;
|
||||
|
||||
sinfo->bufsize = limit;
|
||||
|
||||
sinfo->chunk_bytes = cbytes;
|
||||
sinfo->num_streams = n;
|
||||
sinfo->cur_pos = 0;
|
||||
sinfo->fd = f;
|
||||
|
||||
/* Serious limits imposed on 32 bit capabilities */
|
||||
if (BITS32)
|
||||
limit = MIN(limit, two_gig / 6);
|
||||
|
||||
sinfo->initial_pos = lseek(f, 0, SEEK_CUR);
|
||||
|
||||
sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n);
|
||||
sinfo->s = calloc(sizeof(struct stream), n);
|
||||
if (unlikely(!sinfo->s)) {
|
||||
free(sinfo);
|
||||
return NULL;
|
||||
|
|
@ -749,6 +764,7 @@ retest_malloc:
|
|||
fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* write the initial headers */
|
||||
for (i = 0; i < n; i++) {
|
||||
sinfo->s[i].last_head = sinfo->cur_pos + 17;
|
||||
|
|
@ -758,6 +774,7 @@ retest_malloc:
|
|||
write_i64(sinfo->fd, 0);
|
||||
sinfo->cur_pos += 25;
|
||||
}
|
||||
#endif
|
||||
return (void *)sinfo;
|
||||
}
|
||||
|
||||
|
|
@ -768,12 +785,12 @@ void *open_stream_in(int f, int n)
|
|||
int total_threads, i;
|
||||
i64 header_length;
|
||||
|
||||
sinfo = calloc(sizeof(*sinfo), 1);
|
||||
sinfo = calloc(sizeof(struct stream_info), 1);
|
||||
if (unlikely(!sinfo))
|
||||
return NULL;
|
||||
|
||||
total_threads = control.threads * n;
|
||||
threads = (pthread_t *)calloc(sizeof(pthread_t), total_threads);
|
||||
threads = calloc(sizeof(pthread_t), total_threads);
|
||||
if (unlikely(!threads))
|
||||
return NULL;
|
||||
|
||||
|
|
@ -792,7 +809,7 @@ void *open_stream_in(int f, int n)
|
|||
sinfo->fd = f;
|
||||
sinfo->initial_pos = lseek(f, 0, SEEK_CUR);
|
||||
|
||||
sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n);
|
||||
sinfo->s = calloc(sizeof(struct stream), n);
|
||||
if (unlikely(!sinfo->s)) {
|
||||
free(sinfo);
|
||||
return NULL;
|
||||
|
|
@ -871,6 +888,7 @@ static void *compthread(void *t)
|
|||
{
|
||||
long i = (long)t;
|
||||
struct compress_thread *cti = &cthread[i];
|
||||
struct stream_info *ctis = cti->sinfo;
|
||||
|
||||
if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1))
|
||||
print_err("Warning, unable to set nice value on thread\n");
|
||||
|
|
@ -895,32 +913,51 @@ static void *compthread(void *t)
|
|||
if (control.threads > 1)
|
||||
wait_sem(&cthread[cti->wait_on].complete);
|
||||
|
||||
if (unlikely(seekto(cti->sinfo, cti->sinfo->s[cti->stream].last_head)))
|
||||
if (!ctis->chunks++) {
|
||||
int j;
|
||||
|
||||
/* Write chunk bytes of this block */
|
||||
write_u8(ctis->fd, ctis->chunk_bytes);
|
||||
|
||||
/* First chunk of this stream, write headers */
|
||||
ctis->initial_pos = lseek(ctis->fd, 0, SEEK_CUR);
|
||||
|
||||
for (j = 0; j < ctis->num_streams; j++) {
|
||||
ctis->s[j].last_head = ctis->cur_pos + 17;
|
||||
write_u8(ctis->fd, CTYPE_NONE);
|
||||
write_i64(ctis->fd, 0);
|
||||
write_i64(ctis->fd, 0);
|
||||
write_i64(ctis->fd, 0);
|
||||
ctis->cur_pos += 25;
|
||||
}
|
||||
}
|
||||
|
||||
if (unlikely(seekto(ctis, ctis->s[cti->stream].last_head)))
|
||||
fatal("Failed to seekto in compthread %d\n", i);
|
||||
|
||||
if (unlikely(write_i64(cti->sinfo->fd, cti->sinfo->cur_pos)))
|
||||
if (unlikely(write_i64(ctis->fd, ctis->cur_pos)))
|
||||
fatal("Failed to write_i64 in compthread %d\n", i);
|
||||
|
||||
cti->sinfo->s[cti->stream].last_head = cti->sinfo->cur_pos + 17;
|
||||
if (unlikely(seekto(cti->sinfo, cti->sinfo->cur_pos)))
|
||||
ctis->s[cti->stream].last_head = ctis->cur_pos + 17;
|
||||
if (unlikely(seekto(ctis, ctis->cur_pos)))
|
||||
fatal("Failed to seekto cur_pos in compthread %d\n", i);
|
||||
|
||||
print_maxverbose("Writing %lld compressed bytes from thread %ld\n", cti->c_len, i);
|
||||
if (unlikely(write_u8(cti->sinfo->fd, cti->c_type) ||
|
||||
write_i64(cti->sinfo->fd, cti->c_len) ||
|
||||
write_i64(cti->sinfo->fd, cti->s_len) ||
|
||||
write_i64(cti->sinfo->fd, 0))) {
|
||||
print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->stream);
|
||||
if (unlikely(write_u8(ctis->fd, cti->c_type) ||
|
||||
write_i64(ctis->fd, cti->c_len) ||
|
||||
write_i64(ctis->fd, cti->s_len) ||
|
||||
write_i64(ctis->fd, 0))) {
|
||||
fatal("Failed write in compthread %d\n", i);
|
||||
}
|
||||
cti->sinfo->cur_pos += 25;
|
||||
ctis->cur_pos += 25;
|
||||
|
||||
if (unlikely(write_buf(cti->sinfo->fd, cti->s_buf, cti->c_len)))
|
||||
if (unlikely(write_buf(ctis->fd, cti->s_buf, cti->c_len)))
|
||||
fatal("Failed to write_buf in compthread %d\n", i);
|
||||
|
||||
cti->sinfo->cur_pos += cti->c_len;
|
||||
ctis->cur_pos += cti->c_len;
|
||||
free(cti->s_buf);
|
||||
|
||||
fsync(cti->sinfo->fd);
|
||||
fsync(ctis->fd);
|
||||
|
||||
post_sem(&cti->complete);
|
||||
post_sem(&cti->free);
|
||||
|
|
@ -928,10 +965,9 @@ static void *compthread(void *t)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* flush out any data in a stream buffer */
|
||||
void flush_buffer(struct stream_info *sinfo, int stream)
|
||||
static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf)
|
||||
{
|
||||
long i = sinfo->thread_no;
|
||||
static long i = 0;
|
||||
|
||||
/* Make sure this thread doesn't already exist */
|
||||
wait_sem(&cthread[i].free);
|
||||
|
|
@ -945,14 +981,22 @@ void flush_buffer(struct stream_info *sinfo, int stream)
|
|||
i, cthread[i].s_len, stream);
|
||||
create_pthread(&threads[i], NULL, compthread, (void *)i);
|
||||
|
||||
/* The stream buffer has been given to the thread, allocate a new one */
|
||||
sinfo->s[stream].buf = malloc(sinfo->bufsize);
|
||||
if (unlikely(!sinfo->s[stream].buf))
|
||||
fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
|
||||
sinfo->s[stream].buflen = 0;
|
||||
if (newbuf) {
|
||||
/* The stream buffer has been given to the thread, allocate a new one */
|
||||
sinfo->s[stream].buf = malloc(sinfo->bufsize);
|
||||
if (unlikely(!sinfo->s[stream].buf))
|
||||
fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
|
||||
sinfo->s[stream].buflen = 0;
|
||||
}
|
||||
|
||||
if (++sinfo->thread_no == control.threads)
|
||||
sinfo->thread_no = 0;
|
||||
if (++i == control.threads)
|
||||
i = 0;
|
||||
}
|
||||
|
||||
/* flush out any data in a stream buffer */
|
||||
void flush_buffer(struct stream_info *sinfo, int stream)
|
||||
{
|
||||
clear_buffer(sinfo, stream, 1);
|
||||
}
|
||||
|
||||
static void *ucompthread(void *t)
|
||||
|
|
@ -1142,23 +1186,16 @@ int close_stream_out(void *ss)
|
|||
|
||||
for (i = 0; i < sinfo->num_streams; i++) {
|
||||
if (sinfo->s[i].buflen)
|
||||
flush_buffer(sinfo, i);
|
||||
clear_buffer(sinfo, i, 0);
|
||||
}
|
||||
|
||||
for (i = 0; i < control.threads; i++) {
|
||||
wait_sem(&cthread[i].free);
|
||||
destroy_sem(&cthread[i].complete);
|
||||
destroy_sem(&cthread[i].free);
|
||||
}
|
||||
|
||||
for (i = 0; i < sinfo->num_streams; i++)
|
||||
free(sinfo->s[i].buf);
|
||||
|
||||
free(cthread);
|
||||
free(threads);
|
||||
#if 0
|
||||
/* These cannot be freed because their values are read after the next
|
||||
* stream has started so they're not properly freed and just dropped on
|
||||
* program exit! FIXME */
|
||||
free(sinfo->s);
|
||||
free(sinfo);
|
||||
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue