setConsumerRebalanceListener 如何获取调用的 ConsumerAwareRebalanceListener 实例?

How can setConsumerRebalanceListener get the calling ConsumerAwareRebalanceListener instance?

也许这是一个幼稚的问题,但它让我在那里停留了一段时间。请多多包涵。

我有一个 class DataConsumer.java 实现 ConsumerAwareRebalanceListener:

@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // seek offsets based on a given timestamp
    }
    @KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
    receive(ConsumerRecord payload) {}
}

所以为了 onPartitionsAssigned 工作,我需要在另一个 class 中定义的 kafkaListenerContainerFactory 方法中调用 setConsumerRebalanceListener,如下所示:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.getContainerProperties().setConsumerRebalanceListener(____________);
    // rest part omitted
}

我的问题是关于上面的 ____________ 部分。我应该放什么?

在我的理解中,当我们在 DataConsumer class 中初始化 @KafkaListener 容器时调用方法 kafkaListenerContainerFactory,这意味着已经有一个现有的 DataConsumer 实例来保存 @kafkaLister.我如何将已经存在的 DataConsumer 实例传递给 setConsumerRebalanceListener 函数?

我能搜索到的所有示例代码片段如下:

setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    //override the functions
})

但这不是在创建一个新的实例吗?如果我放置 new DataConsumer() 它将在现有实例中丢失一些状态(例如,寻找偏移量的时间戳)所以这不起作用。

您可以将 DataConsumer 声明为 @Bean(而不是使用 @Component),然后您可以在那里注入您的 bean。

但是,在这种情况下使用这种机制是错误的。

改为实施 ConsumerSeekAware - 容器将自动检测到您的侦听器实施了它,并将调用其 onPartitionsAssigned.

请参阅文档中的 Seeking to a Specific Offset