在 Akka Typed 中聚合多个子演员响应
Aggregating multiple child actor responses in Akka Typed
我目前正在将 Akka Classic 应用程序移植到 Akka Typed。我有以下组件:
- HttpService - 不是演员
- JobDispatcher - 演员
- JobWorker - JobDispatcher 的子角色
JobDispatcher 是一个用于编排作业的单例 actor。每个 JobWorker 负责一个“工作”,并且知道该工作的状态。
HTTP 服务将向 JobDispatcher 发出请求,称为 GetJobStatuses。
然后,JobDispatcher 将询问每个 JobWorker 的状态,将结果聚合到一个列表中,然后回复 HttpService。
我在 Akka Classic 中的做法是让 JobDispatcher 完成所有 Asks,将 Futures 放入 Futures 列表中,然后将其转换为 Future of Lists,当 Future 聚合完成时,我会将结果发送到 HttpService。它看起来像这样:
val statusFutures: Seq[Future[JobStatus]] = jobWorkers map (jobWorker => (jobWorker ? GetJobStatus).mapTo[JobStatus])
val aggregateFuture: Future[Seq[SearchStatus]] = Future.sequence(statusFutures)
val theSender = context.sender()
aggregateFuture onComplete {
case Success(jobStatuses: Seq[JobStatus]) => {
theSender ! jobStatuses
}
case Failure(exception) => {
theSender ! exception
}
}
所以,现在我们要转向 Akka Typed,我们不应该使用 Futures / onComplete,而是将 Ask 响应转换为我们自己返回的消息(在本例中为 JobDispatcher)。对于我向另一位演员征求一个答复的简单情况,这是相当简单的。但在这种情况下,我有一个完整的儿童演员列表,我需要从中汇编他们的回复。
我唯一能想到的就是让 JobDispatcher 保存我正在等待的 JobWorker 响应列表的“状态”,跟踪哪些已经收到,以及我什么时候收到它们, 将响应消息发送回 HTTP 服务。由于我可能有多个来自 HTTP 服务的同时请求,我不得不跟踪这个“状态”的多个副本,并以某种方式识别每个请求针对哪个 HTTP 请求,这使情况变得更加复杂。
这比上面的聚合 Future 解决方案复杂得多。
在 Akka Typed 中处理这种情况的 simple/correct 方法是什么?
对于这种情况,文档建议 using a per-session child actor。仅与单个 HTTP 请求关联的子参与者隐式跟踪该状态的一个副本,并且还能够管理 scatter/gathering 作业的进程状态(例如围绕超时和重试)。
还值得注意的是,示例经典代码有一个巨大的错误:永远不要在涉及期货的代码中调用 sender
。混合 futures 和 actors 表面上很容易,但也很容易变成只有巧合才能起作用的东西(测试经常表现出这种巧合行为)。
我目前正在将 Akka Classic 应用程序移植到 Akka Typed。我有以下组件:
- HttpService - 不是演员
- JobDispatcher - 演员
- JobWorker - JobDispatcher 的子角色
JobDispatcher 是一个用于编排作业的单例 actor。每个 JobWorker 负责一个“工作”,并且知道该工作的状态。
HTTP 服务将向 JobDispatcher 发出请求,称为 GetJobStatuses。 然后,JobDispatcher 将询问每个 JobWorker 的状态,将结果聚合到一个列表中,然后回复 HttpService。
我在 Akka Classic 中的做法是让 JobDispatcher 完成所有 Asks,将 Futures 放入 Futures 列表中,然后将其转换为 Future of Lists,当 Future 聚合完成时,我会将结果发送到 HttpService。它看起来像这样:
val statusFutures: Seq[Future[JobStatus]] = jobWorkers map (jobWorker => (jobWorker ? GetJobStatus).mapTo[JobStatus])
val aggregateFuture: Future[Seq[SearchStatus]] = Future.sequence(statusFutures)
val theSender = context.sender()
aggregateFuture onComplete {
case Success(jobStatuses: Seq[JobStatus]) => {
theSender ! jobStatuses
}
case Failure(exception) => {
theSender ! exception
}
}
所以,现在我们要转向 Akka Typed,我们不应该使用 Futures / onComplete,而是将 Ask 响应转换为我们自己返回的消息(在本例中为 JobDispatcher)。对于我向另一位演员征求一个答复的简单情况,这是相当简单的。但在这种情况下,我有一个完整的儿童演员列表,我需要从中汇编他们的回复。
我唯一能想到的就是让 JobDispatcher 保存我正在等待的 JobWorker 响应列表的“状态”,跟踪哪些已经收到,以及我什么时候收到它们, 将响应消息发送回 HTTP 服务。由于我可能有多个来自 HTTP 服务的同时请求,我不得不跟踪这个“状态”的多个副本,并以某种方式识别每个请求针对哪个 HTTP 请求,这使情况变得更加复杂。
这比上面的聚合 Future 解决方案复杂得多。
在 Akka Typed 中处理这种情况的 simple/correct 方法是什么?
对于这种情况,文档建议 using a per-session child actor。仅与单个 HTTP 请求关联的子参与者隐式跟踪该状态的一个副本,并且还能够管理 scatter/gathering 作业的进程状态(例如围绕超时和重试)。
还值得注意的是,示例经典代码有一个巨大的错误:永远不要在涉及期货的代码中调用 sender
。混合 futures 和 actors 表面上很容易,但也很容易变成只有巧合才能起作用的东西(测试经常表现出这种巧合行为)。