Get xcoms with a key from all tasks in Airflow

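I have two Airflow tasks that push xcoms with the same key, srcDbName, but with different values. These two tasks are followed by a task that reads the xcoms with the key srcDbName and prints their values. See the code below: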

def _fill_facebook_task(ti):
    ti.xcom_push(key='srcDbName', value='SRC_PL_Facebook')

def _fill_trip_advisor_task(ti):
    ti.xcom_push(key='srcDbName', value='SRC_PL_TripAdvisor')

def _pm_task(ti):
    values = ti.xcom_pull(key='srcDbName')
    print(', '.join(values))

facebook = PythonOperator(
    task_id="fill-facebook",
    python_callable=_fill_facebook_task,
    dag=dag
)

tripAdvisor = PythonOperator(
    task_id="fill-trip-advisor",
    python_callable=_fill_trip_advisor_task,
    dag=dag
)

pm = PythonOperator(
    task_id="premises-matching",
    python_callable=_pm_task,
    dag=dag
)

facebook >> pm
tripAdvisor >> pm

I expected the pm task to print

SRC_PL_Facebook, SRC_PL_TripAdvisor

(or the same values in a different order), because the documentation for xcom_pull states:

:param task_ids: Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.

In reality, it prints

S, R, C, _, P, L, _, F, a, c, e, b, o, o, k

Is it possible to read all xcoms with a given key from all upstream tasks?

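To read all the xcoms, you need to pass the names (task_ids) of all the upstream task instances as a parameter to xcom_pull.

The documentation for the xcom_pull method is certainly not clear on this point. It only says: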

If a single task_id string is provided, the result is the value of the most
recent matching XCom from that task_id. If multiple task_ids are provided, a
tuple of matching values is returned. None is returned whenever no matches
are found.

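But it should also mention that if you do not pass any task_ids, xcom_pull returns only the first matching value it finds (if any). In the question's code that value is a single string, so ', '.join(values) iterates over its characters, which explains the output. You can verify this behavior in the code for airflow.models.taskinstance.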
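The advice above, passing the upstream task ids explicitly, can be sketched roughly as follows. This is only an illustration under a few assumptions: the helper name pull_from_upstream is made up for this example, ti.task.upstream_task_ids is used to avoid hard-coding the ids 'fill-facebook' and 'fill-trip-advisor', and the exact container type and ordering that xcom_pull returns for a list of task_ids may differ between Airflow versions.

```python
def pull_from_upstream(ti, key):
    """Return the XCom values pushed under `key` by the direct upstream tasks.

    `pull_from_upstream` is a hypothetical helper, not part of Airflow.
    """
    # Ids of the tasks directly upstream of the current task,
    # sorted only so that the request order is deterministic.
    upstream_ids = sorted(ti.task.upstream_task_ids)
    # Passing a list of task_ids asks for one value per matching task
    # instead of only the first match.
    values = ti.xcom_pull(key=key, task_ids=upstream_ids)
    # Drop entries for upstream tasks that pushed nothing under this key.
    return [str(v) for v in values if v is not None]


def _pm_task(ti):
    print(', '.join(pull_from_upstream(ti, 'srcDbName')))
```

Hard-coding the list works just as well for the DAG in the question: ti.xcom_pull(key='srcDbName', task_ids=['fill-facebook', 'fill-trip-advisor']).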