当消息数大于并发消费者数时,如何在 Spring IntegrationFlow 中消费所有需要的消息?
How to consume all messages required in a Spring IntegrationFlow when message count is greater than the number of concurrent consumers?
我有一个这样定义的集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
其目的是将在 "batch" 中发送的几条相关消息分组。这是一个例子:
// Message 1
{ "name": "Message1",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 1,
.... }
// Message 2
{ "name": "Message2",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 2,
.... }
由于 所述的原因,我们正在对 RabbitMQ 使用手动 ack:ing 以避免丢失消息。
集成流程适用于大小为 2 的批次,但一旦批次中的消息超过 2 条,我们 运行 就会遇到麻烦:
[my-service] 2017-12-04 17:46:07.966 INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2
[my-service] 2017-12-04 17:46:09.976 INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3
请注意,记录消息之间的时间大约为 2 秒(即我们配置为 groupTimeout
的时间)。
我怀疑这是因为 Spring 消耗了 2 条消息(不是 ack:ed 自动)然后聚合等待第 3 条消息(因为 batchSize
是3 在这种情况下)。但是这个消息永远不会在 2 秒内被消费 window 因为只有两个并发消费者。
将 concurrentConsumers
计数增加到 3 解决了 这个 特定问题。问题是我们不知道我们收到的批次的大小,它们可能非常大,可能有 50 个左右。这意味着简单地增加 concurrentConsumers
不是一个可行的选择。
在 Spring 中处理此问题的适当方法是什么?
正如我在 ...
中所讨论的
使用此模式时,concurrency * prefetch
必须足够大以包含所有未完成批次的消息。
出于这个原因,我不赞成使用该模式,除非你有相当可预测的数据。
我有一个这样定义的集成流程:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
其目的是将在 "batch" 中发送的几条相关消息分组。这是一个例子:
// Message 1
{ "name": "Message1",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 1,
.... }
// Message 2
{ "name": "Message2",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 2,
.... }
由于
集成流程适用于大小为 2 的批次,但一旦批次中的消息超过 2 条,我们 运行 就会遇到麻烦:
[my-service] 2017-12-04 17:46:07.966 INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2
[my-service] 2017-12-04 17:46:09.976 INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3
请注意,记录消息之间的时间大约为 2 秒(即我们配置为 groupTimeout
的时间)。
我怀疑这是因为 Spring 消耗了 2 条消息(不是 ack:ed 自动)然后聚合等待第 3 条消息(因为 batchSize
是3 在这种情况下)。但是这个消息永远不会在 2 秒内被消费 window 因为只有两个并发消费者。
将 concurrentConsumers
计数增加到 3 解决了 这个 特定问题。问题是我们不知道我们收到的批次的大小,它们可能非常大,可能有 50 个左右。这意味着简单地增加 concurrentConsumers
不是一个可行的选择。
在 Spring 中处理此问题的适当方法是什么?
正如我在
使用此模式时,concurrency * prefetch
必须足够大以包含所有未完成批次的消息。
出于这个原因,我不赞成使用该模式,除非你有相当可预测的数据。