如何有效地将我的@KafkaListener 绑定到 ConcurrentKafkaListenerContainerFactory?
How can I effectively bind my @KafkaListener to ConcurrentKafkaListenerContainerFactory?
我遇到了这个对我来说很奇怪的场景:
所以基本上我在一个class中定义了两个@KafkaListener
:
@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
public void receive(){}
@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(){}
它们的id
、topics
、containerFactory
是不同的,并且它们每个都依赖于另一个class中定义的不同的ConcurrentKafkaListenerContainerFactory
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group1", "earliest"));
factory.setAutoStartup(false);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group2", "latest"));
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
Map<String, Object> config = new HashMap<>();
// dt is current timestamp in millisecond (epoch)
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
// other config omitted
return new DefaultKafkaConsumerFactory<>(config);
}
所以我希望看到的(以及我想要实现的)是:
- 只有 listener2 会自动启动,因为
factory.setAutoStartup(true)
- Listener2 将从
group.id
"group2" 和 auto.offset.reset
"latest" 开始
- 稍后当 listener1 通过某个事件侦听器启动时,它将启动
group.id
"group1" 和 auto.offset.reset
"earlist"
但是,实际上只有第一个是可以保证的。 Listener2 可以从 {group2 + latest} 或 {group1 + earliest} 开始。稍后当 listener1 开始使用数据时,它只会重用 listener2 的配置(我可以看到包含时间戳的同一组 ID 在我的日志中打印了两次)
我的问题是,为什么 listener2 的组 ID 和偏移量配置是随机选择的,而 autoStartup 不是?为什么listener1会复用listener2的配置?
这是因为 consumerFactory
是单例 @Bean
并且参数在第二次调用时被忽略。
添加@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
到工厂每次获得一个新bean。
然而,你不需要任何这些,你可以简单地在注释上设置 groupId
属性 并避免所有这些额外的定义。
你也可以在注解上控制autoStartup
(自2.2起)。
编辑
在下面的评论中回答问题...
groupId = "#{'${group.id}' + T(java.time.Instant).now().toEpochMilli()}"
但是,如果您想要一个唯一的组 ID;这样比较靠谱...
groupId = "#{'${group.id}' + T(java.util.UUID).randomUUID()}"
我遇到了这个对我来说很奇怪的场景:
所以基本上我在一个class中定义了两个@KafkaListener
:
@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
public void receive(){}
@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(){}
它们的id
、topics
、containerFactory
是不同的,并且它们每个都依赖于另一个class中定义的不同的ConcurrentKafkaListenerContainerFactory
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group1", "earliest"));
factory.setAutoStartup(false);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group2", "latest"));
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
Map<String, Object> config = new HashMap<>();
// dt is current timestamp in millisecond (epoch)
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
// other config omitted
return new DefaultKafkaConsumerFactory<>(config);
}
所以我希望看到的(以及我想要实现的)是:
- 只有 listener2 会自动启动,因为
factory.setAutoStartup(true)
- Listener2 将从
group.id
"group2" 和auto.offset.reset
"latest" 开始
- 稍后当 listener1 通过某个事件侦听器启动时,它将启动
group.id
"group1" 和auto.offset.reset
"earlist"
但是,实际上只有第一个是可以保证的。 Listener2 可以从 {group2 + latest} 或 {group1 + earliest} 开始。稍后当 listener1 开始使用数据时,它只会重用 listener2 的配置(我可以看到包含时间戳的同一组 ID 在我的日志中打印了两次)
我的问题是,为什么 listener2 的组 ID 和偏移量配置是随机选择的,而 autoStartup 不是?为什么listener1会复用listener2的配置?
这是因为 consumerFactory
是单例 @Bean
并且参数在第二次调用时被忽略。
添加@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
到工厂每次获得一个新bean。
然而,你不需要任何这些,你可以简单地在注释上设置 groupId
属性 并避免所有这些额外的定义。
你也可以在注解上控制autoStartup
(自2.2起)。
编辑
在下面的评论中回答问题...
groupId = "#{'${group.id}' + T(java.time.Instant).now().toEpochMilli()}"
但是,如果您想要一个唯一的组 ID;这样比较靠谱...
groupId = "#{'${group.id}' + T(java.util.UUID).randomUUID()}"