如何通过http消费kafka主题和parse/serve?

How to consume kafka topic and parse/serve over http?

我正在尝试在 python 中使用 kafka 主题并使用 prometheus 客户端通过 http 提供服务,但我似乎无法使用该主题。我放了一些占位符来简单地添加指标,但看起来那部分被阻止了。

import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
    daemon = True

    def collect(self):
        client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
        topic = client.topics[b'os.environ['KAFKA_TOPIC']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                print(message.value)

        metric = Metric('test_name', 'description', 'summary')
        metric.add_sample('test_name', 'description', 'summary')
        yield metric

if __name__ == '__main__':
    start_http_server(9998)
    REGISTRY.register(CustomCollector())
    while True: time.sleep(1)

如果我 运行 代码,我会看到主题数据按预期流式传输到控制台。但是,我的指标端点从未被填充,并且对 Web 服务器的任何请求都会挂起,直到我终止该应用程序,它会使用库中的标准指标对其进行响应。

Metric 实例的构造应该在每个消费的消息中发生一次。也就是说,Metric() 调用应该在 for message in consumer 循环内。此外,您可能希望在创建 Metric 实例时以某种方式使用 message.value