diff --git a/rzip.c b/rzip.c index 7130eb5..8ff87d7 100644 --- a/rzip.c +++ b/rzip.c @@ -702,7 +702,7 @@ static void rzip_chunk(struct rzip_state *st, int fd_in, int fd_out, i64 offset, { init_sliding_mmap(st, fd_in, offset); - st->ss = open_stream_out(fd_out, NUM_STREAMS, st->chunk_size); + st->ss = open_stream_out(fd_out, NUM_STREAMS, st->chunk_size, st->chunk_bytes); if (unlikely(!st->ss)) fatal("Failed to open streams in rzip_chunk\n"); @@ -803,6 +803,8 @@ void rzip_fd(int fd_in, int fd_out) last.tv_sec = last.tv_usec = 0; gettimeofday(&start, NULL); + prepare_streamout_threads(); + while (len > 0 || (STDIN && !st->stdin_eof)) { double pct_base, pct_multiple; i64 offset = s.st_size - len; @@ -859,7 +861,7 @@ retry: sb.orig_offset = offset; print_maxverbose("Chunk size: %lld\n", st->chunk_size); - /* Determine the chunk byte width and write it to the file + /* Determine the chunk byte width to write to the file * This allows archives of different chunk sizes to have * optimal byte width entries. When working with stdin we * won't know in advance how big it is so it will always be @@ -870,8 +872,6 @@ retry: if (bits % 8) st->chunk_bytes++; print_maxverbose("Byte width: %d\n", st->chunk_bytes); - if (unlikely(write(fd_out, &st->chunk_bytes, 1) != 1)) - fatal("Failed to write chunk_bytes size in rzip_fd\n"); pct_base = (100.0 * (s.st_size - len)) / s.st_size; pct_multiple = ((double)st->chunk_size) / s.st_size; @@ -905,6 +905,8 @@ retry: len -= st->chunk_size; } + close_streamout_threads(); + gettimeofday(¤t, NULL); chunkmbs = (s.st_size / 1024 / 1024) / ((double)(current.tv_sec-start.tv_sec)? : 1); diff --git a/rzip.h b/rzip.h index 1b7e537..2b4c062 100644 --- a/rzip.h +++ b/rzip.h @@ -286,12 +286,14 @@ struct stream_info { i64 total_read; long thread_no; long next_thread; + int chunks; + char chunk_bytes; }; void sighandler(); i64 runzip_fd(int fd_in, int fd_out, int fd_hist, i64 expected_size); void rzip_fd(int fd_in, int fd_out); -void *open_stream_out(int f, int n, i64 limit); +void *open_stream_out(int f, int n, i64 limit, char cbytes); void *open_stream_in(int f, int n); int write_stream(void *ss, int stream, uchar *p, i64 len); i64 read_stream(void *ss, int stream, uchar *p, i64 len); @@ -304,6 +306,8 @@ ssize_t read_1g(int fd, void *buf, i64 len); void zpipe_compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread); void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread); const i64 two_gig; +void prepare_streamout_threads(void); +void close_streamout_threads(void); #define print_err(format, args...) do {\ fprintf(stderr, format, ##args); \ diff --git a/stream.c b/stream.c index 1a47f72..80c12b3 100644 --- a/stream.c +++ b/stream.c @@ -653,29 +653,17 @@ static int seekto(struct stream_info *sinfo, i64 pos) static pthread_t *threads; -/* open a set of output streams, compressing with the given - compression level and algorithm */ -void *open_stream_out(int f, int n, i64 limit) +void prepare_streamout_threads(void) { - struct stream_info *sinfo; - uchar *testmalloc; - i64 testsize; int i; - sinfo = malloc(sizeof(*sinfo)); - if (unlikely(!sinfo)) - return NULL; - - threads = (pthread_t *)calloc(sizeof(pthread_t), control.threads); + threads = calloc(sizeof(pthread_t), control.threads); if (unlikely(!threads)) - return NULL; - - sinfo->bufsize = limit; - sinfo->thread_no = 0; + fatal("Unable to calloc threads in prepare_streamout_threads\n"); cthread = calloc(sizeof(struct compress_thread), control.threads); if (unlikely(!cthread)) - fatal("Unable to calloc cthread in open_stream_out\n"); + fatal("Unable to calloc cthread in prepare_streamout_threads\n"); for (i = 0; i < control.threads; i++) { init_sem(&cthread[i].complete); @@ -692,18 +680,45 @@ void *open_stream_out(int f, int n, i64 limit) /* Signal thread 0 that it can start */ if (control.threads > 1) post_sem(&cthread[control.threads - 1].complete); +} +void close_streamout_threads(void) +{ + int i; + + for (i = 0; i < control.threads; i++) { + wait_sem(&cthread[i].free); + destroy_sem(&cthread[i].complete); + destroy_sem(&cthread[i].free); + } + free(cthread); + free(threads); +} + +/* open a set of output streams, compressing with the given + compression level and algorithm */ +void *open_stream_out(int f, int n, i64 limit, char cbytes) +{ + struct stream_info *sinfo; + uchar *testmalloc; + i64 testsize; + int i; + + sinfo = calloc(sizeof(struct stream_info), 1); + if (unlikely(!sinfo)) + return NULL; + + sinfo->bufsize = limit; + + sinfo->chunk_bytes = cbytes; sinfo->num_streams = n; - sinfo->cur_pos = 0; sinfo->fd = f; /* Serious limits imposed on 32 bit capabilities */ if (BITS32) limit = MIN(limit, two_gig / 6); - sinfo->initial_pos = lseek(f, 0, SEEK_CUR); - - sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n); + sinfo->s = calloc(sizeof(struct stream), n); if (unlikely(!sinfo->s)) { free(sinfo); return NULL; @@ -749,6 +764,7 @@ retest_malloc: fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize); } +#if 0 /* write the initial headers */ for (i = 0; i < n; i++) { sinfo->s[i].last_head = sinfo->cur_pos + 17; @@ -758,6 +774,7 @@ retest_malloc: write_i64(sinfo->fd, 0); sinfo->cur_pos += 25; } +#endif return (void *)sinfo; } @@ -768,12 +785,12 @@ void *open_stream_in(int f, int n) int total_threads, i; i64 header_length; - sinfo = calloc(sizeof(*sinfo), 1); + sinfo = calloc(sizeof(struct stream_info), 1); if (unlikely(!sinfo)) return NULL; total_threads = control.threads * n; - threads = (pthread_t *)calloc(sizeof(pthread_t), total_threads); + threads = calloc(sizeof(pthread_t), total_threads); if (unlikely(!threads)) return NULL; @@ -792,7 +809,7 @@ void *open_stream_in(int f, int n) sinfo->fd = f; sinfo->initial_pos = lseek(f, 0, SEEK_CUR); - sinfo->s = (struct stream *)calloc(sizeof(sinfo->s[0]), n); + sinfo->s = calloc(sizeof(struct stream), n); if (unlikely(!sinfo->s)) { free(sinfo); return NULL; @@ -871,6 +888,7 @@ static void *compthread(void *t) { long i = (long)t; struct compress_thread *cti = &cthread[i]; + struct stream_info *ctis = cti->sinfo; if (unlikely(setpriority(PRIO_PROCESS, 0, control.nice_val) == -1)) print_err("Warning, unable to set nice value on thread\n"); @@ -895,32 +913,51 @@ static void *compthread(void *t) if (control.threads > 1) wait_sem(&cthread[cti->wait_on].complete); - if (unlikely(seekto(cti->sinfo, cti->sinfo->s[cti->stream].last_head))) + if (!ctis->chunks++) { + int j; + + /* Write chunk bytes of this block */ + write_u8(ctis->fd, ctis->chunk_bytes); + + /* First chunk of this stream, write headers */ + ctis->initial_pos = lseek(ctis->fd, 0, SEEK_CUR); + + for (j = 0; j < ctis->num_streams; j++) { + ctis->s[j].last_head = ctis->cur_pos + 17; + write_u8(ctis->fd, CTYPE_NONE); + write_i64(ctis->fd, 0); + write_i64(ctis->fd, 0); + write_i64(ctis->fd, 0); + ctis->cur_pos += 25; + } + } + + if (unlikely(seekto(ctis, ctis->s[cti->stream].last_head))) fatal("Failed to seekto in compthread %d\n", i); - if (unlikely(write_i64(cti->sinfo->fd, cti->sinfo->cur_pos))) + if (unlikely(write_i64(ctis->fd, ctis->cur_pos))) fatal("Failed to write_i64 in compthread %d\n", i); - cti->sinfo->s[cti->stream].last_head = cti->sinfo->cur_pos + 17; - if (unlikely(seekto(cti->sinfo, cti->sinfo->cur_pos))) + ctis->s[cti->stream].last_head = ctis->cur_pos + 17; + if (unlikely(seekto(ctis, ctis->cur_pos))) fatal("Failed to seekto cur_pos in compthread %d\n", i); - print_maxverbose("Writing %lld compressed bytes from thread %ld\n", cti->c_len, i); - if (unlikely(write_u8(cti->sinfo->fd, cti->c_type) || - write_i64(cti->sinfo->fd, cti->c_len) || - write_i64(cti->sinfo->fd, cti->s_len) || - write_i64(cti->sinfo->fd, 0))) { + print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->stream); + if (unlikely(write_u8(ctis->fd, cti->c_type) || + write_i64(ctis->fd, cti->c_len) || + write_i64(ctis->fd, cti->s_len) || + write_i64(ctis->fd, 0))) { fatal("Failed write in compthread %d\n", i); } - cti->sinfo->cur_pos += 25; + ctis->cur_pos += 25; - if (unlikely(write_buf(cti->sinfo->fd, cti->s_buf, cti->c_len))) + if (unlikely(write_buf(ctis->fd, cti->s_buf, cti->c_len))) fatal("Failed to write_buf in compthread %d\n", i); - cti->sinfo->cur_pos += cti->c_len; + ctis->cur_pos += cti->c_len; free(cti->s_buf); - fsync(cti->sinfo->fd); + fsync(ctis->fd); post_sem(&cti->complete); post_sem(&cti->free); @@ -928,10 +965,9 @@ static void *compthread(void *t) return 0; } -/* flush out any data in a stream buffer */ -void flush_buffer(struct stream_info *sinfo, int stream) +static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf) { - long i = sinfo->thread_no; + static long i = 0; /* Make sure this thread doesn't already exist */ wait_sem(&cthread[i].free); @@ -945,14 +981,22 @@ void flush_buffer(struct stream_info *sinfo, int stream) i, cthread[i].s_len, stream); create_pthread(&threads[i], NULL, compthread, (void *)i); - /* The stream buffer has been given to the thread, allocate a new one */ - sinfo->s[stream].buf = malloc(sinfo->bufsize); - if (unlikely(!sinfo->s[stream].buf)) - fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize); - sinfo->s[stream].buflen = 0; + if (newbuf) { + /* The stream buffer has been given to the thread, allocate a new one */ + sinfo->s[stream].buf = malloc(sinfo->bufsize); + if (unlikely(!sinfo->s[stream].buf)) + fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize); + sinfo->s[stream].buflen = 0; + } - if (++sinfo->thread_no == control.threads) - sinfo->thread_no = 0; + if (++i == control.threads) + i = 0; +} + +/* flush out any data in a stream buffer */ +void flush_buffer(struct stream_info *sinfo, int stream) +{ + clear_buffer(sinfo, stream, 1); } static void *ucompthread(void *t) @@ -1142,23 +1186,16 @@ int close_stream_out(void *ss) for (i = 0; i < sinfo->num_streams; i++) { if (sinfo->s[i].buflen) - flush_buffer(sinfo, i); + clear_buffer(sinfo, i, 0); } - for (i = 0; i < control.threads; i++) { - wait_sem(&cthread[i].free); - destroy_sem(&cthread[i].complete); - destroy_sem(&cthread[i].free); - } - - for (i = 0; i < sinfo->num_streams; i++) - free(sinfo->s[i].buf); - - free(cthread); - free(threads); +#if 0 + /* These cannot be freed because their values are read after the next + * stream has started so they're not properly freed and just dropped on + * program exit! FIXME */ free(sinfo->s); free(sinfo); - +#endif return 0; }