我如何检查 airflow dag 中的所有任务是否成功?
How do i check if all my tasks in an airflow dag were successful?
我需要检查我的 dag 的所有任务是否都标记为成功,以便在 dag 的最后一个任务中它会向我发送一封电子邮件,通知我是否全部成功或是否失败。
这是我试过的一段代码:
dag_runs = DagRun.find(dag_id=self.dagId)
for dag_run in dag_runs:
if dag_run.state == 'success':
body = f'\nHello , \nHere is the values for the pipeline {self.dagId}: \ncount of lines is {new_lines}, \nMax date is {new_date}. \nRegards!'
else:
body= f'\nHello \nYour dag {self.dagId} has been Failed'
email_text = """\
Subject: %s
\nFrom: %s
\nTo: %s
\n%s
""" % (subject, sent_from, self.to, body)
try:
smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
smtp_server.ehlo()
smtp_server.login(self.gmail_user, self.gmail_password)
smtp_server.sendmail(sent_from, self.to, email_text)
smtp_server.close()
print ("Email sent successfully!")
except Exception as ex:
print ("Something went wrong….",ex)
我无法检查 dag 状态是否成功。所以我想检查所有任务的状态是否成功
提前感谢您的帮助和建议。
默认情况下,Airflow 中的每个任务都应该成功才能启动下一个任务 运行ning。因此,如果您的电子邮件任务是 DAG 中的最后一个任务,则自动意味着所有之前的任务都已成功。
或者,您可以在 DAG 上配置 on_success_callback
和 on_failure_callback
,这会执行给定的可调用对象。这会传入参数以确定 DAG 运行 是失败还是成功:
def email(dagrun: DagRun, success: bool, reason: str, session: Session):
# send email here...
success
是一个布尔值,表示DAG 运行 success/failure.
我们有一个类似的用例,我们想要确定是否所有任务都是 sucessful
。在 Airflow 中,如果任务失败并且我们有一个 trigger_rule
one_failed,DAG 可以 运行 最终被标记为成功,因为从失败中恢复。
我们使用单个电子邮件实施的解决方案来跟踪所有 task_instances:
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
def check_all_success(**context):
dr: DagRun = context["dag_run"]
ti: TaskInstance = context["ti"]
# here we remove the task currently executing this logic
ti_summary = set([task.state for task in dr.get_task_instances() if task.task_id != ti.task_id])
# Remove success state
ti_summary.remove('success')
# If TI summary had any other state except success, there was an issue in the run
if ti:
# Send email: All tasks in DAG: {dr.dag_id} did not complete successfully
pass
else:
# Send email: All tasks in DAG: {dr.dag_id} completed successfully
pass
check_all_tasks = PythonOperator(
task_id='check_all_tasks',
python_callable=check_all_success,
provide_context=True
)
我需要检查我的 dag 的所有任务是否都标记为成功,以便在 dag 的最后一个任务中它会向我发送一封电子邮件,通知我是否全部成功或是否失败。
这是我试过的一段代码:
dag_runs = DagRun.find(dag_id=self.dagId)
for dag_run in dag_runs:
if dag_run.state == 'success':
body = f'\nHello , \nHere is the values for the pipeline {self.dagId}: \ncount of lines is {new_lines}, \nMax date is {new_date}. \nRegards!'
else:
body= f'\nHello \nYour dag {self.dagId} has been Failed'
email_text = """\
Subject: %s
\nFrom: %s
\nTo: %s
\n%s
""" % (subject, sent_from, self.to, body)
try:
smtp_server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
smtp_server.ehlo()
smtp_server.login(self.gmail_user, self.gmail_password)
smtp_server.sendmail(sent_from, self.to, email_text)
smtp_server.close()
print ("Email sent successfully!")
except Exception as ex:
print ("Something went wrong….",ex)
我无法检查 dag 状态是否成功。所以我想检查所有任务的状态是否成功
提前感谢您的帮助和建议。
默认情况下,Airflow 中的每个任务都应该成功才能启动下一个任务 运行ning。因此,如果您的电子邮件任务是 DAG 中的最后一个任务,则自动意味着所有之前的任务都已成功。
或者,您可以在 DAG 上配置 on_success_callback
和 on_failure_callback
,这会执行给定的可调用对象。这会传入参数以确定 DAG 运行 是失败还是成功:
def email(dagrun: DagRun, success: bool, reason: str, session: Session):
# send email here...
success
是一个布尔值,表示DAG 运行 success/failure.
我们有一个类似的用例,我们想要确定是否所有任务都是 sucessful
。在 Airflow 中,如果任务失败并且我们有一个 trigger_rule
one_failed,DAG 可以 运行 最终被标记为成功,因为从失败中恢复。
我们使用单个电子邮件实施的解决方案来跟踪所有 task_instances:
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
def check_all_success(**context):
dr: DagRun = context["dag_run"]
ti: TaskInstance = context["ti"]
# here we remove the task currently executing this logic
ti_summary = set([task.state for task in dr.get_task_instances() if task.task_id != ti.task_id])
# Remove success state
ti_summary.remove('success')
# If TI summary had any other state except success, there was an issue in the run
if ti:
# Send email: All tasks in DAG: {dr.dag_id} did not complete successfully
pass
else:
# Send email: All tasks in DAG: {dr.dag_id} completed successfully
pass
check_all_tasks = PythonOperator(
task_id='check_all_tasks',
python_callable=check_all_success,
provide_context=True
)