How to specify which GCP project to use when triggering a pipeline through Data Fusion operator on Cloud Composer
I need to trigger a Data Fusion pipeline located in a GCP project called myDataFusionProject through a Data Fusion operator (CloudDataFusionStartPipelineOperator) in a DAG whose Cloud Composer instance lives in another project called myCloudComposerProject.
I used the official documentation as well as the source code to write something roughly like the following snippet:
LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance":"someOutputInstance", "input.dataset":"someInputDataset", "input.project":"someInputProject"}
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)
My problem is that every time I trigger the DAG, Cloud Composer looks for myDataFusionInstance inside myCloudComposerProject instead of myDataFusionProject, which produces an error like this:
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://datafusion.googleapis.com/v1beta1/projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance?alt=json returned "Resource 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance' was not found". Details: "[{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance'}]"
So the question is: how do I force my operator to use the Data Fusion project instead of the Cloud Composer project? I suspect I could do it by adding a new runtime argument, but I am not sure how.
One last piece of information: the Data Fusion pipeline simply extracts data from a BigQuery source and sends everything to a Bigtable sink.
As a recommendation when developing operators on Airflow, check the classes that implement the operator, because due to versioning the documentation can be missing some information.
As mentioned in the comments, if you look at CloudDataFusionStartPipelineOperator you will see that it uses a hook that fetches the instance based on a project-id. This project-id is optional, so you can pass your own; when it is not set, the hook falls back to the default project of the GCP connection, which is why the Cloud Composer project ends up being used.
class CloudDataFusionStartPipelineOperator(BaseOperator):
    ...
    def __init__(
        ...
        project_id: Optional[str] = None,  ### NOT MENTIONED IN THE DOCUMENTATION
        ...
    ) -> None:
        ...
        self.project_id = project_id
        ...

    def execute(self, context: dict) -> str:
        ...
        instance = hook.get_instance(
            instance_name=self.instance_name,
            location=self.location,
            project_id=self.project_id,  ### defaults to your project-id when not set
        )
        api_url = instance["apiEndpoint"]
        ...
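If you want to confirm which instance the hook resolves before wiring it into the operator, you can call the hook directly. The following is only a minimal sketch: it assumes the google provider package is installed, uses the default google_cloud_default connection, and reuses the hypothetical names from the question.

from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook

# Hypothetical names mirroring the question; adjust to your environment.
hook = DataFusionHook(gcp_conn_id="google_cloud_default")
instance = hook.get_instance(
    instance_name="myDataFusionInstance",
    location="someLocation",
    project_id="myDataFusionProject",  # omit this and the hook falls back to the connection's project
)
print(instance["apiEndpoint"])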
Adding the parameter to your operator call should fix your issue.
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    project_id=PROJECT_ID,
    task_id="start_pipeline",
)
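For context, here is a minimal, self-contained DAG sketch showing where the project_id fits. The import path is the one from the google provider package; the DAG id, schedule, and project/instance names are assumptions based on the snippets above, so adapt them to your environment.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
PROJECT_ID = "myDataFusionProject"  # the project that hosts the Data Fusion instance
RUNTIME_ARGS = {
    "output.instance": "someOutputInstance",
    "input.dataset": "someInputDataset",
    "input.project": "someInputProject",
}

with DAG(
    dag_id="start_datafusion_pipeline",  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    start_pipeline = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline",
        location=LOCATION,
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        runtime_args=RUNTIME_ARGS,
        project_id=PROJECT_ID,  # point the hook at the Data Fusion project
    )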
As a final note, besides the official documentation site, you can also explore the Apache Airflow source files on GitHub.