kafka-python 在消费者重启后读取最后生成的消息

kafka-python read from last produced message after a consumer restart

我正在使用 kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer 类型。 如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息。 我怎样才能做到这一点?

谢谢

你不会seekToEnd()到日志的末尾。

请记住,您首先需要订阅一个主题才能搜索。此外,订阅是懒惰的。因此,您也需要在搜索之前添加一个 "dummy poll"。

consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()

// now enter your regular poll-loop

谢谢,

有效!

这是我的代码的简化版本:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
    print(message)

consumer.close()

The documentation 声明 poll() 方法与迭代器接口不兼容,我猜这是我在脚本末尾的循环中使用的接口。然而,从最初的测试来看,这段代码看起来可以正常工作。

使用安全吗?还是我误解了文档?

谢谢

在您的回答中回答您的问题:

据我了解,当您执行 consumer.poll() 时,会返回一个字典。因此,当我想轮询信息时,我使用循环遍历字典。

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
    for value in messages[msg]:
       #Add just the values to the list
       data.append(value[6])

我相信您正在做的是使用 consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 获取迭代器,然后使用

遍历迭代器
#start iterate
for message in consumer:
    print(message)

看起来您实际上并没有从投票中获得 500 个结果。您可以通过将 max_poll_records=5 添加到您的 KafkaConsumer 配置来确认这一点。然后,当您 运行 代码时,如果打印出超过 5 条消息,您可以判断您没有使用轮询功能。

希望对您有所帮助!

这是一种将轮询返回的所有消息放入列表中的便捷方法:

while True:
  messages = [] # Store all messages
  crs = [] # Store all consumer records
  tpd = consumer.poll(timeout_ms=60000, max_records=1)
  [ crs.extend(tp) for tp in tpd.values() ] # List of cr's
  [ messages.extend([json.loads(cr.value)]) for cr in crs ]
  print messages