我的自定义 class 实施 ConsumerAwareRebalanceListener 不工作
my custom class implementing ConsumerAwareRebalanceListener is not working
我正在使用 spring-kafka-2.2.7-RELEASE。当消费者准备好使用消息并尝试使用 ConsumerAwareRebalanceListener 但它不起作用时,我正在尝试捕获事件。请提出建议。
@Component
public class ConsumerAwareRebalanceListenerImpl implements ConsumerAwareRebalanceListener {
public void ConsumerAwareRebalanceListenerImpl(){
System.out.println(" In ConsumerAwareRebalanceListenerImpl constructor");
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach( item -> {
TestConsumerConstants.consumerEventsMap.put("key-"+item.partition(), item.partition());
});
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
TestConsumerConstants.consumerEventsMap.put(consumer.toString(), partitions);
}
}
我解决了问题,现在可以正常使用了。我所要做的就是将 ConcurrentKafkaListenerContainerFactory 的 containerProperty 'setConsumerRebalanceListener' 设置为我的自定义 class 实例,如下所示
ConcurrentKafkaListenerContainerFactory factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setConsumerRebalanceListener(kafkaConsumerRebalanceListener);
我正在使用 spring-kafka-2.2.7-RELEASE。当消费者准备好使用消息并尝试使用 ConsumerAwareRebalanceListener 但它不起作用时,我正在尝试捕获事件。请提出建议。
@Component
public class ConsumerAwareRebalanceListenerImpl implements ConsumerAwareRebalanceListener {
public void ConsumerAwareRebalanceListenerImpl(){
System.out.println(" In ConsumerAwareRebalanceListenerImpl constructor");
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach( item -> {
TestConsumerConstants.consumerEventsMap.put("key-"+item.partition(), item.partition());
});
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions){
TestConsumerConstants.consumerEventsMap.put(consumer.toString(), partitions);
}
}
我解决了问题,现在可以正常使用了。我所要做的就是将 ConcurrentKafkaListenerContainerFactory 的 containerProperty 'setConsumerRebalanceListener' 设置为我的自定义 class 实例,如下所示
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.getContainerProperties().setConsumerRebalanceListener(kafkaConsumerRebalanceListener);