使用 onComplete 时限制 Scala Future 块

Throttling Scala Future blocks when onComplete is used

我正在尝试使用 Scala Futures 生成许多 CPU 密集型作业。因为有太多,我需要限制这些作业(线程)的创建。为此,我使用:

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._

val numThread = sys.runtime.availableProcessors

import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue

implicit val context = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(
      numThread, numThread,
      0L, TimeUnit.SECONDS,
      new ArrayBlockingQueue[ Runnable ]( numThread ) {
        override def offer( e: Runnable ) = {
          put( e ); // Waiting for empty room
          true
        }
      })
     )

为了对此进行测试,我创建了 2 个非常简单的函数:

import scala.util.{ Try, Success, Failure }
import scala.util.Random

def longComputation() = {
  val id = Thread.currentThread().getId
  //blocking {
    println( s"Started thread: $id" )
    Thread.sleep( 500 )
    println( s"Finished thread: $id" )
  //}
  id
}

def processResult[T](r : Try[T]) = {
  blocking {
      r match {
        case Success( id ) => println( s"Thread result: $id" )
        case Failure( t )  => println( "An error has occured: " + t.getMessage )
       }
  }

}

然后我执行测试以通过多线程执行任务:

def main( args: Array[ String ] ) {


   val s = Stream.from( 0 )
   //s.foreach { x => println(x) ;  val f = Future( longComputation ) ; f.onComplete{ processResult } }

   s.foreach { x => 
     println(x) 
     val f = Future( longComputation )  
     val p = Promise[Long]()
     p completeWith f
     p.future.onComplete{ processResult } 
   }

   println("Finished")
   context.shutdown
 } 

当我执行此操作时,启动了 16 个线程(CPU 计数为 8)。该程序打印“完成”消息。然后系统锁定并且不执行任何其他操作。但是,如果我删除了回调,那么线程将按预期执行 ad infinitum

以上我已经用 blockingPromise 进行了试验。行为没有变化。所以我的问题是:如何在不阻塞回调的情况下限制任务执行?如果这不可能,在线程中执行 I/O 是否可行(未来)?

感谢任何指点。

程序 运行 处于死锁状态。提供的 threadPool 是固定大小的,因此会发生以下情况: Future(longComputation) 从线程池中分配一个线程并开始工作。完成后,onComplete 从池中分配一个 Thread 来执行提供的函数。

鉴于做工作比完成工作需要更长的时间,在某些时候,所有线程都在忙于工作。它们中的任何一个完成并且 onComplete 也需要一个线程,因此它向执行者请求一个。工作无法完成,因为所有线程都忙并且机器死锁停止。

我们可以通过将预留资源提供给消费者来解决这个生产者-消费者僵局。这样,固定大小的线程池会限制工作,但我们确保可以进一步处理任何已完成的工作。

我已将 context 重命名为 fixedContext 的此代码段显示了使用单独的上下文来处理结果,从而解决了死锁。我也去掉了Promise,它除了代理未来之外并没有发挥真正的作用。

val fixedContext = // same as in question
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main( args: Array[ String ] ) {

   val s = Stream.from( 0 )

   s.foreach { x => 
     println(x)
     val f = Future( longComputation )(fixedContext)  
     f.onComplete{ processResult }(singleThreadContext)
   }

   println("Finished")
   fixedContext.shutdown
 } 
}

当线程完成 longComputation 时,它会尝试将作业放入队列以执行回调,但由于队列已满而被阻塞。因此,最终,第一个 "batch" 作业完成,但所有线程仍处于忙碌状态,等待队列以安排回调,并且没有任何可用的队列。

解决方案?从队列中删除限制。这样,尝试提交回调的线程就不会被阻塞,并且可以开始执行下一个任务。

您可能希望在您的生产者循环中插入一些东西来减慢它的速度,这样您的无限队列就不会耗尽所有内存。也许 Semaphore

val sem = new Semaphore(numThread*2)
def processResult[T](r : Try[T]) = blocking {
  r match {
    case Success( id ) => println( s"Thread result: $id" )
    case Failure( t )  => println( "An error has occured: " + t.getMessage )
  }
  sem.release
}

Stream.from(0).foreach { _ => 
  sem.acquire
  new Future(longComputation).onComplete(processResult)
}

你不需要你的自定义执行上下文 - Scala 的默认值实际上会更好地完成你想做的事情