使用 kafka-python 检索主题中的消息

retrieve messages in a topic using kafka-python

我已经使用 kafka-python 库编写了一个 python 脚本,该脚本将消息写入和读取到 kafka。我写消息没有任何问题;我可以使用 kafka 控制台工具检索它们。但是我无法使用我的 python 脚本读取它们。我对我的消费者有一个 for,它在迭代的第一行冻结并且从不 returns。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))

消费者创建并订阅完成;我可以看到 my-topic 列在其 _client 属性 的主题列表中。

有什么想法吗?

默认情况下,kafka python 从最后一个偏移量开始,即只会读取新消息。 一种方法是从头开始阅读,或者另一种方法是将轮询主题保持在无限循环中,如下面的代码所示:

while True:
    try:
        records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min

        record_list = []
        for tp, consumer_records in records.items():
            for consumer_record in consumer_records:
                record_list.append(consumer_record.value)
        print(record_list) # record_list will be list of dictionaries

编辑

要从头读取,我们需要在制作消费者对象时提前添加auto_offset_reset=earliest

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
    auto_offset_reset='earliest')

如果有帮助请告诉我!!