使用 DataProcPySparkOperator 时无法配置 GCP 项目
Cannot configure a GCP project when using DataProcPySparkOperator
我正在使用 Cloud Composer 环境来 运行 GCP 项目中的工作流程。我的工作流程之一使用 DataprocClusterCreateOperator
, and then attempts to submit a PySpark job to that cluster using the DataProcPySparkOperator
from the airflow.contrib.operators.dataproc_operator
模块在不同的项目中创建了一个 Dataproc 集群。
要创建集群,我可以指定一个project_id
参数在另一个项目中创建它,但似乎DataProcPySparkOperator
忽略了这个参数。例如,我希望能够传递 project_id
,但当任务 运行s:
时,我最终遇到了 404
错误
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
t1 = DataProcPySparkOperator(
project_id='my-gcp-project',
main='...',
arguments=[...],
)
如何使用 DataProcPySparkOperator
提交另一个项目中的作业?
DataProcPySparkOperator
from the airflow.contrib.operators.dataproc_operator
模块在其构造函数中不接受 project_id
kwarg,因此它将始终默认在 Cloud Composer 环境所在的项目中提交 Dataproc 作业。如果参数是通过,然后它被忽略,这会在 运行 任务时导致 404 错误,因为操作员将尝试使用不正确的集群路径轮询作业。
一种解决方法是复制运算符和挂钩,并将其修改为接受项目 ID。但是,如果您使用支持它们的 Airflow 版本,则更简单的解决方案是使用 airflow.providers
包中的较新运算符,因为许多 airflow.contrib
运算符在较新的 Airflow 版本中已弃用。
下面是一个例子。请注意,有一个较新的 DataprocSubmitPySparkJobOperator
in this module, but it is deprecated in favor of DataprocSubmitJobOperator
。因此,您应该使用后者,它接受项目 ID。
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
t1 = DataprocSubmitJobOperator(
project_id='my-gcp-project-id',
location='us-central1',
job={...},
)
如果您 运行 Composer 1.10.5+、Airflow 版本 1.10.6+ 和 Python 3 的环境,这些提供程序已预安装并可立即使用。
我正在使用 Cloud Composer 环境来 运行 GCP 项目中的工作流程。我的工作流程之一使用 DataprocClusterCreateOperator
, and then attempts to submit a PySpark job to that cluster using the DataProcPySparkOperator
from the airflow.contrib.operators.dataproc_operator
模块在不同的项目中创建了一个 Dataproc 集群。
要创建集群,我可以指定一个project_id
参数在另一个项目中创建它,但似乎DataProcPySparkOperator
忽略了这个参数。例如,我希望能够传递 project_id
,但当任务 运行s:
404
错误
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
t1 = DataProcPySparkOperator(
project_id='my-gcp-project',
main='...',
arguments=[...],
)
如何使用 DataProcPySparkOperator
提交另一个项目中的作业?
DataProcPySparkOperator
from the airflow.contrib.operators.dataproc_operator
模块在其构造函数中不接受 project_id
kwarg,因此它将始终默认在 Cloud Composer 环境所在的项目中提交 Dataproc 作业。如果参数是通过,然后它被忽略,这会在 运行 任务时导致 404 错误,因为操作员将尝试使用不正确的集群路径轮询作业。
一种解决方法是复制运算符和挂钩,并将其修改为接受项目 ID。但是,如果您使用支持它们的 Airflow 版本,则更简单的解决方案是使用 airflow.providers
包中的较新运算符,因为许多 airflow.contrib
运算符在较新的 Airflow 版本中已弃用。
下面是一个例子。请注意,有一个较新的 DataprocSubmitPySparkJobOperator
in this module, but it is deprecated in favor of DataprocSubmitJobOperator
。因此,您应该使用后者,它接受项目 ID。
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
t1 = DataprocSubmitJobOperator(
project_id='my-gcp-project-id',
location='us-central1',
job={...},
)
如果您 运行 Composer 1.10.5+、Airflow 版本 1.10.6+ 和 Python 3 的环境,这些提供程序已预安装并可立即使用。