我的自定义 class 实施 ConsumerAwareRebalanceListener 不工作

my custom class implementing ConsumerAwareRebalanceListener is not working

我正在使用 spring-kafka-2.2.7-RELEASE。当消费者准备好使用消息并尝试使用 ConsumerAwareRebalanceListener 但它不起作用时,我正在尝试捕获事件。请提出建议。

@Component
public class ConsumerAwareRebalanceListenerImpl implements ConsumerAwareRebalanceListener {

  public void ConsumerAwareRebalanceListenerImpl(){
    System.out.println(" In ConsumerAwareRebalanceListenerImpl constructor");
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    partitions.forEach( item -> {
      TestConsumerConstants.consumerEventsMap.put("key-"+item.partition(), item.partition());
    });

  }

  @Override
  public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
    TestConsumerConstants.consumerEventsMap.put(consumer.toString(), partitions);
  }

}

我解决了问题,现在可以正常使用了。我所要做的就是将 ConcurrentKafkaListenerContainerFactory 的 containerProperty 'setConsumerRebalanceListener' 设置为我的自定义 class 实例,如下所示

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setConsumerRebalanceListener(kafkaConsumerRebalanceListener);