Airflow 2.0 支持 DataprocClusterCreateOperator

Airflow 2.0 support for DataprocClusterCreateOperator

在我们的项目中,我们使用了 Contrib from airflow.contrib.operators import dataproc_operator 下的 DataprocClusterCreateOperator。它适用于气流版本 1.10.14.

我们正在升级到 Airflow 2.1.2,其中在测试或 dags 时需要旋转 DataProc 集群,我们发现错误 airflow.exceptions.AirflowException: Invalid arguments were passed to DataprocClusterCreateOperator (task_id: <task_id>). Invalid arguments were: **kwargs: {'config_bucket': None, 'autoscale_policy': None}

我在 Airflow 2 中看不到此运算符支持的任何 link,因此我无法识别新参数或发生的更改。 请分享相关的link.

我们正在使用 google-cloud-composer version 1.17.2 Airflow 版本 2.1.2

自 Airflow 2.0 以来,第 3 方提供程序(如本例中的 Google)operators/hooks 已从 Airflow 核心移至单独的提供程序包。您可以阅读更多 here.

由于您使用的是 Cloud Composer,因此已安装 Google 提供程序包。

关于 DataprocClusterCreateOperator,它已重命名为 DataprocCreateClusterOperator 并移至 airflow.providers.google.cloud.operators.dataproc,因此您可以使用以下方式导入它:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

接受的参数与 Airflow 1.x 中包含的参数不同。您可以找到用法示例 here.

可在 here, in the source code. The cluster configuration parameters that can be passed to the operator can be found here.

中找到 Airflow 2DataprocCreateClusterOperator 支持的参数

自 2020 年 1 月 13 日起,DataprocClusterCreateOperator 已根据此 Github commit 重命名为 DataprocCreateClusterOperator,并已从 airflow.contrib.operators 移植到 airflow.providers.google.cloud.operators.dataproc 导入路径。

正如@itroulli 的回答中给出的,可以找到运算符的示例实现 here