在 spring amqp 中使用 spring webclient 的正确方法
Correct way of using spring webclient in spring amqp
我有一个 spring amqp 应用程序使用来自 rabbitmq 的消息的以下技术堆栈 -
Spring boot 2.2.6.RELEASE
Reactor Netty 0.9.12.RELEASE
Reactor Core 3.3.10.RELEASE
应用程序部署在 4 核 RHEL 上。
以下是用于 rabbitmq 的一些配置
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(<<HOST NAME>>);
cachingConnectionFactory.setUsername(<<USERNAME>>);
cachingConnectionFactory.setPassword(<<PASSWORD>>);
cachingConnectionFactory.setChannelCacheSize(50);
return cachingConnectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxConcurrentConsumers(50);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false); /** DLQ is in place **/
return factory;
}
消费者在同步模式下使用 spring webclient 进行下游 API 调用。下面是 Webclient
的配置
@Bean
public WebClient webClient() {
ConnectionProvider connectionProvider = ConnectionProvider
.builder("fixed")
.lifo()
.pendingAcquireTimeout(Duration.ofMillis(200000))
.maxConnections(16)
.pendingAcquireMaxCount(3000)
.maxIdleTime(Duration.ofMillis(290000))
.build();
HttpClient client = HttpClient.create(connectionProvider);
client.tcpConfiguration(<<connection timeout, read timeout, write timeout is set here....>>);
Webclient.Builder builder =
Webclient.builder().baseUrl(<<base URL>>).clientConnector(new ReactorClientHttpConnector(client));
return builder.build();
}
此 Web 客户端已自动连接到 @Service class as
@Autowired
private Webclient webClient;
并在以下两个地方使用。第一名是一个电话-
public DownstreamStatusEnum downstream(String messageid, String payload, String contentType) {
return call(messageid,payload,contentType);
}
private DownstreamStatusEnum call(String messageid, String payload, String contentType) {
DownstreamResponse response = sendRequest(messageid,payload,contentType).**block()**;
return response;
}
private Mono<DownstreamResponse> sendRequest(String messageid, String payload, String contentType) {
return webClient
.method(POST)
.uri(<<URI>>)
.contentType(MediaType.valueOf(contentType))
.body(BodyInserters.fromValue(payload))
.exchange()
.flatMap(response -> response.bodyToMono(DownstreamResponse.class));
}
其他地方需要并行下游调用,已实现如下
private Flux<DownstreamResponse> getValues (List<DownstreamRequest> reqList, String messageid) {
return Flux
.fromIterable(reqList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(s -> {
return webClient
.method(POST)
.uri(<<downstream url>>)
.body(BodyInserters.fromValue(s))
.exchange()
.flatMap(response -> {
if(response.statusCode().isError()) {
return Mono.just(new DownstreamResponse());
}
return response.bodyToMono(DownstreamResponse.class);
});
}).sequential();
}
public List<DownstreamResponse> updateValue (List<DownstreamRequest> reqList,String messageid) {
return getValues(reqList,messageid).collectList().**block()**;
}
该应用程序在过去一年左右的时间里一直运行良好。最近,我们看到一个问题,即一个或多个消费者似乎只是被默认预取 (250) 条未确认状态的消息卡住。解决此问题的唯一方法是重新启动应用程序。
我们最近没有做任何代码更改。此外,最近也没有任何基础设施变化。
发生这种情况时,我们进行了线程转储。观察到的模式是相似的。大多数消费者线程处于 TIMED_WAITING 状态,而一两个消费者处于 WAITING 状态,堆栈低于 -
"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
java.lang.Thread.State: WAITING (parking)
- parking to wait for ......
at .......
at .......
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
at reactor.core.publisher.Mono.block(....
at .........WebClientServiceImpl.call(...
另见下文 -
"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
java.lang.Thread.State: WAITING (parking)
- parking to wait for ......
at .......
at .......
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
at reactor.core.publisher.Mono.block(....
at .........WebClientServiceImpl.updateValue(...
不确定这个线程转储是否显示消费者线程实际上卡在了这里
“阻止”调用。
请帮助告知这里可能存在的问题以及需要采取哪些步骤来解决此问题。早些时候我们认为这可能是 rabbitmq/spring aqmp 的一些问题,但基于线程转储,看起来像是 webclient“块”调用的问题。
添加 Blockhound 时,它在日志文件中的堆栈跟踪下方打印 -
Error has been observed at following site(s)
Checkpoint Request to POST https://....... [DefaultWebClient]
Stack Trace:
at java.lang.Object.wait
......
at java.net.InetAddress.checkLookupTable
at java.net.InetAddress.getAddressFromNameService
......
at io.netty.util.internal.SocketUtils.run
......
at io.netty.resolver.DefaultNameResolver.doResolve
抱歉,刚刚发现并行 flux 调用中的 flatMap 实际上如下所示
.flatMap(response -> {
if(response.statusCode().isError()) {
return Mono.just(new DownstreamResponse());
}
return response.bodyToMono(DownstreamResponse.class);
});
所以在错误情况下,我认为底层连接没有被正确释放。当我像下面这样更新它时,它似乎已经解决了问题 -
.flatMap(response -> {
if(response.statusCode().isError()) {
response.releaseBody().thenReturn(Mono.just(new DownstreamResponse()));
}
return response.bodyToMono(DownstreamResponse.class);
});
我有一个 spring amqp 应用程序使用来自 rabbitmq 的消息的以下技术堆栈 -
Spring boot 2.2.6.RELEASE
Reactor Netty 0.9.12.RELEASE
Reactor Core 3.3.10.RELEASE
应用程序部署在 4 核 RHEL 上。
以下是用于 rabbitmq 的一些配置
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(<<HOST NAME>>);
cachingConnectionFactory.setUsername(<<USERNAME>>);
cachingConnectionFactory.setPassword(<<PASSWORD>>);
cachingConnectionFactory.setChannelCacheSize(50);
return cachingConnectionFactory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxConcurrentConsumers(50);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false); /** DLQ is in place **/
return factory;
}
消费者在同步模式下使用 spring webclient 进行下游 API 调用。下面是 Webclient
的配置@Bean
public WebClient webClient() {
ConnectionProvider connectionProvider = ConnectionProvider
.builder("fixed")
.lifo()
.pendingAcquireTimeout(Duration.ofMillis(200000))
.maxConnections(16)
.pendingAcquireMaxCount(3000)
.maxIdleTime(Duration.ofMillis(290000))
.build();
HttpClient client = HttpClient.create(connectionProvider);
client.tcpConfiguration(<<connection timeout, read timeout, write timeout is set here....>>);
Webclient.Builder builder =
Webclient.builder().baseUrl(<<base URL>>).clientConnector(new ReactorClientHttpConnector(client));
return builder.build();
}
此 Web 客户端已自动连接到 @Service class as
@Autowired
private Webclient webClient;
并在以下两个地方使用。第一名是一个电话-
public DownstreamStatusEnum downstream(String messageid, String payload, String contentType) {
return call(messageid,payload,contentType);
}
private DownstreamStatusEnum call(String messageid, String payload, String contentType) {
DownstreamResponse response = sendRequest(messageid,payload,contentType).**block()**;
return response;
}
private Mono<DownstreamResponse> sendRequest(String messageid, String payload, String contentType) {
return webClient
.method(POST)
.uri(<<URI>>)
.contentType(MediaType.valueOf(contentType))
.body(BodyInserters.fromValue(payload))
.exchange()
.flatMap(response -> response.bodyToMono(DownstreamResponse.class));
}
其他地方需要并行下游调用,已实现如下
private Flux<DownstreamResponse> getValues (List<DownstreamRequest> reqList, String messageid) {
return Flux
.fromIterable(reqList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(s -> {
return webClient
.method(POST)
.uri(<<downstream url>>)
.body(BodyInserters.fromValue(s))
.exchange()
.flatMap(response -> {
if(response.statusCode().isError()) {
return Mono.just(new DownstreamResponse());
}
return response.bodyToMono(DownstreamResponse.class);
});
}).sequential();
}
public List<DownstreamResponse> updateValue (List<DownstreamRequest> reqList,String messageid) {
return getValues(reqList,messageid).collectList().**block()**;
}
该应用程序在过去一年左右的时间里一直运行良好。最近,我们看到一个问题,即一个或多个消费者似乎只是被默认预取 (250) 条未确认状态的消息卡住。解决此问题的唯一方法是重新启动应用程序。
我们最近没有做任何代码更改。此外,最近也没有任何基础设施变化。
发生这种情况时,我们进行了线程转储。观察到的模式是相似的。大多数消费者线程处于 TIMED_WAITING 状态,而一两个消费者处于 WAITING 状态,堆栈低于 -
"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
java.lang.Thread.State: WAITING (parking)
- parking to wait for ......
at .......
at .......
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
at reactor.core.publisher.Mono.block(....
at .........WebClientServiceImpl.call(...
另见下文 -
"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
java.lang.Thread.State: WAITING (parking)
- parking to wait for ......
at .......
at .......
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
at reactor.core.publisher.Mono.block(....
at .........WebClientServiceImpl.updateValue(...
不确定这个线程转储是否显示消费者线程实际上卡在了这里 “阻止”调用。
请帮助告知这里可能存在的问题以及需要采取哪些步骤来解决此问题。早些时候我们认为这可能是 rabbitmq/spring aqmp 的一些问题,但基于线程转储,看起来像是 webclient“块”调用的问题。
添加 Blockhound 时,它在日志文件中的堆栈跟踪下方打印 -
Error has been observed at following site(s)
Checkpoint Request to POST https://....... [DefaultWebClient]
Stack Trace:
at java.lang.Object.wait
......
at java.net.InetAddress.checkLookupTable
at java.net.InetAddress.getAddressFromNameService
......
at io.netty.util.internal.SocketUtils.run
......
at io.netty.resolver.DefaultNameResolver.doResolve
抱歉,刚刚发现并行 flux 调用中的 flatMap 实际上如下所示
.flatMap(response -> {
if(response.statusCode().isError()) {
return Mono.just(new DownstreamResponse());
}
return response.bodyToMono(DownstreamResponse.class);
});
所以在错误情况下,我认为底层连接没有被正确释放。当我像下面这样更新它时,它似乎已经解决了问题 -
.flatMap(response -> {
if(response.statusCode().isError()) {
response.releaseBody().thenReturn(Mono.just(new DownstreamResponse()));
}
return response.bodyToMono(DownstreamResponse.class);
});