Python 如何删除Kafka主题下的所有消息

Python how to delete all messages under a Kafka topic

我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,同时前一天的数据已被弃用。 如何删除 python 中 Kafka 主题下的所有消息?或者如何删除 python 中的 Kafka 主题? 或者我看到有人建议等待数据过期,如果可以的话如何设置数据过期时间? 任何建议将不胜感激!

谢谢

您不能删除 Kafka 主题中的消息。您可以:

  • 设置log.retention.*属性,这基本上是消息的过期时间。您可以选择基于时间的过期(例如,保留 6 小时前或更新的消息)或基于 space 的过期(例如,最多保留 1 GB 的消息)。请参阅 Broker config 并搜索 retention。您可以为不同的主题设置不同的值。
  • 删除整个主题。这有点棘手,我不推荐这种方式。
  • 每天创建一个新主题。类似于 my-topic-2015-09-21

但我认为您根本不需要删除主题中的消息。因为您的 Kafka 消费者会跟踪已处理的消息。因此,当您阅读今天的所有消息时,Kafka 消费者会保存这些信息,明天您将只阅读新消息。

另一个可能的解决方案是 Log compaction。但它更复杂,可能不是你需要的。基本上,您可以为 Kafka 主题中的每条消息设置一个键。如果您使用相同的密钥发送两条不同的消息,Kafka 将只保留主题中的最新消息,并删除所有具有相同密钥的旧消息。您可以将其视为一种 "key-value store"。具有相同键的每条消息只更新特定键下的值。但是,嘿,你真的不需要这个,仅供参考 :-)。

最简单的方法就是直接删除主题。我在 Python 自动化测试套件中使用它,我想在其中验证一组特定的测试消息是否通过 Kafka 发送,并且不想看到之前测试运行的结果

def delete_kafka_topic(topic_name):
    call(["/usr/bin/kafka-topics", "--zookeeper", "zookeeper-1:2181", "--delete", "--topic", topic_name])