mirror of
https://github.com/ckolivas/lrzip.git
synced 2026-04-21 06:03:54 +00:00
giant clusterfuck commit: *add and use alloca/strdupa in some spots *implement return values on almost every function *free() is now a macro which also nulls the freed pointer *when in 'library mode', fatal_exit() will not be called *fatal()/failure() are now macros which add line number and file for logging *all occurrences of fatal/failure in files other than main.c are now wrapped to use a logging callback (if specified) and then return *functions now clean up resources on returning with an error
This commit is contained in:
parent
d7495cee61
commit
6dd20d735b
12 changed files with 808 additions and 494 deletions
268
stream.c
268
stream.c
|
|
@ -105,53 +105,61 @@ static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
|
|||
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
|
||||
static pthread_t *threads;
|
||||
|
||||
static void init_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
static bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
{
|
||||
if (unlikely(pthread_mutex_init(mutex, NULL)))
|
||||
fatal(control, "pthread_mutex_init failed");
|
||||
fatal_return(("pthread_mutex_init failed"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
static bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
{
|
||||
if (unlikely(pthread_mutex_unlock(mutex)))
|
||||
fatal(control, "pthread_mutex_unlock failed");
|
||||
fatal_return(("pthread_mutex_unlock failed"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
static bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
|
||||
{
|
||||
if (unlikely(pthread_mutex_lock(mutex)))
|
||||
fatal(control, "pthread_mutex_lock failed");
|
||||
fatal_return(("pthread_mutex_lock failed"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
|
||||
static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
|
||||
{
|
||||
if (unlikely(pthread_cond_wait(cond, mutex)))
|
||||
fatal(control, "pthread_cond_wait failed");
|
||||
fatal_return(("pthread_cond_wait failed"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
static void cond_broadcast(rzip_control *control, pthread_cond_t *cond)
|
||||
static bool cond_broadcast(rzip_control *control, pthread_cond_t *cond)
|
||||
{
|
||||
if (unlikely(pthread_cond_broadcast(cond)))
|
||||
fatal(control, "pthread_cond_broadcast failed");
|
||||
fatal_return(("pthread_cond_broadcast failed"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
void create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
|
||||
bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
|
||||
void * (*start_routine)(void *), void *arg)
|
||||
{
|
||||
if (unlikely(pthread_create(thread, attr, start_routine, arg)))
|
||||
fatal(control, "pthread_create");
|
||||
fatal_return(("pthread_create"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
void detach_pthread(rzip_control *control, pthread_t *thread)
|
||||
bool detach_pthread(rzip_control *control, pthread_t *thread)
|
||||
{
|
||||
if (unlikely(pthread_detach(*thread)))
|
||||
fatal(control, "pthread_detach");
|
||||
fatal_return(("pthread_detach"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
void join_pthread(rzip_control *control, pthread_t th, void **thread_return)
|
||||
bool join_pthread(rzip_control *control, pthread_t th, void **thread_return)
|
||||
{
|
||||
if (pthread_join(th, thread_return))
|
||||
fatal(control, "pthread_join");
|
||||
fatal_return(("pthread_join"), false);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* just to keep things clean, declare function here
|
||||
|
|
@ -164,12 +172,14 @@ static inline FILE *fake_fmemopen(rzip_control *control, void *buf, size_t bufle
|
|||
FILE *in;
|
||||
|
||||
if (unlikely(strcmp(mode, "r")))
|
||||
failure(control, "fake_fmemopen only supports mode \"r\".");
|
||||
failure_return(("fake_fmemopen only supports mode \"r\"."), NULL);
|
||||
in = tmpfile();
|
||||
if (unlikely(!in))
|
||||
return NULL;
|
||||
if (unlikely(fwrite(buf, buflen, 1, in) != 1))
|
||||
if (unlikely(fwrite(buf, buflen, 1, in) != 1)) {
|
||||
fclose(in);
|
||||
return NULL;
|
||||
}
|
||||
rewind(in);
|
||||
return in;
|
||||
}
|
||||
|
|
@ -179,7 +189,7 @@ static inline FILE *fake_open_memstream(rzip_control *control, char **buf, size_
|
|||
FILE *out;
|
||||
|
||||
if (unlikely(buf == NULL || length == NULL))
|
||||
failure(control, "NULL parameter to fake_open_memstream");
|
||||
failure_return(("NULL parameter to fake_open_memstream"), NULL);
|
||||
out = tmpfile();
|
||||
if (unlikely(!out))
|
||||
return NULL;
|
||||
|
|
@ -197,10 +207,14 @@ static inline int fake_open_memstream_update_buffer(FILE *fp, uchar **buf, size_
|
|||
*buf = (uchar *)malloc(*length);
|
||||
if (unlikely(!*buf))
|
||||
return -1;
|
||||
if (unlikely(fread(*buf, *length, 1, fp) != 1))
|
||||
if (unlikely(fread(*buf, *length, 1, fp) != 1)) {
|
||||
free(*buf);
|
||||
return -1;
|
||||
if (unlikely(fseek(fp, original_pos, SEEK_SET)))
|
||||
}
|
||||
if (unlikely(fseek(fp, original_pos, SEEK_SET))) {
|
||||
free(*buf);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -238,8 +252,11 @@ static int zpaq_compress_buf(rzip_control *control, struct compress_thread *cthr
|
|||
zpipe_compress(in, out, control->msgout, cthread->s_len,
|
||||
(int)(SHOW_PROGRESS), thread);
|
||||
|
||||
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen)))
|
||||
fatal(control, "Failed to memstream_update_buffer in zpaq_compress_buf");
|
||||
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) {
|
||||
fclose(in);
|
||||
fclose(out);
|
||||
fatal_return(("Failed to memstream_update_buffer in zpaq_compress_buf"), -1);
|
||||
}
|
||||
|
||||
fclose(in);
|
||||
fclose(out);
|
||||
|
|
@ -495,13 +512,17 @@ static int zpaq_decompress_buf(rzip_control *control, struct uncomp_thread *ucth
|
|||
out = open_memstream((char **)&c_buf, &dlen);
|
||||
if (unlikely(!out)) {
|
||||
print_err("Failed to open_memstream in zpaq_decompress_buf\n");
|
||||
fclose(in);
|
||||
return -1;
|
||||
}
|
||||
|
||||
zpipe_decompress(in, out, control->msgout, ucthread->u_len, (int)(SHOW_PROGRESS), thread);
|
||||
|
||||
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen)))
|
||||
fatal(control, "Failed to memstream_update_buffer in zpaq_decompress_buf");
|
||||
if (unlikely(memstream_update_buffer(out, &c_buf, &dlen))) {
|
||||
fclose(in);
|
||||
fclose(out);
|
||||
fatal_return(("Failed to memstream_update_buffer in zpaq_decompress_buf"), -1);
|
||||
}
|
||||
|
||||
fclose(in);
|
||||
fclose(out);
|
||||
|
|
@ -673,9 +694,9 @@ ssize_t put_fdout(rzip_control *control, void *offset_buf, ssize_t ret)
|
|||
/* The data won't fit in a temporary output buffer so we have
|
||||
* to fall back to temporary files. */
|
||||
print_verbose("Unable to decompress entirely in ram, will use physical files\n");
|
||||
write_fdout(control, control->tmp_outbuf, control->out_len);
|
||||
if (unlikely(!write_fdout(control, control->tmp_outbuf, control->out_len))) return -1;
|
||||
close_tmpoutbuf(control);
|
||||
write_fdout(control, offset_buf, ret);
|
||||
if (unlikely(!write_fdout(control, offset_buf, ret))) return -1;
|
||||
return ret;
|
||||
}
|
||||
memcpy(control->tmp_outbuf + control->out_ofs, offset_buf, ret);
|
||||
|
|
@ -707,7 +728,7 @@ ssize_t write_1g(rzip_control *control, void *buf, i64 len)
|
|||
return total;
|
||||
}
|
||||
|
||||
static void read_fdin(struct rzip_control *control, i64 len)
|
||||
static bool read_fdin(struct rzip_control *control, i64 len)
|
||||
{
|
||||
int tmpchar;
|
||||
i64 i;
|
||||
|
|
@ -715,11 +736,12 @@ static void read_fdin(struct rzip_control *control, i64 len)
|
|||
for (i = 0; i < len; i++) {
|
||||
tmpchar = getchar();
|
||||
if (unlikely(tmpchar == EOF))
|
||||
failure(control, "Reached end of file on STDIN prematurely on read_fdin, asked for %lld got %lld\n",
|
||||
len, i);
|
||||
failure_return(("Reached end of file on STDIN prematurely on read_fdin, asked for %lld got %lld\n",
|
||||
len, i), false);
|
||||
control->tmp_inbuf[control->in_ofs + i] = (char)tmpchar;
|
||||
}
|
||||
control->in_len = control->in_ofs + len;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Ditto for read */
|
||||
|
|
@ -733,13 +755,13 @@ ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len)
|
|||
/* We're decompressing from STDIN */
|
||||
if (unlikely(control->in_ofs + len > control->in_maxlen)) {
|
||||
/* We're unable to fit it all into the temp buffer */
|
||||
write_fdin(control);
|
||||
read_tmpinfile(control, control->fd_in);
|
||||
if (unlikely(!write_fdin(control))) return -1;
|
||||
if (unlikely(!read_tmpinfile(control, control->fd_in))) return -1;
|
||||
close_tmpinbuf(control);
|
||||
goto read_fd;
|
||||
}
|
||||
if (control->in_ofs + len > control->in_len)
|
||||
read_fdin(control, control->in_ofs + len - control->in_len);
|
||||
if (unlikely(!read_fdin(control, control->in_ofs + len - control->in_len))) return false;
|
||||
memcpy(buf, control->tmp_inbuf + control->in_ofs, len);
|
||||
control->in_ofs += len;
|
||||
return len;
|
||||
|
|
@ -747,7 +769,7 @@ ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len)
|
|||
|
||||
if (TMP_OUTBUF && fd == control->fd_out) {
|
||||
if (unlikely(control->out_ofs + len > control->out_maxlen))
|
||||
failure(control, "Trying to read beyond out_ofs in tmpoutbuf\n");
|
||||
failure_return(("Trying to read beyond out_ofs in tmpoutbuf\n"), -1);
|
||||
memcpy(buf, control->tmp_outbuf + control->out_ofs, len);
|
||||
control->out_ofs += len;
|
||||
return len;
|
||||
|
|
@ -876,7 +898,7 @@ static int read_seekto(rzip_control *control, struct stream_info *sinfo, i64 pos
|
|||
|
||||
if (TMP_INBUF) {
|
||||
if (spos > control->in_len)
|
||||
read_fdin(control, spos - control->in_len);
|
||||
if (unlikely(!read_fdin(control, spos - control->in_len))) return -1;
|
||||
control->in_ofs = spos;
|
||||
if (unlikely(spos < 0)) {
|
||||
print_err("Trying to seek to %lld outside tmp inbuf in read_seekto\n", spos);
|
||||
|
|
@ -896,7 +918,7 @@ static i64 get_seek(rzip_control *control, int fd)
|
|||
return control->out_relofs + control->out_ofs;
|
||||
ret = lseek(fd, 0, SEEK_CUR);
|
||||
if (unlikely(ret == -1))
|
||||
fatal(control, "Failed to lseek in get_seek\n");
|
||||
fatal_return(("Failed to lseek in get_seek\n"), -1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
@ -908,11 +930,11 @@ static i64 get_readseek(rzip_control *control, int fd)
|
|||
return control->in_ofs;
|
||||
ret = lseek(fd, 0, SEEK_CUR);
|
||||
if (unlikely(ret == -1))
|
||||
fatal(control, "Failed to lseek in get_seek\n");
|
||||
fatal_return(("Failed to lseek in get_seek\n"), -1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void prepare_streamout_threads(rzip_control *control)
|
||||
bool prepare_streamout_threads(rzip_control *control)
|
||||
{
|
||||
int i;
|
||||
|
||||
|
|
@ -926,30 +948,47 @@ void prepare_streamout_threads(rzip_control *control)
|
|||
control->threads = 1;
|
||||
threads = calloc(sizeof(pthread_t), control->threads);
|
||||
if (unlikely(!threads))
|
||||
fatal(control, "Unable to calloc threads in prepare_streamout_threads\n");
|
||||
fatal_return(("Unable to calloc threads in prepare_streamout_threads\n"), false);
|
||||
|
||||
cthread = calloc(sizeof(struct compress_thread), control->threads);
|
||||
if (unlikely(!cthread))
|
||||
fatal(control, "Unable to calloc cthread in prepare_streamout_threads\n");
|
||||
if (unlikely(!cthread)) {
|
||||
free(threads);
|
||||
fatal_return(("Unable to calloc cthread in prepare_streamout_threads\n"), false);
|
||||
}
|
||||
|
||||
for (i = 0; i < control->threads; i++)
|
||||
init_mutex(control, &cthread[i].mutex);
|
||||
if (unlikely(!init_mutex(control, &cthread[i].mutex))) {
|
||||
int x;
|
||||
for (x = 0; x < i; x++) pthread_mutex_destroy(&cthread[x].mutex);
|
||||
free(threads);
|
||||
free(cthread);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void close_streamout_threads(rzip_control *control)
|
||||
bool close_streamout_threads(rzip_control *control)
|
||||
{
|
||||
int i, close_thread = output_thread;
|
||||
|
||||
/* Wait for the threads in the correct order in case they end up
|
||||
* serialised */
|
||||
for (i = 0; i < control->threads; i++) {
|
||||
lock_mutex(control, &cthread[close_thread].mutex);
|
||||
if (unlikely(!lock_mutex(control, &cthread[close_thread].mutex))) {
|
||||
int x;
|
||||
for (x = 0; x < i; x++) unlock_mutex(control, &cthread[close_thread].mutex);
|
||||
free(cthread);
|
||||
free(threads);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (++close_thread == control->threads)
|
||||
close_thread = 0;
|
||||
}
|
||||
free(cthread);
|
||||
free(threads);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* open a set of output streams, compressing with the given
|
||||
|
|
@ -1044,8 +1083,12 @@ retest_malloc:
|
|||
|
||||
for (i = 0; i < n; i++) {
|
||||
sinfo->s[i].buf = calloc(sinfo->bufsize , 1);
|
||||
if (unlikely(!sinfo->s[i].buf))
|
||||
fatal(control, "Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
|
||||
if (unlikely(!sinfo->s[i].buf)) {
|
||||
fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
|
||||
free(sinfo->s);
|
||||
free(sinfo);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return (void *)sinfo;
|
||||
|
|
@ -1054,7 +1097,7 @@ retest_malloc:
|
|||
/* The block headers are all encrypted so we read the data and salt associated
|
||||
* with them, decrypt the data, then return the decrypted version of the
|
||||
* values */
|
||||
static void decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
|
||||
static bool decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
|
||||
i64 *c_len, i64 *u_len, i64 *last_head)
|
||||
{
|
||||
uchar *buf = head + SALT_LEN;
|
||||
|
|
@ -1064,12 +1107,13 @@ static void decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
|
|||
memcpy(buf + 9, u_len, 8);
|
||||
memcpy(buf + 17, last_head, 8);
|
||||
|
||||
lrz_decrypt(control, buf, 25, head);
|
||||
if (unlikely(!lrz_decrypt(control, buf, 25, head))) return false;
|
||||
|
||||
memcpy(c_type, buf, 1);
|
||||
memcpy(c_len, buf + 1, 8);
|
||||
memcpy(u_len, buf + 9, 8);
|
||||
memcpy(last_head, buf + 17, 8);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* prepare a set of n streams for reading on file descriptor f */
|
||||
|
|
@ -1094,8 +1138,11 @@ void *open_stream_in(rzip_control *control, int f, int n, char chunk_bytes)
|
|||
return NULL;
|
||||
|
||||
ucthread = calloc(sizeof(struct uncomp_thread), total_threads);
|
||||
if (unlikely(!ucthread))
|
||||
fatal(control, "Unable to calloc cthread in open_stream_in\n");
|
||||
if (unlikely(!ucthread)) {
|
||||
free(sinfo);
|
||||
free(threads);
|
||||
fatal_return(("Unable to calloc cthread in open_stream_in\n"), NULL);
|
||||
}
|
||||
|
||||
sinfo->num_streams = n;
|
||||
sinfo->fd = f;
|
||||
|
|
@ -1129,6 +1176,7 @@ void *open_stream_in(rzip_control *control, int f, int n, char chunk_bytes)
|
|||
}
|
||||
}
|
||||
sinfo->initial_pos = get_readseek(control, f);
|
||||
if (unlikely(sinfo->initial_pos == -1)) goto failed;
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
uchar c, enc_head[25 + SALT_LEN];
|
||||
|
|
@ -1176,7 +1224,7 @@ again:
|
|||
header_length = 1 + (read_len * 3);
|
||||
}
|
||||
if (ENCRYPT)
|
||||
decrypt_header(control, enc_head, &c, &v1, &v2, &sinfo->s[i].last_head);
|
||||
if (unlikely(!decrypt_header(control, enc_head, &c, &v1, &v2, &sinfo->s[i].last_head))) goto failed;
|
||||
|
||||
v1 = le64toh(v1);
|
||||
v2 = le64toh(v2);
|
||||
|
|
@ -1219,32 +1267,38 @@ failed:
|
|||
* by reading what has been written, encrypting it, and writing back over it.
|
||||
* This is very convoluted depending on whether a last_head value is written
|
||||
* to this block or not. See the callers of this function */
|
||||
static void rewrite_encrypted(rzip_control *control, struct stream_info *sinfo, i64 ofs)
|
||||
static bool rewrite_encrypted(rzip_control *control, struct stream_info *sinfo, i64 ofs)
|
||||
{
|
||||
uchar *buf, *head;
|
||||
i64 cur_ofs;
|
||||
|
||||
cur_ofs = get_seek(control, sinfo->fd) - sinfo->initial_pos;
|
||||
if (unlikely(cur_ofs == -1)) return false;
|
||||
head = malloc(25 + SALT_LEN);
|
||||
if (unlikely(!head))
|
||||
fatal(control, "Failed to malloc head in rewrite_encrypted\n");
|
||||
fatal_return(("Failed to malloc head in rewrite_encrypted\n"), false);
|
||||
buf = head + SALT_LEN;
|
||||
get_rand(control, head, SALT_LEN);
|
||||
if (unlikely(!get_rand(control, head, SALT_LEN))) goto error;
|
||||
if (unlikely(seekto(control, sinfo, ofs - SALT_LEN)))
|
||||
failure(control, "Failed to seekto buf ofs in rewrite_encrypted\n");
|
||||
failure_goto(("Failed to seekto buf ofs in rewrite_encrypted\n"), error);
|
||||
if (unlikely(write_buf(control, head, SALT_LEN)))
|
||||
failure(control, "Failed to write_buf head in rewrite_encrypted\n");
|
||||
failure_goto(("Failed to write_buf head in rewrite_encrypted\n"), error);
|
||||
if (unlikely(read_buf(control, sinfo->fd, buf, 25)))
|
||||
failure(control, "Failed to read_buf buf in rewrite_encrypted\n");
|
||||
|
||||
failure_goto(("Failed to read_buf buf in rewrite_encrypted\n"), error);
|
||||
|
||||
lrz_encrypt(control, buf, 25, head);
|
||||
if (unlikely(!lrz_encrypt(control, buf, 25, head))) goto error;
|
||||
|
||||
if (unlikely(seekto(control, sinfo, ofs)))
|
||||
failure(control, "Failed to seek back to ofs in rewrite_encrypted\n");
|
||||
failure_goto(("Failed to seek back to ofs in rewrite_encrypted\n"), error);
|
||||
if (unlikely(write_buf(control, buf, 25)))
|
||||
failure(control, "Failed to write_buf encrypted buf in rewrite_encrypted\n");
|
||||
failure_goto(("Failed to write_buf encrypted buf in rewrite_encrypted\n"), error);
|
||||
free(head);
|
||||
seekto(control, sinfo, cur_ofs);
|
||||
return true;
|
||||
error:
|
||||
free(head);
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Enter with s_buf allocated,s_buf points to the compressed data after the
|
||||
|
|
@ -1287,7 +1341,7 @@ retry:
|
|||
ret = gzip_compress_buf(control, cti);
|
||||
else if (ZPAQ_COMPRESS)
|
||||
ret = zpaq_compress_buf(control, cti, i);
|
||||
else failure(control, "Dunno wtf compression to use!\n");
|
||||
else failure_goto(("Dunno wtf compression to use!\n"), error);
|
||||
}
|
||||
|
||||
padded_len = cti->c_len;
|
||||
|
|
@ -1298,15 +1352,15 @@ retry:
|
|||
padded_len = MIN_SIZE;
|
||||
cti->s_buf = realloc(cti->s_buf, MIN_SIZE);
|
||||
if (unlikely(!cti->s_buf))
|
||||
fatal(control, "Failed to realloc s_buf in compthread\n");
|
||||
get_rand(control, cti->s_buf + cti->c_len, MIN_SIZE - cti->c_len);
|
||||
fatal_goto(("Failed to realloc s_buf in compthread\n"), error);
|
||||
if (unlikely(!get_rand(control, cti->s_buf + cti->c_len, MIN_SIZE - cti->c_len))) goto error;
|
||||
}
|
||||
|
||||
/* If compression fails for whatever reason multithreaded, then wait
|
||||
* for the previous thread to finish, serialising the work to decrease
|
||||
* the memory requirements, increasing the chance of success */
|
||||
if (unlikely(ret && waited))
|
||||
failure(control, "Failed to compress in compthread\n");
|
||||
failure_goto(("Failed to compress in compthread\n"), error);
|
||||
|
||||
if (!waited) {
|
||||
lock_mutex(control, &output_lock);
|
||||
|
|
@ -1332,7 +1386,7 @@ retry:
|
|||
if (TMP_OUTBUF) {
|
||||
if (!control->magic_written)
|
||||
write_magic(control);
|
||||
flush_tmpoutbuf(control);
|
||||
if (unlikely(!flush_tmpoutbuf(control))) goto error;
|
||||
}
|
||||
|
||||
/* Write chunk bytes of this block */
|
||||
|
|
@ -1346,13 +1400,14 @@ retry:
|
|||
|
||||
/* First chunk of this stream, write headers */
|
||||
ctis->initial_pos = get_seek(control, ctis->fd);
|
||||
if (unlikely(ctis->initial_pos == -1)) goto error;
|
||||
|
||||
for (j = 0; j < ctis->num_streams; j++) {
|
||||
/* If encrypting, we leave SALT_LEN room to write in salt
|
||||
* later */
|
||||
if (ENCRYPT) {
|
||||
if (unlikely(write_val(control, 0, SALT_LEN)))
|
||||
fatal(control, "Failed to write_buf blank salt in compthread %d\n", i);
|
||||
fatal_goto(("Failed to write_buf blank salt in compthread %d\n", i), error);
|
||||
ctis->cur_pos += SALT_LEN;
|
||||
}
|
||||
ctis->s[j].last_head = ctis->cur_pos + 1 + (write_len * 2);
|
||||
|
|
@ -1365,23 +1420,23 @@ retry:
|
|||
}
|
||||
|
||||
if (unlikely(seekto(control, ctis, ctis->s[cti->streamno].last_head)))
|
||||
fatal(control, "Failed to seekto in compthread %d\n", i);
|
||||
fatal_goto(("Failed to seekto in compthread %d\n", i), error);
|
||||
|
||||
if (unlikely(write_val(control, ctis->cur_pos, write_len)))
|
||||
fatal(control, "Failed to write_val cur_pos in compthread %d\n", i);
|
||||
fatal_goto(("Failed to write_val cur_pos in compthread %d\n", i), error);
|
||||
|
||||
if (ENCRYPT)
|
||||
rewrite_encrypted(control, ctis, ctis->s[cti->streamno].last_head - 17);
|
||||
|
||||
ctis->s[cti->streamno].last_head = ctis->cur_pos + 1 + (write_len * 2) + (ENCRYPT ? SALT_LEN : 0);
|
||||
if (unlikely(seekto(control, ctis, ctis->cur_pos)))
|
||||
fatal(control, "Failed to seekto cur_pos in compthread %d\n", i);
|
||||
fatal_goto(("Failed to seekto cur_pos in compthread %d\n", i), error);
|
||||
|
||||
print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, padded_len, cti->streamno);
|
||||
|
||||
if (ENCRYPT) {
|
||||
if (unlikely(write_val(control, 0, SALT_LEN)))
|
||||
fatal(control, "Failed to write_buf header salt in compthread %d\n", i);
|
||||
fatal_goto(("Failed to write_buf header salt in compthread %d\n", i), error);
|
||||
ctis->cur_pos += SALT_LEN;
|
||||
ctis->s[cti->streamno].last_headofs = ctis->cur_pos;
|
||||
}
|
||||
|
|
@ -1390,19 +1445,19 @@ retry:
|
|||
write_val(control, cti->c_len, write_len) ||
|
||||
write_val(control, cti->s_len, write_len) ||
|
||||
write_val(control, 0, write_len))) {
|
||||
fatal(control, "Failed write in compthread %d\n", i);
|
||||
fatal_goto(("Failed write in compthread %d\n", i), error);
|
||||
}
|
||||
ctis->cur_pos += 1 + (write_len * 3);
|
||||
|
||||
if (ENCRYPT) {
|
||||
get_rand(control, cti->salt, SALT_LEN);
|
||||
if (unlikely(!get_rand(control, cti->salt, SALT_LEN))) goto error;
|
||||
if (unlikely(write_buf(control, cti->salt, SALT_LEN)))
|
||||
fatal(control, "Failed to write_buf block salt in compthread %d\n", i);
|
||||
lrz_encrypt(control, cti->s_buf, padded_len, cti->salt);
|
||||
fatal_goto(("Failed to write_buf block salt in compthread %d\n", i), error);
|
||||
if (unlikely(!lrz_encrypt(control, cti->s_buf, padded_len, cti->salt))) goto error;
|
||||
ctis->cur_pos += SALT_LEN;
|
||||
}
|
||||
if (unlikely(write_buf(control, cti->s_buf, padded_len)))
|
||||
fatal(control, "Failed to write_buf s_buf in compthread %d\n", i);
|
||||
fatal_goto(("Failed to write_buf s_buf in compthread %d\n", i), error);
|
||||
|
||||
ctis->cur_pos += padded_len;
|
||||
free(cti->s_buf);
|
||||
|
|
@ -1413,12 +1468,13 @@ retry:
|
|||
cond_broadcast(control, &output_cond);
|
||||
unlock_mutex(control, &output_lock);
|
||||
|
||||
error:
|
||||
unlock_mutex(control, &cti->mutex);
|
||||
|
||||
return 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
|
||||
static bool clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
|
||||
{
|
||||
static long i = 0;
|
||||
stream_thread_struct *s;
|
||||
|
|
@ -1435,30 +1491,39 @@ static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int s
|
|||
i, cthread[i].s_len, streamno);
|
||||
|
||||
s = malloc(sizeof(stream_thread_struct));
|
||||
if (unlikely(!s))
|
||||
fatal(control, "Unable to malloc in clear_buffer");
|
||||
if (unlikely(!s)) {
|
||||
unlock_mutex(control, &cthread[i].mutex);
|
||||
fatal_return(("Unable to malloc in clear_buffer"), false);
|
||||
}
|
||||
s->i = i;
|
||||
s->control = control;
|
||||
create_pthread(control, &threads[i], NULL, compthread, s);
|
||||
detach_pthread(control, &threads[i]);
|
||||
if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) ||
|
||||
(!detach_pthread(control, &threads[i])))) {
|
||||
unlock_mutex(control, &cthread[i].mutex);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
if (newbuf) {
|
||||
/* The stream buffer has been given to the thread, allocate a
|
||||
* new one. */
|
||||
sinfo->s[streamno].buf = malloc(sinfo->bufsize);
|
||||
if (unlikely(!sinfo->s[streamno].buf))
|
||||
fatal(control, "Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
|
||||
if (unlikely(!sinfo->s[streamno].buf)) {
|
||||
unlock_mutex(control, &cthread[i].mutex);
|
||||
fatal_return(("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize), false);
|
||||
}
|
||||
sinfo->s[streamno].buflen = 0;
|
||||
}
|
||||
|
||||
if (++i == control->threads)
|
||||
i = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* flush out any data in a stream buffer */
|
||||
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
|
||||
bool flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
|
||||
{
|
||||
clear_buffer(control, sinfo, streamno, 1);
|
||||
return clear_buffer(control, sinfo, streamno, 1);
|
||||
}
|
||||
|
||||
static void *ucompthread(void *data)
|
||||
|
|
@ -1494,7 +1559,7 @@ retry:
|
|||
ret = zpaq_decompress_buf(control, uci, i);
|
||||
break;
|
||||
default:
|
||||
failure(control, "Dunno wtf decompression type to use!\n");
|
||||
failure_return(("Dunno wtf decompression type to use!\n"), NULL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -1503,7 +1568,7 @@ retry:
|
|||
* parallel */
|
||||
if (unlikely(ret)) {
|
||||
if (unlikely(waited))
|
||||
failure(control, "Failed to decompress in ucompthread\n");
|
||||
failure_return(("Failed to decompress in ucompthread\n"), NULL);
|
||||
print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
|
||||
/* We do not strictly need to wait for this, so it's used when
|
||||
* decompression fails due to inadequate memory to try again
|
||||
|
|
@ -1536,7 +1601,7 @@ static int fill_buffer(rzip_control *control, struct stream_info *sinfo, int str
|
|||
goto out;
|
||||
fill_another:
|
||||
if (unlikely(ucthread[s->uthread_no].busy))
|
||||
failure(control, "Trying to start a busy thread, this shouldn't happen!\n");
|
||||
failure_return(("Trying to start a busy thread, this shouldn't happen!\n"), -1);
|
||||
|
||||
if (unlikely(read_seekto(control, sinfo, s->last_head)))
|
||||
return -1;
|
||||
|
|
@ -1576,7 +1641,7 @@ fill_another:
|
|||
}
|
||||
|
||||
if (ENCRYPT) {
|
||||
decrypt_header(control, enc_head, &c_type, &c_len, &u_len, &last_head);
|
||||
if (unlikely(!decrypt_header(control, enc_head, &c_type, &c_len, &u_len, &last_head))) return -1;
|
||||
if (unlikely(read_buf(control, sinfo->fd, blocksalt, SALT_LEN)))
|
||||
return -1;
|
||||
}
|
||||
|
|
@ -1589,14 +1654,14 @@ fill_another:
|
|||
|
||||
s_buf = malloc(MAX(u_len, MIN_SIZE));
|
||||
if (unlikely(u_len && !s_buf))
|
||||
fatal(control, "Unable to malloc buffer of size %lld in fill_buffer\n", u_len);
|
||||
fatal_return(("Unable to malloc buffer of size %lld in fill_buffer\n", u_len), -1);
|
||||
sinfo->ram_alloced += u_len;
|
||||
|
||||
if (unlikely(read_buf(control, sinfo->fd, s_buf, padded_len)))
|
||||
return -1;
|
||||
|
||||
if (ENCRYPT)
|
||||
lrz_decrypt(control, s_buf, padded_len, blocksalt);
|
||||
if (unlikely(!lrz_decrypt(control, s_buf, padded_len, blocksalt))) return -1;
|
||||
|
||||
ucthread[s->uthread_no].s_buf = s_buf;
|
||||
ucthread[s->uthread_no].c_len = c_len;
|
||||
|
|
@ -1612,10 +1677,13 @@ fill_another:
|
|||
|
||||
st = malloc(sizeof(stream_thread_struct));
|
||||
if (unlikely(!st))
|
||||
fatal(control, "Unable to malloc in fill_buffer");
|
||||
fatal_return(("Unable to malloc in fill_buffer"), -1);
|
||||
st->i = s->uthread_no;
|
||||
st->control = control;
|
||||
create_pthread(control, &threads[s->uthread_no], NULL, ucompthread, st);
|
||||
if (unlikely(!create_pthread(control, &threads[s->uthread_no], NULL, ucompthread, st))) {
|
||||
free(st);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (++s->uthread_no == s->base_thread + s->total_threads)
|
||||
s->uthread_no = s->base_thread;
|
||||
|
|
@ -1636,7 +1704,7 @@ out:
|
|||
unlock_mutex(control, &output_lock);
|
||||
|
||||
/* join_pthread here will make it wait till the data is ready */
|
||||
join_pthread(control, threads[s->unext_thread], NULL);
|
||||
if (unlikely(!join_pthread(control, threads[s->unext_thread], NULL))) return -1;
|
||||
ucthread[s->unext_thread].busy = 0;
|
||||
|
||||
print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
|
||||
|
|
@ -1668,7 +1736,7 @@ int write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 le
|
|||
|
||||
/* Flush the buffer every sinfo->bufsize into one thread */
|
||||
if (sinfo->s[streamno].buflen == sinfo->bufsize)
|
||||
flush_buffer(control, sinfo, streamno);
|
||||
if (unlikely(!flush_buffer(control, sinfo, streamno))) return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -1712,7 +1780,7 @@ int close_stream_out(rzip_control *control, void *ss)
|
|||
|
||||
for (i = 0; i < sinfo->num_streams; i++) {
|
||||
if (sinfo->s[i].buflen)
|
||||
clear_buffer(control, sinfo, i, 0);
|
||||
if (unlikely(!clear_buffer(control, sinfo, i, 0))) return -1;
|
||||
}
|
||||
|
||||
if (ENCRYPT) {
|
||||
|
|
@ -1779,14 +1847,16 @@ static int lzo_compresses(rzip_control *control, uchar *s_buf, i64 s_len)
|
|||
return 1;
|
||||
wrkmem = (lzo_bytep) malloc(LZO1X_1_MEM_COMPRESS);
|
||||
if (unlikely(wrkmem == NULL))
|
||||
fatal(control, "Unable to allocate wrkmem in lzo_compresses\n");
|
||||
fatal_return(("Unable to allocate wrkmem in lzo_compresses\n"), 0);
|
||||
|
||||
in_len = MIN(test_len, buftest_size);
|
||||
dlen = STREAM_BUFSIZE + STREAM_BUFSIZE / 16 + 64 + 3;
|
||||
|
||||
c_buf = malloc(dlen);
|
||||
if (unlikely(!c_buf))
|
||||
fatal(control, "Unable to allocate c_buf in lzo_compresses\n");
|
||||
if (unlikely(!c_buf)) {
|
||||
free(wrkmem);
|
||||
fatal_return(("Unable to allocate c_buf in lzo_compresses\n"), 0);
|
||||
}
|
||||
|
||||
/* Test progressively larger blocks at a time and as soon as anything
|
||||
compressible is found, jump out as a success */
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue