在 Confluent 上使用 Kafka 运行 验证 Kafka CLI

Authenticate Kafka CLI with Kafka running on Confluent

我在 Confluent Cloud 上有一个 Kafka 集群 运行,但我无法从 UI 重置提交偏移量。因此,我正在尝试通过 Kafka 的 CLI 来完成它,如下所示:

kafka-consumer-groups --bootstrap-server=my_cluster.confluent.cloud:9092 --list

但是,我遇到了以下错误。我认为这与我如何进行身份验证有关。

Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listConsumerGroups(ConsumerGroupCommand.scala:203)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.listGroups(ConsumerGroupCommand.scala:198)
    at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:70)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups
    at org.apache.kafka.clients.admin.KafkaAdminClient.handleFailure(KafkaAdminClient.java:3368)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.handleTimeoutFailure(KafkaAdminClient.java:838)
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:804)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:934)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:1013)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1367)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1331)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: findAllBrokers

您需要使用 --command-config 选项来设置包含您的 CCLoud 凭据的属性文件

这里是一个列出消费者组的例子

kafka-consumer-groups --bootstrap-server <ccloud kafka>:9092 --command-config consumer.properties --list

consumer.properties

bootstrap.servers=<ccloud kafka>:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required username="<KEY>" password="<SECRET>";