Implement massive multithreading decompression.

This is done by reading each stream of data into separate buffers, using up to as many threads as there are CPUs.
As each thread's data becomes available, it is fed into runzip once runzip requests more of the stream.
Provided there are enough chunks in the originally compressed data, this provides a massive speedup, potentially proportional to the number of CPUs. The slower the backend compression, the greater the speedup (e.g. zpaq benefits the most).
Fix the progress output of zpaq compress and decompress so it no longer tramples on itself, races, or consumes a lot of CPU time printing to the console.
When limiting cwindow to 6 on 32 bits, ensure that control.window is also set.
When testing for the maximum size of testmalloc, the multiple used was out by one, so increase it.
Minor output tweaks.
This commit is contained in:
Con Kolivas 2010-11-16 21:25:32 +11:00
parent b17ad2fa41
commit 2b08c6e280
4 changed files with 199 additions and 101 deletions

View file

@ -1568,11 +1568,12 @@ bool find_start(FILE *in) {
}
// Decompress to stdout
static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress) {
static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
int progress, long thread) {
long long int len = 0;
static int last_pct = 0, chunk = 0;
int pct = 0;
int last_pct = 0;
int i, pct = 0;
// Find start of block
while (find_start(in)) {
if (getc(in)!=LEVEL || getc(in)!=1)
@ -1600,8 +1601,12 @@ static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
if (progress) {
len++;
pct = (len * 100 / buf_len);
if (pct != last_pct) {
fprintf(msgout, "\r ZPAQ Chunk %d of 2 Decompress: %i%% \r", (chunk + 1), pct);
if (pct / 10 != last_pct / 10) {
fprintf(msgout, "\r\t\t\t\tZPAQ\t");
for (i = 0; i < thread; i++)
fprintf(msgout, "\t");
fprintf(msgout, "%ld: %i%%\r",
thread + 1, pct);
fflush(msgout);
last_pct = pct;
}
@ -1623,16 +1628,11 @@ static void decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
}
if (c!=255) error("missing end of block marker");
}
if (progress) {
fprintf(msgout, "\t \r");
fflush(msgout);
}
chunk ^= 1;
}
extern "C" void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress)
extern "C" void zpipe_decompress(FILE *in, FILE *out, FILE *msgout, long long int buf_len, int progress, long thread)
{
decompress(in, out, msgout, buf_len, progress);
decompress(in, out, msgout, buf_len, progress, thread);
}
//////////////////////////// Compressor ////////////////////////////
@ -1708,7 +1708,7 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
207,8,112,56,0};
long long int len = 0;
static int last_pct = 0, chunk = 0;
int last_pct = 0;
int i, pct = 0;
// Initialize
ZPAQL z; // model
@ -1738,12 +1738,12 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
if (progress) {
len++;
pct = (len * 100 / buf_len);
if (pct != last_pct && !(pct % 10)) {
if (pct / 10 != last_pct / 10) {
fprintf(msgout, "\r\t\t\t\tZPAQ\t");
for (i = 0; i < thread; i++)
fprintf(msgout, "\t\t");
fprintf(msgout, "%ld: %i/2: %i%%\r",
thread, (chunk + 1), pct);
fprintf(msgout, "\t");
fprintf(msgout, "%ld: %i%%\r",
thread + 1, pct);
fflush(msgout);
last_pct = pct;
}
@ -1751,10 +1751,6 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
enc.compress(c);
sha1.put(c);
}
if (progress) {
//fprintf(msgout, "\t \r");
//fflush(msgout);
}
enc.compress(-1); // end of segment
// Write segment checksum and trailer
@ -1767,7 +1763,6 @@ static void compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,
// Optional: append a byte not 'z' to allow non-ZPAQ data to be appended
//putc(0, out);
chunk ^= 1;
}
extern "C" void zpipe_compress(FILE *in, FILE *out, FILE *msgout, long long int buf_len,