spring-kafka application.properties configuration for JAAS/SASL not working
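Use case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1.
JAAS/SASL is configured correctly on Kafka/ZooKeeper, since creating topics with kafka-topics.bat works without problems.
Problem:
When I start my Spring Boot application, I immediately get the following errors:
kafka-server-start.bat console: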
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected
My application.properties configuration:
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";
kafka_server_jaas.conf:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};
Am I missing something?
Thanks in advance.
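I had defined the properties in the wrong place, namely application.properties. Because I define my own ProducerFactory and ConsumerFactory beans, those application.properties settings are ignored by Spring Boot.
Configuring the same properties in the bean definitions solved the problem, i.e. move your properties from application.properties to where you define your beans.
Here is an example: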
// bootstrapServers is assumed to be a field of the enclosing @Configuration class.
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    // SASL settings must be set here, because this bean replaces the auto-configured factory.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" password=\"%s\";",
        PlainLoginModule.class.getName(), "username", "password"
    ));
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" password=\"%s\";",
        PlainLoginModule.class.getName(), "username", "password"
    ));
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
    // Quote both values and keep a space between the two options.
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"username\" " +
        "password=\"password\";");
    return new KafkaAdmin(configs);
}
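Since the same three SASL settings are repeated in every factory above, they could be built in one place. The following is only a minimal sketch: the class SaslClientProps and its method names are hypothetical (not part of spring-kafka or kafka-clients), it uses plain string keys instead of the Kafka constants so it compiles on its own, and it assumes the credentials contain no double quotes or backslashes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of any library: builds the SASL/PLAIN client settings once.
final class SaslClientProps {

    private static final String PLAIN_LOGIN_MODULE =
            "org.apache.kafka.common.security.plain.PlainLoginModule";

    private SaslClientProps() {
    }

    // Wraps a value in double quotes; rejects characters this sketch does not try to escape.
    static String quote(String value) {
        if (value.contains("\"") || value.contains("\\")) {
            throw new IllegalArgumentException("value must not contain '\"' or '\\'");
        }
        return "\"" + value + "\"";
    }

    // Produces the sasl.jaas.config value: options separated by spaces, terminated by ';'.
    static String plainJaasConfig(String username, String password) {
        return PLAIN_LOGIN_MODULE + " required"
                + " username=" + quote(username)
                + " password=" + quote(password) + ";";
    }

    // Returns the three client-side settings used in the bean definitions above.
    static Map<String, Object> saslPlainProps(String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", plainJaasConfig(username, password));
        return props;
    }
}
```

Each bean method could then call props.putAll(SaslClientProps.saslPlainProps("spring_bO0t", "i_am_a_spring_bO0t_user")) before creating its factory, so the producer, consumer and admin client cannot drift apart.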
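@jumping_monkey's answer is correct, but at first I did not know where in the ProducerFactory and ConsumerFactory beans to put these settings, so here is an example for anyone who is wondering the same:
- In your ProducerFactory or ConsumerFactory bean respectively (mine is called generalMessageProducerFactory):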
@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}
- And in the kafkaAdmin method of your TopicConfiguration class:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configs.put("security.protocol", "SASL_SSL");
    return new KafkaAdmin(configs);
}
Hope this helps!