气流传感器任务只等待一定时间
Airflow sensor task only wait for a certain period
我正在尝试弄清楚如何实现工作流,以便传感器任务等待外部 dag 完成,只等待一定天数。这是一项日常工作,所以我希望传感器工作等待 3 天,然后在第四天发送电子邮件,然后等待或执行其他任务。
有人可以帮助阐明如何实现这一目标吗?另外,我们如何将“天数计数器”从一天传递到另一天?非常感谢您的帮助。
您可以使用具有以下配置的 ExternalTaskSensor
:
timeout = 3 * 24 * 60 * 60
- 3 天秒,之后传感器将失效
poke_interval = 12 * 60 * 60
- 传感器检查之间间隔 12 小时,您可以将其调整为每小时检查一次。会减少检查外部 dag 状态的次数
mode = "reschedule"
- 这样传感器将不会占用 worker slot 3 天,它将被调度、执行,如果不满足条件,它将被重新安排在下一个执行 [=14= 】 秒。将此模式用于长时间 运行 任务是一种很好的做法。
此外,您可以将等待的 DAG 构建为 wait_task >> [success_task , fail_task]
,其中
wait_task
是你的传感器
success_task
具有触发规则 all_success
并在传感器成功时遵循
fail_task
使用 all_failed
触发规则并处理当传感器最终 return false 或超时 时的场景
我正在尝试弄清楚如何实现工作流,以便传感器任务等待外部 dag 完成,只等待一定天数。这是一项日常工作,所以我希望传感器工作等待 3 天,然后在第四天发送电子邮件,然后等待或执行其他任务。
有人可以帮助阐明如何实现这一目标吗?另外,我们如何将“天数计数器”从一天传递到另一天?非常感谢您的帮助。
您可以使用具有以下配置的 ExternalTaskSensor
:
timeout = 3 * 24 * 60 * 60
- 3 天秒,之后传感器将失效poke_interval = 12 * 60 * 60
- 传感器检查之间间隔 12 小时,您可以将其调整为每小时检查一次。会减少检查外部 dag 状态的次数mode = "reschedule"
- 这样传感器将不会占用 worker slot 3 天,它将被调度、执行,如果不满足条件,它将被重新安排在下一个执行 [=14= 】 秒。将此模式用于长时间 运行 任务是一种很好的做法。
此外,您可以将等待的 DAG 构建为 wait_task >> [success_task , fail_task]
,其中
wait_task
是你的传感器success_task
具有触发规则all_success
并在传感器成功时遵循fail_task
使用all_failed
触发规则并处理当传感器最终 return false 或超时 时的场景