PipedInputStream together with TeeOutputStream freezes the application when it is not read?

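I am referring to org.apache.commons.io.output.TeeOutputStream. To make testing easier, I added a simple variant of it as an inner class (MyTeeOutputStream), so you don't have to fetch the dependency.

Any idea why this happens and how to fix it?

Code

I made a JUnit 5 test case for you to try: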

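    // Needs java.io.*, java.util.concurrent.atomic.AtomicInteger,
    // org.junit.jupiter.api.Test and org.junit.jupiter.api.Assertions.
    @Test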
    void testSplittingOutput() throws IOException, InterruptedException {
        PipedInputStream pipedInput = new PipedInputStream();
        OutputStream pipedOutput = new PipedOutputStream(pipedInput);
        //TeeOutputStream teeOutput = new TeeOutputStream(System.out, pipedOutput);
        MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
        PrintStream out = new PrintStream(teeOutput);

        final int expectedPrintedLinesCount = 1000;
        AtomicInteger actualPrintedLinesCount = new AtomicInteger();
        
        Thread t1 = new Thread(() -> { // Thread for writing data to OUT
            try {
                for (int i = 0; i < expectedPrintedLinesCount; i++) {
                    out.println("Hello! "+i);
                    actualPrintedLinesCount.incrementAndGet();
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });


        Thread t2 = new Thread(() -> { // Thread for reading data from IN
            BufferedReader reader = new BufferedReader(new InputStreamReader(pipedInput));
            StringBuilder builder = new StringBuilder();
            try {
                while(true){
                    builder.append(reader.readLine());
                }
            } catch (IOException e) {
                //e.printStackTrace(); // ignore
                System.out.println(builder);
            }
        });

        t1.start();
        //t2.start(); // If we aren't reading the PipedInputStream in thread 2, only 94 lines are printed instead of 1000!?

        for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
            Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
        }
        
        Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get());
    }

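import java.io.IOException;
import java.io.OutputStream;

// Simplified stand-in for org.apache.commons.io.output.TeeOutputStream:
// every write goes to 'out' first and then to 'tee'.
final class MyTeeOutputStream extends OutputStream {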

    private final OutputStream out;
    private final OutputStream tee;

    public MyTeeOutputStream(OutputStream out, OutputStream tee) {
        if (out == null)
            throw new NullPointerException();
        else if (tee == null)
            throw new NullPointerException();

        this.out = out;
        this.tee = tee;
    }


    @Override
    public void write(int b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        tee.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        out.flush();
        tee.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            out.close();
        } finally {
            tee.close();
        }
    }
}

Result

As you can see, it fails to print all 1000 lines and stops at line 94:

Hello! 0
Hello! 1
Hello! 2
Hello! 3
Hello! 4
Hello! 5
Hello! 6
Hello! 7
Hello! 8
Hello! 9
Hello! 10
Hello! 11
Hello! 12
Hello! 13
Hello! 14
Hello! 15
Hello! 16
Hello! 17
Hello! 18
Hello! 19
Hello! 20
Hello! 21
Hello! 22
Hello! 23
Hello! 24
Hello! 25
Hello! 26
Hello! 27
Hello! 28
Hello! 29
Hello! 30
Hello! 31
Hello! 32
Hello! 33
Hello! 34
Hello! 35
Hello! 36
Hello! 37
Hello! 38
Hello! 39
Hello! 40
Hello! 41
Hello! 42
Hello! 43
Hello! 44
Hello! 45
Hello! 46
Hello! 47
Hello! 48
Hello! 49
Hello! 50
Hello! 51
Hello! 52
Hello! 53
Hello! 54
Hello! 55
Hello! 56
Hello! 57
Hello! 58
Hello! 59
Hello! 60
Hello! 61
Hello! 62
Hello! 63
Hello! 64
Hello! 65
Hello! 66
Hello! 67
Hello! 68
Hello! 69
Hello! 70
Hello! 71
Hello! 72
Hello! 73
Hello! 74
Hello! 75
Hello! 76
Hello! 77
Hello! 78
Hello! 79
Hello! 80
Hello! 81
Hello! 82
Hello! 83
Hello! 84
Hello! 85
Hello! 86
Hello! 87
Hello! 88
Hello! 89
Hello! 90
Hello! 91
Hello! 92
Hello! 93
Hello! 94
org.opentest4j.AssertionFailedError: 
Expected :1000
Actual   :94

Details

Why am I doing this? I want to duplicate System.out and read the copy (through a PipedInputStream), then send that data to the HTTP console of my website.

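A PipedInputStream has a default buffer size of only 1024 bytes, so if you write to the PipedOutputStream without reading it from another thread, the write blocks until the buffer has been drained. That is why the output stops at line 94 of Hello! XX\r\n: 1024 bytes divided by 11 bytes per line means roughly 93 complete output lines are stored in pipedInput. System.out still works for line 94, but the blocked write on pipedInput prevents any further lines from being added.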
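To see the limit in isolation, here is a minimal sketch. The names PipeCapacityDemo, bytesAcceptedWithoutReader and firstLineThatDoesNotFit are made up for this illustration. It fills a pipe that nobody reads and counts how many bytes are accepted before write() blocks, then redoes the line arithmetic under the assumption of \r\n line endings and one byte per character. The PipedInputStream(int pipeSize) constructor is part of the JDK and lets you pick a larger buffer, but that only postpones the blocking.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

final class PipeCapacityDemo {

    /** Writes single bytes into a pipe nobody reads and reports how many were accepted. */
    static int bytesAcceptedWithoutReader(PipedInputStream pipedInput) {
        AtomicInteger accepted = new AtomicInteger();
        Thread writer = new Thread(() -> {
            try (PipedOutputStream pipedOutput = new PipedOutputStream(pipedInput)) {
                while (true) {
                    pipedOutput.write('x'); // blocks once the pipe buffer is full
                    accepted.incrementAndGet();
                }
            } catch (IOException e) {
                // Expected here: interrupting the blocked write raises an IOException.
            }
        });
        writer.setDaemon(true);
        writer.start();
        try {
            writer.join(500);   // the writer never finishes; this just gives it time to fill the buffer
            writer.interrupt(); // release the blocked write so the thread can exit
            writer.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted.get();
    }

    /** Index of the first "Hello! i" line that no longer fits completely into the buffer. */
    static int firstLineThatDoesNotFit(int capacity, String lineSeparator) {
        int used = 0;
        for (int i = 0; ; i++) {
            used += ("Hello! " + i + lineSeparator).length(); // ASCII only: one byte per char
            if (used > capacity) {
                return i;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(bytesAcceptedWithoutReader(new PipedInputStream()));     // 1024
        System.out.println(bytesAcceptedWithoutReader(new PipedInputStream(8192))); // 8192
        System.out.println(firstLineThatDoesNotFit(1024, "\r\n"));                  // 94
    }
}
```

Counting exactly under those assumptions, lines 0 to 9 take 10 bytes each and lines 10 to 93 take 11 bytes each, which adds up to 1024 bytes. Line 94 is therefore the first one that still reaches System.out but cannot be completed on the pipe, which matches the output shown in the question.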

The solution that worked best for me:

NonBlockingPipedInputStream

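import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class NonBlockingPipedInputStream extends PipedInputStream {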

    public interface WriteLineEvent<L>{
        void executeOnEvent(L l);
    }

    /**
     * Actions in this list run after each complete line has been written.
     * Each action receives the line as its argument.
     */
    public List<WriteLineEvent<String>> actionsOnWriteLineEvent = new CopyOnWriteArrayList<>();

    /**
     * Starts a new {@link Thread} that reads this {@link PipedInputStream}
     * and fires an event every time a full line has been written.
     * To listen for those events, add the action to run to the {@link #actionsOnWriteLineEvent} list.
     */
    public NonBlockingPipedInputStream() {
        new Thread(()->{
            try{
                BufferedReader reader = new BufferedReader(new InputStreamReader(this));
                String line;
                while((line = reader.readLine()) != null){
                    String finalLine = line;
                    actionsOnWriteLineEvent.forEach(action -> action.executeOnEvent(finalLine));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Usage

    @Test
    void nonBlockingPipedInputStreamExample() throws IOException, InterruptedException {
        NonBlockingPipedInputStream pipedInput = new NonBlockingPipedInputStream();
        OutputStream pipedOutput = new PipedOutputStream(pipedInput);
        MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
        PrintStream out = new PrintStream(teeOutput);

        final int expectedPrintedLinesCount = 1000;
        AtomicInteger actualPrintedLinesCount = new AtomicInteger();
        AtomicInteger actualReadLinesCount = new AtomicInteger();

        Thread t1 = new Thread(() -> { // Thread for writing data to OUT
            try {
                for (int i = 1; i <= expectedPrintedLinesCount; i++) {
                    out.println("Hello! "+i);
                    actualPrintedLinesCount.incrementAndGet();
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // NonBlockingPipedInputStream starts a new thread when it is initialised.
        // That thread reads the PipedInputStream and fires an event every time a full line was written.
        pipedInput.actionsOnWriteLineEvent.add(line -> {
            actualReadLinesCount.getAndIncrement();
        });

        t1.start();

        for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
            Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
        }

        Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get());
        Assertions.assertEquals(expectedPrintedLinesCount, actualReadLinesCount.get());
    }
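
One caveat about the class above, offered as a suggestion rather than a correction: its reader thread is started in the constructor, before any PipedOutputStream is connected, and PipedInputStream.read() throws an IOException ("Pipe not connected") on an unconnected pipe. The usage works when the connection is made before the thread's first read, but that ordering is not guaranteed. Below is a minimal sketch of a variant that connects first and only then starts the thread. ConnectedLinePipe, lineListeners, awaitEndOfStream and deliverLines are hypothetical names, and java.util.function.Consumer stands in for the WriteLineEvent interface. It also waits with Thread.join instead of sleeping for a fixed 30 seconds.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical variant: the pipe is connected before the reader thread starts. */
final class ConnectedLinePipe extends PipedInputStream {

    /** Listeners invoked on the reader thread for every complete line. */
    final List<Consumer<String>> lineListeners = new CopyOnWriteArrayList<>();

    private final Thread readerThread;

    ConnectedLinePipe(PipedOutputStream source) throws IOException {
        super(source); // connect first, so read() cannot fail with "Pipe not connected"
        readerThread = new Thread(this::pumpLines, "pipe-line-reader");
        readerThread.setDaemon(true); // the pump alone should not keep the JVM alive
        readerThread.start();
    }

    private void pumpLines() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(this))) {
            String line;
            while ((line = reader.readLine()) != null) {
                for (Consumer<String> listener : lineListeners) {
                    listener.accept(line);
                }
            }
        } catch (IOException e) {
            // Raised for example when the writer thread dies without closing the pipe.
            e.printStackTrace();
        }
    }

    /** Waits until the writer has closed the pipe and all lines were delivered, or the timeout expires. */
    void awaitEndOfStream(long timeoutMillis) throws InterruptedException {
        readerThread.join(timeoutMillis);
    }

    /** Pushes lineCount lines through the pipe and returns how many reached the listener. */
    static int deliverLines(int lineCount) {
        AtomicInteger received = new AtomicInteger();
        try {
            PipedOutputStream pipedOutput = new PipedOutputStream();
            ConnectedLinePipe pipedInput = new ConnectedLinePipe(pipedOutput);
            pipedInput.lineListeners.add(line -> received.incrementAndGet());
            try (PrintStream out = new PrintStream(pipedOutput, true)) {
                for (int i = 0; i < lineCount; i++) {
                    out.println("Hello! " + i);
                }
            } // closing the writer ends the stream, so readLine() returns null
            pipedInput.awaitEndOfStream(10_000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(deliverLines(1000));
    }
}
```

Closing the PrintStream matters in this sketch: it closes the PipedOutputStream, which lets readLine() return null so the reader thread ends cleanly instead of failing once the writer thread is gone.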