Apache Kafka:0.10 版本中的 Exactly Once
Apache Kafka: Exactly Once in Version 0.10
为了实现 Kafka 消费者对消息的恰好一次处理,我一次提交一条消息,如下所示
public void commitOneRecordConsumer(long seconds) {
KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
processingService.process(record);
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));
System.out.println("Committed Offset" + ": " + record.offset());
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
}
以上代码将消息的处理异步委托给下面的另一个class。
@Service
public class ProcessingService {
@Async
public void process(ConsumerRecord<String, String> record) throws InterruptedException {
Thread.sleep(5000L);
Map<String, Object> map = new HashMap<>();
map.put("partition", record.partition());
map.put("offset", record.offset());
map.put("value", record.value());
System.out.println("Processed" + ": " + map);
}
}
但是,这仍然不能保证exactly-once delivery,因为如果处理失败,它可能还会提交其他消息,而之前的消息将永远不会被处理和提交,我的选择是什么?
0.10.2 及更早版本的原始答案(对于 0.11 及更高版本,请参阅下面的答案)
目前,Kafka 无法提供开箱即用的 exactly-once 处理。如果在成功处理消息后提交消息,则可以进行至少一次处理,或者如果在开始处理之前 poll()
之后直接提交消息,则可以进行最多一次处理。
(另请参阅 http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits 中的 段“交付保证”)
但是,如果您的处理是幂等的,则至少一次保证“足够好”,即,即使您处理一条记录两次,最终结果也是相同的。幂等处理的示例是将消息添加到键值存储。即使您两次添加相同的记录,第二次插入也只会替换第一个当前键值对,并且 KV 存储中仍然会有正确的数据。
In your example code above, you update a HashMap
and this would be an idempotent operation. Even if your might have an inconsistent state in case of failure if for example only two put
calls are executed before the crash. However, this inconsistent state would be fixed on reprocessing the same record again.
The call to println()
is not idempotent though because this is an operation with "side effect". But I guess the print is for debugging purpose only.
作为替代方案,您需要在用户代码中实现事务语义,这需要在失败时“撤消”(部分执行)操作。总的来说,这是一道难题。
Apache Kafka 0.11+ 更新(对于 0.11 之前的版本,请参见上面的回答)
从 0.11 开始,Apache Kafka 支持使用 Kafka Streams 的幂等生产者、事务性生产者和恰好一次处理。它还向消费者添加了一个 "read_committed"
模式以仅读取提交的消息(以及 drop/filter 中止的消息)。
Apache Kafka 0.11.0.0刚刚发布,现在支持exactly once delivery
http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics
我认为用kafka 0.10.x本身可以实现exactly once处理。但是有一些陷阱。我正在分享来自 this book. Relevant contents can be found in section: Seek and Exactly Once Processing
in chapter 4: Kafka Consumers - Reading Data from Kafka
. You can view the contents of that book with a (free) safaribooksonline 帐户的高级想法,或者一旦它出来就买它,或者可能从 其他 来源获得它,我们不会谈论这些。
想法:
想想这个常见的场景:您的应用程序从 Kafka 读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想在数据库中存储两次相同的结果。
如果有一种方法可以在一个原子操作中同时存储记录和偏移量,这是可行的。记录和偏移量都已提交,或者两者均未提交。
为此,我们需要在一个事务中将记录和偏移量写入数据库。然后我们会知道我们是否完成了记录并且提交了偏移量或者我们没有,并且记录将被重新处理。
现在唯一的问题是:如果记录存储在数据库中而不是Kafka中,我们的消费者在分配分区时如何知道从哪里开始读取?这正是 seek()
的用途。当消费者启动或分配新分区时,它可以在数据库中查找偏移量并 seek()
到该位置。
书中的示例代码:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}
为了实现 Kafka 消费者对消息的恰好一次处理,我一次提交一条消息,如下所示
public void commitOneRecordConsumer(long seconds) {
KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
processingService.process(record);
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));
System.out.println("Committed Offset" + ": " + record.offset());
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
}
以上代码将消息的处理异步委托给下面的另一个class。
@Service
public class ProcessingService {
@Async
public void process(ConsumerRecord<String, String> record) throws InterruptedException {
Thread.sleep(5000L);
Map<String, Object> map = new HashMap<>();
map.put("partition", record.partition());
map.put("offset", record.offset());
map.put("value", record.value());
System.out.println("Processed" + ": " + map);
}
}
但是,这仍然不能保证exactly-once delivery,因为如果处理失败,它可能还会提交其他消息,而之前的消息将永远不会被处理和提交,我的选择是什么?
0.10.2 及更早版本的原始答案(对于 0.11 及更高版本,请参阅下面的答案)
目前,Kafka 无法提供开箱即用的 exactly-once 处理。如果在成功处理消息后提交消息,则可以进行至少一次处理,或者如果在开始处理之前 poll()
之后直接提交消息,则可以进行最多一次处理。
(另请参阅 http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits 中的 段“交付保证”)
但是,如果您的处理是幂等的,则至少一次保证“足够好”,即,即使您处理一条记录两次,最终结果也是相同的。幂等处理的示例是将消息添加到键值存储。即使您两次添加相同的记录,第二次插入也只会替换第一个当前键值对,并且 KV 存储中仍然会有正确的数据。
In your example code above, you update a
HashMap
and this would be an idempotent operation. Even if your might have an inconsistent state in case of failure if for example only twoput
calls are executed before the crash. However, this inconsistent state would be fixed on reprocessing the same record again.The call to
println()
is not idempotent though because this is an operation with "side effect". But I guess the print is for debugging purpose only.
作为替代方案,您需要在用户代码中实现事务语义,这需要在失败时“撤消”(部分执行)操作。总的来说,这是一道难题。
Apache Kafka 0.11+ 更新(对于 0.11 之前的版本,请参见上面的回答)
从 0.11 开始,Apache Kafka 支持使用 Kafka Streams 的幂等生产者、事务性生产者和恰好一次处理。它还向消费者添加了一个 "read_committed"
模式以仅读取提交的消息(以及 drop/filter 中止的消息)。
Apache Kafka 0.11.0.0刚刚发布,现在支持exactly once delivery
http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics
我认为用kafka 0.10.x本身可以实现exactly once处理。但是有一些陷阱。我正在分享来自 this book. Relevant contents can be found in section: Seek and Exactly Once Processing
in chapter 4: Kafka Consumers - Reading Data from Kafka
. You can view the contents of that book with a (free) safaribooksonline 帐户的高级想法,或者一旦它出来就买它,或者可能从 其他 来源获得它,我们不会谈论这些。
想法:
想想这个常见的场景:您的应用程序从 Kafka 读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想在数据库中存储两次相同的结果。
如果有一种方法可以在一个原子操作中同时存储记录和偏移量,这是可行的。记录和偏移量都已提交,或者两者均未提交。 为此,我们需要在一个事务中将记录和偏移量写入数据库。然后我们会知道我们是否完成了记录并且提交了偏移量或者我们没有,并且记录将被重新处理。
现在唯一的问题是:如果记录存储在数据库中而不是Kafka中,我们的消费者在分配分区时如何知道从哪里开始读取?这正是 seek()
的用途。当消费者启动或分配新分区时,它可以在数据库中查找偏移量并 seek()
到该位置。
书中的示例代码:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}