运行 仅当文件存在时才使用数据融合管道

Run a Data Fusion pipeline only when a file exist

我已经在 Data Fusion 中有一个工作管道,它可以处理所有 ETL 过程,但我需要它 运行 只有当它找到位于 Cloud Storage 存储桶中的名为 SUCCESS.txt 的文件时才需要它。
这可能吗?
在其他平台上我使用了一个文件观察器(每分钟 运行s 一个作业来验证我指定的文件是否存在于某个位置,如果文件存在,它会执行其他作业)但我找不到类似的东西。
非常感谢!

您可以通过使用 Cloud Functions GCS triggers with a condition to call the Data Fusion API 启动管道来实现此目的 仅当上传的文件为 SUCCESS.txt.
请注意,无论是否调用 Data Fusion API,该函数都会在 每次文件上传 .

时触发

创建云函数时:

1。选择 Cloud Storage 触发器类型和 Finalize/Create 事件类型。

2。使用您自己的值添加 environment variables,然后单击下一步。

3。将运行时设置为 python 3.7,入口点中 python 函数的名称(在本例中,run_pipeline)并添加您的 python 脚本(或下面的示例) 在 main.py.

import requests
import json
import os

def get_access_token():

    # scope of the API access. Limit it to just cloud platform services
    scopes='https://www.googleapis.com/auth/cloud-platform'
    headers={'Metadata-Flavor': 'Google'}

    # add the scopes
    api="http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes=" + scopes

    # api call to get the access token
    r = requests.get(api,headers=headers).json()

    # return the access token
    return r['access_token']

def run_pipeline(data, context):
    '''
    Calls the Data Fusion API to start the pipeline
    '''
    
    # get environmental variables set in the inital configuraiton.
    PROJECT_ID=os.environ.get('PROJECT_ID', 'Specified environment variable is not set.')
    TOPIC_ID=os.environ.get('TOPIC_ID', 'Specified environment variable is not set.')
    PIPELINE_NAME=os.environ.get('PIPELINE_NAME', 'Specified environment variable is not set.')
    INSTANCE_ID=os.environ.get('INSTANCE_ID', 'Specified environment variable is not set.')
    REGION=os.environ.get('REGION', 'Specified environment variable is not set.')
    NAMESPACE_ID=os.environ.get('NAMESPACE_ID', 'Specified environment variable is not set.')
    CDAP_ENDPOINT=os.environ.get('CDAP_ENDPOINT', 'Specified environment variable is not set.')

    # get uploaded file name
    file_name = data['name']
    
    # get access token
    auth_token=get_access_token()
    
    # api call full endpoint
    post_endpoint = CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID + "/apps/" + PIPELINE_NAME + "/workflows/DataPipelineWorkflow/start"
    
    # If the pipeline has any macros that need to be set, you can pass them in as a payload
    data = '{"my-file":' + file_name +'}'
    
    # add bearer token to the header
    post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
    
    # condition to start the job:
    if file_name == 'SUCCESS.txt':
        # start the job
        r1 = requests.post(post_endpoint,data=data,headers=post_headers)

4。部署您的函数,准备就绪后,通过上传您的 SUCCESS.txt 文件或任何其他文件对其进行测试。

我已经测试过了,它工作正常(基于此post)。