Python 如何删除Kafka主题下的所有消息
Python how to delete all messages under a Kafka topic
我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,同时前一天的数据已被弃用。
如何删除 python 中 Kafka 主题下的所有消息?或者如何删除 python 中的 Kafka 主题?
或者我看到有人建议等待数据过期,如果可以的话如何设置数据过期时间?
任何建议将不胜感激!
谢谢
您不能删除 Kafka 主题中的消息。您可以:
- 设置
log.retention.*
属性,这基本上是消息的过期时间。您可以选择基于时间的过期(例如,保留 6 小时前或更新的消息)或基于 space 的过期(例如,最多保留 1 GB 的消息)。请参阅 Broker config 并搜索 retention。您可以为不同的主题设置不同的值。
- 删除整个主题。这有点棘手,我不推荐这种方式。
- 每天创建一个新主题。类似于 my-topic-2015-09-21。
但我认为您根本不需要删除主题中的消息。因为您的 Kafka 消费者会跟踪已处理的消息。因此,当您阅读今天的所有消息时,Kafka 消费者会保存这些信息,明天您将只阅读新消息。
另一个可能的解决方案是 Log compaction。但它更复杂,可能不是你需要的。基本上,您可以为 Kafka 主题中的每条消息设置一个键。如果您使用相同的密钥发送两条不同的消息,Kafka 将只保留主题中的最新消息,并删除所有具有相同密钥的旧消息。您可以将其视为一种 "key-value store"。具有相同键的每条消息只更新特定键下的值。但是,嘿,你真的不需要这个,仅供参考 :-)。
最简单的方法就是直接删除主题。我在 Python 自动化测试套件中使用它,我想在其中验证一组特定的测试消息是否通过 Kafka 发送,并且不想看到之前测试运行的结果
def delete_kafka_topic(topic_name):
call(["/usr/bin/kafka-topics", "--zookeeper", "zookeeper-1:2181", "--delete", "--topic", topic_name])
我是卡夫卡的新手。我们正在尝试将数据从 csv 文件导入到 Kafka。我们需要每天导入,同时前一天的数据已被弃用。 如何删除 python 中 Kafka 主题下的所有消息?或者如何删除 python 中的 Kafka 主题? 或者我看到有人建议等待数据过期,如果可以的话如何设置数据过期时间? 任何建议将不胜感激!
谢谢
您不能删除 Kafka 主题中的消息。您可以:
- 设置
log.retention.*
属性,这基本上是消息的过期时间。您可以选择基于时间的过期(例如,保留 6 小时前或更新的消息)或基于 space 的过期(例如,最多保留 1 GB 的消息)。请参阅 Broker config 并搜索 retention。您可以为不同的主题设置不同的值。 - 删除整个主题。这有点棘手,我不推荐这种方式。
- 每天创建一个新主题。类似于 my-topic-2015-09-21。
但我认为您根本不需要删除主题中的消息。因为您的 Kafka 消费者会跟踪已处理的消息。因此,当您阅读今天的所有消息时,Kafka 消费者会保存这些信息,明天您将只阅读新消息。
另一个可能的解决方案是 Log compaction。但它更复杂,可能不是你需要的。基本上,您可以为 Kafka 主题中的每条消息设置一个键。如果您使用相同的密钥发送两条不同的消息,Kafka 将只保留主题中的最新消息,并删除所有具有相同密钥的旧消息。您可以将其视为一种 "key-value store"。具有相同键的每条消息只更新特定键下的值。但是,嘿,你真的不需要这个,仅供参考 :-)。
最简单的方法就是直接删除主题。我在 Python 自动化测试套件中使用它,我想在其中验证一组特定的测试消息是否通过 Kafka 发送,并且不想看到之前测试运行的结果
def delete_kafka_topic(topic_name):
call(["/usr/bin/kafka-topics", "--zookeeper", "zookeeper-1:2181", "--delete", "--topic", topic_name])