Why does Spring Boot with Kafka fail to start?
I have a Spring Boot application with the Kafka dependency. There are two Kafka topics it needs to read messages from:
tacocloud.orders.topic
tacocloud.tacos.topic
Messages have already been sent to these topics successfully.
I configured Kafka to listen to these topics like this:
package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

    // Order
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
            @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
            @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}
So there are two DefaultKafkaConsumerFactory beans and two ConcurrentKafkaListenerContainerFactory beans.
After that, I created a service that logs a message from each @KafkaListener:
package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics = "tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}
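A side note on the listener methods above: with two ConcurrentKafkaListenerContainerFactory beans in the context, a @KafkaListener can name the factory it should use via its containerFactory attribute. A minimal sketch, assuming the bean method names from the KafkaConfig above (this only selects which factory is used; it is separate from the configuration problem described below):

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one",
            containerFactory = "order1KafkaMessageListenerContainer")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics = "tacocloud.tacos.topic", groupId = "two",
            containerFactory = "tacoConcurrentMessageListenerContainer")
    public void taco() {
        System.out.println("Taco");
    }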
The application.yml file:
spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic
But when I start the Spring Boot application, I see this error:
2022-04-15 21:38:35.918 ERROR 13888 --- [  restartedMain] o.s.boot.SpringApplication : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16]
	at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4]
	at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
	at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16]
	... 19 common frames omitted
Process finished with exit code 0
Thanks for providing the sample.
So, I opened it locally and put a breakpoint in this bean definition:
@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}
The injected orderConsumerConfig map looks like this:
orderConsumerConfig = {LinkedHashMap@8587} size = 1
  "orderConsumerConfig" -> {HashMap@8600} size = 5
    key = "orderConsumerConfig"
    value = {HashMap@8600} size = 5
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
      "template" -> {LinkedHashMap@8611} size = 1
      "topic" -> "tacocloud.orders.topic"
      "bootstrap.servers" -> "localhost:29888"
      "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"
So it is really no surprise that your KafkaConsumer is not initialized correctly: the configuration map you are after is hidden under the orderConsumerConfig entry of this injected map.
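This is Spring's collection-injection behavior: a parameter declared as Map<String, Object> is treated as a request to collect matching beans keyed by bean name (here narrowed to a single entry by the @Qualifier), not as a request for that one Map bean itself. A minimal sketch of one way to sidestep it, assuming the same KafkaConfig class as above: call the @ConfigurationProperties bean method directly inside the same @Configuration class, so the factory receives the bound map itself.

    // Minimal sketch (assumes the orderConsumerConfig() @Bean method from the question's KafkaConfig).
    // Calling the bean method directly inside the same @Configuration class returns the bound
    // property map, instead of a map of beans assembled by autowiring.
    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig());
    }

With the bound map passed in directly, the factory sees key.deserializer, value.deserializer and bootstrap.servers at the top level, which is what the KafkaConsumer constructor requires.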
Would you mind sharing where you got the idea of putting @ConfigurationProperties on a Map bean, and of injecting it as a dependency like that? I want to do something similar based on properties (configure multiple ConsumerFactories).
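The pattern being asked about resembles Spring Boot's "third-party configuration" support, where @ConfigurationProperties is placed on a @Bean method so that the returned object is bound from the environment after it is created. A minimal sketch, with a hypothetical property prefix:

    // Hypothetical prefix "some.prefix"; every property under it is bound into this map
    // after the method returns, so the bean ends up holding the bound key/value pairs.
    @Bean
    @ConfigurationProperties(prefix = "some.prefix")
    public Map<String, Object> somePrefixProperties() {
        return new HashMap<>();
    }

The question's code combines this binding style with @Qualifier-based injection of the map, which is where the wrapping behavior described above comes from.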
I created a Map<String, String> with @ConfigurationProperties instead of a Map<String, Object>, and then copied that map's entries into a new Map<String, Object>. I am not sure why Spring Boot loads the Map this way.
@Bean
@ConfigurationProperties("taco-cart.kafka")
public Map<String, String> tacoCartKafkaProperties() {
    return new HashMap<>();
}

@Bean
public ConsumerFactory<String, TacoCart> tacoCartConsumerFactory(
        @Qualifier("tacoCartKafkaProperties") Map<String, String> tacoCartKafkaProperties) {
    // Convert map.
    Map<String, Object> config = new HashMap<>();
    config.putAll(tacoCartKafkaProperties);
    return new DefaultKafkaConsumerFactory<>(config);
}
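For completeness, a hypothetical sketch (the bean name, topic name, and TacoCart listener signature are assumptions, not from the original post) of how such a consumer factory could then be wired into a listener container factory and referenced from a @KafkaListener:

    // Goes in the same @Configuration class as tacoCartConsumerFactory above.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, TacoCart> tacoCartKafkaListenerContainerFactory(
            @Qualifier("tacoCartConsumerFactory") ConsumerFactory<String, TacoCart> tacoCartConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, TacoCart> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(tacoCartConsumerFactory);
        return factory;
    }

    // In a listener bean, reference the container factory by its bean name:
    // @KafkaListener(topics = "taco.carts.topic", containerFactory = "tacoCartKafkaListenerContainerFactory")
    // public void tacoCart(TacoCart tacoCart) { ... }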