在 Spring 云流中聚合消息
Aggregate Messages in Spring Cloud Stream
我是 spring 云的新手,希望将我们的单一结构更改为微服务,这首先说明了我现在想做的是以下内容
- 接收请求以从不同来源调用 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
- 外部系统支持批量发送,如果能聚合消息批量发送就更好了。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
- 此外,如果我收到错误,我想以指数方式后退
我的第一个想法是在执行上述聚合的 Sink 之前创建一个 Processor。
这是云计算的正确思维方式还是另一种途径?
工作解决方案
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
您可以编写 aggregator
处理器应用程序,将多条消息组合成一条消息。有关 Spring 集成聚合器的更多信息,请参阅 here
我是 spring 云的新手,希望将我们的单一结构更改为微服务,这首先说明了我现在想做的是以下内容
- 接收请求以从不同来源调用 Web 服务(外部系统)。在任何特定时间,这可以是 1 个请求或最多 100K 个请求。
- 外部系统支持批量发送,如果能聚合消息批量发送就更好了。例如,继续聚合直到达到数量阈值(100 条消息)或达到时间阈值 2 秒。
- 此外,如果我收到错误,我想以指数方式后退
我的第一个想法是在执行上述聚合的 Sink 之前创建一个 Processor。
这是云计算的正确思维方式还是另一种途径?
工作解决方案
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
您可以编写 aggregator
处理器应用程序,将多条消息组合成一条消息。有关 Spring 集成聚合器的更多信息,请参阅 here