Airflow 中的延迟通知系统

Delay Notification system in Airflow

我正在尝试在 DAG 或任务失败或成功或延迟时实施 Airflow alerting/Notification 系统。我成功地实现了失败和成功通知,并且还成功地使用 SLA 部分地实现了延迟通知系统,但是 SLA 的局限性在于它只适用于预定的 DAG,我们系统中的许多 DAG 都是基于触发器的。有没有办法在 Airflow 中没有 SLA 的情况下实施延迟通知系统?提前致谢!

服务水平协议 (SLA) 提供了在任务超过其从 DAG 执行开始的预期时间范围的情况下发送电子邮件的功能,并根据 official documentation and this medium article, Airflow currently doesn't support this feature in triggered DAGs; in which case, I recommend you open an improvement issue in the Airflow Issue Tracker 请求此功能支持。

另一方面,您可以使用以下方法在 DAG 完成 运行 之后通过 on_success_callback 验证 SLA:

def success_function (context):
   s_date = context.get('task_instance').start_date.replace(tzinfo = None)
   e_date = datetime.now().replace(tzinfo = None)
  
   execution_time = e_date - s_date
   sla = timedelta (seconds = 30)
 
   if execution_time> sla:
       print('send_email')
   else:
       print('no send email')

t1 = BashOperator (
   task_id = 'print_date',
   bash_command = 'date',
   dag = dag,
   on_success_callback = success_function
)

如果您实施此解决方法,请牢记以下注意事项:

  • 上下文不包含任务中配置的SLA信息;因此,有必要在函数中为每个要验证 SLA 的任务指定它
  • 错过的 SLA 不会出现在 Airflow 中 UI
  • 需要创建发送邮件的逻辑