How to convert JSON received from a RabbitMQ Queue to CSV?
I am currently consuming messages from a RabbitMQ queue within my organization. Every day I need to push all the messages received into a CSV, which ultimately ends up as a table in a data warehouse.
The code listens on the queue continuously, and ideally I would like to stream the data into the CSV.
import pika

# callback function invoked on receiving messages
def onMessage(channel, method, properties, body):
    print(body)

while True:
    try:
        # connect
        credentials = pika.PlainCredentials(username, password)
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=server, port=port, virtual_host=vhost, credentials=credentials))
        channel = connection.channel()
        channel.basic_consume(on_message_callback=onMessage, queue=queueName, auto_ack=True)
        channel.start_consuming()
    except pika.exceptions.AMQPConnectionError:
        continue  # reconnect if the connection drops
The output received after starting to consume from the queue is shown below; this is one line of the received data. It basically returns a JSON object, but it seems the b'{"metrics": prefix needs to be stripped before the JSON object can be used.
b'{"metrics":[{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"deviceHealthScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099642,"unit":"count","value":"10.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"configAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"1.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"imageAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"1.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"vulnerabilityAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"10.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"overallAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099642,"unit":"count","value":"5.5"}],"emr_published_on":1582886099642}'
b'...'
simply means you received a byte string, which the json module handles happily. You get back a dictionary whose metrics key holds a list of dictionaries, and that list can feed a DataFrame directly.
This means you can process it as simply as:
df = pd.DataFrame(json.loads(body)['metrics'])
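Putting that line into the consumer, a minimal sketch of the full parse-and-append flow could look like the following. The file path and the helper name are my own choices, not from the original post; json.loads accepts bytes directly (Python 3.6+), so nothing needs to be stripped from the b'...' payload.

```python
import json
import os

import pandas as pd

CSV_PATH = "metrics.csv"  # hypothetical output file

def append_metrics_to_csv(body, csv_path=CSV_PATH):
    """Parse one message body (bytes) and append its metrics rows to a CSV."""
    # json.loads handles the byte string as-is; no b'...' stripping required
    records = json.loads(body)["metrics"]
    df = pd.DataFrame(records)
    # write the header only when the file is first created, then append
    write_header = not os.path.exists(csv_path)
    df.to_csv(csv_path, mode="a", header=write_header, index=False)
    return df
```

You would then call append_metrics_to_csv(body) from inside onMessage, so each consumed message streams its rows into the CSV as it arrives.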