如何在 winAPI 中将 Zlib 与串联的 .gz 文件一起使用?

How do I use Zlib with concatenated .gz files in winAPI?

我正在从 AWS 下载 Common Crawl 文件。它们是大型的串联 .gz 文件(多成员 gzip),gzip 标准支持这种格式。我正在使用 zlib 进行解压缩,但只能得到第一个 gzip 成员结束之前的解压内容。我尝试添加 inflateReset(),但随后出现错误 -5(Z_BUF_ERROR),这表明缓冲区或文件有问题。我觉得我已经很接近了。

这是没有 inflateReset 的代码。它适用于非串联文件。

#include "zlib.h"  
#define CHUNK 16384   
...
file = L"CC-MAIN-20181209185547-20181209211547-00040.warc.wet.gz";
fileDecompress(&file);

// Inflates the gzip file named by *lpParameter (a wstring holding a file name inside the
// corpora directory) into "<name>.wet" in the same directory.
//
// lpParameter: pointer to a wstring with the compressed file's name; it is only read.
// Returns 0 in every case; progress and failures are reported through sendToReportWindow().
//
// Limitation: decoding stops at the first Z_STREAM_END, so only the first member of a
// concatenated (multi-member) .gz file is written.
DWORD WINAPI fileDecompress(LPVOID lpParameter)
{
    // Backslashes must be escaped: an unescaped trailing '\' would swallow the closing quote.
    const wstring dir = L"C:\\AI\\corpora\\";
    const wstring* lpFileName = static_cast<wstring*>(lpParameter);
    sendToReportWindow(L"File to decompress is \"%s\" in \"%s\"\n", lpFileName->c_str(), dir.c_str());
    const wstring sourcePath = dir + *lpFileName;
    sendToReportWindow(L"input file with path:%s\n", sourcePath.c_str());
    const wstring destPath = sourcePath + L".wet";
    sendToReportWindow(L"output file with path:%s\n", destPath.c_str());

    //  Open input file for reading, existing file only.
    HANDLE InputFile = CreateFile(
        sourcePath.c_str(),       //  Input file name, compressed file
        GENERIC_READ,             //  Open for reading
        FILE_SHARE_READ,          //  Share for read
        NULL,                     //  Default security
        OPEN_EXISTING,            //  Existing file only
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template

    if (InputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open input \t%s\n", sourcePath.c_str());
        return 0;
    }

    //  Get compressed file size before creating the output, so a failure here does not
    //  leave an empty output file behind.
    LARGE_INTEGER FileSize;
    if (!GetFileSizeEx(InputFile, &FileSize) || (FileSize.QuadPart > 0xFFFFFFFF))
    {
        sendToReportWindow(L"Cannot get input file size or file is larger than 4GB.\n");
        CloseHandle(InputFile);
        return 0;
    }
    const DWORD InputFileSize = FileSize.LowPart;
    sendToReportWindow(L"input file size: %u bytes\n", InputFileSize);

    //  Create (or truncate) the output file for writing.
    HANDLE OutputFile = CreateFile(
        destPath.c_str(),         //  Output file name, decompressed file
        GENERIC_WRITE,            //  Open for writing
        0,                        //  No sharing
        NULL,                     //  Default security
        CREATE_ALWAYS,            //  Create new or overwrite existing
        FILE_ATTRIBUTE_NORMAL,    //  Normal file
        NULL);                    //  No template

    if (OutputFile == INVALID_HANDLE_VALUE)
    {
        sendToReportWindow(L"Cannot open output \t%s\n", destPath.c_str());
        CloseHandle(InputFile);
        return 0;
    }

    unsigned char in[CHUNK];
    unsigned char out[CHUNK];

    z_stream strm = {};                                 // zalloc/zfree/opaque/next_in all zero
    int ret = inflateInit2(&strm, 16 + MAX_WBITS);      // 16 + MAX_WBITS: expect a gzip wrapper
    if (ret != Z_OK)
    {
        sendToReportWindow(L"inflateInit2 failed:%d\n", ret);
        CloseHandle(InputFile);
        CloseHandle(OutputFile);
        return 0;
    }

    bool failed = false;

    do {                                                /* decompress until the gzip member ends or end of file */
        DWORD read = 0;
        if (!ReadFile(InputFile, in, CHUNK, &read, NULL)) {
            sendToReportWindow(L"read error on input file\n");
            failed = true;
            break;
        }
        if (read == 0)
        {
            break;                                      // end of file before Z_STREAM_END
        }
        strm.avail_in = read;
        strm.next_in = in;

        /* run inflate() on input until output buffer not full */
        do {
            strm.avail_out = CHUNK;
            strm.next_out = out;
            ret = inflate(&strm, Z_NO_FLUSH);

            bool fatal = true;
            switch (ret) {
            case Z_NEED_DICT:                           // 2
                sendToReportWindow(L"z_need_dict:%d\n", ret);
                break;
            case Z_STREAM_ERROR:                        // -2
                sendToReportWindow(L"z_stream_error:%d\n", ret);
                break;
            case Z_DATA_ERROR:                          // -3
                sendToReportWindow(L"z_data_error:%d\n", ret);
                break;
            case Z_MEM_ERROR:                           // -4
                sendToReportWindow(L"z_mem_error:%d\n", ret);
                sendToReportWindow(L"ret:%d\n", ret);
                DisplayErrorBox((LPWSTR)L"inflate");
                break;
            default:
                // Z_OK, Z_STREAM_END, and Z_BUF_ERROR. Z_BUF_ERROR is not fatal: it only
                // means no progress was possible, which happens here when the previous
                // call filled the output buffer exactly as the input ran out.
                fatal = false;
                break;
            }
            if (fatal) {
                failed = true;
                break;
            }

            const unsigned have = CHUNK - strm.avail_out;
            DWORD written = 0;
            const BOOL res = WriteFile(OutputFile, out, have, &written, NULL);
            if (!res || written != have) {
                sendToReportWindow(L"file write error:%d\n", res);
                failed = true;
                break;
            }
        } while (strm.avail_out == 0);                  //  avail_out == 0 means output buffer is full
    } while (!failed && ret != Z_STREAM_END);           /* done when inflate() says it's done */

    if (!failed && ret != Z_STREAM_END)
    {
        sendToReportWindow(L"input ended before the gzip stream was complete\n");
    }

    // Single exit path: release zlib state and both handles whether or not decoding succeeded.
    (void)inflateEnd(&strm);
    CloseHandle(InputFile);
    CloseHandle(OutputFile);
    return 0;
}

这是添加了 inflateReset() 的版本。此版本导致 inflate 生成错误 -5(缓冲区错误或文件被截断)。

...
// Excerpt from the body of fileDecompress(): attempt to handle concatenated gzip members
// by calling inflateReset() after the inner loop. Review notes are marked NOTE(review).
int ret;
z_stream strm{};
array<uint8_t, CHUNK> scratch = {}; // scratch buffer that receives the decompressed data

strm.zalloc = Z_NULL;              // allocate inflate state
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
strm.avail_in = 0;
strm.next_in = Z_NULL;

ret = inflateInit2(&strm, 16 + MAX_WBITS);
if (ret != Z_OK)
{
    return 0;
}

do {                                                                    /* decompress until deflate stream ends or end of file */ 
    DWORD read;
    // NOTE(review): `in` is not declared in this excerpt; presumably the CHUNK-sized input
    // buffer from the earlier version. This read happens on every outer iteration and
    // overwrites the buffer, so any input inflate() has not consumed yet is discarded.
    BOOL res = ReadFile(InputFile, in, CHUNK, &read, NULL);

    strm.avail_in = read;
    if (!res) {
        (void)inflateEnd(&strm);
        sendToReportWindow(L"read error on input file\n");
        return 0;
    }

    if (strm.avail_in == 0)
    {
        sendToReportWindow(L"strm.avail_in:%d\n", strm.avail_in);       // strm.avail_in = 0
        break;
    }
    strm.next_in = in;

        /* run inflate() on input until output buffer not full */
    // NOTE(review): this inner loop is `while (true)` and is only left via the
    // Z_STREAM_END break below or one of the returns; it does not stop when the
    // current input chunk is used up.
    do {
        strm.avail_out = scratch.size();
        strm.next_out = scratch.data();
        ret = inflate(&strm, Z_NO_FLUSH);

        //if (ret != Z_OK) break;                                     // 0
        
        switch (ret) {
        case Z_NEED_DICT:                                           // 2
            sendToReportWindow(L"z_need_dict:%d\n", ret);
            (void)inflateEnd(&strm);
            return 0;
            //ret = Z_DATA_ERROR;     /* and fall through */
        case Z_STREAM_ERROR:                                        // -2
            sendToReportWindow(L"Z_STREAM_ERROR:%d\n", ret);
            (void)inflateEnd(&strm);
            return 0;
        case Z_DATA_ERROR:                                          // -3
            sendToReportWindow(L"z_data_error:%d\n", ret);
            (void)inflateEnd(&strm);
            return 0;
        case Z_MEM_ERROR:                                           // -4
            (void)inflateEnd(&strm);
            sendToReportWindow(L"z_mem_error:%d\n", ret);
            sendToReportWindow(L"ret:%d\n", ret);
            DisplayErrorBox((LPWSTR)L"inflate");
            return 0;
        case Z_BUF_ERROR:                                           // -5
            sendToReportWindow(L"z_buf_error:%d\n", ret);
            // NOTE(review): inflateEnd() is called here, but the `break` below only leaves
            // the switch, so the loop keeps using strm after it has been ended.
            (void)inflateEnd(&strm);
            //return 0;
            break;
        }

        auto bytes_decoded = scratch.size() - strm.avail_out;
       
        DWORD written;
        // NOTE(review): passes &scratch (address of the std::array object) rather than
        // scratch.data(); neither `res` nor `written` is checked afterwards.
        BOOL res = WriteFile(OutputFile, &scratch, bytes_decoded, &written, NULL);

        if (ret == Z_STREAM_END) break;

    } while (true);          //  avail_out == 0 means output buffer is full

    // NOTE(review): the next line is an expression statement whose result is discarded;
    // it has no effect, so the inflateReset() below is not conditional on it.
    ret == Z_STREAM_END;

    auto reset_result = inflateReset(&strm);        // work with concatenation
    sendToReportWindow(L"resetting inflate: %d\n", reset_result);
    assert(reset_result == Z_OK);      

// NOTE(review): the outer loop ends as soon as the current chunk has been fully consumed.
} while (strm.avail_in > 0);
...

谢谢!

更新:我认为 ReadFile 应该每次读取 CHUNK 个字节,而不是 1 个字节。两个示例都已相应修改。现在得到的是错误 -3:“Z_DATA_ERROR”。我正在检查这一改动是否导致 ReadFile 实际上被多次调用。

我要解压缩的典型文件:[https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2018-51/segments/1544376823009.19/wet/CC-MAIN-20181209185547-20181209211547-00041.warc.wet.gz]

更新 2:谢谢 Mark Adler!借助您提供的示例,我修复了代码中的逻辑。下面的版本符合 WinAPI 的要求。我还添加了文件扩展名处理,把缓冲区移到了堆上,并添加了一个计时器。计时器显示,使用更大的缓冲区可将解压(inflate)时间减少约 30%。

DWORD WINAPI fileDecompress(LPVOID lpParameter)
{                                                                                
// zlib does not work with .zip files
sendToReportWindow(L"inside fileDecompress()\n");                            
// deflate .gz (gzip) files. single or multiple member (concatenated)

wstring dir = L"C:\AI\corpora\";
wstring* lpFileName = static_cast<wstring*>(lpParameter);
sendToReportWindow(L"File to decompress is \"%s\" in \"%s\"\n", lpFileName->c_str(), dir.c_str());
wstring sourcePath = dir + lpFileName->c_str();
sendToReportWindow(L"input file with path:%s\n", sourcePath.c_str());

wstring::size_type lastdot = lpFileName->find_last_of(L".");                 // remove .gz extension: get length to last dot and truncate
lpFileName->resize(lastdot);
wstring destPath = dir + lpFileName->c_str();
sendToReportWindow(L"output file with path:%s\n", destPath.c_str());

HANDLE InputFile = INVALID_HANDLE_VALUE;
HANDLE OutputFile = INVALID_HANDLE_VALUE;
BOOL Success;
DWORD InputFileSize;
ULONGLONG StartTime, EndTime;
LARGE_INTEGER FileSize;
double InflateTime;

InputFile = CreateFile(
    sourcePath.c_str(),       //  Input file name, compressed file
    GENERIC_READ,             //  Open for reading
    FILE_SHARE_READ,          //  Share for read
    NULL,                     //  Default security
    OPEN_EXISTING,            //  Existing file only
    FILE_ATTRIBUTE_NORMAL,    //  Normal file
    NULL);                    //  No template

if (InputFile == INVALID_HANDLE_VALUE){sendToReportWindow(L"Cannot open input \t%s\n", sourcePath.c_str()); return 0; }

OutputFile = CreateFile(
    destPath.c_str(),         //  Input file name, compressed file
    GENERIC_WRITE,            //  Open for reading
    0,                        //  Share for read
    NULL,                     //  Default security
    CREATE_ALWAYS,            //  Existing file only
    FILE_ATTRIBUTE_NORMAL,    //  Normal file
    NULL);                    //  No template

if (OutputFile == INVALID_HANDLE_VALUE){sendToReportWindow(L"Cannot open output \t%s\n", destPath.c_str()); return 0; }

Success = GetFileSizeEx(InputFile, &FileSize);                              // Get compressed file size.
if ((!Success) || (FileSize.QuadPart > 0xFFFFFFFF))
{
    sendToReportWindow(L"Cannot get input file size or file is larger than 4GB.\n");
    CloseHandle(InputFile);
    return 0;
}
InputFileSize = FileSize.LowPart;
sendToReportWindow(L"input file size: %u bytes\n", InputFileSize);

StartTime = GetTickCount64();

#define CHUNK 524288                                                        // buffer size. doesn't use much ram and speeds up inflate
z_stream strm = {};                                                         // Initialize zlib for file compression/decompression
int ret = inflateInit2(&strm, 16 + MAX_WBITS);
assert(ret == Z_OK);

unsigned char *in = new unsigned char[CHUNK]; unsigned char* out = new unsigned char[CHUNK];   

for (;;) {                                                                  // Decompress from input to output.
    if (strm.avail_in == 0) {                                               // Keep reading until the end of the input file or an error
        DWORD read;
        (void)ReadFile(InputFile, in, CHUNK, &read, NULL);
        strm.avail_in = read;
        if (strm.avail_in == 0)
            break;
        strm.next_in = in;
    }

    do {                                                                    // Decompress all of what's in the CHUNK in buffer.
        strm.avail_out = CHUNK;                                                     
        strm.next_out = out;
        ret = inflate(&strm, Z_NO_FLUSH);                                   // Decompress as much as possible to the CHUNK out buffer.
                                                                          
        size_t got = CHUNK - strm.avail_out;                                
        DWORD written;                                                      
        (void)WriteFile(OutputFile, out, got, &written, NULL);              // Write to the outputFile whatever inflate() left in out buffer
        if (written != got) {sendToReportWindow(L"file write error\n"); delete[] in; delete[] out; return 0;}
                                                                                                                      
        if (ret == Z_STREAM_END)                                            // Check for the end of a gzip member, in which case, 
            assert(inflateReset(&strm) == Z_OK);                            // reset inflate for the next gzip member. (concatenated files)

        else if (ret != Z_OK) {                                             // Return on a data error.
            assert(ret == Z_DATA_ERROR);
            (void)inflateEnd(&strm);
            delete[] in; delete[] out;
            return 0;
        }   
    } while (strm.avail_in > 0);                                            // Continue until everything in the input buffer is consumed.
}                                                                           // for() loop to get next input buffer CHUNK from input file    

EndTime = GetTickCount64();
InflateTime = (EndTime - StartTime) / 1000.0;                               //  Get how long it took to inflate file

delete[] in; delete[] out;
(void)inflateEnd(&strm);                                                       
CloseHandle(InputFile); CloseHandle(OutputFile);
sendToReportWindow(L"Inflate Time: %.2f seconds. Done with fileDecompress function.\n", InflateTime);
return 0;
}

难道您的编译器没有对孤立的表达式语句 ret == Z_STREAM_END; 发出警告吗?您需要的是一个 if,并用大括号把与 inflateReset() 相关的语句括起来。

还有一个问题:只要 strm.avail_in 为零,您就会离开外循环。除了恰好到达成员末尾的情况之外,每次都会发生这种情况;而且如果解压该成员时恰好用尽了输入缓冲区,即使在成员末尾也可能发生。只需将外循环改为 while (true)。

即使在修复了所有这些问题之后,当您在外部循环的顶部进行读取时,您仍会丢弃剩余的可用输入。仅当 strm.avail_in 为零时才读取。

一种更简单的方法是在内循环中进行重置。像这样(C 中的示例):

// Decompress a gzip file input, potentially with multiple gzip members. Write
// the decompressed data to output.
//
// Returns:
//   Z_STREAM_END  on success (the input ended exactly at the end of a member),
//   Z_OK          if the gzip stream was correct up to where it ended prematurely,
//   Z_DATA_ERROR  if the gzip stream is invalid,
//   Z_ERRNO       if reading input or writing output failed,
//   any other zlib error code (e.g. Z_MEM_ERROR) reported by inflateInit2() or inflate().
//
// No zlib call with a side effect is placed inside assert(), so the function behaves the
// same when NDEBUG is defined.
int inflate_gzip(FILE *input, FILE *output) {
    // Initialize inflate for gzip input.
    z_stream strm = {};
    int ret = inflateInit2(&strm, 16 + MAX_WBITS);
    if (ret != Z_OK)
        return ret;

    // Decompress from input to output.
    unsigned char in[CHUNK];
    for (;;) {
        // Read more only when the previous buffer is fully consumed, so input left over
        // after the end of one member is passed on to the next member.
        if (strm.avail_in == 0) {
            strm.avail_in = fread(in, 1, CHUNK, input);
            if (strm.avail_in == 0) {
                // fread() returns 0 for both end of file and error; tell them apart.
                if (ferror(input)) {
                    (void)inflateEnd(&strm);
                    return Z_ERRNO;
                }
                break;
            }
            strm.next_in = in;
        }

        // Decompress all of what's in the input buffer.
        do {
            // Decompress as much as possible to the CHUNK output buffer.
            unsigned char out[CHUNK];
            strm.avail_out = CHUNK;
            strm.next_out = out;
            ret = inflate(&strm, Z_NO_FLUSH);

            // Write to the output file whatever inflate() left in the output
            // buffer. Return with an error if the write does not complete.
            size_t got = CHUNK - strm.avail_out;
            size_t put = fwrite(out, 1, got, output);
            if (put != got) {
                (void)inflateEnd(&strm);    // release inflate state on this path too
                return Z_ERRNO;
            }

            // Check for the end of a gzip member, in which case reset inflate
            // for the next gzip member.
            if (ret == Z_STREAM_END) {
                int reset = inflateReset(&strm);
                if (reset != Z_OK) {
                    (void)inflateEnd(&strm);
                    return reset;
                }
            }

            // Return on a data error (or any other inflate() failure).
            else if (ret != Z_OK) {
                (void)inflateEnd(&strm);
                return ret;
            }

            // Continue until everything in the input buffer is consumed.
        } while (strm.avail_in > 0);
    }

    // Reached the end of the input file. Clean up and return the last inflate() result.
    (void)inflateEnd(&strm);
    return ret;
}