为什么在使用 Java DSL 时必须在入站 webflux 网关上使用 .flux Transform(of -> f)?
Why do I have to use .fluxTransform(f -> f) on an inbound webflux gateways when using Java DSL?
在 Spring 集成中使用 webflux 网关 Java DSL 时,我 运行 回复失败。它只适用于前几个请求(具体来说 <8),之后我收到回复错误:
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
当我在入站网关上使用.fluxTransform(f -> f)
时或当我使用非反应性 http 出站网关,即使在具有数千个请求的 jmeter 基准测试中,我也没有收到错误。
- 为什么我必须在 第一个流程 中调用
fluxTransform(f -> f)
才能使其工作?
- 当我在 第二个流程 中使用
Http.outboundGateway
时,为什么没有 fluxTransform(f -> f)
也能正常工作?
场景
我已经使用四个网关创建了一个路由,用于相当复杂的设置以在远程计算机上发出 Web 请求,但我
集成流程 1:
inbound webflux gateway -> outbound jms gateway
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms, wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
集成流程 2:
inbound jms gateway -> outbound webflux gateway
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
完整的路线如下所示:
client web request -> flow 1 -> (message broker) -> flow 2 -> server web request
另一种方法是使用 .channel(MessageChannels.flux())
而不是 .fluxTransform(f -> f)
。通过这种方式,我们真正为 WebFlux 容器带来了一个 back-pressure,使其在请求事件循环中等待可用槽。
我们只是发送到 JMS 队列 not-honoring back-pressure 并且另一端的 JMS 消费者无法跟上。另外,我们向同一个 Netty 服务器发送请求,内部再次为这些内部请求获取一个事件循环槽。
如果您有兴趣,我写了一个这样的单元测试来查看发生了什么:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux", String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}
在 Spring 集成中使用 webflux 网关 Java DSL 时,我 运行 回复失败。它只适用于前几个请求(具体来说 <8),之后我收到回复错误:
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
当我在入站网关上使用.fluxTransform(f -> f)
时或当我使用非反应性 http 出站网关,即使在具有数千个请求的 jmeter 基准测试中,我也没有收到错误。
- 为什么我必须在 第一个流程 中调用
fluxTransform(f -> f)
才能使其工作? - 当我在 第二个流程 中使用
Http.outboundGateway
时,为什么没有fluxTransform(f -> f)
也能正常工作?
场景
我已经使用四个网关创建了一个路由,用于相当复杂的设置以在远程计算机上发出 Web 请求,但我
集成流程 1:
inbound webflux gateway -> outbound jms gateway
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms, wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
集成流程 2:
inbound jms gateway -> outbound webflux gateway
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
完整的路线如下所示:
client web request -> flow 1 -> (message broker) -> flow 2 -> server web request
另一种方法是使用 .channel(MessageChannels.flux())
而不是 .fluxTransform(f -> f)
。通过这种方式,我们真正为 WebFlux 容器带来了一个 back-pressure,使其在请求事件循环中等待可用槽。
我们只是发送到 JMS 队列 not-honoring back-pressure 并且另一端的 JMS 消费者无法跟上。另外,我们向同一个 Netty 服务器发送请求,内部再次为这些内部请求获取一个事件循环槽。
如果您有兴趣,我写了一个这样的单元测试来查看发生了什么:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux", String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}