From 626e0be28132301a059bc837b3810110b392138a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 17 Feb 2011 00:24:28 +1100 Subject: [PATCH] Convert semaphore primitives to pthread_mutexes making them more portable, thus allowing multithreading to work on OSX. --- rzip.h | 1 - stream.c | 162 +++++++++++++++++++++++++------------------------------ 2 files changed, 73 insertions(+), 90 deletions(-) diff --git a/rzip.h b/rzip.h index 24cb77f..c8e35a9 100644 --- a/rzip.h +++ b/rzip.h @@ -36,7 +36,6 @@ #include #include #include -#include #include #include diff --git a/stream.c b/stream.c index 2ea6502..b8381d2 100644 --- a/stream.c +++ b/stream.c @@ -27,9 +27,7 @@ struct compress_thread{ uchar c_type; /* Compression type */ i64 s_len; /* Data length uncompressed */ i64 c_len; /* Data length compressed */ - sem_t complete; /* Signal when this thread has finished */ - sem_t free; /* This thread no longer exists */ - int wait_on; /* Which thread has to complete before this can write its data */ + pthread_mutex_t mutex; /* This thread's mutex */ struct stream_info *sinfo; int stream; } *cthread; @@ -39,42 +37,38 @@ struct uncomp_thread{ i64 u_len, c_len; i64 last_head; uchar c_type; - sem_t complete; - sem_t ready; /* Taken this thread's data so it can die */ int busy; int stream; } *ucthread; -void init_sem(sem_t *sem) +static void init_mutex(pthread_mutex_t *mutex) { - if (unlikely(sem_init(sem, 0, 0))) - fatal("sem_init\n"); + if (unlikely(pthread_mutex_init(mutex, NULL))) + fatal("pthread_mutex_init failed"); } -static inline void post_sem(sem_t *s) +static void unlock_mutex(pthread_mutex_t *mutex) { -retry: - if (unlikely((sem_post(s)) == -1)) { - if (errno == EINTR) - goto retry; - fatal("sem_post failed"); - } + if (unlikely(pthread_mutex_unlock(mutex))) + fatal("pthread_mutex_unlock failed"); } -static inline void wait_sem(sem_t *s) +static void lock_mutex(pthread_mutex_t *mutex) { -retry: - if (unlikely((sem_wait(s)) == -1)) { - if (errno == EINTR) - goto retry; - fatal("sem_wait failed"); - } + if (unlikely(pthread_mutex_lock(mutex))) + fatal("pthread_mutex_lock failed"); } -static inline void destroy_sem(sem_t *s) +static void cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) { - if (unlikely(sem_destroy(s))) - fatal("sem_destroy failed\n"); + if (unlikely(pthread_cond_wait(cond, mutex))) + fatal("pthread_cond_wait failed"); +} + +static void cond_broadcast(pthread_cond_t *cond) +{ + if (unlikely(pthread_cond_broadcast(cond))) + fatal("pthread_cond_broadcast failed"); } void create_pthread(pthread_t * thread, pthread_attr_t * attr, @@ -731,31 +725,24 @@ void prepare_streamout_threads(void) if (unlikely(!cthread)) fatal("Unable to calloc cthread in prepare_streamout_threads\n"); - for (i = 0; i < control.threads; i++) { - init_sem(&cthread[i].complete); - init_sem(&cthread[i].free); - post_sem(&cthread[i].free); - } - - /* Threads need to wait on the thread before them before dumping their - * data. This is done in a circle up to control.threads */ - cthread[0].wait_on = control.threads - 1; - for (i = 1; i < control.threads; i++) - cthread[i].wait_on = i - 1; - - /* Signal thread 0 that it can start */ - if (control.threads > 1) - post_sem(&cthread[control.threads - 1].complete); + for (i = 0; i < control.threads; i++) + init_mutex(&cthread[i].mutex); } +static long output_thread; +static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER; + void close_streamout_threads(void) { - int i; + int i, close_thread = output_thread; + /* Wait for the threads in the correct order in case they end up + * serialised */ for (i = 0; i < control.threads; i++) { - wait_sem(&cthread[i].free); - destroy_sem(&cthread[i].complete); - destroy_sem(&cthread[i].free); + lock_mutex(&cthread[close_thread].mutex); + if (++close_thread == control.threads) + close_thread = 0; } free(cthread); free(threads); @@ -852,11 +839,6 @@ void *open_stream_in(int f, int n) if (unlikely(!ucthread)) fatal("Unable to calloc cthread in open_stream_in\n"); - for (i = 0; i < total_threads; i++) { - init_sem(&ucthread[i].complete); - init_sem(&ucthread[i].ready); - } - sinfo->num_streams = n; sinfo->fd = f; sinfo->initial_pos = lseek(f, 0, SEEK_CUR); @@ -973,17 +955,20 @@ retry: /* If compression fails for whatever reason multithreaded, then wait * for the previous thread to finish, serialising the work to decrease * the memory requirements, increasing the chance of success */ - if (ret){ - if (waited) - fatal("Failed to compress in compthread\n"); - print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n"); - } + if (unlikely(ret && waited)) + fatal("Failed to compress in compthread\n"); - if (control.threads > 1 && !waited) - wait_sem(&cthread[cti->wait_on].complete); - waited = 1; - if (ret) + if (!waited) { + lock_mutex(&output_lock); + while (output_thread != i) + cond_wait(&output_cond, &output_lock); + unlock_mutex(&output_lock); + waited = 1; + } + if (unlikely(ret)) { + print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n"); goto retry; + } if (!ctis->chunks++) { int j; @@ -1029,8 +1014,13 @@ retry: ctis->cur_pos += cti->c_len; free(cti->s_buf); - post_sem(&cti->complete); - post_sem(&cti->free); + lock_mutex(&output_lock); + if (++output_thread == control.threads) + output_thread = 0; + cond_broadcast(&output_cond); + unlock_mutex(&output_lock); + + unlock_mutex(&cti->mutex); return 0; } @@ -1040,7 +1030,7 @@ static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf) static long i = 0; /* Make sure this thread doesn't already exist */ - wait_sem(&cthread[i].free); + lock_mutex(&cthread[i].mutex); cthread[i].sinfo = sinfo; cthread[i].stream = stream; @@ -1104,23 +1094,21 @@ retry: /* As per compression, serialise the decompression if it fails in * parallel */ - if (ret) { - if (waited) + if (unlikely(ret)) { + if (unlikely(waited)) fatal("Failed to decompress in ucompthread\n"); print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n"); - /* This "ready" tells this thread that we're ready to receive - * its data. We do not strictly need to wait for this, so it's - * used when decompression fails due to inadequate memory to - * try again serialised. */ - wait_sem(&uci->ready); + /* We do not strictly need to wait for this, so it's used when + * decompression fails due to inadequate memory to try again + * serialised. */ + lock_mutex(&output_lock); + while (output_thread != i) + cond_wait(&output_cond, &output_lock); + unlock_mutex(&output_lock); waited = 1; goto retry; } - /* This "complete" tells the main thread that this thread has its - * decompressed data ready */ - post_sem(&uci->complete); - print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->stream); return 0; @@ -1138,6 +1126,9 @@ static int fill_buffer(struct stream_info *sinfo, int stream) if (s->eos) goto out; fill_another: + if (unlikely(ucthread[s->uthread_no].busy)) + fatal("Trying to start a busy thread, this shouldn't happen!\n"); + if (unlikely(seekto(sinfo, s->last_head))) return -1; @@ -1204,23 +1195,20 @@ fill_another: else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) goto fill_another; out: - /* "ready" tells the decompression thread we're ready for its data */ - post_sem(&ucthread[s->unext_thread].ready); - /* This "complete" is the deco thread telling us it's finished - * decompressing data */ - wait_sem(&ucthread[s->unext_thread].complete); - print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread); + lock_mutex(&output_lock); + output_thread = s->unext_thread; + cond_broadcast(&output_cond); + unlock_mutex(&output_lock); + /* join_pthread here will make it wait till the data is ready */ + join_pthread(threads[s->unext_thread], NULL); + ucthread[s->unext_thread].busy = 0; + + print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread); s->buf = ucthread[s->unext_thread].s_buf; s->buflen = ucthread[s->unext_thread].u_len; s->bufp = 0; - join_pthread(threads[s->unext_thread], NULL); - ucthread[s->unext_thread].busy = 0; - /* As the ready semaphore may or may not have been waited on in - * ucompthread, we reset it regardless. */ - init_sem(&ucthread[s->unext_thread].ready); - if (++s->unext_thread == s->base_thread + s->total_threads) s->unext_thread = s->base_thread; @@ -1313,11 +1301,7 @@ int close_stream_in(void *ss) for (i = 0; i < sinfo->num_streams; i++) free(sinfo->s[i].buf); - for (i = 0; i < control.threads + 1; i++) { - destroy_sem(&ucthread[i].complete); - destroy_sem(&ucthread[i].ready); - } - + output_thread = 0; free(ucthread); free(threads); free(sinfo->s);