GCP Composer (Airflow) 操作员
GCP Composer (Airflow) operator
我正在使用 GCP Composer API (Airflow) 和我的 DAG 来增加工作人员的数量,不断向我返回以下错误:
Broken DAG: [/home/airflow/gcs/dags/cluster_scale_workers.py] 'module' object has no attribute 'DataProcClusterScaleOperator'
似乎与 ScaleOperator 相关,但是当我查看 Airflow Read the Docs 并与我的代码进行交叉检查时,似乎没有任何问题。我错过了什么?
是否与GCP Airflow版本有关?
代码:
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project'),
'cluster_name': 'hive-cluster'
}
with models.DAG(
'scale_workers',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
scale_to_6_workers = dataproc_operator.DataprocClusterScaleOperator(
task_id='scale_dataproc_cluster_6',
cluster_name='hive-cluster',
num_workers=6,
num_preemptible_workers=3,
dag=dag
)
我设法找到并解决了问题。以上 Ashish Kumar 提供的评论是正确的。
问题是我使用的 Airflow 版本 (1.9.0) 不支持 DataProcClusterScaleOperator
。我通过激活 BETA 并选择版本 1.10.0 创建了另一个实例。
这解决了我的问题。
我正在使用 GCP Composer API (Airflow) 和我的 DAG 来增加工作人员的数量,不断向我返回以下错误:
Broken DAG: [/home/airflow/gcs/dags/cluster_scale_workers.py] 'module' object has no attribute 'DataProcClusterScaleOperator'
似乎与 ScaleOperator 相关,但是当我查看 Airflow Read the Docs 并与我的代码进行交叉检查时,似乎没有任何问题。我错过了什么?
是否与GCP Airflow版本有关?
代码:
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project'),
'cluster_name': 'hive-cluster'
}
with models.DAG(
'scale_workers',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
scale_to_6_workers = dataproc_operator.DataprocClusterScaleOperator(
task_id='scale_dataproc_cluster_6',
cluster_name='hive-cluster',
num_workers=6,
num_preemptible_workers=3,
dag=dag
)
我设法找到并解决了问题。以上 Ashish Kumar 提供的评论是正确的。
问题是我使用的 Airflow 版本 (1.9.0) 不支持 DataProcClusterScaleOperator
。我通过激活 BETA 并选择版本 1.10.0 创建了另一个实例。
这解决了我的问题。