Kafka commitAsync Retries with Commit Order
I'm reading through Kafka: The Definitive Guide, and the chapter on consumers has a short passage on "Retrying Async Commits":
A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.
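A quick example from the author would have been great for dense folks like me. I'm especially unclear about the portion I bolded above.
Can anyone shed light on what this means, or, even better, provide a toy example that demonstrates it?
Here is what I have in mind, though I may well be wrong: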
    try {
        // Shared sequence number: incremented once per new commit, never on a retry.
        AtomicInteger atomicInteger = new AtomicInteger(0);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                System.out.format("offset: %d\n", record.offset());
                System.out.format("partition: %d\n", record.partition());
                System.out.format("timestamp: %d\n", record.timestamp());
                System.out.format("timeStampType: %s\n", record.timestampType());
                System.out.format("topic: %s\n", record.topic());
                System.out.format("key: %s\n", record.key());
                System.out.format("value: %s\n", record.value());
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                // Sequence number captured when this commit was first sent.
                private final int marker = atomicInteger.incrementAndGet();

                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null) {
                        if (marker == atomicInteger.get()) {
                            // No newer commit has been sent, so retry.
                            consumer.commitAsync(this);
                        } else {
                            // A newer commit was already sent; can't retry anymore.
                        }
                    }
                }
            });
        }
    } catch (WakeupException e) {
        // ignore for shutdown
    } finally {
        consumer.commitSync(); // block until the final commit completes
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
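One way to read the quoted passage, as a minimal sketch rather than the book's own code: keep one counter per consumer, bump it once for every new commit, and let each callback remember the value it was created with. The snippet below deliberately avoids the Kafka client so it can run on its own. `AsyncCommitter`, `FakeCommitter`, `commit`, and `send` are hypothetical names invented for illustration; only the comparison between the captured number and the current counter comes from the quote.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class CommitOrderSketch {
    /** Hypothetical stand-in for a callback-taking async commit; not the Kafka API. */
    interface AsyncCommitter {
        void commitAsync(long offset, Consumer<Exception> callback);
    }

    /** Test double: parks callbacks so the caller decides when and how each completes. */
    static class FakeCommitter implements AsyncCommitter {
        final Deque<Consumer<Exception>> pending = new ArrayDeque<>();

        @Override
        public void commitAsync(long offset, Consumer<Exception> callback) {
            pending.addLast(callback);
        }

        /** Completes the oldest in-flight commit; a null error means success. */
        void completeOldest(Exception error) {
            pending.removeFirst().accept(error);
        }
    }

    // The "instance variable" from the quote: bumped once per new commit, never on a retry.
    private final AtomicInteger commitSeq = new AtomicInteger(0);
    // Records what happened to each commit, for inspection.
    final List<String> log = new ArrayList<>();

    void commit(AsyncCommitter committer, long offset) {
        // Capture the sequence number at the time of the commit.
        int seqAtCommit = commitSeq.incrementAndGet();
        send(committer, offset, seqAtCommit);
    }

    private void send(AsyncCommitter committer, long offset, int seqAtCommit) {
        committer.commitAsync(offset, error -> {
            if (error == null) {
                log.add("ok " + offset);
            } else if (seqAtCommit == commitSeq.get()) {
                // No newer commit has been sent, so a retry cannot move offsets backwards.
                log.add("retry " + offset);
                send(committer, offset, seqAtCommit);
            } else {
                // A newer commit is already out; retrying could overwrite it with an older offset.
                log.add("skip " + offset);
            }
        });
    }

    public static void main(String[] args) {
        CommitOrderSketch sketch = new CommitOrderSketch();
        FakeCommitter fake = new FakeCommitter();
        sketch.commit(fake, 10); // sequence 1
        sketch.commit(fake, 20); // sequence 2
        fake.completeOldest(new RuntimeException("commit failed")); // offset 10 fails
        fake.completeOldest(new RuntimeException("commit failed")); // offset 20 fails
        fake.completeOldest(null);                                   // retry of 20 succeeds
        System.out.println(sketch.log); // prints [skip 10, retry 20, ok 20]
    }
}
```

The failed commit of offset 10 is dropped because offset 20 was sent after it, while the failed commit of offset 20 is retried because nothing newer exists. With the real consumer, the same check would sit inside `OffsetCommitCallback.onComplete`, and the retry would presumably pass the `offsets` map it received to `commitAsync(offsets, callback)` so that it resends the offsets that failed. This sketch retries without a limit; a real loop would likely want a cap or a backoff.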