使用动态组重新平衡时如何指定Kafka消费者的起始位置?

How to specify starting position for Kafka consumers when using dynamic group re-balancing?

是否可以从指定的偏移量开始 0.9 或 0.10 个 Kafka 消费者,同时仍然使用具有动态重新平衡的消费者组?

以下是目前发现的内容:

案例 1:如果我们使用 consumer.assign(…) 方法手动将分区分配给消费者 - 我们可以执行以下所有操作:

consumer.seek(<specificPartition>, <myCustomOffset>); or:
consumer.seekToBeginning(<specificPartition>);
consumer.seekToEnd(<specificPartition>);

基本上,我们可以完全控制启动消费者表单的位置,但这是以没有由 Kafka 动态完成分区重新分配为代价的

情况 2:如果我们使用 consumer.subscribe(…) 方法 - Kafka 将管理重新平衡,但是,我们不能执行上述三个选项中的任何一个…:( 因此,我们尝试了以下方法来“破解”它——在消费者启动时,进入 poll() 循环之前:

// get coordinator from the private field of the consumer:
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true);
// make sure all partitions are already 
coordinator.ensurePartitionAssignment();
// get the list of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment()
// now we can go ahead and do the same three actions (seek(), sequined() or seekToBeginning()) on those partitions only for this consumer as above.
for (TopicPartition assignedPartition: assignedTopicPartitions) {
     consumer.seek(<assignedPartition>, <myCustomOffset>) // or whatever
...
}
// now start the poll() loop:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord<String, String> record : records) {
         // processMessage(record.value(), record.offset());
    }
}

这对我来说太老套了,而且,我不确定这种逻辑在实际的重新平衡过程中是否会成立,比如说,当新的消费者被添加到组中时。

有人可以验证这种方法或提出更好的方法来完成我们需要的吗?

谢谢!

除了使用 ConsumerCoordinator,您还可以只执行初始 poll()(并且不处理任何内容)来分配分区。然后,使用 seek() 并启动轮询循环,如代码所示。