Extract textPayload from PubSub log message in Cloud Functions

I have created a workflow that sends some logs to Pub/Sub. These logs trigger a Cloud Function that processes each log and extracts its textPayload, but that part fails.

Cloud Function code

def main(event, context):
    print("cf-dataflow-status-checker - main - start")
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')

    print(pubsub_message)
    text_payload = pubsub_message['jsonPayload']['textPayload']
    print(text_payload)
    job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']
    print(job_name)

    if SUMMARY_ERROR_LOG in text_payload:
        message = 'Dataflow pipeline has failed.'
        proc_id = find_process(job_name)
        print('proc_id: {}'.format(proc_id))
        end_process_error(proc_id, message)
        return {'status': 'KO'}

    print("cf-dataflow-status-checker - main - end")
    return {'status': 'OK'}

The following two lines:

text_payload = pubsub_message['jsonPayload']['textPayload']

job_name = pubsub_message['jsonPayload']['resource']['labels']['job_name']

result in this error:

text_payload = json_message['jsonPayload']['textPayload']
KeyError: 'jsonPayload'

Pub/Sub message

pubsub_message = {
    "insertId": "000002-e8d21b19-1be9-4f76-8359-2c4bc8d5b451",
    "jsonPayload": {
        "insertId": "11qnus8bu4",
        "timestamp": "2022-01-11T18:15:52.85415621Z",
        "receiveTimestamp": "2022-01-11T18:15:53.543572717Z",
        "textPayload": "Workflow failed. Causes: S11:Merge/_CoGBKImpl/GroupByKey/Read+Merge/_CoGBKImpl/GroupByKey/GroupByWindow+Merge/_CoGBKImpl/MapTuple(collect_values)+Merge/RestoreTags+Process the data+Parse result 1+Parse result 2+Write to txt 1/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 1/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 1/Write/WriteImpl/GroupByKey/Reify+Write to txt 1/Write/WriteImpl/GroupByKey/Write+Write to txt 2/Write/WriteImpl/Map(<lambda at iobase.py:1126>)+Write to txt 2/Write/WriteImpl/WindowInto(WindowIntoFn)+Write to txt 2/Write/WriteImpl/GroupByKey/Reify+Write to txt 2/Write/WriteImpl/GroupByKey/Write failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. Root cause: Work item failed.",
        "labels": {
            "dataflow.googleapis.com/region": "us-central1",
            "dataflow.googleapis.com/job_name": "jobname-test-01/11/2022-18:11:31",
            "dataflow.googleapis.com/job_id": "2022-01-11_10_11_31-8608721464069559920",
            "dataflow.googleapis.com/log_type": "system"
        },
        "resource": {
            "labels": {
                "region": "us-central1",
                "job_name": "jobname-test-01/11/2022-18:11:31",
                "step_id": "",
                "project_id": "{PROJECT_ID}",
                "job_id": "2022-01-11_10_11_31-8608721464069559920"
            },
            "type": "dataflow_step"
        },
        "logName": "projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message"
    },
    "resource": {
        "type": "cloud_function",
        "labels": {
            "region": "europe-west1",
            "project_id": "{PROJECT_ID}",
            "function_name": "cf-dataflow-status-checker"
        }
    },
    "timestamp": "2022-01-11T18:16:01.692Z",
    "severity": "ERROR",
    "labels": {
        "execution_id": "aoov0zbhutdn"
    },
    "logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
    "trace": "projects/{PROJECT_ID}/traces/d149bc4fa2472054cf3fe0ec19e2b545",
    "receiveTimestamp": "2022-01-11T18:16:02.110779678Z"
}

I copied the JSON into my code editor and it works fine there...

Does anyone know what is going on?

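Edit

I have updated the code to parse the response as JSON, but it still fails with the same error. Surprisingly, the log message also seems to have changed format: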

pubsub_message = {
    "textPayload": "{'insertId': '4gfbllciqn', 'labels': {'dataflow.googleapis.com/job_id': '2022-01-17_02_07_14-4281631210038640640', 'dataflow.googleapis.com/job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'dataflow.googleapis.com/log_type': 'system', 'dataflow.googleapis.com/region': 'us-central1'}, 'logName': 'projects/{PROJECT_ID}/logs/dataflow.googleapis.com%2Fjob-message', 'receiveTimestamp': '2022-01-17T10:11:17.255522361Z', 'resource': {'labels': {'job_id': '2022-01-17_02_07_14-4281631210038640640', 'job_name': 'Venezuela-synthetic_scenarios-01/17/2022-10:07:13', 'project_id': '667130676776', 'region': 'us-central1', 'step_id': ''}, 'type': 'dataflow_step'}, 'severity': 'ERROR', 'textPayload': 'Workflow failed.... Root cause: Work item failed.', 'timestamp': '2022-01-17T10:11:15.981800283Z'}",
    "insertId": "000002-df693556-bc20-468a-990b-6a65c48d9c63",
    "resource": {
        "type": "cloud_function",
        "labels": {
            "project_id": "{PROJECT_ID}",
            "function_name": "cf-dataflow-status-checker",
            "region": "europe-west1"
        }
    },
    "timestamp": "2022-01-17T10:11:23.323Z",
    "labels": {
        "execution_id": "gbvxjxdemkou"
    },
    "logName": "projects/{PROJECT_ID}/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
    "trace": "projects/{PROJECT_ID}/traces/114aa720bc030b439943e11b21b330e2",
    "receiveTimestamp": "2022-01-17T10:11:23.697331318Z"
}

Your pubsub_message is a string and needs to be parsed as JSON. Use something like:

pubsub_message = json.loads(base64.b64decode(event['data']))
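
As a rough sketch of how the full decode step could look: the logged entries in the question suggest that the decoded message carries textPayload and resource at the top level, and that the jsonPayload wrapper belongs to the function's own log output rather than to the Pub/Sub message. If so, that would explain a KeyError: 'jsonPayload' even after parsing. This reading is an assumption based on the logs shown above, and parse_log_entry, extract_fields and the sample entry below are hypothetical names and data used only for illustration.

```python
import base64
import json


def parse_log_entry(event):
    """Decode the base64 'data' field of a Pub/Sub event into a dict."""
    raw = base64.b64decode(event["data"]).decode("utf-8")
    return json.loads(raw)


def extract_fields(entry):
    """Return (text_payload, job_name), tolerating missing keys.

    Assumes textPayload and resource sit at the top level of the entry,
    as the logs in the question appear to show.
    """
    text_payload = entry.get("textPayload", "")
    labels = entry.get("resource", {}).get("labels", {})
    return text_payload, labels.get("job_name")


# Hypothetical sample entry, shortened from the logs in the question.
sample_entry = {
    "textPayload": "Workflow failed. Root cause: Work item failed.",
    "resource": {
        "type": "dataflow_step",
        "labels": {"job_name": "jobname-test"},
    },
    "severity": "ERROR",
}

# Simulate the event a Pub/Sub-triggered function receives:
# the message body arrives base64-encoded under the 'data' key.
event = {
    "data": base64.b64encode(json.dumps(sample_entry).encode("utf-8")).decode("ascii")
}

entry = parse_log_entry(event)
text_payload, job_name = extract_fields(entry)
print(text_payload)
print(job_name)
```

Using .get() with defaults keeps the function from crashing on entries that lack one of the fields. Printing the list of keys of the decoded dict once (for example print(list(entry))) is a quick way to confirm which layout the messages actually have.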