Java 按顺序解压缩 GZIP 流

Java decompress GZIP stream sequentially

我的 Java 程序实现了一个服务器,它应该通过 websockets 从客户端获取一个非常大的文件,使用 gzip 压缩,并且应该检查文件内容中的某些字节模式。

客户端发送嵌入在专有协议中的文件块,因此我从客户端收到一条又一条消息,解析消息并提取 gzip 文件内容。

我无法将整个文件保存在程序内存中,因此我尝试解压缩每个块,处理数据并继续处理下一个块。

我正在使用以下代码:

public static String gzipDecompress(byte[] compressed) throws IOException {
    String uncompressed;
    try (
        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
        GZIPInputStream gis = new GZIPInputStream(bis);
        Reader reader = new InputStreamReader(gis);
        Writer writer = new StringWriter()
    ) {

      char[] buffer = new char[10240];
      for (int length = 0; (length = reader.read(buffer)) > 0; ) {
        writer.write(buffer, 0, length);
      }
      uncompressed = writer.toString();
    }

    return uncompressed;
  }

但是在使用第一个压缩块调用函数时出现以下异常:

java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.Reader.read(Reader.java:140)

重要的是要提到我没有跳过任何块并尝试按顺序解压缩块。

我错过了什么?

问题是你手动玩这些块。

正确的方法是获取一些InputStream,用GZIPInputStream包裹起来,然后读取数据。

    InputStream is = // obtain the original gzip stream

    GZIPInputStream gis = new GZIPInputStream(is);
    Reader reader = new InputStreamReader(gis);

    //... proceed reading and so on

GZIPInputStream 以流方式工作,因此如果您一次只从 reader 请求 10kb,无论初始 GZIP 文件的大小如何,总体内存占用量都会很低。

问题更新后更新

针对您的情况的一个可能的解决方案是编写一个 InputStream 实现,它流式传输由您的客户端协议处理程序以块形式放入其中的字节。

原型如下:

public class ProtocolDataInputStream extends InputStream {
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100);
    private byte[] currentChunk = null;
    private int currentChunkOffset = 0;
    private boolean noMoreChunks = false;

    @Override
    public synchronized int read() throws IOException {
        boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length;
        if (takeNextChunk) {
            if (noMoreChunks) {
                // stream is exhausted
                return -1;
            } else {
                currentChunk = nextChunks.take();
                currentChunkOffset = 0;
            }
        }
        return currentChunk[currentChunkOffset++];
    }

    @Override
    public synchronized int available() throws IOException {
        if (currentChunk == null) {
            return 0;
        } else {
            return currentChunk.length - currentChunkOffset;
        }
    }

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) {
        nextChunks.add(chunk);
        if (chunkIsLast) {
            noMoreChunks = true;
        }
    }
}

您的客户端协议处理程序使用 addChunk() 添加字节块,而您的解压缩代码从该流中提取数据(通过 Reader)。

请注意此代码存在一些问题:

  1. 正在使用的队列大小有限。如果 addChunk() 调用过于频繁,队列可能已满,从而阻塞 addChunk()。这可能是可取的,也可能不是。
  2. 仅实现 read() 方法用于说明目的。为了性能,最好用同样的方式实现read(byte[])
  3. 在假设 reader(解压器)和编写器(调用协议处理程序 addChunk())是不同线程的情况下使用保守同步。
  4. InterruptedException 未在 take() 上处理以避免过多的细节。

如果你的解压器和addChunk()在同一个线程中执行(在同一个循环中),那么你可以在使用InputStream或[=拉取时尝试使用InputStream.available()方法29=] 用 Reader.

拉动时

gzip 流中的任意字节序列不是有效的独立 gzip 数据。无论如何,您必须连接所有字节块。

最简单的方法是用一个简单的管道将它们全部累积起来:

import java.io.PipedOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class ChunkInflater {
    private final PipedOutputStream pipe;

    private final InputStream stream;

    public ChunkInflater()
    throws IOException {
        pipe = new PipedOutputStream();
        stream = new GZIPInputStream(new PipedInputStream(pipe));
    }

    public InputStream getInputStream() {
        return stream;
    }

    public void addChunk(byte[] compressedChunk)
    throws IOException {
        pipe.write(compressedChunk);
    }
}

现在您有了一个 InputStream,您可以按所需的任何增量读取。例如:

ChunkInflater inflater = new ChunkInflater();

Callable<Void> chunkReader = new Callable<Void>() {
    @Override
    public Void call()
    throws IOException {
        byte[] chunk;
        while ((chunk = readChunkFromSource()) != null) {
            inflater.addChunk(chunk);
        }

        return null;
    }
};
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(chunkReader);
executor.shutdown();

Reader reader = new InputStreamReader(inflater.getInputStream());
// read text here