如何使用 python 或使用任何内置方法从 kafka 主题中删除特定行数?
How to delete specific number of lines from kafka topic by using python or using any inbuilt method?
我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从中删除特定数量的行kafka 主题 .
您需要确保数据在提交之前已完全处理,以避免"data loss"万一消费者失败。
因此,如果启用 auto.commit
,请确保在发出下一个 poll()
之前在 poll()
之后完全处理所有数据,因为每个 poll()
隐式提交来自其先前 poll()
.
的所有数据
如果这不可能,您应该禁用 auto.commit
并在通过 consumer.commit(...)
完全处理数据后手动提交。为此,请记住您不需要单独提交每条消息,并且偏移量 X
的提交隐式提交所有偏移量 < X
的消息(例如,在处理偏移量 5 的消息后,您commit offset 6 -- 提交的偏移量不是最后一条成功处理的消息,而是你要处理的下一条消息)。偏移量 6 的提交会提交偏移量为 0 到 5 的所有消息。因此,在所有具有较小偏移量的消息被完全处理之前,您不应提交偏移量 6。
我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从中删除特定数量的行kafka 主题 .
您需要确保数据在提交之前已完全处理,以避免"data loss"万一消费者失败。
因此,如果启用 auto.commit
,请确保在发出下一个 poll()
之前在 poll()
之后完全处理所有数据,因为每个 poll()
隐式提交来自其先前 poll()
.
如果这不可能,您应该禁用 auto.commit
并在通过 consumer.commit(...)
完全处理数据后手动提交。为此,请记住您不需要单独提交每条消息,并且偏移量 X
的提交隐式提交所有偏移量 < X
的消息(例如,在处理偏移量 5 的消息后,您commit offset 6 -- 提交的偏移量不是最后一条成功处理的消息,而是你要处理的下一条消息)。偏移量 6 的提交会提交偏移量为 0 到 5 的所有消息。因此,在所有具有较小偏移量的消息被完全处理之前,您不应提交偏移量 6。