Airflow - TriggerDagRunOperator - 使用不同的 Conf 调用相同的 DAG

Airflow - TriggerDagRunOperator - Call same DAG with different Conf

我有一个动态 DAG (dag_1),它由另一个 DAG (dag_0) 使用 TriggerDagRunOperator 编排。

dag_1 是一个非常简单的脚本: `从日期时间导入日期时间 从 airflow.models 导入 DAG 从 airflow.operators.python_operator 导入 PythonOperator

default_args = { 'provide_context': True, }

def get_list(**context): p_list = ['a','b','c'] res = context["dag_run"].conf['message'] p_list.append(res) return p_list

with DAG( dag_id='dag_1', schedule_interval='@once', start_date=datetime(2022, 3, 1) , default_args=default_args ) as dag: data_list = PythonOperator( task_id='get_lists', python_callable=get_list, dag=dag) for i in get_list(): bash_task = BashOperator(task_id='bash_task_{}'.format(i), bash_command="echo 'command executed {}'.format(i)")

And my dag_0 that will run orchestrate the dag_1 based on the parameters passed: `from airflow.operators.trigger_dagrun import TriggerDagRunOperator from datetime import datetime from airflow import DAG

dag = DAG( 'dag_0', description='create the table structure', schedule_interval='@once', catchup=False, max_active_runs=1, start_date=datetime(2022, 3, 1) )

trigger_step = TriggerDagRunOperator( task_id="trigger_step", trigger_dag_id="dag_1", conf={"message": "Hello Word 1"}, dag=dag )

trigger_step2 = TriggerDagRunOperator( task_id="trigger_step2", trigger_dag_id="dag_1", conf={"message": "Hello Word 2"}, dag=dag )

trigger_step >> trigger_step2`

如果我 运行 这不需要将 PythonOperator 传递给列表,当我把那部分“for i in get_list()”放在我得到的第一个脚本时它工作正常: res = context["dag_run"].conf['message'] KeyError: 'dag_run'

我做错了什么?

试试下面的方法。我认为是因为 dag_run 对象是在 运行 时间创建的。

def get_list(dag_run=None,**context): 
    p_list = ['a','b','c'] 
    res = dag_run.conf.get('message') 
    p_list.append(res) 
    return p_list