spring 总是需要 KafkaTemplate 吗?
Does spring always require KafkaTemplate?
问题:springboot是否总是需要创建一个KafkaTemplate类型的bean?
Details/stacktrace/codebase下面,请告诉我我做错了什么。谢谢
- 我一直在从 spring 引导项目
向主题发布消息
- 为了创建回调机制,我使用了 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord
, Callback) 来发送消息并创建回调
- 我这样做的原因是因为 listenablefuture 在使用 KafkaTemplate 时只提供失败的异常(我想将回调注册为一个单独的可重用 class 在我所有的用例中)
- 但是,当我没有定义类型为 KafkaTemplate 的 bean 时,spring 无法启动并出现以下错误
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean[=15=](AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12]
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na]
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12]
... 22 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12]
... 40 common frames omitted
我的Kafka配置如下
@Configuration
public class KafkaEventConfig {
private final KafkaProperties kafkaProperties;
@Value("${client.id}")
private String clientId;
@Value("${topic.movie.name}")
private String movieTopicName;
@Value("${retry.backoff.ms}")
private int retryBackoffMilliseconds;
@Value("${request.timeout.ms}")
private int requestTimeoutMilliseconds;
public KafkaEventConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ProducerFactory<String, Movie> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties();
populateCommonProperties(props);
return new DefaultKafkaProducerFactory<>(props);
}
private void populateCommonProperties(Map<String, Object> props) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
}
@Bean
public KafkaProducer<String, Movie> movieKafkaProducer() {
return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
}
@Bean
public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
MeterRegistry registry) {
return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
}
下面是我的Kafka回调
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback {
private ProducerRecord<K, V> producerRecord;
public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
this.producerRecord = producerRecord;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
String topicName= metadata.topic();
long offset= metadata.offset();
if (exception != null) {
log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
}
else {
log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
}
}
}
我这样发布消息
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
请注意,当我在 KafkaEventConfig 中添加以下行时,一切正常
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
return new KafkaTemplate<String, Movie>(producerFactory());
}
仔细查看您的异常堆栈跟踪可以发现问题所在。
Error creating bean with name 'kafkaTemplate' defined in class path resource
[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]:
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
错误来自于无法在 Spring Boot 中应用基于 Kafka 的自动配置。 class KafkaAutoConfiguration
期望并配置一些 bean,如果找到某些 bean,则退出。当您配置一些 bean 时,这将部分退避,因此无法自动配置 Kafka classes。
要修复,您可以排除 KafkaAutoConfiguration
。您可以在 @SpringBootApplication
注释中执行此操作,例如
@SpringBootApplication(exclude={KafkaAutoConfiguration.class}
或者您可以利用自动配置并让 Spring 引导进行配置,然后使用提供的 KafkaTemplate
或 ProducerFactory
来执行您想要的操作。
后者会简化您自己的配置。我对 Kafka 自动配置和你的用例知之甚少,无法提供更有用的代码片段,但你应该能够自己弄清楚,或者只是排除 KafkaAutoConfiguration
并使用你现在拥有的.
除了@M.Deinum提到的后者:
看看 KafkaAutoConfiguration
class:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
如果您不创建自己的 bean,Springboot 将为您创建一个 KafkaTemplate
bean。这个自动配置的 bean 依赖于 ProducerFactory<Object, Object>
bean,因为你声明了一个 ProducerFactory<String, Movie>
。如您所见,类型不合适,这就是您收到错误的原因。
the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)
你的情况,你仍然可以获得使用KafkaTemplate
的优势。您可以实现自己的 ProducerListener<K, V>
并将其绑定到您的 KafkaTemple
,而不是实现 Callback
。例如:
FullLoggingProducerListener.class
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
log.info("Successful!");
}
@Override
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
log.error("Error!");
}
}
YourConfigration.class
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
现在,每次使用 KafkaTemplate
发送记录时,您都会看到日志。
问题:springboot是否总是需要创建一个KafkaTemplate类型的bean? Details/stacktrace/codebase下面,请告诉我我做错了什么。谢谢
- 我一直在从 spring 引导项目 向主题发布消息
- 为了创建回调机制,我使用了 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord
, Callback) 来发送消息并创建回调 - 我这样做的原因是因为 listenablefuture 在使用 KafkaTemplate 时只提供失败的异常(我想将回调注册为一个单独的可重用 class 在我所有的用例中)
- 但是,当我没有定义类型为 KafkaTemplate 的 bean 时,spring 无法启动并出现以下错误
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaTemplate' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {} at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean[=15=](AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na] at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na] at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12] ... 22 common frames omitted Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {} at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12] at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12] ... 40 common frames omitted
我的Kafka配置如下
@Configuration
public class KafkaEventConfig {
private final KafkaProperties kafkaProperties;
@Value("${client.id}")
private String clientId;
@Value("${topic.movie.name}")
private String movieTopicName;
@Value("${retry.backoff.ms}")
private int retryBackoffMilliseconds;
@Value("${request.timeout.ms}")
private int requestTimeoutMilliseconds;
public KafkaEventConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ProducerFactory<String, Movie> producerFactory() {
Map<String, Object> props = kafkaProperties.buildProducerProperties();
populateCommonProperties(props);
return new DefaultKafkaProducerFactory<>(props);
}
private void populateCommonProperties(Map<String, Object> props) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
}
@Bean
public KafkaProducer<String, Movie> movieKafkaProducer() {
return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
}
@Bean
public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
MeterRegistry registry) {
return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
}
下面是我的Kafka回调
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback {
private ProducerRecord<K, V> producerRecord;
public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) {
this.producerRecord = producerRecord;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
String topicName= metadata.topic();
long offset= metadata.offset();
if (exception != null) {
log.error("Failed to produce message [{}] to topic {} with exception {}", producerRecord, topicName, exception);
}
else {
log.info("Sucessfully published message [{}] to topic {} to offset {}", producerRecord, topicName , offset);
}
}
}
我这样发布消息
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
请注意,当我在 KafkaEventConfig 中添加以下行时,一切正常
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate() {
return new KafkaTemplate<String, Movie>(producerFactory());
}
仔细查看您的异常堆栈跟踪可以发现问题所在。
Error creating bean with name 'kafkaTemplate' defined in class path resource
[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]:
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
错误来自于无法在 Spring Boot 中应用基于 Kafka 的自动配置。 class KafkaAutoConfiguration
期望并配置一些 bean,如果找到某些 bean,则退出。当您配置一些 bean 时,这将部分退避,因此无法自动配置 Kafka classes。
要修复,您可以排除 KafkaAutoConfiguration
。您可以在 @SpringBootApplication
注释中执行此操作,例如
@SpringBootApplication(exclude={KafkaAutoConfiguration.class}
或者您可以利用自动配置并让 Spring 引导进行配置,然后使用提供的 KafkaTemplate
或 ProducerFactory
来执行您想要的操作。
后者会简化您自己的配置。我对 Kafka 自动配置和你的用例知之甚少,无法提供更有用的代码片段,但你应该能够自己弄清楚,或者只是排除 KafkaAutoConfiguration
并使用你现在拥有的.
除了@M.Deinum提到的后者:
看看 KafkaAutoConfiguration
class:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
如果您不创建自己的 bean,Springboot 将为您创建一个 KafkaTemplate
bean。这个自动配置的 bean 依赖于 ProducerFactory<Object, Object>
bean,因为你声明了一个 ProducerFactory<String, Movie>
。如您所见,类型不合适,这就是您收到错误的原因。
the reason i did this way is because listenablefuture when using KafkaTemplate only provides exception on failures( and i wanted to register callbacks as a separate reusable class across all my usecases)
你的情况,你仍然可以获得使用KafkaTemplate
的优势。您可以实现自己的 ProducerListener<K, V>
并将其绑定到您的 KafkaTemple
,而不是实现 Callback
。例如:
FullLoggingProducerListener.class
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) {
log.info("Successful!");
}
@Override
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
log.error("Error!");
}
}
YourConfigration.class
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
}
现在,每次使用 KafkaTemplate
发送记录时,您都会看到日志。