setConsumerRebalanceListener 如何获取调用的 ConsumerAwareRebalanceListener 实例?
How can setConsumerRebalanceListener get the calling ConsumerAwareRebalanceListener instance?
也许这是一个幼稚的问题,但它让我在那里停留了一段时间。请多多包涵。
我有一个 class DataConsumer.java 实现 ConsumerAwareRebalanceListener
:
@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// seek offsets based on a given timestamp
}
@KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
receive(ConsumerRecord payload) {}
}
所以为了 onPartitionsAssigned
工作,我需要在另一个 class 中定义的 kafkaListenerContainerFactory
方法中调用 setConsumerRebalanceListener
,如下所示:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setConsumerRebalanceListener(____________);
// rest part omitted
}
我的问题是关于上面的 ____________
部分。我应该放什么?
在我的理解中,当我们在 DataConsumer class 中初始化 @KafkaListener
容器时调用方法 kafkaListenerContainerFactory
,这意味着已经有一个现有的 DataConsumer 实例来保存 @kafkaLister
.我如何将已经存在的 DataConsumer 实例传递给 setConsumerRebalanceListener
函数?
我能搜索到的所有示例代码片段如下:
setConsumerRebalanceListener(new ConsumerRebalanceListener() {
//override the functions
})
但这不是在创建一个新的实例吗?如果我放置 new DataConsumer()
它将在现有实例中丢失一些状态(例如,寻找偏移量的时间戳)所以这不起作用。
您可以将 DataConsumer
声明为 @Bean
(而不是使用 @Component
),然后您可以在那里注入您的 bean。
但是,在这种情况下使用这种机制是错误的。
改为实施 ConsumerSeekAware
- 容器将自动检测到您的侦听器实施了它,并将调用其 onPartitionsAssigned
.
请参阅文档中的 Seeking to a Specific Offset。
也许这是一个幼稚的问题,但它让我在那里停留了一段时间。请多多包涵。
我有一个 class DataConsumer.java 实现 ConsumerAwareRebalanceListener
:
@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// seek offsets based on a given timestamp
}
@KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
receive(ConsumerRecord payload) {}
}
所以为了 onPartitionsAssigned
工作,我需要在另一个 class 中定义的 kafkaListenerContainerFactory
方法中调用 setConsumerRebalanceListener
,如下所示:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setConsumerRebalanceListener(____________);
// rest part omitted
}
我的问题是关于上面的 ____________
部分。我应该放什么?
在我的理解中,当我们在 DataConsumer class 中初始化 @KafkaListener
容器时调用方法 kafkaListenerContainerFactory
,这意味着已经有一个现有的 DataConsumer 实例来保存 @kafkaLister
.我如何将已经存在的 DataConsumer 实例传递给 setConsumerRebalanceListener
函数?
我能搜索到的所有示例代码片段如下:
setConsumerRebalanceListener(new ConsumerRebalanceListener() {
//override the functions
})
但这不是在创建一个新的实例吗?如果我放置 new DataConsumer()
它将在现有实例中丢失一些状态(例如,寻找偏移量的时间戳)所以这不起作用。
您可以将 DataConsumer
声明为 @Bean
(而不是使用 @Component
),然后您可以在那里注入您的 bean。
但是,在这种情况下使用这种机制是错误的。
改为实施 ConsumerSeekAware
- 容器将自动检测到您的侦听器实施了它,并将调用其 onPartitionsAssigned
.
请参阅文档中的 Seeking to a Specific Offset。