Kafka:监控分配给分区主题的消费者的滞后
Kafka: Monitor the lag for the consumers that are assigned to partitions topic
我正在使用 Kafka 0.9.1 新消费者 API。消费者被手动分配给一个分区。对于这个消费者,我希望看到它的进步(意味着滞后)。由于我将组 ID consumer-tutorial 添加为 属性,我假设我可以使用命令
bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092
(如此处解释http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)
不幸的是,使用上面的命令没有显示我的消费者组详细信息。因此,我无法监控消费者的进度(滞后)。如何监控上述场景(手动分配分区)中的延迟?
密码是:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitSynch();
}
} finally {
consumer.close();
}
在kafka-consumer-groups.sh
命令中,你的组名不正确--group consumer-tutorial
不是consumer-tutorial-group
以防万一您不想编写代码来获取此信息或 运行 类似命令的 tools/shell 临时脚本,有 N 种工具可以捕获 Kafka 指标,包括消费者滞后。我突然想到:Burrow and SPM for Kafka 做得很好。这里有一些关于 Kafka 偏移量、消费者滞后的背景知识,以及一些从 Kafka 通过 JMX 公开的内容派生的指标。 HTH.
如果你对JMX暴露消费者组滞后感兴趣,这里是我写的代理:
https://github.com/peterkovgan/kafka9.offsets
您可以 运行 在某个 Kafka 节点上使用此代理,并向外部读者公开偏移量滞后统计信息。
有一些示例如何将此代理与 Telegraf 一起使用
(https://influxdata.com/time-series-platform/telegraf/).
最后(结合 telegraf、influxdb 和 grafana)你可以看到几个消费者群体的偏移滞后的漂亮图表。
您的代码问题与将消费者手动分配到主题分区直接相关。
您在 group.id
属性 中指定了一个消费者组,但是,该组 ID 仅在您通过 [=11= 订阅一个主题(或一组主题)时使用] API。在您的示例中,您使用的是 .assign()
方法,该方法手动将客户端附加到指定的主题分区对,而不使用底层消费者组原语。正是出于这个原因,您无法看到消费者滞后。诸如Burrow之类的工具在这种情况下将不起作用,因为它们会查询消费者组的偏移量,而该偏移量并不存在。
您有两种选择:
- 使用
subscribe()
API 正确使用消费者组功能。这是 Kafka 的主要用例。但是,seekToBeginning()
在这种情况下也不起作用,因为偏移量将完全由消费者组管理。
- 完全删除使用者组并手动管理分区分配和偏移量。这为您提供了最大可能的灵活性,但需要大量工作,而且您可能会发现自己在重新发明轮子。大多数人不会走这条路,除非Kafka的消费者组特性不适合你的需求。
选择将完全取决于您的用例。对于传统的流处理,#1 是惯用的方法。这就是 Kafka 的设计目的。 #2 意味着您知道自己在做什么,并将所有组管理责任转移到您的应用程序上。
注意:Kafka 没有“部分”模式,在这种模式下,您可以进行一些组管理,其余的由 Kafka 完成。要么全押,要么 none。
您可以使用名为
的简单而强大的滞后监控工具
prometheus-kafka-consumer-group-exporter
参考下面url:
https://github.com/braedon/prometheus-kafka-consumer-group-exporter
安装后 运行 下面的命令将消费者矩阵导出到您所需的端口 Prometheus Kafka Consumer Group Exporter
/usr/bin/python3 /usr/local/bin/prometheus-kafka-consumer-group-exporter -p 端口 -b KAFKA_CLUSTER_IP_PORT
在 运行ning 上面的命令之后验证 http url YOUR-SERVER-IP:PORT 上的数据,如 127.0.0.1:9208
现在您可以将任何 JMX 抓取器用于仪表板和警报系统。我正在使用普罗米修斯和 grafana
这可以是 运行 在任何共享服务器上,例如 [kafka 代理、zookeeper 服务器、prometheus 服务器或任何],因为它对系统资源的开销非常低。
我正在使用 Kafka 0.9.1 新消费者 API。消费者被手动分配给一个分区。对于这个消费者,我希望看到它的进步(意味着滞后)。由于我将组 ID consumer-tutorial 添加为 属性,我假设我可以使用命令
bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092
(如此处解释http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)
不幸的是,使用上面的命令没有显示我的消费者组详细信息。因此,我无法监控消费者的进度(滞后)。如何监控上述场景(手动分配分区)中的延迟?
密码是:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToBeginning(topicPartition);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
consumer.commitSynch();
}
} finally {
consumer.close();
}
在kafka-consumer-groups.sh
命令中,你的组名不正确--group consumer-tutorial
不是consumer-tutorial-group
以防万一您不想编写代码来获取此信息或 运行 类似命令的 tools/shell 临时脚本,有 N 种工具可以捕获 Kafka 指标,包括消费者滞后。我突然想到:Burrow and SPM for Kafka 做得很好。这里有一些关于 Kafka 偏移量、消费者滞后的背景知识,以及一些从 Kafka 通过 JMX 公开的内容派生的指标。 HTH.
如果你对JMX暴露消费者组滞后感兴趣,这里是我写的代理: https://github.com/peterkovgan/kafka9.offsets
您可以 运行 在某个 Kafka 节点上使用此代理,并向外部读者公开偏移量滞后统计信息。
有一些示例如何将此代理与 Telegraf 一起使用 (https://influxdata.com/time-series-platform/telegraf/).
最后(结合 telegraf、influxdb 和 grafana)你可以看到几个消费者群体的偏移滞后的漂亮图表。
您的代码问题与将消费者手动分配到主题分区直接相关。
您在 group.id
属性 中指定了一个消费者组,但是,该组 ID 仅在您通过 [=11= 订阅一个主题(或一组主题)时使用] API。在您的示例中,您使用的是 .assign()
方法,该方法手动将客户端附加到指定的主题分区对,而不使用底层消费者组原语。正是出于这个原因,您无法看到消费者滞后。诸如Burrow之类的工具在这种情况下将不起作用,因为它们会查询消费者组的偏移量,而该偏移量并不存在。
您有两种选择:
- 使用
subscribe()
API 正确使用消费者组功能。这是 Kafka 的主要用例。但是,seekToBeginning()
在这种情况下也不起作用,因为偏移量将完全由消费者组管理。 - 完全删除使用者组并手动管理分区分配和偏移量。这为您提供了最大可能的灵活性,但需要大量工作,而且您可能会发现自己在重新发明轮子。大多数人不会走这条路,除非Kafka的消费者组特性不适合你的需求。
选择将完全取决于您的用例。对于传统的流处理,#1 是惯用的方法。这就是 Kafka 的设计目的。 #2 意味着您知道自己在做什么,并将所有组管理责任转移到您的应用程序上。
注意:Kafka 没有“部分”模式,在这种模式下,您可以进行一些组管理,其余的由 Kafka 完成。要么全押,要么 none。
您可以使用名为
的简单而强大的滞后监控工具prometheus-kafka-consumer-group-exporter
参考下面url:
https://github.com/braedon/prometheus-kafka-consumer-group-exporter
安装后 运行 下面的命令将消费者矩阵导出到您所需的端口 Prometheus Kafka Consumer Group Exporter
/usr/bin/python3 /usr/local/bin/prometheus-kafka-consumer-group-exporter -p 端口 -b KAFKA_CLUSTER_IP_PORT
在 运行ning 上面的命令之后验证 http url YOUR-SERVER-IP:PORT 上的数据,如 127.0.0.1:9208
现在您可以将任何 JMX 抓取器用于仪表板和警报系统。我正在使用普罗米修斯和 grafana
这可以是 运行 在任何共享服务器上,例如 [kafka 代理、zookeeper 服务器、prometheus 服务器或任何],因为它对系统资源的开销非常低。