Kafka Streams:ConsumerRebalanceListener 实现
Kafka Streams: ConsumerRebalanceListener implementation
能否请您告知如何在流配置中注册以下 class?
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.info(p + " partitions has been assigned to the stream instance");
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.warn(p + " partitions has been removed from the stream instance");
}
}
}
Kafka Streams 不会公开 API 来指定自定义 ConsumerRebalanceListener
,因为 Kafka Streams 使用自己的实现传递给内部使用的 KafkaConsumer
.
请注意,内部使用的侦听器以 INFO 模式记录分配,并在 DEBUG 模式下记录一些额外的日志。因此,不需要添加额外的自定义日志记录。
如果这是一项关键功能,请随时创建功能请求 JIRA:https://issues.apache.org/jira/projects/KAFKA
更新:
如果您使用 Processor
(或 Transformer
或类似的),您也许可以使用 init()
和 close()
。这些在分配分区之后和撤销分区之前被调用。
能否请您告知如何在流配置中注册以下 class?
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.info(p + " partitions has been assigned to the stream instance");
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.warn(p + " partitions has been removed from the stream instance");
}
}
}
Kafka Streams 不会公开 API 来指定自定义 ConsumerRebalanceListener
,因为 Kafka Streams 使用自己的实现传递给内部使用的 KafkaConsumer
.
请注意,内部使用的侦听器以 INFO 模式记录分配,并在 DEBUG 模式下记录一些额外的日志。因此,不需要添加额外的自定义日志记录。
如果这是一项关键功能,请随时创建功能请求 JIRA:https://issues.apache.org/jira/projects/KAFKA
更新:
如果您使用 Processor
(或 Transformer
或类似的),您也许可以使用 init()
和 close()
。这些在分配分区之后和撤销分区之前被调用。