为什么 spring-boot with Kafka 启动失败?

Why spring-boot with Kafka failed to start?

有 spring-boot 应用程序与 kafka 依赖关系, 有两个 Kafka 主题,需要从中读取消息

tacocloud.orders.topic
tacocloud.tacos.topic

并且已经在里面成功发送消息

已配置 kafka 配置以像这样收听此主题

 package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

  // Order

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
        Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
        @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
        @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}

所以,有两个DefaultKafkaConsumerFactory和两个ConcurrentKafkaListenerContainerFactory 之后,使用@KafkaListener 日志消息创建了一个服务:

package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics ="tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}

application.yml 文件

spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic

但是,当我启动 spring-boot 应用程序时,我看到错误消息(((

2022-04-15 21:38:35.918 ERROR 13888 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.access0(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.16.jar:5.3.16] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.16.jar:5.3.16] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.4.jar:2.6.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.4.jar:2.6.4] at com.example.tacocloud.TacoCloudApplication.main(TacoCloudApplication.java:10) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.6.4.jar:2.6.4] Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value. at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:640) ~[kafka-clients-2.8.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:665) ~[kafka-clients-2.8.0.jar:na] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:416) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:384) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:360) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:327) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:304) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:758) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:344) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:442) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:331) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:276) ~[spring-kafka-2.8.3.jar:2.8.3] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.16.jar:5.3.16] ... 19 common frames omitted

Process finished with exit code 0

感谢您提供样品。

所以,我在本地打开它并在这个bean定义中放置了一个断点:

@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
    Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}

orderConsumerConfig 地图看起来像这样:

orderConsumerConfig = {LinkedHashMap@8587}  size = 1
 "orderConsumerConfig" -> {HashMap@8600}  size = 5
  key = "orderConsumerConfig"
  value = {HashMap@8600}  size = 5
   "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
   "template" -> {LinkedHashMap@8611}  size = 1
   "topic" -> "tacocloud.orders.topic"
   "bootstrap.servers" -> "localhost:29888"
   "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"

所以,您的 KafkaConsumer 未正确初始化确实不足为奇。您的目标地图配置隐藏在此注入地图的 orderConsumerConfig 条目下。

您介意与我分享您是从哪里想到 Map bean 上的 @ConfigurationProperties 的吗?以及如何将其用作依赖注入?

我想根据属性做类似的事情(配置多个 ConsumerFactories)。 我使用 @ConfigurationProperties 创建了一个 Map 而不是 Map,然后将该地图的项目添加到一个新的 Map 中。不确定为什么 Spring-Boot 以这种方式加载 Map

@Bean
@ConfigurationProperties("taco-cart.kafka")
public Map<String, String> tacoCartKafkaProperties() {
    return new HashMap<>();
}

@Bean
public ConsumerFactory<String, TacoCart> tacoCartConsumerFactory(@Qualifier("tacoCartKafkaProperties") Map<String, String> tacoCartKafkaProperties) {

    // Convert map.
    Map<String, Object> config = new HashMap<>();
    config.putAll(tacoCartKafkaProperties);

    return new DefaultKafkaConsumerFactory<>(config);
}