Airflow 2 将@task return 值松散耦合到接收@task?

Airflow 2 loosely coupling @task return values to receiving @task?

我正在尝试编写两个互不了解的任务。一个任务 returns 一个字典(通过 XComArg),我想将该对象的单个 属性 传递给下一个任务。如果我传递整个 XComArg 对象,它的值将按预期填充。但是选择单个 属性 会导致 None.

@dag(...):
def _dag():

   @task
   def A(**ctx):
      # ...
      return {'a': 42, 'b': 'B', 'c': 'C'}

   @task
   def B(a, _res, **ctx):
      print('A', a) # >>> A None
      print('RES', _res) # >>> RES {'a': 42, ...}

   res = A()
   B(res['a'])

dag = _dag

理想情况下,B 不知道 a 的值从何而来,也不知道如何获取它。是的,传递所有 res 并让 B 使用 res['a'] 提取所需的内容是可行的,但我的目标是松散耦合。

参见 https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

中的示例

您需要在任务 A

中指定“multiple_outputs=true”