Spring 集成中的发布-订阅限时聚合

Time-limited aggregation with publish-subscribe in Spring Integration

我正在尝试使用 Spring 与 DSL 和 lambda 的集成来实现以下内容:

给定一条消息,将其发送给 N 消费者(通过 publish-subscribe)。等待有限的时间和 return 在此间隔期间从消费者 (<= N) 收到的所有结果。

这是我目前的示例配置:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class ExampleConfiguration {

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedRate(1000).maxMessagesPerPoll(1).get();
    }

    @Bean
    public MessageChannel publishSubscribeChannel() {
        return MessageChannels.publishSubscribe(splitterExecutorService()).applySequence(true).get();
    }

    @Bean
    public ThreadPoolTaskExecutor splitterExecutorService() {
        final ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();

        executorService.setCorePoolSize(3);
        executorService.setMaxPoolSize(10);

        return executorService;
    }

    @Bean
    public DirectChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel requestChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel channel1() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel channel2() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel collectorChannel() {
        return new DirectChannel();
    }

    @Bean
    public TransformerChannel1 transformerChannel1() {
        return new TransformerChannel1();
    }

    @Bean
    public TransformerChannel2 transformerChannel2() {
        return new TransformerChannel2();
    }

    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from(errorChannel())
                .handle(m -> System.err.println("[" + Thread.currentThread().getName() + "] " + m.getPayload()))
                .get();
    }

    @Bean
    public IntegrationFlow channel1Flow() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .transform("1: "::concat)
                .transform(transformerChannel1())
                .channel(collectorChannel())
                .get();
    }

    @Bean
    public IntegrationFlow channel2Flow() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .transform("2: "::concat)
                .transform(transformerChannel2())
                .channel(collectorChannel())
                .get();
    }

    @Bean
    public IntegrationFlow splitterFlow() {
        return IntegrationFlows.from(requestChannel())
                .channel(publishSubscribeChannel())
                .get();
    }

    @Bean
    public IntegrationFlow collectorFlow() {
        return IntegrationFlows.from(collectorChannel())
                .resequence(r -> r.releasePartialSequences(true),
                        null)
                .aggregate(a ->
                        a.sendPartialResultOnExpiry(true)
                                .groupTimeout(500)
                        , null)
                .get();
    }

}

TransformerChannel1TransformerChannel2 是示例消费者,仅通过睡眠来模拟延迟。

消息流为:

 splitterFlow -> channel1Flow \
              -> channel2Flow / -> collectorFlow

一切似乎都按预期工作,但我看到如下警告:

Reply message received but the receiving thread has already received a reply

这是意料之中的,因为部分结果是 returned。

问题:

是的,解决方案看起来不错。我想它适合 Scatter-Gather 模式。该实现自版本 4.1.

起提供

从另一方面来看,aggregator 自该版本以来也有更多选项 - expire-groups-upon-timeout,默认情况下聚合器为 true。使用此选项作为 false,您将能够实现丢弃所有这些迟到消息的要求。不幸的是 DSL 不支持它 yet。因此,即使您将项目升级为使用 Spring Integration 4.1.

也无济于事

那些 "Reply message received but the receiving thread has already received a reply" 的另一个选项是 spring.integraton.messagingTemplate.throwExceptionOnLateReply = true 选项,使用 jar 之一的 META-INF 中的 spring.integration.properties 文件。

无论如何,我认为 Scatter-Gather 是您用例的最佳解决方案。 您可以找到 here 如何从 JavaConfig 配置它。

更新

What about exceptions and error channel?

由于您已经与 throwExceptionOnLateReply 达成交易,我猜您是通过 @MessagingGatewayrequestChannel 发送了一条消息。最后一个有 errorChannel 选项。另一方面,PublishSubscribeChannelerrorHandler 选项,您可以使用 MessagePublishingErrorHandlererrorChannel 作为默认选项。

顺便说一句,不要忘记 Framework 为 LoggingHandler 提供了 errorChannel bean 和上面的 endpoint。所以,请想想,如果你真的需要覆盖那些东西。默认 errorChannelPublishSubscribeChannel,因此您可以简单地添加自己的订阅者。