正确使用Akka http客户端连接池
Correct use of Akka http client connection pools
我需要使用 Akka 的 HTTP 客户端 (v2.0.2) 使用 REST 服务。合乎逻辑的方法是通过主机连接池来执行此操作,因为我们期望有大量的同时连接。 Flow
为此消耗了一个(HttpRequest, T)
和returns一个(Try[HttpResponse, T)
。 documentation 表示需要某种任意类型 T
来管理对请求的潜在乱序响应,但没有指出调用者应该如何处理返回的 T
。
我的第一次尝试是使用 Int
作为 T
的函数。从许多地方调用它以确保连接使用单个池。
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
case (Failure(f), `unique`) ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
问题是客户端应该如何使用这个T
?有没有更清洁更有效的解决方案?最后,我的偏执狂是不是真的偏执狂?
起初我自己对此有点困惑,直到我通读了几次文档。如果您打算在池中使用单个请求,无论有多少不同的地方共享同一个池,您提供的 T
(在您的情况下为 Int
)都无关紧要。因此,如果您一直使用 Source.single
,那么如果您确实需要,该键始终可以是 1
。
它确实发挥作用的地方是,如果一段代码要使用池并一次将多个请求提交到池中,并需要所有这些请求的响应。原因是响应按照从被调用服务接收到的顺序返回,而不是按照它们提供给池的顺序返回。每个请求可能需要不同的时间,因此它们按照从池中收到的顺序向下游流向 Sink
。
假设我们有一项服务可以接受 GET
请求,其格式为 url:
/product/123
其中 123
部分是您要查找的产品的 ID。如果我想一次查找所有产品 1-10
,对每个产品都有单独的请求,这就是标识符变得重要的地方,这样我就可以将每个 HttpResponse
与其对应的产品 ID 相关联。此场景的简化代码示例如下:
val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] =
Source(requests).
via(pool).
runFold(Map.empty[Int,HttpResponse]){
case (m, (util.Success(resp), id)) =>
m ++ Map(id -> resp)
case (m, (util.Failure(ex), i)) =>
//Log a failure here probably
m
}
当我在 fold
中收到我的回复时,我还方便地拥有每个关联的 ID,因此我可以将它们添加到我的 Map
中,该 ID 由 ID 键入。如果没有这个功能,我可能不得不做一些像解析正文(如果它是 json)这样的事情来尝试找出哪个响应是哪个响应是不理想的,并且这不包括失败案例。在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符。
我希望这能为您澄清一些事情。
Akka HTTP 连接池在消耗基于 HTTP 的资源时是强大的盟友。如果您要一次执行单个请求,那么解决方案是:
def exec(req: HttpRequest): Future[HttpResponse] = {
Source.single(req → 1)
.via(pool)
.runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
case (Failure(f), _) ⇒ Future.failed(f)
}
}
因为您正在执行 single
请求,所以不需要消除响应的歧义。然而,Akka 流很聪明。您可以同时向池中提交多个请求。在本例中,我们传入一个 Iterable[HttpRequest]
。使用 SortedMap
将返回的 Iterable[HttpResponse]
重新排序为与原始请求相同的顺序。你可以做一个 request zip response
来排列:
def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.map(r ⇒ r.values)
}
如果您需要按照自己的方式解包,可迭代期货的期货就很棒。只需将事物展平即可获得更简单的响应。
def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.flatMap(r ⇒ Future.sequence(r.values))
}
我已经 this gist 使用所有的导入和包装器来创建一个客户端来使用 HTTP 服务。
特别感谢@cmbaxter 提供的简洁示例。
有一个改进 akka-http 文档的开放票证。请
check this example
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})
我需要使用 Akka 的 HTTP 客户端 (v2.0.2) 使用 REST 服务。合乎逻辑的方法是通过主机连接池来执行此操作,因为我们期望有大量的同时连接。 Flow
为此消耗了一个(HttpRequest, T)
和returns一个(Try[HttpResponse, T)
。 documentation 表示需要某种任意类型 T
来管理对请求的潜在乱序响应,但没有指出调用者应该如何处理返回的 T
。
我的第一次尝试是使用 Int
作为 T
的函数。从许多地方调用它以确保连接使用单个池。
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
case (Failure(f), `unique`) ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
问题是客户端应该如何使用这个T
?有没有更清洁更有效的解决方案?最后,我的偏执狂是不是真的偏执狂?
起初我自己对此有点困惑,直到我通读了几次文档。如果您打算在池中使用单个请求,无论有多少不同的地方共享同一个池,您提供的 T
(在您的情况下为 Int
)都无关紧要。因此,如果您一直使用 Source.single
,那么如果您确实需要,该键始终可以是 1
。
它确实发挥作用的地方是,如果一段代码要使用池并一次将多个请求提交到池中,并需要所有这些请求的响应。原因是响应按照从被调用服务接收到的顺序返回,而不是按照它们提供给池的顺序返回。每个请求可能需要不同的时间,因此它们按照从池中收到的顺序向下游流向 Sink
。
假设我们有一项服务可以接受 GET
请求,其格式为 url:
/product/123
其中 123
部分是您要查找的产品的 ID。如果我想一次查找所有产品 1-10
,对每个产品都有单独的请求,这就是标识符变得重要的地方,这样我就可以将每个 HttpResponse
与其对应的产品 ID 相关联。此场景的简化代码示例如下:
val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] =
Source(requests).
via(pool).
runFold(Map.empty[Int,HttpResponse]){
case (m, (util.Success(resp), id)) =>
m ++ Map(id -> resp)
case (m, (util.Failure(ex), i)) =>
//Log a failure here probably
m
}
当我在 fold
中收到我的回复时,我还方便地拥有每个关联的 ID,因此我可以将它们添加到我的 Map
中,该 ID 由 ID 键入。如果没有这个功能,我可能不得不做一些像解析正文(如果它是 json)这样的事情来尝试找出哪个响应是哪个响应是不理想的,并且这不包括失败案例。在这个解决方案中,我知道哪些请求失败了,因为我仍然得到了标识符。
我希望这能为您澄清一些事情。
Akka HTTP 连接池在消耗基于 HTTP 的资源时是强大的盟友。如果您要一次执行单个请求,那么解决方案是:
def exec(req: HttpRequest): Future[HttpResponse] = {
Source.single(req → 1)
.via(pool)
.runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
case (Failure(f), _) ⇒ Future.failed(f)
}
}
因为您正在执行 single
请求,所以不需要消除响应的歧义。然而,Akka 流很聪明。您可以同时向池中提交多个请求。在本例中,我们传入一个 Iterable[HttpRequest]
。使用 SortedMap
将返回的 Iterable[HttpResponse]
重新排序为与原始请求相同的顺序。你可以做一个 request zip response
来排列:
def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.map(r ⇒ r.values)
}
如果您需要按照自己的方式解包,可迭代期货的期货就很棒。只需将事物展平即可获得更简单的响应。
def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.flatMap(r ⇒ Future.sequence(r.values))
}
我已经 this gist 使用所有的导入和包装器来创建一个客户端来使用 HTTP 服务。
特别感谢@cmbaxter 提供的简洁示例。
有一个改进 akka-http 文档的开放票证。请 check this example
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})