如何排空kafka主题中的记录
how to drain records in a kafka topic
在应用程序计划维护活动期间,需要排空kafka主题中的所有消息。
在 MQ 中,我们可以监控队列深度并在所有消息都被消费后开始维护活动。在 kafka 中,我们是否有类似的机制来查明主题中的所有消息是否已被消费并关闭生产者和消费者是否安全?
使用以下命令可以监控消费者组的 LAG,一旦延迟为 0 就意味着主题中没有更多消息可以消费
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group count_errors --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
count_errors logs 2 2908278 2908278 0 consumer-1_/10.8.0.55
count_errors logs 3 2907501 2907501 0 consumer-1_/10.8.0.43
count_errors logs 4 2907541 2907541 0 consumer-1_/10.8.0.177
count_errors logs 1 2907499 2907499 0 consumer-1_/10.8.0.115
count_errors logs 0 2907469 2907469 0 consumer-1_/10.8.0.126
在应用程序计划维护活动期间,需要排空kafka主题中的所有消息。
在 MQ 中,我们可以监控队列深度并在所有消息都被消费后开始维护活动。在 kafka 中,我们是否有类似的机制来查明主题中的所有消息是否已被消费并关闭生产者和消费者是否安全?
使用以下命令可以监控消费者组的 LAG,一旦延迟为 0 就意味着主题中没有更多消息可以消费
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group count_errors --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
count_errors logs 2 2908278 2908278 0 consumer-1_/10.8.0.55
count_errors logs 3 2907501 2907501 0 consumer-1_/10.8.0.43
count_errors logs 4 2907541 2907541 0 consumer-1_/10.8.0.177
count_errors logs 1 2907499 2907499 0 consumer-1_/10.8.0.115
count_errors logs 0 2907469 2907469 0 consumer-1_/10.8.0.126