Kafka 将值作为字符串发送:如何将其反序列化并将其转换为具有 Python 的 JSON 对象
Kafka sends value as a string: How can I deserialize it and turn it into a JSON object with Python
我正在尝试使用来自 lambda 函数的 MSK(托管 Amazon Kafka 服务)消息 - MSK 是我的 lambda 的触发器。
制作人长这样:
data = {'time': 1611215510000000000, 'tags': {'tag1': 'tagvalue'}, 'fields': {'value': 12345}}
self.producer = KafkaProducer(
security_protocol=self.security_protocol,
bootstrap_servers=self.kafka_servers,
value_serializer=lambda x: dumps(x).encode('utf-8'))
self.producer.send(kafka_topic, value=data)
在 lambda 函数中,我收到以下内容:
{
"eventSource":"aws:kafka",
"eventSourceArn":"<arn....>",
"bootstrapServers":"<serverlist...>",
"records":{
"topic-0":[
{
"topic":"topic",
"partition":0,
"offset":0,
"timestamp":1611138328871,
"timestampType":"CREATE_TIME",
"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlX251bSI6IDAuMCwgInZhbHVlIjogZmFsc2V9fQ=="
},
{
"topic":"topic",
"partition":0,
"offset":1,
"timestamp":1611138330033,
"timestampType":"CREATE_TIME",
"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlIjogMTQxMzUuMH19"
}
]
}
}
我想将值字符串转换为 JSON 对象。我该怎么做?我已经尝试了很多版本,我认为应该工作的版本抛出异常 (Exception: Expecting value: line 1 column 1 (char 0)
)
records = event['records']['topic-0']
for record in records:
print(json.loads(record['value']).decode('utf-8'))
值字符串似乎是 base64 编码的,因此您需要找到一种解码它们的方法。然后就可以载入了。
使用https://www.base64decode.org/解码的第一个字符串:
{"time": 1611138280000000000, "tags": {"status": "Good"}, "fields": {"value_num": 0.0, "value": false}}
我正在尝试使用来自 lambda 函数的 MSK(托管 Amazon Kafka 服务)消息 - MSK 是我的 lambda 的触发器。
制作人长这样:
data = {'time': 1611215510000000000, 'tags': {'tag1': 'tagvalue'}, 'fields': {'value': 12345}}
self.producer = KafkaProducer(
security_protocol=self.security_protocol,
bootstrap_servers=self.kafka_servers,
value_serializer=lambda x: dumps(x).encode('utf-8'))
self.producer.send(kafka_topic, value=data)
在 lambda 函数中,我收到以下内容:
{
"eventSource":"aws:kafka",
"eventSourceArn":"<arn....>",
"bootstrapServers":"<serverlist...>",
"records":{
"topic-0":[
{
"topic":"topic",
"partition":0,
"offset":0,
"timestamp":1611138328871,
"timestampType":"CREATE_TIME",
"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlX251bSI6IDAuMCwgInZhbHVlIjogZmFsc2V9fQ=="
},
{
"topic":"topic",
"partition":0,
"offset":1,
"timestamp":1611138330033,
"timestampType":"CREATE_TIME",
"value":"eyJ0aW1lIjogMTYxMTEzODI4MDAwMDAwMDAwMCwgInRhZ3MiOiB7InN0YXR1cyI6ICJHb29kIn0sICJmaWVsZHMiOiB7InZhbHVlIjogMTQxMzUuMH19"
}
]
}
}
我想将值字符串转换为 JSON 对象。我该怎么做?我已经尝试了很多版本,我认为应该工作的版本抛出异常 (Exception: Expecting value: line 1 column 1 (char 0)
)
records = event['records']['topic-0']
for record in records:
print(json.loads(record['value']).decode('utf-8'))
值字符串似乎是 base64 编码的,因此您需要找到一种解码它们的方法。然后就可以载入了。
使用https://www.base64decode.org/解码的第一个字符串:
{"time": 1611138280000000000, "tags": {"status": "Good"}, "fields": {"value_num": 0.0, "value": false}}