Spring 集成 - 循环发布消息(批量)

Spring integration - relase messages in a loop (batches)

我需要能够从 rabbitmq 接收消息,进行一些转换(从 1 条输入消息,我创建 1000 条消息)然后按以下方式处理这 1000 条消息:我以 10 条为一组推送消息,然后休眠5 秒。

您可以查看下面的代码,我需要的帮助是最后一步 - 如何以这种方式进行消息批处理?

@Bean
    public IntegrationFlow refreshFlow() {
        return IntegrationFlows
                //get messages from rabbitmq
                .from(refreshInboundAdapter())
                //convert to POJO
                .transform(new JsonToObjectTransformer(RefreshRequest.class))
                //make 1 -> 1000 messages (but release in batches of 10, not all)
                .<RefreshRequest, List<ElasticMatch>>transform(m -> componentConfig.matchRefreshService().processRequest(m))
                //HERE WAIT 5 seconds and forward to rabbit in batches of 10
                .handle(refreshOutboundEndpoint())
                .get();
    }

不确定您所说的 "wait 5 seconds" 是什么意思,但是 "release in batches of 10" 正是聚合器的任务。你需要有一些人为的[​​=10=],配置expireGroupsUponCompletion = trueMessageCountReleaseStrategy.

有关详细信息,请参阅 Reference Manual