Airflow:从单独的文件创建 DAG
Airflow: Create DAG from a separate file
在气流中,我正在尝试制作一个专门用于在文件中生成 DAG 的函数:
dynamic_dags.py:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
然后在另一个文件中我试图从一个单独的文件中导入 dags:
load_dags.py:
from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
但是,dags 没有在网络上显示 UI。
但是如果我按照下面的代码在一个文件中执行它们,它将起作用:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
我想知道为什么在两个单独的文件中执行此操作不起作用。
我认为如果您使用的是 Airflow 1.10,那么 DAG 文件应该包含 DAG
和 airlfow:
https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html?highlight=airflowignore#dags
When searching for DAGs, Airflow only considers python files that contain the strings “airflow” and “DAG” by default. To consider all python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
在 Airflow 2 中它已被更改(略微 - dag 不区分大小写):
https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html
When searching for DAGs inside the DAG_FOLDER, Airflow only considers Python files that contain the strings airflow and dag (case-insensitively) as an optimization.
To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
我认为您只是在 load_dags.py
中错过了 'airflow'。您可以在任何地方添加它 - 包括评论。
在气流中,我正在尝试制作一个专门用于在文件中生成 DAG 的函数:
dynamic_dags.py:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
然后在另一个文件中我试图从一个单独的文件中导入 dags:
load_dags.py:
from dynamic_dag import generate_dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
但是,dags 没有在网络上显示 UI。 但是如果我按照下面的代码在一个文件中执行它们,它将起作用:
def generate_dag(name):
with DAG(
dag_id=f'dag_{name}',
default_args=args,
start_date=days_ago(2),
schedule_interval='5 5 * * *',
tags=['Test'],
catchup=False
) as dag:
dummy_task=DummyOperator(
task_id="dynamic_dummy_task",
dag=dag
)
return dag
globals()["Dynamic_DAG_A"] = generate_dag('A')
我想知道为什么在两个单独的文件中执行此操作不起作用。
我认为如果您使用的是 Airflow 1.10,那么 DAG 文件应该包含 DAG
和 airlfow:
https://airflow.apache.org/docs/apache-airflow/1.10.15/concepts.html?highlight=airflowignore#dags
When searching for DAGs, Airflow only considers python files that contain the strings “airflow” and “DAG” by default. To consider all python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
在 Airflow 2 中它已被更改(略微 - dag 不区分大小写):
https://airflow.apache.org/docs/apache-airflow/2.2.2/concepts/dags.html
When searching for DAGs inside the DAG_FOLDER, Airflow only considers Python files that contain the strings airflow and dag (case-insensitively) as an optimization. To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.
我认为您只是在 load_dags.py
中错过了 'airflow'。您可以在任何地方添加它 - 包括评论。