向 CloudDataFusionPipelineStateSensor 传递了无效参数 'failure_statuses'

Invalid arguments were passed to CloudDataFusionPipelineStateSensor 'failure_statuses'

我正在尝试使用 Cloud Composer 检查 Data Fusion 管道的状态。在 DAG 中,我有以下代码,它是 Airflow website:

的副本
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="start_pipeline_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_task.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION)

但是,当我尝试 运行 Cloud Composer 中的 DAG 时,我收到 failure_statuses 的错误参数无效错误。参数也在源代码中 here.

File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 178, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 506, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to CloudDataFusionPipelineStateSensor (task_id: start_pipeline_sensor). Invalid arguments were:
**kwargs: {'failure_statuses': ['FAILED']}

这可能是什么原因造成的?它在没有 failure_statuses 参数的情况下工作正常。

CloudDataFusionPipelineStateSensorfailure_statuses 参数直到 Airflow 中 Google 提供程序的 v6.0.0 才被引入。示例 DAG 反映了具有此版本的提供程序。尝试升级到最新的 Google 提供商,示例应该可以工作。

请注意,提供程序的 v5.1.0 到 v6.0.0 之间存在一些重大变化。

关于在 Airflow 中查看源代码的附注。从 Airflow 2 开始,核心 Airflow 的发布和与服务提供者相关的功能(例如挂钩、运算符、Google 的传感器、Databricks 等)已经分离。这意味着可以独立于核心 Airflow 发布提供程序功能。提供商通常每月发布一次。 Airflow 中的 main 分支反映了最新的代码库,但 而不是 意味着它反映了最新的 available 代码。为确保您正在查看已安装的提供程序版本的正确代码,请在搜索源代码时使用标签: