无法在kafka消费者下设置'max.poll.records',其中cons.poll仍然returns分区下的所有记录

unable to set 'max.poll.records' under kafka consumer, where cons.poll still returns all records under partition

我已经创建了 多线程消费者 应用程序来处理各种分区。 查看各种博客,我开始了解 'max.poll.records' 属性,以便控制给定主题、分区的记录集。(因此它可以快速脱离 Records 循环并因此调用 cons.poll() 保持活力)

问题是我的处理逻辑需要时间来处理每条记录。 启动 Cons-2 后,两者都开始在同一分区上工作,因为 Cons-1 仍然没有进行重新平衡(即 cons.poll() 尚未发生)。

增加消费者以便他们可以重新平衡他们自己,cons.poll() 除非所有记录都被处理,否则不会发生。

我可能不会选择 'session.timeout.ms',因为开始新的消费者也可能开始在与 Cons-1 相同的分区上工作。

我尝试使用 :

设置 属性
props.put("max.poll.records",1);
props.put("max.poll.records","1");

但都没有改变号码。来自投票的记录数。

我使用的是 Apache Kafka 9 及以下版本 API。

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
    <version>0.9.0.1_1</version>
</dependency>

max.poll.records 属性 在 Kafka-0.10.0 中发布。它在 Kafka 0.9.0.1 版本中不可用。请参阅 release 注释中的 KAFKA-3007 任务。

如果您处理记录花费了很多时间,以下link可能会有所帮助。

AdvancedConsumer.java