sla_miss_callback 发送有关 Apache Airflow 中缺少任务 SLA 的电子邮件

sla_miss_callback to send email on missing task SLA in Apache Airflow

我有一个由父 DAG B 触发的 DAG A。所以 DAG A 没有在其中定义任何调度间隔

1.I 想在 DAG A 中的一项任务上设置 sla_miss_callback。

2.I 希望在任务未达到 SLA 时收到电子邮件通知。

我尝试了 google 和 Whosebug 中可用的方法。电子邮件未按预期触发。

分享我用于测试的示例代码。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
import logging

def print_sla_miss(**kwargs):
    logging.info("SLA missed")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': 'sample@xxx.com',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}

with DAG('sla_test', schedule_interval=None, max_active_runs=1, catchup=False,sla_miss_callback=print_sla_miss, default_args=default_args) as dag:

    sleep = BashOperator(
        task_id='timeout',
        sla=timedelta(seconds=5),
        bash_command='sleep 15',
        retries=0,
        dag=dag,
    )

提前致谢。

SLA 将仅在计划的 DAG 运行中进行评估。由于您有 schedule_interval=None 您设置的 SLA 未针对此 DAG 进行评估。

如果您希望触发的 DAG 完成一定的时间,您可以在父 DAG 的传感器任务中设置该 SLA,以检查子 DAG 何时完成。

另一种可能的解决方法是为子 DAG 完全完成或某个任务 starts/finishes 时设置一个 Slack 通知,这样您就可以评估它是否已经 运行 太久了。

为了实现我的要求,我创建了一个单独的 DAG,它每 5 分钟监视一次任务 运行 状态,并根据 运行 状态通过电子邮件通知 below.To为此,我将主 DAG 的执行日期发送到气流变量。

#importing operators and modules
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.models import Variable
from datetime import datetime,timedelta,timezone
import dateutil

#setting default arguments
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['abc@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}

#getting current status of task in main DAG
exec_date = dateutil.parser.parse(Variable.get('main_dag_execution_date'))
ti = get_task_instance('main_dag', 'task_to_check', exec_date)
state = ti.current_state()
start_date = ti.start_date
end_date = ti.end_date
print("start_date",start_date," end_date",end_date, " execution_date",exec_date)

#deciding the action based on status of the task
def check_task_status(**kwargs):

    if state == 'running' and datetime.now(timezone.utc) > start_date + timedelta(minutes = 10):
        breach_mail = 'breach_mail'
        return breach_mail
    elif state == 'failed':
        failure_mail = 'failure_mail'
        return failure_mail
    else:
        other_state = 'other_state'
        return other_state

#print statement when status is not in breached or failed state
def print_current_state(**context):
    if start_date is None:
        print("task is in wait state")
    else:
        print("task is in " + state + " state")


with DAG('sla_check', schedule_interval='0-59/5 9-23 * * *', max_active_runs=1, catchup=False,default_args=default_args) as dag:

    check_task_status = BranchPythonOperator(task_id='check_task_status', python_callable=check_task_status,
                                    provide_context=True,
                                    dag=dag)

    breach_mail = EmailOperator(task_id='breach_mail', to='Abc@example.com',
                                      subject='SLA for task breached',
                                      html_content="<p>Hi,<br><br>task running belyond SLA<br>", dag=dag)

    failure_mail = EmailOperator(task_id='failure_mail', to='Abc@example.com',
                                      subject='task failed',
                                      html_content="<p>Hi,<br><br>task failed. Please check.<br>", dag=dag)

    other_state = PythonOperator(task_id='other_state', python_callable=print_current_state,
                                    provide_context=True,
                                    dag=dag)

    check_task_status >> breach_mail
    check_task_status >> failure_mail
    check_task_status >> other_state