在 Airflow 2.0 中读取 yaml 配置文件并创建 DAG 生成器

Reading a yaml configuration file and creating a DAG generator in Airflow 2.0

我是 Airflow 2.0 的新手,我真的很难找到一种方法来远程保存我的 DAG,而不是在调度程序中自动提交它。我有一个配置文件,可以为我的 Spark 作业加载设置。

我正在尝试编写一个 实用程序 python 文件 来读取配置文件、解析它并创建一个 DAG 文件。我已经使用 Astronomer 的 create_dag 示例完成了它,但是它直接提交了 DAG,除了 UI.

之外,我无法看到生成的 DAG 代码
  1. 如何实现保存 DAG 文件并稍后提交?
  2. 此外,我的实用程序是否可以有某种模板,其中包括我需要远程创建和保存 DAG 文件以便稍后提交的运算符和参数? (如果没有这个,我创建了一个带有硬编码值的示例 dag,但我想要一个可以为我执行此操作并远程保存的实用程序)
  3. 有例子吗?

我假设您指的是 Dynamically Generating DAGs in Airflow 上的本指南。

  1. “保存 DAG 文件”而不是让 Airflow 动态创建 DAG 的一种方法是预先生成文件。例如,您可以将 CI/CD 管道中的一个步骤添加到 运行 生成 python 文件的脚本,然后将其推送到调度程序。

  2. 这个过程可以描述为准备和渲染模板。

    您可以使用 Jinja 来完成此操作。

    有趣的是,Airflow also uses Jinja 构建其网页以及允许用户利用 jinja 模板呈现文件和参数!

  3. 以下示例应该可以帮助您入门。

generate_file.py

from jinja2 import Environment, FileSystemLoader
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))

template = env.get_template('dag.template')

# I don't know what the configuration format but as long as you can convert to a dictionary, it can work.
values = {}

filename = os.path.join(file_dir, 'dag.py')
with open(filename, 'w') as fh:
 fh.write(template.render(
     dag_id="my_dag",
     num_task=2,
     **values
 ))

dag.template

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='{{ dag_id }}',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )
{% for n in range(num_task) %}
    op_{{ loop.index }} = PythonOperator(
        task_id='python_op_{{ loop.index }}',
        dag=dag
    )
{% endfor %}

    op_1 >> op2 >> op3

生成的文件 dag.py 将如下所示。

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )

    op_1 = PythonOperator(
        task_id='python_op_1',
        dag=dag
    )

    op_2 = PythonOperator(
        task_id='python_op_2',
        dag=dag
    )
    
    op_1 >> op2 >> op3