运行 仅当文件存在时才使用数据融合管道
Run a Data Fusion pipeline only when a file exist
我已经在 Data Fusion 中有一个工作管道,它可以处理所有 ETL 过程,但我需要它 运行 只有当它找到位于 Cloud Storage 存储桶中的名为 SUCCESS.txt 的文件时才需要它。
这可能吗?
在其他平台上我使用了一个文件观察器(每分钟 运行s 一个作业来验证我指定的文件是否存在于某个位置,如果文件存在,它会执行其他作业)但我找不到类似的东西。
非常感谢!
您可以通过使用 Cloud Functions GCS triggers with a condition to call the Data Fusion API 启动管道来实现此目的 仅当上传的文件为 SUCCESS.txt
.
时
请注意,无论是否调用 Data Fusion API,该函数都会在 每次文件上传 .
时触发
创建云函数时:
1。选择 Cloud Storage 触发器类型和 Finalize/Create 事件类型。
2。使用您自己的值添加 environment variables,然后单击下一步。
3。将运行时设置为 python 3.7,入口点中 python 函数的名称(在本例中,run_pipeline
)并添加您的 python 脚本(或下面的示例) 在 main.py
.
import requests
import json
import os
def get_access_token():
# scope of the API access. Limit it to just cloud platform services
scopes='https://www.googleapis.com/auth/cloud-platform'
headers={'Metadata-Flavor': 'Google'}
# add the scopes
api="http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes
# api call to get the access token
r = requests.get(api,headers=headers).json()
# return the access token
return r['access_token']
def run_pipeline(data, context):
'''
Calls the Data Fusion API to start the pipeline
'''
# get environmental variables set in the inital configuraiton.
PROJECT_ID=os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
TOPIC_ID=os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
PIPELINE_NAME=os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
INSTANCE_ID=os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
REGION=os.environ.get('REGION', 'Specified environment variable is not set.')
NAMESPACE_ID=os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
CDAP_ENDPOINT=os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')
# get uploaded file name
file_name = data['name']
# get access token
auth_token=get_access_token()
# api call full endpoint
post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
# If the pipeline has any macros that need to be set, you can pass them in as a payload
data = '{"my-file":' + file_name +'}'
# add bearer token to the header
post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
# condition to start the job:
if file_name == 'SUCCESS.txt':
# start the job
r1 = requests.post(post_endpoint,data=data,headers=post_headers)
4。部署您的函数,准备就绪后,通过上传您的 SUCCESS.txt
文件或任何其他文件对其进行测试。
我已经测试过了,它工作正常(基于此post)。
我已经在 Data Fusion 中有一个工作管道,它可以处理所有 ETL 过程,但我需要它 运行 只有当它找到位于 Cloud Storage 存储桶中的名为 SUCCESS.txt 的文件时才需要它。
这可能吗?
在其他平台上我使用了一个文件观察器(每分钟 运行s 一个作业来验证我指定的文件是否存在于某个位置,如果文件存在,它会执行其他作业)但我找不到类似的东西。
非常感谢!
您可以通过使用 Cloud Functions GCS triggers with a condition to call the Data Fusion API 启动管道来实现此目的 仅当上传的文件为 SUCCESS.txt
.
时
请注意,无论是否调用 Data Fusion API,该函数都会在 每次文件上传 .
创建云函数时:
1。选择 Cloud Storage 触发器类型和 Finalize/Create 事件类型。
2。使用您自己的值添加 environment variables,然后单击下一步。
3。将运行时设置为 python 3.7,入口点中 python 函数的名称(在本例中,run_pipeline
)并添加您的 python 脚本(或下面的示例) 在 main.py
.
import requests
import json
import os
def get_access_token():
# scope of the API access. Limit it to just cloud platform services
scopes='https://www.googleapis.com/auth/cloud-platform'
headers={'Metadata-Flavor': 'Google'}
# add the scopes
api="http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes
# api call to get the access token
r = requests.get(api,headers=headers).json()
# return the access token
return r['access_token']
def run_pipeline(data, context):
'''
Calls the Data Fusion API to start the pipeline
'''
# get environmental variables set in the inital configuraiton.
PROJECT_ID=os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
TOPIC_ID=os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
PIPELINE_NAME=os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
INSTANCE_ID=os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
REGION=os.environ.get('REGION', 'Specified environment variable is not set.')
NAMESPACE_ID=os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
CDAP_ENDPOINT=os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')
# get uploaded file name
file_name = data['name']
# get access token
auth_token=get_access_token()
# api call full endpoint
post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
# If the pipeline has any macros that need to be set, you can pass them in as a payload
data = '{"my-file":' + file_name +'}'
# add bearer token to the header
post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
# condition to start the job:
if file_name == 'SUCCESS.txt':
# start the job
r1 = requests.post(post_endpoint,data=data,headers=post_headers)
4。部署您的函数,准备就绪后,通过上传您的 SUCCESS.txt
文件或任何其他文件对其进行测试。
我已经测试过了,它工作正常(基于此post)。