Fix threading errors.

Patch by <mike@zentific.com>
This commit is contained in:
Con Kolivas 2011-03-11 08:33:35 +11:00
parent 844d85bd19
commit 643054ae22
2 changed files with 42 additions and 19 deletions

View file

@ -178,7 +178,6 @@ struct rzip_control {
long page_size;
int fd_out;
md5_ctx ctx;
void *data; // random data pointer associated for use in callbacks
i64 md5_read; // How far into the file the md5 has done so far
};

View file

@ -77,6 +77,16 @@ static struct uncomp_thread{
int stream;
} *ucthread;
typedef struct stream_thread_struct {
int i;
rzip_control *control;
} stream_thread_struct;
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *threads;
static void init_mutex(pthread_mutex_t *mutex)
{
if (unlikely(pthread_mutex_init(mutex, NULL)))
@ -752,8 +762,6 @@ static int seekto(struct stream_info *sinfo, i64 pos)
return 0;
}
static pthread_t *threads;
void prepare_streamout_threads(rzip_control *control)
{
int i;
@ -778,9 +786,6 @@ void prepare_streamout_threads(rzip_control *control)
init_mutex(&cthread[i].mutex);
}
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
void close_streamout_threads(rzip_control *control)
{
@ -991,13 +996,17 @@ failed:
* backend compression and is then freed here */
static void *compthread(void *data)
{
rzip_control *control = data;
long i = (long)control->data;
control->data = NULL;
struct compress_thread *cti = &cthread[i];
struct stream_info *ctis = cti->sinfo;
stream_thread_struct *s = data;
rzip_control *control = s->control;
long i = s->i;
struct compress_thread *cti;
struct stream_info *ctis;
int waited = 0, ret = 0;
free(data);
cti = &cthread[i];
ctis = cti->sinfo;
if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
@ -1098,6 +1107,7 @@ retry:
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int stream, int newbuf)
{
static long i = 0;
stream_thread_struct *s;
/* Make sure this thread doesn't already exist */
lock_mutex(&cthread[i].mutex);
@ -1109,8 +1119,13 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n",
i, cthread[i].s_len, stream);
control->data = (void*)i;
create_pthread(&threads[i], NULL, compthread, control);
s = malloc(sizeof(stream_thread_struct));
if (unlikely(!s))
fatal("Unable to malloc in clear_buffer");
s->i = i;
s->control = control;
create_pthread(&threads[i], NULL, compthread, s);
if (newbuf) {
/* The stream buffer has been given to the thread, allocate a new one */
@ -1132,12 +1147,15 @@ void flush_buffer(rzip_control *control, struct stream_info *sinfo, int stream)
static void *ucompthread(void *data)
{
rzip_control *control = data;
long i = (long)control->data;
control->data = NULL;
struct uncomp_thread *uci = &ucthread[i];
stream_thread_struct *s = data;
rzip_control *control = s->control;
long i = s->i;
struct uncomp_thread *uci;
int waited = 0, ret = 0;
free(data);
uci = &ucthread[i];
if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1))
print_err("Warning, unable to set nice value on thread\n");
@ -1193,6 +1211,7 @@ static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int str
i64 header_length, u_len, c_len, last_head;
struct stream *s = &sinfo->s[stream];
uchar c_type, *s_buf;
stream_thread_struct *st;
if (s->buf)
free(s->buf);
@ -1257,8 +1276,13 @@ fill_another:
ucthread[s->uthread_no].busy = 1;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
s->uthread_no, c_len, stream);
control->data = (void*)s->uthread_no;
create_pthread(&threads[s->uthread_no], NULL, ucompthread, control);
st = malloc(sizeof(stream_thread_struct));
if (unlikely(!st))
fatal("Unable to malloc in fill_buffer");
st->i = s->uthread_no;
st->control = control;
create_pthread(&threads[s->uthread_no], NULL, ucompthread, st);
if (++s->uthread_no == s->base_thread + s->total_threads)
s->uthread_no = s->base_thread;