Airflow - 将 Xcom Pull 结果传递给 TriggerDagRunOperator conf

Airflow - Pass Xcom Pull result to TriggerDagRunOperator conf

有谁知道以下代码有什么问题:

本质上,我正在调用 TriggerDagRunOperator,并且我试图根据 XCOM Pull 将一些 conf 传递给它。

触发DAG:

def _should_trigger(**_):
    return {'Message': 'Hello World'}


should_trigger = PythonOperator(
    task_id="should_trigger",
    python_callable=_should_trigger,
    provide_context=True,
)

trigger_bar_dag = TriggerDagRunOperator(
    task_id="trigger_bar_dag",
    trigger_dag_id="bar",
    conf={"payload": "{{ task_instance.xcom_pull('should_trigger') }}"},
)

目标 DAG:

@dag(dag_id="bar",
     default_args=default_args,
     schedule_interval=None
     )
def tasks():
    run_this = PythonOperator(
        task_id="run_this",
        python_callable=run_this_func,
        provide_context=True)

出于某种原因,在 run_this_func 中,我得到有效载荷:None。我似乎无法从 xcom pull 中通过 conf 流传递值。有谁知道这是如何实现的。我也尝试过 xcom pull 的不同变体,例如: ti.xcom_pull(key='return_value', task_ids=['should_trigger']) 无济于事。

谢谢,

您需要指定 should_trigger >> trigger_bar_dag,否则 XCom 记录可能还不存在,您将得到:

[2021-06-06 08:23:35,898] {logging_mixin.py:104} INFO - {'payload': 'None'}

但是一旦我添加了这个关系,我就会得到:

[2021-06-06 08:21:41,356] {logging_mixin.py:104} INFO - {'payload': "{'Message': 'Hello World'}"}

def run_this_func(**context):
    print(context['params'])

这是我的真实代码:


def external_trigger(name):

  def modify_dro(context, dagrun_order):
    log.info('context: {}'.format(context))
    # run_id from here
    log.info('dagrun_order: {}'.format(dagrun_order))
    dagrun_order.payload = {
      'branch': "branch123",
      'revision': 'revision123'
    }
    return dagrun_order

  exteranl_run = TriggerDagRunOperator(task_id='external_' + name,
                          trigger_dag_id='se_perf_post_test',
                          python_callable=modify_dro,
                          on_failure_callback=airflow_on_fail,
                          task_concurrency=256,
                          provide_context=True,
                          trigger_rule='all_done',
                          dag=dag)
  return exteranl_run

使用modify_dro func 为触发的 dag 传递变量。

在被触发的 DAG 上:

def prepare_build(**args):
  log.info("args: " + str(args))
  run_id = args['run_id']
  log.info("run id " + run_id)
  conf = args['dag_run'].conf
  log.info('conf: {}'.format(conf))
  branch = conf.get('branch')
  revision = conf.get('revision')
  log.info("run build on branch: {}, revision: {}".format(branch, revision))

希望对您有所帮助。