Running a cloud function with a pubsub push trigger
I have set up a Python script that takes certain BigQuery tables from one dataset, cleans them with a SQL query, and adds the cleaned tables to a new dataset. The script works fine. I want to set it up as a cloud function that is triggered every day at midnight.
I have also used Cloud Scheduler to send a message to a pubsub topic every day at midnight, and I have verified that this works correctly. I am new to pubsub, but I followed the tutorial in the documentation and managed to set up a test cloud function that prints out hello world when it gets a push notification from pubsub.
However, my problem is that when I try to combine the two and automate my script, I get a log message saying the execution crashed:
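For reference, I create the schedule with something along these lines (the job name, topic name and time zone here are placeholders, not my exact values):

gcloud scheduler jobs create pubsub midnight-job \
    --schedule "0 0 * * *" \
    --topic one-minute-tables \
    --message-body "midnight" \
    --time-zone "Etc/UTC"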
Function execution took 1119 ms, finished with status: 'crash'
To help you understand what I am doing, here is the code in my main.py:
# Global libraries
import base64

# Local libraries
from scripts.one_minute_tables import helper

def one_minute_tables(event, context):

    # Log out the message that triggered the function
    print("""This Function was triggered by messageId {} published at {}
    """.format(context.event_id, context.timestamp))

    # Get the message from the event data
    name = base64.b64decode(event['data']).decode('utf-8')

    # If it's the message for the daily midnight schedule, execute function
    if name == 'midnight':
        helper.format_tables('raw_data','table1')
    else:
        pass
For convenience, here is a simplified version of my Python script:
# Global libraries
from google.cloud import bigquery
import os

# Login to bigquery by providing credentials
credential_path = 'secret.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credential_path

def format_tables(dataset, list_of_tables):

    # Initialize the client
    client = bigquery.Client()

    # Loop through the list of tables
    for table in list_of_tables:

        # Create the query object
        script = f"""
            SELECT *
            FROM {dataset}.{table}
        """

        # Call the API
        query = client.query(script)

        # Wait for job to finish
        results = query.result()

        # Print
        print('Data cleaned and updated in table: {}.{}'.format(dataset, table))
Here is my folder structure:
And my requirements.txt file has just one entry: google-cloud-bigquery==1.24.0
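For completeness, I deploy the function with a command roughly like this (the runtime version and topic name here are assumptions, not my exact values; the function name and region match the log below):

gcloud functions deploy one_minute_tables \
    --runtime python37 \
    --trigger-topic one-minute-tables \
    --region us-central1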
I would really appreciate your help in figuring out what I need to fix so that I can run this script with the pubsub trigger without getting a log message saying the execution crashed.
EDIT: As per the comments, here is the log of the function crash:
{
"textPayload": "Function execution took 1078 ms, finished with status: 'crash'",
"insertId": "000000-689fdf20-aee2-4900-b5a1-91c34d7c1448",
"resource": {
"type": "cloud_function",
"labels": {
"function_name": "one_minute_tables",
"region": "us-central1",
"project_id": "PROJECT_ID"
}
},
"timestamp": "2020-05-15T16:53:53.672758031Z",
"severity": "DEBUG",
"labels": {
"execution_id": "x883cqs07f2w"
},
"logName": "projects/PROJECT_ID/logs/cloudfunctions.googleapis.com%2Fcloud-functions",
"trace": "projects/PROJECT_ID/traces/f391b48a469cbbaeccad5d04b4a704a0",
"receiveTimestamp": "2020-05-15T16:53:53.871051291Z"
}
The issue comes from the list_of_tables parameter. You call your function like this:

if name == 'midnight':
    helper.format_tables('raw_data','table1')

and format_tables then iterates over the string 'table1' itself, character by character, instead of over a list of table names.

Do this instead and it should work:

if name == 'midnight':
    helper.format_tables('raw_data',['table1'])
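If you want the helper to also accept a bare table name, here is a small defensive sketch (not required for the fix above; the isinstance guard is my own addition, not part of your code):

# Defensive variant of format_tables (sketch): wrap a bare string in a list
# so the loop iterates over table names rather than single characters.
from google.cloud import bigquery

def format_tables(dataset, list_of_tables):
    # Accept either a single table name or a list of table names
    if isinstance(list_of_tables, str):
        list_of_tables = [list_of_tables]

    client = bigquery.Client()
    for table in list_of_tables:
        query = client.query(f"SELECT * FROM {dataset}.{table}")
        query.result()  # wait for the job to finish
        print('Data cleaned and updated in table: {}.{}'.format(dataset, table))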