Does Akka have an ExecutorCompletionService equivalent where Futures are queued by their completion time?
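Using Java, I can create an ExecutorCompletionService with an executor and a series of tasks. This class arranges for submitted tasks to be placed, upon completion, on a queue that is accessible using take.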
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html
Does Akka have something similar for managing the Futures returned by actors?
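For reference, the behaviour being asked about can be sketched as follows. This is a minimal illustration called from Scala, not code from the question: the object name CompletionServiceDemo, the helper completionOrder, and the sleep-based tasks are assumptions made up for the example.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors}

object CompletionServiceDemo {
  // Hypothetical helper: each task sleeps for its delay and then returns it.
  // The results come back in the order in which the tasks finished.
  def completionOrder(delaysMs: Seq[Long]): List[Long] = {
    val pool = Executors.newFixedThreadPool(math.max(1, delaysMs.size))
    try {
      val ecs = new ExecutorCompletionService[Long](pool)
      delaysMs.foreach { d =>
        ecs.submit(new Callable[Long] {
          override def call(): Long = { Thread.sleep(d); d }
        })
      }
      // take() blocks until the next task has completed, get() reads its result.
      List.fill(delaysMs.size)(ecs.take().get())
    } finally pool.shutdown()
  }
}
```

Calling CompletionServiceDemo.completionOrder(Seq(300L, 50L, 150L)) should yield the delays sorted by completion time rather than by submission order.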
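This answer applies to Scala only. Scala has Future.sequence / Future.firstCompletedOf to compose futures; they return a new future that completes after all / one of the underlying futures have completed (which is equivalent to the examples in CompletionService's API docs). This solution is safer than ecs.take().get() because nothing blocks if you use an onComplete listener; if you still want a blocking waiter, use Await.result. So there is no need for a CompletionService, because a list of futures is flexible enough and much safer. The equivalent of the first example: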
val solvers: List[() => Int] = ...
val futures = solvers.map(s => Future { s() }) // start execution
// foreach runs only if every future succeeded; onComplete would receive a Try instead
Future.sequence(futures).foreach { results: List[Int] =>
  results.foreach(use)
}
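As a runnable illustration of the blocking variant mentioned above, here is a minimal sketch that combines Future.sequence with Await.result. The object name SequenceDemo, the three sample solvers, and the 5 second timeout are assumptions standing in for the elided parts of the snippet.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Hypothetical stand-ins for the elided `solvers` list.
  val solvers: List[() => Int] = List(() => 1, () => 2, () => 3)

  def solveAll(): List[Int] = {
    val futures = solvers.map(s => Future(s())) // start execution
    // Future.sequence preserves the order of the input list,
    // not the order in which the futures completed.
    Await.result(Future.sequence(futures), 5.seconds)
  }
}
```

Note that the combined future fails as soon as any underlying future fails, so Await.result would rethrow that exception.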
Another example is cancelling the tasks:
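val solvers: List[Future[Int] => Int] = ... // some list of functions (tasks); the Future argument is used to check whether the task was interrupted
// cancellableFuture is an external helper (its reference is missing here), assumed to return a (future, cancel function) pair
val (futures, cancels) = solvers.map(cancellableFuture).unzip
Future.firstCompletedOf(futures).foreach { result: Int =>
  cancels.foreach(_())
  use(result)
}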
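Leaving cancellation aside, the firstCompletedOf part can be tried on its own. The following is a minimal sketch under assumptions: the object FirstCompletedDemo, the delayed helper, and the chosen delays are invented for illustration, and the slower futures are not cancelled but simply run to completion in the background.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompletedDemo {
  // Hypothetical task: sleeps for `ms` milliseconds, then yields `value`.
  def delayed(ms: Long, value: Int): Future[Int] =
    Future { blocking(Thread.sleep(ms)); value }

  def fastest(): Int = {
    val futures = Seq(delayed(400, 1), delayed(20, 2), delayed(200, 3))
    // Completes with the result of whichever future finishes first.
    Await.result(Future.firstCompletedOf(futures), 5.seconds)
  }
}
```

The blocking wrapper lets the global execution context start extra threads for the sleeping tasks, so they run concurrently even on a machine with few cores.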
As for Java, Akka provides an adaptation of Scala's futures: http://doc.akka.io/docs/akka/snapshot/java/futures.html
If you just want to process the results sequentially as they complete, you can use an actor:
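import akka.pattern.pipe // provides pipeTo; requires an implicit ExecutionContext
val futures: List[Future[Int]] = ...
futures.foreach(_ pipeTo actor) // the actor's mailbox is used as the queue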
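The same idea can be tried without an actor system. The sketch below swaps the actor's mailbox for a plain java.util.concurrent.LinkedBlockingQueue, so it is not the Akka approach itself; the names CompletionQueueDemo, completionQueue, and demo are assumptions made for this illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

object CompletionQueueDemo {
  // Returns a queue that receives each future's outcome (success or failure)
  // as soon as that future completes, i.e. in completion order.
  def completionQueue[T](futures: Seq[Future[T]]): LinkedBlockingQueue[Try[T]] = {
    val queue = new LinkedBlockingQueue[Try[T]]()
    futures.foreach(_.onComplete(r => queue.put(r)))
    queue
  }

  def demo(): List[Int] = {
    val delays = List(300, 50, 150)
    val futures = delays.map(d => Future { blocking(Thread.sleep(d.toLong)); d })
    val queue = completionQueue(futures)
    // take() blocks until the next result is available, much like ecs.take().
    List.fill(delays.size)(queue.take().get)
  }
}
```

Unlike an actor, the consumer here blocks a thread in take(), which is the trade-off the answer warns about.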
To model the behaviour of a completion queue (not recommended):
import scala.concurrent._
import duration._
import scala.concurrent.ExecutionContext.Implicits.global // some execution context

class Queue[T](solvers: Seq[() => T]) extends Iterator[T] {
  // Each result carries a reference to its own future so it can be removed from the set.
  case class Result(f: Future[Result], r: T)

  var futures: Set[Future[Result]] = solvers.map { s =>
    lazy val f: Future[Result] = Future { Result(f, s()) }
    f
  }.toSet

  def hasNext: Boolean = futures.nonEmpty

  def next(): T = {
    // Blocks until any of the remaining futures completes.
    val result = Await.result(Future.firstCompletedOf(futures.toSeq), Duration.Inf)
    futures -= result.f
    result.r
  }
}
scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator
scala> q.next
res14: Int = 2
scala> q.next
res15: Int = 1
scala> q.foreach(println)
4
3
Perhaps this possible solution, which does not use ExecutorCompletionService, will help you:
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent._
import scala.concurrent.duration._
import scala.util._
import scala.concurrent.{ExecutionContext, Future}

class BatchedIteratorsFactory[S, R](M: Int, timeout: Duration) {
  implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
  val throttlingQueue = new LinkedBlockingQueue[Future[R]](M) // can't put more than M elements into the queue
  val resultQueue = new LinkedBlockingQueue[Try[R]](M)
  val jobCounter = new AtomicLong(0) // jobs submitted minus results requested

  def iterator(input: Iterator[S])(job: S => R): Iterator[Try[R]] = {
    // Producer: put() blocks while M jobs are in flight, which throttles submission.
    val totalWork = Future(input.foreach { elem =>
      jobCounter.incrementAndGet
      throttlingQueue.put(Future { job(elem) } andThen {
        case r => resultQueue.put(r); throttlingQueue.poll() // the order is important here!
      })
    })
    new Iterator[Try[R]] {
      // Caveat: this reads `input` while the producer thread may still be iterating over it.
      override def hasNext: Boolean = jobCounter.get != 0 || input.hasNext
      override def next(): Try[R] = {
        jobCounter.decrementAndGet
        Option(resultQueue.poll(timeout.toMillis, TimeUnit.MILLISECONDS)).getOrElse(
          throw new TimeoutException(s"No task has been completed within ${timeout.toMillis} ms!")
        )
      }
    }
  }
}
So you can use it like this:
val job = { (elem: Int) =>
  val result = elem * elem
  Thread.sleep(1000L) // some possible computation...
  result
}
val src = Range(1, 16).toIterator
val it = new BatchedIteratorsFactory[Int, Int](M = 3, timeout = 4 seconds)
  .iterator(src)(job)