如何使用 python 或使用任何内置方法从 kafka 主题中删除特定行数?

How to delete specific number of lines from kafka topic by using python or using any inbuilt method?

我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从中删除特定数量的行kafka 主题 .

您需要确保数据在提交之前已完全处理,以避免"data loss"万一消费者失败。

因此,如果启用 auto.commit,请确保在发出下一个 poll() 之前在 poll() 之后完全处理所有数据,因为每个 poll() 隐式提交来自其先前 poll().

的所有数据

如果这不可能,您应该禁用 auto.commit 并在通过 consumer.commit(...) 完全处理数据后手动提交。为此,请记住您不需要单独提交每条消息,并且偏移量 X 的提交隐式提交所有偏移量 < X 的消息(例如,在处理偏移量 5 的消息后,您commit offset 6 -- 提交的偏移量不是最后一条成功处理的消息,而是你要处理的下一条消息)。偏移量 6 的提交会提交偏移量为 0 到 5 的所有消息。因此,在所有具有较小偏移量的消息被完全处理之前,您不应提交偏移量 6。