通过 KAFKA 发送包含多个 JSON 对象的 JSON 文件
Sending a JSON file with multiple JSON objects through KAFKA
我有一个包含以下格式的多个 json 文档的文件。
{"attribute1": "value1", "attribute2": "value2", "attribute3": "value3", "attribute4": "value4"}
{"attribute1": "value11", "attribute2": "value12", "attribute3": "value13", "attribute4": "value14"}
{"attribute1": "value21", "attribute22": "value2", "attribute23": "value3", "attribute4": "value24"}
我正在尝试将个人 json 文档发送到 kafka。
该脚本以退出代码 0 执行,但我看不到 KAFKA 消费者收到任何消息。我不确定我哪里出错了。
我的代码如下:
import csv
import json
bootstrap = ['hostname:9092']
valueSerializer = lambda x: dumps(x).encode('utf-8')
producer = KafkaProducer(bootstrap_servers = bootstrap, value_serializer = valueSerializer)
table = []
with open('~/json_file_name.json', 'r') as json_file:
for line in json_file:
table.append(json.loads(line))
#numrows = len(table)
#print(numrows)
for row in table:
print(row)
producer.send('Topic_Name', value=row)
您可能没有为生产者发送足够的数据来刷新其批次。您尚未显示 KafkaProducer 的导入,但看看您是否可以在脚本
末尾执行 producer.flush()
顺便说一句,您不需要 table 变量,只需在读取文件行时发送即可。您也不需要 dumps(x)
因为您正在发送 json.loads
已经
获得的字符串
您也可以删除 csv 导入
我有一个包含以下格式的多个 json 文档的文件。
{"attribute1": "value1", "attribute2": "value2", "attribute3": "value3", "attribute4": "value4"}
{"attribute1": "value11", "attribute2": "value12", "attribute3": "value13", "attribute4": "value14"}
{"attribute1": "value21", "attribute22": "value2", "attribute23": "value3", "attribute4": "value24"}
我正在尝试将个人 json 文档发送到 kafka。 该脚本以退出代码 0 执行,但我看不到 KAFKA 消费者收到任何消息。我不确定我哪里出错了。
我的代码如下:
import csv
import json
bootstrap = ['hostname:9092']
valueSerializer = lambda x: dumps(x).encode('utf-8')
producer = KafkaProducer(bootstrap_servers = bootstrap, value_serializer = valueSerializer)
table = []
with open('~/json_file_name.json', 'r') as json_file:
for line in json_file:
table.append(json.loads(line))
#numrows = len(table)
#print(numrows)
for row in table:
print(row)
producer.send('Topic_Name', value=row)
您可能没有为生产者发送足够的数据来刷新其批次。您尚未显示 KafkaProducer 的导入,但看看您是否可以在脚本
末尾执行producer.flush()
顺便说一句,您不需要 table 变量,只需在读取文件行时发送即可。您也不需要 dumps(x)
因为您正在发送 json.loads
已经
您也可以删除 csv 导入