Spring 集成聚合器仅发布一组来自 AMQP 支持的通道

Spring Integration aggregator only releases one group coming from AMQP backed channel

我的 Spring 启动应用程序出现问题,其中只有一个组在我的聚合器中处理,然后该应用程序停止使用队列中的更多消息。它似乎只在启动时处理一个组。我重新启动了应用程序,它处理了另一个组,但随后又停止了。

下面是我的流程。

return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
                .split(userImportSplitter)
                .channel(Amqp.channel(connectionFactory)
                        .queueName(USER_QUEUE_NAME)
                        .prefetchCount(batchSize))
                .aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(500))
                .handle(userImporter)
                .get();

可能您的 userImportSplitter 产生了相同的 conrrelationId,因此在聚合器中只形成一个组,默认情况下它在发布或超时后不会被删除。

考虑使用这些选项:

.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(500)
                    .expireGroupsUponCompletion(true)
                    .expireGroupsUponTimeout(true))