当 catchup 为 True 时,Airflow 任务设置为“no_status”

Airflow tasks set to `no_status` when catchup is True

我正在尝试配置一系列 Airflow 任务来回填一些数据 (catchup=True)。 DAG 部署并取消暂停后,第一个作业成功运行,但所有后续运行的任务都设置为 no_status,并且它们永远不会启动。

我尝试了重命名 DAG、重新启动 Airflow 服务器和调度程序、清除旧日志的变体,但我在这里没有取得任何进展。

想法?

DAG代码:

default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks

你的代码执行得很好。

我根据您的代码创建了一个可运行的示例(因为它缺少 imports/callables):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta


def run_ingestion(**context):
    print("Hello World")

default_args = {
    "owner": "me",
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    "sla": timedelta(hours=1),
    "start_date": "2021-01-01T00:00",
}

dag = DAG(
    catchup=True,
    dag_id="ingest_dag_testing_6",
    dagrun_timeout=timedelta(hours=1),
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="30 * * * *",
)

DATA_SOURCE_TYPES = [
    {
        "target_name": "task_a",
        "children": [
            {
                "target_name": "subtask_a1",
            },
            {
                "target_name": "subtask_a2",
            },
        ],
    }
]

with dag:
    for dst in DATA_SOURCE_TYPES:
        sub_ingest_tasks = []

        ingest_task = PythonOperator(
            task_id=f"ingest_{dst.get('target_name')}",
            python_callable=run_ingestion,
            #op_args=[logger, exe_date, dst],
        )
        if dst.get("children"):
            for sdst in dst.get("children"):
                sub_ingest_tasks.append(
                    PythonOperator(
                        task_id=f"ingest_{sdst.get('target_name')}",
                        python_callable=run_ingestion,
                        #op_args=[logger, exe_date, sdst],
                    )
                )

        ingest_task >> sub_ingest_tasks

您可以看到它工作正常:

如果您是 运行 旧 Airflow 版本,更改 dag_id 可能会解决问题。可能是有一些与此 dag_id 相关的数据库记录的旧痕迹未正确清理。调度程序在以后的版本中进行了重大重构。

如果上述方法没有帮助,可能唯一的解决方案是升级到最新的 Airflow 版本,因为它可能是旧版本中的一个错误,该错误已在此过程中得到修复(因为您共享的代码不会重现该问题您在最新的 Airflow 版本中描述。