Broken DAG: [/home/airflow/gcs/dags/composer-dataproc-dag.py] cannot import name 'email'
I'm new to GCP and trying to configure a Cloud Composer pipeline that sends an email on failure.
I have set up a SendGrid email API, updated my Cloud Composer environment, and added the project_id and email variables in the Airflow environment, but I'm getting this error:
Broken DAG: [/home/airflow/gcs/dags/composer-dataproc-dag.py] cannot import name 'email'
Please review my code and let me know what corrections I should make.
import datetime

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils.dates import days_ago
from airflow.operators import email

project_id = models.Variable.get("project_id")

yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': models.Variable.get("email"),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
        # The task id of your job
        task_id="dataproc_workflow_dag",
        # The template id of your workflow
        template_id="mywf1",
        project_id=project_id,
        # The region for the template
        # For more info on regions where Dataflow is available see:
        # https://cloud.google.com/dataflow/docs/resources/locations
        region="us-central1",
    )
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='testing email service of gcp',
        html_content="""
        Analyzed the Requirements and have created a test POC. Lets see If it works out!
        """
    )
I followed the code reference given here.
This is an Airflow version issue.
If you look at the Airflow 1.10.15 Python API reference, there is no email
module.
It only appears as of version 2.0.0.
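In Airflow 1.10.x the operator lives in `airflow.operators.email_operator`, while Airflow 2.x moved it to `airflow.operators.email`. As a minimal sketch of the version difference (the helper `email_operator_module` is hypothetical, not part of Airflow):

```python
def email_operator_module(airflow_version: str) -> str:
    """Hypothetical helper: return the module that provides EmailOperator
    for a given Airflow release line."""
    major = int(airflow_version.split(".")[0])
    if major >= 2:
        # Airflow 2.x renamed the module to airflow.operators.email
        return "airflow.operators.email"
    # Airflow 1.10.x ships the operator in airflow.operators.email_operator
    return "airflow.operators.email_operator"

print(email_operator_module("1.10.15"))  # → airflow.operators.email_operator
print(email_operator_module("2.0.0"))    # → airflow.operators.email
```

So on a Composer environment pinned to Airflow 1.10.x, changing the DAG's import to `from airflow.operators.email_operator import EmailOperator` (and instantiating `EmailOperator(...)` directly instead of `email.EmailOperator(...)`) should resolve the broken-DAG import error.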