Broken DAG: [/home/airflow/gcs/dags/composer-dataproc-dag.py] cannot import name 'email'

I am new to GCP and am trying to set up a Cloud Composer pipeline that sends an email on failure.

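I have set up a SendGrid email API and updated my Cloud Composer environment. I have also added the project_id and email variables under Variables in the Airflow environment, but I get this error: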

Broken DAG: [/home/airflow/gcs/dags/composer-dataproc-dag.py] cannot import name 'email' 

Please take a look at my code and let me know what corrections I can make.

import datetime

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils.dates import days_ago
from airflow.operators import email

project_id = models.Variable.get("project_id")
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'airflow',
    'start_date': yesterday,
    'depends_on_past': False,
    'email': models.Variable.get("email"),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
        # The task id of your job
        task_id="dataproc_workflow_dag",
        # The template id of your workflow
        template_id="mywf1",
        project_id=project_id,
        # The region for the template
        # For more info on regions where Dataflow is available see:
        # https://cloud.google.com/dataflow/docs/resources/locations
        region="us-central1",
    )
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='testing email service of gcp',
        html_content="""
        Analyzed the Requirements and have created a test POC.Lets see If it works out!
        """
        )

I followed the code reference given here.

This is an Airflow version issue.

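If you look at the Airflow 1.10.15 Python API reference, there is no email module under airflow.operators. In the 1.10.x releases, EmailOperator is provided by airflow.operators.email_operator instead.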

The airflow.operators.email module has only been available since version 2.0.0.
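One way to make the DAG file tolerate both module layouts is to try the 2.x path first and fall back to the 1.10.x path. The following is only a minimal sketch: the helper name import_first and the constant EMAIL_OPERATOR_MODULES are hypothetical names made up for this illustration, not part of Airflow. The two module paths are the ones discussed above.

```python
import importlib


def import_first(candidates, attr):
    """Return `attr` from the first module in `candidates` that imports.

    Hypothetical helper for illustration; it is not an Airflow API.
    """
    for module_path in candidates:
        try:
            module = importlib.import_module(module_path)
        except ImportError:
            # Module path does not exist in this installation; try the next one.
            continue
        if hasattr(module, attr):
            return getattr(module, attr)
    raise ImportError(
        "could not import %r from any of: %s" % (attr, ", ".join(candidates))
    )


# Airflow >= 2.0.0 path first, then the Airflow 1.10.x path.
EMAIL_OPERATOR_MODULES = (
    "airflow.operators.email",
    "airflow.operators.email_operator",
)

# In the DAG file (requires Airflow to be installed, so it is left commented out):
# EmailOperator = import_first(EMAIL_OPERATOR_MODULES, "EmailOperator")
# email_summary = EmailOperator(task_id="email_summary", ...)
```

If the environment is known to stay on Airflow 1.10.x, replacing the failing line with `from airflow.operators.email_operator import EmailOperator` and calling `EmailOperator(...)` directly is the simpler fix.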