从 Cloud Functions 中的 PubSub 日志消息中提取 textPayload
Extract textPayload from PubSub log message in Cloud Functions
我创建了一个将一些日志发送到 Pub/Sub 的工作流。这些日志会触发处理日志并提取 textPayload 的 Cloud Function,但那部分失败了。
云函数代码
def main(event, context):
print("cf-dataflow-status-checker - main - start")
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
print(pubsub_message)
text_payload = pubsub_message['jsonPayload']['textPayload']
print(text_payload)
job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']
print(job_name)
if SUMMARY_ERROR_LOG in text_payload:
message = 'Dataflow pipeline has failed.'
proc_id = find_process(job_name)
print('proc_id: {}'.format(proc_id))
end_process_error(proc_id, message)
return {'status': 'KO'}
print("cf-dataflow-status-checker - main - end")
return {'status': 'OK'}
行
text_payload = pubsub_message['jsonPayload']['textPayload']
和
job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']
导致以下错误:
text_payload = json_message['jsonPayload']['textPayload']
KeyError: 'jsonPayload'
发布/订阅消息
pubsub_message = {
"insertId": "000002-e8d21b19-1be9-4f76-8359-2c4bc8d5b451",
"jsonPayload": {
"insertId": "11qnus8bu4",
"timestamp": "2022-01-11T18:15:52.85415621Z",
"receiveTimestamp": "2022-01-11T18:15:53.543572717Z",
"textPayload": "Workflow failed. Causes: S11:Merge/_CoGBKImpl/GroupByKey/Read+Merge/_CoGBKImpl/GroupByKey/GroupByWindow+Merge/_CoGBKImpl/MapTuple(collect_values)+Merge/RestoreTags+Process the data+Parse result 1+Parse result 2+Write to txt 1/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 1/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 1/Write/WriteImpl/GroupByKey/Reify+Write to txt 1/Write/WriteImpl/GroupByKey/Write+Write to txt 2/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 2/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 2/Write/WriteImpl/GroupByKey/Reify+Write to txt 2/Write/WriteImpl/GroupByKey/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. Root cause: Work item failed.",
"labels": {
"dataflow.googleapis.com/region": "us-central1",
"dataflow.googleapis.com/job_name": "jobname-test-01/11/2022-18:11:31",
"dataflow.googleapis.com/job_id": "2022-01-11_10_11_31-8608721464069559920",
"dataflow.googleapis.com/log_type": "system"
},
"resource": {
"labels": {
"region": "us-central1",
"job_name": "jobname-test-01/11/2022-18:11:31",
"step_id": "",
"project_id": "{PROJECT_ID}",
"job_id": "2022-01-11_10_11_31-8608721464069559920"
},
"type": "dataflow_step"
},
"logName": "projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message"
},
"resource": {
"type": "cloud_function",
"labels": {
"region": "europe-west1",
"project_id": "{PROJECT_ID}",
"function_name": "cf-dataflow-status-checker"
}
},
"timestamp": "2022-01-11T18:16:01.692Z",
"severity": "ERROR",
"labels": {
"execution_id": "aoov0zbhutdn"
},
"logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
"trace": "projects/{PROJECT_ID}/traces/d149bc4fa2472054cf3fe0ec19e2b545",
"receiveTimestamp": "2022-01-11T18:16:02.110779678Z"
}
我已将 json 复制到我的代码编辑器中,它运行良好...
有人知道发生了什么事吗?
编辑
我已经更新了代码并将响应解析为 JSON,但它仍然错误(同样的错误)。
令人惊讶的是,日志消息似乎也更改了格式:
pubsub_message = {
"textPayload": "{'insertId': '4gfbllciqn', 'labels': {'dataflow.googleapis.com/job_id': '2022-01-17_02_07_14-4281631210038640640', 'dataflow.googleapis.com/job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'dataflow.googleapis.com/log_type': 'system', 'dataflow.googleapis.com/region': 'us-central1'}, 'logName': 'projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message', 'receiveTimestamp': '2022-01-17T10:11:17.255522361Z', 'resource': {'labels': {'job_id': '2022-01-17_02_07_14-4281631210038640640', 'job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'project_id': '667130676776', 'region': 'us-central1', 'step_id': ''}, 'type': 'dataflow_step'}, 'severity': 'ERROR', 'textPayload': 'Workflow failed.... Root cause: Work item failed.', 'timestamp': '2022-01-17T10:11:15.981800283Z'}",
"insertId": "000002-df693556-bc20-468a-990b-6a65c48d9c63",
"resource": {
"type": "cloud_function",
"labels": {
"project_id": "{PROJECT_ID}",
"function_name": "cf-dataflow-status-checker",
"region": "europe-west1"
}
},
"timestamp": "2022-01-17T10:11:23.323Z",
"labels": {
"execution_id": "gbvxjxdemkou"
},
"logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
"trace": "projects/{PROJECT_ID}/traces/114aa720bc030b439943e11b21b330e2",
"receiveTimestamp": "2022-01-17T10:11:23.697331318Z"
}
你的pubsub_message是一个字符串,需要解析为JSON。使用如下内容:
pubsub_message = json.loads(base64.b64decode(event['data']))
我创建了一个将一些日志发送到 Pub/Sub 的工作流。这些日志会触发处理日志并提取 textPayload 的 Cloud Function,但那部分失败了。
云函数代码
def main(event, context):
print("cf-dataflow-status-checker - main - start")
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
print(pubsub_message)
text_payload = pubsub_message['jsonPayload']['textPayload']
print(text_payload)
job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']
print(job_name)
if SUMMARY_ERROR_LOG in text_payload:
message = 'Dataflow pipeline has failed.'
proc_id = find_process(job_name)
print('proc_id: {}'.format(proc_id))
end_process_error(proc_id, message)
return {'status': 'KO'}
print("cf-dataflow-status-checker - main - end")
return {'status': 'OK'}
行
text_payload = pubsub_message['jsonPayload']['textPayload']
和
job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']
导致以下错误:
text_payload = json_message['jsonPayload']['textPayload']
KeyError: 'jsonPayload'
发布/订阅消息
pubsub_message = {
"insertId": "000002-e8d21b19-1be9-4f76-8359-2c4bc8d5b451",
"jsonPayload": {
"insertId": "11qnus8bu4",
"timestamp": "2022-01-11T18:15:52.85415621Z",
"receiveTimestamp": "2022-01-11T18:15:53.543572717Z",
"textPayload": "Workflow failed. Causes: S11:Merge/_CoGBKImpl/GroupByKey/Read+Merge/_CoGBKImpl/GroupByKey/GroupByWindow+Merge/_CoGBKImpl/MapTuple(collect_values)+Merge/RestoreTags+Process the data+Parse result 1+Parse result 2+Write to txt 1/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 1/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 1/Write/WriteImpl/GroupByKey/Reify+Write to txt 1/Write/WriteImpl/GroupByKey/Write+Write to txt 2/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 2/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 2/Write/WriteImpl/GroupByKey/Reify+Write to txt 2/Write/WriteImpl/GroupByKey/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. Root cause: Work item failed.",
"labels": {
"dataflow.googleapis.com/region": "us-central1",
"dataflow.googleapis.com/job_name": "jobname-test-01/11/2022-18:11:31",
"dataflow.googleapis.com/job_id": "2022-01-11_10_11_31-8608721464069559920",
"dataflow.googleapis.com/log_type": "system"
},
"resource": {
"labels": {
"region": "us-central1",
"job_name": "jobname-test-01/11/2022-18:11:31",
"step_id": "",
"project_id": "{PROJECT_ID}",
"job_id": "2022-01-11_10_11_31-8608721464069559920"
},
"type": "dataflow_step"
},
"logName": "projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message"
},
"resource": {
"type": "cloud_function",
"labels": {
"region": "europe-west1",
"project_id": "{PROJECT_ID}",
"function_name": "cf-dataflow-status-checker"
}
},
"timestamp": "2022-01-11T18:16:01.692Z",
"severity": "ERROR",
"labels": {
"execution_id": "aoov0zbhutdn"
},
"logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
"trace": "projects/{PROJECT_ID}/traces/d149bc4fa2472054cf3fe0ec19e2b545",
"receiveTimestamp": "2022-01-11T18:16:02.110779678Z"
}
我已将 json 复制到我的代码编辑器中,它运行良好...
有人知道发生了什么事吗?
编辑
我已经更新了代码并将响应解析为 JSON,但它仍然错误(同样的错误)。 令人惊讶的是,日志消息似乎也更改了格式:
pubsub_message = {
"textPayload": "{'insertId': '4gfbllciqn', 'labels': {'dataflow.googleapis.com/job_id': '2022-01-17_02_07_14-4281631210038640640', 'dataflow.googleapis.com/job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'dataflow.googleapis.com/log_type': 'system', 'dataflow.googleapis.com/region': 'us-central1'}, 'logName': 'projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message', 'receiveTimestamp': '2022-01-17T10:11:17.255522361Z', 'resource': {'labels': {'job_id': '2022-01-17_02_07_14-4281631210038640640', 'job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'project_id': '667130676776', 'region': 'us-central1', 'step_id': ''}, 'type': 'dataflow_step'}, 'severity': 'ERROR', 'textPayload': 'Workflow failed.... Root cause: Work item failed.', 'timestamp': '2022-01-17T10:11:15.981800283Z'}",
"insertId": "000002-df693556-bc20-468a-990b-6a65c48d9c63",
"resource": {
"type": "cloud_function",
"labels": {
"project_id": "{PROJECT_ID}",
"function_name": "cf-dataflow-status-checker",
"region": "europe-west1"
}
},
"timestamp": "2022-01-17T10:11:23.323Z",
"labels": {
"execution_id": "gbvxjxdemkou"
},
"logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
"trace": "projects/{PROJECT_ID}/traces/114aa720bc030b439943e11b21b330e2",
"receiveTimestamp": "2022-01-17T10:11:23.697331318Z"
}
你的pubsub_message是一个字符串,需要解析为JSON。使用如下内容:
pubsub_message = json.loads(base64.b64decode(event['data']))