'org.springframework.kafka.core.ConsumerFactory' 找不到
'org.springframework.kafka.core.ConsumerFactory' that could not be found
我需要从 Spring 使用具有 org.springframework.kafka 依赖项的 Apache Kafka 引导应用程序使用 json 对象消息,但出现以下错误:
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
这是我的 bean class,它具有为 Kafka 配置创建的所有必需的 bean。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,Message> consumerFactory(){
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListener(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
谁能帮帮我?我究竟做错了什么?谢谢
您可以使用 application.properties/application.yml
来实现。
请注意,Spring Boot 允许我们避免我们引入的所有样板代码创建一个 Java class 注释 @Configuration
application.yml 示例:
server:
port: 8085
#Kafka config props:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
#Consumer Deserialization:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: myGroupId
enable-auto-commit: false
listener:
missing-topics-fatal: false
bootstrap-servers
:可用的 Kafka 服务器列表。
key-deserializer
:消费者密钥反序列化class。使用 Kafka 库的 StringDeserializer class。
value-deserializer
:消费者价值反序列化class。在使用 JSON 格式的字符串消息时使用 Kafka 库的 StringDeserializer class。
group-id
:Kafka 消费者的 group id 值。
enable-auto-commit
:设置为false可以手动提交offset消息,避免consumer正在处理当前消费的消息时,又消费了新的消息导致consumer崩溃。
listener.missing-topics-fatal
:通过将该值设置为 false,如果任何已配置的主题在代理上不存在,您可以避免在应用程序启动期间显示不需要的错误。
希望这对您有所帮助。
我需要从 Spring 使用具有 org.springframework.kafka 依赖项的 Apache Kafka 引导应用程序使用 json 对象消息,但出现以下错误:
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
这是我的 bean class,它具有为 Kafka 配置创建的所有必需的 bean。
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String,Message> consumerFactory(){
Map<String,Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"sample-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaConsumerFactory<>(config,new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListener(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
谁能帮帮我?我究竟做错了什么?谢谢
您可以使用 application.properties/application.yml
来实现。
请注意,Spring Boot 允许我们避免我们引入的所有样板代码创建一个 Java class 注释 @Configuration
application.yml 示例:
server:
port: 8085
#Kafka config props:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
#Consumer Deserialization:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: myGroupId
enable-auto-commit: false
listener:
missing-topics-fatal: false
bootstrap-servers
:可用的 Kafka 服务器列表。
key-deserializer
:消费者密钥反序列化class。使用 Kafka 库的 StringDeserializer class。
value-deserializer
:消费者价值反序列化class。在使用 JSON 格式的字符串消息时使用 Kafka 库的 StringDeserializer class。
group-id
:Kafka 消费者的 group id 值。
enable-auto-commit
:设置为false可以手动提交offset消息,避免consumer正在处理当前消费的消息时,又消费了新的消息导致consumer崩溃。
listener.missing-topics-fatal
:通过将该值设置为 false,如果任何已配置的主题在代理上不存在,您可以避免在应用程序启动期间显示不需要的错误。
希望这对您有所帮助。