Airflow:从单独的文件创建 DAG

Airflow: Create DAG from a separate file

在气流中,我正在尝试制作一个专门用于在文件中生成 DAG 的函数:

dynamic_dags.py:

def generate_dag(name):
    with DAG(
        dag_id=f'dag_{name}',
        default_args=args,
        start_date=days_ago(2),
        schedule_interval='5 5 * * *',
        tags=['Test'],
        catchup=False
    ) as dag:
        dummy_task=DummyOperator(
            task_id="dynamic_dummy_task",
            dag=dag
        )
    return dag

然后在另一个文件中我试图从一个单独的文件中导入 dags:

load_dags.py:

from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')

但是,dags 没有在网络上显示 UI。 但是如果我按照下面的代码在一个文件中执行它们,它将起作用:

def generate_dag(name):
    with DAG(
        dag_id=f'dag_{name}',
        default_args=args,
        start_date=days_ago(2),
        schedule_interval='5 5 * * *',
        tags=['Test'],
        catchup=False
    ) as dag:
        dummy_task=DummyOperator(
            task_id="dynamic_dummy_task",
            dag=dag
        )
    return dag

globals()["Dynamic_DAG_A"] = generate_dag('A')

我想知道为什么在两个单独的文件中执行此操作不起作用。

我认为如果您使用的是 Airflow 1.10,那么 DAG 文件应该包含 DAG 和 airlfow:

https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html?highlight=airflowignore#dags

When searching for DAGs, Airflow only considers python files that contain the strings “airflow” and “DAG” by default. To consider all python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.

在 Airflow 2 中它已被更改(略微 - dag 不区分大小写):

https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html

When searching for DAGs inside the DAG_FOLDER, Airflow only considers Python files that contain the strings airflow and dag (case-insensitively) as an optimization. To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.

我认为您只是在 load_dags.py 中错过了 'airflow'。您可以在任何地方添加它 - 包括评论。