Airflow - TriggerDagRunOperator - 使用不同的 Conf 调用相同的 DAG
Airflow - TriggerDagRunOperator - Call same DAG with different Conf
我有一个动态 DAG (dag_1),它由另一个 DAG (dag_0) 使用 TriggerDagRunOperator 编排。
dag_1 是一个非常简单的脚本:
`从日期时间导入日期时间
从 airflow.models 导入 DAG
从 airflow.operators.python_operator 导入 PythonOperator
default_args = {
'provide_context': True,
}
def get_list(**context):
p_list = ['a','b','c']
res = context["dag_run"].conf['message']
p_list.append(res)
return p_list
with DAG(
dag_id='dag_1',
schedule_interval='@once',
start_date=datetime(2022, 3, 1)
, default_args=default_args
) as dag:
data_list = PythonOperator(
task_id='get_lists',
python_callable=get_list,
dag=dag)
for i in get_list():
bash_task = BashOperator(task_id='bash_task_{}'.format(i), bash_command="echo 'command executed {}'.format(i)")
And my dag_0 that will run orchestrate the dag_1 based on the parameters passed:
`from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
from airflow import DAG
dag = DAG(
'dag_0',
description='create the table structure',
schedule_interval='@once',
catchup=False,
max_active_runs=1,
start_date=datetime(2022, 3, 1)
)
trigger_step = TriggerDagRunOperator(
task_id="trigger_step",
trigger_dag_id="dag_1",
conf={"message": "Hello Word 1"},
dag=dag
)
trigger_step2 = TriggerDagRunOperator(
task_id="trigger_step2",
trigger_dag_id="dag_1",
conf={"message": "Hello Word 2"},
dag=dag
)
trigger_step >> trigger_step2`
如果我 运行 这不需要将 PythonOperator 传递给列表,当我把那部分“for i in get_list()”放在我得到的第一个脚本时它工作正常:
res = context["dag_run"].conf['message'] KeyError: 'dag_run'
我做错了什么?
试试下面的方法。我认为是因为 dag_run
对象是在 运行 时间创建的。
def get_list(dag_run=None,**context):
p_list = ['a','b','c']
res = dag_run.conf.get('message')
p_list.append(res)
return p_list
我有一个动态 DAG (dag_1),它由另一个 DAG (dag_0) 使用 TriggerDagRunOperator 编排。
dag_1 是一个非常简单的脚本: `从日期时间导入日期时间 从 airflow.models 导入 DAG 从 airflow.operators.python_operator 导入 PythonOperator
default_args = {
'provide_context': True,
}
def get_list(**context):
p_list = ['a','b','c']
res = context["dag_run"].conf['message']
p_list.append(res)
return p_list
with DAG(
dag_id='dag_1',
schedule_interval='@once',
start_date=datetime(2022, 3, 1)
, default_args=default_args
) as dag:
data_list = PythonOperator(
task_id='get_lists',
python_callable=get_list,
dag=dag)
for i in get_list():
bash_task = BashOperator(task_id='bash_task_{}'.format(i), bash_command="echo 'command executed {}'.format(i)")
And my dag_0 that will run orchestrate the dag_1 based on the parameters passed:
`from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
from airflow import DAG
dag = DAG(
'dag_0',
description='create the table structure',
schedule_interval='@once',
catchup=False,
max_active_runs=1,
start_date=datetime(2022, 3, 1)
)
trigger_step = TriggerDagRunOperator(
task_id="trigger_step",
trigger_dag_id="dag_1",
conf={"message": "Hello Word 1"},
dag=dag
)
trigger_step2 = TriggerDagRunOperator(
task_id="trigger_step2",
trigger_dag_id="dag_1",
conf={"message": "Hello Word 2"},
dag=dag
)
trigger_step >> trigger_step2`
如果我 运行 这不需要将 PythonOperator 传递给列表,当我把那部分“for i in get_list()”放在我得到的第一个脚本时它工作正常:
res = context["dag_run"].conf['message'] KeyError: 'dag_run'
我做错了什么?
试试下面的方法。我认为是因为 dag_run
对象是在 运行 时间创建的。
def get_list(dag_run=None,**context):
p_list = ['a','b','c']
res = dag_run.conf.get('message')
p_list.append(res)
return p_list