Kafka:监控分配给分区主题的消费者的滞后

Kafka: Monitor the lag for the consumers that are assigned to partitions topic

我正在使用 Kafka 0.9.1 新消费者 API。消费者被手动分配给一个分区。对于这个消费者,我希望看到它的进步(意味着滞后)。由于我将组 ID consumer-tutorial 添加为 属性,我假设我可以使用命令

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

(如此处解释http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

不幸的是,使用上面的命令没有显示我的消费者组详细信息。因此,我无法监控消费者的进度(滞后)。如何监控上述场景(手动分配分区)中的延迟?

密码是:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


        String topic = "my-topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
     System.out.println(record.offset() + ": " + record.value());
  consumer.commitSynch();
  }
} finally {
  consumer.close();
}

kafka-consumer-groups.sh命令中,你的组名不正确--group consumer-tutorial不是consumer-tutorial-group

以防万一您不想编写代码来获取此信息或 运行 类似命令的 tools/shell 临时脚本,有 N 种工具可以捕获 Kafka 指标,包括消费者滞后。我突然想到:Burrow and SPM for Kafka 做得很好。这里有一些关于 Kafka 偏移量、消费者滞后的背景知识,以及一些从 Kafka 通过 JMX 公开的内容派生的指标。 HTH.

如果你对JMX暴露消费者组滞后感兴趣,这里是我写的代理: https://github.com/peterkovgan/kafka9.offsets

您可以 运行 在某个 Kafka 节点上使用此代理,并向外部读者公开偏移量滞后统计信息。

有一些示例如何将此代理与 Telegraf 一起使用 (https://influxdata.com/time-series-platform/telegraf/).

最后(结合 telegraf、influxdb 和 grafana)你可以看到几个消费者群体的偏移滞后的漂亮图表。

您的代码问题与将消费者手动分配到主题分区直接相关。

您在 group.id 属性 中指定了一个消费者组,但是,该组 ID 仅在您通过 [=11= 订阅一个主题(或一组主题)时使用] API。在您的示例中,您使用的是 .assign() 方法,该方法手动将客户端附加到指定的主题分区对,而不使用底层消费者组原语。正是出于这个原因,您无法看到消费者滞后。诸如Burrow之类的工具在这种情况下将不起作用,因为它们会查询消费者组的偏移量,而该偏移量并不存在。

您有两种选择:

  1. 使用 subscribe() API 正确使用消费者组功能。这是 Kafka 的主要用例。但是,seekToBeginning() 在这种情况下也不起作用,因为偏移量将完全由消费者组管理。
  2. 完全删除使用者组并手动管理分区分配和偏移量。这为您提供了最大可能的灵活性,但需要大量工作,而且您可能会发现自己在重新发明轮子。大多数人不会走这条路,除非Kafka的消费者组特性不适合你的需求。

选择将完全取决于您的用例。对于传统的流处理,#1 是惯用的方法。这就是 Kafka 的设计目的。 #2 意味着您知道自己在做什么,并将所有组管理责任转移到您的应用程序上。

注意:Kafka 没有“部分”模式,在这种模式下,您可以进行一些组管理,其余的由 Kafka 完成。要么全押,要么 none。

您可以使用名为

的简单而强大的滞后监控工具

prometheus-kafka-consumer-group-exporter

参考下面url:

https://github.com/braedon/prometheus-kafka-consumer-group-exporter

安装后 运行 下面的命令将消费者矩阵导出到您所需的端口 Prometheus Kafka Consumer Group Exporter

/usr/bin/python3 /usr/local/bin/prometheus-kafka-consumer-group-exporter -p 端口 -b KAFKA_CLUSTER_IP_PORT

在 运行ning 上面的命令之后验证 http url YOUR-SERVER-IP:PORT 上的数据,如 127.0.0.1:9208

现在您可以将任何 JMX 抓取器用于仪表板和警报系统。我正在使用普罗米修斯和 grafana

这可以是 运行 在任何共享服务器上,例如 [kafka 代理、zookeeper 服务器、prometheus 服务器或任何],因为它对系统资源的开销非常低。