python 的 Kafka 消费者轮询消息
Kafka Consumer poll messages with python
我在从消费者组中的 Kafka 轮询消息时遇到问题。
我的消费者对象分配给给定的分区
self.ps = TopicPartition(topic, partition )
然后消费者分配给该分区:
self.consumer.assign([self.ps])
之后我可以用
计算分区内的消息
self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
和self.consumer.seek_to_end(self.ps)
.....
在我的主题中有超过 30000 条消息。
问题是我只收到一条消息。
消费者配置:
max_poll_records= 200
AUTO_OFFSET_RESET
最早
这是我的功能,我正在尝试获取消息:
def poll_messages(self):
data = []
messages = self.consumer.poll(timeout_ms=6000)
for partition, msgs in six.iteritems(messages):
for msg in msgs:
data.append(msg)
return data
即使我在开始轮询消息之前转到第一个可用偏移量
我只收到一条消息。
self.consumer.seek(self.ps, self.get_first_offset())
我希望有人能解释我做错了什么。
提前致谢。
祝福
约恩
我认为您误会了 max_poll_records - 这并不意味着每次投票您将获得 200 个,只是您可能获得的最高限额。您将需要多次调用轮询。我建议您参考文档以获取简单示例:http://kafka-python.readthedocs.io/en/master/usage.html
我认为更标准的实现是:
for message in self.consumer:
# do stuff like:
print(msg)
我在从消费者组中的 Kafka 轮询消息时遇到问题。 我的消费者对象分配给给定的分区
self.ps = TopicPartition(topic, partition )
然后消费者分配给该分区:
self.consumer.assign([self.ps])
之后我可以用
计算分区内的消息self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
和self.consumer.seek_to_end(self.ps)
.....
在我的主题中有超过 30000 条消息。 问题是我只收到一条消息。
消费者配置:
max_poll_records= 200
AUTO_OFFSET_RESET
最早
这是我的功能,我正在尝试获取消息:
def poll_messages(self):
data = []
messages = self.consumer.poll(timeout_ms=6000)
for partition, msgs in six.iteritems(messages):
for msg in msgs:
data.append(msg)
return data
即使我在开始轮询消息之前转到第一个可用偏移量 我只收到一条消息。
self.consumer.seek(self.ps, self.get_first_offset())
我希望有人能解释我做错了什么。 提前致谢。
祝福 约恩
我认为您误会了 max_poll_records - 这并不意味着每次投票您将获得 200 个,只是您可能获得的最高限额。您将需要多次调用轮询。我建议您参考文档以获取简单示例:http://kafka-python.readthedocs.io/en/master/usage.html
我认为更标准的实现是:
for message in self.consumer:
# do stuff like:
print(msg)