Consumer.poll() returns 新记录,即使没有提交偏移量?

Consumer.poll() returns new records even without committing offsets?

如果我有一个 enable.auto.commit=false 并且我调用 consumer.poll() 之后没有调用 consumer.commitAsync(),为什么 consumer.poll() return 下次调用时有新记录吗?

因为我没有提交我的偏移量,我希望 poll() 会 return 最新的偏移量,它应该再次是相同的记录。

我问是因为我在处理过程中试图处理失败情况。我希望在不提交偏移量的情况下,poll() 会再次 return 相同的记录,这样我就可以再次重新处理那些失败的记录。

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord record : records) {
                try {
                   //process record
                   consumer.commitAsync();
                } catch (Exception e) {
                }
                /**
                If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
                **/
            }

        }
    }
}

轮询的起始偏移量不是由代理决定的,而是由消费者决定的。消费者跟踪最后收到的偏移量并在下一次轮询期间请求以下消息。

当消费者停止或失败时,偏移量提交开始发挥作用,而另一个不知道上次消费的偏移量的实例开始使用分区。

KafkaConsumer 有相当丰富的 Javadoc,值得一读。

如果消费者重新平衡(意味着如果任何消费者离开组或添加新消费者),消费者将从上次提交偏移量读取,因此处理重复数据删除不会直接在 kafka 中进行,因此您必须存储最后一个过程外部存储中的偏移量,当重新平衡发生或应用程序重新启动时,您应该寻找该偏移量并开始处理,或者您应该检查针对 DB 的消息中的某些唯一键以查找是否重复

我想分享一些代码,您可以如何在 Java 代码中解决这个问题。

方法是轮询记录,尝试处理它们,如果发生异常,则寻求主题分区的最小值。之后,您执行 commitAsync().

public class MyConsumer implements Runnable {
    @Override
    public void run() {
        while (true) {
            List<ConsumerRecord<String, LogLine>> records = StreamSupport
                .stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
                .collect( Collectors.toList() );

            boolean exceptionRaised = false;
            for (ConsumerRecord<String, LogLine> record : records) {
                try {
                    // process record
                } catch (Exception e) {
                    exceptionRaised = true;
                    break;
                }
            }

            if( exceptionRaised ) {
                Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
                    .stream()
                    .collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
                        ConsumerRecord::offset,
                        Math::min
                    ) );

                for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
                    consumer.seek( entry.getKey(), entry.getValue() );
                }
            }

            consumer.commitAsync();
        }
    }
}

使用此设置,您可以一次又一次地轮询消息,直到成功处理一次轮询的所有消息。

请注意,您的代码应该能够处理 a poison pill。否则,您的代码将陷入死循环。