Spring 集成消息驱动的通道适配器不适用于 Spring-Kafka 2.3+
Spring Integration Message Driven Channel Adapter not working with Spring-Kafka 2.3+
我在尝试让消息驱动通道适配器与 Spring-Kafka 2.3+ 一起工作时遇到以下问题。有没有人有任何示例代码可以帮助我?
1. org.springframework.kafka.listener.config.ContainerProperties实际上并不存在。
2。 org.springframework.kafka.listener.ContainerProperties 确实存在,但在尝试 运行.
时会产生以下问题
描述:
试图调用不存在的方法。尝试是从以下位置进行的:
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)
不存在以下方法:
org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z
3。如果您使用 kafka 2.5 及更高版本,则会出现此问题,但会替换为
2021-03-22 13:56:05.102-0400 org{local_sparta} WARN [data-pipeline,] [DP-ACCOUNT] [DPA] [] AnnotationConfigServletWebServerApplicationContext:main 上下文期间遇到异常初始化 - 取消刷新尝试:org.springframework.beans.factory.BeanCreationException:创建名称为 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration' 的 bean 时出错:通过构造函数实例化 Bean 失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration]:构造函数抛出异常;嵌套异常是 org.springframework.beans.factory.UnsatisfiedDependencyException:在 class 路径资源 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class] 中定义名称 'kafkaTemplate' 创建 bean 时出错:通过方法 [=61 表达的不满足的依赖关系=]参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用类型 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' 的符合条件的 bean:预计至少有 1 个符合自动装配候选条件的 bean。依赖注解:{}
尝试使用下面的 Java 版本和 XML 版本都给出相同的错误。
Java版本
@Configuration
@Slf4j
public class KafkaChannelConsumer {
@Autowired
MessageChannel preRouterLOB;
@Value("${spring.kafka.bootstrap-servers:localhost9092}")
private String bootstrapServers;
@Value("${spring.kafka.topic:55iptest}")
private String springIntegrationKafkaTopic;
@Bean
public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
kafkaListenerContainer());
kafkaMessageDrivenChannelAdapter.setOutputChannel(preRouterLOB);
return kafkaMessageDrivenChannelAdapter;
}
@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer kafkaListenerContainer() {
ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);
return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
consumerFactory(), containerProps);
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map properties = new HashMap();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
return properties;
}
}
XML版本
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
第 1 期和第 2 期的 POM
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.4.5</version>
</dependency>
这包括版本 Spring-Kafka 2.3.6
第 3 期的 POM
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.4.5</version>
<exclusions>
<exclusion>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
<version>5.4.5</version>
This includes version Spring-Kafka 2.3.6
不,它没有; spring-integration-kafka 的 5.4.x 版本需要 2.6.x;该方法已添加到 2.5 中的属性中。
查看兼容版本的项目页面。
https://spring.io/projects/spring-kafka
如果您使用 Spring 引导,它将引入所有正确的版本,您根本不应在 pom 中指定版本。
对于问题 3,您似乎在某处声明了一个与 kafka 模板 bean 不兼容的生产者工厂。
看起来你在搞乱不同的版本。
由于您的项目是基于 Spring Boot 的,因此您肯定必须依赖它提供的依赖项的版本。某些版本组合确实不会兼容。例如,ContainerProperties
的 deliveryAttemptHeader
属性 已从 2.5
:
开始引入
/**
* Set to true to populate the
* {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
* the error handler or after rollback processor implements
* {@code DeliveryAttemptAware}. There is a small overhead so this is false by
* default.
* @param deliveryAttemptHeader true to populate
* @since 2.5
*/
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {
只需确保您依赖 Spring 启动插件及其依赖管理。 Spring Boot 中的所有 dep 一起测试。只有当您尝试更改 deps int 到您自己的版本时才会遇到的问题。
我在尝试让消息驱动通道适配器与 Spring-Kafka 2.3+ 一起工作时遇到以下问题。有没有人有任何示例代码可以帮助我?
1. org.springframework.kafka.listener.config.ContainerProperties实际上并不存在。
2。 org.springframework.kafka.listener.ContainerProperties 确实存在,但在尝试 运行.
时会产生以下问题描述:
试图调用不存在的方法。尝试是从以下位置进行的:
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.onInit(KafkaMessageDrivenChannelAdapter.java:318)
不存在以下方法:
org.springframework.kafka.listener.ContainerProperties.isDeliveryAttemptHeader()Z
3。如果您使用 kafka 2.5 及更高版本,则会出现此问题,但会替换为 2021-03-22 13:56:05.102-0400 org{local_sparta} WARN [data-pipeline,] [DP-ACCOUNT] [DPA] [] AnnotationConfigServletWebServerApplicationContext:main 上下文期间遇到异常初始化 - 取消刷新尝试:org.springframework.beans.factory.BeanCreationException:创建名称为 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration' 的 bean 时出错:通过构造函数实例化 Bean 失败;嵌套异常是 org.springframework.beans.BeanInstantiationException:无法实例化 [org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration]:构造函数抛出异常;嵌套异常是 org.springframework.beans.factory.UnsatisfiedDependencyException:在 class 路径资源 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class] 中定义名称 'kafkaTemplate' 创建 bean 时出错:通过方法 [=61 表达的不满足的依赖关系=]参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用类型 'org.springframework.kafka.core.ProducerFactory<java.lang.Object, java.lang.Object>' 的符合条件的 bean:预计至少有 1 个符合自动装配候选条件的 bean。依赖注解:{}
尝试使用下面的 Java 版本和 XML 版本都给出相同的错误。
Java版本
@Configuration
@Slf4j
public class KafkaChannelConsumer {
@Autowired
MessageChannel preRouterLOB;
@Value("${spring.kafka.bootstrap-servers:localhost9092}")
private String bootstrapServers;
@Value("${spring.kafka.topic:55iptest}")
private String springIntegrationKafkaTopic;
@Bean
public KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
kafkaListenerContainer());
kafkaMessageDrivenChannelAdapter.setOutputChannel(preRouterLOB);
return kafkaMessageDrivenChannelAdapter;
}
@SuppressWarnings("unchecked")
@Bean
public ConcurrentMessageListenerContainer kafkaListenerContainer() {
ContainerProperties containerProps = new ContainerProperties(springIntegrationKafkaTopic);
return (ConcurrentMessageListenerContainer) new ConcurrentMessageListenerContainer(
consumerFactory(), containerProps);
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@Bean
public Map consumerConfigs() {
Map properties = new HashMap();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");
return properties;
}
}
XML版本
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/integration/jms"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
第 1 期和第 2 期的 POM
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.4.5</version>
</dependency>
这包括版本 Spring-Kafka 2.3.6
第 3 期的 POM
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.4.5</version>
<exclusions>
<exclusion>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
<version>5.4.5</version>
This includes version Spring-Kafka 2.3.6
不,它没有; spring-integration-kafka 的 5.4.x 版本需要 2.6.x;该方法已添加到 2.5 中的属性中。
查看兼容版本的项目页面。
https://spring.io/projects/spring-kafka
如果您使用 Spring 引导,它将引入所有正确的版本,您根本不应在 pom 中指定版本。
对于问题 3,您似乎在某处声明了一个与 kafka 模板 bean 不兼容的生产者工厂。
看起来你在搞乱不同的版本。
由于您的项目是基于 Spring Boot 的,因此您肯定必须依赖它提供的依赖项的版本。某些版本组合确实不会兼容。例如,ContainerProperties
的 deliveryAttemptHeader
属性 已从 2.5
:
/**
* Set to true to populate the
* {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
* the error handler or after rollback processor implements
* {@code DeliveryAttemptAware}. There is a small overhead so this is false by
* default.
* @param deliveryAttemptHeader true to populate
* @since 2.5
*/
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {
只需确保您依赖 Spring 启动插件及其依赖管理。 Spring Boot 中的所有 dep 一起测试。只有当您尝试更改 deps int 到您自己的版本时才会遇到的问题。