Airflow Task 不会转移到依赖项上,而是重新运行任务

Airflow Task does not move onto dependency but re-runs task

我有一个包含三个任务的 Airflow 工作流;第二个任务依赖于第一个任务,第三个任务依赖于第二个任务。

如果我通过网络服务器运行 DAG,第一个任务完成但随后开始重新运行而不是触发第二个任务。要记住的一件事是第一个任务确实需要超过 130 秒才能 运行。这是因为第一个任务的持续时间吗?

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime


default_args = {
    'owner': 'David',
    'depends_on_past': True,
    'start_date': datetime(2018,5,18),
    'email': ['email_address'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'DCM_Floodlight_Report_API',
    default_args=default_args,
    description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
    schedule_interval='30 14 * * *')

t1 = BashOperator(
    task_id='Pull_DCM_Report',
    bash_command='python "/Users/run_report.py" 2737542 134267867', dag=dag)

t2 = BashOperator(
    task_id='Cleanse_File',
    bash_command='python "/Users/cleanse_file.py"',dag=dag)

t3 = BashOperator(
    task_id='S3_Bucket_Creation_Upload_File',
    bash_command='python "/Users/aws_s3_creation&load.py"',dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)

我认为 运行您的任务时间不是问题。 - 此行为很可能是由于 catchup 参数造成的,该参数默认为 True.

https://airflow.apache.org/scheduler.html#backfill-and-catchup

这意味着 Airflow 正在为您的 start_date 和当前时间之间的每个计划间隔安排第一个任务。 您可以在 UI 中查看您的树视图,看看是否安排了多个 DagRun。如果您只是测试 DAG,我建议在测试时将 schedule_interval 设置为 @once,然后再将其安排为 运行 用于过去或未来的日期。

在没有重试逻辑的情况下尝试一下,看看它的表现如何。使用这些默认参数和 dag 信息:

`default_args = {
'owner': 'David',
'depends_on_past': False,
'start_date': datetime(2018,5,18),
'email': ['email_address'],
'email_on_failure': True,
'email_on_retry': True
}

dag = DAG(
 dag_id='DCM_Floodlight_Report_API',
 default_args=default_args,
 catchup=False,
 description='Pull ABG DCM Floodlight report. Then upload into S3 bucket.',
 schedule_interval='30 14 * * *')

我添加了 catchup 并将其设置为 False,并将 depends_on_past 更改为 False。我也删除了重试逻辑。这可能会解决您的问题 - 请告诉我!