如何在 KafkaBootstrapConfiguration 中覆盖 KafkaListenerEndpointRegistry
How to override KafkaListenerEndpointRegistry in KafkaBootstrapConfiguration
我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为我使用 @Listener 注释创建的每个主题注册额外的监听器(我想创建具有不同轮询时间的重试主题消费者链)。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。
我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加 bean,一切正常。
@Configuration
@EnableKafka
public class CustomKafkaBootstrapConfiguration {
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry(){
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
//i need to add logic here
super.registerListenerContainer(endpoint, factory);
}
};
}
}
我刚刚复制了您的覆盖,一切都按预期工作。
@SpringBootApplication
public class So57674940Application {
public static void main(String[] args) {
SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);
}
@KafkaListener(id = "so57674940", topics = "so57674940")
public void listen(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
class So57674940ApplicationConfig {
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
// i need to add logic here
System.out.println("in custom registry");
super.registerListenerContainer(endpoint, factory);
}
};
}
}
和
in custom registry
2019-08-27 11:20:36.251 INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so57674940-0]
我需要向 KafkaListenerEndpointRegistry 添加一些逻辑 - 我想为我使用 @Listener 注释创建的每个主题注册额外的监听器(我想创建具有不同轮询时间的重试主题消费者链)。为此,我想尝试覆盖 registerListenerContainer 方法并在那里实现逻辑。
我做的第一步是添加与 KafkaBootstrapConfiguration 相同的默认配置。但在那之后,我所有的测试都失败了,出于某种原因,我的听众没有消费任何东西。如果我不添加 bean,一切正常。
@Configuration
@EnableKafka
public class CustomKafkaBootstrapConfiguration {
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry(){
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
//i need to add logic here
super.registerListenerContainer(endpoint, factory);
}
};
}
}
我刚刚复制了您的覆盖,一切都按预期工作。
@SpringBootApplication
public class So57674940Application {
public static void main(String[] args) {
SpringApplication.run(new Class<?>[] { So57674940Application.class, So57674940ApplicationConfig.class }, args);
}
@KafkaListener(id = "so57674940", topics = "so57674940")
public void listen(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
class So57674940ApplicationConfig {
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
// i need to add logic here
System.out.println("in custom registry");
super.registerListenerContainer(endpoint, factory);
}
};
}
}
和
in custom registry
2019-08-27 11:20:36.251 INFO 33460 --- [o57674940-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so57674940-0]