Airflow - 将 Xcom Pull 结果传递给 TriggerDagRunOperator conf
Airflow - Pass Xcom Pull result to TriggerDagRunOperator conf
有谁知道以下代码有什么问题:
本质上,我正在调用 TriggerDagRunOperator,并且我试图根据 XCOM Pull 将一些 conf 传递给它。
触发DAG:
def _should_trigger(**_):
return {'Message': 'Hello World'}
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)
目标 DAG:
@dag(dag_id="bar",
default_args=default_args,
schedule_interval=None
)
def tasks():
run_this = PythonOperator(
task_id="run_this",
python_callable=run_this_func,
provide_context=True)
出于某种原因,在 run_this_func 中,我得到有效载荷:None。我似乎无法从 xcom pull 中通过 conf 流传递值。有谁知道这是如何实现的。我也尝试过 xcom pull 的不同变体,例如:
ti.xcom_pull(key='return_value', task_ids=['should_trigger']) 无济于事。
谢谢,
您需要指定 should_trigger >> trigger_bar_dag
,否则 XCom 记录可能还不存在,您将得到:
[2021-06-06 08:23:35,898] {logging_mixin.py:104} INFO - {'payload': 'None'}
但是一旦我添加了这个关系,我就会得到:
[2021-06-06 08:21:41,356] {logging_mixin.py:104} INFO - {'payload': "{'Message': 'Hello World'}"}
与
def run_this_func(**context):
print(context['params'])
这是我的真实代码:
def external_trigger(name):
def modify_dro(context, dagrun_order):
log.info('context: {}'.format(context))
# run_id from here
log.info('dagrun_order: {}'.format(dagrun_order))
dagrun_order.payload = {
'branch': "branch123",
'revision': 'revision123'
}
return dagrun_order
exteranl_run = TriggerDagRunOperator(task_id='external_' + name,
trigger_dag_id='se_perf_post_test',
python_callable=modify_dro,
on_failure_callback=airflow_on_fail,
task_concurrency=256,
provide_context=True,
trigger_rule='all_done',
dag=dag)
return exteranl_run
使用modify_dro func 为触发的 dag 传递变量。
在被触发的 DAG 上:
def prepare_build(**args):
log.info("args: " + str(args))
run_id = args['run_id']
log.info("run id " + run_id)
conf = args['dag_run'].conf
log.info('conf: {}'.format(conf))
branch = conf.get('branch')
revision = conf.get('revision')
log.info("run build on branch: {}, revision: {}".format(branch, revision))
希望对您有所帮助。
有谁知道以下代码有什么问题:
本质上,我正在调用 TriggerDagRunOperator,并且我试图根据 XCOM Pull 将一些 conf 传递给它。
触发DAG:
def _should_trigger(**_):
return {'Message': 'Hello World'}
should_trigger = PythonOperator(
task_id="should_trigger",
python_callable=_should_trigger,
provide_context=True,
)
trigger_bar_dag = TriggerDagRunOperator(
task_id="trigger_bar_dag",
trigger_dag_id="bar",
conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)
目标 DAG:
@dag(dag_id="bar",
default_args=default_args,
schedule_interval=None
)
def tasks():
run_this = PythonOperator(
task_id="run_this",
python_callable=run_this_func,
provide_context=True)
出于某种原因,在 run_this_func 中,我得到有效载荷:None。我似乎无法从 xcom pull 中通过 conf 流传递值。有谁知道这是如何实现的。我也尝试过 xcom pull 的不同变体,例如: ti.xcom_pull(key='return_value', task_ids=['should_trigger']) 无济于事。
谢谢,
您需要指定 should_trigger >> trigger_bar_dag
,否则 XCom 记录可能还不存在,您将得到:
[2021-06-06 08:23:35,898] {logging_mixin.py:104} INFO - {'payload': 'None'}
但是一旦我添加了这个关系,我就会得到:
[2021-06-06 08:21:41,356] {logging_mixin.py:104} INFO - {'payload': "{'Message': 'Hello World'}"}
与
def run_this_func(**context):
print(context['params'])
这是我的真实代码:
def external_trigger(name):
def modify_dro(context, dagrun_order):
log.info('context: {}'.format(context))
# run_id from here
log.info('dagrun_order: {}'.format(dagrun_order))
dagrun_order.payload = {
'branch': "branch123",
'revision': 'revision123'
}
return dagrun_order
exteranl_run = TriggerDagRunOperator(task_id='external_' + name,
trigger_dag_id='se_perf_post_test',
python_callable=modify_dro,
on_failure_callback=airflow_on_fail,
task_concurrency=256,
provide_context=True,
trigger_rule='all_done',
dag=dag)
return exteranl_run
使用modify_dro func 为触发的 dag 传递变量。
在被触发的 DAG 上:
def prepare_build(**args):
log.info("args: " + str(args))
run_id = args['run_id']
log.info("run id " + run_id)
conf = args['dag_run'].conf
log.info('conf: {}'.format(conf))
branch = conf.get('branch')
revision = conf.get('revision')
log.info("run build on branch: {}, revision: {}".format(branch, revision))
希望对您有所帮助。