Akka stream Http singleRequest 在等待响应时阻塞整个流

Akka stream Http singleRequest blocks whole stream while waiting for response

我正在尝试将 Akka Http 集成到我的 Akka 流中,但在极少数情况下,流会卡住。

  implicit val system: ActorSystem = ActorSystem("actor-system")

  Source(0 to 10)
    .mapAsync(10)(i ⇒ {
      val url =
        if (i == 1) "http://run.mocky.io/v3/40ff086f-1389-4ca5-ace8-1f0b3ac75582?mocky-delay=10s"
        else "http://google.com"

      Http().singleRequest(HttpRequest(uri = url))
    })
    .runForeach(r ⇒ println(s"${System.currentTimeMillis()}: ${r._1}"))

此代码将在第一次输出后卡住 10 秒 1599480226827: 301 Moved Permanently,然后将同时刷新其余部分。

输出将是:

1599480226827: 301 Moved Permanently
1599480236826: 302 Found
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently

我希望它能按顺序输出所有内容,除了延迟的那个。

为什么我的流被这样的请求阻止了?以及如何避免?

来自 mapAsync

的 scaladoc

The number of Futures that shall run in parallel is given as the first argument to mapAsync. These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.

您的请求是并行发送的,但 runForeach 下的函​​数是按特定顺序调用的,导致输出结果延迟。正在等待第二个响应可用。

您可以使用 mapAsyncUnordered 来处理可用的回复。