GCP Pub/Sub - 如何从 BQ 计划查询中检索状态

GCP Pub/Sub - How to retrieve state from BQ scheduled query

我有一个通过 pub/sub 触发云函数的 Big Query 计划查询。

我希望函数从 pub/sub 消息中读取 "state" 值,以便我可以查看它是否成功完成。

以下将始终触发 else 语句。如果删除 if 语句,它将 return 一个 KeyError。

import base64

def hello_pubsub(event, context):
    data = base64.b64decode(event['data']).decode('utf-8')

    if 'state' in data:
        state = data['state']
        print("returned state: " + state)
    else:
        print ("No state attribute found")

这是函数应该接收的 pubsub 消息:

{
"data":
{"dataSourceId": "scheduled_query", 
"destinationDatasetId": "xxxxxxxxxx", 
"emailPreferences": { }, 
"endTime": "2020-03-12T20:40:13.627285Z", 
"errorStatus": { },
"name": "xxxxxxxxxx", "notificationPubsubTopic": "projects/xxxxxxxxxx/topics/xxxxxxxxxx", 
"params": { "destination_table_name_template": "xxxxxxxxxx", "query": "xxxxxxxxxx", "write_disposition": "WRITE_TRUNCATE" }, 
"runTime": "2020-03-05T10:00:00Z", 
"scheduleTime": "2020-03-12T20:37:13.17166Z", 
"startTime": "2020-03-12T20:37:13.328479Z", 
"state": "SUCCEEDED", 
"updateTime": "2020-03-12T20:40:13.627307Z", 
"userId": "xxxxxxxxxx"
}
}

您可以查看 python 库 here

你还有the documentation

最终,您可以检查additional fields added in the JSON notification message

我想通了。

data = base64.b64decode(event['data']).decode('utf-8')

这个returns一个json格式的字符串,不是字典对象。您需要通过以下方式转换为字典:

data_dict = json.loads(data)

为了能够像字典一样访问它。