为气流传感器的执行字段设置默认值
Set a default value for an execution field of airflow sensors
如果你了解 Airflow,我的问题就相当简单了。我想为执行字段 execution_timeout
设置除 None 之外的默认值。我想知道这是否可能,是否是如何做到的。
如果不可能,唯一的方法是为每个传感器设置 execution_timeout=timedelta(hours=2)
吗?
谢谢。
你可以在default_args
中设置一次,然后它将成为你DAG中所有sensor/operators的默认值,你也可以在特定的operator/sensor中覆盖:
default_args = {
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id="my_dag",
default_args=default_args,
...,
) as dag:
op1 = MyOperator(task_id="task_op_1")
se1 = MySensor(task_id="task_sensor_1")
op1 >> se1
op2 = MyOperator(task_id="task_op_2")
se2 = MySensor(task_id="task_sensor_2", execution_timeout=timedelta(hours=1)) # override
op2 >> se2
在上面的示例中,task_op_1
、task_sensor_1
、task_op_2
将有 execution_timeout
个 2 小时,而 task_sensor_2
将有 execution_timeout
个1小时。
请注意,对于 Airflow>=2.3.0
,也可以通过在 airflow.cfg 中设置 default_task_execution_timeout
和
来定义全局 execution_timeout
如果你了解 Airflow,我的问题就相当简单了。我想为执行字段 execution_timeout
设置除 None 之外的默认值。我想知道这是否可能,是否是如何做到的。
如果不可能,唯一的方法是为每个传感器设置 execution_timeout=timedelta(hours=2)
吗?
谢谢。
你可以在default_args
中设置一次,然后它将成为你DAG中所有sensor/operators的默认值,你也可以在特定的operator/sensor中覆盖:
default_args = {
'execution_timeout': timedelta(hours=2),
}
with DAG(
dag_id="my_dag",
default_args=default_args,
...,
) as dag:
op1 = MyOperator(task_id="task_op_1")
se1 = MySensor(task_id="task_sensor_1")
op1 >> se1
op2 = MyOperator(task_id="task_op_2")
se2 = MySensor(task_id="task_sensor_2", execution_timeout=timedelta(hours=1)) # override
op2 >> se2
在上面的示例中,task_op_1
、task_sensor_1
、task_op_2
将有 execution_timeout
个 2 小时,而 task_sensor_2
将有 execution_timeout
个1小时。
请注意,对于 Airflow>=2.3.0
,也可以通过在 airflow.cfg 中设置 default_task_execution_timeout
和
execution_timeout