使用 Spring 引导构建 kafka 消费者失败
Failed to construct kafka consumer with Spring Boot
我已经实现了一个简单的消费者应用程序来消费来自主题的消息。当我 运行 kafka-consumer 应用程序时,发生了以下错误。
StackTrace
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
配置class
@Configuration
@EnableKafka
public class KafkaConfig {
private ConsumerFactory<String,String> consumerFactory()
{
Map<String,Object> config=new ConcurrentHashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String,String> factory
=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
听众class
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
public void consume(String message)
{
System.out.println("Consumed Message "+message);
}
}
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
注意-我的Kafka版本是2.13-2.6.0
您正在使用 StringSerializer
but should use a StringDeserializer
,一个序列化,另一个 de序列化。
并且由于您将它们设置为 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
和 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
您显然想要 de 序列化。
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
我已经实现了一个简单的消费者应用程序来消费来自主题的消息。当我 运行 kafka-consumer 应用程序时,发生了以下错误。
StackTrace
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.9.RELEASE.jar:5.2.9.RELEASE]
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Deserializer
配置class
@Configuration
@EnableKafka
public class KafkaConfig {
private ConsumerFactory<String,String> consumerFactory()
{
Map<String,Object> config=new ConcurrentHashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_string");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<String,String> factory
=new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
听众class
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"Kafka_Example"},groupId = "group_string")
public void consume(String message)
{
System.out.println("Consumed Message "+message);
}
}
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.1</version>
</dependency>
注意-我的Kafka版本是2.13-2.6.0
您正在使用 StringSerializer
but should use a StringDeserializer
,一个序列化,另一个 de序列化。
并且由于您将它们设置为 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
和 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
您显然想要 de 序列化。
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);