From 5edf8471d1eedf29ad2e477653062b3b7ca33c9e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 11 Mar 2012 11:49:40 +1100 Subject: [PATCH] Perform all checksumming in a separate thread to speed up the hash search in the rzip phase. --- lrzip_private.h | 10 +++++++ rzip.c | 69 +++++++++++++++++++++++++++++++++---------------- stream.c | 17 +++++++++--- stream.h | 6 ++++- 4 files changed, 76 insertions(+), 26 deletions(-) diff --git a/lrzip_private.h b/lrzip_private.h index c361216..72225b1 100644 --- a/lrzip_private.h +++ b/lrzip_private.h @@ -297,6 +297,12 @@ struct sliding_buffer { int fd; /* The fd of the mmap */ }; +struct checksum { + uint32_t *cksum; + uchar *buf; + i64 len; +}; + struct rzip_control { char *infile; FILE *inFILE; // if a FILE is being read from @@ -347,9 +353,13 @@ struct rzip_control { uchar *hash; unsigned char eof; unsigned char magic_written; + + pthread_mutex_t cksumlock; md5_ctx ctx; uchar md5_resblock[MD5_DIGEST_SIZE]; i64 md5_read; // How far into the file the md5 has done so far + struct checksum checksum; + const char *util_infile; char delete_infile; const char *util_outfile; diff --git a/rzip.c b/rzip.c index e03ce5b..581624a 100644 --- a/rzip.c +++ b/rzip.c @@ -556,6 +556,28 @@ static void show_distrib(rzip_control *control, struct rzip_state *st) primary * 100.0 / (total ? : 1)); } +/* Perform all checksumming in a separate thread to speed up the hash search. */ +static void *cksumthread(void *data) +{ + rzip_control *control = (rzip_control *)data; + + pthread_detach(pthread_self()); + + *control->checksum.cksum = CrcUpdate(*control->checksum.cksum, control->checksum.buf, control->checksum.len); + if (!NO_MD5) + md5_process_bytes(control->checksum.buf, control->checksum.len, &control->ctx); + free(control->checksum.buf); + unlock_mutex(control, &control->cksumlock); + return NULL; +} + +static void cksum_update(rzip_control *control) +{ + pthread_t thread; + + create_pthread(control, &thread, NULL, cksumthread, control); +} + static bool hash_search(rzip_control *control, struct rzip_state *st, double pct_base, double pct_multiple) { struct sliding_buffer *sb = &control->sb; @@ -670,18 +692,20 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct } } - if (p > (i64)cksum_limit) { - i64 n = MIN(st->chunk_size - p, control->page_size); - uchar *ckbuf = malloc(n); - - if (unlikely(!ckbuf)) + if (p > cksum_limit) { + /* We lock the mutex here and unlock it in the + * cksumthread. This lock protects all the data in + * control->checksum. + */ + lock_mutex(control, &control->cksumlock); + control->checksum.len = MIN(st->chunk_size - p, control->page_size); + control->checksum.buf = malloc(control->checksum.len); + if (unlikely(!control->checksum.buf)) fatal_return(("Failed to malloc ckbuf in hash_search\n"), false); - control->do_mcpy(control, ckbuf, cksum_limit, n); - st->cksum = CrcUpdate(st->cksum, ckbuf, n); - if (!NO_MD5) - md5_process_bytes(ckbuf, n, &control->ctx); - cksum_limit += n; - free(ckbuf); + control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len); + control->checksum.cksum = &st->cksum; + cksum_update(control); + cksum_limit += control->checksum.len; } } @@ -692,19 +716,19 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct put_literal(control, st, st->last_match, st->chunk_size); if (st->chunk_size > cksum_limit) { - i64 n = st->chunk_size - cksum_limit; - uchar *ckbuf = malloc(n); - - if (unlikely(!ckbuf)) + lock_mutex(control, &control->cksumlock); + control->checksum.len = st->chunk_size - cksum_limit; + control->checksum.buf = malloc(control->checksum.len); + if (unlikely(!control->checksum.buf)) fatal_return(("Failed to malloc ckbuf in hash_search\n"), false); - control->do_mcpy(control, ckbuf, cksum_limit, n); - st->cksum = CrcUpdate(st->cksum, ckbuf, n); - if (!NO_MD5) - md5_process_bytes(ckbuf, n, &control->ctx); - cksum_limit += n; - free(ckbuf); + control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len); + control->checksum.cksum = &st->cksum; + cksum_update(control); + cksum_limit += control->checksum.len; } + wait_mutex(control, &control->cksumlock); + if (unlikely(!put_literal(control, st, 0, 0))) return false; if (unlikely(!put_u32(control, st->ss, st->cksum))) @@ -851,7 +875,8 @@ bool rzip_fd(rzip_control *control, int fd_in, int fd_out) i64 free_space; if (!NO_MD5) - md5_init_ctx (&control->ctx); + md5_init_ctx(&control->ctx); + init_mutex(control, &control->cksumlock); st = calloc(sizeof(*st), 1); if (unlikely(!st)) diff --git a/stream.c b/stream.c index 50480bf..c896418 100644 --- a/stream.c +++ b/stream.c @@ -104,27 +104,38 @@ static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER; static pthread_t *threads; -static bool init_mutex(rzip_control *control, pthread_mutex_t *mutex) +bool init_mutex(rzip_control *control, pthread_mutex_t *mutex) { if (unlikely(pthread_mutex_init(mutex, NULL))) fatal_return(("pthread_mutex_init failed"), false); return true; } -static bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex) +bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex) { if (unlikely(pthread_mutex_unlock(mutex))) fatal_return(("pthread_mutex_unlock failed"), false); return true; } -static bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex) +bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex) { if (unlikely(pthread_mutex_lock(mutex))) fatal_return(("pthread_mutex_lock failed"), false); return true; } +/* Lock and unlock a mutex */ +bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex) +{ + bool ret; + + ret = lock_mutex(control, mutex); + if (likely(ret)) + ret = unlock_mutex(control, mutex); + return ret; +} + static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex) { if (unlikely(pthread_cond_wait(cond, mutex))) diff --git a/stream.h b/stream.h index abf00c5..d8152ec 100644 --- a/stream.h +++ b/stream.h @@ -23,9 +23,13 @@ #include "lrzip_private.h" #include -bool create_pthread(pthread_t *thread, pthread_attr_t *attr, +bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr, void * (*start_routine)(void *), void *arg); bool join_pthread(pthread_t th, void **thread_return); +bool init_mutex(rzip_control *control, pthread_mutex_t *mutex); +bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex); +bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex); +bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex); ssize_t write_1g(rzip_control *control, void *buf, i64 len); ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len); i64 get_readseek(rzip_control *control, int fd);