无法使用 Sendgrid 在 GCP 作曲家中获取任务失败、成功和重试通知
Unable to get task failure, success and retry notifications in GCP composer using Sendgrid
我想在 GCP composer 中使用 Sendgrid 接收有关任务成功、失败和重试的电子邮件通知。
目前,我的 DAG 中的所有任务都已 运行 成功。在那种情况下我想收到通知。
此外,当某些任务失败或重试时,我也想收到这些通知。我执行了以下步骤,当我强制任务失败时没有收到任何通知。
- 创建了 GCP Composer 环境,添加了环境变量。
SENDGRID_MAIL_FROM : abc@gmail.com
SENDGRID_API_KEY :
- 创建于 DAG 之后。
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.operators.email_operator import EmailOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 3, 30),
'email': ['abc@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
schedule_interval = "05 23 * * *"
dag = DAG(
'DAG_NAME',
default_args=default_args,
schedule_interval=schedule_interval
)
# Config variables
BQ_CONN_ID = ""
BQ_PROJECT = ""
BQ_DATASET = ""
## Task 1
t1 = BigQueryCheckOperator(----)
## Task 2
t2 = BigQueryCheckOperator(----)
## Task 3
t3 = BigQueryOperator(----)
t4 = EmailOperator(
task_id='send_email',
to='abc@gmail.com',
subject='Airflow Alert',
html_content=""" <h3>Email Test</h3> """,
dag=dag
)
# Setting up Dependencies
t1>>t2>>t3>>t4
我错过了什么吗?请告诉我需要做什么,谢谢。
首先,您需要检查您使用的是哪个版本的 Composer 和 Sendgrid。
例如,airflow-1.10.3
支持的最新版本的 Sendgrid 是 v5.6.0
。您可以参考气流的 setup.py 以了解为特定气流版本安装了哪些依赖项。
我建议您再次检查 the instructions 以使用 Cloud Composer 设置 Sendgrid。确保几件事:
- 您按照指南设置了环境变量,要将 Sendgrid 配置为您的电子邮件服务器,您需要获得您的
SENDGRID_API_KEY
(您是否已在正确的权限下生成它?至少,密钥必须有 Mail send
发送电子邮件的权限)和 SENDGRID_MAIL_FROM
(结构是否正确?noreply-composer@
)作为环境变量。
- 在您的
airflow.cfg
文件中,检查 email_backend
变量是否设置为使用 Sendgrid:
email_backend = airflow.contrib.utils.sendgrid.send_email
- 尝试发送测试 DAG,如指南所述,例如您可以使用:
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'name.surname',
'start_date': days_ago(1),
'email_on_failure': True,
'email': ['name.surname@company.com'],
}
dag = DAG(
'mail-test',
schedule_interval='@once',
default_args=default_args,
)
send_mail = EmailOperator(
task_id='sendmail',
to='name.surname@company.com',
subject='TEST Mail from Cloud Composer',
html_content='Mail Contents',
dag=dag,
)
failed_bash = BashOperator(
task_id='run_bash',
bash_command='exit 1',
dag=dag,
)
send_mail >> failed_bash
此外,请检查您的电子邮件客户端中的垃圾邮件过滤器。如果仍然失败,我会开始怀疑防火墙规则(如果您已编辑它们)可能导致问题。
让我知道结果。希望对你有帮助。
我想在 GCP composer 中使用 Sendgrid 接收有关任务成功、失败和重试的电子邮件通知。
目前,我的 DAG 中的所有任务都已 运行 成功。在那种情况下我想收到通知。
此外,当某些任务失败或重试时,我也想收到这些通知。我执行了以下步骤,当我强制任务失败时没有收到任何通知。
- 创建了 GCP Composer 环境,添加了环境变量。
SENDGRID_MAIL_FROM : abc@gmail.com
SENDGRID_API_KEY :
- 创建于 DAG 之后。
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
from airflow.operators.email_operator import EmailOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2020, 3, 30),
'email': ['abc@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
schedule_interval = "05 23 * * *"
dag = DAG(
'DAG_NAME',
default_args=default_args,
schedule_interval=schedule_interval
)
# Config variables
BQ_CONN_ID = ""
BQ_PROJECT = ""
BQ_DATASET = ""
## Task 1
t1 = BigQueryCheckOperator(----)
## Task 2
t2 = BigQueryCheckOperator(----)
## Task 3
t3 = BigQueryOperator(----)
t4 = EmailOperator(
task_id='send_email',
to='abc@gmail.com',
subject='Airflow Alert',
html_content=""" <h3>Email Test</h3> """,
dag=dag
)
# Setting up Dependencies
t1>>t2>>t3>>t4
我错过了什么吗?请告诉我需要做什么,谢谢。
首先,您需要检查您使用的是哪个版本的 Composer 和 Sendgrid。
例如,airflow-1.10.3
支持的最新版本的 Sendgrid 是 v5.6.0
。您可以参考气流的 setup.py 以了解为特定气流版本安装了哪些依赖项。
我建议您再次检查 the instructions 以使用 Cloud Composer 设置 Sendgrid。确保几件事:
- 您按照指南设置了环境变量,要将 Sendgrid 配置为您的电子邮件服务器,您需要获得您的
SENDGRID_API_KEY
(您是否已在正确的权限下生成它?至少,密钥必须有Mail send
发送电子邮件的权限)和SENDGRID_MAIL_FROM
(结构是否正确?noreply-composer@
)作为环境变量。 - 在您的
airflow.cfg
文件中,检查email_backend
变量是否设置为使用 Sendgrid:
email_backend = airflow.contrib.utils.sendgrid.send_email
- 尝试发送测试 DAG,如指南所述,例如您可以使用:
from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'name.surname',
'start_date': days_ago(1),
'email_on_failure': True,
'email': ['name.surname@company.com'],
}
dag = DAG(
'mail-test',
schedule_interval='@once',
default_args=default_args,
)
send_mail = EmailOperator(
task_id='sendmail',
to='name.surname@company.com',
subject='TEST Mail from Cloud Composer',
html_content='Mail Contents',
dag=dag,
)
failed_bash = BashOperator(
task_id='run_bash',
bash_command='exit 1',
dag=dag,
)
send_mail >> failed_bash
此外,请检查您的电子邮件客户端中的垃圾邮件过滤器。如果仍然失败,我会开始怀疑防火墙规则(如果您已编辑它们)可能导致问题。
让我知道结果。希望对你有帮助。