Kafka:删除空闲消费者组id
Kafka: Delete idle consumer group id
在某些情况下,我使用 Kafka-stream 对主题的小型内存(哈希图)投影进行建模。 K,V 缓存确实需要一些操作,所以它不是 GlobalKTable 的好例子。在这样的“缓存”场景下,我希望我所有的兄弟实例都有相同的缓存,所以我需要绕过消费者组机制。
要启用此功能,我通常只是使用随机生成的应用程序 ID 启动我的应用程序,这样每个应用程序都会在每次重新启动时重新加载主题。唯一需要注意的是,我最终会在 offsets.retention.minutes 之前在 kafka 代理上孤立一些消费者群体,这对于我们的运营监控工具来说并不理想。
知道如何解决这个问题吗?
- 我们能否将 applicationId 配置为临时的,以便在应用终止后消失?
- 或者我们是否可以强制消费者仅在本地管理其偏移量?
- 或者是否有一些 java adminApi 可以用来在正常关闭应用程序时清理我的消费者组 ID?
谢谢
AdminClient
中有一个叫做deleteConsumerGroups
的JavaAPI,可以用来删除单个ConsumerGroup。
Kafka 2.5.0 可以像下面这样使用它。
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
public class DeleteConsumerGroups {
public static void main(String[] args) {
System.out.println("*** Starting AdminClient to delete a Consumer Group ***");
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");
AdminClient adminClient = AdminClient.create(properties);
String consumerGroupToBeDeleted = "console-consumer-65092";
DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));
KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
try {
resultFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
adminClient.close();
}
}
运行 上述代码前的 ConsumerGroups 列表
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268
运行 上述代码后的 ConsumerGroups 列表
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268
在某些情况下,我使用 Kafka-stream 对主题的小型内存(哈希图)投影进行建模。 K,V 缓存确实需要一些操作,所以它不是 GlobalKTable 的好例子。在这样的“缓存”场景下,我希望我所有的兄弟实例都有相同的缓存,所以我需要绕过消费者组机制。
要启用此功能,我通常只是使用随机生成的应用程序 ID 启动我的应用程序,这样每个应用程序都会在每次重新启动时重新加载主题。唯一需要注意的是,我最终会在 offsets.retention.minutes 之前在 kafka 代理上孤立一些消费者群体,这对于我们的运营监控工具来说并不理想。 知道如何解决这个问题吗?
- 我们能否将 applicationId 配置为临时的,以便在应用终止后消失?
- 或者我们是否可以强制消费者仅在本地管理其偏移量?
- 或者是否有一些 java adminApi 可以用来在正常关闭应用程序时清理我的消费者组 ID?
谢谢
AdminClient
中有一个叫做deleteConsumerGroups
的JavaAPI,可以用来删除单个ConsumerGroup。
Kafka 2.5.0 可以像下面这样使用它。
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
public class DeleteConsumerGroups {
public static void main(String[] args) {
System.out.println("*** Starting AdminClient to delete a Consumer Group ***");
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");
AdminClient adminClient = AdminClient.create(properties);
String consumerGroupToBeDeleted = "console-consumer-65092";
DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));
KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
try {
resultFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
adminClient.close();
}
}
运行 上述代码前的 ConsumerGroups 列表
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268
运行 上述代码后的 ConsumerGroups 列表
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268