PipedInputStream together with TeeOutputStream freezes the application when not read?
I am referring to org.apache.commons.io.output.TeeOutputStream. To make testing easier, I added a simple variant of it as an inner class (MyTeeOutputStream), so you don't have to fetch the dependency.
Any idea why this happens and how to fix it?
Code
I made a JUnit 5 test case for you to try:
@Test
void testSplittingOutput() throws IOException, InterruptedException {
    PipedInputStream pipedInput = new PipedInputStream();
    OutputStream pipedOutput = new PipedOutputStream(pipedInput);
    //TeeOutputStream teeOutput = new TeeOutputStream(System.out, pipedOutput);
    MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
    PrintStream out = new PrintStream(teeOutput);

    final int expectedPrintedLinesCount = 1000;
    AtomicInteger actualPrintedLinesCount = new AtomicInteger();

    Thread t1 = new Thread(() -> { // Thread for writing data to OUT
        try {
            for (int i = 0; i < expectedPrintedLinesCount; i++) {
                out.println("Hello! " + i);
                actualPrintedLinesCount.incrementAndGet();
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    Thread t2 = new Thread(() -> { // Thread for reading data from IN
        BufferedReader reader = new BufferedReader(new InputStreamReader(pipedInput));
        StringBuilder builder = new StringBuilder();
        try {
            while (true) {
                builder.append(reader.readLine());
            }
        } catch (IOException e) {
            //e.printStackTrace(); // ignore
            System.out.println(builder);
        }
    });

    t1.start();
    //t2.start(); // If thread 2 isn't reading the PipedInputStream, we only print 94 lines instead of 1000!?

    for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
        Thread.sleep(1000); // Do this because JUnit doesn't support multithreaded stuff
    }
    Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get() + 1);
}
final class MyTeeOutputStream extends OutputStream {
    private final OutputStream out;
    private final OutputStream tee;

    public MyTeeOutputStream(OutputStream out, OutputStream tee) {
        if (out == null)
            throw new NullPointerException();
        else if (tee == null)
            throw new NullPointerException();
        this.out = out;
        this.tee = tee;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        tee.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        out.flush();
        tee.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            out.close();
        } finally {
            tee.close();
        }
    }
}
Result
As you can see, it fails to print all 1000 lines and stops at line 94:
Hello! 0
Hello! 1
Hello! 2
Hello! 3
Hello! 4
Hello! 5
Hello! 6
Hello! 7
Hello! 8
Hello! 9
Hello! 10
Hello! 11
Hello! 12
Hello! 13
Hello! 14
Hello! 15
Hello! 16
Hello! 17
Hello! 18
Hello! 19
Hello! 20
Hello! 21
Hello! 22
Hello! 23
Hello! 24
Hello! 25
Hello! 26
Hello! 27
Hello! 28
Hello! 29
Hello! 30
Hello! 31
Hello! 32
Hello! 33
Hello! 34
Hello! 35
Hello! 36
Hello! 37
Hello! 38
Hello! 39
Hello! 40
Hello! 41
Hello! 42
Hello! 43
Hello! 44
Hello! 45
Hello! 46
Hello! 47
Hello! 48
Hello! 49
Hello! 50
Hello! 51
Hello! 52
Hello! 53
Hello! 54
Hello! 55
Hello! 56
Hello! 57
Hello! 58
Hello! 59
Hello! 60
Hello! 61
Hello! 62
Hello! 63
Hello! 64
Hello! 65
Hello! 66
Hello! 67
Hello! 68
Hello! 69
Hello! 70
Hello! 71
Hello! 72
Hello! 73
Hello! 74
Hello! 75
Hello! 76
Hello! 77
Hello! 78
Hello! 79
Hello! 80
Hello! 81
Hello! 82
Hello! 83
Hello! 84
Hello! 85
Hello! 86
Hello! 87
Hello! 88
Hello! 89
Hello! 90
Hello! 91
Hello! 92
Hello! 93
Hello! 94
org.opentest4j.AssertionFailedError:
Expected :1000
Actual :94
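Details
Why am I doing this?
I want to duplicate System.out and read from the copy (via PipedInputStream), then send that data to my website's HTTP console.
A PipedInputStream has a default buffer size of only 1024 bytes, so if you write to the PipedOutputStream without reading from it in another thread, the write blocks until the buffer is emptied. That is why it stops at line 94 of Hello! XX\r\n. 1024 divided by 11 means that 93 complete output lines are stored in pipedInput; System.out still works for the 94th line, but the blocked write on pipedInput prevents any further lines from being added.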
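The blocking described above can be observed in isolation. The following is a minimal sketch, not part of the original post: the class name PipeCapacityDemo and the method bytesAcceptedBeforeBlocking are hypothetical, and the 500 ms wait is an assumption that the writer fills the buffer well within that time. It writes single bytes into a pipe that nobody reads and reports how many were accepted before write() blocked. It also uses the PipedInputStream(int pipeSize) constructor, which lets you enlarge the buffer (this only delays the block, it does not remove it).

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicInteger;

public class PipeCapacityDemo {

    /** Counts how many bytes an unread pipe of the given size accepts before write() blocks. */
    static int bytesAcceptedBeforeBlocking(int pipeSize) throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream(pipeSize);
        PipedOutputStream out = new PipedOutputStream(in);
        AtomicInteger written = new AtomicInteger();

        Thread writer = new Thread(() -> {
            try {
                while (true) {
                    out.write('x');            // blocks once the pipe buffer is full
                    written.incrementAndGet(); // only reached after a successful write
                }
            } catch (IOException e) {
                // Reached when the blocked write is interrupted below.
            }
        });
        writer.setDaemon(true); // never keep the JVM alive because of the stuck writer
        writer.start();

        writer.join(500);       // assumption: filling the buffer takes far less than 500 ms
        int accepted = written.get();
        writer.interrupt();     // release the writer that is blocked inside write()
        return accepted;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(bytesAcceptedBeforeBlocking(1024)); // default pipe size
        System.out.println(bytesAcceptedBeforeBlocking(8192)); // larger buffer, blocks later
    }
}
```

A larger buffer is only a stopgap for the tee use case: as long as nothing drains the pipe, the writer (and therefore System.out behind the tee) will eventually stall.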
The solution that worked best for me:
NonBlockingPipedInputStream
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class NonBlockingPipedInputStream extends PipedInputStream {

    public interface WriteLineEvent<L> {
        void executeOnEvent(L l);
    }

    /**
     * Add actions to this list, which get run after a line has been written.
     * Each action receives the line as its parameter.
     */
    public List<WriteLineEvent<String>> actionsOnWriteLineEvent = new CopyOnWriteArrayList<>();

    /**
     * Starts a new {@link Thread} that reads the {@link PipedInputStream}
     * and fires an event every time a full line was written.
     * To listen for those events, add the action that should be run to the {@link #actionsOnWriteLineEvent} list.
     */
    public NonBlockingPipedInputStream() {
        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(this));
                String line;
                while ((line = reader.readLine()) != null) {
                    String finalLine = line; // lambdas need an effectively final variable
                    actionsOnWriteLineEvent.forEach(action -> action.executeOnEvent(finalLine));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}
Usage
@Test
void nonBlockingPipedInputStreamExample() throws IOException, InterruptedException {
    NonBlockingPipedInputStream pipedInput = new NonBlockingPipedInputStream();
    OutputStream pipedOutput = new PipedOutputStream(pipedInput);
    MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
    PrintStream out = new PrintStream(teeOutput);

    final int expectedPrintedLinesCount = 1000;
    AtomicInteger actualPrintedLinesCount = new AtomicInteger();
    AtomicInteger actualReadLinesCount = new AtomicInteger();

    Thread t1 = new Thread(() -> { // Thread for writing data to OUT
        try {
            for (int i = 1; i <= expectedPrintedLinesCount; i++) {
                out.println("Hello! " + i);
                actualPrintedLinesCount.incrementAndGet();
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    // NonBlockingPipedInputStream starts a new thread when it is initialised.
    // That thread reads the PipedInputStream and fires an event every time a full line was written.
    pipedInput.actionsOnWriteLineEvent.add(line -> {
        actualReadLinesCount.getAndIncrement();
    });

    t1.start();

    for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
        Thread.sleep(1000); // Do this because JUnit doesn't support multithreaded stuff
    }
    Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get());
    Assertions.assertEquals(expectedPrintedLinesCount, actualReadLinesCount.get());
}
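As a side note on the fixed 30-second sleep in the test above: one possible alternative is to close the writing end when the writer is done and then join both threads. The sketch below is an illustration under assumptions, not the code from this post; the class JoinInsteadOfSleepDemo and the method writeAndCountLines are hypothetical names, and it uses a plain PipedInputStream with an explicit reader thread so that there is a thread to join. Closing the PrintStream closes the pipe, so readLine() returns null and the reader ends on its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicInteger;

public class JoinInsteadOfSleepDemo {

    /** Writes lineCount lines through a pipe and returns how many lines the reader saw. */
    static int writeAndCountLines(int lineCount) throws IOException, InterruptedException {
        PipedInputStream pipedInput = new PipedInputStream();
        PipedOutputStream pipedOutput = new PipedOutputStream(pipedInput);
        AtomicInteger readLines = new AtomicInteger();

        // Reader: drains the pipe until the writer closes its end (readLine() returns null).
        Thread reader = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(pipedInput))) {
                while (r.readLine() != null) {
                    readLines.incrementAndGet();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        // Writer: try-with-resources closes the PrintStream, which closes the pipe.
        Thread writer = new Thread(() -> {
            try (PrintStream out = new PrintStream(pipedOutput, true)) {
                for (int i = 1; i <= lineCount; i++) {
                    out.println("Hello! " + i);
                }
            }
        });

        reader.start();
        writer.start();
        writer.join(); // returns as soon as all lines are written and the pipe is closed
        reader.join(); // returns once the reader has seen end-of-stream
        return readLines.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeAndCountLines(1000));
    }
}
```

With this shape the test finishes as soon as the work is done instead of always waiting the full 30 seconds. Closing the writing end before the writer thread exits also matters, because a reader on a pipe whose writer thread has died without closing can fail with an IOException.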