在 Airflow 2.0 中读取 yaml 配置文件并创建 DAG 生成器
Reading a yaml configuration file and creating a DAG generator in Airflow 2.0
我是 Airflow 2.0 的新手,我真的很难找到一种方法来远程保存我的 DAG,而不是在调度程序中自动提交它。我有一个配置文件,可以为我的 Spark 作业加载设置。
我正在尝试编写一个 实用程序 python 文件 来读取配置文件、解析它并创建一个 DAG 文件。我已经使用 Astronomer 的 create_dag
示例完成了它,但是它直接提交了 DAG,除了 UI.
之外,我无法看到生成的 DAG 代码
- 如何实现保存 DAG 文件并稍后提交?
- 此外,我的实用程序是否可以有某种模板,其中包括我需要远程创建和保存 DAG 文件以便稍后提交的运算符和参数? (如果没有这个,我创建了一个带有硬编码值的示例 dag,但我想要一个可以为我执行此操作并远程保存的实用程序)
- 有例子吗?
我假设您指的是 Dynamically Generating DAGs in Airflow 上的本指南。
“保存 DAG 文件”而不是让 Airflow 动态创建 DAG 的一种方法是预先生成文件。例如,您可以将 CI/CD 管道中的一个步骤添加到 运行 生成 python 文件的脚本,然后将其推送到调度程序。
这个过程可以描述为准备和渲染模板。
您可以使用 Jinja 来完成此操作。
有趣的是,Airflow also uses Jinja 构建其网页以及允许用户利用 jinja 模板呈现文件和参数!
以下示例应该可以帮助您入门。
generate_file.py
from jinja2 import Environment, FileSystemLoader
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('dag.template')
# I don't know what the configuration format but as long as you can convert to a dictionary, it can work.
values = {}
filename = os.path.join(file_dir, 'dag.py')
with open(filename, 'w') as fh:
fh.write(template.render(
dag_id="my_dag",
num_task=2,
**values
))
dag.template
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='{{ dag_id }}',
schedule_interval='@once',
start_date=datetime(2020, 1, 1)
)
with dag:
dummy = DummyOperator(
task_id='test',
dag=dag
)
{% for n in range(num_task) %}
op_{{ loop.index }} = PythonOperator(
task_id='python_op_{{ loop.index }}',
dag=dag
)
{% endfor %}
op_1 >> op2 >> op3
生成的文件 dag.py 将如下所示。
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='my_dag',
schedule_interval='@once',
start_date=datetime(2020, 1, 1)
)
with dag:
dummy = DummyOperator(
task_id='test',
dag=dag
)
op_1 = PythonOperator(
task_id='python_op_1',
dag=dag
)
op_2 = PythonOperator(
task_id='python_op_2',
dag=dag
)
op_1 >> op2 >> op3
我是 Airflow 2.0 的新手,我真的很难找到一种方法来远程保存我的 DAG,而不是在调度程序中自动提交它。我有一个配置文件,可以为我的 Spark 作业加载设置。
我正在尝试编写一个 实用程序 python 文件 来读取配置文件、解析它并创建一个 DAG 文件。我已经使用 Astronomer 的 create_dag
示例完成了它,但是它直接提交了 DAG,除了 UI.
- 如何实现保存 DAG 文件并稍后提交?
- 此外,我的实用程序是否可以有某种模板,其中包括我需要远程创建和保存 DAG 文件以便稍后提交的运算符和参数? (如果没有这个,我创建了一个带有硬编码值的示例 dag,但我想要一个可以为我执行此操作并远程保存的实用程序)
- 有例子吗?
我假设您指的是 Dynamically Generating DAGs in Airflow 上的本指南。
“保存 DAG 文件”而不是让 Airflow 动态创建 DAG 的一种方法是预先生成文件。例如,您可以将 CI/CD 管道中的一个步骤添加到 运行 生成 python 文件的脚本,然后将其推送到调度程序。
这个过程可以描述为准备和渲染模板。
您可以使用 Jinja 来完成此操作。
有趣的是,Airflow also uses Jinja 构建其网页以及允许用户利用 jinja 模板呈现文件和参数!
以下示例应该可以帮助您入门。
generate_file.py
from jinja2 import Environment, FileSystemLoader
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('dag.template')
# I don't know what the configuration format but as long as you can convert to a dictionary, it can work.
values = {}
filename = os.path.join(file_dir, 'dag.py')
with open(filename, 'w') as fh:
fh.write(template.render(
dag_id="my_dag",
num_task=2,
**values
))
dag.template
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='{{ dag_id }}',
schedule_interval='@once',
start_date=datetime(2020, 1, 1)
)
with dag:
dummy = DummyOperator(
task_id='test',
dag=dag
)
{% for n in range(num_task) %}
op_{{ loop.index }} = PythonOperator(
task_id='python_op_{{ loop.index }}',
dag=dag
)
{% endfor %}
op_1 >> op2 >> op3
生成的文件 dag.py 将如下所示。
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='my_dag',
schedule_interval='@once',
start_date=datetime(2020, 1, 1)
)
with dag:
dummy = DummyOperator(
task_id='test',
dag=dag
)
op_1 = PythonOperator(
task_id='python_op_1',
dag=dag
)
op_2 = PythonOperator(
task_id='python_op_2',
dag=dag
)
op_1 >> op2 >> op3