diff --git a/stream.c b/stream.c index fbbf5b7..1cd18ac 100644 --- a/stream.c +++ b/stream.c @@ -41,7 +41,7 @@ struct uncomp_thread{ uchar c_type; sem_t complete; sem_t ready; /* Taken this thread's data so it can die */ - sem_t free; + int busy; int stream; } *ucthread; @@ -71,20 +71,6 @@ retry: } } -static inline int trywait_sem(sem_t *s) -{ - int ret; - -retry: - if ((ret = sem_trywait(s)) == -1) { - if (errno == EINTR) - goto retry; - if (errno != EAGAIN) - fatal("sem_trywait"); - } - return ret; -} - static inline void destroy_sem(sem_t *s) { if (unlikely(sem_destroy(s))) @@ -868,8 +854,6 @@ void *open_stream_in(int f, int n) for (i = 0; i < total_threads; i++) { init_sem(&ucthread[i].complete); - init_sem(&ucthread[i].free); - post_sem(&ucthread[i].free); init_sem(&ucthread[i].ready); } @@ -1186,8 +1170,6 @@ fill_another: sinfo->total_read += header_length; - /* Wait till the next thread is free */ - wait_sem(&ucthread[s->uthread_no].free); fsync(control.fd_out); s_buf = malloc(u_len); @@ -1206,6 +1188,8 @@ fill_another: ucthread[s->uthread_no].stream = stream; s->last_head = last_head; + /* List this thread as busy */ + ucthread[s->uthread_no].busy = 1; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", s->uthread_no, c_len, stream); create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no); @@ -1213,16 +1197,12 @@ fill_another: if (++s->uthread_no == s->base_thread + s->total_threads) s->uthread_no = s->base_thread; - /* Reached the end of this stream */ + /* Reached the end of this stream, no more data to read in, otherwise + * see if the next thread is free to grab more data */ if (!last_head) s->eos = 1; - else if (s->uthread_no != s->unext_thread) { - /* See if the next thread is free as well */ - if (!trywait_sem(&ucthread[s->uthread_no].free)) { - post_sem(&ucthread[s->uthread_no].free); - goto fill_another; - } - } + else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy) + goto fill_another; out: /* "ready" tells the decompression thread we're ready for its data */ post_sem(&ucthread[s->unext_thread].ready); @@ -1236,7 +1216,7 @@ out: s->bufp = 0; join_pthread(threads[s->unext_thread], NULL); - post_sem(&ucthread[s->unext_thread].free); + ucthread[s->unext_thread].busy = 0; /* As the ready semaphore may or may not have been waited on in * ucompthread, we reset it regardless. */ init_sem(&ucthread[s->unext_thread].ready); @@ -1334,9 +1314,7 @@ int close_stream_in(void *ss) free(sinfo->s[i].buf); for (i = 0; i < control.threads + 1; i++) { - wait_sem(&ucthread[i].free); destroy_sem(&ucthread[i].complete); - destroy_sem(&ucthread[i].free); destroy_sem(&ucthread[i].ready); }