为什么在使用 Java DSL 时必须在入站 webflux 网关上使用 .flux Transform(of -> f)?

Why do I have to use .fluxTransform(f -> f) on an inbound webflux gateways when using Java DSL?

在 Spring 集成中使用 webflux 网关 Java DSL 时,我 运行 回复失败。它只适用于前几个请求(具体来说 <8),之后我收到回复错误:

org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
    at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

当我在入站网关上使用.fluxTransform(f -> f)当我使用非反应性 http 出站网关,即使在具有数千个请求的 jmeter 基准测试中,我也没有收到错误。

场景
我已经使用四个网关创建了一个路由,用于相当复杂的设置以在远程计算机上发出 Web 请求,但我

集成流程 1:

inbound webflux gateway -> outbound jms gateway

  @Bean
  public IntegrationFlow step1() {
    // request-reply pattern using the jms outbound gateway
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
        .requestDestination("inboundWebfluxQueue")
        .replyDestination("outboundWebfluxQueue")
        .correlationKey("JMSCorrelationID");

    // send a request to jms, wait for the reply and return message payload as response
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
        // won't work consistently without the next line
        .fluxTransform(f -> f)
        .handle(gateway).get();
  }

集成流程 2:

inbound jms gateway -> outbound webflux gateway

  @Bean
  public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
        .httpMethod(HttpMethod.GET)
        .expectedResponseType(String.class)
        // ignore headers
        .mappedResponseHeaders();

    return IntegrationFlows.from(jmsInboundGateway())
        // use webflux outbound gateway to send the request to the TEST_URL
        .handle(gateway).get();
  }

完整的路线如下所示:

client web request -> flow 1 -> (message broker) -> flow 2 -> server web request

另一种方法是使用 .channel(MessageChannels.flux()) 而不是 .fluxTransform(f -> f)。通过这种方式,我们真正为 WebFlux 容器带来了一个 back-pressure,使其在请求事件循环中等待可用槽。

我们只是发送到 JMS 队列 not-honoring back-pressure 并且另一端的 JMS 消费者无法跟上。另外,我们向同一个 Netty 服务器发送请求,内部再次为这些内部请求获取一个事件循环槽。

如果您有兴趣,我写了一个这样的单元测试来查看发生了什么:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {


    @Autowired
    private TestRestTemplate template;

    @Test
    void testSpringIntegrationWebFlux() {
        var executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(10);
        executor.afterPropertiesSet();

        var numberOfExecutions = new AtomicInteger();

        for (var i = 0; i < 100; i++) {
            executor.execute(() -> {
                var responseEntity = this.template.getForEntity("/example/webflux", String.class);
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
                    numberOfExecutions.getAndIncrement();
                }
            });
        }

        executor.shutdown();

        assertThat(numberOfExecutions.get()).isEqualTo(100);
    }

}