Kafka消费者列表

Kafka consumer list

我需要找到一种方法向 Kafka 询问主题列表。我知道我可以使用 bin\ 目录中包含的 kafka-topics.sh 脚本来做到这一点。有了这个列表后,我需要每个主题的所有消费者。我在该目录中找不到脚本,在 kafka-consumer-api 库中也找不到允许我这样做的 class。

这背后的原因是我需要弄清楚主题的偏移量和消费者的偏移量之间的区别。

有办法实现吗?或者我是否需要在我的每个消费者中实现此功能?

高级消费者已注册到 Zookeeper,因此您可以从 ZK 获取列表,类似于 kafka-topics.sh 获取主题列表的方式。我认为没有办法收集 所有 消费者;任何发送一些消费请求的应用程序实际上是一个 "consumer",您无法判断它们是否已经完成。

在消费者方面,有一个 JMX metric exposed to monitor the lag. Also, there is Burrow 用于延迟监控。

Kafka 将所有信息存储在zookeeper 中。您可以在brokers->topics下看到所有的topic相关信息。如果您希望以编程方式获取所有主题,您可以使用 Zookeeper API.

在下面的链接中有详细说明 Tutorialspoint, Zookeeper Programmer guide

使用kafka-consumer-groups.sh

例如

bin/kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092

bin/kafka-consumer-groups.sh --describe --group mygroup --bootstrap-server localhost:9092

您可以将其用于 0.9.0.0。卡夫卡版本

./kafka-consumer-groups.sh --list --zookeeper hostname:potnumber

查看您创建的群组。这将显示所有消费者组名称。

 ./kafka-consumer-groups.sh --describe --zookeeper hostname:potnumber  --describe  --group consumer_group_name

查看详情

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER

我意识到这个问题已经有将近 4 年的历史了。从那时起,卡夫卡发生了很大变化。上面已经提到了,但只写了小字,所以我写这篇文章是为了那些像我一样迟到才发现这个问题的用户。

  1. 偏移量现在默认存储在 Kafka 主题中(不再存储在 Zookeeper 中),参见
  2. 有一个kafka-consumer-groups实用程序,它returns所有信息,包括主题和分区的偏移量,消费者的偏移量,甚至是滞后(备注:当您要求主题的偏移量时,我假设您指的是主题分区的偏移量)。在我的 Kafka 2.0 测试集群中:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe
    --group console-consumer-69763 Consumer group 'console-consumer-69763' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
pytest          0          5               6               1               -               -               -
``


每个主题的所有消费者

(将 --zookeeper 替换为 --bootstrap-server 以获取由较新的 Kafka 客户端存储的组)

获取每个主题的所有消费者作为 topic 选项卡的 tableconsumer:

for t in `kafka-consumer-groups.sh --zookeeper <HOST>:2181 --list 2>/dev/null`; do
    echo $t | xargs -I {} sh -c "kafka-consumer-groups.sh --zookeeper <HOST>:2181 --describe --group {} 2>/dev/null | grep ^{} | awk '{print $2\"\t\"$1}' "
done > topic-consumer.txt

使这对独一无二:

cat topic-consumer.txt | sort -u > topic-consumer-u.txt

得到想要的:

less topic-consumer-u.txt | grep -i <TOPIC>

我没有看到这里提到它,但是我经常使用的命令可以帮助我对所有组、主题、分区、偏移量有一个鸟瞰图,滞后、消费者等

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --all-groups

示例如下所示:

GROUP TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST CLIENT-ID
Group Topic 2          7               7               0   <SOME-ID>   XXXX <SOME-ID>
:
:

最重要 列是 LAG,其中 健康 平台,理想 它应该是 0(或更接近于 0 或较低的数字以实现高吞吐量)- 始终如此。所以一定要监控!!! ;-).

P.S:
可以找到一篇关于如何监控延迟的有趣文章 here

您也可以使用 kafkactl 来实现:

# get all consumer groups (output as yaml)
kafkactl get consumer-groups -o yaml

# get only consumer groups assigned to a single topic (output as table)
kafkactl get consumer-groups --topic topic-a

样本输出(例如 yaml):

name: my-group
protocoltype: consumer
topics:
 - topic-a
 - topic-b
 - topic-c

免责声明:我是这个项目的贡献者