有没有办法让 akka 的 mapAsync 中的函数超时?

Is there a way to timeout functions inside mapAsync for akka?

我正在尝试使用 akka 流进行异步 http 调用。

这是我试过的。

Source(listEndpoints)
      .mapAsync(20)(endpoint => Future(Await.result(request(HttpMethods.POST, endpoint, List(authHeader)), timeout)))
      .runWith(Sink.seq[HttpResponse])

我在请求方法中使用 akka-http,它 returns Future[HttpResponse]

我想我在这里滥用Future。上面的代码会给我一个 Future[List[HttpResponse]],我必须再次使用 Await 才能得到 List[HttpResponse]。在 mapAsync?

中是否有更优雅的超时函数方法?

假设您的 request 方法在某些时候确实

Http().singleRequest

要获得 Future[HttpResponse],您可以通过以下方式为请求传递超时:

// inside def request(...), will probably need to add a timeout argument here
val request = ???  // Build the HttpRequest
Http().singleRequest(
  request = request,
  settings = ConnectionPoolSettings.default.withMaxConnectionLifetime(timeout)

那么您的直播就是

Source(listEndpoints)
  .mapAsync(request(...))
  .runWith(Sink.seq[HttpResponse])

并且您只需要 Await 在“世界尽头”即可完成 Future[List[HttpResponse]]

您还可以在 application.conf

中使用 akka.http.host-connection-pool.max-connection-lifetime 更改默认的最长连接寿命