Airflow DAG 的任务超时

Task timeout for Airflow DAGs

我的 airflow DAG 中有 运行 5 个 PythonOperator 任务,其中一个正在执行 ETL 作业,这需要很长时间,因此我的所有资源都被阻塞了。有没有一种方法可以设置每个任务的最大执行时间,之后任务要么失败,要么被标记为成功(这样 DAG 就不会失败)并显示一条消息?

查看

简而言之,使用 pools or even specifying a 内置的气流来完成一项任务(而不是整个 DAG)似乎是潜在的解决方案。

在每个运算符中,我们都有一个 execution_timeout 变量,您必须在其中传递一个 datetime.timedelta 对象。

根据 base operator code comments:

:param execution_timeout: max time allowed for the execution of
    this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta

还请记住,这将使 DAG 的单个 运行 失败,并将触发重新 运行 并且仅当所有重新 [= 时才被声明为失败的 DAG 23=]s 失败了。

因此,根据您分配的自动重试次数,您可以有一个潜在的最长时间 ( number of retries ) x ( timeout ),以防代码持续花费太长时间。

this documentation 开始,您需要设置 execution_timeout 任务参数,看起来像这样

from datetime import timedelta

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(hours=2),
    timeout=3600,
    retries=2,
    mode="reschedule",
)