使用 SASL_SSL / PLAIN 连接时出现 Kafka AdminClient 错误
Kafka AdminClient error when connecting using SASL_SSL / PLAIN
我想创建和使用 java 实用程序来获取信息并创建/修改/删除主题。
为了创建实用程序,我正在尝试 this link
中的示例
这是我在代码中设置属性的方式:
Properties adminConfig = new Properties();
adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092,server2:9092,server3:9092");
adminConfig.put("security.protocol", "SASL_SSL");
adminConfig.put("security.mechanism", "PLAINTEXT");
adminConfig.put("ssl.keystore.type", "JKS");
adminConfig.put("ssl.keystore.location", "/config/dev/java.keystore.jks");
adminConfig.put("ssl.keystore.password", "password");
adminConfig.put("ssl.key.password", "password");
adminConfig.put("ssl.truststore.location", "/config/dev/java.truststore.jks");
adminConfig.put("ssl.truststore.password", "password");
adminConfig.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";");
AdminClient admin = KafkaAdminClient.create(adminConfig);
for (Node node : admin.describeCluster().nodes().get()) {
System.out.println("-- node: " + node.id() + " --");
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
dcr.all().get().forEach((k, c) -> {
c.entries()
.forEach(configEntry -> {
System.out.println(configEntry.name() + "= " + configEntry.value())
});
});
}
我得到的错误如下
WARNING: Illegal reflective access by org.apache.kafka.common.network.SaslChannelBuilder (file:/Users/x1234/.m2/repository/org/apache/kafka/kafka-clients/2.3.0/kafka-clients-2.3.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.kafka.common.network.SaslChannelBuilder
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:55)
at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:42)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
... 2 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:104)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 6 more
如果我没有设置 SSL 信息,我会收到以下错误
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=13=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:18)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
设置使用该实用程序的属性时我做错了什么?
您的配置使用了 security.mechanism
,这不是一个有效的设置。有关设置列表,请参阅 Apache Kafka 网站上的 Admin client configuration 部分。
要解决您的问题,请替换:
adminConfig.put("security.mechanism", "PLAINTEXT");
来自
adminConfig.put("sasl.mechanism", "PLAIN");
我想创建和使用 java 实用程序来获取信息并创建/修改/删除主题。
为了创建实用程序,我正在尝试 this link
中的示例这是我在代码中设置属性的方式:
Properties adminConfig = new Properties();
adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server1:9092,server2:9092,server3:9092");
adminConfig.put("security.protocol", "SASL_SSL");
adminConfig.put("security.mechanism", "PLAINTEXT");
adminConfig.put("ssl.keystore.type", "JKS");
adminConfig.put("ssl.keystore.location", "/config/dev/java.keystore.jks");
adminConfig.put("ssl.keystore.password", "password");
adminConfig.put("ssl.key.password", "password");
adminConfig.put("ssl.truststore.location", "/config/dev/java.truststore.jks");
adminConfig.put("ssl.truststore.password", "password");
adminConfig.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"client\" password=\"client-secret\";");
AdminClient admin = KafkaAdminClient.create(adminConfig);
for (Node node : admin.describeCluster().nodes().get()) {
System.out.println("-- node: " + node.id() + " --");
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult dcr = admin.describeConfigs(Collections.singleton(cr));
dcr.all().get().forEach((k, c) -> {
c.entries()
.forEach(configEntry -> {
System.out.println(configEntry.name() + "= " + configEntry.value())
});
});
}
我得到的错误如下
WARNING: Illegal reflective access by org.apache.kafka.common.network.SaslChannelBuilder (file:/Users/x1234/.m2/repository/org/apache/kafka/kafka-clients/2.3.0/kafka-clients-2.3.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.kafka.common.network.SaslChannelBuilder
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:55)
at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:42)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
... 2 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:104)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 6 more
如果我没有设置 SSL 信息,我会收到以下错误
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=13=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.utils.ListingKafkaConfigs.main(ListingKafkaConfigs.java:18)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
设置使用该实用程序的属性时我做错了什么?
您的配置使用了 security.mechanism
,这不是一个有效的设置。有关设置列表,请参阅 Apache Kafka 网站上的 Admin client configuration 部分。
要解决您的问题,请替换:
adminConfig.put("security.mechanism", "PLAINTEXT");
来自
adminConfig.put("sasl.mechanism", "PLAIN");