使用 kafka-python 检索主题中的消息
retrieve messages in a topic using kafka-python
我已经使用 kafka-python
库编写了一个 python 脚本,该脚本将消息写入和读取到 kafka
。我写消息没有任何问题;我可以使用 kafka
控制台工具检索它们。但是我无法使用我的 python 脚本读取它们。我对我的消费者有一个 for,它在迭代的第一行冻结并且从不 returns。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
消费者创建并订阅完成;我可以看到 my-topic
列在其 _client
属性 的主题列表中。
有什么想法吗?
默认情况下,kafka python 从最后一个偏移量开始,即只会读取新消息。
一种方法是从头开始阅读,或者另一种方法是将轮询主题保持在无限循环中,如下面的代码所示:
while True:
try:
records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min
record_list = []
for tp, consumer_records in records.items():
for consumer_record in consumer_records:
record_list.append(consumer_record.value)
print(record_list) # record_list will be list of dictionaries
编辑
要从头读取,我们需要在制作消费者对象时提前添加auto_offset_reset=earliest
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
auto_offset_reset='earliest')
如果有帮助请告诉我!!
我已经使用 kafka-python
库编写了一个 python 脚本,该脚本将消息写入和读取到 kafka
。我写消息没有任何问题;我可以使用 kafka
控制台工具检索它们。但是我无法使用我的 python 脚本读取它们。我对我的消费者有一个 for,它在迭代的第一行冻结并且从不 returns。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
消费者创建并订阅完成;我可以看到 my-topic
列在其 _client
属性 的主题列表中。
有什么想法吗?
默认情况下,kafka python 从最后一个偏移量开始,即只会读取新消息。 一种方法是从头开始阅读,或者另一种方法是将轮询主题保持在无限循环中,如下面的代码所示:
while True:
try:
records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min
record_list = []
for tp, consumer_records in records.items():
for consumer_record in consumer_records:
record_list.append(consumer_record.value)
print(record_list) # record_list will be list of dictionaries
编辑
要从头读取,我们需要在制作消费者对象时提前添加auto_offset_reset=earliest
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
auto_offset_reset='earliest')
如果有帮助请告诉我!!