Running a cloud function with a pubsub push trigger

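I have set up a Python script that takes certain BigQuery tables from one dataset, cleans them with a SQL query, and adds the cleaned tables to a new dataset. The script works correctly. I want to set it up as a cloud function that is triggered at midnight every day.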

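I have also used Cloud Scheduler to send a message to a pubsub topic at midnight every day, and I have verified that this works correctly. I am new to pubsub, but I followed the tutorial in the documentation and managed to set up a test cloud function that prints out hello world when it receives a push notification from pubsub.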

However, my problem is that when I try to combine the two and automate my script, I get a log message saying that the execution crashed:

Function execution took 1119 ms, finished with status: 'crash'

To help you understand what I'm doing, here is the code in my main.py:

# Global libraries
import base64

# Local libraries
from scripts.one_minute_tables import helper

def one_minute_tables(event, context):

    # Log out the message that triggered the function
    print("""This Function was triggered by messageId {} published at {}
    """.format(context.event_id, context.timestamp))

    # Get the message from the event data
    name = base64.b64decode(event['data']).decode('utf-8')

    # If it's the message for the daily midnight schedule, execute function
    if name == 'midnight':
        helper.format_tables('raw_data','table1')
    else:
        pass

For convenience, here is a simplified version of my Python script:

# Global libraries
from google.cloud import bigquery
import os

# Login to bigquery by providing credentials
credential_path = 'secret.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path

def format_tables(dataset, list_of_tables):

    # Initialize the client
    client = bigquery.Client()

    # Loop through the list of tables
    for table in list_of_tables:

        # Create the query object
        script = f"""
            SELECT *
            FROM {dataset}.{table}
        """

        # Call the API
        query = client.query(script)

        # Wait for job to finish
        results = query.result()

        # Print
        print('Data cleaned and updated in table: {}.{}'.format(dataset, table))

This is my folder structure:

My requirements.txt file has only one entry: google-cloud-bigquery==1.24.0

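I would really appreciate your help in figuring out what I need to fix so that this script runs with the pubsub trigger without producing a log message saying that the execution crashed.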

EDIT: As requested in the comments, here is the log of the function crash:

{
  "textPayload": "Function execution took 1078 ms, finished with status: 'crash'",
  "insertId": "000000-689fdf20-aee2-4900-b5a1-91c34d7c1448",
  "resource": {
    "type": "cloud_function",
    "labels": {
      "function_name": "one_minute_tables",
      "region": "us-central1",
      "project_id": "PROJECT_ID"
    }
  },
  "timestamp": "2020-05-15T16:53:53.672758031Z",
  "severity": "DEBUG",
  "labels": {
    "execution_id": "x883cqs07f2w"
  },
  "logName": "projects/PROJECT_ID/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
  "trace": "projects/PROJECT_ID/traces/f391b48a469cbbaeccad5d04b4a704a0",
  "receiveTimestamp": "2020-05-15T16:53:53.871051291Z"
}
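
The log entry above records only the crash status, not the Python exception behind it. One way to see the actual traceback might be to call the function locally with a hand-built event. The sketch below is only an illustration under assumptions: `make_pubsub_event` and `fake_context` are hypothetical names, and the fake context carries only the two attributes that main.py reads.

```python
import base64
from types import SimpleNamespace


def make_pubsub_event(message):
    """Build a fake event dict whose 'data' field is base64-encoded, as main.py expects."""
    return {'data': base64.b64encode(message.encode('utf-8')).decode('utf-8')}


# Hypothetical stand-in for the context object (illustrative values only)
fake_context = SimpleNamespace(event_id='1234567890',
                               timestamp='2020-05-15T00:00:00.000Z')

event = make_pubsub_event('midnight')

# Decode the payload the same way main.py does
name = base64.b64decode(event['data']).decode('utf-8')
print(name)

# With main.py importable, the function could then be invoked directly:
# one_minute_tables(event, fake_context)
```

Running the commented-out call on a local machine should raise the underlying exception in the terminal, which is usually easier to read than the one-line crash status.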

The problem comes from the list_of_tables argument. You call your function like this:

    if name == 'midnight':
        helper.format_tables('raw_data','table1')

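and then you iterate over the 'table1' argument. Because 'table1' is a string rather than a list, the for loop walks over its individual characters, so the queries are built against raw_data.t, raw_data.a, and so on, which presumably do not exist.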
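
A small illustration of this behavior, without touching BigQuery. The function `table_refs` is a hypothetical helper written only for this example; it mirrors how the loop in format_tables builds `{dataset}.{table}` names.

```python
def table_refs(dataset, list_of_tables):
    """Return the fully qualified names the loop would end up querying."""
    return [f"{dataset}.{table}" for table in list_of_tables]


# Passing a bare string: the loop iterates over its characters
print(table_refs('raw_data', 'table1'))
# ['raw_data.t', 'raw_data.a', 'raw_data.b', 'raw_data.l', 'raw_data.e', 'raw_data.1']

# Passing a list: the loop sees a single table name
print(table_refs('raw_data', ['table1']))
# ['raw_data.table1']
```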

Pass a list instead, and it should work:

    if name == 'midnight':
        helper.format_tables('raw_data',['table1'])
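
If you want the helper to tolerate both call styles, one option is to normalize the argument before looping. This is only a sketch; `normalize_tables` is a hypothetical name, not part of your script or of the BigQuery client library.

```python
def normalize_tables(list_of_tables):
    """Accept a single table name or an iterable of names; always return a list."""
    if isinstance(list_of_tables, str):
        # A bare string is treated as one table name, not as a sequence of characters
        return [list_of_tables]
    return list(list_of_tables)


print(normalize_tables('table1'))              # ['table1']
print(normalize_tables(['table1', 'table2']))  # ['table1', 'table2']
```

Inside format_tables, the loop would then read `for table in normalize_tables(list_of_tables):`.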