我实际上想要阻塞写入 linux 命名管道

I actually WANT a blocking write to a linux named pipe

我有一个线程应用程序。在一个线程中,我想将从数据库流式传输的数据写入命名管道,如果 reader(在本例中为命令行程序“zip”)跟不上 java线程。进入单个 Zip 文件条目的数据可能大于系统的主内存。

无论在写入命名管道时使用 FileOutputStream 还是 FileWriter,我都会看到以下行为: 写入会缓冲,直到 java 堆被填满,然后实际上将线程减慢到 reader 的速度。对于单线程进程,这只是 space 的浪费,但对于多线程进程,这会让 运行 其他线程陷入内存不足异常。

我看到的唯一剩余选项是使用 JNA 在 C 中执行阻塞写入。欢迎提出其他建议。

顺便说一句。我让“zip”工具进行压缩的原因是 java.util.zip 和 Lingala 的 Zip4J 会用缓冲区填充 RAM。

好的,这里是一个小的浓缩示例。我使用“mkfifo fifo”创建命名管道“fifo”并启动“zip --fifo -fz -v fifo.zip fifo”以在命名管道上阻止读取进程。 然后我用 -Xmx32M 启动下面的 java 程序。 没有“Mem Eater”线程,它的行为如上所述。有了它,该线程将 运行 抛出 OutOfMemoryException。 现在为代码:

import java.io.FileWriter;
import java.util.LinkedList;
import java.util.List;

public class fos {

    public static void main(String[] argv) {
        if (argv.length != 1) {
            System.err.println("Usage is:");
            System.err.println("java fos.java <fifo>");
            System.exit(-1);
        }

        String fifoName = argv[0];
        
        startMemConsumerThread();

        try (var fifoWriter = new FileWriter(fifoName)) {
            for(long i=0L; i< Long.MAX_VALUE; i++)
                fifoWriter.write("Hello World! "+i+"\r\n");
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    private static void startMemConsumerThread() {
        final int NUM_CHUNKS = 20;
        final int CHUNK_SIZE = 1024*1024;
        final List< byte[] > chunks = new LinkedList<>();
        
        var t = new Thread("Mem Eater") {
            @Override
            public void run() {
                while (true) {
                    while (chunks.size() < NUM_CHUNKS)
                        chunks.add(new byte[CHUNK_SIZE]);
                    chunks.remove(NUM_CHUNKS % 7);
                }
            }
        };
        
        t.setDaemon(true);
        t.start();
    }
}

我围绕打开、写入、关闭做了一个薄的 JNA 包装器,以具有写入的阻塞行为。现在我不再观察到无限内存消耗,但 Java 堆将增长到配置的最大值并且 GC 将优雅地处理它。

我必须纠正自己:即使在使用 FileWriter.write 时也会发生阻塞并且内存消耗是有限的。感谢 joni 的鼓励!

但是在这个用例中使用 java.util.ZipOutputStream 时事情就失控了。但那是另外一回事了...