如何在 Python 中生成 JSON 格式的 Kafka 消息
How to produce Kafka messages with JSON format in Python
如何像原始格式一样删除引号和发送数据
原来的JSON格式是:
{
"@timestamp": "2020-06-02T09:38:03.183186Z"
}
此数据在另一个主题中
"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"
这是服务器之间发送数据的代码
def parse(d):
if str(type(d)) == "<class 'dict'>":
return (r)
return -1
producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
while True:
num_rows = 0
for msg in consumer:
num_rows = num_rows + 1
m = json.loads(msg.value)
j = parse(m)
if j != -1:
data = json.dumps(j)
producer.send(param["TOPIC_OUT"], value=data)
您目前正在将您的值序列化为字符串。如果您想要 JSON 而不是字符串,那么您将需要正确序列化您的值。
以下应该可以解决问题:
import json
producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
如何像原始格式一样删除引号和发送数据 原来的JSON格式是:
{
"@timestamp": "2020-06-02T09:38:03.183186Z"
}
此数据在另一个主题中
"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"
这是服务器之间发送数据的代码
def parse(d):
if str(type(d)) == "<class 'dict'>":
return (r)
return -1
producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
while True:
num_rows = 0
for msg in consumer:
num_rows = num_rows + 1
m = json.loads(msg.value)
j = parse(m)
if j != -1:
data = json.dumps(j)
producer.send(param["TOPIC_OUT"], value=data)
您目前正在将您的值序列化为字符串。如果您想要 JSON 而不是字符串,那么您将需要正确序列化您的值。
以下应该可以解决问题:
import json
producer = KafkaProducer(
bootstrap_servers='mykafka-broker',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)