在 Akka 消息中发送期货可以吗?
Is sending futures in Akka messages OK?
我正在努力实现一种小型语言来将任务发送到执行并控制执行流。在向我的系统发送任务后,用户得到一个未来(它可以调用阻塞 get() 或 flatMap() )。我的问题是:可以在 Akka 消息中发送期货吗?
示例:参与者 A 向参与者 B 发送消息 Response,并且 Response 在其字段中包含未来。然后在某个时刻,A 将实现创造未来的承诺。在收到Response后,B可以随时调用flatMap()或get()。
我问是因为 Akka 消息应该是不可变的,并且即使 actor 在不同的 JVM 上也能正常工作。如果参与者 A 和 B 在不同的 JVM 上,我看不出上面的示例如何工作。另外,即使 actors 在同一个 JVM 上,我的例子也有问题吗?
已接受的答案 中也做了类似的事情。如果 actor 在不同的 JVM 上,这会起作用吗?
如果你问这是否可能,那么是的,这是可能的。远程参与者基本上是进程间通信。如果您将两台机器上的所有设置都设置为两者都可以正确处理未来的状态,那么它应该是好的。你没有给出任何工作示例,所以我无法真正深入研究它。
没有远程处理是可能的,但仍然不可取。进行远程处理时,它根本不起作用。
如果您的目标是 API returns Future
,但使用 actors 作为底层管道,一种方法可能是 API 创建它自己的演员在内部 ask
s,然后 returns 从那个请求到调用者的未来。由 API 调用生成的 actor 保证在 API 实例本地,并且可以通过常规 tell
/receive
机制与 actor 系统的其余部分进行通信,这样就没有 Future
作为消息发送。
class MyTaskAPI(actorFactory: ActorRefFactory) {
def doSomething(...): Future[SomethingResult] = {
val taskActor = actorFactory.actorOf(Props[MyTaskActor])
taskActor ? DoSomething(...).mapTo[SomethingResult]
}
}
其中 MyTaskActor
接收 DoSomething
,捕获发送者,发出任务处理请求,并且可能 become
是 SomethingResult
的接收状态,最终响应捕获的发件人并自行停止。这种方法为每个请求创建两个参与者,一个是显式的,MyTaskActor
,另一个是隐式的,ask
的处理程序,但将所有状态保留在参与者内部。
或者,您可以使用 ActorDSL 仅创建一个内联 doSomething
的演员,并使用捕获的 Promise
完成而不是使用 ask
:
class MyTaskAPI(system: System) {
def doSomething(...): Future[SomethingResult] = {
val p = Promise[SomethingResult]()
val tmpActor = actor(new Act {
become {
case msg:SomethingResult =>
p.success(msg)
self.stop()
}
}
system.actorSelection("user/TaskHandler").tell(DoSomething(...), tmpActor)
p.future
}
}
这种方法有点超出我的想象,它确实在 API 和临时演员之间使用了一个共享值,有些人可能认为这是一种味道,但应该给出如何实现的想法您的工作流程。
我正在努力实现一种小型语言来将任务发送到执行并控制执行流。在向我的系统发送任务后,用户得到一个未来(它可以调用阻塞 get() 或 flatMap() )。我的问题是:可以在 Akka 消息中发送期货吗?
示例:参与者 A 向参与者 B 发送消息 Response,并且 Response 在其字段中包含未来。然后在某个时刻,A 将实现创造未来的承诺。在收到Response后,B可以随时调用flatMap()或get()。
我问是因为 Akka 消息应该是不可变的,并且即使 actor 在不同的 JVM 上也能正常工作。如果参与者 A 和 B 在不同的 JVM 上,我看不出上面的示例如何工作。另外,即使 actors 在同一个 JVM 上,我的例子也有问题吗?
已接受的答案
如果你问这是否可能,那么是的,这是可能的。远程参与者基本上是进程间通信。如果您将两台机器上的所有设置都设置为两者都可以正确处理未来的状态,那么它应该是好的。你没有给出任何工作示例,所以我无法真正深入研究它。
没有远程处理是可能的,但仍然不可取。进行远程处理时,它根本不起作用。
如果您的目标是 API returns Future
,但使用 actors 作为底层管道,一种方法可能是 API 创建它自己的演员在内部 ask
s,然后 returns 从那个请求到调用者的未来。由 API 调用生成的 actor 保证在 API 实例本地,并且可以通过常规 tell
/receive
机制与 actor 系统的其余部分进行通信,这样就没有 Future
作为消息发送。
class MyTaskAPI(actorFactory: ActorRefFactory) {
def doSomething(...): Future[SomethingResult] = {
val taskActor = actorFactory.actorOf(Props[MyTaskActor])
taskActor ? DoSomething(...).mapTo[SomethingResult]
}
}
其中 MyTaskActor
接收 DoSomething
,捕获发送者,发出任务处理请求,并且可能 become
是 SomethingResult
的接收状态,最终响应捕获的发件人并自行停止。这种方法为每个请求创建两个参与者,一个是显式的,MyTaskActor
,另一个是隐式的,ask
的处理程序,但将所有状态保留在参与者内部。
或者,您可以使用 ActorDSL 仅创建一个内联 doSomething
的演员,并使用捕获的 Promise
完成而不是使用 ask
:
class MyTaskAPI(system: System) {
def doSomething(...): Future[SomethingResult] = {
val p = Promise[SomethingResult]()
val tmpActor = actor(new Act {
become {
case msg:SomethingResult =>
p.success(msg)
self.stop()
}
}
system.actorSelection("user/TaskHandler").tell(DoSomething(...), tmpActor)
p.future
}
}
这种方法有点超出我的想象,它确实在 API 和临时演员之间使用了一个共享值,有些人可能认为这是一种味道,但应该给出如何实现的想法您的工作流程。