How do I check if all my tasks in an Airflow DAG were successful?

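I need to check whether all tasks of my DAG are marked as successful, so that the last task of the DAG can send me an email telling me whether everything succeeded or something failed.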

Here is a piece of code I tried:

dag_runs = DagRun.find(dag_id=self.dagId)
# DagRun.find returns every run of this DAG (not only the current one),
# and body is overwritten on each iteration of the loop.
for dag_run in dag_runs:
    if dag_run.state == 'success':
        body = f'\nHello,\nHere are the values for the pipeline {self.dagId}:\ncount of lines is {new_lines},\nMax date is {new_date}.\nRegards!'
    else:
        body = f'\nHello,\nYour dag {self.dagId} has failed'

# Headers must start at column 0, one per line, followed by a blank line before the body.
email_text = """\
Subject: %s
From: %s
To: %s

%s
""" % (subject, sent_from, self.to, body)

try:
    smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
    smtp_server.ehlo()
    smtp_server.login(self.gmail_user, self.gmail_password)
    smtp_server.sendmail(sent_from, self.to, email_text)
    smtp_server.close()
    print("Email sent successfully!")
except Exception as ex:
    print("Something went wrong...", ex)

I can't check whether the DAG state is success, so I want to check whether all tasks have the success state instead.

Thanks in advance for your help and suggestions.

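By default, a task in Airflow only starts running after all of its upstream tasks have succeeded (the default trigger_rule is all_success). So if your email task is the last task in the DAG, downstream of every other task, the fact that it runs at all means that all previous tasks succeeded.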
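To see why reaching the last task implies success, here is a small pure-Python model of the default all_success rule. It is not Airflow code: the `simulate_all_success` helper, the task names and the simplified state strings are hypothetical, and the model only marks a task `upstream_failed` when any direct upstream did not succeed.

```python
from typing import Dict, List


def simulate_all_success(
    upstream: Dict[str, List[str]],
    outcome: Dict[str, str],
) -> Dict[str, str]:
    """Resolve final task states under a simplified all_success rule.

    upstream maps each task to its direct upstream tasks and must be listed
    in topological order; outcome is what each task would end with
    ("success" or "failed") if it were allowed to run.
    """
    states: Dict[str, str] = {}
    for task, parents in upstream.items():
        if all(states[p] == "success" for p in parents):
            # Every direct upstream succeeded (or there is none), so the task runs.
            states[task] = outcome[task]
        else:
            # Simplification: any non-success upstream blocks the task.
            states[task] = "upstream_failed"
    return states


# Hypothetical chain: extract -> transform -> send_email (the last task).
dag = {"extract": [], "transform": ["extract"], "send_email": ["transform"]}

ok = simulate_all_success(
    dag, {"extract": "success", "transform": "success", "send_email": "success"}
)
print(ok["send_email"])  # success

broken = simulate_all_success(
    dag, {"extract": "success", "transform": "failed", "send_email": "success"}
)
print(broken["send_email"])  # upstream_failed
```

The second case also shows the catch: under the default rule the email task never runs when something upstream fails, so no failure email is sent at all. Reporting failures therefore needs DAG callbacks or a different trigger_rule on the reporting task.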

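Alternatively, you can configure on_success_callback and on_failure_callback on the DAG. When the DAG run finishes, Airflow calls on_success_callback if the run succeeded and on_failure_callback if it failed. Each callable receives the task context dict, which includes the dag_run and a reason string:

def email(context: dict) -> None:
    dagrun = context["dag_run"]  # the DagRun that just finished
    reason = context.get("reason")  # short string describing why the run ended
    ...  # send email here

The callable itself is not passed a success flag; to tell the outcomes apart, register different callables (or functools.partial wrappers) as on_success_callback and on_failure_callback. Internally, Airflow dispatches through DAG.handle_callback(dagrun, success, reason, session), where success is the boolean that decides which of the two callbacks fires.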
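A callback only needs to read a few values from the context dict. The sketch below builds the notification with the standard library's `email.message.EmailMessage` instead of a hand-formatted string, which also keeps headers such as Subject well-formed. The helper `build_dag_email`, the addresses, the reason text and the `SimpleNamespace` stand-in for the `dag_run` entry are all hypothetical; in a real DAG you might register something like `on_failure_callback=lambda ctx: send(build_dag_email(ctx, False, ...))`, where `send` is your own smtplib code.

```python
from email.message import EmailMessage
from types import SimpleNamespace


def build_dag_email(context: dict, succeeded: bool, sender: str, recipient: str) -> EmailMessage:
    """Build a success/failure notification from a callback context dict (hypothetical helper)."""
    dag_run = context["dag_run"]
    status = "succeeded" if succeeded else "failed"

    msg = EmailMessage()
    msg["Subject"] = f"DAG {dag_run.dag_id} {status}"
    msg["From"] = sender
    msg["To"] = recipient

    body = f"Hello,\n\nRun {dag_run.run_id} of DAG {dag_run.dag_id} {status}."
    # Include the reason only if the context carries a non-empty one.
    reason = context.get("reason")
    if reason:
        body += f"\nReason: {reason}"
    msg.set_content(body)
    return msg


# Stand-in for the context a failure callback would receive (hypothetical values).
fake_context = {
    "dag_run": SimpleNamespace(dag_id="my_pipeline", run_id="manual__2024-01-01"),
    "reason": "example reason",
}
msg = build_dag_email(fake_context, succeeded=False, sender="me@example.com", recipient="you@example.com")
print(msg["Subject"])  # DAG my_pipeline failed
```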

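We had a similar use case where we wanted to know whether all tasks were successful. In Airflow, a DAG run can end up marked as successful even though one of its tasks failed: if a downstream task with trigger_rule one_failed recovers from the failure, the run's final state is decided by its leaf tasks, and those may all have succeeded.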
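A pure-Python model of that situation (not Airflow code; the helper names and the simplified leaf rule are assumptions) shows the gap: a run state derived only from the leaf tasks comes out as success, while a scan over every task still finds the failure.

```python
from typing import Dict, Set


def run_state_from_leaves(states: Dict[str, str], leaves: Set[str]) -> str:
    """Simplified rule: the run fails if any leaf failed, otherwise it succeeds."""
    if any(states[t] in ("failed", "upstream_failed") for t in leaves):
        return "failed"
    return "success"


def every_task_succeeded(states: Dict[str, str]) -> bool:
    """True only if every task in the run ended in the success state."""
    return all(s == "success" for s in states.values())


# Hypothetical DAG: load -> recover, where recover has trigger_rule="one_failed"
# and is the only leaf. load fails, so recover runs and succeeds.
states = {"load": "failed", "recover": "success"}
print(run_state_from_leaves(states, leaves={"recover"}))  # success
print(every_task_succeeded(states))  # False
```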

The solution we implemented sends a single email based on the state of all task_instances:

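from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
# Airflow 2 import path; in Airflow 1.10 it is airflow.operators.python_operator
from airflow.operators.python import PythonOperator


def check_all_success(**context):
    dr: DagRun = context["dag_run"]
    ti: TaskInstance = context["ti"]

    # Collect the states of all tasks in this run, except the task currently executing this logic
    ti_summary = {task.state for task in dr.get_task_instances() if task.task_id != ti.task_id}

    # Remove the success state (discard, unlike remove, does not raise if no task succeeded)
    ti_summary.discard('success')

    # If the summary still holds any other state (failed, upstream_failed, skipped, None, ...),
    # there was an issue in the run
    if ti_summary:
        # Send email: Not all tasks in DAG {dr.dag_id} completed successfully
        pass
    else:
        # Send email: All tasks in DAG {dr.dag_id} completed successfully
        pass


check_all_tasks = PythonOperator(
    task_id='check_all_tasks',
    python_callable=check_all_success,
    # Run once all upstream tasks are finished, even if some failed, so the report is always sent
    trigger_rule='all_done',
    # Airflow 2 passes the context automatically; Airflow 1.10 also needed provide_context=True
)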
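If the email should also say which tasks went wrong, the set of states can be replaced by a per-task mapping. A minimal pure-Python sketch follows; `unsuccessful_tasks` and `summary_line` are hypothetical helpers, and inside `check_all_success` the mapping could be built as `{t.task_id: t.state for t in dr.get_task_instances()}`.

```python
from typing import Dict, List, Optional


def unsuccessful_tasks(states: Dict[str, Optional[str]], current_task_id: str) -> List[str]:
    """Return the sorted task_ids whose state is not success, ignoring the reporting task."""
    return sorted(
        task_id
        for task_id, state in states.items()
        if task_id != current_task_id and state != "success"
    )


def summary_line(dag_id: str, bad: List[str]) -> str:
    """One-line summary suitable for an email subject or first line."""
    if bad:
        return f"Tasks in DAG {dag_id} that did not succeed: {', '.join(bad)}"
    return f"All tasks in DAG {dag_id} completed successfully"


# Hypothetical run: load failed, recover (one_failed) succeeded, the checker is still running.
states = {"extract": "success", "load": "failed", "recover": "success", "check_all_tasks": "running"}
bad = unsuccessful_tasks(states, current_task_id="check_all_tasks")
print(summary_line("my_pipeline", bad))  # Tasks in DAG my_pipeline that did not succeed: load
```

As in the set-based version, skipped tasks and tasks with no state (None) are reported as not successful; filter them out first if skips are expected in your DAG.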