使用 onComplete 时限制 Scala Future 块
Throttling Scala Future blocks when onComplete is used
我正在尝试使用 Scala Futures 生成许多 CPU 密集型作业。因为有太多,我需要限制这些作业(线程)的创建。为此,我使用:
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
val numThread = sys.runtime.availableProcessors
import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue
implicit val context = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(
numThread, numThread,
0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[ Runnable ]( numThread ) {
override def offer( e: Runnable ) = {
put( e ); // Waiting for empty room
true
}
})
)
为了对此进行测试,我创建了 2 个非常简单的函数:
import scala.util.{ Try, Success, Failure }
import scala.util.Random
def longComputation() = {
val id = Thread.currentThread().getId
//blocking {
println( s"Started thread: $id" )
Thread.sleep( 500 )
println( s"Finished thread: $id" )
//}
id
}
def processResult[T](r : Try[T]) = {
blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
}
}
然后我执行测试以通过多线程执行任务:
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
//s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
s.foreach { x =>
println(x)
val f = Future( longComputation )
val p = Promise[Long]()
p completeWith f
p.future.onComplete{ processResult }
}
println("Finished")
context.shutdown
}
当我执行此操作时,启动了 16 个线程(CPU 计数为 8)。该程序打印“完成”消息。然后系统锁定并且不执行任何其他操作。但是,如果我删除了回调,那么线程将按预期执行 ad infinitum。
以上我已经用 blocking
和 Promise
进行了试验。行为没有变化。所以我的问题是:如何在不阻塞回调的情况下限制任务执行?如果这不可能,在线程中执行 I/O 是否可行(未来)?
感谢任何指点。
程序 运行 处于死锁状态。提供的 threadPool
是固定大小的,因此会发生以下情况:
Future(longComputation)
从线程池中分配一个线程并开始工作。完成后,onComplete
从池中分配一个 Thread
来执行提供的函数。
鉴于做工作比完成工作需要更长的时间,在某些时候,所有线程都在忙于工作。它们中的任何一个完成并且 onComplete
也需要一个线程,因此它向执行者请求一个。工作无法完成,因为所有线程都忙并且机器死锁停止。
我们可以通过将预留资源提供给消费者来解决这个生产者-消费者僵局。这样,固定大小的线程池会限制工作,但我们确保可以进一步处理任何已完成的工作。
我已将 context
重命名为 fixedContext
的此代码段显示了使用单独的上下文来处理结果,从而解决了死锁。我也去掉了Promise
,它除了代理未来之外并没有发挥真正的作用。
val fixedContext = // same as in question
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
s.foreach { x =>
println(x)
val f = Future( longComputation )(fixedContext)
f.onComplete{ processResult }(singleThreadContext)
}
println("Finished")
fixedContext.shutdown
}
}
当线程完成 longComputation
时,它会尝试将作业放入队列以执行回调,但由于队列已满而被阻塞。因此,最终,第一个 "batch" 作业完成,但所有线程仍处于忙碌状态,等待队列以安排回调,并且没有任何可用的队列。
解决方案?从队列中删除限制。这样,尝试提交回调的线程就不会被阻塞,并且可以开始执行下一个任务。
您可能希望在您的生产者循环中插入一些东西来减慢它的速度,这样您的无限队列就不会耗尽所有内存。也许 Semaphore
?
val sem = new Semaphore(numThread*2)
def processResult[T](r : Try[T]) = blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
sem.release
}
Stream.from(0).foreach { _ =>
sem.acquire
new Future(longComputation).onComplete(processResult)
}
你不需要你的自定义执行上下文 - Scala 的默认值实际上会更好地完成你想做的事情
我正在尝试使用 Scala Futures 生成许多 CPU 密集型作业。因为有太多,我需要限制这些作业(线程)的创建。为此,我使用:
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
val numThread = sys.runtime.availableProcessors
import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue
implicit val context = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(
numThread, numThread,
0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[ Runnable ]( numThread ) {
override def offer( e: Runnable ) = {
put( e ); // Waiting for empty room
true
}
})
)
为了对此进行测试,我创建了 2 个非常简单的函数:
import scala.util.{ Try, Success, Failure }
import scala.util.Random
def longComputation() = {
val id = Thread.currentThread().getId
//blocking {
println( s"Started thread: $id" )
Thread.sleep( 500 )
println( s"Finished thread: $id" )
//}
id
}
def processResult[T](r : Try[T]) = {
blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
}
}
然后我执行测试以通过多线程执行任务:
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
//s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
s.foreach { x =>
println(x)
val f = Future( longComputation )
val p = Promise[Long]()
p completeWith f
p.future.onComplete{ processResult }
}
println("Finished")
context.shutdown
}
当我执行此操作时,启动了 16 个线程(CPU 计数为 8)。该程序打印“完成”消息。然后系统锁定并且不执行任何其他操作。但是,如果我删除了回调,那么线程将按预期执行 ad infinitum。
以上我已经用 blocking
和 Promise
进行了试验。行为没有变化。所以我的问题是:如何在不阻塞回调的情况下限制任务执行?如果这不可能,在线程中执行 I/O 是否可行(未来)?
感谢任何指点。
程序 运行 处于死锁状态。提供的 threadPool
是固定大小的,因此会发生以下情况:
Future(longComputation)
从线程池中分配一个线程并开始工作。完成后,onComplete
从池中分配一个 Thread
来执行提供的函数。
鉴于做工作比完成工作需要更长的时间,在某些时候,所有线程都在忙于工作。它们中的任何一个完成并且 onComplete
也需要一个线程,因此它向执行者请求一个。工作无法完成,因为所有线程都忙并且机器死锁停止。
我们可以通过将预留资源提供给消费者来解决这个生产者-消费者僵局。这样,固定大小的线程池会限制工作,但我们确保可以进一步处理任何已完成的工作。
我已将 context
重命名为 fixedContext
的此代码段显示了使用单独的上下文来处理结果,从而解决了死锁。我也去掉了Promise
,它除了代理未来之外并没有发挥真正的作用。
val fixedContext = // same as in question
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
s.foreach { x =>
println(x)
val f = Future( longComputation )(fixedContext)
f.onComplete{ processResult }(singleThreadContext)
}
println("Finished")
fixedContext.shutdown
}
}
当线程完成 longComputation
时,它会尝试将作业放入队列以执行回调,但由于队列已满而被阻塞。因此,最终,第一个 "batch" 作业完成,但所有线程仍处于忙碌状态,等待队列以安排回调,并且没有任何可用的队列。
解决方案?从队列中删除限制。这样,尝试提交回调的线程就不会被阻塞,并且可以开始执行下一个任务。
您可能希望在您的生产者循环中插入一些东西来减慢它的速度,这样您的无限队列就不会耗尽所有内存。也许 Semaphore
?
val sem = new Semaphore(numThread*2)
def processResult[T](r : Try[T]) = blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
sem.release
}
Stream.from(0).foreach { _ =>
sem.acquire
new Future(longComputation).onComplete(processResult)
}
你不需要你的自定义执行上下文 - Scala 的默认值实际上会更好地完成你想做的事情