如何在气流中而不是 timedelta 中在 SLA 上设置时间对象?

How to set time object on SLA in airflow instead of timedelta?

我正在尝试在我的气流 DAG 中实施 SLA。

我知道 SLA 是如何工作的,你设置了一个 timedelta 对象,如果任务没有在这段时间内完成,它会发送一封电子邮件,通知任务尚未完成。

我想要一些类似的功能,但我不想给出持续时间,而是想在 SLA 中设置具体时间。例如,如果由于 8:00 上午任务未完成,它会发送电子邮件并通知经理。像这样:

'sla': time(hour=8, minute=0, second=0)

我搜索了很多,但没有找到。

这个具体问题有什么解决办法吗?或 SLA 以外的任何其他解决方案?

提前致谢。

SLA BaseOperator 的参数需要一个 datetime.timedelta 对象,所以这里没有什么可做的了。考虑到 SLA 表示计划时间段结束后的时间增量。 docs 中的示例假设每天安排一个 DAG:

For example if you set an SLA of 1 hour, the scheduler would send an email soon after 1:00AM on the 2016-01-02 if the 2016-01-01 instance has not succeeded yet.

重点是,它始终是与计划周期的时间增量,这不是您要找的。

所以我认为你应该采取另一种方法,比如在你需要的时候安排你的 DAG,执行你想要的任务,然后添加一个 sensor operator 来检查你是否满足条件正在寻找是否满足。 sensors 有几种类型,具体取决于您可以从中选择的上下文。

另一种选择是,创建一个新的 DAG 专门用于检查您在原始 DAG 中执行的任务是否已成功执行,并采取相应行动(例如,发送电子邮件等)。为此,您可以使用 ExternalTaskSensor,在线查看有关如何实现它的教程,尽管如 docs.

中所述,避免交叉 DAG 依赖可能更简单。

希望这能为您指明正确的方向。