组合 spring-kafka 和 reactor-kafka 时出现意外的循环依赖

Unexpected cyclic dependency while combining spring-kafka and reactor-kafka

以下代码在 receiverOptions 和模板之间产生了意外的循环依赖:

令人惊讶的是,如果从 spring 上下文中删除 kafkaProps,它会起作用。

看起来某些自动配置正在从模板向 receiverOptions 添加不必要的依赖项。

请建议配置 ReactiveKafkaConsumerTemplate 的正确方法。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public Map<String, Object> kafkaProps() {
        return Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
        );
    }

    @Bean
    public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
        return ReceiverOptions.<String, String>create(kafkaProps)
            .withKeyDeserializer(new StringDeserializer())
            .withValueDeserializer(new StringDeserializer())
            .subscription(List.of("test-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @Bean
    public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
        return args -> kafkaReceiver.receive()
            .log()
            .subscribe();
    }
}

完整项目可在 https://github.com/piddubnyi/spring-reactor-kafka-ciclyc

找到

完整堆栈跟踪:

2020-01-07 16:29:51.031 DEBUG 24915 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : Application failed to start due to an exception

org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'receiverOptions': Requested bean is currently in creation: Is there an unresolvable circular reference?
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.beforeSingletonCreation(DefaultSingletonBeanRegistry.java:339) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:215) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1287) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean[=12=](AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.addCandidateEntry(DefaultListableBeanFactory.java:1503) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1467) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveMultipleBeans(DefaultListableBeanFactory.java:1386) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1245) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean[=12=](AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at com.example.demo.DemoApplication.main(DemoApplication.java:19) ~[main/:na]

我想 Spring 不知道要注入哪个 Map<String, Object>。尝试将 @Qualifier("kafkaProps") 添加到 kafkaProps 参数:

@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Map<String, Object> kafkaProps) {
    // ...
}

更新

奇怪的是,在添加 @Qualifier 之后 Spring 将 kafkaProps 包裹在另一个地图中...

现在,也许只使用 Properties 而不是 Map:

@Bean
public Properties kafkaProps() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName());
    return props;
}

@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Properties kafkaProps) {
    // ...
}

更新 2

关于奇怪的Map<String, Object>注入:其实并没有那么奇怪。 Map<String, SomeType>,按类型注入时,以特殊方式处理: 它收集所有 MyType 类型的 bean(键是 bean 名称)。 因此,在您的情况下,Map<String, Object> 是上下文中所有 bean 的映射,这显然 以循环引用结束。 List<Object>Set<Object> 也会发生同样的事情。 添加 @Qualifier 注释不会改变此行为,但会过滤 bean。 因此 kafkaProps 映射是唯一通过过滤器的 bean,它似乎 在另一张地图中 'wrapped'。

Even typed Map instances can be autowired as long as the expected key type is String. The map values contain all beans of the expected type, and the keys contain the corresponding bean names (...)

切换到按名称注入应该可以解决问题。通常它会通过添加 @Resource(name = "kafkaProps") 而不是(隐式) @Autowired 注释来完成,但不幸的是, @Resource 注释不能放在方法参数上。

但还有另一种解决方案。由于两个 bean 在相同的 @Configuration class 中声明, 你可以直接打电话给kafkaProps()。 Spring 将 return 相同的 bean, 不是每次调用的新实例(注意,它不适用于静态方法)

@Bean
public ReceiverOptions<String, String> receiverOptions() {
    return ReceiverOptions.<String, String>create(kafkaProps())
        ...
}

参考文献: