无法使用 Sendgrid 在 GCP 作曲家中获取任务失败、成功和重试通知

Unable to get task failure, success and retry notifications in GCP composer using Sendgrid

我想在 GCP composer 中使用 Sendgrid 接收有关任务成功、失败和重试的电子邮件通知。

目前,我的 DAG 中的所有任务都已 运行 成功。在那种情况下我想收到通知。

此外,当某些任务失败或重试时,我也想收到这些通知。我执行了以下步骤,当我强制任务失败时没有收到任何通知。

  1. 创建了 GCP Composer 环境,添加了环境变量。
SENDGRID_MAIL_FROM : abc@gmail.com

SENDGRID_API_KEY : 
  1. 创建于 DAG 之后。
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.operators.email_operator import EmailOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,    
    'start_date': datetime(2020, 3, 30),
    'email': ['abc@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),

}


schedule_interval = "05 23 * * *"


dag = DAG(
    'DAG_NAME', 
    default_args=default_args, 
    schedule_interval=schedule_interval
    )

# Config variables
BQ_CONN_ID = ""
BQ_PROJECT = ""
BQ_DATASET = ""

## Task 1
t1 = BigQueryCheckOperator(----)

## Task 2
t2 = BigQueryCheckOperator(----)

## Task 3
t3 = BigQueryOperator(----)

t4  = EmailOperator(
        task_id='send_email',
        to='abc@gmail.com',
        subject='Airflow Alert',
        html_content=""" <h3>Email Test</h3> """,
        dag=dag

    )
# Setting up Dependencies
t1>>t2>>t3>>t4

我错过了什么吗?请告诉我需要做什么,谢谢。

首先,您需要检查您使用的是哪个版本的 Composer 和 Sendgrid。

例如,airflow-1.10.3 支持的最新版本的 Sendgrid 是 v5.6.0。您可以参考气流的 setup.py 以了解为特定气流版本安装了哪些依赖项。

我建议您再次检查 the instructions 以使用 Cloud Composer 设置 Sendgrid。确保几件事:

  • 您按照指南设置了环境变量,要将 Sendgrid 配置为您的电子邮件服务器,您需要获得您的 SENDGRID_API_KEY(您是否已在正确的权限下生成它?至少,密钥必须有 Mail send 发送电子邮件的权限)和 SENDGRID_MAIL_FROM(结构是否正确?noreply-composer@)作为环境变量。
  • 在您的 airflow.cfg 文件中,检查 email_backend 变量是否设置为使用 Sendgrid:
email_backend = airflow.contrib.utils.sendgrid.send_email
  • 尝试发送测试 DAG,如指南所述,例如您可以使用:
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'name.surname',
    'start_date': days_ago(1),
    'email_on_failure': True,
    'email': ['name.surname@company.com'],
}

dag = DAG(
    'mail-test',
    schedule_interval='@once',
    default_args=default_args,
)

send_mail = EmailOperator(
    task_id='sendmail',
    to='name.surname@company.com',
    subject='TEST Mail from Cloud Composer',
    html_content='Mail Contents',
    dag=dag,
)

failed_bash = BashOperator(
    task_id='run_bash',
    bash_command='exit 1',
    dag=dag,
)

send_mail >> failed_bash

此外,请检查您的电子邮件客户端中的垃圾邮件过滤器。如果仍然失败,我会开始怀疑防火墙规则(如果您已编辑它们)可能导致问题。

让我知道结果。希望对你有帮助。