从没有无限循环的kafka消费
Consume from kafka without infinite loop
我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且 运行 在 while True
循环中的代码很好,如示例中所示文档。但是,我想设置一个 cron 作业,每天只从主题中消耗一次。这个想法是工作将在早上检查主题,在那个时间点消费主题中的所有消息,然后停止。我尝试在 python 中实现这一目标,如下所示:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
问题是它永远不会消耗任何东西。我想第一行永远不会在第一次尝试时收到消息。它仅适用于 while True
,但我不希望此代码无限地 运行,直到该时间点的最后一条消息被消耗为止。
您可以检查循环内消费者组的偏移量,然后在您处于“结束”的某个阈值内时中断循环
您可能还想尝试一下 max.poll.records
消费者配置,以便更好地控制您返回的记录数量
我目前正在使用 Confluent kafka python 客户端来使用来自 kafka 主题的消息,并且 运行 在 while True
循环中的代码很好,如示例中所示文档。但是,我想设置一个 cron 作业,每天只从主题中消耗一次。这个想法是工作将在早上检查主题,在那个时间点消费主题中的所有消息,然后停止。我尝试在 python 中实现这一目标,如下所示:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
问题是它永远不会消耗任何东西。我想第一行永远不会在第一次尝试时收到消息。它仅适用于 while True
,但我不希望此代码无限地 运行,直到该时间点的最后一条消息被消耗为止。
您可以检查循环内消费者组的偏移量,然后在您处于“结束”的某个阈值内时中断循环
您可能还想尝试一下 max.poll.records
消费者配置,以便更好地控制您返回的记录数量