Perform all checksumming in a separate thread to speed up the hash search in the rzip phase.

This commit is contained in:
Con Kolivas 2012-03-11 11:49:40 +11:00
parent f4165ec263
commit 5edf8471d1
4 changed files with 76 additions and 26 deletions

View file

@ -297,6 +297,12 @@ struct sliding_buffer {
int fd; /* The fd of the mmap */ int fd; /* The fd of the mmap */
}; };
struct checksum {
uint32_t *cksum;
uchar *buf;
i64 len;
};
struct rzip_control { struct rzip_control {
char *infile; char *infile;
FILE *inFILE; // if a FILE is being read from FILE *inFILE; // if a FILE is being read from
@ -347,9 +353,13 @@ struct rzip_control {
uchar *hash; uchar *hash;
unsigned char eof; unsigned char eof;
unsigned char magic_written; unsigned char magic_written;
pthread_mutex_t cksumlock;
md5_ctx ctx; md5_ctx ctx;
uchar md5_resblock[MD5_DIGEST_SIZE]; uchar md5_resblock[MD5_DIGEST_SIZE];
i64 md5_read; // How far into the file the md5 has done so far i64 md5_read; // How far into the file the md5 has done so far
struct checksum checksum;
const char *util_infile; const char *util_infile;
char delete_infile; char delete_infile;
const char *util_outfile; const char *util_outfile;

67
rzip.c
View file

@ -556,6 +556,28 @@ static void show_distrib(rzip_control *control, struct rzip_state *st)
primary * 100.0 / (total ? : 1)); primary * 100.0 / (total ? : 1));
} }
/* Perform all checksumming in a separate thread to speed up the hash search. */
static void *cksumthread(void *data)
{
rzip_control *control = (rzip_control *)data;
pthread_detach(pthread_self());
*control->checksum.cksum = CrcUpdate(*control->checksum.cksum, control->checksum.buf, control->checksum.len);
if (!NO_MD5)
md5_process_bytes(control->checksum.buf, control->checksum.len, &control->ctx);
free(control->checksum.buf);
unlock_mutex(control, &control->cksumlock);
return NULL;
}
static void cksum_update(rzip_control *control)
{
pthread_t thread;
create_pthread(control, &thread, NULL, cksumthread, control);
}
static bool hash_search(rzip_control *control, struct rzip_state *st, double pct_base, double pct_multiple) static bool hash_search(rzip_control *control, struct rzip_state *st, double pct_base, double pct_multiple)
{ {
struct sliding_buffer *sb = &control->sb; struct sliding_buffer *sb = &control->sb;
@ -670,18 +692,20 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct
} }
} }
if (p > (i64)cksum_limit) { if (p > cksum_limit) {
i64 n = MIN(st->chunk_size - p, control->page_size); /* We lock the mutex here and unlock it in the
uchar *ckbuf = malloc(n); * cksumthread. This lock protects all the data in
* control->checksum.
if (unlikely(!ckbuf)) */
lock_mutex(control, &control->cksumlock);
control->checksum.len = MIN(st->chunk_size - p, control->page_size);
control->checksum.buf = malloc(control->checksum.len);
if (unlikely(!control->checksum.buf))
fatal_return(("Failed to malloc ckbuf in hash_search\n"), false); fatal_return(("Failed to malloc ckbuf in hash_search\n"), false);
control->do_mcpy(control, ckbuf, cksum_limit, n); control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len);
st->cksum = CrcUpdate(st->cksum, ckbuf, n); control->checksum.cksum = &st->cksum;
if (!NO_MD5) cksum_update(control);
md5_process_bytes(ckbuf, n, &control->ctx); cksum_limit += control->checksum.len;
cksum_limit += n;
free(ckbuf);
} }
} }
@ -692,19 +716,19 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct
put_literal(control, st, st->last_match, st->chunk_size); put_literal(control, st, st->last_match, st->chunk_size);
if (st->chunk_size > cksum_limit) { if (st->chunk_size > cksum_limit) {
i64 n = st->chunk_size - cksum_limit; lock_mutex(control, &control->cksumlock);
uchar *ckbuf = malloc(n); control->checksum.len = st->chunk_size - cksum_limit;
control->checksum.buf = malloc(control->checksum.len);
if (unlikely(!ckbuf)) if (unlikely(!control->checksum.buf))
fatal_return(("Failed to malloc ckbuf in hash_search\n"), false); fatal_return(("Failed to malloc ckbuf in hash_search\n"), false);
control->do_mcpy(control, ckbuf, cksum_limit, n); control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len);
st->cksum = CrcUpdate(st->cksum, ckbuf, n); control->checksum.cksum = &st->cksum;
if (!NO_MD5) cksum_update(control);
md5_process_bytes(ckbuf, n, &control->ctx); cksum_limit += control->checksum.len;
cksum_limit += n;
free(ckbuf);
} }
wait_mutex(control, &control->cksumlock);
if (unlikely(!put_literal(control, st, 0, 0))) if (unlikely(!put_literal(control, st, 0, 0)))
return false; return false;
if (unlikely(!put_u32(control, st->ss, st->cksum))) if (unlikely(!put_u32(control, st->ss, st->cksum)))
@ -852,6 +876,7 @@ bool rzip_fd(rzip_control *control, int fd_in, int fd_out)
if (!NO_MD5) if (!NO_MD5)
md5_init_ctx(&control->ctx); md5_init_ctx(&control->ctx);
init_mutex(control, &control->cksumlock);
st = calloc(sizeof(*st), 1); st = calloc(sizeof(*st), 1);
if (unlikely(!st)) if (unlikely(!st))

View file

@ -104,27 +104,38 @@ static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *threads; static pthread_t *threads;
static bool init_mutex(rzip_control *control, pthread_mutex_t *mutex) bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
{ {
if (unlikely(pthread_mutex_init(mutex, NULL))) if (unlikely(pthread_mutex_init(mutex, NULL)))
fatal_return(("pthread_mutex_init failed"), false); fatal_return(("pthread_mutex_init failed"), false);
return true; return true;
} }
static bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex) bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
{ {
if (unlikely(pthread_mutex_unlock(mutex))) if (unlikely(pthread_mutex_unlock(mutex)))
fatal_return(("pthread_mutex_unlock failed"), false); fatal_return(("pthread_mutex_unlock failed"), false);
return true; return true;
} }
static bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex) bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
{ {
if (unlikely(pthread_mutex_lock(mutex))) if (unlikely(pthread_mutex_lock(mutex)))
fatal_return(("pthread_mutex_lock failed"), false); fatal_return(("pthread_mutex_lock failed"), false);
return true; return true;
} }
/* Lock and unlock a mutex */
bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex)
{
bool ret;
ret = lock_mutex(control, mutex);
if (likely(ret))
ret = unlock_mutex(control, mutex);
return ret;
}
static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex) static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
{ {
if (unlikely(pthread_cond_wait(cond, mutex))) if (unlikely(pthread_cond_wait(cond, mutex)))

View file

@ -23,9 +23,13 @@
#include "lrzip_private.h" #include "lrzip_private.h"
#include <pthread.h> #include <pthread.h>
bool create_pthread(pthread_t *thread, pthread_attr_t *attr, bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
void * (*start_routine)(void *), void *arg); void * (*start_routine)(void *), void *arg);
bool join_pthread(pthread_t th, void **thread_return); bool join_pthread(pthread_t th, void **thread_return);
bool init_mutex(rzip_control *control, pthread_mutex_t *mutex);
bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex);
bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex);
bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex);
ssize_t write_1g(rzip_control *control, void *buf, i64 len); ssize_t write_1g(rzip_control *control, void *buf, i64 len);
ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len); ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len);
i64 get_readseek(rzip_control *control, int fd); i64 get_readseek(rzip_control *control, int fd);