fix-undefined-mutex-behavior-rename-stream-variable.

Patch by <mike@zentific.com>
This commit is contained in:
Con Kolivas 2011-03-11 08:35:15 +11:00
parent 643054ae22
commit a8dcecd721
2 changed files with 44 additions and 37 deletions

View file

@ -65,7 +65,7 @@ static struct compress_thread{
i64 c_len; /* Data length compressed */
pthread_mutex_t mutex; /* This thread's mutex */
struct stream_info *sinfo;
int stream;
int streamno;
} *cthread;
static struct uncomp_thread{
@ -74,7 +74,7 @@ static struct uncomp_thread{
i64 last_head;
uchar c_type;
int busy;
int stream;
int streamno;
} *ucthread;
typedef struct stream_thread_struct {
@ -416,7 +416,7 @@ static int lzo_compress_buf(rzip_control *control, struct compress_thread *cthre
uchar *c_buf;
int ret = -1;
wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
wrkmem = (lzo_bytep) calloc(1, LZO1X_1_MEM_COMPRESS);
if (unlikely(wrkmem == NULL)) {
print_maxverbose("Failed to malloc wkmem\n");
return ret;
@ -789,7 +789,10 @@ void prepare_streamout_threads(rzip_control *control)
void close_streamout_threads(rzip_control *control)
{
int i, close_thread = output_thread;
int i, close_thread;
lock_mutex(&output_lock);
close_thread = output_thread;
unlock_mutex(&output_lock);
/* Wait for the threads in the correct order in case they end up
* serialised */
@ -1003,6 +1006,9 @@ static void *compthread(void *data)
struct stream_info *ctis;
int waited = 0, ret = 0;
/* Make sure this thread doesn't already exist */
lock_mutex(&cthread[i].mutex);
free(data);
cti = &cthread[i];
ctis = cti->sinfo;
@ -1068,17 +1074,17 @@ retry:
}
}
if (unlikely(seekto(ctis, ctis->s[cti->stream].last_head)))
if (unlikely(seekto(ctis, ctis->s[cti->streamno].last_head)))
fatal("Failed to seekto in compthread %d\n", i);
if (unlikely(write_i64(ctis->fd, ctis->cur_pos)))
fatal("Failed to write_i64 in compthread %d\n", i);
ctis->s[cti->stream].last_head = ctis->cur_pos + 17;
ctis->s[cti->streamno].last_head = ctis->cur_pos + 17;
if (unlikely(seekto(ctis, ctis->cur_pos)))
fatal("Failed to seekto cur_pos in compthread %d\n", i);
print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->stream);
print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->streamno);
if (unlikely(write_u8(ctis->fd, cti->c_type) ||
write_i64(ctis->fd, cti->c_len) ||
write_i64(ctis->fd, cti->s_len) ||
@ -1104,21 +1110,22 @@ retry:
return 0;
}
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int stream, int newbuf)
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
{
static long i = 0;
stream_thread_struct *s;
/* Make sure this thread doesn't already exist */
lock_mutex(&cthread[i].mutex);
unlock_mutex(&cthread[i].mutex);
cthread[i].sinfo = sinfo;
cthread[i].stream = stream;
cthread[i].s_buf = sinfo->s[stream].buf;
cthread[i].s_len = sinfo->s[stream].buflen;
cthread[i].streamno = streamno;
cthread[i].s_buf = sinfo->s[streamno].buf;
cthread[i].s_len = sinfo->s[streamno].buflen;
print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n",
i, cthread[i].s_len, stream);
i, cthread[i].s_len, streamno);
s = malloc(sizeof(stream_thread_struct));
if (unlikely(!s))
@ -1129,10 +1136,10 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
if (newbuf) {
/* The stream buffer has been given to the thread, allocate a new one */
sinfo->s[stream].buf = malloc(sinfo->bufsize);
if (unlikely(!sinfo->s[stream].buf))
sinfo->s[streamno].buf = malloc(sinfo->bufsize);
if (unlikely(!sinfo->s[streamno].buf))
fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
sinfo->s[stream].buflen = 0;
sinfo->s[streamno].buflen = 0;
}
if (++i == control->threads)
@ -1140,9 +1147,9 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
}
/* flush out any data in a stream buffer */
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int stream)
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
clear_buffer(control, sinfo, stream, 1);
clear_buffer(control, sinfo, streamno, 1);
}
static void *ucompthread(void *data)
@ -1200,16 +1207,16 @@ retry:
goto retry;
}
print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->stream);
print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->streamno);
return 0;
}
/* fill a buffer from a stream - return -1 on failure */
static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int stream)
static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
i64 header_length, u_len, c_len, last_head;
struct stream *s = &sinfo->s[stream];
struct stream *s = &sinfo->s[streamno];
uchar c_type, *s_buf;
stream_thread_struct *st;
@ -1269,13 +1276,13 @@ fill_another:
ucthread[s->uthread_no].c_len = c_len;
ucthread[s->uthread_no].u_len = u_len;
ucthread[s->uthread_no].c_type = c_type;
ucthread[s->uthread_no].stream = stream;
ucthread[s->uthread_no].streamno = streamno;
s->last_head = last_head;
/* List this thread as busy */
ucthread[s->uthread_no].busy = 1;
print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
s->uthread_no, c_len, stream);
s->uthread_no, c_len, streamno);
st = malloc(sizeof(stream_thread_struct));
if (unlikely(!st))
@ -1319,30 +1326,30 @@ out:
}
/* write some data to a stream. Return -1 on failure */
int write_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len)
int write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
{
struct stream_info *sinfo = ss;
while (len) {
i64 n;
n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len);
n = MIN(sinfo->bufsize - sinfo->s[streamno].buflen, len);
memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen, p, n);
sinfo->s[stream].buflen += n;
memcpy(sinfo->s[streamno].buf + sinfo->s[streamno].buflen, p, n);
sinfo->s[streamno].buflen += n;
p += n;
len -= n;
/* Flush the buffer every sinfo->bufsize into one thread */
if (sinfo->s[stream].buflen == sinfo->bufsize)
flush_buffer(control, sinfo, stream);
if (sinfo->s[streamno].buflen == sinfo->bufsize)
flush_buffer(control, sinfo, streamno);
}
return 0;
}
/* read some data from a stream. Return number of bytes read, or -1
on failure */
i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len)
i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
{
struct stream_info *sinfo = ss;
i64 ret = 0;
@ -1350,20 +1357,20 @@ i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len)
while (len) {
i64 n;
n = MIN(sinfo->s[stream].buflen - sinfo->s[stream].bufp, len);
n = MIN(sinfo->s[streamno].buflen - sinfo->s[streamno].bufp, len);
if (n > 0) {
memcpy(p, sinfo->s[stream].buf + sinfo->s[stream].bufp, n);
sinfo->s[stream].bufp += n;
memcpy(p, sinfo->s[streamno].buf + sinfo->s[streamno].bufp, n);
sinfo->s[streamno].bufp += n;
p += n;
len -= n;
ret += n;
}
if (len && sinfo->s[stream].bufp == sinfo->s[stream].buflen) {
if (unlikely(fill_buffer(control, sinfo, stream)))
if (len && sinfo->s[streamno].bufp == sinfo->s[streamno].buflen) {
if (unlikely(fill_buffer(control, sinfo, streamno)))
return -1;
if (sinfo->s[stream].bufp == sinfo->s[stream].buflen)
if (sinfo->s[streamno].bufp == sinfo->s[streamno].buflen)
break;
}
}

View file

@ -33,8 +33,8 @@ void close_streamout_threads(rzip_control *control);
void *open_stream_out(rzip_control *control, int f, unsigned int n, i64 chunk_limit, char cbytes);
void *open_stream_in(rzip_control *control, int f, int n);
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int stream);
int write_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len);
i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len);
int write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len);
i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len);
int close_stream_out(rzip_control *control, void *ss);
int close_stream_in(void *ss);