The free semaphore is now only updated from the main process on decompression so there are no synchronisation concerns.

Remove the free semaphore and the fragile use of sem_trywait and replace it with a simple busy flag for threads on decompression.
This commit is contained in:
Con Kolivas 2011-02-08 08:55:36 +11:00
parent 9c2b86fec6
commit aa00c29fba

View file

@ -41,7 +41,7 @@ struct uncomp_thread{
uchar c_type;
sem_t complete;
sem_t ready; /* Taken this thread's data so it can die */
sem_t free;
int busy;
int stream;
} *ucthread;
@ -71,20 +71,6 @@ retry:
}
}
static inline int trywait_sem(sem_t *s)
{
int ret;
retry:
if ((ret = sem_trywait(s)) == -1) {
if (errno == EINTR)
goto retry;
if (errno != EAGAIN)
fatal("sem_trywait");
}
return ret;
}
static inline void destroy_sem(sem_t *s)
{
if (unlikely(sem_destroy(s)))
@ -868,8 +854,6 @@ void *open_stream_in(int f, int n)
for (i = 0; i < total_threads; i++) {
init_sem(&ucthread[i].complete);
init_sem(&ucthread[i].free);
post_sem(&ucthread[i].free);
init_sem(&ucthread[i].ready);
}
@ -1186,8 +1170,6 @@ fill_another:
sinfo->total_read += header_length;
/* Wait till the next thread is free */
wait_sem(&ucthread[s->uthread_no].free);
fsync(control.fd_out);
s_buf = malloc(u_len);
@ -1206,6 +1188,8 @@ fill_another:
ucthread[s->uthread_no].stream = stream;
s->last_head = last_head;
/* List this thread as busy */
ucthread[s->uthread_no].busy = 1;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
s->uthread_no, c_len, stream);
create_pthread(&threads[s->uthread_no], NULL, ucompthread, (void *)s->uthread_no);
@ -1213,16 +1197,12 @@ fill_another:
if (++s->uthread_no == s->base_thread + s->total_threads)
s->uthread_no = s->base_thread;
/* Reached the end of this stream */
/* Reached the end of this stream, no more data to read in, otherwise
* see if the next thread is free to grab more data */
if (!last_head)
s->eos = 1;
else if (s->uthread_no != s->unext_thread) {
/* See if the next thread is free as well */
if (!trywait_sem(&ucthread[s->uthread_no].free)) {
post_sem(&ucthread[s->uthread_no].free);
goto fill_another;
}
}
else if (s->uthread_no != s->unext_thread && !ucthread[s->uthread_no].busy)
goto fill_another;
out:
/* "ready" tells the decompression thread we're ready for its data */
post_sem(&ucthread[s->unext_thread].ready);
@ -1236,7 +1216,7 @@ out:
s->bufp = 0;
join_pthread(threads[s->unext_thread], NULL);
post_sem(&ucthread[s->unext_thread].free);
ucthread[s->unext_thread].busy = 0;
/* As the ready semaphore may or may not have been waited on in
* ucompthread, we reset it regardless. */
init_sem(&ucthread[s->unext_thread].ready);
@ -1334,9 +1314,7 @@ int close_stream_in(void *ss)
free(sinfo->s[i].buf);
for (i = 0; i < control.threads + 1; i++) {
wait_sem(&ucthread[i].free);
destroy_sem(&ucthread[i].complete);
destroy_sem(&ucthread[i].free);
destroy_sem(&ucthread[i].ready);
}