如何在 KafkaBootstrapConfiguration 中覆盖 KafkaListenerEndpointRegistry

How to override KafkaListenerEndpointRegistry in KafkaBootstrapConfiguration

我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为我使用 @Listener 注释创建的每个主题注册额外的监听器(我想创建具有不同轮询时间的重试主题消费者链)。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。

我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加 bean,一切正常。

@Configuration
@EnableKafka
public class CustomKafkaBootstrapConfiguration {

  @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
    return new KafkaListenerEndpointRegistry(){
        @Override
        public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
            //i need to add logic here
            super.registerListenerContainer(endpoint, factory);
        }
    };
  }
}

我刚刚复制了您的覆盖,一切都按预期工作。

@SpringBootApplication
public class So57674940Application {

    public static void main(String[] args) {
        SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);
    }

    @KafkaListener(id = "so57674940", topics = "so57674940")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Configuration
@EnableKafka
class So57674940ApplicationConfig {

    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
        return new KafkaListenerEndpointRegistry() {
            @Override
            public void registerListenerContainer(KafkaListenerEndpoint endpoint,
                    KafkaListenerContainerFactory<?> factory) {
                // i need to add logic here
                System.out.println("in custom registry");
                super.registerListenerContainer(endpoint, factory);
            }
        };
    }

}

in custom registry
2019-08-27 11:20:36.251  INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so57674940-0]