diff --git a/stream.c b/stream.c index a795772..cf84193 100644 --- a/stream.c +++ b/stream.c @@ -67,7 +67,7 @@ static struct compress_thread{ uchar c_type; /* Compression type */ i64 s_len; /* Data length uncompressed */ i64 c_len; /* Data length compressed */ - pthread_mutex_t mutex; /* This thread's mutex */ + cksem_t cksem; /* This thread's semaphore */ struct stream_info *sinfo; int streamno; uchar salt[SALT_LEN]; @@ -898,14 +898,10 @@ bool prepare_streamout_threads(rzip_control *control) fatal_return(("Unable to calloc cthread in prepare_streamout_threads\n"), false); } - for (i = 0; i < control->threads; i++) - if (unlikely(!init_mutex(control, &cthread[i].mutex))) { - int x; - for (x = 0; x < i; x++) pthread_mutex_destroy(&cthread[x].mutex); - free(threads); - free(cthread); - return false; - } + for (i = 0; i < control->threads; i++) { + cksem_init(control, &cthread[i].cksem); + cksem_post(control, &cthread[i].cksem); + } return true; } @@ -917,13 +913,7 @@ bool close_streamout_threads(rzip_control *control) /* Wait for the threads in the correct order in case they end up * serialised */ for (i = 0; i < control->threads; i++) { - if (unlikely(!lock_mutex(control, &cthread[close_thread].mutex))) { - int x; - for (x = 0; x < i; x++) unlock_mutex(control, &cthread[close_thread].mutex); - free(cthread); - free(threads); - return false; - } + cksem_wait(control, &cthread[close_thread].cksem); if (++close_thread == control->threads) close_thread = 0; @@ -1440,7 +1430,7 @@ retry: unlock_mutex(control, &output_lock); error: - unlock_mutex(control, &cti->mutex); + cksem_post(control, &cti->cksem); return NULL; } @@ -1451,7 +1441,7 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s stream_thread_struct *s; /* Make sure this thread doesn't already exist */ - lock_mutex(control, &cthread[i].mutex); + cksem_wait(control, &cthread[i].cksem); cthread[i].sinfo = sinfo; cthread[i].streamno = streamno; @@ -1463,14 +1453,14 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s s = malloc(sizeof(stream_thread_struct)); if (unlikely(!s)) { - unlock_mutex(control, &cthread[i].mutex); + cksem_post(control, &cthread[i].cksem); fatal_return(("Unable to malloc in clear_buffer"), false); } s->i = i; s->control = control; if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) || (!detach_pthread(control, &threads[i])))) { - unlock_mutex(control, &cthread[i].mutex); + cksem_post(control, &cthread[i].cksem); return false; } @@ -1480,7 +1470,7 @@ static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int s * new one. */ sinfo->s[streamno].buf = malloc(sinfo->bufsize); if (unlikely(!sinfo->s[streamno].buf)) { - unlock_mutex(control, &cthread[i].mutex); + cksem_post(control, &cthread[i].cksem); fatal_return(("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize), false); } sinfo->s[streamno].buflen = 0; @@ -1770,8 +1760,8 @@ int close_stream_out(rzip_control *control, void *ss) int close_thread = output_thread; for (i = 0; i < control->threads; i++) { - lock_mutex(control, &cthread[close_thread].mutex); - unlock_mutex(control, &cthread[close_thread].mutex); + cksem_wait(control, &cthread[close_thread].cksem); + cksem_post(control, &cthread[close_thread].cksem); if (++close_thread == control->threads) close_thread = 0; }