将 csv 文件写入 kafka 主题
Write a csv file to a kafka topic
我有一个很大的 csv,我想写一个 kafka 主题。
def producer():
producer = KafkaProducer(bootstrap_servers='mykafka-broker')
with open('/home/antonis/repos/testfile.csv') as file:
reader = csv.DictReader(file, delimiter=";")
for row in reader:
producer.send(topic='stable_topic', value=row)
producer.flush()
if __name__ == '__main__':
producer()
此代码产生错误:
AssertionError: value must be bytes
文件看起来像:
"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
谁能帮我解决这个问题?
与其重新发明轮子,不如使用已经存在的非常好的轮子 :) 它是 Kafka Connect,它是 Apache Kafka 的一部分。
有几个连接器可以从 CSV 中读取,包括 Kafka Connect spooldir (see example) and Filepulse。
在 this talk 中了解有关 Kafka Connect 的更多信息。
您需要正确序列化您的值。
以下应该可以解决问题:
import json
producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
我有一个很大的 csv,我想写一个 kafka 主题。
def producer():
producer = KafkaProducer(bootstrap_servers='mykafka-broker')
with open('/home/antonis/repos/testfile.csv') as file:
reader = csv.DictReader(file, delimiter=";")
for row in reader:
producer.send(topic='stable_topic', value=row)
producer.flush()
if __name__ == '__main__':
producer()
此代码产生错误:
AssertionError: value must be bytes
文件看起来像:
"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
谁能帮我解决这个问题?
与其重新发明轮子,不如使用已经存在的非常好的轮子 :) 它是 Kafka Connect,它是 Apache Kafka 的一部分。
有几个连接器可以从 CSV 中读取,包括 Kafka Connect spooldir (see example) and Filepulse。
在 this talk 中了解有关 Kafka Connect 的更多信息。
您需要正确序列化您的值。
以下应该可以解决问题:
import json
producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)