如何在用 Scala 编写的 Actor System 中从未来获取数据?

How to get data from a future in an Actor System written in Scala?

我有一个 Actor 说 ProcessTheCommand actor,它有一个 receive 方法实现如下:

class ProcessTheCommand extends Actor {
   def receive = {
     case obj: DataObject => 
       val f1 = OneActor ? obj
       val f2 = SecondActor ? obj
       // val result= resultFromf1 + resultFromf2
       // do something using the result
   }
}

我的问题:如何从两个期货中获取数据?

一种方法是使用 await 但这不是最佳做法。建议?

问题分为两部分。第一个是如何提取两个期货并将它们加在一起,第二个是我们如何在Akka中处理这些期货的结果。

组合期货:

有几种方法。我假设期货的结果是一样的。一个将用于理解:

val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = for {
  first <- firstFuture
  second <- secondFuture
} yield first + second

另一种选择是使用 Future.sequence:

val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???

val result: Future[Int] = Future
  .sequence(Seq(firstFuture, secondFuture))
  .map {
    results => results.sum
  }

另一个是 zipmap(感谢@ViktorKlang):

firstFuture
 .zip(secondFuture)
 .map { case (first, second) => first + second }

或者使用 Scala 2.12 zipWith:

val result: Future[Int] = firstFuture.zipWith(secondFuture) {
  case (first, second) => first + second
}

提取 Actor 中的值:

唯一缺少的部分是我们如何获得累积结果。 Akka 中的模式是将结果通过管道传递给您自己的 Receive,因为我们永远不想阻塞方法调用,我们真正想要的是将来在它完成后调用回调,这正是 pipeTo 会做。

我们将创建一个封装结果的自定义案例 class:

case class AccumulatedResult(result: Int)

并在Receive中添加:

import akka.pattern.pipe
override def receive: Receive = {
  case obj: DataObject => 
    val firstFuture = OneActor ? obj
    val secondFuture = SecondActor ? obj
    firstFuture
     .zip(secondFuture)
     .map { case (first, second) => AccumulatedResult(first + second) }
     .pipeTo(self)

  case AccumulatedResult(res) => println(res)
}

这样做的好处是,一旦 future 完成,消息的处理将作为 actor 的流处理逻辑的一部分继续。

receive 方法中阻塞是一个很大的 NO,所以永远不要在 receive 中使用 await,否则你会耗尽资源。所有消息处理仅在几个工作线程中发生。

要处理两个 future,您可能需要使用它们的单子属性:

val result: Future[Int] = firstFuture.flatMap { firstValue => 
   secondFuture.flatMap { secondValue => 
      firstValue + secondValue
   }
}

或通过理解同样的事情:

val result: Future[Int] = for {
  fistValue <- firstFuture
  secondValue <- secondFuture
} yield firstValue + secondValue

然而,这不是您通常想要的 actors,因为您仍然会得到一个 Future 并且需要阻塞直到它完成。您通常想要的是将值传递给调用者或下一个参与者。为此,您可以使用 pipe 模式:

import akka.pattern.pipe
val resultFuture: Future[Int] = for {
  fistValue <- firstFuture
  secondValue <- secondFuture
} yield firstValue + secondValue
resultFuture pipeTo sender() // sends result to the sender when future is completed

另请参阅:Futures in Akka