在 KAFKA 中消费后删除消息

Delete message after consuming it in KAFKA

我正在使用 apache kafka 来生成和使用一个 5GB 大小的文件。我想知道是否有一种方法可以在消费后自动删除来自主题的消息。我有什么办法可以跟踪消费的消息吗?我不想手动删除它。

在Kafka中,消费什么的责任是消费者的责任,这也是Kafka具有如此强大的横向扩展性的主要原因之一。

使用高级消费者 API 将通过在 Zookeeper 中提交消耗的偏移量来自动为您执行此操作(或者特殊的 Kafka 主题使用更新的配置选项来跟踪消耗的消息)。

简单的消费者 API 让您自己处理如何以及在何处跟踪消费的消息。

Kafka 中的消息清除是通过指定主题的保留时间或为其定义磁盘配额自动完成的,因此对于一个 5GB 文件的情况,该文件将在您定义的保留期后删除已经过了,不管有没有消耗。

据我所知,您可以通过减少存储时间从日志中删除消耗的数据。日志的默认时间设置为 168 小时,然后数据会自动从您创建的 Kafka-Topic 中删除。因此,我的建议是减少转到位于配置文件夹中的 server.properties 并将 168 更改为最短时间。因此,在您为 log.retention.hours 设置的特定时间后,它们将没有数据。因此您的问题将得到解决。

log.retention.hours=168

继续编码

您可以使用 consumer_group :Kafka 保证一条消息只会被组中的单个消费者读取。 https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm

Kafka消费消息无法删除

Kafka 没有消息消费时直接删除消息的机制。

我在尝试这样做时发现的最接近的东西是 this trick,但它未经测试,并且根据设计它不会对最近的消息起作用:

A potential trick to do this is to use a combination of (a) a compacted topic and (b) a custom partitioner (c) a pair of interceptors.

The process would follow:

  1. Use a producer interceptor to add a GUID to the end of the key before it is written.
  2. Use a custom partitioner to ignore the GUID for the purposes of partitioning
  3. Use a compacted topic so you can then delete any individual message you need via producer.send(key+GUID, null)
  4. Use a consumer interceptor to remove the GUID on read.

但你不应该需要这个功能。

有1个或多个消费者,希望一条消息总共只被他们消费一次?
将他们放在同一个消费组中。

想要避免过多的消息填满磁盘?
根据磁盘 space 和/或时间设置保留。

我只是 运行 在这个问题上构建了一个脚本,可以 运行 周期性地 'mark' 使用已删除的记录。 Kafka 不会立即释放 space,但会删除偏移量在 'active' 之外的分区。

https://gist.github.com/ThePsyjo/b717d2eaca2deb09b8130b3e917758f6