Airflow 2 将@task return 值松散耦合到接收@task?
Airflow 2 loosely coupling @task return values to receiving @task?
我正在尝试编写两个互不了解的任务。一个任务 returns 一个字典(通过 XComArg
),我想将该对象的单个 属性 传递给下一个任务。如果我传递整个 XComArg
对象,它的值将按预期填充。但是选择单个 属性 会导致 None
.
@dag(...):
def _dag():
@task
def A(**ctx):
# ...
return {'a': 42, 'b': 'B', 'c': 'C'}
@task
def B(a, _res, **ctx):
print('A', a) # >>> A None
print('RES', _res) # >>> RES {'a': 42, ...}
res = A()
B(res['a'])
dag = _dag
理想情况下,B
不知道 a
的值从何而来,也不知道如何获取它。是的,传递所有 res
并让 B
使用 res['a']
提取所需的内容是可行的,但我的目标是松散耦合。
参见 https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
中的示例
您需要在任务 A
中指定“multiple_outputs=true”
我正在尝试编写两个互不了解的任务。一个任务 returns 一个字典(通过 XComArg
),我想将该对象的单个 属性 传递给下一个任务。如果我传递整个 XComArg
对象,它的值将按预期填充。但是选择单个 属性 会导致 None
.
@dag(...):
def _dag():
@task
def A(**ctx):
# ...
return {'a': 42, 'b': 'B', 'c': 'C'}
@task
def B(a, _res, **ctx):
print('A', a) # >>> A None
print('RES', _res) # >>> RES {'a': 42, ...}
res = A()
B(res['a'])
dag = _dag
理想情况下,B
不知道 a
的值从何而来,也不知道如何获取它。是的,传递所有 res
并让 B
使用 res['a']
提取所需的内容是可行的,但我的目标是松散耦合。
参见 https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
中的示例您需要在任务 A
中指定“multiple_outputs=true”