Spring Kaka - 从头开始寻找偏移量
Spring Kaka - Seek Offset from beginning
我想从Offset开始消费消息。为此,我在属性文件中添加了 属性 "seekToBeginning"=true 。我的 class 有 @KafkaListener 实现了 ConsumerSeekAware,我已经重写了 onPartitionsAssigned() 方法,如下所示。我想知道我的做法是否正确。此方法被调用 3 次(有 3 个分区)。另外,我担心的是当还有 CommitFailedException 时也会调用此方法。请让我知道以下是否正确,或者我应该按分区过滤以及如何过滤。另外请让我知道如何处理 CommitFailedException。
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (seekToBeginning)
{
assignments.forEach(
(topic, action) -> callback.seekToBeginning(topic.topic(), topic.partition()));
}
}```
如果并发数 = 3,那么,是的,它将被调用 3 次,每个消费者一次。
从2.3.4开始,有更方便的方法:
/**
* Queue a seekToBeginning operation to the consumer for each
* {@link TopicPartition}. The seek will occur after any pending offset commits.
* The consumer must be currently assigned the specified partition(s).
* @param partitions the {@link TopicPartition}s.
* @since 2.3.4
*/
default void seekToBeginning(Collection<TopicPartition> partitions) {
您需要一个布尔字段来仅在初始分配时而不是在重新平衡之后进行查找。
如果你只有一个消费者(concurrency = 1),它可以是一个简单的布尔值。
例如boolean initialSeeksDone
.
并发> 1,你需要ThreadLocal
:
ThreadLocal<Boolean> initialSeeksDone;
然后
if (this.initialSeeksDone.get() == null) {
//seek
this.initialSeeksDone.set(true);
}
我想从Offset开始消费消息。为此,我在属性文件中添加了 属性 "seekToBeginning"=true 。我的 class 有 @KafkaListener 实现了 ConsumerSeekAware,我已经重写了 onPartitionsAssigned() 方法,如下所示。我想知道我的做法是否正确。此方法被调用 3 次(有 3 个分区)。另外,我担心的是当还有 CommitFailedException 时也会调用此方法。请让我知道以下是否正确,或者我应该按分区过滤以及如何过滤。另外请让我知道如何处理 CommitFailedException。
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
if (seekToBeginning)
{
assignments.forEach(
(topic, action) -> callback.seekToBeginning(topic.topic(), topic.partition()));
}
}```
如果并发数 = 3,那么,是的,它将被调用 3 次,每个消费者一次。
从2.3.4开始,有更方便的方法:
/**
* Queue a seekToBeginning operation to the consumer for each
* {@link TopicPartition}. The seek will occur after any pending offset commits.
* The consumer must be currently assigned the specified partition(s).
* @param partitions the {@link TopicPartition}s.
* @since 2.3.4
*/
default void seekToBeginning(Collection<TopicPartition> partitions) {
您需要一个布尔字段来仅在初始分配时而不是在重新平衡之后进行查找。
如果你只有一个消费者(concurrency = 1),它可以是一个简单的布尔值。
例如boolean initialSeeksDone
.
并发> 1,你需要ThreadLocal
:
ThreadLocal<Boolean> initialSeeksDone;
然后
if (this.initialSeeksDone.get() == null) {
//seek
this.initialSeeksDone.set(true);
}