sla_miss_callback 发送有关 Apache Airflow 中缺少任务 SLA 的电子邮件
sla_miss_callback to send email on missing task SLA in Apache Airflow
我有一个由父 DAG B 触发的 DAG A。所以 DAG A 没有在其中定义任何调度间隔。
1.I 想在 DAG A 中的一项任务上设置 sla_miss_callback。
2.I 希望在任务未达到 SLA 时收到电子邮件通知。
我尝试了 google 和 Whosebug 中可用的方法。电子邮件未按预期触发。
分享我用于测试的示例代码。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
import logging
def print_sla_miss(**kwargs):
logging.info("SLA missed")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': 'sample@xxx.com',
'email_on_failure': True,
'email_on_retry': False,
'retries': 0
}
with DAG('sla_test', schedule_interval=None, max_active_runs=1, catchup=False,sla_miss_callback=print_sla_miss, default_args=default_args) as dag:
sleep = BashOperator(
task_id='timeout',
sla=timedelta(seconds=5),
bash_command='sleep 15',
retries=0,
dag=dag,
)
提前致谢。
SLA 将仅在计划的 DAG 运行中进行评估。由于您有 schedule_interval=None
您设置的 SLA 未针对此 DAG 进行评估。
如果您希望触发的 DAG 完成一定的时间,您可以在父 DAG 的传感器任务中设置该 SLA,以检查子 DAG 何时完成。
另一种可能的解决方法是为子 DAG 完全完成或某个任务 starts/finishes 时设置一个 Slack 通知,这样您就可以评估它是否已经 运行 太久了。
为了实现我的要求,我创建了一个单独的 DAG,它每 5 分钟监视一次任务 运行 状态,并根据 运行 状态通过电子邮件通知 below.To为此,我将主 DAG 的执行日期发送到气流变量。
#importing operators and modules
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.models import Variable
from datetime import datetime,timedelta,timezone
import dateutil
#setting default arguments
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': ['abc@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0
}
#getting current status of task in main DAG
exec_date = dateutil.parser.parse(Variable.get('main_dag_execution_date'))
ti = get_task_instance('main_dag', 'task_to_check', exec_date)
state = ti.current_state()
start_date = ti.start_date
end_date = ti.end_date
print("start_date",start_date," end_date",end_date, " execution_date",exec_date)
#deciding the action based on status of the task
def check_task_status(**kwargs):
if state == 'running' and datetime.now(timezone.utc) > start_date + timedelta(minutes = 10):
breach_mail = 'breach_mail'
return breach_mail
elif state == 'failed':
failure_mail = 'failure_mail'
return failure_mail
else:
other_state = 'other_state'
return other_state
#print statement when status is not in breached or failed state
def print_current_state(**context):
if start_date is None:
print("task is in wait state")
else:
print("task is in " + state + " state")
with DAG('sla_check', schedule_interval='0-59/5 9-23 * * *', max_active_runs=1, catchup=False,default_args=default_args) as dag:
check_task_status = BranchPythonOperator(task_id='check_task_status', python_callable=check_task_status,
provide_context=True,
dag=dag)
breach_mail = EmailOperator(task_id='breach_mail', to='Abc@example.com',
subject='SLA for task breached',
html_content="<p>Hi,<br><br>task running belyond SLA<br>", dag=dag)
failure_mail = EmailOperator(task_id='failure_mail', to='Abc@example.com',
subject='task failed',
html_content="<p>Hi,<br><br>task failed. Please check.<br>", dag=dag)
other_state = PythonOperator(task_id='other_state', python_callable=print_current_state,
provide_context=True,
dag=dag)
check_task_status >> breach_mail
check_task_status >> failure_mail
check_task_status >> other_state
我有一个由父 DAG B 触发的 DAG A。所以 DAG A 没有在其中定义任何调度间隔。
1.I 想在 DAG A 中的一项任务上设置 sla_miss_callback。
2.I 希望在任务未达到 SLA 时收到电子邮件通知。
我尝试了 google 和 Whosebug 中可用的方法。电子邮件未按预期触发。
分享我用于测试的示例代码。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
import logging
def print_sla_miss(**kwargs):
logging.info("SLA missed")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': 'sample@xxx.com',
'email_on_failure': True,
'email_on_retry': False,
'retries': 0
}
with DAG('sla_test', schedule_interval=None, max_active_runs=1, catchup=False,sla_miss_callback=print_sla_miss, default_args=default_args) as dag:
sleep = BashOperator(
task_id='timeout',
sla=timedelta(seconds=5),
bash_command='sleep 15',
retries=0,
dag=dag,
)
提前致谢。
SLA 将仅在计划的 DAG 运行中进行评估。由于您有 schedule_interval=None
您设置的 SLA 未针对此 DAG 进行评估。
如果您希望触发的 DAG 完成一定的时间,您可以在父 DAG 的传感器任务中设置该 SLA,以检查子 DAG 何时完成。
另一种可能的解决方法是为子 DAG 完全完成或某个任务 starts/finishes 时设置一个 Slack 通知,这样您就可以评估它是否已经 运行 太久了。
为了实现我的要求,我创建了一个单独的 DAG,它每 5 分钟监视一次任务 运行 状态,并根据 运行 状态通过电子邮件通知 below.To为此,我将主 DAG 的执行日期发送到气流变量。
#importing operators and modules
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.models import Variable
from datetime import datetime,timedelta,timezone
import dateutil
#setting default arguments
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email': ['abc@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0
}
#getting current status of task in main DAG
exec_date = dateutil.parser.parse(Variable.get('main_dag_execution_date'))
ti = get_task_instance('main_dag', 'task_to_check', exec_date)
state = ti.current_state()
start_date = ti.start_date
end_date = ti.end_date
print("start_date",start_date," end_date",end_date, " execution_date",exec_date)
#deciding the action based on status of the task
def check_task_status(**kwargs):
if state == 'running' and datetime.now(timezone.utc) > start_date + timedelta(minutes = 10):
breach_mail = 'breach_mail'
return breach_mail
elif state == 'failed':
failure_mail = 'failure_mail'
return failure_mail
else:
other_state = 'other_state'
return other_state
#print statement when status is not in breached or failed state
def print_current_state(**context):
if start_date is None:
print("task is in wait state")
else:
print("task is in " + state + " state")
with DAG('sla_check', schedule_interval='0-59/5 9-23 * * *', max_active_runs=1, catchup=False,default_args=default_args) as dag:
check_task_status = BranchPythonOperator(task_id='check_task_status', python_callable=check_task_status,
provide_context=True,
dag=dag)
breach_mail = EmailOperator(task_id='breach_mail', to='Abc@example.com',
subject='SLA for task breached',
html_content="<p>Hi,<br><br>task running belyond SLA<br>", dag=dag)
failure_mail = EmailOperator(task_id='failure_mail', to='Abc@example.com',
subject='task failed',
html_content="<p>Hi,<br><br>task failed. Please check.<br>", dag=dag)
other_state = PythonOperator(task_id='other_state', python_callable=print_current_state,
provide_context=True,
dag=dag)
check_task_status >> breach_mail
check_task_status >> failure_mail
check_task_status >> other_state