类型化的 Akka Actor 中适当的 Future 处理
Appropriate Future Handling in Akka Actors Typed
从 Akka(类型化)Actor 内部处理 Futures 的正确方法是什么?
例如,假设有一个 Actor OrderActor
接收命令来下订单......它通过对外部服务进行 http 调用来完成。由于这些是对外部服务的 http 调用,因此涉及 Future
s。那么,从 Actor 内部处理 Future
的正确方法是什么。
我阅读了一些有关 pipeTo 模式的内容。这是这里需要发生的事情还是其他事情?
class OrderActor(context: ActorContext[OrderCommand], orderFacade: OrderFacade)
extends AbstractBehavior[OrderCommand](context) {
context.log.info("Order Actor started")
override def onMessage(msg: OrderCommand): Behavior[OrderCommand] = {
msg match {
case PlaceOrder(
referenceId: OrderReferenceId,
ticker: Ticker,
quantity: Int,
replyTo: ActorRef[OrderResult]
) =>
orderFacade
.placeOrder(ticker, quantity) //this returns a Future
.map(res => {
//transform result
//book keeping / notification (affects state)
replyTo ! transformed
//Can/Should we map like this? I tried adding a log statement in here, but I never see it... and the replyTo doesnt seem to get the message.
})
this
通常最好避免在 actor 内部进行 Future
转换(map
、flatMap
、foreach
等)。存在一个明显的风险,即 actor 中的某些可变状态不是您在转换运行时所期望的状态。在 Akka Classic 中,最有害的形式可能是将回复发送给错误的参与者。
Akka Typed(尤其是在函数式 API 中)减少了很多可能导致麻烦的可变状态,但通常将 Future
作为消息传递给演员。
因此,如果 orderFacade.placeOrder
导致 Future[OrderResponse]
,您可以像这样添加 OrderCommand
的子类
// also include fields from the PlaceOrder which will be useful
case class OrderResponseIs(resp: OrderResponse, replyTo: ActorRef[OrderResult]) extends OrderCommand
// TODO include fields
case class OrderFailed() extends OrderCommand
然后通过管道将 Future
传送给您自己:
import scala.util.{ Failure, Success }
context.pipeToSelf(orderFacade.placeOrder) {
case Success(resp) => OrderResponseIs(resp, replyTo)
case Failure(_) => OrderFailed()
}
然后您必须处理这些消息:
case OrderResponseIs(resp, replyTo) =>
// transform resp
val transformed = ???
replyTo ! transformed
this
case OrderFailed() =>
context.log.warning("Stuff is broken")
this
与 map
和朋友相比,这样做实际上并没有太多开销(两者通常都涉及调度任务以在调度程序上异步执行)。
从 Akka(类型化)Actor 内部处理 Futures 的正确方法是什么?
例如,假设有一个 Actor OrderActor
接收命令来下订单......它通过对外部服务进行 http 调用来完成。由于这些是对外部服务的 http 调用,因此涉及 Future
s。那么,从 Actor 内部处理 Future
的正确方法是什么。
我阅读了一些有关 pipeTo 模式的内容。这是这里需要发生的事情还是其他事情?
class OrderActor(context: ActorContext[OrderCommand], orderFacade: OrderFacade)
extends AbstractBehavior[OrderCommand](context) {
context.log.info("Order Actor started")
override def onMessage(msg: OrderCommand): Behavior[OrderCommand] = {
msg match {
case PlaceOrder(
referenceId: OrderReferenceId,
ticker: Ticker,
quantity: Int,
replyTo: ActorRef[OrderResult]
) =>
orderFacade
.placeOrder(ticker, quantity) //this returns a Future
.map(res => {
//transform result
//book keeping / notification (affects state)
replyTo ! transformed
//Can/Should we map like this? I tried adding a log statement in here, but I never see it... and the replyTo doesnt seem to get the message.
})
this
通常最好避免在 actor 内部进行 Future
转换(map
、flatMap
、foreach
等)。存在一个明显的风险,即 actor 中的某些可变状态不是您在转换运行时所期望的状态。在 Akka Classic 中,最有害的形式可能是将回复发送给错误的参与者。
Akka Typed(尤其是在函数式 API 中)减少了很多可能导致麻烦的可变状态,但通常将 Future
作为消息传递给演员。
因此,如果 orderFacade.placeOrder
导致 Future[OrderResponse]
,您可以像这样添加 OrderCommand
的子类
// also include fields from the PlaceOrder which will be useful
case class OrderResponseIs(resp: OrderResponse, replyTo: ActorRef[OrderResult]) extends OrderCommand
// TODO include fields
case class OrderFailed() extends OrderCommand
然后通过管道将 Future
传送给您自己:
import scala.util.{ Failure, Success }
context.pipeToSelf(orderFacade.placeOrder) {
case Success(resp) => OrderResponseIs(resp, replyTo)
case Failure(_) => OrderFailed()
}
然后您必须处理这些消息:
case OrderResponseIs(resp, replyTo) =>
// transform resp
val transformed = ???
replyTo ! transformed
this
case OrderFailed() =>
context.log.warning("Stuff is broken")
this
与 map
和朋友相比,这样做实际上并没有太多开销(两者通常都涉及调度任务以在调度程序上异步执行)。