Spring-Integration-DSL:嵌套的 Scatter Gather 挂起

Spring-Integration-DSL: Nested Scatter Gather hangs

这是一个损坏但可执行的示例代码:

@Bean
IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("test")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(flow -> flow
                                    .scatterGather(
                                            scatterer1 -> scatterer1
                                                    .applySequence(true)
                                                    .recipientFlow(IntegrationFlowDefinition::bridge),
                                            gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                    .log(INFO, m -> "THIS HAPPENS")),
                    gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
            .log(INFO, m -> "THIS NEVER HAPPENS")
            .get();
}

预期输出为:

THIS HAPPENS
THIS NEVER HAPPENS

实际输出为:

THIS HAPPENS

我在 Github 上找到了 this identical looking issue,但它声称它已在 5.1.105.2.4 版本中修复。我是 运行 spring-boot-starter-integration 5.5.0 其中包括相同版本的 spring-integration-core.

我该怎么做才能让这个嵌套的分散聚集工作?是 DSL 错误还是我的代码错误?

在仅使用一级分散-聚集遇到类似问题后,我意识到是日志消息阻止了输出返回到父流。将 .log() 替换为 .logAndReply().log().bridge() 一切都应该再次运行。

像这样:

@Bean
IntegrationFlow testFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("test")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(flow -> flow
                                    .scatterGather(
                                            scatterer1 -> scatterer1
                                                    .applySequence(true)
                                                    .recipientFlow(IntegrationFlowDefinition::bridge),
                                            gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
                                    .logAndReply(INFO, m -> "THIS HAPPENS")), // this fixes the problem
                    gatherer -> gatherer.outputProcessor(MessageGroup::getOne))
            .log(INFO, m -> "THIS NEVER HAPPENS")
            .get();
}