Airflow Task 不会转移到依赖项上,而是重新运行任务
Airflow Task does not move onto dependency but re-runs task
我有一个包含三个任务的 Airflow 工作流;第二个任务依赖于第一个任务,第三个任务依赖于第二个任务。
如果我通过网络服务器运行 DAG,第一个任务完成但随后开始重新运行而不是触发第二个任务。要记住的一件事是第一个任务确实需要超过 130 秒才能 运行。这是因为第一个任务的持续时间吗?
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
default_args = {
'owner': 'David',
'depends_on_past': True,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'DCM_Floodlight_Report_API',
default_args=default_args,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
t1 = BashOperator(
task_id='Pull_DCM_Report',
bash_command='python "/Users/run_report.py" 2737542 134267867', dag=dag)
t2 = BashOperator(
task_id='Cleanse_File',
bash_command='python "/Users/cleanse_file.py"',dag=dag)
t3 = BashOperator(
task_id='S3_Bucket_Creation_Upload_File',
bash_command='python "/Users/aws_s3_creation&load.py"',dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
我认为 运行您的任务时间不是问题。
- 此行为很可能是由于 catchup
参数造成的,该参数默认为 True
.
https://airflow.apache.org/scheduler.html#backfill-and-catchup
这意味着 Airflow 正在为您的 start_date
和当前时间之间的每个计划间隔安排第一个任务。
您可以在 UI 中查看您的树视图,看看是否安排了多个 DagRun。如果您只是测试 DAG,我建议在测试时将 schedule_interval 设置为 @once
,然后再将其安排为 运行 用于过去或未来的日期。
在没有重试逻辑的情况下尝试一下,看看它的表现如何。使用这些默认参数和 dag 信息:
`default_args = {
'owner': 'David',
'depends_on_past': False,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True
}
dag = DAG(
dag_id='DCM_Floodlight_Report_API',
default_args=default_args,
catchup=False,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
我添加了 catchup
并将其设置为 False,并将 depends_on_past
更改为 False。我也删除了重试逻辑。这可能会解决您的问题 - 请告诉我!
我有一个包含三个任务的 Airflow 工作流;第二个任务依赖于第一个任务,第三个任务依赖于第二个任务。
如果我通过网络服务器运行 DAG,第一个任务完成但随后开始重新运行而不是触发第二个任务。要记住的一件事是第一个任务确实需要超过 130 秒才能 运行。这是因为第一个任务的持续时间吗?
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
default_args = {
'owner': 'David',
'depends_on_past': True,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'DCM_Floodlight_Report_API',
default_args=default_args,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
t1 = BashOperator(
task_id='Pull_DCM_Report',
bash_command='python "/Users/run_report.py" 2737542 134267867', dag=dag)
t2 = BashOperator(
task_id='Cleanse_File',
bash_command='python "/Users/cleanse_file.py"',dag=dag)
t3 = BashOperator(
task_id='S3_Bucket_Creation_Upload_File',
bash_command='python "/Users/aws_s3_creation&load.py"',dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t2)
我认为 运行您的任务时间不是问题。
- 此行为很可能是由于 catchup
参数造成的,该参数默认为 True
.
https://airflow.apache.org/scheduler.html#backfill-and-catchup
这意味着 Airflow 正在为您的 start_date
和当前时间之间的每个计划间隔安排第一个任务。
您可以在 UI 中查看您的树视图,看看是否安排了多个 DagRun。如果您只是测试 DAG,我建议在测试时将 schedule_interval 设置为 @once
,然后再将其安排为 运行 用于过去或未来的日期。
在没有重试逻辑的情况下尝试一下,看看它的表现如何。使用这些默认参数和 dag 信息:
`default_args = {
'owner': 'David',
'depends_on_past': False,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True
}
dag = DAG(
dag_id='DCM_Floodlight_Report_API',
default_args=default_args,
catchup=False,
description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
schedule_interval='30 14 * * *')
我添加了 catchup
并将其设置为 False,并将 depends_on_past
更改为 False。我也删除了重试逻辑。这可能会解决您的问题 - 请告诉我!