向 CloudDataFusionPipelineStateSensor 传递了无效参数 'failure_statuses'
Invalid arguments were passed to CloudDataFusionPipelineStateSensor 'failure_statuses'
我正在尝试使用 Cloud Composer 检查 Data Fusion 管道的状态。在 DAG 中,我有以下代码,它是 Airflow website:
的副本
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="start_pipeline_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_task.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION)
但是,当我尝试 运行 Cloud Composer 中的 DAG 时,我收到 failure_statuses 的错误参数无效错误。参数也在源代码中 here.
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
result = func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to CloudDataFusionPipelineStateSensor (task_id: start_pipeline_sensor). Invalid arguments were:
**kwargs: {'failure_statuses': ['FAILED']}
这可能是什么原因造成的?它在没有 failure_statuses 参数的情况下工作正常。
CloudDataFusionPipelineStateSensor
的 failure_statuses
参数直到 Airflow 中 Google 提供程序的 v6.0.0 才被引入。示例 DAG 反映了具有此版本的提供程序。尝试升级到最新的 Google 提供商,示例应该可以工作。
请注意,提供程序的 v5.1.0 到 v6.0.0 之间存在一些重大变化。
关于在 Airflow 中查看源代码的附注。从 Airflow 2 开始,核心 Airflow 的发布和与服务提供者相关的功能(例如挂钩、运算符、Google 的传感器、Databricks 等)已经分离。这意味着可以独立于核心 Airflow 发布提供程序功能。提供商通常每月发布一次。 Airflow 中的 main
分支反映了最新的代码库,但 而不是 意味着它反映了最新的 available 代码。为确保您正在查看已安装的提供程序版本的正确代码,请在搜索源代码时使用标签:
我正在尝试使用 Cloud Composer 检查 Data Fusion 管道的状态。在 DAG 中,我有以下代码,它是 Airflow website:
的副本start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="start_pipeline_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_task.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION)
但是,当我尝试 运行 Cloud Composer 中的 DAG 时,我收到 failure_statuses 的错误参数无效错误。参数也在源代码中 here.
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
result = func(self, *args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to CloudDataFusionPipelineStateSensor (task_id: start_pipeline_sensor). Invalid arguments were:
**kwargs: {'failure_statuses': ['FAILED']}
这可能是什么原因造成的?它在没有 failure_statuses 参数的情况下工作正常。
CloudDataFusionPipelineStateSensor
的 failure_statuses
参数直到 Airflow 中 Google 提供程序的 v6.0.0 才被引入。示例 DAG 反映了具有此版本的提供程序。尝试升级到最新的 Google 提供商,示例应该可以工作。
请注意,提供程序的 v5.1.0 到 v6.0.0 之间存在一些重大变化。
关于在 Airflow 中查看源代码的附注。从 Airflow 2 开始,核心 Airflow 的发布和与服务提供者相关的功能(例如挂钩、运算符、Google 的传感器、Databricks 等)已经分离。这意味着可以独立于核心 Airflow 发布提供程序功能。提供商通常每月发布一次。 Airflow 中的 main
分支反映了最新的代码库,但 而不是 意味着它反映了最新的 available 代码。为确保您正在查看已安装的提供程序版本的正确代码,请在搜索源代码时使用标签: