Consumer.poll() returns 新记录,即使没有提交偏移量?
Consumer.poll() returns new records even without committing offsets?
如果我有一个 enable.auto.commit=false
并且我调用 consumer.poll()
之后没有调用 consumer.commitAsync()
,为什么 consumer.poll()
return
下次调用时有新记录吗?
因为我没有提交我的偏移量,我希望 poll()
会 return 最新的偏移量,它应该再次是相同的记录。
我问是因为我在处理过程中试图处理失败情况。我希望在不提交偏移量的情况下,poll()
会再次 return 相同的记录,这样我就可以再次重新处理那些失败的记录。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
轮询的起始偏移量不是由代理决定的,而是由消费者决定的。消费者跟踪最后收到的偏移量并在下一次轮询期间请求以下消息。
当消费者停止或失败时,偏移量提交开始发挥作用,而另一个不知道上次消费的偏移量的实例开始使用分区。
KafkaConsumer 有相当丰富的 Javadoc,值得一读。
如果消费者重新平衡(意味着如果任何消费者离开组或添加新消费者),消费者将从上次提交偏移量读取,因此处理重复数据删除不会直接在 kafka 中进行,因此您必须存储最后一个过程外部存储中的偏移量,当重新平衡发生或应用程序重新启动时,您应该寻找该偏移量并开始处理,或者您应该检查针对 DB 的消息中的某些唯一键以查找是否重复
我想分享一些代码,您可以如何在 Java 代码中解决这个问题。
方法是轮询记录,尝试处理它们,如果发生异常,则寻求主题分区的最小值。之后,您执行 commitAsync()
.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
List<ConsumerRecord<String, LogLine>> records = StreamSupport
.stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
.collect( Collectors.toList() );
boolean exceptionRaised = false;
for (ConsumerRecord<String, LogLine> record : records) {
try {
// process record
} catch (Exception e) {
exceptionRaised = true;
break;
}
}
if( exceptionRaised ) {
Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
.stream()
.collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
ConsumerRecord::offset,
Math::min
) );
for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
consumer.seek( entry.getKey(), entry.getValue() );
}
}
consumer.commitAsync();
}
}
}
使用此设置,您可以一次又一次地轮询消息,直到成功处理一次轮询的所有消息。
请注意,您的代码应该能够处理 a poison pill。否则,您的代码将陷入死循环。
如果我有一个 enable.auto.commit=false
并且我调用 consumer.poll()
之后没有调用 consumer.commitAsync()
,为什么 consumer.poll()
return
下次调用时有新记录吗?
因为我没有提交我的偏移量,我希望 poll()
会 return 最新的偏移量,它应该再次是相同的记录。
我问是因为我在处理过程中试图处理失败情况。我希望在不提交偏移量的情况下,poll()
会再次 return 相同的记录,这样我就可以再次重新处理那些失败的记录。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
轮询的起始偏移量不是由代理决定的,而是由消费者决定的。消费者跟踪最后收到的偏移量并在下一次轮询期间请求以下消息。
当消费者停止或失败时,偏移量提交开始发挥作用,而另一个不知道上次消费的偏移量的实例开始使用分区。
KafkaConsumer 有相当丰富的 Javadoc,值得一读。
如果消费者重新平衡(意味着如果任何消费者离开组或添加新消费者),消费者将从上次提交偏移量读取,因此处理重复数据删除不会直接在 kafka 中进行,因此您必须存储最后一个过程外部存储中的偏移量,当重新平衡发生或应用程序重新启动时,您应该寻找该偏移量并开始处理,或者您应该检查针对 DB 的消息中的某些唯一键以查找是否重复
我想分享一些代码,您可以如何在 Java 代码中解决这个问题。
方法是轮询记录,尝试处理它们,如果发生异常,则寻求主题分区的最小值。之后,您执行 commitAsync()
.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
List<ConsumerRecord<String, LogLine>> records = StreamSupport
.stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
.collect( Collectors.toList() );
boolean exceptionRaised = false;
for (ConsumerRecord<String, LogLine> record : records) {
try {
// process record
} catch (Exception e) {
exceptionRaised = true;
break;
}
}
if( exceptionRaised ) {
Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
.stream()
.collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
ConsumerRecord::offset,
Math::min
) );
for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
consumer.seek( entry.getKey(), entry.getValue() );
}
}
consumer.commitAsync();
}
}
}
使用此设置,您可以一次又一次地轮询消息,直到成功处理一次轮询的所有消息。
请注意,您的代码应该能够处理 a poison pill。否则,您的代码将陷入死循环。