From a8dcecd72191aa69b69873ea5b70f753e9c913f4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 11 Mar 2011 08:35:15 +1100 Subject: [PATCH] fix-undefined-mutex-behavior-rename-stream-variable. Patch by --- stream.c | 77 ++++++++++++++++++++++++++++++-------------------------- stream.h | 4 +-- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/stream.c b/stream.c index c0ed349..714e54f 100644 --- a/stream.c +++ b/stream.c @@ -65,7 +65,7 @@ static struct compress_thread{ i64 c_len; /* Data length compressed */ pthread_mutex_t mutex; /* This thread's mutex */ struct stream_info *sinfo; - int stream; + int streamno; } *cthread; static struct uncomp_thread{ @@ -74,7 +74,7 @@ static struct uncomp_thread{ i64 last_head; uchar c_type; int busy; - int stream; + int streamno; } *ucthread; typedef struct stream_thread_struct { @@ -416,7 +416,7 @@ static int lzo_compress_buf(rzip_control *control, struct compress_thread *cthre uchar *c_buf; int ret = -1; - wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS); + wrkmem = (lzo_bytep) calloc(1, LZO1X_1_MEM_COMPRESS); if (unlikely(wrkmem == NULL)) { print_maxverbose("Failed to malloc wkmem\n"); return ret; @@ -789,7 +789,10 @@ void prepare_streamout_threads(rzip_control *control) void close_streamout_threads(rzip_control *control) { - int i, close_thread = output_thread; + int i, close_thread; + lock_mutex(&output_lock); + close_thread = output_thread; + unlock_mutex(&output_lock); /* Wait for the threads in the correct order in case they end up * serialised */ @@ -1003,6 +1006,9 @@ static void *compthread(void *data) struct stream_info *ctis; int waited = 0, ret = 0; + /* Make sure this thread doesn't already exist */ + lock_mutex(&cthread[i].mutex); + free(data); cti = &cthread[i]; ctis = cti->sinfo; @@ -1068,17 +1074,17 @@ retry: } } - if (unlikely(seekto(ctis, ctis->s[cti->stream].last_head))) + if (unlikely(seekto(ctis, ctis->s[cti->streamno].last_head))) fatal("Failed to seekto in compthread %d\n", i); if (unlikely(write_i64(ctis->fd, ctis->cur_pos))) fatal("Failed to write_i64 in compthread %d\n", i); - ctis->s[cti->stream].last_head = ctis->cur_pos + 17; + ctis->s[cti->streamno].last_head = ctis->cur_pos + 17; if (unlikely(seekto(ctis, ctis->cur_pos))) fatal("Failed to seekto cur_pos in compthread %d\n", i); - print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->stream); + print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, cti->c_len, cti->streamno); if (unlikely(write_u8(ctis->fd, cti->c_type) || write_i64(ctis->fd, cti->c_len) || write_i64(ctis->fd, cti->s_len) || @@ -1104,21 +1110,22 @@ retry: return 0; } -static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int stream, int newbuf) +static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf) { static long i = 0; stream_thread_struct *s; /* Make sure this thread doesn't already exist */ lock_mutex(&cthread[i].mutex); + unlock_mutex(&cthread[i].mutex); cthread[i].sinfo = sinfo; - cthread[i].stream = stream; - cthread[i].s_buf = sinfo->s[stream].buf; - cthread[i].s_len = sinfo->s[stream].buflen; + cthread[i].streamno = streamno; + cthread[i].s_buf = sinfo->s[streamno].buf; + cthread[i].s_len = sinfo->s[streamno].buflen; print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n", - i, cthread[i].s_len, stream); + i, cthread[i].s_len, streamno); s = malloc(sizeof(stream_thread_struct)); if (unlikely(!s)) @@ -1129,10 +1136,10 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s if (newbuf) { /* The stream buffer has been given to the thread, allocate a new one */ - sinfo->s[stream].buf = malloc(sinfo->bufsize); - if (unlikely(!sinfo->s[stream].buf)) + sinfo->s[streamno].buf = malloc(sinfo->bufsize); + if (unlikely(!sinfo->s[streamno].buf)) fatal("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize); - sinfo->s[stream].buflen = 0; + sinfo->s[streamno].buflen = 0; } if (++i == control->threads) @@ -1140,9 +1147,9 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s } /* flush out any data in a stream buffer */ -void flush_buffer(rzip_control *control, struct stream_info *sinfo, int stream) +void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno) { - clear_buffer(control, sinfo, stream, 1); + clear_buffer(control, sinfo, streamno, 1); } static void *ucompthread(void *data) @@ -1200,16 +1207,16 @@ retry: goto retry; } - print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->stream); + print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->streamno); return 0; } /* fill a buffer from a stream - return -1 on failure */ -static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int stream) +static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int streamno) { i64 header_length, u_len, c_len, last_head; - struct stream *s = &sinfo->s[stream]; + struct stream *s = &sinfo->s[streamno]; uchar c_type, *s_buf; stream_thread_struct *st; @@ -1269,13 +1276,13 @@ fill_another: ucthread[s->uthread_no].c_len = c_len; ucthread[s->uthread_no].u_len = u_len; ucthread[s->uthread_no].c_type = c_type; - ucthread[s->uthread_no].stream = stream; + ucthread[s->uthread_no].streamno = streamno; s->last_head = last_head; /* List this thread as busy */ ucthread[s->uthread_no].busy = 1; print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n", - s->uthread_no, c_len, stream); + s->uthread_no, c_len, streamno); st = malloc(sizeof(stream_thread_struct)); if (unlikely(!st)) @@ -1319,30 +1326,30 @@ out: } /* write some data to a stream. Return -1 on failure */ -int write_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len) +int write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len) { struct stream_info *sinfo = ss; while (len) { i64 n; - n = MIN(sinfo->bufsize - sinfo->s[stream].buflen, len); + n = MIN(sinfo->bufsize - sinfo->s[streamno].buflen, len); - memcpy(sinfo->s[stream].buf + sinfo->s[stream].buflen, p, n); - sinfo->s[stream].buflen += n; + memcpy(sinfo->s[streamno].buf + sinfo->s[streamno].buflen, p, n); + sinfo->s[streamno].buflen += n; p += n; len -= n; /* Flush the buffer every sinfo->bufsize into one thread */ - if (sinfo->s[stream].buflen == sinfo->bufsize) - flush_buffer(control, sinfo, stream); + if (sinfo->s[streamno].buflen == sinfo->bufsize) + flush_buffer(control, sinfo, streamno); } return 0; } /* read some data from a stream. Return number of bytes read, or -1 on failure */ -i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len) +i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len) { struct stream_info *sinfo = ss; i64 ret = 0; @@ -1350,20 +1357,20 @@ i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len) while (len) { i64 n; - n = MIN(sinfo->s[stream].buflen - sinfo->s[stream].bufp, len); + n = MIN(sinfo->s[streamno].buflen - sinfo->s[streamno].bufp, len); if (n > 0) { - memcpy(p, sinfo->s[stream].buf + sinfo->s[stream].bufp, n); - sinfo->s[stream].bufp += n; + memcpy(p, sinfo->s[streamno].buf + sinfo->s[streamno].bufp, n); + sinfo->s[streamno].bufp += n; p += n; len -= n; ret += n; } - if (len && sinfo->s[stream].bufp == sinfo->s[stream].buflen) { - if (unlikely(fill_buffer(control, sinfo, stream))) + if (len && sinfo->s[streamno].bufp == sinfo->s[streamno].buflen) { + if (unlikely(fill_buffer(control, sinfo, streamno))) return -1; - if (sinfo->s[stream].bufp == sinfo->s[stream].buflen) + if (sinfo->s[streamno].bufp == sinfo->s[streamno].buflen) break; } } diff --git a/stream.h b/stream.h index 258d2be..959d2e3 100644 --- a/stream.h +++ b/stream.h @@ -33,8 +33,8 @@ void close_streamout_threads(rzip_control *control); void *open_stream_out(rzip_control *control, int f, unsigned int n, i64 chunk_limit, char cbytes); void *open_stream_in(rzip_control *control, int f, int n); void flush_buffer(rzip_control *control, struct stream_info *sinfo, int stream); -int write_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len); -i64 read_stream(rzip_control *control, void *ss, int stream, uchar *p, i64 len); +int write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len); +i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len); int close_stream_out(rzip_control *control, void *ss); int close_stream_in(void *ss);