气流中带有 Conf 变量的动态 Dag
Dynamic Dag with Conf Variables in Airflow
我正在尝试制作一个 dag,它接受一个 conf 参数,该参数基本上解析文件名以在 dag 的下游使用。因此,将 something.txt
解析为 something
来处理我在管道中进行的文件名转换。
我得到 airflow.exceptions.AirflowException: Task is missing the start_date parameter
所以我想知道我是否走在正确的轨道上,以及是否有人有任何建议,因为我无法通过 Variable 对这些变量进行硬编码。我计划执行此 dag 的方式是在 bash 中循环它,同时将文件名传递给 conf 参数。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import os
# Basic arguments to pass to Airflow
args = {
'owner': 'airflow',
'start_date': datetime.now(),
}
# Create the head dag
dag = DAG(
dag_id='test_multiparameters',
default_args=args,
schedule_interval=None)
input_dir = '/input/dchen71/dog/'
templated_command1 = """
touch /input/dchen71/dog/{{ dag_run.conf['file_name'] }}
"""
# Touch data file
touchy = BashOperator(
task_id='create_data',
bash_command=templated_command1,
dag=dag)
def decat(**kwargs):
# Generator function for dynamic dags
base_name = os.path.splitext(context['dag_run'].conf['file_name'])[0]
cat_template = """
cat {input_dir}/{base_name}.txt{input_dir}/{base_name}.txt> {input_dir}/meow.txt
""".format(input_dir = input_dir, base_name = base_name)
return BashOperator(
task_id = "create_data",
bash_command = cat_template,
dag = dag
)
dacat = PythonOperator(
task_id="dacat",
python_callable = decat,
provides_context = True
)
touchy >> dacat
我正在尝试制作一个 dag,它接受一个 conf 参数,该参数基本上解析文件名以在 dag 的下游使用。因此,将 something.txt
解析为 something
来处理我在管道中进行的文件名转换。
我得到 airflow.exceptions.AirflowException: Task is missing the start_date parameter
所以我想知道我是否走在正确的轨道上,以及是否有人有任何建议,因为我无法通过 Variable 对这些变量进行硬编码。我计划执行此 dag 的方式是在 bash 中循环它,同时将文件名传递给 conf 参数。
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import os
# Basic arguments to pass to Airflow
args = {
'owner': 'airflow',
'start_date': datetime.now(),
}
# Create the head dag
dag = DAG(
dag_id='test_multiparameters',
default_args=args,
schedule_interval=None)
input_dir = '/input/dchen71/dog/'
templated_command1 = """
touch /input/dchen71/dog/{{ dag_run.conf['file_name'] }}
"""
# Touch data file
touchy = BashOperator(
task_id='create_data',
bash_command=templated_command1,
dag=dag)
def decat(**kwargs):
# Generator function for dynamic dags
base_name = os.path.splitext(context['dag_run'].conf['file_name'])[0]
cat_template = """
cat {input_dir}/{base_name}.txt{input_dir}/{base_name}.txt> {input_dir}/meow.txt
""".format(input_dir = input_dir, base_name = base_name)
return BashOperator(
task_id = "create_data",
bash_command = cat_template,
dag = dag
)
dacat = PythonOperator(
task_id="dacat",
python_callable = decat,
provides_context = True
)
touchy >> dacat