spring-kafka application.properties configuration for JAAS/SASL not working

Use case:
I am using Spring Boot 2.2.5.RELEASE and Kafka 2.4.1.
The JAAS/SASL configuration is done correctly on Kafka/ZooKeeper, since topics are created without problems using kafka-topics.bat.

Problem:
When I start the Spring Boot application, I immediately get the following errors:

kafka-server-start.bat console:
INFO [SocketServer brokerId=1] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)

IDE console:
WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=xxx, groupId=yyy] Bootstrap broker localhost:9093 (id: -3 rack: null) disconnected

My application.properties configuration:

spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="spring_bO0t" password="i_am_a_spring_bO0t_user";

kafka_server_jaas.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="12345"
    user_admin="12345"
    user_spring_bO0t="i_am_a_spring_bO0t_user";
};

Am I missing something?

Thanks in advance.

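I was defining the properties in the wrong place, i.e. in application.properties. Because I define my own ProducerFactory and ConsumerFactory beans, the settings in application.properties are ignored by Spring Boot for those beans.

Configuring the same properties in the bean definitions resolved the issue, i.e. move your properties from application.properties to where you define your beans.

Here is an example: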

@Bean
public ProducerFactory<Object, Object> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));
        
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "username", "password"
    ));

    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
    configs.put("security.protocol", "SASL_PLAINTEXT");
    configs.put("sasl.mechanism", "PLAIN");
    // Each option must be separated by a space, and the values are quoted.
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                                    "username=\"username\" " +
                                    "password=\"password\";");
    
    return new KafkaAdmin(configs);
}
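
Since the sasl.jaas.config value is assembled by hand in each bean above, a missing space or quote is easy to introduce. Below is a minimal sketch of a small helper that builds the string in one place. The class name JaasConfigs and its methods are hypothetical (they are not part of spring-kafka or kafka-clients), and the backslash escaping of quotes is an assumption about how the client parses the value, so verify it against your client version if your credentials contain such characters.

```java
// Hypothetical helper, not part of spring-kafka or kafka-clients.
final class JaasConfigs {

    private JaasConfigs() {
    }

    // Wraps a value in double quotes, escaping backslashes and double quotes
    // (assumed escaping rules; check against your Kafka client version).
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds a value for the "sasl.jaas.config" client property (PLAIN mechanism).
    static String plain(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=" + quote(username)
                + " password=" + quote(password)
                + ";";
    }

    public static void main(String[] args) {
        // Prints the same value used in the question's application.properties.
        System.out.println(plain("spring_bO0t", "i_am_a_spring_bO0t_user"));
    }
}
```

In a factory bean this could then be used as props.put(SaslConfigs.SASL_JAAS_CONFIG, JaasConfigs.plain(username, password)), where username and password are whatever credentials your broker expects.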

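@jumping_monkey's answer is correct, but I did not know where in the ProducerFactory and ConsumerFactory beans to put these settings, so here is an example for anyone who wants to know:

- In your ProducerConfig and ConsumerConfig beans, respectively (mine is named generalMessageProducerFactory):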

@Bean
public ProducerFactory<String, GeneralMessageDto> generalMessageProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put("sasl.mechanism", "PLAIN");
    configProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configProps.put("security.protocol", "SASL_SSL");
    return new DefaultKafkaProducerFactory<>(configProps);
}

And in the kafkaAdmin method of your TopicConfiguration class:

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("sasl.mechanism", "PLAIN");
    configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule   required username='YOUR_KAFKA_CLUSTER_USERNAME'   password='YOUR_KAFKA_CLUSTER_PASSWORD';");
    configs.put("security.protocol", "SASL_SSL");
    return new KafkaAdmin(configs);
}
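
The same three security settings end up repeated in every factory and in KafkaAdmin, so one option is to build them once and merge them into each config map. This is only a rough sketch: SaslClientProps and its of method are hypothetical names, and the bootstrap address in main is just the localhost:9093 value from the question's log.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the class and method names are illustrative only.
final class SaslClientProps {

    private SaslClientProps() {
    }

    // Returns the three security settings shared by producer, consumer and admin clients.
    static Map<String, Object> of(String securityProtocol, String mechanism, String jaasConfig) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", jaasConfig);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put("bootstrap.servers", "localhost:9093");
        // Merge the shared security settings into the client-specific map.
        configProps.putAll(of("SASL_SSL", "PLAIN",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='YOUR_KAFKA_CLUSTER_USERNAME' password='YOUR_KAFKA_CLUSTER_PASSWORD';"));
        System.out.println(configProps.keySet());
    }
}
```

Each bean would then call configProps.putAll(SaslClientProps.of(...)) before constructing its factory, so a change of protocol or credentials only has to be made in one place.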

Hope this helps!