将kafka(kafka-python)转储到txt文件中
Dump the kafka (kafka-python) to a txt file
我需要定期将 kafka 消费者的输出转储到 excel 文件中。我使用以下代码:
from kafka import KafkaConsumer
from kafka import KafkaProducer
import json,time
from xlutils.copy import copy
from xlrd import open_workbook
import pandas
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
KafkaConsumer()
consumer.subscribe("test")
rowx=0
colx=0
for msg in consumer:
book_ro = open_workbook("twitter.xls")
book = copy(book_ro) # creates a writeable copy
sheet1 = book.get_sheet(0) # get a first sheet
sheet1.write(rowx,colx, msg[6])
book.save("twitter.xls")
现在,我的问题是代码效率不高。对于每条消息,我需要打开、写入然后保存 excel 文件。有什么方法可以打开 excel 一次,写入,然后关闭它(对于一批消息而不是在 for 循环中)?发送
是的,打开、写入、保存和关闭每条消息的效率很低,您可以批量进行。但还是需要在消费循环中做。
msg_buffer = []
buffer_size = 100
for msg in consumer:
msg_buffer.append(msg[6])
if len(msg_buffer) >= buffer_size:
book_ro = open_workbook("twitter.xls")
book = copy(book_ro) # creates a writeable copy
for _msg in msg_buffer:
sheet1 = book.get_sheet(0) # get a first sheet
sheet1.write(rowx,colx, _msg)
book.save("twitter.xls")
msg_buffer = []
你可能认为这会比 nobatch 快 100 倍。
评论更新:
是的,通常我们会永远停留在这个循环中,它内部使用轮询来获取新消息,发送心跳和提交偏移量。如果您的目标是使用来自该主题的消息并保存消息,那么它应该是一个很长的 运行 循环。
这是kafka-python的设计,你应该这样使用来消费消息或者使用consumer.poll()。
至于为什么可以使用for msg in consumer:
,因为消费者是一个迭代器对象,它的class实现了__iter__
和__next__
,它底层使用了一个fetcher获取记录。更多实现细节你可以参考 https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py
我需要定期将 kafka 消费者的输出转储到 excel 文件中。我使用以下代码:
from kafka import KafkaConsumer
from kafka import KafkaProducer
import json,time
from xlutils.copy import copy
from xlrd import open_workbook
import pandas
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
KafkaConsumer()
consumer.subscribe("test")
rowx=0
colx=0
for msg in consumer:
book_ro = open_workbook("twitter.xls")
book = copy(book_ro) # creates a writeable copy
sheet1 = book.get_sheet(0) # get a first sheet
sheet1.write(rowx,colx, msg[6])
book.save("twitter.xls")
现在,我的问题是代码效率不高。对于每条消息,我需要打开、写入然后保存 excel 文件。有什么方法可以打开 excel 一次,写入,然后关闭它(对于一批消息而不是在 for 循环中)?发送
是的,打开、写入、保存和关闭每条消息的效率很低,您可以批量进行。但还是需要在消费循环中做。
msg_buffer = []
buffer_size = 100
for msg in consumer:
msg_buffer.append(msg[6])
if len(msg_buffer) >= buffer_size:
book_ro = open_workbook("twitter.xls")
book = copy(book_ro) # creates a writeable copy
for _msg in msg_buffer:
sheet1 = book.get_sheet(0) # get a first sheet
sheet1.write(rowx,colx, _msg)
book.save("twitter.xls")
msg_buffer = []
你可能认为这会比 nobatch 快 100 倍。
评论更新:
是的,通常我们会永远停留在这个循环中,它内部使用轮询来获取新消息,发送心跳和提交偏移量。如果您的目标是使用来自该主题的消息并保存消息,那么它应该是一个很长的 运行 循环。
这是kafka-python的设计,你应该这样使用来消费消息或者使用consumer.poll()。
至于为什么可以使用for msg in consumer:
,因为消费者是一个迭代器对象,它的class实现了__iter__
和__next__
,它底层使用了一个fetcher获取记录。更多实现细节你可以参考 https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py