如何在用 Scala 编写的 Actor System 中从未来获取数据?
How to get data from a future in an Actor System written in Scala?
我有一个 Actor 说 ProcessTheCommand actor,它有一个 receive 方法实现如下:
class ProcessTheCommand extends Actor {
def receive = {
case obj: DataObject =>
val f1 = OneActor ? obj
val f2 = SecondActor ? obj
// val result= resultFromf1 + resultFromf2
// do something using the result
}
}
我的问题:如何从两个期货中获取数据?
一种方法是使用 await
但这不是最佳做法。建议?
问题分为两部分。第一个是如何提取两个期货并将它们加在一起,第二个是我们如何在Akka中处理这些期货的结果。
组合期货:
有几种方法。我假设期货的结果是一样的。一个将用于理解:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = for {
first <- firstFuture
second <- secondFuture
} yield first + second
另一种选择是使用 Future.sequence
:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = Future
.sequence(Seq(firstFuture, secondFuture))
.map {
results => results.sum
}
另一个是 zip
和 map
(感谢@ViktorKlang):
firstFuture
.zip(secondFuture)
.map { case (first, second) => first + second }
或者使用 Scala 2.12 zipWith
:
val result: Future[Int] = firstFuture.zipWith(secondFuture) {
case (first, second) => first + second
}
提取 Actor 中的值:
唯一缺少的部分是我们如何获得累积结果。 Akka 中的模式是将结果通过管道传递给您自己的 Receive
,因为我们永远不想阻塞方法调用,我们真正想要的是将来在它完成后调用回调,这正是 pipeTo
会做。
我们将创建一个封装结果的自定义案例 class:
case class AccumulatedResult(result: Int)
并在Receive
中添加:
import akka.pattern.pipe
override def receive: Receive = {
case obj: DataObject =>
val firstFuture = OneActor ? obj
val secondFuture = SecondActor ? obj
firstFuture
.zip(secondFuture)
.map { case (first, second) => AccumulatedResult(first + second) }
.pipeTo(self)
case AccumulatedResult(res) => println(res)
}
这样做的好处是,一旦 future 完成,消息的处理将作为 actor 的流处理逻辑的一部分继续。
在 receive
方法中阻塞是一个很大的 NO,所以永远不要在 receive
中使用 await,否则你会耗尽资源。所有消息处理仅在几个工作线程中发生。
要处理两个 future,您可能需要使用它们的单子属性:
val result: Future[Int] = firstFuture.flatMap { firstValue =>
secondFuture.flatMap { secondValue =>
firstValue + secondValue
}
}
或通过理解同样的事情:
val result: Future[Int] = for {
fistValue <- firstFuture
secondValue <- secondFuture
} yield firstValue + secondValue
然而,这不是您通常想要的 actors,因为您仍然会得到一个 Future 并且需要阻塞直到它完成。您通常想要的是将值传递给调用者或下一个参与者。为此,您可以使用 pipe
模式:
import akka.pattern.pipe
val resultFuture: Future[Int] = for {
fistValue <- firstFuture
secondValue <- secondFuture
} yield firstValue + secondValue
resultFuture pipeTo sender() // sends result to the sender when future is completed
另请参阅:Futures in Akka
我有一个 Actor 说 ProcessTheCommand actor,它有一个 receive 方法实现如下:
class ProcessTheCommand extends Actor {
def receive = {
case obj: DataObject =>
val f1 = OneActor ? obj
val f2 = SecondActor ? obj
// val result= resultFromf1 + resultFromf2
// do something using the result
}
}
我的问题:如何从两个期货中获取数据?
一种方法是使用 await
但这不是最佳做法。建议?
问题分为两部分。第一个是如何提取两个期货并将它们加在一起,第二个是我们如何在Akka中处理这些期货的结果。
组合期货:
有几种方法。我假设期货的结果是一样的。一个将用于理解:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = for {
first <- firstFuture
second <- secondFuture
} yield first + second
另一种选择是使用 Future.sequence
:
val firstFuture: Future[Int] = ???
val secondFuture: Future[Int] = ???
val result: Future[Int] = Future
.sequence(Seq(firstFuture, secondFuture))
.map {
results => results.sum
}
另一个是 zip
和 map
(感谢@ViktorKlang):
firstFuture
.zip(secondFuture)
.map { case (first, second) => first + second }
或者使用 Scala 2.12 zipWith
:
val result: Future[Int] = firstFuture.zipWith(secondFuture) {
case (first, second) => first + second
}
提取 Actor 中的值:
唯一缺少的部分是我们如何获得累积结果。 Akka 中的模式是将结果通过管道传递给您自己的 Receive
,因为我们永远不想阻塞方法调用,我们真正想要的是将来在它完成后调用回调,这正是 pipeTo
会做。
我们将创建一个封装结果的自定义案例 class:
case class AccumulatedResult(result: Int)
并在Receive
中添加:
import akka.pattern.pipe
override def receive: Receive = {
case obj: DataObject =>
val firstFuture = OneActor ? obj
val secondFuture = SecondActor ? obj
firstFuture
.zip(secondFuture)
.map { case (first, second) => AccumulatedResult(first + second) }
.pipeTo(self)
case AccumulatedResult(res) => println(res)
}
这样做的好处是,一旦 future 完成,消息的处理将作为 actor 的流处理逻辑的一部分继续。
在 receive
方法中阻塞是一个很大的 NO,所以永远不要在 receive
中使用 await,否则你会耗尽资源。所有消息处理仅在几个工作线程中发生。
要处理两个 future,您可能需要使用它们的单子属性:
val result: Future[Int] = firstFuture.flatMap { firstValue =>
secondFuture.flatMap { secondValue =>
firstValue + secondValue
}
}
或通过理解同样的事情:
val result: Future[Int] = for {
fistValue <- firstFuture
secondValue <- secondFuture
} yield firstValue + secondValue
然而,这不是您通常想要的 actors,因为您仍然会得到一个 Future 并且需要阻塞直到它完成。您通常想要的是将值传递给调用者或下一个参与者。为此,您可以使用 pipe
模式:
import akka.pattern.pipe
val resultFuture: Future[Int] = for {
fistValue <- firstFuture
secondValue <- secondFuture
} yield firstValue + secondValue
resultFuture pipeTo sender() // sends result to the sender when future is completed
另请参阅:Futures in Akka