如何停止程序中的Python Kafka Consumer?
How to stop Python Kafka Consumer in program?
我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html 中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer)。当我运行下面的一段代码时,它会一直运行,即使所有的消息都被消耗掉了。 希望消费者消费完消息就停下来。怎么做? 我也不知道如何使用 stop() 函数(在基础 class kafka.consumer.base.Consumer 中)。
更新
我使用信号处理程序来调用 consumer.stop()。一些错误消息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过 client.close()。但是结果一样。
我需要一些方法来优雅地停止 for 循环。
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
欢迎任何帮助。谢谢
使用iter_timeout参数设置等待时间。如果设置为10,就像下面这段代码,如果10秒内没有新消息进来就退出。默认值为None,即即使没有新消息进来,消费者也会阻塞在这里。
self.consumer = SimpleConsumer(self.client, "test-group", "test",
iter_timeout=10)
更新
以上不是一个好的方法。当大量消息进来时,很难设置足够小的 iter_timeout 来保证停止。所以,现在,我正在使用 get_message() 函数,它尝试使用一条消息并停止。没有新消息时返回None。
我们可以先查看主题中最后一条消息的偏移量。
然后在我们达到该偏移量时停止循环。
client = "localhost:9092"
consumer = KafkaConsumer(client)
topic = 'test'
tp = TopicPartition(topic,0)
#register to the topic
consumer.assign([tp])
# obtain the last offset value
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
if message.offset == lastOffset - 1:
break
Mohit 答案的类似解决方案,但使用消费者的 end_offsets
功能。
from kafka import KafkaConsumer, TopicPartition
# settings
client = "localhost:9092"
topic = 'test'
# prepare consumer
tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)
# obtain the last offset value
lastOffset = consumer.end_offsets([tp])[tp]
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
if message.offset == lastOffset - 1:
break
更简单的解决方案:
使用 poll()
代替 poll_timeout_ms
。 poll()
是非阻塞调用。
- 在 while 循环外创建一个计数器变量。
- 每次 poll() 从 Kafka Brokers 获取 0 条记录时增加计数器。
- 如果
poll()
提取记录 ,则将计数器重置为 0
- 如果计数器 == 某个阈值(比如 10),则跳出循环并关闭消费者。
在这个逻辑中,我们依赖于这样一个事实,即如果 poll()
在 10 个后续调用中没有获取任何记录,这意味着我们已经读取了所有数据。
我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html 中使用 kafka.consumer.SimpleConsumer 或 kafka.consumer.simple.SimpleConsumer)。当我运行下面的一段代码时,它会一直运行,即使所有的消息都被消耗掉了。 希望消费者消费完消息就停下来。怎么做? 我也不知道如何使用 stop() 函数(在基础 class kafka.consumer.base.Consumer 中)。
更新
我使用信号处理程序来调用 consumer.stop()。一些错误消息被打印到屏幕上。但是程序仍然卡在for循环中。当新消息进来时,消费者消费它们并打印它们。我也试过 client.close()。但是结果一样。
我需要一些方法来优雅地停止 for 循环。
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
欢迎任何帮助。谢谢
使用iter_timeout参数设置等待时间。如果设置为10,就像下面这段代码,如果10秒内没有新消息进来就退出。默认值为None,即即使没有新消息进来,消费者也会阻塞在这里。
self.consumer = SimpleConsumer(self.client, "test-group", "test",
iter_timeout=10)
更新
以上不是一个好的方法。当大量消息进来时,很难设置足够小的 iter_timeout 来保证停止。所以,现在,我正在使用 get_message() 函数,它尝试使用一条消息并停止。没有新消息时返回None。
我们可以先查看主题中最后一条消息的偏移量。 然后在我们达到该偏移量时停止循环。
client = "localhost:9092"
consumer = KafkaConsumer(client)
topic = 'test'
tp = TopicPartition(topic,0)
#register to the topic
consumer.assign([tp])
# obtain the last offset value
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
if message.offset == lastOffset - 1:
break
Mohit 答案的类似解决方案,但使用消费者的 end_offsets
功能。
from kafka import KafkaConsumer, TopicPartition
# settings
client = "localhost:9092"
topic = 'test'
# prepare consumer
tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)
# obtain the last offset value
lastOffset = consumer.end_offsets([tp])[tp]
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
if message.offset == lastOffset - 1:
break
更简单的解决方案:
使用 poll()
代替 poll_timeout_ms
。 poll()
是非阻塞调用。
- 在 while 循环外创建一个计数器变量。
- 每次 poll() 从 Kafka Brokers 获取 0 条记录时增加计数器。
- 如果
poll()
提取记录 ,则将计数器重置为 0
- 如果计数器 == 某个阈值(比如 10),则跳出循环并关闭消费者。
在这个逻辑中,我们依赖于这样一个事实,即如果 poll()
在 10 个后续调用中没有获取任何记录,这意味着我们已经读取了所有数据。