避免 apache kafka 消费者中重复消息的有效策略
Effective strategy to avoid duplicate messages in apache kafka consumer
学习apache kafka一个月了。但是,我现在陷入了困境。我的用例是,我在不同的机器上有两个或多个消费者进程 运行。我 运行 进行了一些测试,其中我在 kafka 服务器中发布了 10,000 条消息。然后在处理这些消息时,我杀死了一个消费者进程并重新启动它。消费者在文件中写入处理过的消息。所以在消费完成后,文件显示超过 10k 条消息。所以有些消息是重复的。
在消费者进程中我禁用了自动提交。消费者手动批量提交偏移量。因此,例如,如果将 100 条消息写入文件,则消费者会提交偏移量。当单个消费者进程为 运行 并且它崩溃并恢复时,以这种方式避免了重复。但是,当多个消费者 运行 而其中一个崩溃并恢复时,它会将重复的消息写入文件。
有什么有效的策略可以避免这些重复的消息吗?
简短的回答是,不。
您正在寻找的是恰好一次处理。虽然它通常看起来可行,但永远不要依赖它,因为总是有警告。
即使为了尝试防止重复,您也需要使用简单使用者。这种方法的工作原理是针对每个消费者,当从某个分区消费消息时,将消费消息的分区和偏移量写入磁盘。当消费者失败后重启时,从磁盘中读取每个分区最后消费的偏移量。
但即使使用这种模式,消费者也不能保证它不会在失败后重新处理消息。如果消费者消费了一条消息,然后在偏移量被刷新到磁盘之前失败了怎么办?如果在处理消息之前写入磁盘,如果在实际处理消息之前写入偏移量然后失败怎么办?即使您在每条消息后向 ZooKeeper 提交偏移量,也会存在同样的问题。
不过,在某些情况下,
exactly-once 处理更容易实现,但仅适用于某些用例。这只需要将您的偏移量存储在与单元应用程序输出相同的位置。例如,如果您编写一个对消息进行计数的消费者,通过将最后一次计数的偏移量与每个计数一起存储,您可以保证偏移量与消费者的状态同时存储。当然,为了保证恰好一次处理,这将要求您恰好消费一条消息并为每条消息恰好更新一次状态,这对于大多数 Kafka 消费者应用程序来说是完全不切实际的。由于性能原因,Kafka 本质上会批量使用消息。
通常情况下,如果您将应用程序简单地设计为幂等的,您的时间将得到更好的利用,并且您的应用程序将更加可靠。
这就是 Kafka FAQ 关于恰好一次主题的说法:
How do I get exactly-once messaging from Kafka?
Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.
There are two approaches to getting exactly once semantics during data production:
- Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
- Include a primary key (UUID or something) in the message and deduplicate on the consumer.
If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
I think there are two improvements that would make this a lot easier:
- Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
- The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon
我同意 RaGe 在消费者方面的重复数据删除。并且我们使用Redis对Kafka消息进行去重
假设Messageclass有一个成员叫'uniqId',由producer端填写,保证唯一。我们使用一个 12 长度的随机字符串。 (正则表达式是 '^[A-Za-z0-9]{12}$'
)
消费端使用Redis的SETNX去重,EXPIRE自动清除过期key。示例代码:
Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
jedis.expire(key, 7200); // 2 hours is ok for production environment;
}
上面的代码确实在Kafka(version 0.8.x)出现的情况下多次检测到重复消息。使用我们的 input/output 余额审核日志,没有发生消息丢失或重复的情况。
无论在生产者端做了什么,我们认为从 kafka 恰好交付一次的最佳方式仍然是在消费者端处理它:
- 生成带有 uuid 的 msg 作为 Kafka 消息 Key into topic T1
- consumer端从T1读取msg,以uuid为rowkey写入hbase
- 使用相同的行键从 hbase 读回并写入另一个主题 T2
- 让您的最终消费者实际消费主题 T2
Kafka 中现在有一个相对较新的'Transactional API',可以让您在处理流时实现exactly once processing。使用事务性 API,可以内置幂等性,只要系统的其余部分设计为幂等性即可。参见 https://www.baeldung.com/kafka-exactly-once
学习apache kafka一个月了。但是,我现在陷入了困境。我的用例是,我在不同的机器上有两个或多个消费者进程 运行。我 运行 进行了一些测试,其中我在 kafka 服务器中发布了 10,000 条消息。然后在处理这些消息时,我杀死了一个消费者进程并重新启动它。消费者在文件中写入处理过的消息。所以在消费完成后,文件显示超过 10k 条消息。所以有些消息是重复的。
在消费者进程中我禁用了自动提交。消费者手动批量提交偏移量。因此,例如,如果将 100 条消息写入文件,则消费者会提交偏移量。当单个消费者进程为 运行 并且它崩溃并恢复时,以这种方式避免了重复。但是,当多个消费者 运行 而其中一个崩溃并恢复时,它会将重复的消息写入文件。
有什么有效的策略可以避免这些重复的消息吗?
简短的回答是,不。
您正在寻找的是恰好一次处理。虽然它通常看起来可行,但永远不要依赖它,因为总是有警告。
即使为了尝试防止重复,您也需要使用简单使用者。这种方法的工作原理是针对每个消费者,当从某个分区消费消息时,将消费消息的分区和偏移量写入磁盘。当消费者失败后重启时,从磁盘中读取每个分区最后消费的偏移量。
但即使使用这种模式,消费者也不能保证它不会在失败后重新处理消息。如果消费者消费了一条消息,然后在偏移量被刷新到磁盘之前失败了怎么办?如果在处理消息之前写入磁盘,如果在实际处理消息之前写入偏移量然后失败怎么办?即使您在每条消息后向 ZooKeeper 提交偏移量,也会存在同样的问题。
不过,在某些情况下, exactly-once 处理更容易实现,但仅适用于某些用例。这只需要将您的偏移量存储在与单元应用程序输出相同的位置。例如,如果您编写一个对消息进行计数的消费者,通过将最后一次计数的偏移量与每个计数一起存储,您可以保证偏移量与消费者的状态同时存储。当然,为了保证恰好一次处理,这将要求您恰好消费一条消息并为每条消息恰好更新一次状态,这对于大多数 Kafka 消费者应用程序来说是完全不切实际的。由于性能原因,Kafka 本质上会批量使用消息。
通常情况下,如果您将应用程序简单地设计为幂等的,您的时间将得到更好的利用,并且您的应用程序将更加可靠。
这就是 Kafka FAQ 关于恰好一次主题的说法:
How do I get exactly-once messaging from Kafka?
Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.
There are two approaches to getting exactly once semantics during data production:
- Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
- Include a primary key (UUID or something) in the message and deduplicate on the consumer.
If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
I think there are two improvements that would make this a lot easier:
- Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
- The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon
我同意 RaGe 在消费者方面的重复数据删除。并且我们使用Redis对Kafka消息进行去重
假设Messageclass有一个成员叫'uniqId',由producer端填写,保证唯一。我们使用一个 12 长度的随机字符串。 (正则表达式是 '^[A-Za-z0-9]{12}$'
)
消费端使用Redis的SETNX去重,EXPIRE自动清除过期key。示例代码:
Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
jedis.expire(key, 7200); // 2 hours is ok for production environment;
}
上面的代码确实在Kafka(version 0.8.x)出现的情况下多次检测到重复消息。使用我们的 input/output 余额审核日志,没有发生消息丢失或重复的情况。
无论在生产者端做了什么,我们认为从 kafka 恰好交付一次的最佳方式仍然是在消费者端处理它:
- 生成带有 uuid 的 msg 作为 Kafka 消息 Key into topic T1
- consumer端从T1读取msg,以uuid为rowkey写入hbase
- 使用相同的行键从 hbase 读回并写入另一个主题 T2
- 让您的最终消费者实际消费主题 T2
Kafka 中现在有一个相对较新的'Transactional API',可以让您在处理流时实现exactly once processing。使用事务性 API,可以内置幂等性,只要系统的其余部分设计为幂等性即可。参见 https://www.baeldung.com/kafka-exactly-once