Convert the thread locking to use cksems

This commit is contained in:
Con Kolivas 2014-05-30 21:27:21 +10:00
parent 298d6a6715
commit 22496bdd5a

View file

@ -67,7 +67,7 @@ static struct compress_thread{
uchar c_type; /* Compression type */
i64 s_len; /* Data length uncompressed */
i64 c_len; /* Data length compressed */
pthread_mutex_t mutex; /* This thread's mutex */
cksem_t cksem; /* This thread's semaphore */
struct stream_info *sinfo;
int streamno;
uchar salt[SALT_LEN];
@ -898,14 +898,10 @@ bool prepare_streamout_threads(rzip_control *control)
fatal_return(("Unable to calloc cthread in prepare_streamout_threads\n"), false);
}
for (i = 0; i < control->threads; i++)
if (unlikely(!init_mutex(control, &cthread[i].mutex))) {
int x;
for (x = 0; x < i; x++) pthread_mutex_destroy(&cthread[x].mutex);
free(threads);
free(cthread);
return false;
}
for (i = 0; i < control->threads; i++) {
cksem_init(control, &cthread[i].cksem);
cksem_post(control, &cthread[i].cksem);
}
return true;
}
@ -917,13 +913,7 @@ bool close_streamout_threads(rzip_control *control)
/* Wait for the threads in the correct order in case they end up
* serialised */
for (i = 0; i < control->threads; i++) {
if (unlikely(!lock_mutex(control, &cthread[close_thread].mutex))) {
int x;
for (x = 0; x < i; x++) unlock_mutex(control, &cthread[close_thread].mutex);
free(cthread);
free(threads);
return false;
}
cksem_wait(control, &cthread[close_thread].cksem);
if (++close_thread == control->threads)
close_thread = 0;
@ -1440,7 +1430,7 @@ retry:
unlock_mutex(control, &output_lock);
error:
unlock_mutex(control, &cti->mutex);
cksem_post(control, &cti->cksem);
return NULL;
}
@ -1451,7 +1441,7 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
stream_thread_struct *s;
/* Make sure this thread doesn't already exist */
lock_mutex(control, &cthread[i].mutex);
cksem_wait(control, &cthread[i].cksem);
cthread[i].sinfo = sinfo;
cthread[i].streamno = streamno;
@ -1463,14 +1453,14 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
s = malloc(sizeof(stream_thread_struct));
if (unlikely(!s)) {
unlock_mutex(control, &cthread[i].mutex);
cksem_post(control, &cthread[i].cksem);
fatal_return(("Unable to malloc in clear_buffer"), false);
}
s->i = i;
s->control = control;
if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) ||
(!detach_pthread(control, &threads[i])))) {
unlock_mutex(control, &cthread[i].mutex);
cksem_post(control, &cthread[i].cksem);
return false;
}
@ -1480,7 +1470,7 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
* new one. */
sinfo->s[streamno].buf = malloc(sinfo->bufsize);
if (unlikely(!sinfo->s[streamno].buf)) {
unlock_mutex(control, &cthread[i].mutex);
cksem_post(control, &cthread[i].cksem);
fatal_return(("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize), false);
}
sinfo->s[streamno].buflen = 0;
@ -1770,8 +1760,8 @@ int close_stream_out(rzip_control *control, void *ss)
int close_thread = output_thread;
for (i = 0; i < control->threads; i++) {
lock_mutex(control, &cthread[close_thread].mutex);
unlock_mutex(control, &cthread[close_thread].mutex);
cksem_wait(control, &cthread[close_thread].cksem);
cksem_post(control, &cthread[close_thread].cksem);
if (++close_thread == control->threads)
close_thread = 0;
}