confluent-kafka-python 库:每个主题每个 consumer_group 的读取偏移量
confluent-kafka-python library: read offset per topic per consumer_group
由于 pykafka EOL we are in the process of migration to confluent-kafka-python。对于 pykafka
,我们编写了一个精心制作的脚本,以以下格式生成输出:
topic
consumer group
offset
topic_alpha
total_messages
100
topic_alpha
consumer_a
10
topic_alpha
consumer_b
25
我想知道是否有一个 Python 代码知道如何为 confluent-kafka-python
做类似的事情?
小字:有一个关于如何read offsets per given consumer_group的部分例子。但是,我很难在不手动解析 __consumer_offsets
.
的情况下获取每个主题的 consumer_group
列表
使用 admin_client.list_groups()
获取组列表,使用 admin_client.list_topics()
获取集群中的所有主题和分区,使用 client.get_watermark_offsets()
获取给定主题。
然后为每个消费者组实例化一个具有相应 group.id
的新消费者,创建一个 TopicPartition 列表来查询提交的偏移量,然后调用 c.committed()
检索提交的偏移量。
从高水位线中减去提交的偏移量得到 th
由于 pykafka EOL we are in the process of migration to confluent-kafka-python。对于 pykafka
,我们编写了一个精心制作的脚本,以以下格式生成输出:
topic | consumer group | offset |
---|---|---|
topic_alpha | total_messages | 100 |
topic_alpha | consumer_a | 10 |
topic_alpha | consumer_b | 25 |
我想知道是否有一个 Python 代码知道如何为 confluent-kafka-python
做类似的事情?
小字:有一个关于如何read offsets per given consumer_group的部分例子。但是,我很难在不手动解析 __consumer_offsets
.
consumer_group
列表
使用 admin_client.list_groups()
获取组列表,使用 admin_client.list_topics()
获取集群中的所有主题和分区,使用 client.get_watermark_offsets()
获取给定主题。
然后为每个消费者组实例化一个具有相应 group.id
的新消费者,创建一个 TopicPartition 列表来查询提交的偏移量,然后调用 c.committed()
检索提交的偏移量。
从高水位线中减去提交的偏移量得到 th