Airflow - 在通过 TriggerDagRunOperator 发送它们之前设置 dag_run conf 值

Airflow - Set dag_run conf values before sending them through TriggerDagRunOperator

我遇到一个问题,我试图在代码中设置 dag_run.conf 值,然后通过 TriggerDagRunOperator 将其发送到另一个 DAG。

示例:

    def _should_trigger(dag_run, **_):
        ***** Somehow set dag_run.conf values here******
        eg: dag_run.conf['Message'] = 'Hello World'
    
    
    should_trigger = PythonOperator(
        task_id="should_trigger",
        python_callable=_should_trigger,
        provide_context=True,
    )
    
    trigger_bar_dag = TriggerDagRunOperator(
        task_id="trigger_bar_dag",
        trigger_dag_id="bar",
        conf={"payload": "{{ dag_run.conf }}"},
    )

然后在目标 DAG“栏”中,我希望能够检索在 'Hello world' 的 _should_trigger 中设置的值。这可能吗,因为我尝试了无数种不同的方法,但似乎无法解决这个问题?

谢谢,

而不是修改 DagRun.conf(我想这不会起作用,因为更改不会持久化)我会尝试生成代码并使用 XCom:

def _should_trigger(**_):
    return {'Message': 'Hello World'}


should_trigger = PythonOperator(
    task_id="should_trigger",
    python_callable=_should_trigger,
    provide_context=True,
)

trigger_bar_dag = TriggerDagRunOperator(
    task_id="trigger_bar_dag",
    trigger_dag_id="bar",
    conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)

或更简单:

conf="{{ task_instance.xcom_pull('should_trigger') }}"

请注意,该词典必须 JSON 可序列化才能与 Airflow 默认 XCom 后端一起使用。