带有 ReactorClientHttpConnector 的 Netty:如何在不使用已弃用的 tcpConfiguration 的情况下设置 readTimeout、writeTimeout 和 connectTimeout
Netty with ReactorClientHttpConnector: How to set readTimeout, writeTimeout and connectTimeout without using deprecated tcpConfiguration
Netty 已弃用 HttpClient#tcpConfiguration
的用法。我们正在寻找一种简单的配置方法:
- connectTimeout:等待连接的时间
- writeTimout:等待写入流的时间(如果在这个时间范围内无法传递数据,将抛出异常)
- readTimeout:等待多长时间从流中读取(如果在这个时间范围内没有传送数据,将抛出异常)
当前代码如下所示:
HttpClient httpClient = HttpClient.create();
Integer connectTimeOutInMs = clientProperties.getConnectTimeOutInMs();
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
Integer readTimeout = clientProperties.getReadTimeOutInMs();
httpClient = httpClient.tcpConfiguration(tcpClientParam -> {
TcpClient tcpClient = tcpClientParam;
// Connect timeout configuration
if (connectTimeOutInMs != null) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeOutInMs);
}
return tcpClient.doOnConnected(conn -> {
if (readTimeout != null) {
conn.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
if (writeTimeOutInMs != null) {
conn.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS));
}
});
});
在不使用 tcpConfiguration 的情况下应该如何配置?以下方法未按预期工作,并且未按预期抛出 ReadTimeout。
Integer readTimeout = clientProperties.getReadTimeOutInMs();
if (readTimeout != null) {
httpClient.doOnConnected(c -> c.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)));
}
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
if (writeTimeOutInMs != null) {
httpClient.doOnConnected(
c -> c.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS)));
}
Integer connectTimeout = clientProperties.getConnectTimeOutInMs();
if (connectTimeout != null) {
httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
}
正确的实施方式是什么?我看到 Netty 提供 HttpClient#responseTimeout()
,最后设置 HttpClientOperations#addHandler(NettyPipeline.ResponseTimeoutHandler, new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
。但是没有connect 和writeTimeouts 的方法。
这可以通过组合使用 Reactor ConnectionProvider 和一对 Netty 处理程序来完成。我正在使用注入 WebClient.Builder 的基本 Spring 配置 bean,如下所示:
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
public class WebClientFactory implements ConnectionObserver {
private final Logger LOG = LogManager.getLogger(getClass());
private final int connectTimeout;
private final long readTimeout;
private final long writeTimeout;
private final int maxConnections;
private final Duration maxAcquireTime;
private final Duration maxIdleTime;
private final Duration maxLifeTime;
private final WebClient.Builder webClientBuilder;
/**
* Creates a new WebClientFactory
* @param config The web client configuration
*/
public WebClientFactory(WebClientConfiguration config) {
connectTimeout = config.getConnectTimeout();
readTimeout = config.getReadTimeout();
writeTimeout = config.getWriteTimeout();
maxConnections = config.getMaxConnections();
maxAcquireTime = Duration.ofMillis(config.getMaxAcquireTime());
maxIdleTime = Duration.ofMillis(config.getMaxIdleTime());
maxLifeTime = Duration.ofMillis(config.getMaxLifeTime());
ConnectionProvider connectionProvider =
ConnectionProvider.builder("aod-http-client")
.maxConnections(maxConnections)
.pendingAcquireTimeout(maxAcquireTime)
.maxIdleTime(maxIdleTime)
.maxLifeTime(maxLifeTime)
.build();
HttpClient httpClient = HttpClient.create(connectionProvider)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(writeTimeout, TimeUnit.MILLISECONDS))
).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.observe(this);
webClientBuilder = WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(httpClient)
);
LOG.info("WebClientConfig: connectTimeout={}, readTimeout={}, writeTimeout={}", connectTimeout, readTimeout, writeTimeout);
}
@Bean
protected WebClient.Builder webClientBuilder() {
return webClientBuilder;
}
/**
* @see reactor.netty.ConnectionObserver#onStateChange(reactor.netty.Connection, reactor.netty.ConnectionObserver.State)
*/
@Override
public void onStateChange(Connection connection, State newState) {
LOG.info("WebClient State Change: connection={}, newState={}", connection, newState);
}
/**
* @see reactor.netty.ConnectionObserver#onUncaughtException(reactor.netty.Connection, java.lang.Throwable)
*/
@Override
public void onUncaughtException(Connection connection, Throwable error) {
LOG.error("WebClient Uncaught Exception: connection={}", connection, error);
}
}
Netty 已弃用 HttpClient#tcpConfiguration
的用法。我们正在寻找一种简单的配置方法:
- connectTimeout:等待连接的时间
- writeTimout:等待写入流的时间(如果在这个时间范围内无法传递数据,将抛出异常)
- readTimeout:等待多长时间从流中读取(如果在这个时间范围内没有传送数据,将抛出异常)
当前代码如下所示:
HttpClient httpClient = HttpClient.create();
Integer connectTimeOutInMs = clientProperties.getConnectTimeOutInMs();
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
Integer readTimeout = clientProperties.getReadTimeOutInMs();
httpClient = httpClient.tcpConfiguration(tcpClientParam -> {
TcpClient tcpClient = tcpClientParam;
// Connect timeout configuration
if (connectTimeOutInMs != null) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeOutInMs);
}
return tcpClient.doOnConnected(conn -> {
if (readTimeout != null) {
conn.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS));
}
if (writeTimeOutInMs != null) {
conn.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS));
}
});
});
在不使用 tcpConfiguration 的情况下应该如何配置?以下方法未按预期工作,并且未按预期抛出 ReadTimeout。
Integer readTimeout = clientProperties.getReadTimeOutInMs();
if (readTimeout != null) {
httpClient.doOnConnected(c -> c.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS)));
}
Integer writeTimeOutInMs = clientProperties.getWriteTimeOutInMs();
if (writeTimeOutInMs != null) {
httpClient.doOnConnected(
c -> c.addHandlerLast(new WriteTimeoutHandler(writeTimeOutInMs, TimeUnit.MILLISECONDS)));
}
Integer connectTimeout = clientProperties.getConnectTimeOutInMs();
if (connectTimeout != null) {
httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
}
正确的实施方式是什么?我看到 Netty 提供 HttpClient#responseTimeout()
,最后设置 HttpClientOperations#addHandler(NettyPipeline.ResponseTimeoutHandler, new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
。但是没有connect 和writeTimeouts 的方法。
这可以通过组合使用 Reactor ConnectionProvider 和一对 Netty 处理程序来完成。我正在使用注入 WebClient.Builder 的基本 Spring 配置 bean,如下所示:
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
public class WebClientFactory implements ConnectionObserver {
private final Logger LOG = LogManager.getLogger(getClass());
private final int connectTimeout;
private final long readTimeout;
private final long writeTimeout;
private final int maxConnections;
private final Duration maxAcquireTime;
private final Duration maxIdleTime;
private final Duration maxLifeTime;
private final WebClient.Builder webClientBuilder;
/**
* Creates a new WebClientFactory
* @param config The web client configuration
*/
public WebClientFactory(WebClientConfiguration config) {
connectTimeout = config.getConnectTimeout();
readTimeout = config.getReadTimeout();
writeTimeout = config.getWriteTimeout();
maxConnections = config.getMaxConnections();
maxAcquireTime = Duration.ofMillis(config.getMaxAcquireTime());
maxIdleTime = Duration.ofMillis(config.getMaxIdleTime());
maxLifeTime = Duration.ofMillis(config.getMaxLifeTime());
ConnectionProvider connectionProvider =
ConnectionProvider.builder("aod-http-client")
.maxConnections(maxConnections)
.pendingAcquireTimeout(maxAcquireTime)
.maxIdleTime(maxIdleTime)
.maxLifeTime(maxLifeTime)
.build();
HttpClient httpClient = HttpClient.create(connectionProvider)
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(readTimeout, TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(writeTimeout, TimeUnit.MILLISECONDS))
).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.observe(this);
webClientBuilder = WebClient.builder()
.clientConnector(
new ReactorClientHttpConnector(httpClient)
);
LOG.info("WebClientConfig: connectTimeout={}, readTimeout={}, writeTimeout={}", connectTimeout, readTimeout, writeTimeout);
}
@Bean
protected WebClient.Builder webClientBuilder() {
return webClientBuilder;
}
/**
* @see reactor.netty.ConnectionObserver#onStateChange(reactor.netty.Connection, reactor.netty.ConnectionObserver.State)
*/
@Override
public void onStateChange(Connection connection, State newState) {
LOG.info("WebClient State Change: connection={}, newState={}", connection, newState);
}
/**
* @see reactor.netty.ConnectionObserver#onUncaughtException(reactor.netty.Connection, java.lang.Throwable)
*/
@Override
public void onUncaughtException(Connection connection, Throwable error) {
LOG.error("WebClient Uncaught Exception: connection={}", connection, error);
}
}