Convert semaphore primitives to pthread_mutexes making them more portable, thus allowing multithreading to work on OSX.

This commit is contained in:
Con Kolivas 2011-02-17 00:24:28 +11:00
parent 05c5326df3
commit 626e0be281
2 changed files with 73 additions and 90 deletions

1
rzip.h
View file

@ -36,7 +36,6 @@
#include <bzlib.h>
#include <zlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/resource.h>
#include <netinet/in.h>

162
stream.c
View file

@ -27,9 +27,7 @@ struct compress_thread{
uchar c_type; /* Compression type */
i64 s_len; /* Data length uncompressed */
i64 c_len; /* Data length compressed */
sem_t complete; /* Signal when this thread has finished */
sem_t free; /* This thread no longer exists */
int wait_on; /* Which thread has to complete before this can write its data */
pthread_mutex_t mutex; /* This thread's mutex */
struct stream_info *sinfo;
int stream;
} *cthread;
@ -39,42 +37,38 @@ struct uncomp_thread{
i64 u_len, c_len;
i64 last_head;
uchar c_type;
sem_t complete;
sem_t ready; /* Taken this thread's data so it can die */
int busy;
int stream;
} *ucthread;
void init_sem(sem_t *sem)
static void init_mutex(pthread_mutex_t *mutex)
{
if (unlikely(sem_init(sem, 0, 0)))
fatal("sem_init\n");
if (unlikely(pthread_mutex_init(mutex, NULL)))
fatal("pthread_mutex_init failed");
}
static inline void post_sem(sem_t *s)
static void unlock_mutex(pthread_mutex_t *mutex)
{
retry:
if (unlikely((sem_post(s)) == -1)) {
if (errno == EINTR)
goto retry;
fatal("sem_post failed");
}
if (unlikely(pthread_mutex_unlock(mutex)))
fatal("pthread_mutex_unlock failed");
}
static inline void wait_sem(sem_t *s)
static void lock_mutex(pthread_mutex_t *mutex)
{
retry:
if (unlikely((sem_wait(s)) == -1)) {
if (errno == EINTR)
goto retry;
fatal("sem_wait failed");
}
if (unlikely(pthread_mutex_lock(mutex)))
fatal("pthread_mutex_lock failed");
}
static inline void destroy_sem(sem_t *s)
static void cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
if (unlikely(sem_destroy(s)))
fatal("sem_destroy failed\n");
if (unlikely(pthread_cond_wait(cond, mutex)))
fatal("pthread_cond_wait failed");
}
static void cond_broadcast(pthread_cond_t *cond)
{
if (unlikely(pthread_cond_broadcast(cond)))
fatal("pthread_cond_broadcast failed");
}
void create_pthread(pthread_t * thread, pthread_attr_t * attr,
@ -731,31 +725,24 @@ void prepare_streamout_threads(void)
if (unlikely(!cthread))
fatal("Unable to calloc cthread in prepare_streamout_threads\n");
for (i = 0; i < control.threads; i++) {
init_sem(&cthread[i].complete);
init_sem(&cthread[i].free);
post_sem(&cthread[i].free);
}
/* Threads need to wait on the thread before them before dumping their
* data. This is done in a circle up to control.threads */
cthread[0].wait_on = control.threads - 1;
for (i = 1; i < control.threads; i++)
cthread[i].wait_on = i - 1;
/* Signal thread 0 that it can start */
if (control.threads > 1)
post_sem(&cthread[control.threads - 1].complete);
for (i = 0; i < control.threads; i++)
init_mutex(&cthread[i].mutex);
}
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
void close_streamout_threads(void)
{
int i;
int i, close_thread = output_thread;
/* Wait for the threads in the correct order in case they end up
* serialised */
for (i = 0; i < control.threads; i++) {
wait_sem(&cthread[i].free);
destroy_sem(&cthread[i].complete);
destroy_sem(&cthread[i].free);
lock_mutex(&cthread[close_thread].mutex);
if (++close_thread == control.threads)
close_thread = 0;
}
free(cthread);
free(threads);
@ -852,11 +839,6 @@ void *open_stream_in(int f, int n)
if (unlikely(!ucthread))
fatal("Unable to calloc cthread in open_stream_in\n");
for (i = 0; i < total_threads; i++) {
init_sem(&ucthread[i].complete);
init_sem(&ucthread[i].ready);
}
sinfo->num_streams = n;
sinfo->fd = f;
sinfo->initial_pos = lseek(f, 0, SEEK_CUR);
@ -973,17 +955,20 @@ retry:
/* If compression fails for whatever reason multithreaded, then wait
* for the previous thread to finish, serialising the work to decrease
* the memory requirements, increasing the chance of success */
if (ret){
if (waited)
fatal("Failed to compress in compthread\n");
print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
}
if (unlikely(ret && waited))
fatal("Failed to compress in compthread\n");
if (control.threads > 1 && !waited)
wait_sem(&cthread[cti->wait_on].complete);
waited = 1;
if (ret)
if (!waited) {
lock_mutex(&output_lock);
while (output_thread != i)
cond_wait(&output_cond, &output_lock);
unlock_mutex(&output_lock);
waited = 1;
}
if (unlikely(ret)) {
print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
goto retry;
}
if (!ctis->chunks++) {
int j;
@ -1029,8 +1014,13 @@ retry:
ctis->cur_pos += cti->c_len;
free(cti->s_buf);
post_sem(&cti->complete);
post_sem(&cti->free);
lock_mutex(&output_lock);
if (++output_thread == control.threads)
output_thread = 0;
cond_broadcast(&output_cond);
unlock_mutex(&output_lock);
unlock_mutex(&cti->mutex);
return 0;
}
@ -1040,7 +1030,7 @@ static void clear_buffer(struct stream_info *sinfo, int stream, int newbuf)
static long i = 0;
/* Make sure this thread doesn't already exist */
wait_sem(&cthread[i].free);
lock_mutex(&cthread[i].mutex);
cthread[i].sinfo = sinfo;
cthread[i].stream = stream;
@ -1104,23 +1094,21 @@ retry:
/* As per compression, serialise the decompression if it fails in
* parallel */
if (ret) {
if (waited)
if (unlikely(ret)) {
if (unlikely(waited))
fatal("Failed to decompress in ucompthread\n");
print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
/* This "ready" tells this thread that we're ready to receive
* its data. We do not strictly need to wait for this, so it's
* used when decompression fails due to inadequate memory to
* try again serialised. */
wait_sem(&uci->ready);
/* We do not strictly need to wait for this, so it's used when
* decompression fails due to inadequate memory to try again
* serialised. */
lock_mutex(&output_lock);
while (output_thread != i)
cond_wait(&output_cond, &output_lock);
unlock_mutex(&output_lock);
waited = 1;
goto retry;
}
/* This "complete" tells the main thread that this thread has its
* decompressed data ready */
post_sem(&uci->complete);
print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->stream);
return 0;
@ -1138,6 +1126,9 @@ static int fill_buffer(struct stream_info *sinfo, int stream)
if (s->eos)
goto out;
fill_another:
if (unlikely(ucthread[s->uthread_no].busy))
fatal("Trying to start a busy thread, this shouldn't happen!\n");
if (unlikely(seekto(sinfo, s->last_head)))
return -1;
@ -1204,23 +1195,20 @@ fill_another:
else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy)
goto fill_another;
out:
/* "ready" tells the decompression thread we're ready for its data */
post_sem(&ucthread[s->unext_thread].ready);
/* This "complete" is the deco thread telling us it's finished
* decompressing data */
wait_sem(&ucthread[s->unext_thread].complete);
print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
lock_mutex(&output_lock);
output_thread = s->unext_thread;
cond_broadcast(&output_cond);
unlock_mutex(&output_lock);
/* join_pthread here will make it wait till the data is ready */
join_pthread(threads[s->unext_thread], NULL);
ucthread[s->unext_thread].busy = 0;
print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
s->buf = ucthread[s->unext_thread].s_buf;
s->buflen = ucthread[s->unext_thread].u_len;
s->bufp = 0;
join_pthread(threads[s->unext_thread], NULL);
ucthread[s->unext_thread].busy = 0;
/* As the ready semaphore may or may not have been waited on in
* ucompthread, we reset it regardless. */
init_sem(&ucthread[s->unext_thread].ready);
if (++s->unext_thread == s->base_thread + s->total_threads)
s->unext_thread = s->base_thread;
@ -1313,11 +1301,7 @@ int close_stream_in(void *ss)
for (i = 0; i < sinfo->num_streams; i++)
free(sinfo->s[i].buf);
for (i = 0; i < control.threads + 1; i++) {
destroy_sem(&ucthread[i].complete);
destroy_sem(&ucthread[i].ready);
}
output_thread = 0;
free(ucthread);
free(threads);
free(sinfo->s);