Spring 集成 - 循环发布消息(批量)
Spring integration - relase messages in a loop (batches)
我需要能够从 rabbitmq 接收消息,进行一些转换(从 1 条输入消息,我创建 1000 条消息)然后按以下方式处理这 1000 条消息:我以 10 条为一组推送消息,然后休眠5 秒。
您可以查看下面的代码,我需要的帮助是最后一步 - 如何以这种方式进行消息批处理?
@Bean
public IntegrationFlow refreshFlow() {
return IntegrationFlows
//get messages from rabbitmq
.from(refreshInboundAdapter())
//convert to POJO
.transform(new JsonToObjectTransformer(RefreshRequest.class))
//make 1 -> 1000 messages (but release in batches of 10, not all)
.<RefreshRequest, List<ElasticMatch>>transform(m -> componentConfig.matchRefreshService().processRequest(m))
//HERE WAIT 5 seconds and forward to rabbit in batches of 10
.handle(refreshOutboundEndpoint())
.get();
}
不确定您所说的 "wait 5 seconds" 是什么意思,但是 "release in batches of 10" 正是聚合器的任务。你需要有一些人为的[=10=],配置expireGroupsUponCompletion = true
和MessageCountReleaseStrategy
.
有关详细信息,请参阅 Reference Manual。
我需要能够从 rabbitmq 接收消息,进行一些转换(从 1 条输入消息,我创建 1000 条消息)然后按以下方式处理这 1000 条消息:我以 10 条为一组推送消息,然后休眠5 秒。
您可以查看下面的代码,我需要的帮助是最后一步 - 如何以这种方式进行消息批处理?
@Bean
public IntegrationFlow refreshFlow() {
return IntegrationFlows
//get messages from rabbitmq
.from(refreshInboundAdapter())
//convert to POJO
.transform(new JsonToObjectTransformer(RefreshRequest.class))
//make 1 -> 1000 messages (but release in batches of 10, not all)
.<RefreshRequest, List<ElasticMatch>>transform(m -> componentConfig.matchRefreshService().processRequest(m))
//HERE WAIT 5 seconds and forward to rabbit in batches of 10
.handle(refreshOutboundEndpoint())
.get();
}
不确定您所说的 "wait 5 seconds" 是什么意思,但是 "release in batches of 10" 正是聚合器的任务。你需要有一些人为的[=10=],配置expireGroupsUponCompletion = true
和MessageCountReleaseStrategy
.
有关详细信息,请参阅 Reference Manual。