reactor-netty:使用 keep-alive HTTP 客户端
reactor-netty: using keep-alive HTTP client
我使用 reactor-netty
来请求一组 URL。大多数 URL 属于同一主机。 reactor-netty
似乎为每个 URL 创建一个全新的 TCP 连接,即使已经为之前的 URL 建立了与主机的连接。当同时建立数百个连接时,某些服务器会丢弃新连接或开始响应缓慢。
代码示例:
Flux.just(...)
.groupBy(link -> {
String host = "";
try {
host = new URL(link).getHost();
} catch (MalformedURLException e) {
LOGGER.warn("Cannot determine host {}", link, e);
}
return host;
})
.flatMap(group -> {
HttpClient client = HttpClient.create()
.keepAlive(true)
.tcpConfiguration(tcp -> tcp.host(group.key()));
return group.flatMap(link -> client.get()
.uri(link)
.response((resp, cont) -> resp.status().code() == 200 ? cont.aggregate().asString() : Mono.empty())
.doOnSubscribe(s -> LOGGER.debug("Requesting {}", link))
.timeout(Duration.ofMinutes(1))
.doOnError(e -> LOGGER.warn("Cannot get response from {}", link, e))
.onErrorResume(e -> Flux.empty())
.collect(Collectors.joining())
.filter(s -> StringUtils.isNotBlank(s)));
})
.blockLast();
在日志中,我看到同一远程主机的本地端口不同,活动和非活动连接的总和远高于不同主机的数量。这就是为什么我认为 reactor-netty
没有重用已经建立的连接。
DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 1 active connections and 239 inactive connections
...
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 0 active connections and 240 inactive connections
是否可以使用 keep-alive
HTTP 客户端通过与主机的相同 TCP 连接在同一主机上请求多个 URL?如果不是,我如何限制同时连接到同一主机的数量或顺序执行对同一主机的请求(仅在收到对前一个请求的响应后才发出下一个请求)?
我使用 Californium-SR6
发布序列。
是的,reactor netty支持keep-alive、连接重用和连接池。
请注意,.flatMap
是并行处理内部流的异步操作。因此,当您调用 group.flatMap(...
时,内部请求将并行执行。并且由于它们是并行执行的,因此需要建立多个连接。
如果您想按顺序对同一主机执行请求,请将您的示例更改为使用 group.concatMap
而不是 .flatMap
。
如果您仍想并行执行它们,但限制对单个主机的活动请求数,则更改您的示例以使用 .flatMap
的重载版本之一,该版本需要 concurrency
参数.
此外,由于您使用的是 HttpClient.create()
,您的示例使用默认的全局 http 连接池。如果你想更好地控制连接池,你可以通过 HttpClient.create(ConnectionProvider)
.
指定不同的 ConnectionProvider
我使用 reactor-netty
来请求一组 URL。大多数 URL 属于同一主机。 reactor-netty
似乎为每个 URL 创建一个全新的 TCP 连接,即使已经为之前的 URL 建立了与主机的连接。当同时建立数百个连接时,某些服务器会丢弃新连接或开始响应缓慢。
代码示例:
Flux.just(...)
.groupBy(link -> {
String host = "";
try {
host = new URL(link).getHost();
} catch (MalformedURLException e) {
LOGGER.warn("Cannot determine host {}", link, e);
}
return host;
})
.flatMap(group -> {
HttpClient client = HttpClient.create()
.keepAlive(true)
.tcpConfiguration(tcp -> tcp.host(group.key()));
return group.flatMap(link -> client.get()
.uri(link)
.response((resp, cont) -> resp.status().code() == 200 ? cont.aggregate().asString() : Mono.empty())
.doOnSubscribe(s -> LOGGER.debug("Requesting {}", link))
.timeout(Duration.ofMinutes(1))
.doOnError(e -> LOGGER.warn("Cannot get response from {}", link, e))
.onErrorResume(e -> Flux.empty())
.collect(Collectors.joining())
.filter(s -> StringUtils.isNotBlank(s)));
})
.blockLast();
在日志中,我看到同一远程主机的本地端口不同,活动和非活动连接的总和远高于不同主机的数量。这就是为什么我认为 reactor-netty
没有重用已经建立的连接。
DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 1 active connections and 239 inactive connections
...
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 0 active connections and 240 inactive connections
是否可以使用 keep-alive
HTTP 客户端通过与主机的相同 TCP 连接在同一主机上请求多个 URL?如果不是,我如何限制同时连接到同一主机的数量或顺序执行对同一主机的请求(仅在收到对前一个请求的响应后才发出下一个请求)?
我使用 Californium-SR6
发布序列。
是的,reactor netty支持keep-alive、连接重用和连接池。
请注意,.flatMap
是并行处理内部流的异步操作。因此,当您调用 group.flatMap(...
时,内部请求将并行执行。并且由于它们是并行执行的,因此需要建立多个连接。
如果您想按顺序对同一主机执行请求,请将您的示例更改为使用 group.concatMap
而不是 .flatMap
。
如果您仍想并行执行它们,但限制对单个主机的活动请求数,则更改您的示例以使用 .flatMap
的重载版本之一,该版本需要 concurrency
参数.
此外,由于您使用的是 HttpClient.create()
,您的示例使用默认的全局 http 连接池。如果你想更好地控制连接池,你可以通过 HttpClient.create(ConnectionProvider)
.
ConnectionProvider