How to specify which GCP project to use when triggering a pipeline through Data Fusion operator on Cloud Composer

I need to trigger a Data Fusion pipeline located in a GCP project called myDataFusionProject through a Data Fusion operator (CloudDataFusionStartPipelineOperator) in a DAG, while the Cloud Composer instance running the DAG lives in another project called myCloudComposerProject.

Using the official documentation as well as the source code, I wrote something roughly like the following snippet:

from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance":"someOutputInstance", "input.dataset":"someInputDataset", "input.project":"someInputProject"}

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)

My problem is that every time I trigger the DAG, Cloud Composer looks for myDataFusionInstance in myCloudComposerProject instead of myDataFusionProject, which produces an error like this:

googleapiclient.errors.HttpError: <HttpError 404 when requesting https://datafusion.googleapis.com/v1beta1/projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance?alt=json returned "Resource 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance' was not found". Details: "[{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance'}]"

So the question is: how can I force my operator to use the Data Fusion project instead of the Cloud Composer project? I suspect I could do it by adding a new runtime argument, but I am not sure how.

One last piece of information: the Data Fusion pipeline simply pulls data from a BigQuery source and sends everything to a Bigtable sink.

As a piece of advice when developing operators on Airflow, you should check the class that actually implements the operator, because the documentation may be missing some information due to versioning.
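For example, a quick way to read the exact operator source installed in your own environment (rather than whatever version the docs describe) is Python's inspect module. The operator and its import path are the real ones from the Google provider; the rest is just a minimal sketch:

import inspect

from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

# Print the constructor signature to see every accepted argument,
# including ones the documentation may not list (such as project_id).
print(inspect.signature(CloudDataFusionStartPipelineOperator.__init__))

# Or dump the full class source to check how execute() uses those arguments.
print(inspect.getsource(CloudDataFusionStartPipelineOperator))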

As mentioned in the comments, if you check CloudDataFusionStartPipelineOperator you will see that it uses a hook to fetch the instance based on a project_id. This project ID is optional, so you can pass your own project_id.

class CloudDataFusionStartPipelineOperator(BaseOperator):
    ...

    def __init__(
        ...
        project_id: Optional[str] = None,   ### NOT MENTIONED IN THE DOCUMENTATION
        ...
    ) -> None:
        ...
        self.project_id = project_id
        ...

    def execute(self, context: dict) -> str:
        ...
        instance = hook.get_instance(
            instance_name=self.instance_name,
            location=self.location,
            project_id=self.project_id,  ### if None, falls back to the default project
        )
        api_url = instance["apiEndpoint"]
        ...
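This is also why the lookup ends up in myCloudComposerProject when project_id is omitted: the provider hook falls back to the project configured on the default GCP connection, which on Cloud Composer is the Composer project itself. A simplified illustration of that fallback pattern (a hypothetical stand-in, not the provider's actual code) is shown below:

from typing import Optional

class IllustrativeDataFusionHook:
    """Hypothetical class used only to illustrate the project_id fallback."""

    def __init__(self, connection_default_project: str):
        # On Cloud Composer the default GCP connection points at the
        # Composer project (here: myCloudComposerProject).
        self.connection_default_project = connection_default_project

    def get_instance(self, instance_name: str, location: str,
                     project_id: Optional[str] = None) -> str:
        # If no project_id is passed explicitly, fall back to the
        # connection's default project.
        effective_project = project_id or self.connection_default_project
        return (f"projects/{effective_project}/locations/{location}"
                f"/instances/{instance_name}")

hook = IllustrativeDataFusionHook("myCloudComposerProject")

# Without project_id: resolves to the Composer project -> the 404 in the question.
print(hook.get_instance("myDataFusionInstance", "someLocation"))

# With project_id: resolves to the Data Fusion project, which is what we want.
print(hook.get_instance("myDataFusionInstance", "someLocation",
                        project_id="myDataFusionProject"))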

Adding that parameter to your operator call should solve your problem.

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    project_id=PROJECT_ID,
    task_id="start_pipeline",
)
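For completeness, here is a minimal sketch of how that call might sit inside a full DAG file. The DAG id, schedule and start date are placeholders, and PROJECT_ID is assumed to be the Data Fusion project from the question:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
PROJECT_ID = "myDataFusionProject"  # project that hosts the Data Fusion instance
RUNTIME_ARGS = {
    "output.instance": "someOutputInstance",
    "input.dataset": "someInputDataset",
    "input.project": "someInputProject",
}

with DAG(
    dag_id="start_datafusion_pipeline",  # placeholder DAG id
    start_date=datetime(2021, 1, 1),     # placeholder start date
    schedule_interval=None,              # trigger manually
    catchup=False,
) as dag:
    start_pipeline = CloudDataFusionStartPipelineOperator(
        location=LOCATION,
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        runtime_args=RUNTIME_ARGS,
        project_id=PROJECT_ID,  # forces the lookup in myDataFusionProject
        task_id="start_pipeline",
    )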

As a final note, besides the official documentation site, you can also explore the files of Apache Airflow on GitHub.