`for .. in KafkaConsumer`:如何生存`eof`?

`for .. in KafkaConsumer`: how to survive `eof`?

for consumed_record in KafkaConsumer(config.kafka_topic,                                      
                                     bootstrap_servers=config.kafka_bootstrap_servers,
                                     group_id=config.kafka_group_id):
        __process_kafka_record(consumed_record)

上面的代码工作正常,除非看到 EOF。然后 for 循环终止,但这不是我想要的行为。我希望我的消费者无休止地 运行 除非整个过程结束。从概念上讲,我需要像 if is_eof(): continue 这样的东西。使用 python-kafka 包的正确方法是什么?

您似乎在使用 kafka-python

使用记录的正确方法是将该循环包装在另一个循环中,该循环仅在您需要时才停止。

kafka-python项目包含一些examples,比如:

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(['my-topic'])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)
        if self.stop_event.is_set():
            break

consumer.close()