为什么使用 ExecutionContext 会导致此代码不停止?
Why does using ExecutionContext causes this code not to halt?
问题
我有一个用两种不同方式编写的 Scala 中生产者-消费者问题的有趣实例。两种方式都使用 java.util.concurrent.ArrayBlockingQueue
作为缓冲区,但它们有以下区别:
Version 1
正常启动生产者和消费者代码 Java
线程
Version 2
启动生产者和消费者代码使用
ExecutionContext.global
执行两个版本后,可以清楚地看到Version1
从未完成执行(因为消费者的数量超过了生产者的数量,所以至少有一个消费者无限期地等待新项目被消费)。
然而,Version2
也是如此,即消费者的数量也超过了生产者的数量,但这次程序确实完成了执行 - 我的事情'我在这个问题中要求解释为什么会这样。
我的解释尝试
我要说的是,即使 Version2
中的消费者在生产者的供应“枯竭”后也一直在等待新商品,但事实上代码是在 [=18= 内启动的]表示代码在守护线程中运行,所以在主线程完成后(这里是在人为添加Thread.sleep(1000)
以延长执行时间之后),守护线程也会停止,无论它们是否完成他们的主要工作或他们一直在等待。
但是,我不确定我的解释是否解决了问题的根源,也不确定它是否切题。换句话说,我不确定我是不是在说一些微不足道的事情,而是错过了对问题的一些明显解释。您能否验证我的一般理解是否正确,并在必要时帮助我找到对两个版本代码的这种行为的正确解释?谢谢!
代码
object Version1 extends App {
class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (i <- 1 to 10) {println(s"$getName produced $i"); buffer.put(i)}
}
class Consumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take}")
}
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
for (i <- 1 to 2) new Producer(s"Producer$i", buffer).start()
for (i <- 1 to 3) new Consumer(s"Consumer$i", buffer).start()
}
object Version2 extends App {
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
val ec = ExecutionContext.global
for (p <- 1 to 2)
ec.execute(() => for (i <- 1 to 10) {println(s"Producer$p produced $i"); buffer.put(i)})
for (c <- 1 to 3)
ec.execute(() => for (_ <- 1 to 10) {println(s"Consumer$c consumed ${buffer.take}")})
Thread.sleep(1000)
}
scala.concurrent.ExecutionContext.global
默认创建守护进程 Thread
s,而 java.lang.Thread
默认是非守护进程。您可以通过在 Producer
和 Consumer
的构造函数中调用 setDaemon(true)
使 Version1
应用程序在主线程退出后终止 JVM
附带说明一下,在 Scala 中无需直接调用 execute
或手动创建和管理 java.lang.Thread
。相反,我们定义了隐式 ExecutionContext
,而不是创建 Runnable
,我们通过名称将任务参数传递给 scala.concurrent.Future
的 apply
工厂方法。然后 ExecutionContext
隐式传递给 Future
s,它在幕后为我们管理 Thread
s 并决定在哪个 Thread
上执行我们的操作。
问题
我有一个用两种不同方式编写的 Scala 中生产者-消费者问题的有趣实例。两种方式都使用 java.util.concurrent.ArrayBlockingQueue
作为缓冲区,但它们有以下区别:
Version 1
正常启动生产者和消费者代码 Java 线程Version 2
启动生产者和消费者代码使用ExecutionContext.global
执行两个版本后,可以清楚地看到Version1
从未完成执行(因为消费者的数量超过了生产者的数量,所以至少有一个消费者无限期地等待新项目被消费)。
然而,Version2
也是如此,即消费者的数量也超过了生产者的数量,但这次程序确实完成了执行 - 我的事情'我在这个问题中要求解释为什么会这样。
我的解释尝试
我要说的是,即使 Version2
中的消费者在生产者的供应“枯竭”后也一直在等待新商品,但事实上代码是在 [=18= 内启动的]表示代码在守护线程中运行,所以在主线程完成后(这里是在人为添加Thread.sleep(1000)
以延长执行时间之后),守护线程也会停止,无论它们是否完成他们的主要工作或他们一直在等待。
但是,我不确定我的解释是否解决了问题的根源,也不确定它是否切题。换句话说,我不确定我是不是在说一些微不足道的事情,而是错过了对问题的一些明显解释。您能否验证我的一般理解是否正确,并在必要时帮助我找到对两个版本代码的这种行为的正确解释?谢谢!
代码
object Version1 extends App {
class Producer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (i <- 1 to 10) {println(s"$getName produced $i"); buffer.put(i)}
}
class Consumer(name: String, buffer: ArrayBlockingQueue[Integer]) extends Thread(name) {
override def run(): Unit =
for (_ <- 1 to 10) println(s"$getName consumed ${buffer.take}")
}
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
for (i <- 1 to 2) new Producer(s"Producer$i", buffer).start()
for (i <- 1 to 3) new Consumer(s"Consumer$i", buffer).start()
}
object Version2 extends App {
val buffer: ArrayBlockingQueue[Integer] = new ArrayBlockingQueue(5)
val ec = ExecutionContext.global
for (p <- 1 to 2)
ec.execute(() => for (i <- 1 to 10) {println(s"Producer$p produced $i"); buffer.put(i)})
for (c <- 1 to 3)
ec.execute(() => for (_ <- 1 to 10) {println(s"Consumer$c consumed ${buffer.take}")})
Thread.sleep(1000)
}
scala.concurrent.ExecutionContext.global
默认创建守护进程 Thread
s,而 java.lang.Thread
默认是非守护进程。您可以通过在 Producer
和 Consumer
setDaemon(true)
使 Version1
应用程序在主线程退出后终止 JVM
附带说明一下,在 Scala 中无需直接调用 execute
或手动创建和管理 java.lang.Thread
。相反,我们定义了隐式 ExecutionContext
,而不是创建 Runnable
,我们通过名称将任务参数传递给 scala.concurrent.Future
的 apply
工厂方法。然后 ExecutionContext
隐式传递给 Future
s,它在幕后为我们管理 Thread
s 并决定在哪个 Thread
上执行我们的操作。