Airflow DAG 的任务超时
Task timeout for Airflow DAGs
我的 airflow DAG 中有 运行 5 个 PythonOperator 任务,其中一个正在执行 ETL 作业,这需要很长时间,因此我的所有资源都被阻塞了。有没有一种方法可以设置每个任务的最大执行时间,之后任务要么失败,要么被标记为成功(这样 DAG 就不会失败)并显示一条消息?
查看
简而言之,使用 pools or even specifying a 内置的气流来完成一项任务(而不是整个 DAG)似乎是潜在的解决方案。
在每个运算符中,我们都有一个 execution_timeout
变量,您必须在其中传递一个 datetime.timedelta
对象。
根据 base operator code comments:
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta
还请记住,这将使 DAG 的单个 运行 失败,并将触发重新 运行 并且仅当所有重新 [= 时才被声明为失败的 DAG 23=]s 失败了。
因此,根据您分配的自动重试次数,您可以有一个潜在的最长时间 ( number of retries ) x ( timeout )
,以防代码持续花费太长时间。
从 this documentation 开始,您需要设置 execution_timeout
任务参数,看起来像这样
from datetime import timedelta
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(hours=2),
timeout=3600,
retries=2,
mode="reschedule",
)
我的 airflow DAG 中有 运行 5 个 PythonOperator 任务,其中一个正在执行 ETL 作业,这需要很长时间,因此我的所有资源都被阻塞了。有没有一种方法可以设置每个任务的最大执行时间,之后任务要么失败,要么被标记为成功(这样 DAG 就不会失败)并显示一条消息?
查看
简而言之,使用 pools or even specifying a
在每个运算符中,我们都有一个 execution_timeout
变量,您必须在其中传递一个 datetime.timedelta
对象。
根据 base operator code comments:
:param execution_timeout: max time allowed for the execution of
this task instance, if it goes beyond it will raise and fail.
:type execution_timeout: datetime.timedelta
还请记住,这将使 DAG 的单个 运行 失败,并将触发重新 运行 并且仅当所有重新 [= 时才被声明为失败的 DAG 23=]s 失败了。
因此,根据您分配的自动重试次数,您可以有一个潜在的最长时间 ( number of retries ) x ( timeout )
,以防代码持续花费太长时间。
从 this documentation 开始,您需要设置 execution_timeout
任务参数,看起来像这样
from datetime import timedelta
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(hours=2),
timeout=3600,
retries=2,
mode="reschedule",
)