有没有办法通过 python 从本地触发云数据融合管道?

Is there a way to trigger cloud data fusion pipeline form local via python?

我正在尝试构建一个代码,我需要在其中从我正在构建的基于桌面的应用程序触发云数据融合管道。目前在 python。任何人都可以建议一种在不使用 google 云数据融合 UI 和一些 python 代码行的情况下启动管道的方法。

正如@mk_sta在推荐中提到的,Cloud Data Fusion pipelines can be triggered using REST API,具体来说:

POST -H "Authorization: Bearer ${AUTH_TOKEN}" "${CDAP_ENDPOINT}/v3/namespaces/<namespace-id>/apps/<pipeline-name>/workflows/DataPipelineWorkflow/start"

您可以参考documentation了解更多信息。

您可能会发现 PycURL library approaching cURL client side HTTP methods sender or Requests 可用于从 Python 代码触发对上述 CDAP REST API 清单的调用。

下面的示例显示了 Python 代码,为 starting
提供了 HTTP POST 方法 PyCurl 中的批处理管道,作为参考,我使用了与@Edwin Elia 提到的文档 link 相同的环境变量:

设置环境变量:

export AUTH_TOKEN=$(gcloud auth print-access-token)

export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
--location=<region> \
--format="value(apiEndpoint)" \
${INSTANCE_ID})v3/namespaces/namespace-id/apps/pipeline-name/workflows/DataPipelineWorkflow/start

Python 代码片段:

import pycurl
import os

CDAP_ENDPOINT = os.environ['CDAP_ENDPOINT']
AUTH_TOKEN = os.environ['AUTH_TOKEN']

c = pycurl.Curl()
c.setopt(pycurl.URL, CDAP_ENDPOINT)
c.setopt(pycurl.HTTPHEADER, ['Authorization: Bearer %s' %(AUTH_TOKEN)])
c.setopt(pycurl.POST, 1)
c.perform()

补充/加入以上两个答案。要在命令行中运行,可以使用curl 进行HTTP 请求:

> export AUTH_TOKEN=$(gcloud auth print-access-token)
> export REGION=your-region
> export INSTANCE_ID=your-instance
> export PIPELINE=your-pipeline
> export NAMESPACE=your-namespace

> export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe \
    --location=${REGION} \
    --format="value(apiEndpoint)" \
  ${INSTANCE_ID})

> curl -H "Authorization: Bearer ${AUTH_TOKEN}" --request POST "${CDAP_ENDPOINT}/v3/namespaces/${NAMESPACE}/apps/${PIPELINE}/workflows/DataPipelineWorkflow/start"

无需使用命令行的完整 Python 解决方案是:

import requests
import os

# Set environment variables
AUTH_TOKEN = os.popen('gcloud auth print-access-token').read().strip()
REGION = 'your-region'
PIPELINE = 'your-pipeline_name'
NAMESPACE = 'your_namespace'
INSTANCE_ID = 'your-instance-id'

# CDAP endpoint of the instance
CDAP_ENDPOINT = os.popen('gcloud beta data-fusion instances describe --location=' +
                         REGION + ' --format="value(apiEndpoint)" ' + INSTANCE_ID).read().strip()

# Send post request
response = requests.post(CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE + "/apps/" + PIPELINE + "/workflows/DataPipelineWorkflow/start",
                        headers={"Authorization": "Bearer " + AUTH_TOKEN})

确保您已通过 运行 这些命令的身份验证,为此,如前所述,请参阅文档:https://cloud.google.com/data-fusion/docs/reference/cdap-reference