Kafka Streams:ConsumerRebalanceListener 实现

Kafka Streams: ConsumerRebalanceListener implementation

能否请您告知如何在流配置中注册以下 class?

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

  static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.info(p + " partitions has been assigned to the stream instance");
    }

  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.warn(p + " partitions has been removed from the stream instance");
    }
  }
}

Kafka Streams 不会公开 API 来指定自定义 ConsumerRebalanceListener,因为 Kafka Streams 使用自己的实现传递给内部使用的 KafkaConsumer.

请注意,内部使用的侦听器以 INFO 模式记录分配,并在 DEBUG 模式下记录一些额外的日志。因此,不需要添加额外的自定义日志记录。

如果这是一项关键功能,请随时创建功能请求 JIRA:https://issues.apache.org/jira/projects/KAFKA

更新:

如果您使用 Processor(或 Transformer 或类似的),您也许可以使用 init()close()。这些在分配分区之后和撤销分区之前被调用。