spring kafka max.poll.interval.ms、max.poll.records 和 idleTimeBetweenPolls 的再平衡问题
Rebalance issue with spring kafka max.poll.interval.ms, max.poll.records and idleTimeBetweenPolls
我看到我的应用程序在不断重新平衡。我的应用程序是以批处理模式开发的,这里是已添加的配置属性。
myapp.consumer.group.id= cg-id-local
myapp.changefeed.topic= test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 20000
myapp.idle.time.between.polls=240000
myapp.concurrency = 10
容器工厂:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(poSummaryCGID));
factory.setConcurrency(poSummNoOfConsumers);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);
我在这里有几个问题:
我已将每次轮询(4 分钟)的最大记录数设置为 20000,并且我们在一个 TOPIC 中有 10 个分区。由于我将并发设置为 10,因此将有 10 个消费者启动 运行,每个消费者将监听 1 个分区。我的问题是,记录数是否会像每个消费者可以处理 2000 条记录一样分配给所有消费者?
max.poll.interval.ms 已设置 5 分钟。我确信消费者将在给定的轮询间隔(4 分钟)内处理 2000(如果我的上述理解是正确的)记录,该间隔小于具有上限的 max.poll.interval.ms。但不确定为什么会发生再平衡?我需要设置任何其他配置属性吗?
不胜感激!!
Tried with these configurations:
myapp.max.poll.interval.ms=600000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=360000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=300000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=180000
编辑修复:
我们应该永远
我的应用程序。max.poll.interval.ms >
(myapp.idle.time.between.polls + myapp.max.poll.records 处理时间)。
没有。 max.poll.records
每个消费者,而不是每个主题或容器。
如果并发性=10 且分区数为 10,则应将 max.poll.records
减少到 2000,以便每个消费者每次轮询最多获得 2000。
容器会自动减少轮询之间的空闲时间,这样就不会超过 max.poll.interval.ms
,但是你应该对这些属性(max.poll.records
和 max.poll.interval.ms
)保持保守,例如永远不可能超过间隔。
我看到我的应用程序在不断重新平衡。我的应用程序是以批处理模式开发的,这里是已添加的配置属性。
myapp.consumer.group.id= cg-id-local
myapp.changefeed.topic= test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 20000
myapp.idle.time.between.polls=240000
myapp.concurrency = 10
容器工厂:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(poSummaryCGID));
factory.setConcurrency(poSummNoOfConsumers);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);
我在这里有几个问题:
我已将每次轮询(4 分钟)的最大记录数设置为 20000,并且我们在一个 TOPIC 中有 10 个分区。由于我将并发设置为 10,因此将有 10 个消费者启动 运行,每个消费者将监听 1 个分区。我的问题是,记录数是否会像每个消费者可以处理 2000 条记录一样分配给所有消费者?
max.poll.interval.ms 已设置 5 分钟。我确信消费者将在给定的轮询间隔(4 分钟)内处理 2000(如果我的上述理解是正确的)记录,该间隔小于具有上限的 max.poll.interval.ms。但不确定为什么会发生再平衡?我需要设置任何其他配置属性吗?
不胜感激!!
Tried with these configurations:
myapp.max.poll.interval.ms=600000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=360000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=300000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=180000
编辑修复: 我们应该永远 我的应用程序。max.poll.interval.ms > (myapp.idle.time.between.polls + myapp.max.poll.records 处理时间)。
没有。 max.poll.records
每个消费者,而不是每个主题或容器。
如果并发性=10 且分区数为 10,则应将 max.poll.records
减少到 2000,以便每个消费者每次轮询最多获得 2000。
容器会自动减少轮询之间的空闲时间,这样就不会超过 max.poll.interval.ms
,但是你应该对这些属性(max.poll.records
和 max.poll.interval.ms
)保持保守,例如永远不可能超过间隔。