`for .. in KafkaConsumer`:如何生存`eof`?
`for .. in KafkaConsumer`: how to survive `eof`?
for consumed_record in KafkaConsumer(config.kafka_topic,
bootstrap_servers=config.kafka_bootstrap_servers,
group_id=config.kafka_group_id):
__process_kafka_record(consumed_record)
上面的代码工作正常,除非看到 EOF
。然后 for
循环终止,但这不是我想要的行为。我希望我的消费者无休止地 运行 除非整个过程结束。从概念上讲,我需要像 if is_eof(): continue
这样的东西。使用 python-kafka
包的正确方法是什么?
您似乎在使用 kafka-python。
使用记录的正确方法是将该循环包装在另一个循环中,该循环仅在您需要时才停止。
kafka-python项目包含一些examples,比如:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(['my-topic'])
while not self.stop_event.is_set():
for message in consumer:
print(message)
if self.stop_event.is_set():
break
consumer.close()
for consumed_record in KafkaConsumer(config.kafka_topic,
bootstrap_servers=config.kafka_bootstrap_servers,
group_id=config.kafka_group_id):
__process_kafka_record(consumed_record)
上面的代码工作正常,除非看到 EOF
。然后 for
循环终止,但这不是我想要的行为。我希望我的消费者无休止地 运行 除非整个过程结束。从概念上讲,我需要像 if is_eof(): continue
这样的东西。使用 python-kafka
包的正确方法是什么?
您似乎在使用 kafka-python。
使用记录的正确方法是将该循环包装在另一个循环中,该循环仅在您需要时才停止。
kafka-python项目包含一些examples,比如:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(['my-topic'])
while not self.stop_event.is_set():
for message in consumer:
print(message)
if self.stop_event.is_set():
break
consumer.close()