Spring 集成聚合器仅发布一组来自 AMQP 支持的通道
Spring Integration aggregator only releases one group coming from AMQP backed channel
我的 Spring 启动应用程序出现问题,其中只有一个组在我的聚合器中处理,然后该应用程序停止使用队列中的更多消息。它似乎只在启动时处理一个组。我重新启动了应用程序,它处理了另一个组,但随后又停止了。
下面是我的流程。
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
.split(userImportSplitter)
.channel(Amqp.channel(connectionFactory)
.queueName(USER_QUEUE_NAME)
.prefetchCount(batchSize))
.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
.sendPartialResultOnExpiry(true)
.groupTimeout(500))
.handle(userImporter)
.get();
可能您的 userImportSplitter
产生了相同的 conrrelationId
,因此在聚合器中只形成一个组,默认情况下它在发布或超时后不会被删除。
考虑使用这些选项:
.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
.sendPartialResultOnExpiry(true)
.groupTimeout(500)
.expireGroupsUponCompletion(true)
.expireGroupsUponTimeout(true))
我的 Spring 启动应用程序出现问题,其中只有一个组在我的聚合器中处理,然后该应用程序停止使用队列中的更多消息。它似乎只在启动时处理一个组。我重新启动了应用程序,它处理了另一个组,但随后又停止了。
下面是我的流程。
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, importQueueName).errorChannel(errorChannel))
.split(userImportSplitter)
.channel(Amqp.channel(connectionFactory)
.queueName(USER_QUEUE_NAME)
.prefetchCount(batchSize))
.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
.sendPartialResultOnExpiry(true)
.groupTimeout(500))
.handle(userImporter)
.get();
可能您的 userImportSplitter
产生了相同的 conrrelationId
,因此在聚合器中只形成一个组,默认情况下它在发布或超时后不会被删除。
考虑使用这些选项:
.aggregate(a -> a.releaseStrategy(g -> g.size() >= batchSize)
.sendPartialResultOnExpiry(true)
.groupTimeout(500)
.expireGroupsUponCompletion(true)
.expireGroupsUponTimeout(true))