Kafka:删除空闲消费者组id

Kafka: Delete idle consumer group id

在某些情况下,我使用 Kafka-stream 对主题的小型内存(哈希图)投影进行建模。 K,V 缓存确实需要一些操作,所以它不是 GlobalKTable 的好例子。在这样的“缓存”场景下,我希望我所有的兄弟实例都有相同的缓存,所以我需要绕过消费者组机制。

要启用此功能,我通常只是使用随机生成的应用程序 ID 启动我的应用程序,这样每个应用程序都会在每次重新启动时重新加载主题。唯一需要注意的是,我最终会在 offsets.retention.minutes 之前在 kafka 代理上孤立一些消费者群体,这对于我们的运营监控工具来说并不理想。 知道如何解决这个问题吗?

谢谢

AdminClient中有一个叫做deleteConsumerGroups的JavaAPI,可以用来删除单个ConsumerGroup。

Kafka 2.5.0 可以像下面这样使用它。

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("*** Starting AdminClient to delete a Consumer Group ***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}

运行 上述代码前的 ConsumerGroups 列表

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268

运行 上述代码后的 ConsumerGroups 列表

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268