如何排空kafka主题中的记录

how to drain records in a kafka topic

在应用程序计划维护活动期间,需要排空kafka主题中的所有消息。

在 MQ 中,我们可以监控队列深度并在所有消息都被消费后开始维护活动。在 kafka 中,我们是否有类似的机制来查明主题中的所有消息是否已被消费并关闭生产者和消费者是否安全?

使用以下命令可以监控消费者组的 LAG,一旦延迟为 0 就意味着主题中没有更多消息可以消费

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group count_errors --describe
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
count_errors                   logs                           2          2908278         2908278         0               consumer-1_/10.8.0.55
count_errors                   logs                           3          2907501         2907501         0               consumer-1_/10.8.0.43
count_errors                   logs                           4          2907541         2907541         0               consumer-1_/10.8.0.177
count_errors                   logs                           1          2907499         2907499         0               consumer-1_/10.8.0.115
count_errors                   logs                           0          2907469         2907469         0               consumer-1_/10.8.0.126