如何在气流 DAG 中设置一个数字作为重试条件?
How to set a number as retry condition in airflow DAG?
在我的 Airflow DAG
我有 4 tasks
task_1 >> [task_2,task_3]>> task_4
task_4
运行s 仅在 task_2
和 task_3
都成功 运行 之后
如何设置条件,例如:
如果task_2
失败,2分钟后重试task_2
并在第5次尝试后停止重试
这是我的代码:
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
args={
'owner' : 'Anti',
'start_date':days_ago(1)# 1 means yesterday
}
dag = DAG(dag_id='my_sample_dag',default_args=args,schedule_interval='15 * * * *')
def func1(**context):
print("ran task 1")
def func2(**context):
print("ran task 2")
def func3(**context):
print("ran task 3")
def func4(**context):
print("ran task 4")
with dag:
task_1=PythonOperator(
task_id='task1',
python_callable=func1,
provide_context=True,
)
task_2=PythonOperator(
task_id='task2',
python_callable=func2,
provide_context=True
)
task_3=PythonOperator(
task_id='task3',
python_callable=func3,
provide_context=True
)
task_4=PythonOperator(
task_id='task4',
python_callable=func4,
provide_context=True
)
task_1 >> [task_2,task_3]>> task_4 # t2,t3 runs parallel right after t1 has ran
每个运算符都支持 retry_delay
和 retries
- Airflow documention.
retries (int) – the number of retries that should be performed before
failing the task
retry_delay (datetime.timedelta) – delay between retries
如果您想将此应用于所有任务,只需编辑 args 字典即可:
args={
'owner' : 'Anti',
'retries': 5,
'retry_delay': timedelta(minutes=2),
'start_date':days_ago(1)# 1 means yesterday
}
如果您只想将它应用到 task_2,您可以将它直接传递给 PythonOperator
- 在这种情况下,其他任务使用默认设置。
对您的参数发表评论,不建议设置动态相对日期 start_date
,而是设置固定的绝对日期。
在我的 Airflow DAG
我有 4 tasks
task_1 >> [task_2,task_3]>> task_4
task_4
运行s 仅在 task_2
和 task_3
如何设置条件,例如:
如果task_2
失败,2分钟后重试task_2
并在第5次尝试后停止重试
这是我的代码:
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
args={
'owner' : 'Anti',
'start_date':days_ago(1)# 1 means yesterday
}
dag = DAG(dag_id='my_sample_dag',default_args=args,schedule_interval='15 * * * *')
def func1(**context):
print("ran task 1")
def func2(**context):
print("ran task 2")
def func3(**context):
print("ran task 3")
def func4(**context):
print("ran task 4")
with dag:
task_1=PythonOperator(
task_id='task1',
python_callable=func1,
provide_context=True,
)
task_2=PythonOperator(
task_id='task2',
python_callable=func2,
provide_context=True
)
task_3=PythonOperator(
task_id='task3',
python_callable=func3,
provide_context=True
)
task_4=PythonOperator(
task_id='task4',
python_callable=func4,
provide_context=True
)
task_1 >> [task_2,task_3]>> task_4 # t2,t3 runs parallel right after t1 has ran
每个运算符都支持 retry_delay
和 retries
- Airflow documention.
retries (int) – the number of retries that should be performed before failing the task
retry_delay (datetime.timedelta) – delay between retries
如果您想将此应用于所有任务,只需编辑 args 字典即可:
args={
'owner' : 'Anti',
'retries': 5,
'retry_delay': timedelta(minutes=2),
'start_date':days_ago(1)# 1 means yesterday
}
如果您只想将它应用到 task_2,您可以将它直接传递给 PythonOperator
- 在这种情况下,其他任务使用默认设置。
对您的参数发表评论,不建议设置动态相对日期 start_date
,而是设置固定的绝对日期。