Java: 两个通过流通信的线程是公司,三个是一群人
Java: Two threads communicating via streams is company, three's a crowd
在此代码段中,我创建了一个管道,并在一端附加了一个 Scanner,在另一端附加了一个 PrintStream,以便在多个消费者和生产者线程之间进行通信。然后我创建并启动三个线程:
第一个线程是消费者线程。它检查扫描仪以查看是否有一行文本可供使用,使用它,打印到标准输出,然后休眠几毫秒,然后重复。如果没有什么可消费的,那么它会打印一条消息,休眠并重复。
此代码段中的第二个线程不执行任何操作。更多内容见下文。
2.5 在第 3 个线程启动之前有 3 秒的延迟。
- 第三个线程是生产者,只为第一个线程生成文本消息以供使用。它产生一条消息,睡眠
public static void main(String[] args) throws IOException
{
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
Scanner scan = new Scanner(pis);
PrintStream ps = new PrintStream(pos);
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
if (scan.hasNextLine())
{
System.out.println("pulled: " + scan.nextLine());
} else
{
if (x % 100 == 0)
{
System.out.println("no data to pull");
}
}
try
{
sleep(10);
} catch (InterruptedException ex) { }
}
}
}.start();
new Thread()
{
public void run()
{
}
}.start();
try
{
sleep(3000);
} catch (InterruptedException ex) { }
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
ps.println("hello: " + x);
try
{
sleep(1000);
} catch (InterruptedException ex) {}
}
}
}.start();
}
输出(如我所料):
pulled: hello: 1
pulled: hello: 2
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
另请注意 scan.nextLine() 正在阻塞(因为没有消息表明没有数据可用...数据始终是 "available",即使它是 "on the way") .
现在,如果我用一些代码替换第二个线程的主体,这些代码会生成一些文本供第一个线程使用:
new Thread()
{
public void run()
{
ps.println( "Interfere");
}
}.start();
然后开始触发第一个线程的无数据子句:
pulled: Interfere
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
因此,如果第二个线程开始使用 PrintStream 对象来生成消息,管道中就会出现问题,使用者线程将无法在另一端找到消息。
现在事情变得更奇怪了。如果我阻止第二个线程完成,比如说将它放入一个很长的循环中,那么管道就不会堵塞:
new Thread()
{
public void run()
{
ps.println("interfere");
for ( long i = 0; i < 10000000000L; i++ );
System.out.println("done interfering" );
}
}.start();
输出:
pulled: interfere
pulled: hello: 1
pulled: hello: 2
done interfering
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
所以我认为如果第二个线程在第三个线程开始生成之前终止,那么第一个线程将永远不会从第三个线程获得任何消息。但是,如果第二个线程设法挂起直到第三个线程开始生成,那么一切都会按预期进行。
这是怎么回事?第二个线程在终止时是否关闭 pipe/stream(或对 pipe/stream 执行其他操作)?如果是这样,为什么?如果第三个线程在第二个线程终止之前开始使用 pipe/stream,为什么它似乎没有关闭(或执行任何操作)pipe/stream?当第二个线程产生消息并在第三个线程启动之前终止时,有没有办法使此代码按预期方式 "work" (即第一个线程消耗 either/both 生产者线程产生的任何内容)?
背景:
这是对系统基本组件的浓缩,其中多个客户端将使用来自单个生产者线程的消息。但是,在所有客户端线程都发出准备就绪的信号之前,生产者线程无法启动。对于每个客户端线程,都有另一个线程查询它们是否准备就绪。一旦所有客户端线程都发出信号表明它们已准备就绪,生产者线程就会启动。我正在尝试让线程通过流进行通信,以便稍后我可以将它们分布在多台计算机上,并使用对底层代码进行最少更改的套接字设置管道。也可以在这里随意提出替代解决方案策略,但我想了解为什么上述解决方案不起作用。
涉及的流对象不是线程安全的,因此在没有同步的情况下从不同线程访问它们时的行为是不可预测的。我猜想在这种特殊情况下,行为的差异与实际刷新到主内存的内容有关,但这只是一个猜测。
要获得可预测的行为,您需要在线程之间进行适当的同步。
您的 Scanner
实例在其 readInput
方法中遇到异常,该方法将其 sourceClosed
字段设置为 true
并阻止您读取。如果您对实际发生的地点感兴趣:
private void readInput() {
...
int n = 0;
try {
n = source.read(buf);
} catch (IOException ioe) {
lastException = ioe;
n = -1;
}
if (n == -1) {
sourceClosed = true;
needInput = false;
}
...
}
此行为错误,您需要修复底层异常。这里的问题是java.io.IOException: Write end dead
。有很多答案和博客文章可以帮助您比我更好地解决这个问题。另请查看相关的 "Read end dead" 问题。查看:
- https://techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/
- IO Exception - read end dead - what causes it in this example and how to fix it - multithread application in Java
- Write end dead exception using PipedInputStream java
在此代码段中,我创建了一个管道,并在一端附加了一个 Scanner,在另一端附加了一个 PrintStream,以便在多个消费者和生产者线程之间进行通信。然后我创建并启动三个线程:
第一个线程是消费者线程。它检查扫描仪以查看是否有一行文本可供使用,使用它,打印到标准输出,然后休眠几毫秒,然后重复。如果没有什么可消费的,那么它会打印一条消息,休眠并重复。
此代码段中的第二个线程不执行任何操作。更多内容见下文。
2.5 在第 3 个线程启动之前有 3 秒的延迟。
- 第三个线程是生产者,只为第一个线程生成文本消息以供使用。它产生一条消息,睡眠
public static void main(String[] args) throws IOException
{
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
Scanner scan = new Scanner(pis);
PrintStream ps = new PrintStream(pos);
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
if (scan.hasNextLine())
{
System.out.println("pulled: " + scan.nextLine());
} else
{
if (x % 100 == 0)
{
System.out.println("no data to pull");
}
}
try
{
sleep(10);
} catch (InterruptedException ex) { }
}
}
}.start();
new Thread()
{
public void run()
{
}
}.start();
try
{
sleep(3000);
} catch (InterruptedException ex) { }
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
ps.println("hello: " + x);
try
{
sleep(1000);
} catch (InterruptedException ex) {}
}
}
}.start();
}
输出(如我所料):
pulled: hello: 1
pulled: hello: 2
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
另请注意 scan.nextLine() 正在阻塞(因为没有消息表明没有数据可用...数据始终是 "available",即使它是 "on the way") .
现在,如果我用一些代码替换第二个线程的主体,这些代码会生成一些文本供第一个线程使用:
new Thread()
{
public void run()
{
ps.println( "Interfere");
}
}.start();
然后开始触发第一个线程的无数据子句:
pulled: Interfere
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
因此,如果第二个线程开始使用 PrintStream 对象来生成消息,管道中就会出现问题,使用者线程将无法在另一端找到消息。
现在事情变得更奇怪了。如果我阻止第二个线程完成,比如说将它放入一个很长的循环中,那么管道就不会堵塞:
new Thread()
{
public void run()
{
ps.println("interfere");
for ( long i = 0; i < 10000000000L; i++ );
System.out.println("done interfering" );
}
}.start();
输出:
pulled: interfere
pulled: hello: 1
pulled: hello: 2
done interfering
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
所以我认为如果第二个线程在第三个线程开始生成之前终止,那么第一个线程将永远不会从第三个线程获得任何消息。但是,如果第二个线程设法挂起直到第三个线程开始生成,那么一切都会按预期进行。
这是怎么回事?第二个线程在终止时是否关闭 pipe/stream(或对 pipe/stream 执行其他操作)?如果是这样,为什么?如果第三个线程在第二个线程终止之前开始使用 pipe/stream,为什么它似乎没有关闭(或执行任何操作)pipe/stream?当第二个线程产生消息并在第三个线程启动之前终止时,有没有办法使此代码按预期方式 "work" (即第一个线程消耗 either/both 生产者线程产生的任何内容)?
背景: 这是对系统基本组件的浓缩,其中多个客户端将使用来自单个生产者线程的消息。但是,在所有客户端线程都发出准备就绪的信号之前,生产者线程无法启动。对于每个客户端线程,都有另一个线程查询它们是否准备就绪。一旦所有客户端线程都发出信号表明它们已准备就绪,生产者线程就会启动。我正在尝试让线程通过流进行通信,以便稍后我可以将它们分布在多台计算机上,并使用对底层代码进行最少更改的套接字设置管道。也可以在这里随意提出替代解决方案策略,但我想了解为什么上述解决方案不起作用。
涉及的流对象不是线程安全的,因此在没有同步的情况下从不同线程访问它们时的行为是不可预测的。我猜想在这种特殊情况下,行为的差异与实际刷新到主内存的内容有关,但这只是一个猜测。
要获得可预测的行为,您需要在线程之间进行适当的同步。
您的 Scanner
实例在其 readInput
方法中遇到异常,该方法将其 sourceClosed
字段设置为 true
并阻止您读取。如果您对实际发生的地点感兴趣:
private void readInput() {
...
int n = 0;
try {
n = source.read(buf);
} catch (IOException ioe) {
lastException = ioe;
n = -1;
}
if (n == -1) {
sourceClosed = true;
needInput = false;
}
...
}
此行为错误,您需要修复底层异常。这里的问题是java.io.IOException: Write end dead
。有很多答案和博客文章可以帮助您比我更好地解决这个问题。另请查看相关的 "Read end dead" 问题。查看:
- https://techtavern.wordpress.com/2008/07/16/whats-this-ioexception-write-end-dead/
- IO Exception - read end dead - what causes it in this example and how to fix it - multithread application in Java
- Write end dead exception using PipedInputStream java