在 Spring 云流中聚合消息

Aggregate Messages in Spring Cloud Stream

我是 spring 云的新手,希望将我们的单一结构更改为微服务,这首先说明了我现在想做的是以下内容

  1. 接收请求以从不同来源调用 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
  2. 外部系统支持批量发送,如果能聚合消息批量发送就更好了。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
  3. 此外,如果我收到错误,我想以指数方式后退

我的第一个想法是在执行上述聚合的 Sink 之前创建一个 Processor。

这是云计算的正确思维方式还是另一种途径?


工作解决方案

@EnableBinding(Processor.class)
class Configuration {

    @Autowired
    Processor processor;


    @ServiceActivator(inputChannel = Processor.INPUT)
    @Bean
    public MessageHandler aggregator() {

        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));

        //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }
}

您可以编写 aggregator 处理器应用程序,将多条消息组合成一条消息。有关 Spring 集成聚合器的更多信息,请参阅 here