Does Akka have an ExecutorCompletionService equivalent where Futures are queued by their completion time?

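Using Java, I can create an ExecutorCompletionService with an executor and a series of tasks. This class arranges for submitted tasks to be placed, upon completion, on a queue accessible using take. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Does Akka have something similar for managing the Futures returned by actors?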
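For reference, the behaviour described above can be sketched from Scala using only the JDK classes. The object name `CompletionServiceSketch`, the method `inCompletionOrder`, and the sleep-based tasks are illustrative assumptions, not part of the question.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

object CompletionServiceSketch {
  // Hypothetical helper: each task sleeps for the given number of
  // milliseconds and returns it; results come back in completion order.
  def inCompletionOrder(delaysMs: Seq[Long]): List[Long] = {
    val pool = Executors.newFixedThreadPool(delaysMs.size max 1)
    try {
      val ecs = new ExecutorCompletionService[Long](pool)
      delaysMs.foreach { d =>
        ecs.submit(new Callable[Long] {
          def call(): Long = { Thread.sleep(d); d }
        })
      }
      // take() blocks until the next finished task is available,
      // get() then returns its result (or rethrows its failure)
      List.fill(delaysMs.size)(ecs.take().get())
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(inCompletionOrder(Seq(300L, 100L, 200L)))
}
```

Every call to `take()` blocks the calling thread, which is the property the question is trying to reproduce or avoid in Akka.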

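This answer applies to Scala only. In Scala you can compose futures with sequence and firstCompletedOf, which return a new future that completes after all (sequence) or one (firstCompletedOf) of the underlying futures has completed; this is equivalent to the examples in the CompletionService API docs. This approach is safer than ecs.take().get() because it does not block if you use an onComplete listener; if you still want a blocking wait, use Await.result. So there is no need for a CompletionService, because a list of futures is flexible enough and much safer. The equivalent of the first example: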

  val solvers: List[() => Int] = ...
  val futures = solvers.map(s => Future { s() }) // start execution
  // foreach runs only if every future succeeds; results keep the input order
  Future.sequence(futures).foreach { results =>
    results.foreach(use)
  }
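One detail worth spelling out: an `onComplete` callback receives a `Try`, so a failure of any solver can be handled as well. Below is a minimal runnable sketch of the same idea, assuming three trivial solvers in place of the elided list; `SequenceSketch` and `solveAll` are hypothetical names. It uses `andThen`, which takes the same kind of `Try` handler as `onComplete` but also returns a future that the demo can wait on.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object SequenceSketch {
  // Assumption: placeholder solvers standing in for the elided list
  val solvers: List[() => Int] = List(() => 1, () => 2, () => 3)

  // Starts every solver and combines the futures into one future of all results
  def solveAll(): Future[List[Int]] =
    Future.sequence(solvers.map(s => Future(s())))

  def main(args: Array[String]): Unit = {
    // The handler sees a Try, so both outcomes are covered
    val done = solveAll().andThen {
      case Success(results) => results.foreach(r => println(s"result: $r"))
      case Failure(e)       => println(s"a solver failed: $e")
    }
    // Blocking here only keeps the demo alive until the handler has run
    Await.ready(done, 5.seconds)
  }
}
```

`Future.sequence` keeps the order of the input list, not the order of completion, and it fails as a whole if any single future fails.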

Another example is cancelling the task:

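  val solvers: List[Future[_] => Int] = ... // some list of functions (tasks); the Future argument is used to check whether the task was interrupted
  val (futures, cancels) = solvers.map(cancellableFuture).unzip // cancellableFuture (not shown) yields a (future, cancel function) pair

  // foreach runs only if the first future to complete succeeded
  Future.firstCompletedOf(futures).foreach { result =>
    cancels.foreach(_()) // cancel the remaining tasks
    use(result)
  }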
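Leaving cancellation aside, the behaviour of `firstCompletedOf` on its own can be tried with a small sketch like the following. The names `FirstCompletedSketch`, `task`, and `firstOf`, as well as the sleep-based tasks, are assumptions made for illustration.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedSketch {
  // Hypothetical task: sleeps for the given time, then returns it.
  // `blocking` tells the global pool that the thread is about to block.
  def task(delayMs: Long): Future[Long] =
    Future { blocking(Thread.sleep(delayMs)); delayMs }

  // Completes with the result of whichever task finishes first
  def firstOf(delaysMs: Seq[Long]): Future[Long] =
    Future.firstCompletedOf(delaysMs.map(task))

  def main(args: Array[String]): Unit =
    println(Await.result(firstOf(Seq(400L, 50L, 200L)), 5.seconds))
}
```

The futures that lose the race keep running to completion; `firstCompletedOf` does not stop them, which is why the example above needs explicit cancel functions.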

As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html

If you only want to process the results sequentially as they complete, you can use an actor:

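  import akka.pattern.pipe // provides pipeTo; an implicit ExecutionContext must be in scope

  val futures: List[Future[Int]] = ...
  futures.foreach(_ pipeTo actor) // the actor's mailbox is used as the queue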

To model the behaviour of a completion queue (not recommended):

  import scala.concurrent._
  import scala.concurrent.duration._
  import scala.concurrent.ExecutionContext.Implicits.global // some execution context

  class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
    // Each result carries its own future so it can be removed from the pending set
    case class Result(f: Future[Result], r: T)

    var futures: Set[Future[Result]] = solvers.map { s =>
      lazy val f: Future[Result] = Future { Result(f, s()) }
      f
    }.toSet

    def hasNext: Boolean = futures.nonEmpty

    // Blocks until any pending future completes, then returns its value
    def next(): T = {
      val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
      futures -= result.f
      result.r
    }
  }

scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator

scala> q.next
res14: Int = 2

scala> q.next
res15: Int = 1

scala> q.foreach(println)
4
3

Perhaps this possible solution, which does not use ExecutorCompletionService, will help you:

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent._
import scala.concurrent.duration._
import scala.util._
import scala.concurrent.{ExecutionContextExecutorService, ExecutionContext, Future}

class BatchedIteratorsFactory[S,R](M: Int, timeout: Duration) {
  implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

  val throttlingQueue = new LinkedBlockingQueue[Future[R]](M) // Can't put more than M elements to the queue
  val resultQueue = new LinkedBlockingQueue[Try[R]](M)
  val jobCounter = new AtomicLong(0)

  def iterator(input: Iterator[S])(job: S => R): Iterator[Try[R]] = {
    val totalWork = Future(input.foreach { elem =>
      jobCounter.incrementAndGet
      throttlingQueue.put(Future { job(elem) } andThen {
        case r => resultQueue.put(r); throttlingQueue.poll() // the order is important here!
      })
    })

    new Iterator[Try[R]] {
      override def hasNext: Boolean = jobCounter.get != 0 || input.hasNext
      override def next(): Try[R] = {
        jobCounter.decrementAndGet
        Option(resultQueue.poll(timeout.toMillis, TimeUnit.MILLISECONDS)).getOrElse(
          throw new TimeoutException(s"No task has been completed within ${timeout.toMillis} ms!")
        )
      }
    }
  }
}

So you can use it like this:

val job = { (elem: Int) =>
  val result = elem * elem
  Thread.sleep(1000L) // some possible computation...
  result
}

val src = Range(1, 16).toIterator

val it = new BatchedIteratorsFactory[Int, Int](M = 3, timeout = 4.seconds)
  .iterator(src)(job)