Akka stream Http singleRequest blocks whole stream while waiting for response
I am trying to integrate Akka HTTP into my Akka stream, but in rare cases the stream gets stuck.
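import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("actor-system")

Source(0 to 10)
  .mapAsync(10) { i =>
    // Element 1 targets an endpoint that delays its response by 10 seconds
    val url =
      if (i == 1) "http://run.mocky.io/v3/40ff086f-1389-4ca5-ace8-1f0b3ac75582?mocky-delay=10s"
      else "http://google.com"
    Http().singleRequest(HttpRequest(uri = url))
  }
  // r._1 is the response status, as the output shows
  .runForeach(r => println(s"${System.currentTimeMillis()}: ${r._1}"))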
This code gets stuck for 10 seconds after the first output:
1599480226827: 301 Moved Permanently
1599480236826: 302 Found
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
1599480236826: 301 Moved Permanently
From the scaladoc of mapAsync:
The number of Futures that shall run in parallel is given as the first argument to mapAsync. These Futures may complete in any order, but the elements that are emitted downstream are in the same order as received from upstream.
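Your requests are sent in parallel, but runForeach receives the responses in upstream order. The delayed response for element 1 therefore holds back every response behind it until it arrives.
If the output order does not matter, you can use mapAsyncUnordered instead, which emits each response as soon as it completes.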
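As a rough sketch of the difference, the example below swaps the HTTP calls for a hypothetical fakeRequest helper (not part of the original question) that completes element 1 after 2 seconds and every other element after 50 milliseconds. It assumes Akka 2.6 or later, where the implicit ActorSystem also supplies the materializer; the object name, delays, and timeouts are illustrative choices only.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapAsyncOrdering {

  // Runs the same source through mapAsync and mapAsyncUnordered and
  // returns the elements in the order each variant emitted them.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("ordering-demo")
    import system.dispatcher

    // Hypothetical stand-in for Http().singleRequest: element 1 is slow.
    def fakeRequest(i: Int): Future[Int] = {
      val delay = if (i == 1) 2.seconds else 50.millis
      after(delay, system.scheduler)(Future.successful(i))
    }

    try {
      // mapAsync keeps upstream order, so element 1 holds back 2..10.
      val ordered = Await.result(
        Source(0 to 10).mapAsync(10)(fakeRequest).runWith(Sink.seq[Int]),
        10.seconds)
      // mapAsyncUnordered emits each result as soon as its Future completes.
      val unordered = Await.result(
        Source(0 to 10).mapAsyncUnordered(10)(fakeRequest).runWith(Sink.seq[Int]),
        10.seconds)
      (ordered, unordered)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (ordered, unordered) = run()
    println(s"mapAsync:          $ordered")
    println(s"mapAsyncUnordered: $unordered")
  }
}
```

With mapAsync the elements should come out as 0 to 10 in order, while with mapAsyncUnordered the slow element 1 is expected to arrive last. In the code from the question, the equivalent change would be replacing .mapAsync(10) with .mapAsyncUnordered(10).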