在 akka 流处理中进行阻塞 http 调用

Making blocking http call in akka stream processing

我是 akka 的新手,仍在努力理解不同的 akka 和流媒体概念。对于某些新功能,我需要向正在处理内部对象的现有流添加 http 调用。像这样 -

    val step1Flow = Flow[SampleObject].filter(...--Filtering condition--...)
    val step2Flow = Flow[SampleObject].map(obj => {
         ...
         -- Business logic to update values in the obj --
         ...
    })
    ...
override val flowGraph: Flow[SampleObject, SampleObject, NotUsed] =
bufferIn.via(Flow.fromGraph(GraphDSL.create() {
  implicit builder =>
    import GraphDSL.Implicits._
    ...
    val step1 = builder.add(step1Flow)
    val step2 = builder.add(step2Flow)
    val step3 = builder.add(step3Flow)
    ...

    source ~> step1 ~> step2 ~> step3 ~> merge
    ...
}

我需要在 step1 之后添加新的 http 请求流(我们称之为 newFlow)。所有这些流都有入口和出口作为 SampleObject。现在我的理解是 newFlow 需要阻塞,因为出口只需要是 SampleObject。为此,我在未来的 http 调用中使用了 Await 函数。代码看起来像这样 -

val responseFuture: Future[(Try[HttpResponse], SomeContext)] =
  Source
    .single(httpRequest -> context)
    .via(Retry(retrySettings).join(clientFlow))
    .runWith(Sink.head)
...
val (httpTry, passedAlongContext) = Await.result(responseFuture, 30.seconds)
-- logic to process response and return SampleObject --

现在这工作正常,但我认为应该有更好的方法来做到这一点而不使用等待。另外我认为这会阻塞主线程直到请求完成,这将影响应用程序的吞吐量。 您能否指导我使用的方法是否正确。我如何利用其他一些线程池来处理这些阻塞调用,这样我的主线程池就不会受到影响

这个问题看起来和我的很像,但我没有完全理解 - 。我也无法更改步骤 2 或进一步的流程。

编辑:为流添加了一些代码细节

我最终使用了问题中提到的方法,因为环顾四周后我找不到更好的方法。添加此步骤会按预期降低我的应用程序的吞吐量,但可以使用一些方法来增加吞吐量。查看 Colin Breck 的这些很棒的博客 -

总结一下-

  1. 对阻塞的流使用异步边界。
  2. 尽可能使用 Futures 并向 futures 添加回调。有几种方法可以做到这一点。
  3. 使用缓冲区。有多种类型的缓冲区可用,选择适合您需要的。

除此之外,您还可以使用内置流程,例如 -

  1. 使用“广播”将您的事件广播给多个消费者。
  2. 使用“分区”将您的流分成多个基于 在某些情况下。
  3. 当没有合理的方式来划分您的事件或它们都可能有不同的工作负载时,使用“平衡”来划分您的流。

您可以使用上述选项中的任何一项或多项。