如何将从 Azure 服务总线收到的批处理字符串消息逐行拆分?

How can I split a batch string message received from Azure Service Bus to row by row?

我是 python 的初学者,我有一个使用时间触发器运行的 Azure 函数。此函数从 Azure 服务总线以字符串格式读取一批原始 JSON 数据。

这是两行数据。实际上,我连续收到大约 50 条这样的消息。现在我想逐行拆分此消息,然后将其存档到 Azure 存储。

消息类似于下面的示例(第 1 行和第 2 行的连接):

{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.0254566,\"10\":-0.054562772}},\"NUMBER_TAG\":\"2145600\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":1,"id":"074222a38-2816-42c7-b95c-6644448ba9d","t":-2}

第 1 行是:

{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}

第 2 行是:

{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTTAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-2}

一行的结构如下图:

在我看来,首先我应该拆分每一行,然后创建一个数据框并将每个值插入相关列中。之后,我附加到一个 blob。对吗?

我该怎么办?您建议的解决方案是什么?

已编辑: 我从服务总线读取的代码:

from azure.servicebus import ServiceBusClient, ServiceBusMessage

connection_str = "**"
topic_name = "***"
subscription_name = "***"

servicebus_client = ServiceBusClient.from_connection_string(
    conn_str=connection_str, logging_enable=True)

with servicebus_client:
    # get the Subscription Receiver object for the subscription
    receiver = servicebus_client.get_subscription_receiver(
        topic_name=topic_name, subscription_name=subscription_name, )
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            # complete the message so that the message is removed from the subscription
            receiver.complete_message(msg)

考虑三行示例数据:

data = '{"Name": "Hassan", "code":"12"}{"Name": "Jack", "code":"345"}{"Name": "Jack", "code":"345"}'

以下是从该数据中获取数据框的方法:

from ast import literal_eval
data = [literal_eval(d + '}')for d in data.split('}')[0:-1]]
df = pd.DataFrame.from_records(data)

Output:

     Name code
0  Hassan   12
1    Jack  345
2    Jack  345

由于消息是单独发送的,您可以单独处理它们。无需连接成字符串。只需将它们附加到数据框中即可。以下示例适用于队列,但您可以扩展到 topic/subscription。我还附上了结果,向您展示输出的样子。

from azure.servicebus import ServiceBusClient
import pandas as pd
import json
from pandas import json_normalize

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
QUEUE_NAME = 'xxxxxxxxxxx'

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

with servicebus_client:
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
    
    # create an Empty DataFrame object
    df = pd.DataFrame()
    msg_concat = ""
    dfs = []
    with receiver:
        received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
        for msg in received_msgs:
            msg_dict = json.loads(str(msg))
            df2 = json_normalize(msg_dict)
            df = df.append(df2, ignore_index = True)
            receiver.complete_message(msg)
print(df)
print("Receive is done.")


  Name      Seri SiName As  ...  Id  Asse                                    id  t
0       21000000            ...   0  None  075f0a38-2816-42c7-b95c-66c425b8ba9d -1
1        4560000            ...   0  None  075f0a38-2816-42c7-b95c-66c425b8ba9d -2

[2 rows x 21 columns]
Receive is done.