Akka HTTP 流 API 循环永远不会完成

Akka HTTP streaming API with cycles never completes

我正在构建一个应用程序,我在其中接收来自用户的请求,调用 REST API 以取回一些数据,然后根据该响应进行另一个 HTTP 调用,依此类推。基本上,我正在处理一棵数据树,其中树中的每个节点都需要我递归调用此 API,如下所示:

     A
    / \
   B   C
  / \   \
 D   E   F

我正在使用 Akka HTTP 和 Akka Streams 来构建应用程序,所以我正在使用它的流式传输 API,如下所示:

val httpFlow = Http().cachedConnection(host = "localhost")
val flow = GraphDSL.create() { implicit builder =>
   import GraphDSL.Implicits._

   val merge = b.add(Merge[Data](2))
   val bcast = b.add(Broadcast[ResponseData](2))

   takeUserData ~> merge ~> createRequest ~> httpFlow ~> processResponse ~> bcast
                   merge <~         extractSubtree                       <~ bcast

   FlowShape(takeUserData.in, bcast.out(1))
}

我知道在 Akka Streams 应用程序中处理递归的最佳方法是在流外处理递归,但由于我递归调用 HTTP 流来获取数据的每个子树,我想确保以防 API 过载。

问题是此流永远不会完成。如果我将它连接到像这样的简单来源:

val source = Source.single(data)
val sink = Sink.seq[ResponseData]

source.via(flow).runWith(sink)

它打印出它正在处理树中的所有数据,然后停止打印任何东西,永远处于空闲状态。

我阅读了 the documentation about cycles and the suggestion was to put a MergePreferred in there, but that didn't seem to help. 帮助,但我不明白为什么 MergePreferred 不会停止死锁,因为与他们的示例不同,元素在树.

为什么 MergePreferred 不能避免死锁,还有其他方法吗?

MergePreferred(在没有 eagerComplete 为真时)将在所有输入完成时完成,这通常适用于 Akka Streams 中的阶段(完成从开始向下流动) ).

因此这意味着在输入和 extractSubtree 信号完成之前合并无法传播完成。 extractSubtree 不会发出完成信号(很可能,不知道流程中的阶段),直到 bcast 发出完成信号,这(再次很可能)不会发生,直到 processResponse 发出完成信号*在 httpFlow 发出完成信号之前不会发生,而在 createRequest 发出完成信号之前*不会发生,在 merge 发出完成信号之前*不会发生。因为一般来说检测这个循环是不可能的(考虑到有些阶段的完成是完全动态的),Akka Streams 有效地采取了这样的立场,如果你想创建这样的循环,由你来决定如何打破循环。

正如您所注意到的,eagerComplete 为真会改变此行为,但由于它会在任何输入完成后立即完成(在这种情况下,由于循环,它始终是输入)merge 完成并取消对 extractSubtree 的需求(这本身可能(取决于 Broadcast 是否设置了 eagerCancel)导致下游取消),这可能会导致在extractSubtree 发出的至少一些元素没有得到处理。

如果您完全确定输入完成意味着循环最终会干涸,您可以使用 eagerComplete = false 如果您有一些方法可以在循环干涸后完成 extractSubtree 并且输入完成。一个粗略的大纲(不知道 extractSubtree 中具体是什么):

  • 将从 bcast 进入 extractSubtree 的所有内容映射到输入
  • Some
  • 预先实现一个 Source.actorRef,您可以向其发送 None,保存 ActorRef(这将是此源的具体化值)
  • 将输入与该预实现源合并
  • 提取子树时,使用一个statefulMapConcat阶段来跟踪是否a) a None已经被看到和b) 有多少子树正在等待(初始值1,加上(第一代)该节点的子节点减1,即没有子节点减1);如果已经看到 None 并且没有子树挂起,则发出 List(None),否则发出 List 包裹在 Some
  • 中的每个子树
  • 有一个 takeWhile(_.isDefined),它会在看到 None
  • 时完成
  • 如果您在 extractSubtrees 中有更复杂的东西(例如副作用),您将必须弄清楚将它们放在哪里
  • 在合并外部输入之前,将其传递给 watchTermination 阶段,并在未来回调(成功时)将 None 发送到预实现时获得的 ActorRef Source.actorRef 对于 extractSubtrees。因此,当输入完成时,watchTermination 将成功触发并有效地向 extractSubtrees 发送消息以监视它何时完成飞行树。