是否可以在 Airflow 中一次从 XCOM 获取单个任务的多个值?

Is it possible to get multiple values from XCOM for a single task at once in Airflow?

是否可以从 XCOM 检索多个值,由单个任务推送但使用不同的键?

我想我见过这样的例子:

# pulls one value
pulled_value = ti.xcom_pull(key=None, task_ids='my_task_id')

对此:

# pulls multiple values but from multiple tasks
pulled_value_1, pulled_value_2 = ti.xcom_pull(key=None, task_ids=['my_task_id_1', 'my_task_id_2'])

我需要的可能是这样的:

# pulls multiple values but from a single
pulled_value_1, pulled_value_2 = ti.xcom_pull(key=['my_key_1', 'my_key_2'], task_ids='my_task_id')

我在文档中找不到这个。
这可能吗?
如果是,它在后台进行单个数据库查询,还是只是多次重复单个查询? 如果没有,我怎么会得到类似的行为?

回答您的问题:

  1. 没有这个功能。 xcom_pull 接受 task_ids: Optional[Union[str, Iterable[str]]] 但使用相同的密钥。您可以打开 Airflow 的 PR 以添加您寻求的功能。

  2. 至于查询数量:我假设通过“重复单个查询”,您是在询问它是否按 task_id 执行一个查询。答案是否定的。Airflow 在 PR 中做了这个优化。 在源代码中你可以看到 xcom_pull() uses Xcom.get_many() and get_many creates a filter using IN which allows to compare against multiple values. You can see the relevant code lines here.