使用 Airflow 中所有任务的密钥获取 xcoms
Get xcoms with a key from all tasks in Airflow
我有两个 Airflow 任务正在使用相同的键 srcDbName
推送 xcoms
,但具有不同的值。这两个任务之后是一个任务,该任务使用键 srcDbName
读取 xcoms
并打印它们的值。请看下面的代码:
def _fill_facebook_task(ti):
ti.xcom_push(key='srcDbName', value='SRC_PL_Facebook')
def _fill_trip_advisor_task(ti):
ti.xcom_push(key='srcDbName', value='SRC_PL_TripAdvisor')
def _pm_task(ti):
values = ti.xcom_pull(key='srcDbName')
print(', '.join(values))
facebook = PythonOperator(
task_id="fill-facebook",
python_callable= _fill_facebook_task,
dag=dag
)
tripAdvisor = PythonOperator(
task_id="fill-trip-advisor",
python_callable=_fill_trip_advisor_task,
dag=dag
)
pm = PythonOperator(
task_id="premises-matching",
python_callable=_pm_task,
dag=dag
)
facebook >> pm
tripAdvisor >> pm
我希望 pm
任务应该打印出来
SRC_PL_Facebook, SRC_PL_TripAdvisor
(或以不同的顺序)因为 documentation for xcom_pull 声明:
:param task_ids: Only XComs from tasks with matching ids will be
pulled. Can pass None to remove the filter.
实际上,它打印
S, R, C, _, P, L, _, F, a, c, e, b, o, o, k
是否可以从所有上游任务中使用给定的密钥读取所有 xcoms
?
要阅读所有 xcom,您需要将所有上游 task_instance 名称作为参数传递给 xcom_pull
。 xcom_pull
方法的文档肯定没有说清楚——它只说:
If a single task_id string is provided, the result is the value of the most
recent matching XCom from that task_id. If multiple task_ids are provided, a
tuple of matching values is returned. None is returned whenever no matches
are found.
但它还应该提到,如果您不传递任何 task_ids,那么 xcom_pull
将仅 return 它找到的第一个(如果有的话)匹配值。您可以在 code for airflow.models.taskinstance.
中验证该行为
我有两个 Airflow 任务正在使用相同的键 srcDbName
推送 xcoms
,但具有不同的值。这两个任务之后是一个任务,该任务使用键 srcDbName
读取 xcoms
并打印它们的值。请看下面的代码:
def _fill_facebook_task(ti):
ti.xcom_push(key='srcDbName', value='SRC_PL_Facebook')
def _fill_trip_advisor_task(ti):
ti.xcom_push(key='srcDbName', value='SRC_PL_TripAdvisor')
def _pm_task(ti):
values = ti.xcom_pull(key='srcDbName')
print(', '.join(values))
facebook = PythonOperator(
task_id="fill-facebook",
python_callable= _fill_facebook_task,
dag=dag
)
tripAdvisor = PythonOperator(
task_id="fill-trip-advisor",
python_callable=_fill_trip_advisor_task,
dag=dag
)
pm = PythonOperator(
task_id="premises-matching",
python_callable=_pm_task,
dag=dag
)
facebook >> pm
tripAdvisor >> pm
我希望 pm
任务应该打印出来
SRC_PL_Facebook, SRC_PL_TripAdvisor
(或以不同的顺序)因为 documentation for xcom_pull 声明:
:param task_ids: Only XComs from tasks with matching ids will be pulled. Can pass None to remove the filter.
实际上,它打印
S, R, C, _, P, L, _, F, a, c, e, b, o, o, k
是否可以从所有上游任务中使用给定的密钥读取所有 xcoms
?
要阅读所有 xcom,您需要将所有上游 task_instance 名称作为参数传递给 xcom_pull
。 xcom_pull
方法的文档肯定没有说清楚——它只说:
If a single task_id string is provided, the result is the value of the most
recent matching XCom from that task_id. If multiple task_ids are provided, a
tuple of matching values is returned. None is returned whenever no matches
are found.
但它还应该提到,如果您不传递任何 task_ids,那么 xcom_pull
将仅 return 它找到的第一个(如果有的话)匹配值。您可以在 code for airflow.models.taskinstance.