使用 kafka 0.8.2.0 跟踪主题大小和消费者滞后

Tracking topic size and consumer lag with kafka 0.8.2.0

从 kafka 0.8.2.0 开始,跟踪消费者延迟和主题大小似乎变得非常困难

你如何跟踪 kafka 中的偏移量(主题大小)和滞后?当您的生产者插入一条消息时,您是否会在某处增加一个计数器并在您的消费者确认消息时增加另一个计数器?

我正在使用 airbnb's kafka-statsd-metrics2 - 但由于某种原因,所有关于主题大小的指标总是 0,这可能是他们的错误报告,但你是怎么做到的?

我们的消费者和生产者是使用 kafka-python 在 python 中编写的,他们声明他们不支持 ConsumerCoordinator 偏移量 API,因此我整理了一个查询 zookeeper 并发送这些指标的解决方案到 statsd 实例(看起来很尴尬),但我仍然缺少主题大小指标。

我们正在使用 collectd 来收集系统指标,我没有使用 JMX 的经验并且在 collectd 中配置它似乎很复杂,我已经尝试了几次,所以我找到了一些不这样做的方法.

如果您有任何意见,我很想听听,即使是:"This belongs on x stackexchange-site"

如果我理解正确的话,你可以使用 FetchResponse 中的 HighwaterMarkOffset。这样你就会知道分区末尾的偏移量是多少,并且能够将它与你当前确认的偏移量或这个 FetchResponse 例如中最后一条消息的偏移量进行比较。

详情here

您是否尝试过使用 https://github.com/quantifind/KafkaOffsetMonitor 来监控消费者延迟。它适用于 0.8.2.0

这里是代码片段,确保在活动控制器中 运行 这个。 BOOTSTRAP_SERVERS 是活动控制器 IP。

client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, request_timeout_ms=300)
      list_groups_request  = client.list_consumer_groups()

      for group in list_groups_request:
        if group[1] == 'consumer':
          list_mebers_in_groups = client.describe_consumer_groups([group[0]])
          (error_code, group_id, state, protocol_type, protocol, members) = list_mebers_in_groups[0]

          if len(members) !=0:
            for member in members:
              (member_id, client_id, client_host, member_metadata, member_assignment) = member
              member_topics_assignment = []
              for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
                member_topics_assignment.append(topic)

              for topic in member_topics_assignment:
                consumer = KafkaConsumer(
                          bootstrap_servers=BOOTSTRAP_SERVERS,
                          group_id=group[0],
                          enable_auto_commit=False
                          )
                consumer.topics()

                for p in consumer.partitions_for_topic(topic):
                  tp = TopicPartition(topic, p)
                  consumer.assign([tp])
                  committed = consumer.committed(tp)
                  consumer.seek_to_end(tp)
                  last_offset = consumer.position(tp)
                  if last_offset != None and committed != None:
                    lag = last_offset - committed
                    print "group: {} topic:{} partition: {} lag: {}".format(group[0], topic, p, lag)