从没有无限循环的kafka消费

Consume from kafka without infinite loop

我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且 运行 在 while True 循环中的代码很好,如示例中所示文档。但是,我想设置一个 cron 作业,每天只从主题中消耗一次。这个想法是工作将在早上检查主题,在那个时间点消费主题中的所有消息,然后停止。我尝试在 python 中实现这一目标,如下所示:

msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  // do something with msg
  msg = kafka_consumer.consume()

问题是它永远不会消耗任何东西。我想第一行永远不会在第一次尝试时收到消息。它仅适用于 while True,但我不希望此代码无限地 运行,直到该时间点的最后一条消息被消耗为止。

您可以检查循环内消费者组的偏移量,然后在您处于“结束”的某个阈值内时中断循环

您可能还想尝试一下 max.poll.records 消费者配置,以便更好地控制您返回的记录数量