Airflow - 在通过 TriggerDagRunOperator 发送它们之前设置 dag_run conf 值
Airflow - Set dag_run conf values before sending them through TriggerDagRunOperator
我遇到一个问题,我试图在代码中设置 dag_run.conf 值,然后通过 TriggerDagRunOperator 将其发送到另一个 DAG。
示例:
def _should_trigger(dag_run, **_):
***** Somehow set dag_run.conf values here******
eg: dag_run.conf['Message'] = 'Hello World'
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ dag_run.conf }}"},
)
然后在目标 DAG“栏”中,我希望能够检索在 'Hello world' 的 _should_trigger 中设置的值。这可能吗,因为我尝试了无数种不同的方法,但似乎无法解决这个问题?
谢谢,
而不是修改 DagRun.conf(我想这不会起作用,因为更改不会持久化)我会尝试生成代码并使用 XCom:
def _should_trigger(**_):
return {'Message': 'Hello World'}
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)
或更简单:
conf="{{ task_instance.xcom_pull('should_trigger') }}"
请注意,该词典必须 JSON 可序列化才能与 Airflow 默认 XCom 后端一起使用。
我遇到一个问题,我试图在代码中设置 dag_run.conf 值,然后通过 TriggerDagRunOperator 将其发送到另一个 DAG。
示例:
def _should_trigger(dag_run, **_):
***** Somehow set dag_run.conf values here******
eg: dag_run.conf['Message'] = 'Hello World'
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ dag_run.conf }}"},
)
然后在目标 DAG“栏”中,我希望能够检索在 'Hello world' 的 _should_trigger 中设置的值。这可能吗,因为我尝试了无数种不同的方法,但似乎无法解决这个问题?
谢谢,
而不是修改 DagRun.conf(我想这不会起作用,因为更改不会持久化)我会尝试生成代码并使用 XCom:
def _should_trigger(**_):
return {'Message': 'Hello World'}
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)
或更简单:
conf="{{ task_instance.xcom_pull('should_trigger') }}"
请注意,该词典必须 JSON 可序列化才能与 Airflow 默认 XCom 后端一起使用。