'org.springframework.kafka.core.ConsumerFactory' 找不到

'org.springframework.kafka.core.ConsumerFactory' that could not be found

我需要从 Spring 使用具有 org.springframework.kafka 依赖项的 Apache Kafka 引导应用程序使用 json 对象消息,但出现以下错误:

***************************
APPLICATION FAILED TO START
***************************

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found. 

这是我的 bean class,它具有为 Kafka 配置创建的所有必需的 bean。

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String,Message> consumerFactory(){
        Map<String,Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
                new JsonDeserializer<>(Message.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListener(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

谁能帮帮我?我究竟做错了什么?谢谢

您可以使用 application.properties/application.yml 来实现。
请注意,Spring Boot 允许我们避免我们引入的所有样板代码创建一个 Java class 注释 @Configuration
application.yml 示例:

server:
  port: 8085

#Kafka config props:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #Consumer Deserialization:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: myGroupId
      enable-auto-commit: false
    listener:
      missing-topics-fatal: false

bootstrap-servers:可用的 Kafka 服务器列表。

key-deserializer:消费者密钥反序列化class。使用 Kafka 库的 StringDeserializer class。

value-deserializer:消费者价值反序列化class。在使用 JSON 格式的字符串消息时使用 Kafka 库的 StringDeserializer class。

group-id:Kafka 消费者的 group id 值。

enable-auto-commit:设置为false可以手动提交offset消息,避免consumer正在处理当前消费的消息时,又消费了新的消息导致consumer崩溃。

listener.missing-topics-fatal:通过将该值设置为 false,如果任何已配置的主题在代理上不存在,您可以避免在应用程序启动期间显示不需要的错误。

希望这对您有所帮助。