如何使用 Zstd-jni 和字节缓冲区解压缩大文件
How do I decompress large files using Zstd-jni and Byte Buffers
我正在尝试解压缩许多 40 MB 以上的文件,因为我使用 ByteBuffers 和 Channels 并行下载它们。与使用 Streams 相比,使用 Channels 获得了更高的吞吐量,我们需要这是一个非常高的吞吐量系统,因为我们每天需要处理 40 TB 的文件,而这部分过程目前是瓶颈。这些文件使用 zstd-jni 压缩。 Zstd-jni 有 api 用于解压缩字节缓冲区,但是当我使用它们时出现错误。如何使用 zstd-jni 一次解压一个字节缓冲区?
我在他们的测试中找到了这些示例,但除非我遗漏了某些内容,否则使用 ByteBuffers 的示例似乎假设整个输入文件都适合一个 ByteBuffer:
https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala
下面是我压缩和解压文件的代码。压缩代码效果很好,但是解压代码然后失败并出现错误-70。
public static long compressFile(String inFile, String outFolder, ByteBuffer inBuffer, ByteBuffer compressedBuffer, int compressionLevel) throws IOException {
File file = new File(inFile);
File outFile = new File(outFolder, file.getName() + ".zs");
long numBytes = 0l;
try (RandomAccessFile inRaFile = new RandomAccessFile(file, "r");
RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
FileChannel inChannel = inRaFile.getChannel();
FileChannel outChannel = outRaFile.getChannel()) {
inBuffer.clear();
while(inChannel.read(inBuffer) > 0) {
inBuffer.flip();
compressedBuffer.clear();
long compressedSize = Zstd.compressDirectByteBuffer(compressedBuffer, 0, compressedBuffer.capacity(), inBuffer, 0, inBuffer.limit(), compressionLevel);
numBytes+=compressedSize;
compressedBuffer.position((int)compressedSize);
compressedBuffer.flip();
outChannel.write(compressedBuffer);
inBuffer.clear();
}
}
return numBytes;
}
public static long decompressFile(String originalFilePath, String inFolder, ByteBuffer inBuffer, ByteBuffer decompressedBuffer) throws IOException {
File outFile = new File(originalFilePath);
File inFile = new File(inFolder, outFile.getName() + ".zs");
outFile = new File(inFolder, outFile.getName());
long numBytes = 0l;
try (RandomAccessFile inRaFile = new RandomAccessFile(inFile, "r");
RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
FileChannel inChannel = inRaFile.getChannel();
FileChannel outChannel = outRaFile.getChannel()) {
inBuffer.clear();
while(inChannel.read(inBuffer) > 0) {
inBuffer.flip();
decompressedBuffer.clear();
long compressedSize = Zstd.decompressDirectByteBuffer(decompressedBuffer, 0, decompressedBuffer.capacity(), inBuffer, 0, inBuffer.limit());
System.out.println(Zstd.isError(compressedSize) + " " + compressedSize);
numBytes+=compressedSize;
decompressedBuffer.position((int)compressedSize);
decompressedBuffer.flip();
outChannel.write(decompressedBuffer);
inBuffer.clear();
}
}
return numBytes;
}
是的,您在示例中使用的静态方法假设整个压缩文件适合一个 ByteBuffer。据我了解您的要求,您需要使用 ByteBuffers 进行流式解压缩。 ZstdDirectBufferDecompressingStream 已经提供了这个:
这是一个如何使用它的例子(来自测试):
https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala#L261-L302
但您还必须对其进行子类化并覆盖 "refill" 方法。
编辑:这是我刚刚添加的新测试,它与您的问题具有完全相同的结构 - 在通道之间移动数据:
https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala#L540-L586
我正在尝试解压缩许多 40 MB 以上的文件,因为我使用 ByteBuffers 和 Channels 并行下载它们。与使用 Streams 相比,使用 Channels 获得了更高的吞吐量,我们需要这是一个非常高的吞吐量系统,因为我们每天需要处理 40 TB 的文件,而这部分过程目前是瓶颈。这些文件使用 zstd-jni 压缩。 Zstd-jni 有 api 用于解压缩字节缓冲区,但是当我使用它们时出现错误。如何使用 zstd-jni 一次解压一个字节缓冲区?
我在他们的测试中找到了这些示例,但除非我遗漏了某些内容,否则使用 ByteBuffers 的示例似乎假设整个输入文件都适合一个 ByteBuffer: https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala
下面是我压缩和解压文件的代码。压缩代码效果很好,但是解压代码然后失败并出现错误-70。
public static long compressFile(String inFile, String outFolder, ByteBuffer inBuffer, ByteBuffer compressedBuffer, int compressionLevel) throws IOException {
File file = new File(inFile);
File outFile = new File(outFolder, file.getName() + ".zs");
long numBytes = 0l;
try (RandomAccessFile inRaFile = new RandomAccessFile(file, "r");
RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
FileChannel inChannel = inRaFile.getChannel();
FileChannel outChannel = outRaFile.getChannel()) {
inBuffer.clear();
while(inChannel.read(inBuffer) > 0) {
inBuffer.flip();
compressedBuffer.clear();
long compressedSize = Zstd.compressDirectByteBuffer(compressedBuffer, 0, compressedBuffer.capacity(), inBuffer, 0, inBuffer.limit(), compressionLevel);
numBytes+=compressedSize;
compressedBuffer.position((int)compressedSize);
compressedBuffer.flip();
outChannel.write(compressedBuffer);
inBuffer.clear();
}
}
return numBytes;
}
public static long decompressFile(String originalFilePath, String inFolder, ByteBuffer inBuffer, ByteBuffer decompressedBuffer) throws IOException {
File outFile = new File(originalFilePath);
File inFile = new File(inFolder, outFile.getName() + ".zs");
outFile = new File(inFolder, outFile.getName());
long numBytes = 0l;
try (RandomAccessFile inRaFile = new RandomAccessFile(inFile, "r");
RandomAccessFile outRaFile = new RandomAccessFile(outFile, "rw");
FileChannel inChannel = inRaFile.getChannel();
FileChannel outChannel = outRaFile.getChannel()) {
inBuffer.clear();
while(inChannel.read(inBuffer) > 0) {
inBuffer.flip();
decompressedBuffer.clear();
long compressedSize = Zstd.decompressDirectByteBuffer(decompressedBuffer, 0, decompressedBuffer.capacity(), inBuffer, 0, inBuffer.limit());
System.out.println(Zstd.isError(compressedSize) + " " + compressedSize);
numBytes+=compressedSize;
decompressedBuffer.position((int)compressedSize);
decompressedBuffer.flip();
outChannel.write(decompressedBuffer);
inBuffer.clear();
}
}
return numBytes;
}
是的,您在示例中使用的静态方法假设整个压缩文件适合一个 ByteBuffer。据我了解您的要求,您需要使用 ByteBuffers 进行流式解压缩。 ZstdDirectBufferDecompressingStream 已经提供了这个:
这是一个如何使用它的例子(来自测试):
https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala#L261-L302
但您还必须对其进行子类化并覆盖 "refill" 方法。
编辑:这是我刚刚添加的新测试,它与您的问题具有完全相同的结构 - 在通道之间移动数据:
https://github.com/luben/zstd-jni/blob/master/src/test/scala/Zstd.scala#L540-L586